提交 5f290d27 authored 作者: lidongxu's avatar lidongxu

重构售点稽查数据上传清洗

上级 730c39e8
---
name: project-docs-workflow
description: 本仓库改代码时:按需读文档与代码,不必每次通读 docs;但必须在 docs/ 留下清晰、可读的变更记录(人机共用的工作痕迹)。适用于改 code/、API、清洗/稽查流水线;用户提到文档同步、变更记录、给以后留档时使用。
---
# 项目文档:按需阅读 + 必留痕迹
## 开工前(自行判断,不强制通读 docs)
- **不必**每次先读遍 `docs/` 下所有 md。根据需求复杂度决定:
- **小改、路径明确**(例如只改某个已知路由、某个函数):直接读**将要修改的源码**即可;若对话/缓存里已有上下文,可优先复用。
- **跨模块、新能力、不熟悉目录**:再打开 **`docs/PROJECT_INDEX.md`** 定位模块,按需点开 **`docs/api.md`****与本次任务相关**的文档,避免为找文件而读完整个架构说明。
- 用户规则「改代码前先读目标文件」仍然遵守:动手改的文件要自己读当前内容。
## 完工后(必须做:docs 里留记录)
- **每次**因需求产生的行为/接口/配置/数据流变化,都要在 **`docs/`** 里落到**人能读懂**的文字,方便以后你、我或其他协作者追溯。
- **写法**:用完整短句,写清「改了什么、为什么、怎么用(如环境变量、路径)」;避免只贴函数名而不说明语义。可适度用列表、小标题,**不必**大段复制源码。
- **落点**
- 有对应模块文档的(如 API → `docs/api.md`,库配置 → `docs/database.md`)→ **更新该文件**相关小节。
- 触及索引结构或新模块 → 同步 **`docs/PROJECT_INDEX.md`**(链接、一句话职责)。
- 若一次改动分散在多处文档不好归类,可在 **`docs/`** 下增加一篇**简短变更纪要**(例如 `docs/changelog.md` 追加一条带日期的条目,或 `docs/notes/YYYY-MM-DD_主题.md`),避免「改了代码但文档零记录」。
## 文档原则
- **简洁**:架构、数据流仍可用短句 + mermaid;细节用文件路径指向代码。
- **渐进**:本 Skill 保持短小;长说明写在 `docs/`
- **调试中脚本****勿**`code/py_/audit/point_sale/data_chengyu_puling.py` 等临时调试脚本写进项目文档,除非用户明确要求纳入。
## 文档根路径
- 索引:`docs/PROJECT_INDEX.md`
cd code
uvicorn index:app --host 0.0.0.0 --port 8000 --reload
\ No newline at end of file
# HTTP 路由与子模块
# 按业务域划分的 API 子包
# 售点稽查:清洗、低价稽查、risk_audit_visit CRUD
"""动态加载浦零/诚予宽表转换脚本(与 team_conversion_loader 同方式)。"""
import importlib.util
from pathlib import Path
from typing import Any, Callable
_CODE_BASE = Path(__file__).resolve().parent.parent.parent.parent
_CP_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "data_chengyu_puling.py"
def _load_mod() -> Any:
spec = importlib.util.spec_from_file_location("chengyu_puling_data", _CP_SCRIPT)
if spec is None or spec.loader is None:
raise RuntimeError(f"无法加载浦零/诚予转换模块: {_CP_SCRIPT}")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
if getattr(mod, "run_puling_conversion", None) is None:
raise RuntimeError("data_chengyu_puling 中缺少 run_puling_conversion")
return mod
_mod = _load_mod()
run_puling_conversion: Callable[..., dict[str, Any]] = _mod.run_puling_conversion
default_puling_target_path: Callable[[], str] = _mod.default_puling_target_path
default_chengyu_target_path: Callable[[], str] = _mod.default_chengyu_target_path
PRODUCT_GROUPS: list = _mod.PRODUCT_GROUPS
PRODUCT_GROUPS_CY: list = _mod.PRODUCT_GROUPS_CY
"""动态加载低价稽查模块(`py_/audit/point_sale/low_price.py`)。"""
import importlib.util
from pathlib import Path
from typing import Any, Callable
_CODE_BASE = Path(__file__).resolve().parent.parent.parent.parent
_LOW_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "low_price.py"
def _load_run_low_price_audit() -> Callable[..., dict[str, Any]]:
spec = importlib.util.spec_from_file_location("low_price_audit", _LOW_SCRIPT)
if spec is None or spec.loader is None:
raise RuntimeError(f"无法加载低价稽查模块: {_LOW_SCRIPT}")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
fn = getattr(mod, "run_low_price_audit", None)
if fn is None:
raise RuntimeError("low_price 中缺少 run_low_price_audit")
return fn
run_low_price_audit: Callable[..., dict[str, Any]] = _load_run_low_price_audit()
"""清洗相关 HTTP 路由:校验入参、团队/浦零/诚予转换、映射业务错误到 HTTP 状态码。"""
import os
import tempfile
from pathlib import Path
from typing import Any
from fastapi import APIRouter, HTTPException
from api.audit.point_sale.chengyu_puling_loader import (
PRODUCT_GROUPS,
PRODUCT_GROUPS_CY,
default_chengyu_target_path,
default_puling_target_path,
run_puling_conversion,
)
from api.audit.point_sale.low_price_loader import run_low_price_audit
from api.audit.point_sale.schemas import CleanRequestBody
from api.audit.point_sale.team_conversion_loader import default_team_target_path, run_team_conversion
from api.response import ApiEnvelope, ok
from api.settings import get_settings
from utils.clean_output_merge import (
default_merged_target_path,
read_merged_dataframe,
write_merged_dataframe,
)
DEPARTMENT_RISK_AUDIT_CLEAN = "售点稽查低价数据清洗"
api_router = APIRouter(prefix="/api")
def _audit_date_str_from_body(body: CleanRequestBody) -> str | None:
if body.year is None or body.month is None or body.day is None:
return None
return f"{body.year:04d}{body.month:02d}{body.day:02d}"
def _temp_xlsx_path(cleanup_list: list[str]) -> str:
fd, path = tempfile.mkstemp(suffix=".xlsx")
os.close(fd)
cleanup_list.append(path)
return path
def _raise_http_for_failed_result(result: dict) -> None:
"""团队转换返回 ok=False 时,按 error 文案选择状态码。"""
err = result.get("error") or ""
if "source_url 须为" in err:
raise HTTPException(status_code=400, detail=result)
if "从 URL 读取源表失败" in err or err.startswith("读取源表失败"):
raise HTTPException(status_code=502, detail=result)
if result.get("message") and "error" not in result:
return
raise HTTPException(status_code=500, detail=result)
@api_router.post("/v1/clean", response_model=ApiEnvelope)
def post_clean(body: CleanRequestBody) -> ApiEnvelope:
dept = (body.department or "").strip()
if dept != DEPARTMENT_RISK_AUDIT_CLEAN:
raise HTTPException(
status_code=400,
detail={
"ok": False,
"error": f"不支持的 department: {dept!r},当前仅支持「{DEPARTMENT_RISK_AUDIT_CLEAN}」",
},
)
team_url = (body.team_url or "").strip()
puling_url = (body.puling_url or "").strip()
chengyu_url = (body.chengyu_url or "").strip()
if not team_url and not puling_url and not chengyu_url:
raise HTTPException(
status_code=400,
detail={
"ok": False,
"error": "team_url、puling_url、chengyu_url 至少填写一个非空地址",
},
)
audit_date_str = _audit_date_str_from_body(body)
settings = get_settings()
write_cache = settings.effective_clean_write_cache()
cleanup_tmp_xlsx: list[str] = []
def branch_target(explicit: str, shared: str | None, default_cache_fn) -> str:
e = (explicit or "").strip()
if e:
return e
if shared:
return shared
if write_cache:
return default_cache_fn()
return _temp_xlsx_path(cleanup_tmp_xlsx)
try:
# 分支结果仅用于过程校验与收集 part_paths,成功时不再整包返回前端
data: dict[str, Any] = {"team": None, "puling": None, "chengyu": None}
# 与团队同一次请求时,浦零/诚予写入同一 xlsx,才能从「合并后」读到团队行的稽查日期作 ad 兜底
shared_target_after_team: str | None = None
if team_url:
team_target = branch_target(
body.team_target_path or "", None, default_team_target_path
)
shared_target_after_team = team_target
r = run_team_conversion(team_url, team_target, audit_date_str)
if not r.get("ok"):
_raise_http_for_failed_result(r)
data["team"] = r
if puling_url:
puling_target = branch_target(
"", shared_target_after_team, default_puling_target_path
)
r = run_puling_conversion(
puling_url,
puling_target,
audit_date_str,
yname="浦零",
product_groups=PRODUCT_GROUPS,
)
if not r.get("ok"):
_raise_http_for_failed_result(r)
data["puling"] = r
if chengyu_url:
chengyu_target = branch_target(
"", shared_target_after_team, default_chengyu_target_path
)
r = run_puling_conversion(
chengyu_url,
chengyu_target,
audit_date_str,
yname="诚予",
product_groups=PRODUCT_GROUPS_CY,
)
if not r.get("ok"):
_raise_http_for_failed_result(r)
data["chengyu"] = r
# 凡有成功落盘的分支,将「合并后」sheet 汇总到 merged_*.xlsx(1 路即单表写入,多路则纵向拼接)
part_paths: list[str] = []
seen_paths: set[str] = set()
for key in ("team", "puling", "chengyu"):
br = data.get(key)
if isinstance(br, dict) and br.get("ok") and br.get("target_file"):
p = str(br["target_file"])
if p not in seen_paths:
seen_paths.add(p)
part_paths.append(p)
success_payload: dict[str, Any] = {}
if part_paths:
try:
merged_df = read_merged_dataframe(part_paths)
except Exception as e:
raise HTTPException(
status_code=500,
detail={"ok": False, "error": f"读取待合并文件失败: {e}"},
) from e
if write_cache:
merged_path = default_merged_target_path()
try:
write_merged_dataframe(merged_df, merged_path)
except Exception as e:
raise HTTPException(
status_code=500,
detail={"ok": False, "error": f"写入合并文件失败: {e}"},
) from e
lp = run_low_price_audit(
merged_df,
settings.mysql_connect_kwargs(),
write_result_xlsx=write_cache,
)
if not lp.get("ok"):
raise HTTPException(status_code=502, detail=lp)
success_payload = {
"merged_rows": int(len(merged_df)),
"risk_audit_visit_rows": lp["risk_audit_visit_rows"],
"risk_audit_visit_collapsed_duplicate_rows": lp[
"risk_audit_visit_collapsed_duplicate_rows"
],
"risk_audit_visit_overwritten_rows": lp[
"risk_audit_visit_overwritten_rows"
],
}
return ok(data=success_payload, msg="成功")
finally:
if not write_cache:
for p in cleanup_tmp_xlsx:
try:
Path(p).unlink(missing_ok=True)
except OSError:
pass
"""`risk_audit_visit` 分页列表、详情、按主键全量更新与删除。"""
from __future__ import annotations
import mysql.connector
from mysql.connector import errors as mysql_errors
from fastapi import APIRouter, HTTPException, Query
from fastapi.encoders import jsonable_encoder
from api.audit.point_sale.schemas import RiskAuditVisitReplaceBody
from api.response import ApiEnvelope, ok
from api.settings import get_settings
risk_audit_visit_router = APIRouter(prefix="/api")
_RAV_COLS = [
"audit_date",
"source",
"region_name",
"district_name",
"dealer_code",
"dealer_name",
"store_code",
"store_name",
"f_emp_no",
"f_emp_name",
"qin_ce_type_large",
"jh_channel_type",
"city",
"channel_type",
"series",
"taste",
"weight",
"price",
"low_price",
"low_price_diff",
"low_price_status",
"low_price_rectify",
"production_month",
"near_month_num",
"near_month_status",
"fresh_status",
"large_date_status",
"large_date_rectify",
]
_SELECT_ROW = f"SELECT rav_id, {', '.join(_RAV_COLS)} FROM risk_audit_visit WHERE rav_id = %s"
_UPDATE_ROW = (
f"UPDATE risk_audit_visit SET {', '.join(f'{c}=%s' for c in _RAV_COLS)} WHERE rav_id = %s"
)
_MAX_PAGE_SIZE = 200
_DEFAULT_PAGE_SIZE = 20
def _connect():
try:
return mysql.connector.connect(**get_settings().mysql_connect_kwargs())
except mysql_errors.Error as e:
raise HTTPException(
status_code=502,
detail={"ok": False, "error": f"数据库连接失败: {e}"},
) from e
def _replace_body_to_params(body: RiskAuditVisitReplaceBody) -> tuple:
return (
body.audit_date,
body.source,
body.region_name,
body.district_name,
body.dealer_code,
body.dealer_name,
body.store_code,
body.store_name,
body.f_emp_no,
body.f_emp_name,
body.qin_ce_type_large,
body.jh_channel_type,
body.city,
body.channel_type,
body.series,
body.taste,
body.weight,
body.price,
body.low_price,
body.low_price_diff,
body.low_price_status,
body.low_price_rectify,
body.production_month,
body.near_month_num,
body.near_month_status,
body.fresh_status,
body.large_date_status,
body.large_date_rectify,
)
@risk_audit_visit_router.get("/v1/risk-audit-visit", response_model=ApiEnvelope)
def list_risk_audit_visit(
page: int = Query(1, ge=1, description="页码,从 1 开始"),
page_size: int = Query(
_DEFAULT_PAGE_SIZE,
ge=1,
le=_MAX_PAGE_SIZE,
description=f"每页条数,最大 {_MAX_PAGE_SIZE}",
),
) -> ApiEnvelope:
offset = (page - 1) * page_size
conn = _connect()
try:
cur = conn.cursor(dictionary=True)
cur.execute("SELECT COUNT(*) AS c FROM risk_audit_visit")
total_row = cur.fetchone()
total = int(total_row["c"]) if total_row else 0
cur.execute(
f"SELECT rav_id, {', '.join(_RAV_COLS)} FROM risk_audit_visit ORDER BY rav_id DESC LIMIT %s OFFSET %s",
(page_size, offset),
)
rows = cur.fetchall() or []
finally:
conn.close()
return ok(
data=jsonable_encoder(
{
"items": rows,
"total": total,
"page": page,
"page_size": page_size,
}
),
msg="成功",
)
@risk_audit_visit_router.get("/v1/risk-audit-visit/{rav_id}", response_model=ApiEnvelope)
def get_risk_audit_visit(rav_id: int) -> ApiEnvelope:
conn = _connect()
try:
cur = conn.cursor(dictionary=True)
cur.execute(_SELECT_ROW, (rav_id,))
row = cur.fetchone()
finally:
conn.close()
if not row:
raise HTTPException(
status_code=404,
detail={"ok": False, "error": f"记录不存在: rav_id={rav_id}"},
)
return ok(data=jsonable_encoder(row), msg="成功")
@risk_audit_visit_router.put("/v1/risk-audit-visit/{rav_id}", response_model=ApiEnvelope)
def put_risk_audit_visit(rav_id: int, body: RiskAuditVisitReplaceBody) -> ApiEnvelope:
params = (*_replace_body_to_params(body), rav_id)
conn = _connect()
try:
cur = conn.cursor()
try:
cur.execute(_UPDATE_ROW, params)
except mysql_errors.IntegrityError as e:
conn.rollback()
if getattr(e, "errno", None) == 1062:
raise HTTPException(
status_code=409,
detail={"ok": False, "error": f"唯一键冲突,无法保存: {e}"},
) from e
raise HTTPException(
status_code=502,
detail={"ok": False, "error": f"写入失败: {e}"},
) from e
if cur.rowcount == 0:
conn.rollback()
raise HTTPException(
status_code=404,
detail={"ok": False, "error": f"记录不存在: rav_id={rav_id}"},
)
conn.commit()
finally:
conn.close()
return ok(data={"rav_id": rav_id, "updated": True}, msg="成功")
@risk_audit_visit_router.delete("/v1/risk-audit-visit/{rav_id}", response_model=ApiEnvelope)
def delete_risk_audit_visit(rav_id: int) -> ApiEnvelope:
conn = _connect()
try:
cur = conn.cursor()
cur.execute("DELETE FROM risk_audit_visit WHERE rav_id = %s", (rav_id,))
if cur.rowcount == 0:
conn.rollback()
raise HTTPException(
status_code=404,
detail={"ok": False, "error": f"记录不存在: rav_id={rav_id}"},
)
conn.commit()
finally:
conn.close()
return ok(data={"rav_id": rav_id, "deleted": True}, msg="成功")
"""读取 fortune-hub.transfer_url 默认第一行的 url_link。"""
from __future__ import annotations
import mysql.connector
from mysql.connector import errors as mysql_errors
from fastapi import APIRouter, HTTPException
from api.response import ApiEnvelope, ok
from api.settings import get_settings
url_link_router = APIRouter(prefix="/api")
def _fortune_hub_connect():
connect_kwargs = get_settings().mysql_connect_kwargs().copy()
connect_kwargs["database"] = "fortune-hub"
try:
return mysql.connector.connect(**connect_kwargs)
except mysql_errors.Error as e:
raise HTTPException(
status_code=502,
detail={"ok": False, "error": f"数据库连接失败: {e}"},
) from e
@url_link_router.get("/v1/url-link", response_model=ApiEnvelope)
def get_url_link() -> ApiEnvelope:
conn = _fortune_hub_connect()
try:
cur = conn.cursor(dictionary=True)
cur.execute("SELECT url_link FROM transfer_url LIMIT 1")
row = cur.fetchone()
except mysql_errors.Error as e:
raise HTTPException(
status_code=502,
detail={"ok": False, "error": f"查询 transfer_url 失败: {e}"},
) from e
finally:
conn.close()
if not row:
raise HTTPException(
status_code=404,
detail={"ok": False, "error": "transfer_url 没有可用数据"},
)
return ok(data={"url_link": row["url_link"]}, msg="成功")
"""动态加载团队转换脚本(历史路径/中文文件名),对外只暴露可调用入口与路径工具。"""
import importlib.util
from datetime import datetime
from pathlib import Path
from typing import Any, Callable
_CODE_BASE = Path(__file__).resolve().parent.parent.parent.parent
_TEAM_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "data_conversion.py"
def _load_run_team_conversion() -> Callable[..., dict[str, Any]]:
spec = importlib.util.spec_from_file_location("team_data_convert", _TEAM_SCRIPT)
if spec is None or spec.loader is None:
raise RuntimeError(f"无法加载团队转换模块: {_TEAM_SCRIPT}")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
fn = getattr(mod, "run_team_conversion", None)
if fn is None:
raise RuntimeError("data_conversion 中缺少 run_team_conversion")
return fn
run_team_conversion: Callable[..., dict[str, Any]] = _load_run_team_conversion()
def default_team_target_path() -> str:
"""未传路径时:cache/team_{时间戳}.xlsx"""
d = _CODE_BASE / "cache"
d.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
return str(d / f"team_{ts}.xlsx")
"""将异常与校验失败统一为 { code, data, msg } 响应体。"""
from typing import Any
from fastapi import Request
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import HTTPException, RequestValidationError
from fastapi.responses import JSONResponse
def _msg_from_detail(detail: str | dict | list | None) -> str:
if detail is None:
return "请求失败"
if isinstance(detail, str):
return detail
if isinstance(detail, dict):
return str(detail.get("error") or detail.get("msg") or detail.get("message") or "请求失败")
return "参数校验失败"
def _data_from_detail(detail: str | dict | list | None) -> Any:
if isinstance(detail, (dict, list)):
return jsonable_encoder(detail)
return None
async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
body = {
"code": exc.status_code,
"data": _data_from_detail(exc.detail),
"msg": _msg_from_detail(exc.detail),
}
return JSONResponse(status_code=exc.status_code, content=body)
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
errors = jsonable_encoder(exc.errors())
return JSONResponse(
status_code=422,
content={"code": 422, "data": errors, "msg": "参数校验失败"},
)
"""统一 API 响应:code=200 成功;失败时 code 多为 HTTP 状态码(如 400、422、502);data 为载荷;msg 为说明。"""
from typing import Any
from pydantic import BaseModel, Field
class ApiEnvelope(BaseModel):
code: int = Field(..., description="200 成功,否则为错误码(常与 HTTP 状态一致)")
data: Any = None
msg: str = ""
model_config = {"json_schema_extra": {"example": {"code": 200, "data": {}, "msg": "成功"}}}
def ok(data: Any = None, msg: str = "") -> ApiEnvelope:
return ApiEnvelope(code=200, data=data, msg=msg)
差异被折叠。
......@@ -21,6 +21,7 @@ class CleanRequestBody(BaseModel):
class RiskAuditVisitReplaceBody(BaseModel):
"""PUT 全量覆盖:须提交全部键,值为 null 表示写入 NULL。"""
seq_no: str = Field(..., max_length=50, description="DQ 序号(唯一约束)")
audit_date: date | None
source: str | None = Field(..., max_length=20)
region_name: str | None = Field(..., max_length=20)
......
"""从环境变量与 code/.env 读取配置。本地用 code/.env,线上用进程/容器注入同名变量。"""
from functools import lru_cache
from pathlib import Path
from typing import Any
from pydantic_settings import BaseSettings, SettingsConfigDict
_CODE_DIR = Path(__file__).resolve().parents[1]
_PROD_ENV = frozenset({"production", "prod"})
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=_CODE_DIR / ".env",
env_file_encoding="utf-8",
extra="ignore",
)
# development:本地/测试库;production:线上(由部署环境覆盖变量)
environment: str = "development"
# None:按 environment 推断(production/prod → 不写 cache);true/false 强制开关(环境变量 CLEAN_WRITE_CACHE)
clean_write_cache: bool | None = None
db_host: str = ""
db_port: int = 3306
db_name: str = ""
db_user: str = ""
db_password: str = ""
def mysql_connect_kwargs(self) -> dict[str, Any]:
"""供 mysql.connector / PyMySQL 等使用。"""
return {
"host": self.db_host,
"port": self.db_port,
"database": self.db_name,
"user": self.db_user,
"password": self.db_password,
}
def effective_clean_write_cache(self) -> bool:
"""是否写入 code/cache 下清洗产物(团队/合并/low_price 等 xlsx)。"""
if self.clean_write_cache is not None:
return self.clean_write_cache
return (self.environment or "").strip().lower() not in _PROD_ENV
@lru_cache
def get_settings() -> Settings:
return Settings()
"""项目配置:数据库、路径、超时等常量。"""
# ── 数据库 ──
DB_HOST = "192.168.100.39"
DB_PORT = 25301
DB_NAME = "market_bi"
DB_USER = "root"
DB_PASSWORD = "Zt%68Dsuv&M"
from fastapi import FastAPI
from fastapi.exceptions import HTTPException, RequestValidationError
from fastapi import FastAPI, APIRouter, Request
from fastapi.responses import JSONResponse
from api.audit.point_sale.routes_clean import api_router
from api.audit.point_sale.routes_risk_audit_visit import risk_audit_visit_router
from api.audit.point_sale.routes_url_link import url_link_router
from api.exception_handlers import http_exception_handler, validation_exception_handler
from api.settings import get_settings
from api.router import router
from utils.response import error
import config
get_settings()
app = FastAPI(title="CLEAN_DATA_TITLE")
app = FastAPI(title="Clean Data API")
app.add_exception_handler(HTTPException, http_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.include_router(api_router)
app.include_router(risk_audit_visit_router)
app.include_router(url_link_router)
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
return JSONResponse(
status_code=404,
content=error(404, f"路径 {request.url.path} 不存在"),
)
api = APIRouter(prefix="/api")
api.include_router(router)
app.include_router(api)
......@@ -8,7 +8,7 @@ from typing import Any, Sequence
import pandas as pd
_CODE_BASE = Path(__file__).resolve().parents[1]
_CODE_BASE = Path(__file__).resolve().parents[2]
MERGED_SHEET_NAME = "合并后"
......
差异被折叠。
import shutil
from pathlib import Path
# code/cache 目录(dir_utils.py 在 py_/audit/point_sale/ 下,parents[2] = code/)
CACHE_DIR = Path(__file__).resolve().parents[2] / "cache"
def _clear_cache():
"""写入前清空 cache 文件夹"""
if not CACHE_DIR.exists():
return
for f in CACHE_DIR.iterdir():
if f.is_file():
f.unlink()
elif f.is_dir():
shutil.rmtree(f)
......@@ -17,6 +17,7 @@ if str(_CODE_BASE) not in sys.path:
from utils.dates import to_yyyy_mm_dd # noqa: E402
_RAV_DB_COLS = [
"seq_no",
"audit_date",
"source",
"region_name",
......@@ -48,6 +49,7 @@ _RAV_DB_COLS = [
]
_RAV_EXCEL_SPECS: list[tuple[str, str, Any]] = [
("seq_no", "序号", 50),
("audit_date", "稽查日期", "date"),
("source", "稽查来源", 20),
("region_name", "大区", 20),
......@@ -78,13 +80,8 @@ _RAV_EXCEL_SPECS: list[tuple[str, str, Any]] = [
("large_date_rectify", "大日期整改说明", 100),
]
# 与表 uk_biz 列一致:audit_date,source,store_name,channel_type,series,taste,weight,dealer_name
_RAV_UK_BIZ_INDICES = (0, 1, 7, 13, 14, 15, 16, 5)
# MySQL 唯一索引中 NULL 互不相等,会导致 ON DUPLICATE KEY 无法合并;空源数据先填占位再入库
_RAV_UK_BIZ_DEFAULT_DATE = date(1900, 1, 1)
_RAV_UK_BIZ_DEFAULT_STR = "-"
_RAV_UK_BIZ_STR_INDICES = (1, 7, 13, 14, 15, 16, 5) # uk 中 varchar 列下标(与 _RAV_DB_COLS 对齐)
# 与表 uk_seq_no 列一致:seq_no 为 DQ 行序号
_RAV_SEQ_NO_COL_INDEX = 0 # _RAV_DB_COLS 中 seq_no 的位置
_RAV_INSERT_SQL = (
f"INSERT INTO risk_audit_visit ({','.join(_RAV_DB_COLS)}) VALUES "
f"({','.join(['%s'] * len(_RAV_DB_COLS))}) ON DUPLICATE KEY UPDATE "
......@@ -135,19 +132,6 @@ def _rav_int(val: Any) -> int | None:
return int(x)
def _fill_uk_biz_defaults(tup: tuple[Any, ...]) -> tuple[Any, ...]:
lst = list(tup)
if lst[0] is None:
lst[0] = _RAV_UK_BIZ_DEFAULT_DATE
for i in _RAV_UK_BIZ_STR_INDICES:
if lst[i] is None:
spec = _RAV_EXCEL_SPECS[i][2]
mx = spec if isinstance(spec, int) else 20
s = _RAV_UK_BIZ_DEFAULT_STR[:mx]
lst[i] = s
return tuple(lst)
def _row_to_rav_tuple(row: pd.Series) -> tuple[Any, ...]:
out: list[Any] = []
for _, zh, spec in _RAV_EXCEL_SPECS:
......@@ -160,11 +144,11 @@ def _row_to_rav_tuple(row: pd.Series) -> tuple[Any, ...]:
out.append(_rav_int(raw))
else:
out.append(_rav_str(raw, spec))
return _fill_uk_biz_defaults(tuple(out))
return tuple(out)
def _uk_biz_from_rav_tuple(tup: tuple[Any, ...]) -> tuple[Any, ...]:
return tuple(tup[i] for i in _RAV_UK_BIZ_INDICES)
def _seq_no_from_rav_tuple(tup: tuple[Any, ...]) -> str | None:
return tup[_RAV_SEQ_NO_COL_INDEX]
def _serialize_rav_cell(val: Any) -> Any:
......@@ -181,13 +165,13 @@ def _serialize_rav_tuple(tup: tuple[Any, ...]) -> dict[str, Any]:
return {c: _serialize_rav_cell(v) for c, v in zip(_RAV_DB_COLS, tup)}
def _indices_overwritten_by_later_same_uk(rows: list[tuple[Any, ...]]) -> list[int]:
"""同多行按顺序 upsert 时,除最后一行外均被后行覆盖;返回这些行的下标(升序)。"""
uk_to_indices: defaultdict[tuple[Any, ...], list[int]] = defaultdict(list)
def _indices_overwritten_by_later_same_seq_no(rows: list[tuple[Any, ...]]) -> list[int]:
"""同 seq_no 多行按顺序 upsert 时,除最后一行外均被后行覆盖;返回这些行的下标(升序)。"""
seq_to_indices: defaultdict[str | None, list[int]] = defaultdict(list)
for i, t in enumerate(rows):
uk_to_indices[_uk_biz_from_rav_tuple(t)].append(i)
seq_to_indices[_seq_no_from_rav_tuple(t)].append(i)
out: list[int] = []
for idxs in uk_to_indices.values():
for idxs in seq_to_indices.values():
if len(idxs) > 1:
out.extend(idxs[:-1])
out.sort()
......@@ -200,16 +184,16 @@ def upsert_risk_audit_visit(
*,
batch_size: int = 500,
) -> dict[str, Any]:
"""全量 upsert;唯一键 uk_biz 含 dealer_name。返回 executed_rows、distinct_uk_biz、overwritten_rows。"""
"""全量 upsert;唯一键 seq_no 为 DQ 行序号。返回 executed_rows、distinct_seq_nos、overwritten_rows。"""
import mysql.connector
if df.empty:
return {"executed_rows": 0, "distinct_uk_biz": 0, "overwritten_rows": []}
return {"executed_rows": 0, "distinct_seq_nos": 0, "overwritten_rows": []}
work = df.copy()
work.columns = work.columns.str.strip()
rows = [_row_to_rav_tuple(work.iloc[i]) for i in range(len(work))]
uk_set = {_uk_biz_from_rav_tuple(t) for t in rows}
ow_idx = _indices_overwritten_by_later_same_uk(rows)
seq_set = {_seq_no_from_rav_tuple(t) for t in rows}
ow_idx = _indices_overwritten_by_later_same_seq_no(rows)
overwritten_rows = [_serialize_rav_tuple(rows[i]) for i in ow_idx]
conn = mysql.connector.connect(**mysql_connect_kwargs)
try:
......@@ -225,7 +209,7 @@ def upsert_risk_audit_visit(
conn.close()
return {
"executed_rows": len(rows),
"distinct_uk_biz": len(uk_set),
"distinct_seq_nos": len(seq_set),
"overwritten_rows": overwritten_rows,
}
......@@ -281,10 +265,10 @@ def audit_low_price_on_merged(
df_y = df_y.copy()
df_y.columns = df_y.columns.str.strip()
df_y["产品系列_clean"] = df_y.iloc[:, 14].apply(_clean_str)
df_y["产品克重_clean"] = df_y.iloc[:, 16].apply(_clean_str)
df_y["渠道类型_clean"] = df_y.iloc[:, 13].apply(_clean_str)
df_y["产品价格_num"] = pd.to_numeric(df_y.iloc[:, 17], errors="coerce")
df_y["产品系列_clean"] = df_y["产品系列"].apply(_clean_str)
df_y["产品克重_clean"] = df_y["产品克重"].apply(_clean_str)
df_y["渠道类型_clean"] = df_y["渠道类型(稽查源提供)"].apply(_clean_str)
df_y["产品价格_num"] = pd.to_numeric(df_y["产品价格"], errors="coerce")
df_y["match_key"] = (
df_y["产品系列_clean"]
......@@ -359,15 +343,15 @@ def run_low_price_audit(
return {"ok": False, "error": f"写入 risk_audit_visit 失败: {e}"}
flagged = int((result["是否低价"] == "低价").sum()) if "是否低价" in result.columns else 0
dup = rav["executed_rows"] - rav["distinct_uk_biz"]
dup = rav["executed_rows"] - rav["distinct_seq_nos"]
ow = rav["overwritten_rows"]
return {
"ok": True,
"low_price_target_file": str(out) if out else None,
"low_price_rows": int(len(result)),
"low_price_flagged_rows": flagged,
"risk_audit_visit_rows": rav["distinct_uk_biz"],
"risk_audit_visit_distinct_uk_biz": rav["distinct_uk_biz"],
"risk_audit_visit_rows": rav["distinct_seq_nos"],
"risk_audit_visit_distinct_seq_nos": rav["distinct_seq_nos"],
"risk_audit_visit_collapsed_duplicate_rows": dup,
"risk_audit_visit_overwritten_rows": ow,
}
import pandas as pd
def _cell_str(val) -> str:
return str(val).strip() if pd.notna(val) else ""
\ No newline at end of file
......@@ -44,40 +44,20 @@ def to_yyyy_mm_dd(val) -> str | None:
return None
return ts.strftime("%Y-%m-%d")
# 到期日相对检查日的剩余月数近似值(与原业务公式一致:年*12+月+日/30)。
def approx_gap_months_calendar(expiry_date, inspect_date) -> float:
diff_years = expiry_date.year - inspect_date.year
diff_months = expiry_date.month - inspect_date.month
diff_days = expiry_date.day - inspect_date.day
return diff_years * 12 + diff_months + diff_days / 30.0
def first_yyyy_mm_dd_in_iloc(df: pd.DataFrame, col_idx: int) -> str | None:
"""自上而下取第 col_idx 列首个可解析日期(宽表无表头时常用)。"""
if df is None or df.shape[1] <= col_idx or col_idx < 0:
return None
for val in df.iloc[:, col_idx]:
n = to_yyyy_mm_dd(val)
if n:
return n
return None
def month_boundary_from_audit_day(yyyy_mm_dd: str) -> str | None:
"""2 月:1–14 号 → 该月 1 日,15 号起 → 月末;其它月:1–15 → 月初,16 起 → 月末。输入须 YYYY-MM-DD。"""
if not yyyy_mm_dd or len(yyyy_mm_dd) < 10:
return None
try:
d = datetime.strptime(yyyy_mm_dd[:10], "%Y-%m-%d")
except ValueError:
return None
cutoff = 14 if d.month == 2 else 15
if d.day <= cutoff:
return d.replace(day=1).strftime("%Y-%m-%d")
last = monthrange(d.year, d.month)[1]
return d.replace(day=last).strftime("%Y-%m-%d")
# 按列名顺序找第一列,自上而下取首个可解析为日期的值;若无匹配列且允许则用第 3 列(下标 2)。
def first_yyyy_mm_dd_in_dataframe(
df: pd.DataFrame,
column_names: Sequence[str],
*,
third_column_fallback: bool = True,
) -> str | None:
"""按列名顺序找第一列,自上而下取首个可解析为日期的值;若无匹配列且允许则用第 3 列(下标 2)。"""
ser = None
if df is not None and df.shape[1] > 0:
for name in column_names:
......@@ -94,6 +74,31 @@ def first_yyyy_mm_dd_in_dataframe(
return n
return None
# 自上而下取第 col_idx 列首个可解析日期(宽表无表头时常用)
def first_yyyy_mm_dd_in_iloc(df: pd.DataFrame, col_idx: int) -> str | None:
if df is None or df.shape[1] <= col_idx or col_idx < 0:
return None
for val in df.iloc[:, col_idx]:
n = to_yyyy_mm_dd(val)
if n:
return n
return None
# 保质期计算需要用完整日期,但源表里的生产月份可能写的是 "2025-01" 或 "202501",不统一。这个函数统一转成 "YYYY-MM-01" 后,后续 _calc_expiry 才能用 datetime 做加减月份计算。
def normalize_year_month_to_day01(src_month):
if not isinstance(src_month, str):
return src_month
src_month = src_month.strip()
if not src_month:
return src_month
if re.fullmatch(r"\d{4}-\d{1,2}", src_month):
year, month = src_month.split("-")
return f"{year}-{month.zfill(2)}-01"
if re.fullmatch(r"\d{6}", src_month):
year = src_month[:4]
month = src_month[4:].zfill(2)
return f"{year}-{month}-01"
return src_month
def max_yyyy_mm_dd_in_dataframe(
df: pd.DataFrame,
......@@ -128,29 +133,5 @@ def max_yyyy_mm_dd_in_dataframe(
return best
def normalize_year_month_to_day01(src_month):
"""
生产月份类字符串 → YYYY-MM-01(供后续 strptime %Y-%m-%d)。
支持 yyyy-mm、yyyymm;其它类型/格式原样返回。
"""
if not isinstance(src_month, str):
return src_month
src_month = src_month.strip()
if not src_month:
return src_month
if re.fullmatch(r"\d{4}-\d{1,2}", src_month):
year, month = src_month.split("-")
return f"{year}-{month.zfill(2)}-01"
if re.fullmatch(r"\d{6}", src_month):
year = src_month[:4]
month = src_month[4:].zfill(2)
return f"{year}-{month}-01"
return src_month
def approx_gap_months_calendar(expiry_date, inspect_date) -> float:
"""到期日相对检查日的剩余月数近似值(与原业务公式一致:年*12+月+日/30)。"""
diff_years = expiry_date.year - inspect_date.year
diff_months = expiry_date.month - inspect_date.month
diff_days = expiry_date.day - inspect_date.day
return diff_years * 12 + diff_months + diff_days / 30.0
......@@ -12,29 +12,20 @@ def read_excel_from_url(
*,
timeout: float = 300,
user_agent: str = "clean-data-api/1.0",
skiprows: int = 0,
header=None,
skiprows: int = 0, # 跳过 Excel 前 N 行
header=None, # 指定哪一行作为列名,None 表示第一行
dtype=str,
) -> pd.DataFrame:
req = urllib.request.Request(url.strip(), headers={"User-Agent": user_agent})
return_header: bool = False, # True 时先读取首行作为表头
) -> pd.DataFrame | tuple[pd.DataFrame, pd.Series]:
req = urllib.request.Request(url.strip(), headers={"User-Agent": user_agent, "Connection": "close"})
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
return pd.read_excel(io.BytesIO(raw), skiprows=skiprows, header=header, dtype=dtype)
def read_excel_from_url_skip1_with_header_row(
url: str,
*,
timeout: float = 300,
user_agent: str = "clean-data-api/1.0",
dtype=str,
) -> tuple[pd.DataFrame, pd.Series]:
"""跳过第 1 行后的数据 + 被跳过的第 1 行(表头,0 基列与数据列对齐)。"""
req = urllib.request.Request(url.strip(), headers={"User-Agent": user_agent})
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
if return_header:
buf = io.BytesIO(raw)
header_df = pd.read_excel(buf, header=None, dtype=dtype, nrows=1)
buf.seek(0)
data_df = pd.read_excel(buf, skiprows=1, header=None, dtype=dtype)
return data_df, header_df.iloc[0]
return pd.read_excel(io.BytesIO(raw), skiprows=skiprows, header=header, dtype=dtype)
from typing import Any
def success(data: Any = None, msg: str = "ok") -> dict:
return {"code": 200, "data": data, "msg": msg}
def error(code: int = 500, msg: str = "请求失败", data: Any = None) -> dict:
return {"code": code, "data": data, "msg": msg}
# clean_data 项目说明(索引)
面向 Agent 与人类:不必每次通读本索引;不熟悉目录或跨模块任务时再打开本页定位文档。实现后若行为有变,请同步更新模块文档,并在 [changelog.md](changelog.md) 或对应 md 中留下可读变更说明(工作痕迹)。
## 仓库结构(概览)
| 路径 | 职责 |
|------|------|
| `code/` | 运行时代码:FastAPI、`api/`(通用装配与 `api/audit/point_sale/` 售点稽查)、`utils/`、稽查转换脚本 `py_/audit/...` |
| `code/cache/` | 清洗输出目录:`team_*.xlsx``puling_*.xlsx``chengyu_*.xlsx`;任一路成功落盘后另有汇总 `merged_*.xlsx` |
| `docs/` | 本索引与各模块说明 |
## 模块文档
| 文档 | 内容 |
|------|------|
| [api.md](api.md) | HTTP API、路由、统一响应与错误映射 |
| [team-conversion.md](team-conversion.md) | 团队宽表 URL → 窄表 → `合并后` sheet 流水线 |
| [utils.md](utils.md) | `utils/dates``utils/excel_http` 职责 |
| [database.md](database.md) | MySQL 环境变量、`settings` 与本地/生产约定 |
| [changelog.md](changelog.md) | 重要约定与行为变更的简短纪要(可追溯) |
## 架构(逻辑分层)
```mermaid
flowchart TB
subgraph http [HTTP]
A[FastAPI index.py]
R[api/audit/point_sale/routes_clean.py]
RV[api/audit/point_sale/routes_risk_audit_visit.py]
A --> R
A --> RV
end
subgraph load [加载]
L[api/audit/point_sale/team_conversion_loader.py]
R --> L
end
subgraph biz [业务转换]
D[py_/audit/point_sale/data_conversion.py]
L --> D
end
subgraph util [工具]
U1[utils/excel_http]
U2[utils/dates]
U3[utils/clean_output_merge]
D --> U1
D --> U2
end
R --> U3
D --> X[(各分支 xlsx)]
U3 --> M[(merged_*.xlsx)]
```
## 数据流(当前已接线:团队清洗)
```mermaid
sequenceDiagram
participant C as 客户端
participant API as POST /api/v1/clean
participant R as routes_clean
participant T as team_conversion_loader
participant D as data_conversion.run_team_conversion
participant URL as team_url xlsx
participant FS as 目标 xlsx
C->>API: JSON body
API->>R: 校验 department / team_url
R->>T: run_team_conversion(...)
T->>D: 动态加载并调用
D->>URL: HTTP 读宽表
D->>D: 宽转窄 + 临期/新鲜度等
D->>FS: 写入/覆盖「合并后」sheet
D-->>R: dict ok / error
R-->>C: ApiEnvelope 或 HTTP 错误
```
## 需求 / 能力拆分(现状)
- **已落地**`department = 售点稽查低价数据清洗``team_url` / `puling_url` / `chengyu_url` 至少其一,各路分别落盘;可选 `year/month/day``team_target_path`(仅团队分支)。
- **汇总 merged + 低价稽查**:同一次请求中只要有 ≥1 路清洗成功落盘,内存合并后写 `code/cache/merged_{时间戳}.xlsx`,并用同一份数据写 `code/cache/low_price_{时间戳}.xlsx`(价盘来自库表 `bi_price_xx`,见 `api.md`)。
## 运行入口说明
- FastAPI 应用实例在 **`code/index.py`**`app`
- `code/README.md` 中若仍为 `uvicorn main:app`,以实际文件为准:一般在 `code` 目录执行 `uvicorn index:app`(或配置等价模块路径)。
## 维护约定
- 改接口契约、状态码语义、转换步骤或输出文件格式 → 更新 [api.md](api.md) / [team-conversion.md](team-conversion.md)
- 新增独立工具模块 → 更新 [utils.md](utils.md) 并在上表增加一行链接。
- 跨文档难归类或需留「当时为什么改」→ 在 [changelog.md](changelog.md) 追加一条(日期 + 摘要);约定见 `.cursor/skills/project-docs-workflow/SKILL.md`
# HTTP API(`code/api/`,售点稽查实现在 `code/api/audit/point_sale/`)
## 应用装配
- **`code/index.py`**`FastAPI` 实例 `app`,注册 `HTTPException` / `RequestValidationError` 处理器,挂载 `api_router``risk_audit_visit_router``url_link_router`(均来自 `api/audit/point_sale/`)。
## 路由
- **前缀**`/api`
- **清洗**`POST /api/v1/clean``api/audit/point_sale/routes_clean.py`
- **稽查走访表 CRUD**`api/audit/point_sale/routes_risk_audit_visit.py``GET/PUT/DELETE /api/v1/risk-audit-visit...`,库表见 `code/sql_/risk_audit_visit.sql`
- **链接读取**`GET /api/v1/url-link``api/audit/point_sale/routes_url_link.py`,单独查 `fortune-hub.transfer_url`
### POST /api/v1/clean
- **department**:必须为 **`售点稽查低价数据清洗`**(常量 `DEPARTMENT_RISK_AUDIT_CLEAN`),否则 400。
- **team_url / puling_url / chengyu_url****至少一个**非空;各 URL 须为 `http://``https://`(由下游校验),否则 400;读 URL 失败等映射规则见下。
- **team_url**:若提供 → `team_conversion_loader` 加载的 **`data_conversion.run_team_conversion`**
- **puling_url**:若提供 → **`data_chengyu_puling.run_puling_conversion`**`yname=浦零``PRODUCT_GROUPS`)。
- **chengyu_url**:若提供 → 同一脚本 **`run_puling_conversion`**`yname=诚予``PRODUCT_GROUPS_CY`)。
- **team_target_path**:可选;团队分支落盘路径;为空且 **`Settings.effective_clean_write_cache()` 为 true** 时 → `default_team_target_path()``code/cache/team_{时间戳}.xlsx`;为 false 时(见下)使用系统临时目录下的 xlsx,请求结束后删除。
- **浦零 / 诚予** 输出路径:若**同次请求**已执行团队分支,则与团队**共用**上述 `team_target_path`(或默认 team / 临时路径);否则在「写 cache」模式下用 `default_puling_target_path()``default_chengyu_target_path()``code/cache/puling_*.xlsx``chengyu_*.xlsx`),否则同样用临时文件。
- **是否写 `code/cache`**:由 **`ENVIRONMENT`** 与可选 **`CLEAN_WRITE_CACHE`** 决定(见 [database.md](database.md))。`production` / `prod` 且未设 `CLEAN_WRITE_CACHE`**不写** cache:不写 `merged_*.xlsx``low_price_*.xlsx`,分支结果落在临时文件并在请求结束时删除;**仍写库** `risk_audit_visit``CLEAN_WRITE_CACHE=true` 可在生产强制留档;`CLEAN_WRITE_CACHE=false` 可在非生产关闭落盘。
- **汇总 merged + 低价稽查 + 写库 risk_audit_visit**:凡有 **≥1 路**清洗 `ok` 且带 `target_file`,按 **团队 → 浦零 → 诚予** 顺序收集路径,**去重**后读各文件 **「合并后」** sheet,列对齐纵向拼接;若写 cache 则写入 `default_merged_target_path()``code/cache/merged_{时间戳}.xlsx`**不再次读盘**:同一份内存中的合并 `DataFrame` 交给 `py_/audit/point_sale/low_price.py``run_low_price_audit`:从 MySQL **`bi_price_xx`** 拉价盘、比对低价;若写 cache 则另写 **`code/cache/low_price_{时间戳}.xlsx`**;再将**全量行** **`INSERT ... ON DUPLICATE KEY UPDATE`** 写入 **`risk_audit_visit`**(唯一键 `uk_biz``code/sql_/risk_audit_visit.sql`)。价盘读失败、或 **`risk_audit_visit` 写入失败** → HTTP **502**。若本次没有任何成功落盘,**不写库**,成功响应里 `data` 为空对象 `{}`
- **year / month / day**:可选;若均提供则拼为 `YYYYMMDD` 传入各清洗分支作为稽查日期线索。
请求体模型见 **`code/api/audit/point_sale/schemas.py`**`CleanRequestBody`)。
### risk_audit_visit CRUD
- **GET `/api/v1/risk-audit-visit`**:分页列表。查询参数 `page`(默认 1)、`page_size`(默认 20,最大 200)。成功时 `data``items`(行字典数组,含 `rav_id` 与全部业务列)、`total``page``page_size`;按 `rav_id` 降序。
- **GET `/api/v1/risk-audit-visit/{rav_id}`**:单条详情,返回该主键下**全部字段**`ApiEnvelope.data` 为一条完整记录)。不存在 → **404**
- **PUT `/api/v1/risk-audit-visit/{rav_id}`**:按主键**整行覆盖**更新。请求体为 **`RiskAuditVisitReplaceBody`**(与表业务列一致,**不含** `rav_id`;须提交全部字段键,值为 JSON `null` 表示写入 NULL)。`price``low_price_diff` 若前端传空字符串 `""`,与 `null` 同等视为写入 NULL。不存在 → **404**;违反唯一键 `uk_biz`**409**;库连接等失败 → **502**
- **DELETE `/api/v1/risk-audit-visit/{rav_id}`**:按主键删除。不存在 → **404**
### GET /api/v1/url-link
- 无请求参数。
- 连接信息仍取 `Settings.mysql_connect_kwargs()` 对应的主机、端口、账号、密码;**仅此接口**在发起连接前把 `database` 改为 **`fortune-hub`**,不影响其它接口仍使用 `DB_NAME`
- 查询表 **`transfer_url`**,直接取 **默认第一行**`SELECT url_link FROM transfer_url LIMIT 1`,不额外排序。
- 成功时返回:`data = {"url_link": "<第一行的 url_link>"}`
- 表为空时 → **404**;数据库连接/查询失败 → **502**
连接参数同清洗写库:`Settings.mysql_connect_kwargs()`(见 [database.md](database.md))。
**成功时 `data` 形态**:仅返回与本次合并入库相关的统计(无成功落盘需合并时 `data``{}`):
- `merged_rows`:多路「合并后」sheet 纵向拼接后的**总行数**(与低价稽查入参行数一致);
- `risk_audit_visit_rows`:按 `uk_biz` 去重后、与库中最终保留行数一致的有效行数(合并总行数 − `risk_audit_visit_collapsed_duplicate_rows`);
- `risk_audit_visit_collapsed_duplicate_rows`:数据源内同 `uk_biz` 多行折叠时,被覆盖条数(与「去重后 uk_biz 数」之差,语义见 `low_price.py`);
- `risk_audit_visit_overwritten_rows`:被覆盖行的明细数组(每项为字段名 → 值,日期为 ISO 字符串);同键仅最后一行留在库中,其余行出现在此列表。
## 统一响应
- 成功封装:**`ApiEnvelope`**`code/api/response.py`):`code=200``data``msg`
- `HTTPException` / 校验失败:**`exception_handlers.py`** 将详情映射为同类 JSON(`code` 可能为 HTTP 状态码或 422)。
## 团队转换失败 → HTTP
`_raise_http_for_failed_result``api/audit/point_sale/routes_clean.py`)根据返回 `dict``error` 文案区分:
-`source_url 须为` → 400
- 含「从 URL 读取源表失败」或「读取源表失败」前缀 → 502
- `message` 存在且无 `error`(如无有效数据)→ 不抛异常,正常包进 `ApiEnvelope`
## 相关文件
- **售点稽查**`api/audit/point_sale/``routes_clean.py``routes_risk_audit_visit.py``routes_url_link.py``schemas.py``team_conversion_loader.py``chengyu_puling_loader.py``low_price_loader.py`
- **通用**`response.py``exception_handlers.py``settings.py`
- 数据库环境变量说明见 [database.md](database.md)
# 变更纪要(工作痕迹)
按时间倒序记录重要约定与行为变化,便于以后查阅;细节仍以各模块文档与代码为准。
## 2026-03-28
- **`PUT /api/v1/risk-audit-visit/{rav_id}`**`RiskAuditVisitReplaceBody``price``low_price_diff` 在 Pydantic 层将 JSON 空字符串 `""` 规范为 `None`,避免前端用空串占位时触发 `decimal_parsing` 422;语义与 `null` 一致(写入 NULL)。
## 2026-03-27
- **`risk_audit_visit` 写入**`code/py_/audit/point_sale/low_price.py` 在组行入库前,对唯一键 `uk_biz` 八列中空值填占位(日期 `1900-01-01`、字符串 `-`),避免 MySQL 唯一索引对 `NULL` 不判等导致重复插入。
- **`POST /api/v1/clean` 统计语义**`merged_rows` 仍为**合并后物理总行数**`risk_audit_visit_rows` 表示按 `uk_biz` 去重后的有效入库行数(同键多行仅最后一行留在库);`risk_audit_visit_collapsed_duplicate_rows` 为「合并待写总行数 − 去重键数」。
- **脚本清理**:删除一次性 Navicat 备份转换脚本 **`scripts/transform_rav_backup_sql.py`**(仓库内无其它引用)。
- **API 目录结构**:售点稽查相关路由与 loader 迁入 **`code/api/audit/point_sale/`**(清洗、`risk_audit_visit` CRUD、团队/浦零/诚予加载器、低价稽查加载器、请求体模型);`code/api/` 根下保留通用层 **`response.py`、`exception_handlers.py`、`settings.py`**`code/index.py` 从子包挂载路由;HTTP 路径未变。
- **Agent Skill(`.cursor/skills/project-docs-workflow/SKILL.md`)**:开工前**不再强制**通读 `docs/` 下全部 md,由 Agent 按需求难度决定是否读索引/模块文档;**完工后仍必须在 `docs/` 留下可读变更说明**(更新模块文档或本文),作为人机共用的工作痕迹。
# 数据库配置(`code/api/settings.py`)
## 环境变量(同名,`.env` 或进程环境)
| 变量 | 说明 |
|------|------|
| `ENVIRONMENT` | `development`(本地/测试)或 `production` / `prod`(线上) |
| `CLEAN_WRITE_CACHE` | 可选。`true` / `false` 是否把 `POST /api/v1/clean` 的中间与结果 xlsx 写入 `code/cache`**未设置时**`production` / `prod` 不写(仅用系统临时文件并在请求结束后删除),其它环境写 cache。 |
| `DB_HOST` | 主机 |
| `DB_PORT` | 端口(整数) |
| `DB_NAME` | 库名 |
| `DB_USER` | 账号 |
| `DB_PASSWORD` | 密码 |
## 使用方式
1. **本地**:将 `code/.env.example` 复制为 `code/.env`,填写测试库连接信息(`code/.env` 已被 git 忽略)。
2. **线上**:在部署环境配置上述变量指向生产库,**不要**把生产密码写进仓库。
加载:`from api.settings import get_settings``get_settings().mysql_connect_kwargs()` 供 MySQL 客户端使用;`get_settings().effective_clean_write_cache()` 供清洗路由判断是否落盘 cache。
应用启动时会执行一次 `get_settings()` 以载入配置。
## 业务表(DDL 见 `code/sql_/`)
- **`bi_price_xx`**:价盘,低价稽查只读。
- **`risk_audit_visit`**`POST /api/v1/clean` 在写出 `low_price_*.xlsx` 后,将合并结果全量 upsert;唯一键 **`uk_biz`**`audit_date, source, store_name, channel_type, series, taste, weight, dealer_name`
- **`fortune-hub.transfer_url`**:仅 **`GET /api/v1/url-link`** 使用。该接口仍复用 `.env``DB_HOST``DB_PORT``DB_USER``DB_PASSWORD`,但会在接口内部把连接库名临时切到 **`fortune-hub`**,再读取 `transfer_url` 表默认第一行的 `url_link`**不会改动** 其它接口使用的 `DB_NAME`
# 团队稽查宽表 → 窄表(`data_conversion.py`)
## 位置与加载方式
- 脚本路径:**`code/py_/audit/point_sale/data_conversion.py`**
- **`api/audit/point_sale/team_conversion_loader.py`**`importlib` 按文件路径动态加载,导出 **`run_team_conversion`**,避免包名与历史中文文件名纠缠。
## 入口:`run_team_conversion`
参数要点:
- **`source_url`**:团队宽表 xlsx 的 URL;须 `http(s)`,否则返回 `ok: False`
- **`target_path`**:输出 xlsx 路径。
- **`audit_date_str`**:可选;与宽表/目标表内稽查日期列协同解析(见 `_resolve_audit_date`)。
流程摘要:
1. **`read_team_source_from_url`**`utils/excel_http.read_excel_from_url_skip1_with_header_row` — 跳过第 1 行,第 2 行起为数据;从第 1 行识别「稽核日期/稽查日期」列索引,失败则用回退列索引。
2. **`main`**:读或建目标工作簿的 **`合并后`** sheet,按 **`PRODUCT_GROUPS_JC`** 将「价格 + 多口味生产月份列」展开为多行窄表,写门店维度列,计算 **临期 / 大日期 / 新鲜度**`rDate`)。
3. 返回 `dict``ok``records_added``target_file``error` / `message`
## 依赖
- **pandas / openpyxl / python-dateutil**(见 `code/requirements.txt`
- **`code/utils/dates.py`****`code/utils/excel_http.py`**`data_conversion` 会把 `code` 上级目录插入 `sys.path` 以 import `utils`
## 与 HTTP 的衔接
[api.md](api.md)`POST /api/v1/clean` 在校验通过后调用 `run_team_conversion(team_url, team_target, audit_date_str)`
# 工具层(`code/utils/`)
## `excel_http.py`
- **`read_excel_from_url`**`urllib` 拉取 xlsx 到内存,`pandas.read_excel`,可配置 `skiprows` / `header`
- **`read_excel_from_url_skip1_with_header_row`**:团队宽表专用 — 返回 **(数据 DataFrame, 第 1 行表头 Series)**,数据从第 2 行起、`header=None`,列下标与业务 `iloc` 约定对齐。
## `dates.py`
- **`to_yyyy_mm_dd`**:单元格值 → `YYYY-MM-DD`
- **`first_yyyy_mm_dd_in_iloc` / `first_yyyy_mm_dd_in_dataframe`**:在宽表或目标表中取首个可解析稽查日期。
- **`max_yyyy_mm_dd_in_dataframe`**:目标表指定列(或第三列兜底)上所有可解析日期的最大值;浦零 `ad` 从合并后兜底时用。
- **`month_boundary_from_audit_day`**:稽核日锚到月初或月末(2 月以 14 号为界,其它月以 15 号为界);浦零对合并后兜底的 `ad` 会再经此函数。
- **`normalize_year_month_to_day01`**:生产月份字符串规范为 `YYYY-MM-01`
- **`approx_gap_months_calendar`**:到期日相对稽查日的剩余月数近似(供临期/新鲜度逻辑使用)。
业务侧主要使用者:**`py_/audit/point_sale/data_conversion.py`**
## `clean_output_merge.py`
- **`read_merged_dataframe`**:读 1 个或多个 xlsx 的 **「合并后」** sheet(`dtype=str`),统一列序后 `concat`,返回内存中的 `DataFrame`(不落盘)。
- **`write_merged_dataframe`**:将上述形态的 `DataFrame` 写入指定路径(单 sheet「合并后」)。
- **`merge_clean_result_xlsx`**`read_merged_dataframe` + `write_merged_dataframe` 一步封装(脚本或其它调用方可直接用)。
- **`default_merged_target_path`**`code/cache/merged_{时间戳}.xlsx`
**`api/audit/point_sale/routes_clean.py`** 在多路清洗成功后:先 `read_merged_dataframe` → 写 merged → 再对同一 `DataFrame` 跑低价稽查写 `low_price_*.xlsx`
"""Strip DROP/CREATE from Navicat dump; emit INSERT IGNORE into risk_audit_visit."""
from __future__ import annotations
import sys
HEADER = """/*
* 由 risk_audit_visit_back260327.sql 生成(勿直接跑原文件,否则会 DROP/CREATE 备份表结构)。
* 用法(Navicat 查询里执行本文件):
* 1)目标库已存在空表 risk_audit_visit,且含 UNIQUE uk_biz(列顺序与备份一致)。
* 2)本脚本仅 INSERT IGNORE,不建表;重复 uk_biz 或重复 rav_id 的行会被静默跳过。
* 3)导入后可执行:ALTER TABLE risk_audit_visit AUTO_INCREMENT=493621;(按备份里下一自增值调整)
*/
"""
OLD = "INSERT INTO `risk_audit_visit_back260327`"
NEW = "INSERT IGNORE INTO `risk_audit_visit`"
def main() -> None:
path_in = r"c:\Users\lenovo\Desktop\risk_audit_visit_back260327.sql"
path_out = r"c:\Users\lenovo\Desktop\risk_audit_visit_import_uk_biz.sql"
if len(sys.argv) >= 3:
path_in, path_out = sys.argv[1], sys.argv[2]
with open(path_in, "r", encoding="utf-8") as f:
lines = f.readlines()
out: list[str] = [HEADER, "SET FOREIGN_KEY_CHECKS=0;\n"]
started = False
n_ins = 0
for line in lines:
if line.startswith("SET FOREIGN_KEY_CHECKS"):
continue
if not started:
if line.startswith("INSERT INTO "):
started = True
else:
continue
out.append(line.replace(OLD, NEW))
n_ins += 1
out.append("\nSET FOREIGN_KEY_CHECKS=1;\n")
with open(path_out, "w", encoding="utf-8") as f:
f.writelines(out)
print(f"OK: {n_ins} INSERTs -> {path_out}")
if __name__ == "__main__":
main()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论