Commit 385dcafc authored by lidongxu

Finished cleaning each data source separately

Parent 3b2e71ec
---
name: project-docs-workflow
description: Enforces reading project markdown under docs/ before implementing features in this repository, and updating those docs after code changes. Use when changing Python under code/, adding API routes, data conversion, or audit/clean pipelines; use when the user mentions project docs, architecture, data flow, or doc sync (项目说明、架构、数据流、文档同步).
---
# Docs first, docs last
## Before starting
1. Read **`docs/PROJECT_INDEX.md`** at the repo root (index and module map).
2. Open the **module docs** the index links to for the task (e.g. `docs/api.md`, `docs/team-conversion.md`), then read the related source; the user rules also require reading the target files before changing code.
3. If the index or a module doc is missing or clearly out of sync with the code: **add a section or fix it in passing** while implementing (still honoring "terse comments" and "no large verbatim source dumps").
## After finishing
1. If behavior, interfaces, data flow, or directory responsibilities changed: update the matching **`docs/*.md`**, and if needed the module list or notes in **`docs/PROJECT_INDEX.md`**.
2. For a new reusable module or pipeline: add a category and link in the index, and add or extend the module doc.
## Doc principles
- **Concise**: short sentences plus mermaid for architecture and data flow; point to code via file paths for details.
- **Progressive**: keep the skill short; long explanations live in `docs/` and are read on demand.
- **Scripts still in debugging**: do **not** document `code/py_/audit/point_sale/data_chengyu_puling.py` in the project docs unless the user explicitly asks to include it.
## Docs root
- Index: `docs/PROJECT_INDEX.md`
...@@ -7,4 +7,4 @@ __pycache__/
venv/
.env
# default output dir for team conversion
code/cache/
\ No newline at end of file
"""Dynamically load the 浦零/诚予 wide-table conversion script (same approach as team_conversion_loader)."""
import importlib.util
from pathlib import Path
from typing import Any, Callable
_CODE_BASE = Path(__file__).resolve().parent.parent
_CP_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "data_chengyu_puling.py"
def _load_mod() -> Any:
spec = importlib.util.spec_from_file_location("chengyu_puling_data", _CP_SCRIPT)
if spec is None or spec.loader is None:
raise RuntimeError(f"无法加载浦零/诚予转换模块: {_CP_SCRIPT}")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
if getattr(mod, "run_puling_conversion", None) is None:
raise RuntimeError("data_chengyu_puling 中缺少 run_puling_conversion")
return mod
_mod = _load_mod()
run_puling_conversion: Callable[..., dict[str, Any]] = _mod.run_puling_conversion
default_puling_target_path: Callable[[], str] = _mod.default_puling_target_path
default_chengyu_target_path: Callable[[], str] = _mod.default_chengyu_target_path
PRODUCT_GROUPS: list = _mod.PRODUCT_GROUPS
PRODUCT_GROUPS_CY: list = _mod.PRODUCT_GROUPS_CY
"""Cleaning HTTP routes: validate input, run the team/浦零/诚予 conversions, map business errors to HTTP status codes."""
from typing import Any

from fastapi import APIRouter, HTTPException

from api.chengyu_puling_loader import (
    PRODUCT_GROUPS,
    PRODUCT_GROUPS_CY,
    default_chengyu_target_path,
    default_puling_target_path,
    run_puling_conversion,
)
from api.response import ApiEnvelope, ok
from api.schemas import CleanRequestBody
from api.team_conversion_loader import default_team_target_path, run_team_conversion
...@@ -42,16 +51,49 @@ def post_clean(body: CleanRequestBody) -> ApiEnvelope:
    )
    team_url = (body.team_url or "").strip()
    puling_url = (body.puling_url or "").strip()
    chengyu_url = (body.chengyu_url or "").strip()
    if not team_url and not puling_url and not chengyu_url:
        raise HTTPException(
            status_code=400,
            detail={
                "ok": False,
                "error": "team_url、puling_url、chengyu_url 至少填写一个非空地址",
            },
        )

    audit_date_str = _audit_date_str_from_body(body)
    data: dict[str, Any] = {"team": None, "puling": None, "chengyu": None}

    if team_url:
        team_target = (body.team_target_path or "").strip() or default_team_target_path()
        r = run_team_conversion(team_url, team_target, audit_date_str)
        if not r.get("ok"):
            _raise_http_for_failed_result(r)
        data["team"] = r

    if puling_url:
        r = run_puling_conversion(
            puling_url,
            default_puling_target_path(),
            audit_date_str,
            yname="浦零",
            product_groups=PRODUCT_GROUPS,
        )
        if not r.get("ok"):
            _raise_http_for_failed_result(r)
        data["puling"] = r

    if chengyu_url:
        r = run_puling_conversion(
            chengyu_url,
            default_chengyu_target_path(),
            audit_date_str,
            yname="诚予",
            product_groups=PRODUCT_GROUPS_CY,
        )
        if not r.get("ok"):
            _raise_http_for_failed_result(r)
        data["chengyu"] = r

    return ok(data=data, msg="成功")
...@@ -8,7 +8,7 @@ class CleanRequestBody(BaseModel):
    year: int | None = None
    month: int | None = None
    day: int | None = None
    team_url: str | None = None  # non-empty -> data_conversion.run_team_conversion
    team_target_path: str | None = None  # default: cache/team_<timestamp>.xlsx under the project
    puling_url: str | None = None  # non-empty -> data_chengyu_puling (浦零 column layout)
    chengyu_url: str | None = None  # non-empty -> data_chengyu_puling (诚予 column layout)
# Data conversion — 浦零 (with the 诚予 column layout reserved alongside)
# The wide table is fetched from a URL only; skip the first two rows, row 3 is the
# header. Convert to a narrow table, write to the target xlsx "合并后" sheet, and
# compute near-expiry / old-date / freshness fields.
import copy
import io
import os
import sys
import urllib.error
import urllib.request
from datetime import datetime
from pathlib import Path
import pandas as pd
from dateutil.relativedelta import relativedelta
# point_sale → audit → py_ → code, so imports resolve against the repo's utils package (see docs/team-conversion.md)
_CODE_ROOT = Path(__file__).resolve().parents[3]
if str(_CODE_ROOT) not in sys.path:
sys.path.insert(0, str(_CODE_ROOT))
from utils.dates import ( # noqa: E402
approx_gap_months_calendar,
first_yyyy_mm_dd_in_dataframe,
first_yyyy_mm_dd_in_iloc,
normalize_year_month_to_day01,
to_yyyy_mm_dd,
)
def _resolve_audit_date(
    audit_date_str: str | None,
    df_target: pd.DataFrame,
    df_source: pd.DataFrame | None = None,
    *,
    source_audit_col: int = 0,
) -> tuple[str | None, str | None]:
    """Audit date: wide-table column → explicit argument → target-table column /
    third-column fallback. Returns (YYYY-MM-DD, error message)."""
    n = None
    if df_source is not None and df_source.shape[1] > source_audit_col:
        n = first_yyyy_mm_dd_in_iloc(df_source, source_audit_col)
    if n is None and audit_date_str is not None and str(audit_date_str).strip():
        n = to_yyyy_mm_dd(audit_date_str)
        if n is None:
            # An explicit argument was supplied but could not be parsed.
            return None, f"稽查日期参数无法解析: {audit_date_str!r}"
    if n is None:
        n = first_yyyy_mm_dd_in_dataframe(
            df_target,
            ("稽查日期列", "稽查日期"),
            third_column_fallback=True,
        )
    if n:
        return n, None
    return None, (
        "未获取到稽查日期:请在宽表「稽核/稽查日期」列填写;"
        "或在目标表「合并后」填写;或传入 audit_date_str。"
    )
COLUMN_MAPPING = {
"稽查日期": "稽查日期",
"稽查来源": "稽查来源",
"勤策门店编码": "勤策门店编码",
"勤策门店名称": "勤策门店名称",
"经销商名称": "经销商名称",
"城市": "城市",
"渠道类型": "渠道类型(稽查源提供)",
"产品系列": "产品系列",
"产品口味": "产品口味",
"产品克重": "产品克重",
"产品价格": "产品价格",
"产品生产月份": "产品生产月份",
}
# 诚予国际 (one extra leading column; shifted +1 overall relative to 浦零)
PRODUCT_GROUPS_CY = [
{
"price_col": 7,
"flavor_cols": [8, 9, 10, 11, 12, 13, 14],
"series": "虎皮凤爪",
"weight": "210g",
"flavors": ["卤香", "香辣", "椒麻", "火锅", "微辣", "麻辣", "黑鸭"],
},
{
"price_col": 15,
"flavor_cols": [16, 17, 18, 19, 20, 21, 22],
"series": "虎皮凤爪",
"weight": "105g",
"flavors": ["卤香", "香辣", "椒麻", "火锅", "微辣", "麻辣", "黑鸭"],
},
{
"price_col": 23,
"flavor_cols": [24, 25, 26, 27, 28],
"series": "虎皮凤爪",
"weight": "68g",
"flavors": ["卤香", "香辣", "椒麻", "麻辣", "黑鸭"],
},
{
"price_col": 29,
"flavor_cols": [30, 31],
"series": "鸡肉豆堡",
"weight": "120g",
"flavors": ["卤香", "香辣"],
},
{
"price_col": 32,
"flavor_cols": [33, 34],
"series": "牛肉豆堡",
"weight": "120g",
"flavors": ["卤香", "香辣"],
},
{
"price_col": 35,
"flavor_cols": [36, 37],
"series": "去骨凤爪",
"weight": "72g",
"flavors": ["柠檬", "香辣"],
},
{
"price_col": 38,
"flavor_cols": [39, 40],
"series": "去骨凤爪",
"weight": "138g",
"flavors": ["柠檬", "香辣"],
},
{
"price_col": 41,
"flavor_cols": [42, 43],
"series": "虎皮小鸡腿",
"weight": "80g",
"flavors": ["卤香", "香辣"],
},
{
"price_col": 44,
"flavor_cols": [45],
"series": "老卤凤爪",
"weight": "95g",
"flavors": ["卤香"],
},
{
"price_col": 44,
"flavor_cols": [46],
"series": "老卤鸭掌",
"weight": "95g",
"flavors": ["卤香"],
},
{
"price_col": 47,
"flavor_cols": [48, 49],
"series": "虎皮凤爪",
"weight": "25g",
"flavors": ["卤香", "香辣"],
},
{
"price_col": 50,
"flavor_cols": [51, 52, 53],
"series": "虎皮凤爪",
"weight": "散称",
"flavors": ["卤香", "香辣", "黑鸭"],
},
]
# 浦零 source (row-3 header: store code starts at column 1)
PRODUCT_GROUPS = [
{
"price_col": 6,
"flavor_cols": [7, 8, 9, 10, 11, 12, 13],
"series": "虎皮凤爪",
"weight": "210g",
"flavors": ["卤香", "香辣", "椒麻", "火锅", "微辣", "麻辣", "黑鸭"],
},
{
"price_col": 14,
"flavor_cols": [15, 16, 17, 18, 19, 20, 21],
"series": "虎皮凤爪",
"weight": "105g",
"flavors": ["卤香", "香辣", "椒麻", "火锅", "微辣", "麻辣", "黑鸭"],
},
{
"price_col": 22,
"flavor_cols": [23, 24, 25, 26, 27],
"series": "虎皮凤爪",
"weight": "68g",
"flavors": ["卤香", "香辣", "椒麻", "麻辣", "黑鸭"],
},
{
"price_col": 28,
"flavor_cols": [29, 30],
"series": "鸡肉豆堡",
"weight": "120g",
"flavors": ["卤香", "香辣"],
},
{
"price_col": 31,
"flavor_cols": [32, 33],
"series": "牛肉豆堡",
"weight": "120g",
"flavors": ["卤香", "香辣"],
},
{
"price_col": 34,
"flavor_cols": [35, 36],
"series": "去骨凤爪",
"weight": "72g",
"flavors": ["柠檬", "香辣"],
},
{
"price_col": 37,
"flavor_cols": [38, 39],
"series": "去骨凤爪",
"weight": "138g",
"flavors": ["柠檬", "香辣"],
},
{
"price_col": 40,
"flavor_cols": [41, 42],
"series": "虎皮小鸡腿",
"weight": "80g",
"flavors": ["卤香", "香辣"],
},
{
"price_col": 43,
"flavor_cols": [44],
"series": "老卤凤爪",
"weight": "95g",
"flavors": ["卤香"],
},
{
"price_col": 45,
"flavor_cols": [46],
"series": "老卤鸭掌",
"weight": "95g",
"flavors": ["卤香"],
},
{
"price_col": 47,
"flavor_cols": [48, 49],
"series": "虎皮凤爪",
"weight": "25g",
"flavors": ["卤香", "香辣"],
},
{
"price_col": 50,
"flavor_cols": [51, 52, 53],
"series": "虎皮凤爪",
"weight": "散称",
"flavors": ["卤香", "香辣", "黑鸭"],
},
]
PULING_WIDE_AUDIT_DATE_COL_FALLBACK = 0
def _find_wide_table_audit_col(header_row: pd.Series) -> int | None:
    """Header row: index of the column whose name contains 「稽核日期」 or 「稽查日期」 (aligned with the data iloc)."""
keys = ("稽核日期", "稽查日期")
for i in range(len(header_row)):
v = header_row.iloc[i]
if v is None or (isinstance(v, float) and pd.isna(v)):
continue
s = str(v).strip().replace(" ", "")
if s and any(k in s for k in keys):
return i
return None
def main(
df_source,
yname,
pg,
target_file_path,
audit_date_str=None,
*,
source_audit_col: int = PULING_WIDE_AUDIT_DATE_COL_FALLBACK,
):
tf = Path(target_file_path)
try:
try:
df_target = pd.read_excel(tf, sheet_name="合并后", dtype=str)
existing_columns = df_target.columns.tolist()
except (FileNotFoundError, ValueError):
standard_columns = [
"稽查日期", "稽查来源", "大区", "战区", "经销商编码", "经销商名称",
"勤策门店编码", "勤策门店名称", "客户经理工号", "客户经理",
"勤策渠道大类", "稽核渠道(对N列清洗)", "城市", "渠道类型(稽查源提供)",
"产品系列", "产品口味", "产品克重", "产品价格", "是否低价", "破价价差", "低价整改状态",
"低价整改说明", "产品生产月份", "临期月份数", "临期状态", "新鲜度",
"大日期整改状态", "大日期整改说明",
]
df_target = pd.DataFrame(columns=standard_columns)
existing_columns = standard_columns
ad, ad_err = _resolve_audit_date(
audit_date_str,
df_target,
df_source,
source_audit_col=source_audit_col,
)
if ad_err:
print(f"❌ {ad_err}")
return {"ok": False, "error": ad_err}
records = []
src_has_audit_col = df_source.shape[1] > source_audit_col
for idx, row in df_source.iterrows():
base_data = {
"勤策门店编码": str(row.iloc[1]).strip() if pd.notna(row.iloc[1]) else "",
"城市": str(row.iloc[2]).strip() if pd.notna(row.iloc[2]) else "",
"勤策门店名称": str(row.iloc[3]).strip() if pd.notna(row.iloc[3]) else "",
"经销商名称": str(row.iloc[4]).strip() if pd.notna(row.iloc[4]) else "",
"渠道类型": str(row.iloc[5]).strip() if pd.notna(row.iloc[5]) else "",
}
base_row = {}
if COLUMN_MAPPING["稽查日期"] in existing_columns:
row_ad = to_yyyy_mm_dd(row.iloc[source_audit_col]) if src_has_audit_col else None
base_row[COLUMN_MAPPING["稽查日期"]] = row_ad or ad
if COLUMN_MAPPING["稽查来源"] in existing_columns:
base_row[COLUMN_MAPPING["稽查来源"]] = yname
if COLUMN_MAPPING["勤策门店编码"] in existing_columns:
base_row[COLUMN_MAPPING["勤策门店编码"]] = base_data["勤策门店编码"]
if COLUMN_MAPPING["勤策门店名称"] in existing_columns:
base_row[COLUMN_MAPPING["勤策门店名称"]] = base_data["勤策门店名称"]
if COLUMN_MAPPING["经销商名称"] in existing_columns:
base_row[COLUMN_MAPPING["经销商名称"]] = base_data["经销商名称"]
if COLUMN_MAPPING["城市"] in existing_columns:
base_row[COLUMN_MAPPING["城市"]] = base_data["城市"]
if COLUMN_MAPPING["渠道类型"] in existing_columns:
base_row[COLUMN_MAPPING["渠道类型"]] = base_data["渠道类型"]
for group in pg:
price_col = group["price_col"]
flavor_cols = group["flavor_cols"]
flavors = group["flavors"]
series = group["series"]
weight = group["weight"]
src_price = str(row.iloc[price_col]).strip() if pd.notna(row.iloc[price_col]) else ""
if not src_price or src_price == "无价签":
src_price = ""
row_with_price = copy.deepcopy(base_row)
if COLUMN_MAPPING["产品价格"] in existing_columns:
row_with_price[COLUMN_MAPPING["产品价格"]] = src_price
for i, col_idx in enumerate(flavor_cols):
flavor_name = flavors[i]
src_month = str(row.iloc[col_idx]).strip() if pd.notna(row.iloc[col_idx]) else ""
if src_month:
new_rec = copy.deepcopy(row_with_price)
src_month = normalize_year_month_to_day01(src_month)
_set_product_fields(new_rec, series, flavor_name, weight, src_month, existing_columns)
rDate(new_rec)
records.append(new_rec)
elif src_price:
new_rec = copy.deepcopy(row_with_price)
_set_product_fields(new_rec, series, flavor_name, weight, None, existing_columns)
rDate(new_rec)
records.append(new_rec)
if not records:
msg = "无有效数据需要追加。"
print(f"⚠️ {msg}")
return {"ok": False, "message": msg}
df_new = pd.DataFrame(records, columns=existing_columns)
df_combined = pd.concat([df_target, df_new], ignore_index=True)
if os.path.exists(tf):
with pd.ExcelWriter(tf, engine="openpyxl", mode="a", if_sheet_exists="replace") as writer:
df_combined.to_excel(writer, sheet_name="合并后", index=False)
else:
with pd.ExcelWriter(tf, engine="openpyxl", mode="w") as writer:
df_combined.to_excel(writer, sheet_name="合并后", index=False)
print(f"✅ 成功追加 {len(records)} 条记录到目标表!")
return {
"ok": True,
"records_added": len(records),
"target_file": str(tf),
}
except Exception as e:
print(f"❌ 错误: {e}")
import traceback
traceback.print_exc()
return {"ok": False, "error": str(e)}
def _set_product_fields(record, series, flavor, weight, prod_month_str, existing_columns):
if COLUMN_MAPPING["产品系列"] in existing_columns:
record[COLUMN_MAPPING["产品系列"]] = series
if COLUMN_MAPPING["产品口味"] in existing_columns:
record[COLUMN_MAPPING["产品口味"]] = flavor
if COLUMN_MAPPING["产品克重"] in existing_columns:
record[COLUMN_MAPPING["产品克重"]] = weight
if prod_month_str and COLUMN_MAPPING["产品生产月份"] in existing_columns:
try:
dt = datetime.strptime(prod_month_str, "%Y-%m-%d")
record[COLUMN_MAPPING["产品生产月份"]] = dt.date()
except (ValueError, TypeError):
record[COLUMN_MAPPING["产品生产月份"]] = None
def rDate(row_dict):
prod_date = row_dict.get("产品生产月份", None)
inspect_date_str = row_dict.get("稽查日期", "").strip()
if not prod_date or not inspect_date_str:
row_dict["临期状态"] = ""
row_dict["新鲜度"] = ""
row_dict["临期月份数"] = ""
return
try:
inspect_date = datetime.strptime(inspect_date_str, "%Y-%m-%d")
except ValueError:
row_dict["临期状态"] = ""
row_dict["新鲜度"] = ""
row_dict["临期月份数"] = ""
return
product_series = row_dict.get("产品系列", "")
zg_status = "未整改"
if product_series == "去骨凤爪":
expiry_date = prod_date + relativedelta(months=6)
gap_months = approx_gap_months_calendar(expiry_date, inspect_date)
if gap_months >= 2:
status, freshness, zg_status = "非大日期", "高", ""
elif 1 <= gap_months < 2:
status, freshness = "大日期", "低"
elif 0 <= gap_months < 1:
status, freshness = "临期", "低"
else:
status, freshness = "过期", "低"
else:
expiry_date = prod_date + relativedelta(months=9)
gap_months = approx_gap_months_calendar(expiry_date, inspect_date)
if gap_months >= 3:
status, freshness, zg_status = "非大日期", "高", ""
elif 1 <= gap_months < 3:
status, freshness = "大日期", "低"
elif 0 <= gap_months < 1:
status, freshness = "临期", "低"
else:
status, freshness = "过期", "低"
row_dict["临期状态"] = status
row_dict["新鲜度"] = freshness
row_dict["临期月份数"] = round(gap_months, 2)
row_dict["大日期整改状态"] = zg_status
def read_puling_source_from_url(
url: str,
*,
timeout: float = 300,
user_agent: str = "clean-data-api/1.0",
dtype=str,
) -> tuple[pd.DataFrame, int]:
    """浦零-style wide table: skip the first two rows; row 3 is the header; returns (data, audit-date column index)."""
req = urllib.request.Request(url.strip(), headers={"User-Agent": user_agent})
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
buf = io.BytesIO(raw)
head_preview = pd.read_excel(buf, header=None, dtype=dtype, nrows=3)
buf.seek(0)
header_row = head_preview.iloc[2]
data_df = pd.read_excel(buf, skiprows=2, header=0, dtype=dtype)
col = _find_wide_table_audit_col(header_row)
if col is None:
col = PULING_WIDE_AUDIT_DATE_COL_FALLBACK
print(
f"⚠️ 宽表第 3 行未识别「稽核日期/稽查日期」列,稽查日期回退用列索引 {col}"
)
else:
print(f"✅ 宽表稽核/稽查日期列索引 {col},表头: {header_row.iloc[col]!r}")
return data_df, col
def _print_source_preview(df_source_p: pd.DataFrame) -> None:
print(f"✅ 成功读取 {len(df_source_p)} 行数据。")
if len(df_source_p) > 0:
print("前 2 行数据预览:")
print(df_source_p.head(2))
print(f"列索引范围:0 到 {len(df_source_p.columns) - 1}")
def _run_puling_after_load(
df_source_p: pd.DataFrame,
target_path: str | Path,
audit_date_str: str | None,
yname: str = "浦零",
product_groups: list | None = None,
*,
source_audit_col: int = PULING_WIDE_AUDIT_DATE_COL_FALLBACK,
) -> dict:
pg = product_groups if product_groups is not None else PRODUCT_GROUPS
result = main(
df_source_p,
yname,
pg,
target_file_path=target_path,
audit_date_str=audit_date_str,
source_audit_col=source_audit_col,
)
if result is None:
return {"ok": False, "error": "main 未返回结果"}
return {"source_rows": len(df_source_p), **result}
def run_puling_conversion(
source_url: str,
target_path: str | Path,
audit_date_str: str | None = None,
*,
yname: str = "浦零",
product_groups: list | None = None,
timeout: float = 300,
user_agent: str = "clean-data-api/1.0",
dtype=str,
) -> dict:
    """Download a 浦零-style wide-table xlsx from source_url, convert, and write to target_path."""
s = (source_url or "").strip()
low = s.lower()
if not s or not (low.startswith("http://") or low.startswith("https://")):
return {"ok": False, "error": "source_url 须为非空的 http(s) 地址"}
print("正在从 URL 读取【浦零】源文件(跳过前两行,第三行作表头)...")
try:
df_source_p, source_audit_col = read_puling_source_from_url(
s,
timeout=timeout,
user_agent=user_agent,
dtype=dtype,
)
except urllib.error.HTTPError as e:
return {"ok": False, "error": f"从 URL 读取源表失败: HTTP {e.code}"}
except urllib.error.URLError as e:
return {"ok": False, "error": f"从 URL 读取源表失败: {e.reason!s}"}
except Exception as e:
return {"ok": False, "error": f"读取源表失败: {e}"}
_print_source_preview(df_source_p)
return _run_puling_after_load(
df_source_p,
target_path,
audit_date_str,
yname=yname,
product_groups=product_groups,
source_audit_col=source_audit_col,
)
def default_puling_target_path() -> str:
    """When no path is given: code/cache/puling_{timestamp}.xlsx"""
code_root = Path(__file__).resolve().parents[3]
d = code_root / "cache"
d.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
return str(d / f"puling_{ts}.xlsx")
def default_chengyu_target_path() -> str:
    """When no path is given: code/cache/chengyu_{timestamp}.xlsx"""
code_root = Path(__file__).resolve().parents[3]
d = code_root / "cache"
d.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
return str(d / f"chengyu_{ts}.xlsx")
# clean_data project notes (index)
For agents and humans alike: read this index before changing code, then follow the links to the relevant module docs. If behavior changes after implementing, update this file or the sub-docs accordingly.
## Repository layout (overview)
| Path | Responsibility |
|------|------|
| `code/` | Runtime code: FastAPI, `api/`, `utils/`, audit conversion scripts under `py_/audit/...` |
| `code/cache/` | Default output dir for team conversion (`team_<timestamp>.xlsx`) |
| `docs/` | This index plus per-module notes |
## Module docs
| Doc | Contents |
|------|------|
| [api.md](api.md) | HTTP API, routes, unified response envelope, and error mapping |
| [team-conversion.md](team-conversion.md) | Team wide-table URL → narrow table → `合并后` sheet pipeline |
| [utils.md](utils.md) | Responsibilities of `utils/dates` and `utils/excel_http` |
## Architecture (logical layers)
```mermaid
flowchart TB
subgraph http [HTTP]
A[FastAPI index.py]
R[api/routes_clean.py]
A --> R
end
subgraph load [Loaders]
L[api/team_conversion_loader.py]
R --> L
end
subgraph biz [Business conversion]
D[py_/audit/point_sale/data_conversion.py]
L --> D
end
subgraph util [Utilities]
U1[utils/excel_http]
U2[utils/dates]
D --> U1
D --> U2
end
D --> X[(local xlsx 合并后)]
```
## Data flow (team cleaning shown)
```mermaid
sequenceDiagram
participant C as Client
participant API as POST /api/v1/clean
participant R as routes_clean
participant T as team_conversion_loader
participant D as data_conversion.run_team_conversion
participant URL as team_url xlsx
participant FS as target xlsx
C->>API: JSON body
API->>R: validate department / team_url
R->>T: run_team_conversion(...)
T->>D: dynamically load and call
D->>URL: HTTP fetch of the wide table
D->>D: wide→narrow + near-expiry/freshness etc.
D->>FS: write/replace the 「合并后」 sheet
D-->>R: dict ok / error
R-->>C: ApiEnvelope or HTTP error
```
## Requirements / capability split (current state)
- **Implemented**: `department = 风控稽查数据清洗` + `team_url` → team wide-table conversion written to disk; optional `year/month/day` form the audit date; optional `team_target_path` (default `code/cache/team_{timestamp}.xlsx`).
- **Also wired**: `puling_url` and `chengyu_url` are declared in `api/schemas.py` and now drive the 浦零/诚予 branches of the clean route; document their data flow in the sub-docs as they stabilize (scripts still in debugging are not documented here unless explicitly included).
## Run entry points
- The FastAPI application instance is `app` in **`code/index.py`**.
- If `code/README.md` still says `uvicorn main:app`, the actual file wins: run `uvicorn index:app` from the `code` directory (or configure an equivalent module path).
## Maintenance conventions
- Changing the API contract, status-code semantics, conversion steps, or output file format → update [api.md](api.md) / [team-conversion.md](team-conversion.md).
- Adding a standalone utility module → update [utils.md](utils.md) and add a linked row to the table above.
# HTTP API (`code/api/`)
## Application assembly
- **`code/index.py`**: the `FastAPI` instance `app`; registers `HTTPException` / `RequestValidationError` handlers and mounts `api_router`.
## Routes
- **Prefix**: `/api`
- **Clean**: `POST /api/v1/clean` (`routes_clean.py`)
### POST /api/v1/clean
- **department**: must be **`风控稽查数据清洗`** (constant `DEPARTMENT_RISK_AUDIT_CLEAN`), otherwise 400.
- **team_url / puling_url / chengyu_url**: **at least one** must be non-empty; each URL must be `http://` or `https://` (validated downstream), otherwise 400; see below for how URL-read failures map to status codes.
- **team_url**: if present → **`data_conversion.run_team_conversion`**, loaded via `team_conversion_loader`.
- **puling_url**: if present → **`data_chengyu_puling.run_puling_conversion`** (`yname=浦零`, `PRODUCT_GROUPS`).
- **chengyu_url**: if present → the same script's **`run_puling_conversion`** (`yname=诚予`, `PRODUCT_GROUPS_CY`).
- **team_target_path**: optional; used only by the team branch; when empty, `default_team_target_path()` → `code/cache/team_{timestamp}.xlsx`.
- **浦零 / 诚予** output paths: `default_puling_target_path()` and `default_chengyu_target_path()` → `code/cache/puling_*.xlsx` / `chengyu_*.xlsx`.
- **year / month / day**: optional; when all are given they are joined into `YYYYMMDD` and passed to each cleaning branch as an audit-date hint.
The request body model is **`code/api/schemas.py`** (`CleanRequestBody`).
**`data` shape on success**: `{ "team": dict | null, "puling": dict | null, "chengyu": dict | null }`; only the branches whose URL was non-empty in this request receive a result, the rest stay `null`.
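A minimal illustration of the contract above, assuming the field names from `CleanRequestBody` (the URL and dates are made up). Only `team_url` is non-empty here, so on success `data["puling"]` and `data["chengyu"]` would stay `null`:

```python
# Hypothetical request body for POST /api/v1/clean (values illustrative).
body = {
    "department": "风控稽查数据清洗",
    "year": 2024,
    "month": 7,
    "day": 1,
    "team_url": "https://example.com/team_wide.xlsx",
    "puling_url": "",   # empty -> this branch is skipped
    "chengyu_url": "",  # empty -> this branch is skipped
}

# year/month/day, when all present, are joined into a YYYYMMDD audit-date hint:
audit_hint = f"{body['year']:04d}{body['month']:02d}{body['day']:02d}"
print(audit_hint)  # -> 20240701
```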
## Unified response
- Success envelope: **`ApiEnvelope`** (`code/api/response.py`): `code=0`, `data`, `msg`.
- `HTTPException` / validation failures: **`exception_handlers.py`** maps the details to the same JSON shape (`code` may be the HTTP status code or 422).
## Conversion failure → HTTP
`_raise_http_for_failed_result` (`routes_clean.py`) dispatches on the `error` text of the returned `dict`:
- starts with `source_url 须为` → 400
- starts with「从 URL 读取源表失败」or「读取源表失败」→ 502
- `message` present without `error` (e.g. no valid rows) → no exception; wrapped normally in an `ApiEnvelope`.
## Related files
- `routes_clean.py`, `schemas.py`, `response.py`, `exception_handlers.py`, `team_conversion_loader.py`, `chengyu_puling_loader.py`
# Team audit wide table → narrow table (`data_conversion.py`)
## Location and loading
- Script path: **`code/py_/audit/point_sale/data_conversion.py`**
- **`api/team_conversion_loader.py`** loads it by file path via `importlib` and exports **`run_team_conversion`**, sidestepping package naming and the historical Chinese filenames.
## Entry point: `run_team_conversion`
Key parameters:
- **`source_url`**: URL of the team wide-table xlsx; must be `http(s)`, otherwise returns `ok: False`.
- **`target_path`**: output xlsx path.
- **`audit_date_str`**: optional; resolved together with the audit-date columns of the wide/target tables (see `_resolve_audit_date`).
Flow summary:
1. **`read_team_source_from_url`** → `utils/excel_http.read_excel_from_url_skip1_with_header_row` — skip row 1, data starts at row 2; detect the「稽核日期/稽查日期」column index from row 1, falling back to a default index on failure.
2. **`main`**: read or create the target workbook's **`合并后`** sheet; expand each "price + per-flavor production-month columns" group in **`PRODUCT_GROUPS_JC`** into narrow rows, fill the store-level columns, and compute **near-expiry / old-date / freshness** (`rDate`).
3. Returns a `dict`: `ok`, `records_added`, `target_file`, `error` / `message`.
## Dependencies
- **pandas / openpyxl / python-dateutil** (see `code/requirements.txt`)
- **`code/utils/dates.py`** and **`code/utils/excel_http.py`** (`data_conversion` inserts the `code` directory into `sys.path` so it can import `utils`)
## HTTP wiring
See [api.md](api.md): after validation, `POST /api/v1/clean` calls `run_team_conversion(team_url, team_target, audit_date_str)`.
# Utility layer (`code/utils/`)
## `excel_http.py`
- **`read_excel_from_url`**: fetch an xlsx into memory via `urllib`, then `pandas.read_excel`; `skiprows` / `header` configurable.
- **`read_excel_from_url_skip1_with_header_row`**: team-wide-table specific — returns **(data DataFrame, row-1 header Series)**; data starts at row 2 with `header=None`, so column indices line up with the business `iloc` conventions.
## `dates.py`
- **`to_yyyy_mm_dd`**: cell value → `YYYY-MM-DD`.
- **`first_yyyy_mm_dd_in_iloc` / `first_yyyy_mm_dd_in_dataframe`**: find the audit date in the wide or target table.
- **`normalize_year_month_to_day01`**: normalize a production-month string to `YYYY-MM-01`.
- **`approx_gap_months_calendar`**: approximate months remaining from the audit date to the expiry date (feeds the near-expiry/freshness logic).
Main business consumer: **`py_/audit/point_sale/data_conversion.py`**.