提交 49e50691 authored 作者: lidongxu's avatar lidongxu

暂时完成售点稽查低价数据的增删改查业务

上级 cd0b6e9d
# 按业务域划分的 API 子包
# 售点稽查:清洗、低价稽查、risk_audit_visit CRUD
...@@ -4,7 +4,7 @@ import importlib.util ...@@ -4,7 +4,7 @@ import importlib.util
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
_CODE_BASE = Path(__file__).resolve().parent.parent _CODE_BASE = Path(__file__).resolve().parent.parent.parent.parent
_CP_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "data_chengyu_puling.py" _CP_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "data_chengyu_puling.py"
......
...@@ -4,7 +4,7 @@ import importlib.util ...@@ -4,7 +4,7 @@ import importlib.util
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
_CODE_BASE = Path(__file__).resolve().parent.parent _CODE_BASE = Path(__file__).resolve().parent.parent.parent.parent
_LOW_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "low_price.py" _LOW_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "low_price.py"
......
...@@ -7,18 +7,18 @@ from typing import Any ...@@ -7,18 +7,18 @@ from typing import Any
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
from api.chengyu_puling_loader import ( from api.audit.point_sale.chengyu_puling_loader import (
PRODUCT_GROUPS, PRODUCT_GROUPS,
PRODUCT_GROUPS_CY, PRODUCT_GROUPS_CY,
default_chengyu_target_path, default_chengyu_target_path,
default_puling_target_path, default_puling_target_path,
run_puling_conversion, run_puling_conversion,
) )
from api.audit.point_sale.low_price_loader import run_low_price_audit
from api.audit.point_sale.schemas import CleanRequestBody
from api.audit.point_sale.team_conversion_loader import default_team_target_path, run_team_conversion
from api.response import ApiEnvelope, ok from api.response import ApiEnvelope, ok
from api.schemas import CleanRequestBody
from api.low_price_loader import run_low_price_audit
from api.settings import get_settings from api.settings import get_settings
from api.team_conversion_loader import default_team_target_path, run_team_conversion
from utils.clean_output_merge import ( from utils.clean_output_merge import (
default_merged_target_path, default_merged_target_path,
read_merged_dataframe, read_merged_dataframe,
......
...@@ -8,8 +8,8 @@ from mysql.connector import errors as mysql_errors ...@@ -8,8 +8,8 @@ from mysql.connector import errors as mysql_errors
from fastapi import APIRouter, HTTPException, Query from fastapi import APIRouter, HTTPException, Query
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from api.audit.point_sale.schemas import RiskAuditVisitReplaceBody
from api.response import ApiEnvelope, ok from api.response import ApiEnvelope, ok
from api.schemas import RiskAuditVisitReplaceBody
from api.settings import get_settings from api.settings import get_settings
risk_audit_visit_router = APIRouter(prefix="/api") risk_audit_visit_router = APIRouter(prefix="/api")
......
...@@ -5,7 +5,7 @@ from datetime import datetime ...@@ -5,7 +5,7 @@ from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
_CODE_BASE = Path(__file__).resolve().parent.parent _CODE_BASE = Path(__file__).resolve().parent.parent.parent.parent
_TEAM_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "data_conversion.py" _TEAM_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "data_conversion.py"
......
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.exceptions import HTTPException, RequestValidationError from fastapi.exceptions import HTTPException, RequestValidationError
from api.audit.point_sale.routes_clean import api_router
from api.audit.point_sale.routes_risk_audit_visit import risk_audit_visit_router
from api.exception_handlers import http_exception_handler, validation_exception_handler from api.exception_handlers import http_exception_handler, validation_exception_handler
from api.routes_clean import api_router
from api.routes_risk_audit_visit import risk_audit_visit_router
from api.settings import get_settings from api.settings import get_settings
get_settings() get_settings()
......
...@@ -349,7 +349,7 @@ def run_low_price_audit( ...@@ -349,7 +349,7 @@ def run_low_price_audit(
"low_price_target_file": str(out) if out else None, "low_price_target_file": str(out) if out else None,
"low_price_rows": int(len(result)), "low_price_rows": int(len(result)),
"low_price_flagged_rows": flagged, "low_price_flagged_rows": flagged,
"risk_audit_visit_rows": rav["executed_rows"], "risk_audit_visit_rows": rav["distinct_uk_biz"],
"risk_audit_visit_distinct_uk_biz": rav["distinct_uk_biz"], "risk_audit_visit_distinct_uk_biz": rav["distinct_uk_biz"],
"risk_audit_visit_collapsed_duplicate_rows": dup, "risk_audit_visit_collapsed_duplicate_rows": dup,
"risk_audit_visit_overwritten_rows": ow, "risk_audit_visit_overwritten_rows": ow,
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
| 路径 | 职责 | | 路径 | 职责 |
|------|------| |------|------|
| `code/` | 运行时代码:FastAPI、`api/``utils/`、稽查转换脚本目录 `py_/audit/...` | | `code/` | 运行时代码:FastAPI、`api/`(通用装配与 `api/audit/point_sale/` 售点稽查)、`utils/`、稽查转换脚本 `py_/audit/...` |
| `code/cache/` | 清洗输出目录:`team_*.xlsx``puling_*.xlsx``chengyu_*.xlsx`;任一路成功落盘后另有汇总 `merged_*.xlsx` | | `code/cache/` | 清洗输出目录:`team_*.xlsx``puling_*.xlsx``chengyu_*.xlsx`;任一路成功落盘后另有汇总 `merged_*.xlsx` |
| `docs/` | 本索引与各模块说明 | | `docs/` | 本索引与各模块说明 |
...@@ -26,13 +26,13 @@ ...@@ -26,13 +26,13 @@
flowchart TB flowchart TB
subgraph http [HTTP] subgraph http [HTTP]
A[FastAPI index.py] A[FastAPI index.py]
R[api/routes_clean.py] R[api/audit/point_sale/routes_clean.py]
RV[api/routes_risk_audit_visit.py] RV[api/audit/point_sale/routes_risk_audit_visit.py]
A --> R A --> R
A --> RV A --> RV
end end
subgraph load [加载] subgraph load [加载]
L[api/team_conversion_loader.py] L[api/audit/point_sale/team_conversion_loader.py]
R --> L R --> L
end end
subgraph biz [业务转换] subgraph biz [业务转换]
......
# HTTP API(`code/api/`) # HTTP API(`code/api/`,售点稽查实现在 `code/api/audit/point_sale/`
## 应用装配 ## 应用装配
- **`code/index.py`**`FastAPI` 实例 `app`,注册 `HTTPException` / `RequestValidationError` 处理器,挂载 `api_router` - **`code/index.py`**`FastAPI` 实例 `app`,注册 `HTTPException` / `RequestValidationError` 处理器,挂载 `api_router``risk_audit_visit_router`(二者均来自 `api/audit/point_sale/`
## 路由 ## 路由
- **前缀**`/api` - **前缀**`/api`
- **清洗**`POST /api/v1/clean``routes_clean.py` - **清洗**`POST /api/v1/clean``api/audit/point_sale/routes_clean.py`
- **稽查走访表 CRUD**`routes_risk_audit_visit.py``GET/PUT/DELETE /api/v1/risk-audit-visit...`,库表见 `code/sql_/risk_audit_visit.sql` - **稽查走访表 CRUD**`api/audit/point_sale/routes_risk_audit_visit.py``GET/PUT/DELETE /api/v1/risk-audit-visit...`,库表见 `code/sql_/risk_audit_visit.sql`
### POST /api/v1/clean ### POST /api/v1/clean
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
- **汇总 merged + 低价稽查 + 写库 risk_audit_visit**:凡有 **≥1 路**清洗 `ok` 且带 `target_file`,按 **团队 → 浦零 → 诚予** 顺序收集路径,**去重**后读各文件 **「合并后」** sheet,列对齐纵向拼接;若写 cache 则写入 `default_merged_target_path()``code/cache/merged_{时间戳}.xlsx`**不再次读盘**:同一份内存中的合并 `DataFrame` 交给 `py_/audit/point_sale/low_price.py``run_low_price_audit`:从 MySQL **`bi_price_xx`** 拉价盘、比对低价;若写 cache 则另写 **`code/cache/low_price_{时间戳}.xlsx`**;再将**全量行** **`INSERT ... ON DUPLICATE KEY UPDATE`** 写入 **`risk_audit_visit`**(唯一键 `uk_biz``code/sql_/risk_audit_visit.sql`)。价盘读失败、或 **`risk_audit_visit` 写入失败** → HTTP **502**。若本次没有任何成功落盘,**不写库**,成功响应里 `data` 为空对象 `{}` - **汇总 merged + 低价稽查 + 写库 risk_audit_visit**:凡有 **≥1 路**清洗 `ok` 且带 `target_file`,按 **团队 → 浦零 → 诚予** 顺序收集路径,**去重**后读各文件 **「合并后」** sheet,列对齐纵向拼接;若写 cache 则写入 `default_merged_target_path()``code/cache/merged_{时间戳}.xlsx`**不再次读盘**:同一份内存中的合并 `DataFrame` 交给 `py_/audit/point_sale/low_price.py``run_low_price_audit`:从 MySQL **`bi_price_xx`** 拉价盘、比对低价;若写 cache 则另写 **`code/cache/low_price_{时间戳}.xlsx`**;再将**全量行** **`INSERT ... ON DUPLICATE KEY UPDATE`** 写入 **`risk_audit_visit`**(唯一键 `uk_biz``code/sql_/risk_audit_visit.sql`)。价盘读失败、或 **`risk_audit_visit` 写入失败** → HTTP **502**。若本次没有任何成功落盘,**不写库**,成功响应里 `data` 为空对象 `{}`
- **year / month / day**:可选;若均提供则拼为 `YYYYMMDD` 传入各清洗分支作为稽查日期线索。 - **year / month / day**:可选;若均提供则拼为 `YYYYMMDD` 传入各清洗分支作为稽查日期线索。
请求体模型见 **`code/api/schemas.py`**`CleanRequestBody`)。 请求体模型见 **`code/api/audit/point_sale/schemas.py`**`CleanRequestBody`)。
### risk_audit_visit CRUD ### risk_audit_visit CRUD
...@@ -35,8 +35,8 @@ ...@@ -35,8 +35,8 @@
连接参数同清洗写库:`Settings.mysql_connect_kwargs()`(见 [database.md](database.md))。 连接参数同清洗写库:`Settings.mysql_connect_kwargs()`(见 [database.md](database.md))。
**成功时 `data` 形态**:仅返回与本次合并入库相关的统计(无成功落盘需合并时 `data``{}`): **成功时 `data` 形态**:仅返回与本次合并入库相关的统计(无成功落盘需合并时 `data``{}`):
- `merged_rows`本次合并后表总行数 - `merged_rows`多路「合并后」sheet 纵向拼接后的**总行数**(与低价稽查入参行数一致)
- `risk_audit_visit_rows`写入 `risk_audit_visit` 的 upsert 执行行数(与合并后待写行数一致); - `risk_audit_visit_rows``uk_biz` 去重后、与库中最终保留行数一致的有效行数(合并总行数 − `risk_audit_visit_collapsed_duplicate_rows`);
- `risk_audit_visit_collapsed_duplicate_rows`:数据源内同 `uk_biz` 多行折叠时,被覆盖条数(与「去重后 uk_biz 数」之差,语义见 `low_price.py`); - `risk_audit_visit_collapsed_duplicate_rows`:数据源内同 `uk_biz` 多行折叠时,被覆盖条数(与「去重后 uk_biz 数」之差,语义见 `low_price.py`);
- `risk_audit_visit_overwritten_rows`:被覆盖行的明细数组(每项为字段名 → 值,日期为 ISO 字符串);同键仅最后一行留在库中,其余行出现在此列表。 - `risk_audit_visit_overwritten_rows`:被覆盖行的明细数组(每项为字段名 → 值,日期为 ISO 字符串);同键仅最后一行留在库中,其余行出现在此列表。
...@@ -47,7 +47,7 @@ ...@@ -47,7 +47,7 @@
## 团队转换失败 → HTTP ## 团队转换失败 → HTTP
`_raise_http_for_failed_result``routes_clean.py`)根据返回 `dict``error` 文案区分: `_raise_http_for_failed_result``api/audit/point_sale/routes_clean.py`)根据返回 `dict``error` 文案区分:
-`source_url 须为` → 400 -`source_url 须为` → 400
- 含「从 URL 读取源表失败」或「读取源表失败」前缀 → 502 - 含「从 URL 读取源表失败」或「读取源表失败」前缀 → 502
...@@ -55,5 +55,6 @@ ...@@ -55,5 +55,6 @@
## 相关文件 ## 相关文件
- `routes_clean.py``routes_risk_audit_visit.py``schemas.py``response.py``exception_handlers.py``team_conversion_loader.py``chengyu_puling_loader.py``low_price_loader.py``settings.py` - **售点稽查**`api/audit/point_sale/``routes_clean.py``routes_risk_audit_visit.py``schemas.py``team_conversion_loader.py``chengyu_puling_loader.py``low_price_loader.py`
- **通用**`response.py``exception_handlers.py``settings.py`
- 数据库环境变量说明见 [database.md](database.md) - 数据库环境变量说明见 [database.md](database.md)
...@@ -4,4 +4,10 @@ ...@@ -4,4 +4,10 @@
## 2026-03-27 ## 2026-03-27
- **`POST /api/v1/clean` 统计语义**`merged_rows` 仍为**合并后物理总行数**`risk_audit_visit_rows` 表示按 `uk_biz` 去重后的有效入库行数(同键多行仅最后一行留在库);`risk_audit_visit_collapsed_duplicate_rows` 为「合并待写总行数 − 去重键数」。
- **脚本清理**:删除一次性 Navicat 备份转换脚本 **`scripts/transform_rav_backup_sql.py`**(仓库内无其它引用)。
- **API 目录结构**:售点稽查相关路由与 loader 迁入 **`code/api/audit/point_sale/`**(清洗、`risk_audit_visit` CRUD、团队/浦零/诚予加载器、低价稽查加载器、请求体模型);`code/api/` 根下保留通用层 **`response.py`、`exception_handlers.py`、`settings.py`**`code/index.py` 从子包挂载路由;HTTP 路径未变。
- **Agent Skill(`.cursor/skills/project-docs-workflow/SKILL.md`)**:开工前**不再强制**通读 `docs/` 下全部 md,由 Agent 按需求难度决定是否读索引/模块文档;**完工后仍必须在 `docs/` 留下可读变更说明**(更新模块文档或本文),作为人机共用的工作痕迹。 - **Agent Skill(`.cursor/skills/project-docs-workflow/SKILL.md`)**:开工前**不再强制**通读 `docs/` 下全部 md,由 Agent 按需求难度决定是否读索引/模块文档;**完工后仍必须在 `docs/` 留下可读变更说明**(更新模块文档或本文),作为人机共用的工作痕迹。
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
## 位置与加载方式 ## 位置与加载方式
- 脚本路径:**`code/py_/audit/point_sale/data_conversion.py`** - 脚本路径:**`code/py_/audit/point_sale/data_conversion.py`**
- **`api/team_conversion_loader.py`**`importlib` 按文件路径动态加载,导出 **`run_team_conversion`**,避免包名与历史中文文件名纠缠。 - **`api/audit/point_sale/team_conversion_loader.py`**`importlib` 按文件路径动态加载,导出 **`run_team_conversion`**,避免包名与历史中文文件名纠缠。
## 入口:`run_team_conversion` ## 入口:`run_team_conversion`
......
...@@ -23,4 +23,4 @@ ...@@ -23,4 +23,4 @@
- **`merge_clean_result_xlsx`**`read_merged_dataframe` + `write_merged_dataframe` 一步封装(脚本或其它调用方可直接用)。 - **`merge_clean_result_xlsx`**`read_merged_dataframe` + `write_merged_dataframe` 一步封装(脚本或其它调用方可直接用)。
- **`default_merged_target_path`**`code/cache/merged_{时间戳}.xlsx` - **`default_merged_target_path`**`code/cache/merged_{时间戳}.xlsx`
**`api/routes_clean.py`** 在多路清洗成功后:先 `read_merged_dataframe` → 写 merged → 再对同一 `DataFrame` 跑低价稽查写 `low_price_*.xlsx` **`api/audit/point_sale/routes_clean.py`** 在多路清洗成功后:先 `read_merged_dataframe` → 写 merged → 再对同一 `DataFrame` 跑低价稽查写 `low_price_*.xlsx`
"""Strip DROP/CREATE from Navicat dump; emit INSERT IGNORE into risk_audit_visit."""
from __future__ import annotations
import sys
HEADER = """/*
* 由 risk_audit_visit_back260327.sql 生成(勿直接跑原文件,否则会 DROP/CREATE 备份表结构)。
* 用法(Navicat 查询里执行本文件):
* 1)目标库已存在空表 risk_audit_visit,且含 UNIQUE uk_biz(列顺序与备份一致)。
* 2)本脚本仅 INSERT IGNORE,不建表;重复 uk_biz 或重复 rav_id 的行会被静默跳过。
* 3)导入后可执行:ALTER TABLE risk_audit_visit AUTO_INCREMENT=493621;(按备份里下一自增值调整)
*/
"""
OLD = "INSERT INTO `risk_audit_visit_back260327`"
NEW = "INSERT IGNORE INTO `risk_audit_visit`"
def main() -> None:
path_in = r"c:\Users\lenovo\Desktop\risk_audit_visit_back260327.sql"
path_out = r"c:\Users\lenovo\Desktop\risk_audit_visit_import_uk_biz.sql"
if len(sys.argv) >= 3:
path_in, path_out = sys.argv[1], sys.argv[2]
with open(path_in, "r", encoding="utf-8") as f:
lines = f.readlines()
out: list[str] = [HEADER, "SET FOREIGN_KEY_CHECKS=0;\n"]
started = False
n_ins = 0
for line in lines:
if line.startswith("SET FOREIGN_KEY_CHECKS"):
continue
if not started:
if line.startswith("INSERT INTO "):
started = True
else:
continue
out.append(line.replace(OLD, NEW))
n_ins += 1
out.append("\nSET FOREIGN_KEY_CHECKS=1;\n")
with open(path_out, "w", encoding="utf-8") as f:
f.writelines(out)
print(f"OK: {n_ins} INSERTs -> {path_out}")
if __name__ == "__main__":
main()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论