Commit cd0b6e9d authored by lidongxu

Complete the main logic for cleaning newly added data

Parent 010b38f0
---
name: project-docs-workflow
description: Enforces reading project markdown under docs/ before implementing features in this repository, and updating those docs after code changes. Use when changing Python under code/, adding API routes, data conversion, or audit/clean pipelines; use when the user mentions 项目说明、架构、数据流、文档同步.
description: When changing code in this repo: read docs and code as needed; no need to read all of docs/ every time. But you must leave a clear, readable change record under docs/ (a shared human/agent work trail). Applies to changes under code/, the API, and the clean/audit pipelines; also use when the user mentions doc sync, change records, or leaving a record for later.
---
# Project docs first and wrap-up
# Project docs: read as needed + always leave a trail
## Before starting
## Before starting (use your judgment; no forced read-through of docs)
1. Read **`docs/PROJECT_INDEX.md`** at the repository root (index and module categories).
2. Open the **module docs** linked from the index as the task requires (e.g. `docs/api.md`, `docs/team-conversion.md`), then read the relevant source; the user rules also require reading target files before changing code.
3. If the index or a module doc is missing, or clearly out of sync with the code: **add or fix a section in passing** during implementation (still honoring "keep comments concise" and "don't copy large chunks of source").
- **No need** to read every md under `docs/` first each time. Decide by task complexity:
  - **Small change, clear path** (e.g. one known route or function): just read the **source you are about to modify**; if the conversation/cache already holds context, reuse it.
  - **Cross-module work, new capability, unfamiliar directories**: open **`docs/PROJECT_INDEX.md`** to locate the module, then open `docs/api.md` and the other docs **relevant to this task** as needed; avoid reading the whole architecture write-up just to find a file.
- The user rule "read the target file before changing code" still holds: read the current contents of every file you will modify.
## After finishing
## After finishing (mandatory: leave a record in docs/)
1. If behavior, interfaces, data flow, or directory responsibilities changed: update the matching **`docs/*.md`**, and where needed the module list or descriptions in **`docs/PROJECT_INDEX.md`**.
2. New reusable module or new pipeline: add a category and link in the index, and add or extend the module doc.
- **Every** behavior/interface/config/data-flow change made for a request must land in **`docs/`** as text **a human can read**, so that you, I, or other collaborators can trace it later.
- **Style**: full short sentences stating "what changed, why, and how to use it (e.g. env vars, paths)"; avoid pasting bare function names with no semantics. Lists and small headings are fine; **no need** to copy large chunks of source.
- **Where it goes**:
  - Has a module doc (API → `docs/api.md`, DB config → `docs/database.md`) → **update the relevant section** of that file.
  - Touches the index structure or a new module → sync **`docs/PROJECT_INDEX.md`** (link + one-line responsibility).
  - If one change scatters across several docs and is hard to place, add a **short change note** under **`docs/`** (e.g. append a dated entry to `docs/changelog.md`, or create `docs/notes/YYYY-MM-DD_topic.md`), so "code changed, zero doc record" never happens.
## Doc principles
- **Concise**: short sentences + mermaid for architecture and data flow; point at code via file paths for details.
- **Progressive**: keep the Skill short; long explanations live in `docs/` and are read on demand.
- **Scripts still being debugged**: do **not** write `code/py_/audit/point_sale/data_chengyu_puling.py` into the project docs unless the user explicitly asks to include it.
- **Concise**: architecture and data flow can still be short sentences + mermaid; point at code via file paths for details.
- **Progressive**: keep this Skill short; long explanations go into `docs/`.
- **Scripts still being debugged**: do **not** write temporary debug scripts such as `code/py_/audit/point_sale/data_chengyu_puling.py` into the project docs unless the user explicitly asks to include them.
## Docs root path
......
# Copy to .env in the same directory (already ignored via .gitignore)
# Local dev: fill in the test DB; production: set same-named env vars on the server/container; never commit .env
ENVIRONMENT=development
# Test-environment example (fill the password in your local .env, or export it as an env var)
DB_HOST=192.168.100.39
DB_PORT=25301
DB_NAME=market_bi
DB_USER=root
DB_PASSWORD=
# Production example (set real values at release; inject only via the deployment platform)
# ENVIRONMENT=production
# DB_HOST=
# DB_PORT=3306
# DB_NAME=market_bi
# DB_USER=
# DB_PASSWORD=
"""Unified API response: code=0 means success, non-zero is a logic/business error code; data is the payload; msg is the message."""
"""Unified API response: code=200 means success; on failure, code is usually the HTTP status (e.g. 400, 422, 502); data is the payload; msg is the message."""
from typing import Any
......@@ -6,12 +6,12 @@ from pydantic import BaseModel, Field
class ApiEnvelope(BaseModel):
code: int = Field(..., description="0 成功,非 0 失败")
code: int = Field(..., description="200 成功,否则为错误码(常与 HTTP 状态一致)")
data: Any = None
msg: str = ""
model_config = {"json_schema_extra": {"example": {"code": 0, "data": {}, "msg": "成功"}}}
model_config = {"json_schema_extra": {"example": {"code": 200, "data": {}, "msg": "成功"}}}
def ok(data: Any = None, msg: str = "") -> ApiEnvelope:
return ApiEnvelope(code=0, data=data, msg=msg)
return ApiEnvelope(code=200, data=data, msg=msg)
"""清洗相关 HTTP 路由:校验入参、团队/浦零/诚予转换、映射业务错误到 HTTP 状态码。"""
import os
import tempfile
from pathlib import Path
from typing import Any
from fastapi import APIRouter, HTTPException
......@@ -22,7 +25,7 @@ from utils.clean_output_merge import (
write_merged_dataframe,
)
DEPARTMENT_RISK_AUDIT_CLEAN = "风控稽查数据清洗"
DEPARTMENT_RISK_AUDIT_CLEAN = "售点稽查低价数据清洗"
api_router = APIRouter(prefix="/api")
......@@ -33,6 +36,13 @@ def _audit_date_str_from_body(body: CleanRequestBody) -> str | None:
return f"{body.year:04d}{body.month:02d}{body.day:02d}"
def _temp_xlsx_path(cleanup_list: list[str]) -> str:
fd, path = tempfile.mkstemp(suffix=".xlsx")
os.close(fd)
cleanup_list.append(path)
return path
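The helper above allocates a closed temp file whose path is tracked for cleanup at the end of the request. A minimal standalone sketch of that lifecycle (function and variable names here are illustrative, not the project's):

```python
import os
import tempfile
from pathlib import Path

def temp_xlsx_path(cleanup_list: list[str]) -> str:
    # mkstemp returns an open OS-level handle plus the path; close the
    # handle immediately so a downstream writer can reopen the file.
    fd, path = tempfile.mkstemp(suffix=".xlsx")
    os.close(fd)
    cleanup_list.append(path)
    return path

cleanup: list[str] = []
p = temp_xlsx_path(cleanup)
assert p.endswith(".xlsx") and Path(p).exists()

# Request teardown: unlink every tracked path, tolerating already-gone files.
for tracked in cleanup:
    Path(tracked).unlink(missing_ok=True)
assert not Path(p).exists()
```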
def _raise_http_for_failed_result(result: dict) -> None:
"""When team conversion returns ok=False, pick the status code from the error text."""
err = result.get("error") or ""
......@@ -70,19 +80,43 @@ def post_clean(body: CleanRequestBody) -> ApiEnvelope:
)
audit_date_str = _audit_date_str_from_body(body)
settings = get_settings()
write_cache = settings.effective_clean_write_cache()
cleanup_tmp_xlsx: list[str] = []
def branch_target(explicit: str, shared: str | None, default_cache_fn) -> str:
e = (explicit or "").strip()
if e:
return e
if shared:
return shared
if write_cache:
return default_cache_fn()
return _temp_xlsx_path(cleanup_tmp_xlsx)
try:
# Branch results are used only for in-flight validation and for collecting part_paths; on success they are no longer returned wholesale to the frontend
data: dict[str, Any] = {"team": None, "puling": None, "chengyu": None}
# In the same request as the team branch, Puling/Chengyu write into the same xlsx, so the team rows' audit date can be read from the 「合并后」 sheet as the ad fallback
shared_target_after_team: str | None = None
if team_url:
team_target = (body.team_target_path or "").strip() or default_team_target_path()
team_target = branch_target(
body.team_target_path or "", None, default_team_target_path
)
shared_target_after_team = team_target
r = run_team_conversion(team_url, team_target, audit_date_str)
if not r.get("ok"):
_raise_http_for_failed_result(r)
data["team"] = r
if puling_url:
puling_target = branch_target(
"", shared_target_after_team, default_puling_target_path
)
r = run_puling_conversion(
puling_url,
default_puling_target_path(),
puling_target,
audit_date_str,
yname="浦零",
product_groups=PRODUCT_GROUPS,
......@@ -92,9 +126,12 @@ def post_clean(body: CleanRequestBody) -> ApiEnvelope:
data["puling"] = r
if chengyu_url:
chengyu_target = branch_target(
"", shared_target_after_team, default_chengyu_target_path
)
r = run_puling_conversion(
chengyu_url,
default_chengyu_target_path(),
chengyu_target,
audit_date_str,
yname="诚予",
product_groups=PRODUCT_GROUPS_CY,
......@@ -105,12 +142,16 @@ def post_clean(body: CleanRequestBody) -> ApiEnvelope:
# For every branch that wrote successfully, aggregate its 「合并后」 sheet into merged_*.xlsx (one branch → single-sheet write; several → vertical concat)
part_paths: list[str] = []
seen_paths: set[str] = set()
for key in ("team", "puling", "chengyu"):
br = data.get(key)
if isinstance(br, dict) and br.get("ok") and br.get("target_file"):
part_paths.append(str(br["target_file"]))
p = str(br["target_file"])
if p not in seen_paths:
seen_paths.add(p)
part_paths.append(p)
success_payload: dict[str, Any] = {}
if part_paths:
merged_path = default_merged_target_path()
try:
merged_df = read_merged_dataframe(part_paths)
except Exception as e:
......@@ -118,6 +159,8 @@ def post_clean(body: CleanRequestBody) -> ApiEnvelope:
status_code=500,
detail={"ok": False, "error": f"读取待合并文件失败: {e}"},
) from e
if write_cache:
merged_path = default_merged_target_path()
try:
write_merged_dataframe(merged_df, merged_path)
except Exception as e:
......@@ -125,20 +168,29 @@ def post_clean(body: CleanRequestBody) -> ApiEnvelope:
status_code=500,
detail={"ok": False, "error": f"写入合并文件失败: {e}"},
) from e
mr: dict[str, Any] = {
"ok": True,
"merged_target_file": str(merged_path),
"merged_rows": int(len(merged_df)),
"merged_from": part_paths,
}
lp = run_low_price_audit(merged_df, get_settings().mysql_connect_kwargs())
lp = run_low_price_audit(
merged_df,
settings.mysql_connect_kwargs(),
write_result_xlsx=write_cache,
)
if not lp.get("ok"):
raise HTTPException(status_code=502, detail=lp)
mr["low_price_target_file"] = lp["low_price_target_file"]
mr["low_price_rows"] = lp["low_price_rows"]
mr["low_price_flagged_rows"] = lp["low_price_flagged_rows"]
data["merged"] = mr
else:
data["merged"] = None
return ok(data=data, msg="成功")
success_payload = {
"merged_rows": int(len(merged_df)),
"risk_audit_visit_rows": lp["risk_audit_visit_rows"],
"risk_audit_visit_collapsed_duplicate_rows": lp[
"risk_audit_visit_collapsed_duplicate_rows"
],
"risk_audit_visit_overwritten_rows": lp[
"risk_audit_visit_overwritten_rows"
],
}
return ok(data=success_payload, msg="成功")
finally:
if not write_cache:
for p in cleanup_tmp_xlsx:
try:
Path(p).unlink(missing_ok=True)
except OSError:
pass
"""`risk_audit_visit` 分页列表、详情、按主键全量更新与删除。"""
from __future__ import annotations
import mysql.connector
from mysql.connector import errors as mysql_errors
from fastapi import APIRouter, HTTPException, Query
from fastapi.encoders import jsonable_encoder
from api.response import ApiEnvelope, ok
from api.schemas import RiskAuditVisitReplaceBody
from api.settings import get_settings
risk_audit_visit_router = APIRouter(prefix="/api")
_RAV_COLS = [
"audit_date",
"source",
"region_name",
"district_name",
"dealer_code",
"dealer_name",
"store_code",
"store_name",
"f_emp_no",
"f_emp_name",
"qin_ce_type_large",
"jh_channel_type",
"city",
"channel_type",
"series",
"taste",
"weight",
"price",
"low_price",
"low_price_diff",
"low_price_status",
"low_price_rectify",
"production_month",
"near_month_num",
"near_month_status",
"fresh_status",
"large_date_status",
"large_date_rectify",
]
_SELECT_ROW = f"SELECT rav_id, {', '.join(_RAV_COLS)} FROM risk_audit_visit WHERE rav_id = %s"
_UPDATE_ROW = (
f"UPDATE risk_audit_visit SET {', '.join(f'{c}=%s' for c in _RAV_COLS)} WHERE rav_id = %s"
)
_MAX_PAGE_SIZE = 200
_DEFAULT_PAGE_SIZE = 20
def _connect():
try:
return mysql.connector.connect(**get_settings().mysql_connect_kwargs())
except mysql_errors.Error as e:
raise HTTPException(
status_code=502,
detail={"ok": False, "error": f"数据库连接失败: {e}"},
) from e
def _replace_body_to_params(body: RiskAuditVisitReplaceBody) -> tuple:
return (
body.audit_date,
body.source,
body.region_name,
body.district_name,
body.dealer_code,
body.dealer_name,
body.store_code,
body.store_name,
body.f_emp_no,
body.f_emp_name,
body.qin_ce_type_large,
body.jh_channel_type,
body.city,
body.channel_type,
body.series,
body.taste,
body.weight,
body.price,
body.low_price,
body.low_price_diff,
body.low_price_status,
body.low_price_rectify,
body.production_month,
body.near_month_num,
body.near_month_status,
body.fresh_status,
body.large_date_status,
body.large_date_rectify,
)
@risk_audit_visit_router.get("/v1/risk-audit-visit", response_model=ApiEnvelope)
def list_risk_audit_visit(
page: int = Query(1, ge=1, description="页码,从 1 开始"),
page_size: int = Query(
_DEFAULT_PAGE_SIZE,
ge=1,
le=_MAX_PAGE_SIZE,
description=f"每页条数,最大 {_MAX_PAGE_SIZE}",
),
) -> ApiEnvelope:
offset = (page - 1) * page_size
conn = _connect()
try:
cur = conn.cursor(dictionary=True)
cur.execute("SELECT COUNT(*) AS c FROM risk_audit_visit")
total_row = cur.fetchone()
total = int(total_row["c"]) if total_row else 0
cur.execute(
f"SELECT rav_id, {', '.join(_RAV_COLS)} FROM risk_audit_visit ORDER BY rav_id DESC LIMIT %s OFFSET %s",
(page_size, offset),
)
rows = cur.fetchall() or []
finally:
conn.close()
return ok(
data=jsonable_encoder(
{
"items": rows,
"total": total,
"page": page,
"page_size": page_size,
}
),
msg="成功",
)
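The list endpoint's page/offset arithmetic is the usual 1-based formula; a quick sketch:

```python
def limit_offset(page: int, page_size: int) -> tuple[int, int]:
    # page is 1-based, so page 1 starts at offset 0
    return page_size, (page - 1) * page_size

assert limit_offset(1, 20) == (20, 0)
assert limit_offset(3, 20) == (20, 40)
```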
@risk_audit_visit_router.get("/v1/risk-audit-visit/{rav_id}", response_model=ApiEnvelope)
def get_risk_audit_visit(rav_id: int) -> ApiEnvelope:
conn = _connect()
try:
cur = conn.cursor(dictionary=True)
cur.execute(_SELECT_ROW, (rav_id,))
row = cur.fetchone()
finally:
conn.close()
if not row:
raise HTTPException(
status_code=404,
detail={"ok": False, "error": f"记录不存在: rav_id={rav_id}"},
)
return ok(data=jsonable_encoder(row), msg="成功")
@risk_audit_visit_router.put("/v1/risk-audit-visit/{rav_id}", response_model=ApiEnvelope)
def put_risk_audit_visit(rav_id: int, body: RiskAuditVisitReplaceBody) -> ApiEnvelope:
params = (*_replace_body_to_params(body), rav_id)
conn = _connect()
try:
cur = conn.cursor()
try:
cur.execute(_UPDATE_ROW, params)
except mysql_errors.IntegrityError as e:
conn.rollback()
if getattr(e, "errno", None) == 1062:
raise HTTPException(
status_code=409,
detail={"ok": False, "error": f"唯一键冲突,无法保存: {e}"},
) from e
raise HTTPException(
status_code=502,
detail={"ok": False, "error": f"写入失败: {e}"},
) from e
if cur.rowcount == 0:
conn.rollback()
raise HTTPException(
status_code=404,
detail={"ok": False, "error": f"记录不存在: rav_id={rav_id}"},
)
conn.commit()
finally:
conn.close()
return ok(data={"rav_id": rav_id, "updated": True}, msg="成功")
@risk_audit_visit_router.delete("/v1/risk-audit-visit/{rav_id}", response_model=ApiEnvelope)
def delete_risk_audit_visit(rav_id: int) -> ApiEnvelope:
conn = _connect()
try:
cur = conn.cursor()
cur.execute("DELETE FROM risk_audit_visit WHERE rav_id = %s", (rav_id,))
if cur.rowcount == 0:
conn.rollback()
raise HTTPException(
status_code=404,
detail={"ok": False, "error": f"记录不存在: rav_id={rav_id}"},
)
conn.commit()
finally:
conn.close()
return ok(data={"rav_id": rav_id, "deleted": True}, msg="成功")
"""清洗接口请求体。"""
"""HTTP 请求体模型(清洗、risk_audit_visit CRUD 等)。"""
from datetime import date
from decimal import Decimal
from pydantic import BaseModel, Field
class CleanRequestBody(BaseModel):
department: str = Field(..., description="业务类型,风控稽查数据清洗 走团队转换")
department: str = Field(..., description="业务类型,售点稽查低价数据清洗 走团队转换")
year: int | None = None
month: int | None = None
day: int | None = None
......@@ -12,3 +15,37 @@ class CleanRequestBody(BaseModel):
team_target_path: str | None = None # default: cache/team_{timestamp}.xlsx under the project
puling_url: str | None = None # non-empty → data_chengyu_puling (Puling column layout)
chengyu_url: str | None = None # non-empty → data_chengyu_puling (Chengyu column layout)
# Matches the business columns in `code/sql_/risk_audit_visit.sql` (excluding the primary key rav_id)
class RiskAuditVisitReplaceBody(BaseModel):
"""PUT full replacement: every key must be submitted; a null value writes NULL."""
audit_date: date | None
source: str | None = Field(..., max_length=20)
region_name: str | None = Field(..., max_length=20)
district_name: str | None = Field(..., max_length=20)
dealer_code: str | None = Field(..., max_length=10)
dealer_name: str | None = Field(..., max_length=100)
store_code: str | None = Field(..., max_length=20)
store_name: str | None = Field(..., max_length=100)
f_emp_no: str | None = Field(..., max_length=20)
f_emp_name: str | None = Field(..., max_length=100)
qin_ce_type_large: str | None = Field(..., max_length=20)
jh_channel_type: str | None = Field(..., max_length=20)
city: str | None = Field(..., max_length=30)
channel_type: str | None = Field(..., max_length=30)
series: str | None = Field(..., max_length=20)
taste: str | None = Field(..., max_length=20)
weight: str | None = Field(..., max_length=20)
price: Decimal | None
low_price: str | None = Field(..., max_length=20)
low_price_diff: Decimal | None
low_price_status: str | None = Field(..., max_length=20)
low_price_rectify: str | None = Field(..., max_length=100)
production_month: date | None
near_month_num: int | None
near_month_status: str | None = Field(..., max_length=20)
fresh_status: str | None = Field(..., max_length=20)
large_date_status: str | None = Field(..., max_length=20)
large_date_rectify: str | None = Field(..., max_length=100)
......@@ -8,6 +8,8 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
_CODE_DIR = Path(__file__).resolve().parents[1]
_PROD_ENV = frozenset({"production", "prod"})
class Settings(BaseSettings):
model_config = SettingsConfigDict(
......@@ -19,6 +21,9 @@ class Settings(BaseSettings):
# development: local/test DB; production: live (variables overridden by the deploy environment)
environment: str = "development"
# None: infer from environment (production/prod → don't write cache); true/false forces the switch (env var CLEAN_WRITE_CACHE)
clean_write_cache: bool | None = None
db_host: str = ""
db_port: int = 3306
db_name: str = ""
......@@ -35,6 +40,12 @@ class Settings(BaseSettings):
"password": self.db_password,
}
def effective_clean_write_cache(self) -> bool:
"""Whether to write cleaning artifacts (team/merged/low_price xlsx, etc.) under code/cache."""
if self.clean_write_cache is not None:
return self.clean_write_cache
return (self.environment or "").strip().lower() not in _PROD_ENV
@lru_cache
def get_settings() -> Settings:
......
......@@ -3,6 +3,7 @@ from fastapi.exceptions import HTTPException, RequestValidationError
from api.exception_handlers import http_exception_handler, validation_exception_handler
from api.routes_clean import api_router
from api.routes_risk_audit_visit import risk_audit_visit_router
from api.settings import get_settings
get_settings()
......@@ -11,3 +12,4 @@ app = FastAPI(title="Clean Data API")
app.add_exception_handler(HTTPException, http_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.include_router(api_router)
app.include_router(risk_audit_visit_router)
......@@ -20,8 +20,9 @@ if str(_CODE_ROOT) not in sys.path:
from utils.dates import ( # noqa: E402
approx_gap_months_calendar,
first_yyyy_mm_dd_in_dataframe,
first_yyyy_mm_dd_in_iloc,
max_yyyy_mm_dd_in_dataframe,
month_boundary_from_audit_day,
normalize_year_month_to_day01,
to_yyyy_mm_dd,
)
......@@ -34,8 +35,9 @@ def _resolve_audit_date(
*,
source_audit_col: int = 0,
) -> tuple[str | None, str | None]:
"""Audit date: wide-table designated column → explicit argument → target-table column / third column. Returns (YYYY-MM-DD, error message)."""
"""Audit date: wide-table designated column → explicit argument → max audit date across all merged rows (then anchored to month start/end by half-month). Returns (YYYY-MM-DD, error message)."""
n = None
ad_from_target_merge = False
if df_source is not None and df_source.shape[1] > source_audit_col:
n = first_yyyy_mm_dd_in_iloc(df_source, source_audit_col)
......@@ -45,11 +47,19 @@ def _resolve_audit_date(
return None, f"稽查日期参数无法解析: {audit_date_str!r}"
if n is None:
n = first_yyyy_mm_dd_in_dataframe(
n = max_yyyy_mm_dd_in_dataframe(
df_target,
("稽查日期列", "稽查日期"),
third_column_fallback=True,
)
if n:
ad_from_target_merge = True
if n and ad_from_target_merge:
anchored = month_boundary_from_audit_day(n)
if anchored:
n = anchored
if n:
return n, None
......
"""Compare the merged audit table against the price plate: flag low price and break-price diff; price plate from MySQL `bi_price_xx`."""
"""Compare the merged audit table against the price plate: flag low price; price plate from `bi_price_xx`; results upserted into `risk_audit_visit`."""
from __future__ import annotations
from datetime import datetime
import sys
from collections import defaultdict
from datetime import date, datetime
from pathlib import Path
from typing import Any
import pandas as pd
_CODE_BASE = Path(__file__).resolve().parents[3]
if str(_CODE_BASE) not in sys.path:
sys.path.insert(0, str(_CODE_BASE))
from utils.dates import to_yyyy_mm_dd # noqa: E402
_RAV_DB_COLS = [
"audit_date",
"source",
"region_name",
"district_name",
"dealer_code",
"dealer_name",
"store_code",
"store_name",
"f_emp_no",
"f_emp_name",
"qin_ce_type_large",
"jh_channel_type",
"city",
"channel_type",
"series",
"taste",
"weight",
"price",
"low_price",
"low_price_diff",
"low_price_status",
"low_price_rectify",
"production_month",
"near_month_num",
"near_month_status",
"fresh_status",
"large_date_status",
"large_date_rectify",
]
_RAV_EXCEL_SPECS: list[tuple[str, str, Any]] = [
("audit_date", "稽查日期", "date"),
("source", "稽查来源", 20),
("region_name", "大区", 20),
("district_name", "战区", 20),
("dealer_code", "经销商编码", 10),
("dealer_name", "经销商名称", 100),
("store_code", "勤策门店编码", 20),
("store_name", "勤策门店名称", 100),
("f_emp_no", "客户经理工号", 20),
("f_emp_name", "客户经理", 100),
("qin_ce_type_large", "勤策渠道大类", 20),
("jh_channel_type", "稽核渠道(对N列清洗)", 20),
("city", "城市", 30),
("channel_type", "渠道类型(稽查源提供)", 30),
("series", "产品系列", 20),
("taste", "产品口味", 20),
("weight", "产品克重", 20),
("price", "产品价格", "dec"),
("low_price", "是否低价", 20),
("low_price_diff", "破价价差", "dec"),
("low_price_status", "低价整改状态", 20),
("low_price_rectify", "低价整改说明", 100),
("production_month", "产品生产月份", "date"),
("near_month_num", "临期月份数", "int"),
("near_month_status", "临期状态", 20),
("fresh_status", "新鲜度", 20),
("large_date_status", "大日期整改状态", 20),
("large_date_rectify", "大日期整改说明", 100),
]
# Matches the table's uk_biz columns: audit_date,source,store_name,channel_type,series,taste,weight,dealer_name
_RAV_UK_BIZ_INDICES = (0, 1, 7, 13, 14, 15, 16, 5)
_RAV_INSERT_SQL = (
f"INSERT INTO risk_audit_visit ({','.join(_RAV_DB_COLS)}) VALUES "
f"({','.join(['%s'] * len(_RAV_DB_COLS))}) ON DUPLICATE KEY UPDATE "
+ ",".join(f"{c}=VALUES({c})" for c in _RAV_DB_COLS)
)
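The generated statement is a standard MySQL `INSERT ... ON DUPLICATE KEY UPDATE`; for a hypothetical two-column subset the same construction yields:

```python
cols = ["audit_date", "source"]  # toy subset of _RAV_DB_COLS
sql = (
    f"INSERT INTO risk_audit_visit ({','.join(cols)}) VALUES "
    f"({','.join(['%s'] * len(cols))}) ON DUPLICATE KEY UPDATE "
    + ",".join(f"{c}=VALUES({c})" for c in cols)
)
# Every column is both inserted and, on a uk_biz collision, overwritten in place.
assert sql == (
    "INSERT INTO risk_audit_visit (audit_date,source) VALUES "
    "(%s,%s) ON DUPLICATE KEY UPDATE audit_date=VALUES(audit_date),source=VALUES(source)"
)
```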
def _rav_na(val: Any) -> bool:
return val is None or (isinstance(val, float) and pd.isna(val)) or pd.isna(val)
def _rav_str(val: Any, max_len: int) -> str | None:
if _rav_na(val):
return None
s = str(val).strip()
if not s:
return None
return s[:max_len] if len(s) > max_len else s
def _rav_date(val: Any):
if _rav_na(val):
return None
s = to_yyyy_mm_dd(val)
if not s:
return None
try:
return datetime.strptime(s[:10], "%Y-%m-%d").date()
except ValueError:
return None
def _rav_dec(val: Any) -> float | None:
if _rav_na(val):
return None
x = pd.to_numeric(val, errors="coerce")
if pd.isna(x):
return None
return round(float(x), 2)
def _rav_int(val: Any) -> int | None:
if _rav_na(val):
return None
x = pd.to_numeric(val, errors="coerce")
if pd.isna(x):
return None
return int(x)
def _row_to_rav_tuple(row: pd.Series) -> tuple[Any, ...]:
out: list[Any] = []
for _, zh, spec in _RAV_EXCEL_SPECS:
raw = row[zh] if zh in row.index else None
if spec == "date":
out.append(_rav_date(raw))
elif spec == "dec":
out.append(_rav_dec(raw))
elif spec == "int":
out.append(_rav_int(raw))
else:
out.append(_rav_str(raw, spec))
return tuple(out)
def _uk_biz_from_rav_tuple(tup: tuple[Any, ...]) -> tuple[Any, ...]:
return tuple(tup[i] for i in _RAV_UK_BIZ_INDICES)
def _serialize_rav_cell(val: Any) -> Any:
if val is None:
return None
if isinstance(val, (datetime, date)):
return val.isoformat()
if isinstance(val, float) and pd.isna(val):
return None
return val
def _serialize_rav_tuple(tup: tuple[Any, ...]) -> dict[str, Any]:
return {c: _serialize_rav_cell(v) for c, v in zip(_RAV_DB_COLS, tup)}
def _indices_overwritten_by_later_same_uk(rows: list[tuple[Any, ...]]) -> list[int]:
"""When same-key rows are upserted in order, every row but the last is overwritten by a later one; return those rows' indices (ascending)."""
uk_to_indices: defaultdict[tuple[Any, ...], list[int]] = defaultdict(list)
for i, t in enumerate(rows):
uk_to_indices[_uk_biz_from_rav_tuple(t)].append(i)
out: list[int] = []
for idxs in uk_to_indices.values():
if len(idxs) > 1:
out.extend(idxs[:-1])
out.sort()
return out
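The "collapsed duplicates" semantics: with rows upserted in order, only the last row per unique key survives, and all earlier same-key rows are reported. A simplified sketch where the key is just the first element of each tuple (the real key is the `uk_biz` projection):

```python
from collections import defaultdict

def overwritten_indices(rows: list[tuple], key=lambda t: t[0]) -> list[int]:
    # Group row positions by key; all but the last position per key get overwritten.
    by_key: defaultdict = defaultdict(list)
    for i, t in enumerate(rows):
        by_key[key(t)].append(i)
    out = [i for idxs in by_key.values() if len(idxs) > 1 for i in idxs[:-1]]
    return sorted(out)

rows = [("a", 1), ("b", 2), ("a", 3), ("a", 4)]
# indices 0 and 2 share key "a" with a later row; index 3 wins, "b" is unique
assert overwritten_indices(rows) == [0, 2]
```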
def upsert_risk_audit_visit(
df: pd.DataFrame,
mysql_connect_kwargs: dict[str, Any],
*,
batch_size: int = 500,
) -> dict[str, Any]:
"""Full upsert; the unique key uk_biz includes dealer_name. Returns executed_rows, distinct_uk_biz, overwritten_rows."""
import mysql.connector
if df.empty:
return {"executed_rows": 0, "distinct_uk_biz": 0, "overwritten_rows": []}
work = df.copy()
work.columns = work.columns.str.strip()
rows = [_row_to_rav_tuple(work.iloc[i]) for i in range(len(work))]
uk_set = {_uk_biz_from_rav_tuple(t) for t in rows}
ow_idx = _indices_overwritten_by_later_same_uk(rows)
overwritten_rows = [_serialize_rav_tuple(rows[i]) for i in ow_idx]
conn = mysql.connector.connect(**mysql_connect_kwargs)
try:
cur = conn.cursor()
for i in range(0, len(rows), batch_size):
cur.executemany(_RAV_INSERT_SQL, rows[i : i + batch_size])
conn.commit()
cur.close()
except Exception:
conn.rollback()
raise
finally:
conn.close()
return {
"executed_rows": len(rows),
"distinct_uk_biz": len(uk_set),
"overwritten_rows": overwritten_rows,
}
def default_low_price_target_path() -> str:
......@@ -107,11 +309,14 @@ def run_low_price_audit(
df_y: pd.DataFrame,
mysql_connect_kwargs: dict[str, Any],
output_path: str | None = None,
write_result_xlsx: bool = True,
) -> dict[str, Any]:
"""
Pull the price plate from the DB and low-price-flag the in-memory merged table; the written xlsx contains all rows, with columns such as 「是否低价」 distinguishing low-price / normal / undeterminable
mysql_connect_kwargs: same as `Settings.mysql_connect_kwargs()` (market_bi must already be configured)
Pull the price plate and flag low prices; optionally write the low_price xlsx; then upsert the full result into `risk_audit_visit` (uk_biz: see `sql_/risk_audit_visit.sql`)
mysql_connect_kwargs: same as `Settings.mysql_connect_kwargs()`.
"""
out: str | None = None
if write_result_xlsx:
out = output_path or default_low_price_target_path()
try:
df_p = _load_price_plate_from_db(mysql_connect_kwargs)
......@@ -123,6 +328,7 @@ def run_low_price_audit(
except Exception as e:
return {"ok": False, "error": f"低价稽查计算失败: {e}"}
if write_result_xlsx and out:
try:
Path(out).parent.mkdir(parents=True, exist_ok=True)
with pd.ExcelWriter(out, engine="openpyxl", mode="w") as writer:
......@@ -130,10 +336,21 @@ def run_low_price_audit(
except Exception as e:
return {"ok": False, "error": f"写入低价稽查结果失败: {e}"}
try:
rav = upsert_risk_audit_visit(result, mysql_connect_kwargs)
except Exception as e:
return {"ok": False, "error": f"写入 risk_audit_visit 失败: {e}"}
flagged = int((result["是否低价"] == "低价").sum()) if "是否低价" in result.columns else 0
dup = rav["executed_rows"] - rav["distinct_uk_biz"]
ow = rav["overwritten_rows"]
return {
"ok": True,
"low_price_target_file": str(out),
"low_price_target_file": str(out) if out else None,
"low_price_rows": int(len(result)),
"low_price_flagged_rows": flagged,
"risk_audit_visit_rows": rav["executed_rows"],
"risk_audit_visit_distinct_uk_biz": rav["distinct_uk_biz"],
"risk_audit_visit_collapsed_duplicate_rows": dup,
"risk_audit_visit_overwritten_rows": ow,
}
......@@ -2,6 +2,7 @@
from __future__ import annotations
import re
from calendar import monthrange
from collections.abc import Sequence
from datetime import datetime
......@@ -55,6 +56,21 @@ def first_yyyy_mm_dd_in_iloc(df: pd.DataFrame, col_idx: int) -> str | None:
return None
def month_boundary_from_audit_day(yyyy_mm_dd: str) -> str | None:
"""February: days 1–14 → the 1st of that month, day 15 on → month end; other months: 1–15 → month start, 16 on → month end. Input must be YYYY-MM-DD."""
if not yyyy_mm_dd or len(yyyy_mm_dd) < 10:
return None
try:
d = datetime.strptime(yyyy_mm_dd[:10], "%Y-%m-%d")
except ValueError:
return None
cutoff = 14 if d.month == 2 else 15
if d.day <= cutoff:
return d.replace(day=1).strftime("%Y-%m-%d")
last = monthrange(d.year, d.month)[1]
return d.replace(day=last).strftime("%Y-%m-%d")
def first_yyyy_mm_dd_in_dataframe(
df: pd.DataFrame,
column_names: Sequence[str],
......@@ -79,6 +95,39 @@ def first_yyyy_mm_dd_in_dataframe(
return None
def max_yyyy_mm_dd_in_dataframe(
df: pd.DataFrame,
column_names: Sequence[str],
*,
third_column_fallback: bool = True,
) -> str | None:
"""Column selection matches first_yyyy_mm_dd_in_dataframe; take the max of all parseable dates in that column (YYYY-MM-DD)."""
ser = None
if df is not None and df.shape[1] > 0:
for name in column_names:
if name in df.columns:
ser = df[name]
break
if ser is None and third_column_fallback and df.shape[1] > 2:
ser = df.iloc[:, 2]
if ser is None:
return None
best: str | None = None
best_dt: datetime | None = None
for val in ser:
n = to_yyyy_mm_dd(val)
if not n:
continue
try:
dt = datetime.strptime(n, "%Y-%m-%d")
except ValueError:
continue
if best_dt is None or dt > best_dt:
best_dt = dt
best = n
return best
def normalize_year_month_to_day01(src_month):
"""
Production-month-like string → YYYY-MM-01 (for subsequent strptime %Y-%m-%d).
......
# clean_data project guide (index)
For agents and humans: read this index before changing code, then open the matching module doc. After implementing, if behavior changed, sync this file or its sub-docs.
For agents and humans: no need to read this whole index every time; open this page to locate docs when directories are unfamiliar or the task spans modules. After implementing, if behavior changed, sync the module docs and leave a readable change note (work trail) in [changelog.md](changelog.md) or the relevant md.
## Repository layout (overview)
......@@ -18,6 +18,7 @@
| [team-conversion.md](team-conversion.md) | Team wide-table URL → narrow table → 「合并后」 sheet pipeline |
| [utils.md](utils.md) | Responsibilities of `utils/dates` and `utils/excel_http` |
| [database.md](database.md) | MySQL env vars, `settings`, and local/production conventions |
| [changelog.md](changelog.md) | Short, traceable notes on key conventions and behavior changes |
## Architecture (logical layers)
......@@ -26,7 +27,9 @@ flowchart TB
subgraph http [HTTP]
A[FastAPI index.py]
R[api/routes_clean.py]
RV[api/routes_risk_audit_visit.py]
A --> R
A --> RV
end
subgraph load [Load]
L[api/team_conversion_loader.py]
......@@ -73,7 +76,7 @@ sequenceDiagram
## Requirements / capability split (current state)
- **Shipped**: `department = 风控稽查数据清洗`; at least one of `team_url` / `puling_url` / `chengyu_url`; each branch writes its own file; optional `year/month/day` and `team_target_path` (team branch only).
- **Shipped**: `department = 售点稽查低价数据清洗`; at least one of `team_url` / `puling_url` / `chengyu_url`; each branch writes its own file; optional `year/month/day` and `team_target_path` (team branch only).
- **merged aggregation + low-price audit**: whenever ≥1 branch in a request cleans and writes successfully, the in-memory merge is written to `code/cache/merged_{timestamp}.xlsx`, and the same data produces `code/cache/low_price_{timestamp}.xlsx` (price plate from DB table `bi_price_xx`, see `api.md`).
## Entry points
......@@ -85,3 +88,4 @@ sequenceDiagram
- Changing the API contract, status-code semantics, conversion steps, or output file formats → update [api.md](api.md) / [team-conversion.md](team-conversion.md).
- Adding a standalone utility module → update [utils.md](utils.md) and add a row/link to the table above.
- Cross-doc changes that are hard to place, or "why we changed it at the time" notes → append an entry (date + summary) to [changelog.md](changelog.md); conventions live in `.cursor/skills/project-docs-workflow/SKILL.md`.
......@@ -8,26 +8,41 @@
- **前缀**`/api`
- **清洗**`POST /api/v1/clean``routes_clean.py`
- **稽查走访表 CRUD**`routes_risk_audit_visit.py``GET/PUT/DELETE /api/v1/risk-audit-visit...`,库表见 `code/sql_/risk_audit_visit.sql`
### POST /api/v1/clean
- **department**: must be **`风控稽查数据清洗`** (constant `DEPARTMENT_RISK_AUDIT_CLEAN`), otherwise 400.
- **department**: must be **`售点稽查低价数据清洗`** (constant `DEPARTMENT_RISK_AUDIT_CLEAN`), otherwise 400.
- **team_url / puling_url / chengyu_url**: **at least one** must be non-empty; each URL must be `http://` or `https://` (validated downstream), otherwise 400; see below for how URL-read failures are mapped.
- **team_url**: if provided → **`data_conversion.run_team_conversion`** loaded via `team_conversion_loader`.
- **puling_url**: if provided → **`data_chengyu_puling.run_puling_conversion`** (`yname=浦零`, `PRODUCT_GROUPS`).
- **chengyu_url**: if provided → the same script's **`run_puling_conversion`** (`yname=诚予`, `PRODUCT_GROUPS_CY`).
- **team_target_path**: optional; team branch only; when empty, `default_team_target_path()` (`code/cache/team_{timestamp}.xlsx`).
- **Puling / Chengyu** output paths: `default_puling_target_path()` and `default_chengyu_target_path()` respectively (`code/cache/puling_*.xlsx` / `chengyu_*.xlsx`).
- **merged_*.xlsx aggregation + low_price_*.xlsx low-price audit**: whenever **≥1 branch** cleans `ok` with a `target_file`, read each file's **「合并后」** sheet in **team → Puling → Chengyu** order, align columns and concatenate vertically in memory, then write `default_merged_target_path()` (`code/cache/merged_{timestamp}.xlsx`) for the record. **No re-read from disk**: the same in-memory merged `DataFrame` goes to `run_low_price_audit` in `py_/audit/point_sale/low_price.py`, which pulls the price-plate fields `bi_product / pro_weight / channel_type / low_price` from MySQL (the DB `Settings` points at, table **`bi_price_xx`**, DDL in `code/sql_/bi_price_xx.sql`), compares them against the merged table by original column position (series/weight/channel/price), and writes **`code/cache/low_price_{timestamp}.xlsx`** (a 「合并后」 sheet with **all rows**, same as the merged table, using columns such as 「是否低价」 and 「破价价差」 to flag low-price issues). If the price-plate DB connection or query fails → HTTP **502**. If nothing was written successfully this time, `data.merged` is `null`.
- **team_target_path**: optional; the team branch's output path. When empty and **`Settings.effective_clean_write_cache()` is true** → `default_team_target_path()` (`code/cache/team_{timestamp}.xlsx`); when false (see below), an xlsx under the system temp directory is used and deleted at the end of the request.
- **Puling / Chengyu** output paths: if the **same request** also ran the team branch, they **share** the `team_target_path` above (or the default team / temp path); otherwise, in cache-write mode, `default_puling_target_path()` and `default_chengyu_target_path()` (`code/cache/puling_*.xlsx` / `chengyu_*.xlsx`), else likewise a temp file.
- **Whether to write `code/cache`**: decided by **`ENVIRONMENT`** plus the optional **`CLEAN_WRITE_CACHE`** (see [database.md](database.md)). `production` / `prod` with `CLEAN_WRITE_CACHE` unset → **no** cache writes: no `merged_*.xlsx` / `low_price_*.xlsx`; branch results land in temp files deleted at request end; the **DB write** to `risk_audit_visit` still happens. `CLEAN_WRITE_CACHE=true` forces archiving in production; `CLEAN_WRITE_CACHE=false` disables cache writes outside production.
- **merged aggregation + low-price audit + risk_audit_visit DB write**: whenever **≥1 branch** cleans `ok` with a `target_file`, collect paths in **team → Puling → Chengyu** order, **deduplicate**, read each file's **「合并后」** sheet, align columns and concatenate vertically; in cache mode write `default_merged_target_path()` (`code/cache/merged_{timestamp}.xlsx`). **No re-read from disk**: the same in-memory merged `DataFrame` goes to `run_low_price_audit` in `py_/audit/point_sale/low_price.py`, which pulls the price plate from MySQL **`bi_price_xx`** and flags low prices; in cache mode it also writes **`code/cache/low_price_{timestamp}.xlsx`**; it then writes **all rows** into **`risk_audit_visit`** via **`INSERT ... ON DUPLICATE KEY UPDATE`** (unique key `uk_biz`, see `code/sql_/risk_audit_visit.sql`). A price-plate read failure, or a **`risk_audit_visit` write failure** → HTTP **502**. With no successful branch writes this time, **nothing is written to the DB** and the success response's `data` is the empty object `{}`.
- **year / month / day**: optional; when all are provided they are joined as `YYYYMMDD` and passed to each cleaning branch as the audit-date hint.
Request-body model: **`code/api/schemas.py`** (`CleanRequestBody`).
**Successful `data` shape**: `{ "team": dict | null, "puling": dict | null, "chengyu": dict | null, "merged": dict | null }`. The first three are filled only for branches whose URL was non-empty this time; `merged` is `{ ok, merged_target_file, merged_rows, merged_from, low_price_target_file, low_price_rows, low_price_flagged_rows }` when the aggregate file is written (the last three are low-price outputs: `low_price_rows` is the written file's row count, equal to the full merge; `low_price_flagged_rows` is how many of those have 「是否低价」 = 低价), otherwise `null`.
### risk_audit_visit CRUD
- **GET `/api/v1/risk-audit-visit`**: paged list. Query params `page` (default 1) and `page_size` (default 20, max 200). On success `data` holds `items` (array of row dicts with `rav_id` and all business columns), `total`, `page`, `page_size`; ordered by `rav_id` descending.
- **GET `/api/v1/risk-audit-visit/{rav_id}`**: single-row detail with **all fields** for that primary key (`ApiEnvelope.data` is one complete record). Missing → **404**.
- **PUT `/api/v1/risk-audit-visit/{rav_id}`**: **full-row replacement** by primary key. Body is **`RiskAuditVisitReplaceBody`** (matches the table's business columns, **excluding** `rav_id`; all field keys must be submitted, JSON `null` writes NULL). Missing → **404**; `uk_biz` unique-key violation → **409**; DB connection or similar failures → **502**.
- **DELETE `/api/v1/risk-audit-visit/{rav_id}`**: delete by primary key. Missing → **404**.
Connection parameters are the same as the cleaning DB write: `Settings.mysql_connect_kwargs()` (see [database.md](database.md)).
**Successful `data` shape**: only the stats for this merge-and-load (when nothing was written successfully and there is nothing to merge, `data` is `{}`):
- `merged_rows`: total row count of this merge;
- `risk_audit_visit_rows`: upsert execution count written to `risk_audit_visit` (equal to the merged row count to be written);
- `risk_audit_visit_collapsed_duplicate_rows`: rows overwritten when same-`uk_biz` source rows collapse (the difference between executed rows and the distinct `uk_biz` count; semantics in `low_price.py`);
- `risk_audit_visit_overwritten_rows`: array of overwritten-row details (each item maps column name → value, dates as ISO strings); only the last row per key stays in the DB, the rest appear in this list.
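An illustrative successful response body for the clean endpoint, built from the fields listed above (all values here are made up):

```python
example = {
    "code": 200,
    "msg": "成功",
    "data": {
        "merged_rows": 1280,
        "risk_audit_visit_rows": 1280,
        "risk_audit_visit_collapsed_duplicate_rows": 3,
        "risk_audit_visit_overwritten_rows": [
            # one overwritten source row, column name → value, dates as ISO strings
            {"audit_date": "2026-03-15", "source": "浦零", "store_name": "门店A"},
        ],
    },
}
# upsert execution count matches the merged row count
assert example["data"]["risk_audit_visit_rows"] == example["data"]["merged_rows"]
```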
## Unified response
- Success envelope: **`ApiEnvelope`** (`code/api/response.py`): `code=0`, `data`, `msg`.
- Success envelope: **`ApiEnvelope`** (`code/api/response.py`): `code=200`, `data`, `msg`.
- `HTTPException` / validation failures: **`exception_handlers.py`** maps their details into the same JSON shape (`code` may be an HTTP status code or 422).
## Team conversion failure → HTTP
......@@ -40,5 +55,5 @@
## Related files
- `routes_clean.py`, `schemas.py`, `response.py`, `exception_handlers.py`, `team_conversion_loader.py`, `chengyu_puling_loader.py`, `low_price_loader.py`, `settings.py`.
- `routes_clean.py`, `routes_risk_audit_visit.py`, `schemas.py`, `response.py`, `exception_handlers.py`, `team_conversion_loader.py`, `chengyu_puling_loader.py`, `low_price_loader.py`, `settings.py`.
- DB environment variables: see [database.md](database.md).
# Changelog (work trail)
Key conventions and behavior changes in reverse chronological order, for later reference; details remain in the module docs and code.
## 2026-03-27
- **Agent Skill (`.cursor/skills/project-docs-workflow/SKILL.md`)**: before starting, reading every md under `docs/` is **no longer mandatory**; the agent decides by task difficulty whether to read the index/module docs. **After finishing, a readable change note in `docs/` is still mandatory** (update a module doc or this file), as the shared human/agent work trail.
......@@ -4,7 +4,8 @@
| Variable | Description |
|------|------|
| `ENVIRONMENT` | `development` (local/test) or `production` (live) |
| `ENVIRONMENT` | `development` (local/test) or `production` / `prod` (live) |
| `CLEAN_WRITE_CACHE` | Optional. `true` / `false`: whether `POST /api/v1/clean` writes intermediate and result xlsx under `code/cache`. **Unset**: `production` / `prod` do not write (system temp files only, deleted at request end); other environments write cache. |
| `DB_HOST` | Host |
| `DB_PORT` | Port (integer) |
| `DB_NAME` | Database name |
......@@ -16,6 +17,11 @@
1. **Local**: copy `code/.env.example` to `code/.env` and fill in the test DB connection details (`code/.env` is git-ignored).
2. **Live**: configure the variables above in the deploy environment to point at the production DB; **never** commit production passwords to the repo.
Loading: `from api.settings import get_settings`; `get_settings().mysql_connect_kwargs()` feeds the MySQL client.
Loading: `from api.settings import get_settings`; `get_settings().mysql_connect_kwargs()` feeds the MySQL client, and `get_settings().effective_clean_write_cache()` lets the clean route decide whether to write cache.
`get_settings()` runs once at app startup to load the config.
## Business tables (DDL in `code/sql_/`)
- **`bi_price_xx`**: price plate; read-only for the low-price audit.
- **`risk_audit_visit`**: after writing `low_price_*.xlsx`, `POST /api/v1/clean` upserts the full merged result; unique key **`uk_biz`** (`audit_date, source, store_name, channel_type, series, taste, weight, dealer_name`).
......@@ -8,7 +8,9 @@
## `dates.py`
- **`to_yyyy_mm_dd`**: cell value → `YYYY-MM-DD`.
- **`first_yyyy_mm_dd_in_iloc` / `first_yyyy_mm_dd_in_dataframe`**: parse the audit date from the wide or target table.
- **`first_yyyy_mm_dd_in_iloc` / `first_yyyy_mm_dd_in_dataframe`**: take the first parseable audit date from the wide or target table.
- **`max_yyyy_mm_dd_in_dataframe`**: max of all parseable dates in the chosen column of the target table (falling back to the third column); used when Puling's `ad` falls back to the merged sheet.
- **`month_boundary_from_audit_day`**: anchor the audit day to the start or end of the month (cutoff day 14 in February, 15 elsewhere); Puling's merged-sheet `ad` fallback then goes through this.
- **`normalize_year_month_to_day01`**: normalize production-month strings to `YYYY-MM-01`.
- **`approx_gap_months_calendar`**: approximate months remaining from audit date to expiry (for near-expiry/freshness logic).
......