提交 cbe7b014 authored 作者: lidongxu's avatar lidongxu

合并1-多个表格到综合大表格里

上级 385dcafc
......@@ -14,6 +14,7 @@ from api.chengyu_puling_loader import (
from api.response import ApiEnvelope, ok
from api.schemas import CleanRequestBody
from api.team_conversion_loader import default_team_target_path, run_team_conversion
from utils.clean_output_merge import default_merged_target_path, merge_clean_result_xlsx
DEPARTMENT_RISK_AUDIT_CLEAN = "风控稽查数据清洗"
......@@ -96,4 +97,18 @@ def post_clean(body: CleanRequestBody) -> ApiEnvelope:
_raise_http_for_failed_result(r)
data["chengyu"] = r
# 凡有成功落盘的分支,将「合并后」sheet 汇总到 merged_*.xlsx(1 路即单表写入,多路则纵向拼接)
part_paths: list[str] = []
for key in ("team", "puling", "chengyu"):
br = data.get(key)
if isinstance(br, dict) and br.get("ok") and br.get("target_file"):
part_paths.append(str(br["target_file"]))
if part_paths:
mr = merge_clean_result_xlsx(part_paths, default_merged_target_path())
if not mr.get("ok"):
raise HTTPException(status_code=500, detail=mr)
data["merged"] = mr
else:
data["merged"] = None
return ok(data=data, msg="成功")
"""将多路清洗结果 xlsx 的「合并后」sheet 纵向合并为一个文件(写入 cache)。"""
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import Any, Sequence
import pandas as pd
_CODE_BASE = Path(__file__).resolve().parents[1]
MERGED_SHEET_NAME = "合并后"
def default_merged_target_path() -> str:
"""默认:code/cache/merged_{时间戳}.xlsx"""
d = _CODE_BASE / "cache"
d.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
return str(d / f"merged_{ts}.xlsx")
def _unify_column_order(dfs: list[pd.DataFrame]) -> list[str]:
seen: set[str] = set()
order: list[str] = []
for df in dfs:
for c in df.columns:
if c not in seen:
seen.add(c)
order.append(str(c))
return order
def merge_clean_result_xlsx(paths: Sequence[str], output_path: str | Path) -> dict[str, Any]:
"""读取各路径工作簿的「合并后」sheet,列对齐后纵向拼接写入 output_path(1 个路径时等同复制为该汇总文件)。"""
ps = [str(p).strip() for p in paths if str(p).strip()]
if len(ps) < 1:
return {"ok": False, "error": "至少需要 1 个结果文件路径"}
out = Path(output_path)
try:
dfs = [pd.read_excel(p, sheet_name=MERGED_SHEET_NAME, dtype=str) for p in ps]
except Exception as e:
return {"ok": False, "error": f"读取待合并文件失败: {e}"}
cols = _unify_column_order(dfs)
aligned = [df.reindex(columns=cols).fillna("") for df in dfs]
merged = pd.concat(aligned, ignore_index=True)
out.parent.mkdir(parents=True, exist_ok=True)
try:
with pd.ExcelWriter(out, engine="openpyxl", mode="w") as writer:
merged.to_excel(writer, sheet_name=MERGED_SHEET_NAME, index=False)
except Exception as e:
return {"ok": False, "error": f"写入合并文件失败: {e}"}
return {
"ok": True,
"merged_target_file": str(out),
"merged_rows": int(len(merged)),
"merged_from": ps,
}
......@@ -7,7 +7,7 @@
| 路径 | 职责 |
|------|------|
| `code/` | 运行时代码:FastAPI、`api/``utils/`、稽查转换脚本目录 `py_/audit/...` |
| `code/cache/` | 默认团队转换输出目录(`team_时间戳.xlsx` |
| `code/cache/` | 清洗输出目录:`team_*.xlsx``puling_*.xlsx``chengyu_*.xlsx`;任一路成功落盘后另有汇总 `merged_*.xlsx` |
| `docs/` | 本索引与各模块说明 |
## 模块文档
......@@ -38,10 +38,13 @@ flowchart TB
subgraph util [工具]
U1[utils/excel_http]
U2[utils/dates]
U3[utils/clean_output_merge]
D --> U1
D --> U2
end
D --> X[(本地 xlsx 合并后)]
R --> U3
D --> X[(各分支 xlsx)]
U3 --> M[(merged_*.xlsx)]
```
## 数据流(当前已接线:团队清洗)
......@@ -69,8 +72,8 @@ sequenceDiagram
## 需求 / 能力拆分(现状)
- **已落地**`department = 风控稽查数据清洗` + `team_url` → 团队宽表转换并落盘;可选 `year/month/day` 拼稽查日期;可选 `team_target_path`(默认 `code/cache/team_{时间戳}.xlsx`)。
- **请求体预留**`puling_url``chengyu_url` 等在 `api/schemas.py` 中已声明,**当前路由未使用**;后续接线时再在子文档中补充数据流(调试中的脚本不写入本文档,除非明确纳入)。
- **已落地**`department = 风控稽查数据清洗``team_url` / `puling_url` / `chengyu_url` 至少其一,各路分别落盘;可选 `year/month/day``team_target_path`(仅团队分支)。
- **汇总 merged**:同一次请求中只要有 ≥1 路清洗成功落盘,即生成 `code/cache/merged_{时间戳}.xlsx`(多路纵向拼接,单路等同一份「合并后」)。
## 运行入口说明
......
......@@ -18,11 +18,12 @@
- **chengyu_url**:若提供 → 同一脚本 **`run_puling_conversion`**`yname=诚予``PRODUCT_GROUPS_CY`)。
- **team_target_path**:可选;仅团队分支使用;为空则 `default_team_target_path()``code/cache/team_{时间戳}.xlsx`
- **浦零 / 诚予** 输出路径:分别为 `default_puling_target_path()``default_chengyu_target_path()``code/cache/puling_*.xlsx``chengyu_*.xlsx`
- **汇总文件 merged_*.xlsx**:凡有 **≥1 路**清洗 `ok` 且带 `target_file`,即调用 `merge_clean_result_xlsx`:按 **团队 → 浦零 → 诚予** 顺序纳入成功分支,读各文件 **「合并后」** sheet;**多路**时列对齐后纵向拼接,**仅一路**时写入同一张「合并后」(便于前端始终读 `data.merged`)。路径为 `default_merged_target_path()``code/cache/merged_{时间戳}.xlsx`。若本次没有任何成功落盘(例如仅传了 URL 但分支返回无数据且未写文件),`data.merged``null`
- **year / month / day**:可选;若均提供则拼为 `YYYYMMDD` 传入各清洗分支作为稽查日期线索。
请求体模型见 **`code/api/schemas.py`**`CleanRequestBody`)。
**成功时 `data` 形态**`{ "team": dict | null, "puling": dict | null, "chengyu": dict | null }`,仅对本次请求中**非空 URL** 对应的分支写入结果,其余为 `null`
**成功时 `data` 形态**`{ "team": dict | null, "puling": dict | null, "chengyu": dict | null, "merged": dict | null }`。前三项仅对本次**非空 URL** 分支写入;`merged` 在写出汇总文件时为 `{ ok, merged_target_file, merged_rows, merged_from }`,否则 `null`
## 统一响应
......
......@@ -13,3 +13,10 @@
- **`approx_gap_months_calendar`**:到期日相对稽查日的剩余月数近似(供临期/新鲜度逻辑使用)。
业务侧主要使用者:**`py_/audit/point_sale/data_conversion.py`**
## `clean_output_merge.py`
- **`merge_clean_result_xlsx`**:读 1 个或多个 xlsx 的 **「合并后」** sheet(`dtype=str`),统一列序后 `concat`(仅 1 个输入时结果即该表),写入目标路径(单 sheet)。
- **`default_merged_target_path`**`code/cache/merged_{时间戳}.xlsx`
**`api/routes_clean.py`** 在多路清洗成功后调用。
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论