提交 010b38f0 authored 作者: lidongxu's avatar lidongxu

保留所有数据_低价做标记

上级 cbe7b014
......@@ -7,4 +7,5 @@ __pycache__/
venv/
.env
# 团队转换默认输出目录
code/cache/
\ No newline at end of file
code/cache/
*.sql
\ No newline at end of file
# 复制为同目录下的 .env(已在 .gitignore 中忽略)
# 本地开发:填测试库;生产:在服务器/容器设置同名环境变量,勿将 .env 提交仓库
ENVIRONMENT=development
# 测试环境示例(密码请在本机 .env 中填写或使用导出环境变量)
DB_HOST=192.168.100.39
DB_PORT=25301
DB_NAME=market_bi
DB_USER=root
DB_PASSWORD=
# 生产示例(上线时改为实际值,仅通过部署平台注入)
# ENVIRONMENT=production
# DB_HOST=
# DB_PORT=3306
# DB_NAME=market_bi
# DB_USER=
# DB_PASSWORD=
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
\ No newline at end of file
uvicorn index:app --host 0.0.0.0 --port 8000 --reload
\ No newline at end of file
"""动态加载低价稽查模块(`py_/audit/point_sale/low_price.py`)。"""
import importlib.util
from pathlib import Path
from typing import Any, Callable
_CODE_BASE = Path(__file__).resolve().parent.parent
_LOW_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "low_price.py"
def _load_run_low_price_audit() -> Callable[..., dict[str, Any]]:
spec = importlib.util.spec_from_file_location("low_price_audit", _LOW_SCRIPT)
if spec is None or spec.loader is None:
raise RuntimeError(f"无法加载低价稽查模块: {_LOW_SCRIPT}")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
fn = getattr(mod, "run_low_price_audit", None)
if fn is None:
raise RuntimeError("low_price 中缺少 run_low_price_audit")
return fn
run_low_price_audit: Callable[..., dict[str, Any]] = _load_run_low_price_audit()
......@@ -13,8 +13,14 @@ from api.chengyu_puling_loader import (
)
from api.response import ApiEnvelope, ok
from api.schemas import CleanRequestBody
from api.low_price_loader import run_low_price_audit
from api.settings import get_settings
from api.team_conversion_loader import default_team_target_path, run_team_conversion
from utils.clean_output_merge import default_merged_target_path, merge_clean_result_xlsx
from utils.clean_output_merge import (
default_merged_target_path,
read_merged_dataframe,
write_merged_dataframe,
)
DEPARTMENT_RISK_AUDIT_CLEAN = "风控稽查数据清洗"
......@@ -104,9 +110,33 @@ def post_clean(body: CleanRequestBody) -> ApiEnvelope:
if isinstance(br, dict) and br.get("ok") and br.get("target_file"):
part_paths.append(str(br["target_file"]))
if part_paths:
mr = merge_clean_result_xlsx(part_paths, default_merged_target_path())
if not mr.get("ok"):
raise HTTPException(status_code=500, detail=mr)
merged_path = default_merged_target_path()
try:
merged_df = read_merged_dataframe(part_paths)
except Exception as e:
raise HTTPException(
status_code=500,
detail={"ok": False, "error": f"读取待合并文件失败: {e}"},
) from e
try:
write_merged_dataframe(merged_df, merged_path)
except Exception as e:
raise HTTPException(
status_code=500,
detail={"ok": False, "error": f"写入合并文件失败: {e}"},
) from e
mr: dict[str, Any] = {
"ok": True,
"merged_target_file": str(merged_path),
"merged_rows": int(len(merged_df)),
"merged_from": part_paths,
}
lp = run_low_price_audit(merged_df, get_settings().mysql_connect_kwargs())
if not lp.get("ok"):
raise HTTPException(status_code=502, detail=lp)
mr["low_price_target_file"] = lp["low_price_target_file"]
mr["low_price_rows"] = lp["low_price_rows"]
mr["low_price_flagged_rows"] = lp["low_price_flagged_rows"]
data["merged"] = mr
else:
data["merged"] = None
......
"""从环境变量与 code/.env 读取配置。本地用 code/.env,线上用进程/容器注入同名变量。"""
from functools import lru_cache
from pathlib import Path
from typing import Any
from pydantic_settings import BaseSettings, SettingsConfigDict
_CODE_DIR = Path(__file__).resolve().parents[1]
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=_CODE_DIR / ".env",
env_file_encoding="utf-8",
extra="ignore",
)
# development:本地/测试库;production:线上(由部署环境覆盖变量)
environment: str = "development"
db_host: str = ""
db_port: int = 3306
db_name: str = ""
db_user: str = ""
db_password: str = ""
def mysql_connect_kwargs(self) -> dict[str, Any]:
"""供 mysql.connector / PyMySQL 等使用。"""
return {
"host": self.db_host,
"port": self.db_port,
"database": self.db_name,
"user": self.db_user,
"password": self.db_password,
}
@lru_cache
def get_settings() -> Settings:
return Settings()
......@@ -3,6 +3,9 @@ from fastapi.exceptions import HTTPException, RequestValidationError
from api.exception_handlers import http_exception_handler, validation_exception_handler
from api.routes_clean import api_router
from api.settings import get_settings
get_settings()
app = FastAPI(title="Clean Data API")
app.add_exception_handler(HTTPException, http_exception_handler)
......
"""稽查合并表与价盘比对:标记低价、破价价差;价盘来自 MySQL `bi_price_xx`。"""
from __future__ import annotations
from datetime import datetime
from pathlib import Path
from typing import Any
import pandas as pd
_CODE_BASE = Path(__file__).resolve().parents[3]
def default_low_price_target_path() -> str:
"""cache/low_price_{时间戳}.xlsx"""
d = _CODE_BASE / "cache"
d.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
return str(d / f"low_price_{ts}.xlsx")
def _clean_str(s: Any) -> str:
if pd.isna(s):
return ""
return str(s).strip().upper()
def _load_price_plate_from_db(mysql_connect_kwargs: dict[str, Any]) -> pd.DataFrame:
import mysql.connector
sql = (
"SELECT bi_product, pro_weight, channel_type, low_price "
"FROM bi_price_xx"
)
conn = mysql.connector.connect(**mysql_connect_kwargs)
try:
df_p = pd.read_sql(sql, conn)
finally:
conn.close()
df_p = df_p.astype(object)
for c in ("bi_product", "pro_weight", "channel_type"):
if c in df_p.columns:
df_p[c] = df_p[c].apply(_clean_str)
else:
df_p[c] = ""
df_p["低价_num"] = pd.to_numeric(df_p.get("low_price"), errors="coerce")
df_p["match_key"] = (
df_p["bi_product"].astype(str)
+ "|"
+ df_p["pro_weight"].astype(str)
+ "|"
+ df_p["channel_type"].astype(str)
)
return df_p
def audit_low_price_on_merged(
df_y: pd.DataFrame,
df_p: pd.DataFrame,
) -> pd.DataFrame:
"""对「合并后」形态宽表做低价标记;Y 表列位置与原 Excel 版一致(iloc)。"""
df_y = df_y.copy()
df_y.columns = df_y.columns.str.strip()
df_y["产品系列_clean"] = df_y.iloc[:, 14].apply(_clean_str)
df_y["产品克重_clean"] = df_y.iloc[:, 16].apply(_clean_str)
df_y["渠道类型_clean"] = df_y.iloc[:, 13].apply(_clean_str)
df_y["产品价格_num"] = pd.to_numeric(df_y.iloc[:, 17], errors="coerce")
df_y["match_key"] = (
df_y["产品系列_clean"]
+ "|"
+ df_y["产品克重_clean"]
+ "|"
+ df_y["渠道类型_clean"]
)
price_map = df_p.set_index("match_key")["低价_num"].to_dict()
df_y["是否低价"] = "正常"
df_y["破价价差"] = None
for idx, row in df_y.iterrows():
key = row["match_key"]
y_price = row["产品价格_num"]
p_low_price = price_map.get(key)
if pd.notna(y_price) and pd.notna(p_low_price):
if y_price < p_low_price:
df_y.at[idx, "是否低价"] = "低价"
df_y.at[idx, "破价价差"] = round(float(p_low_price - y_price), 2)
df_y.at[idx, "低价整改状态"] = "未整改"
else:
df_y.at[idx, "是否低价"] = "正常"
df_y.at[idx, "破价价差"] = None
else:
df_y.at[idx, "是否低价"] = None
df_y.at[idx, "破价价差"] = None
drop_suffix = [c for c in df_y.columns if c.endswith("_clean")]
drop_extra = ["产品价格_num", "match_key"]
out_cols = [c for c in df_y.columns if c not in drop_suffix + drop_extra]
return df_y[out_cols]
def run_low_price_audit(
df_y: pd.DataFrame,
mysql_connect_kwargs: dict[str, Any],
output_path: str | None = None,
) -> dict[str, Any]:
"""
从库拉价盘,对内存中的合并表做低价标记;写出 xlsx 为全量行,「是否低价」等列区分低价/正常/无法判定。
mysql_connect_kwargs:与 `Settings.mysql_connect_kwargs()` 一致(须已配置 market_bi)。
"""
out = output_path or default_low_price_target_path()
try:
df_p = _load_price_plate_from_db(mysql_connect_kwargs)
except Exception as e:
return {"ok": False, "error": f"读取价盘表 bi_price_xx 失败: {e}"}
try:
result = audit_low_price_on_merged(df_y, df_p)
except Exception as e:
return {"ok": False, "error": f"低价稽查计算失败: {e}"}
try:
Path(out).parent.mkdir(parents=True, exist_ok=True)
with pd.ExcelWriter(out, engine="openpyxl", mode="w") as writer:
result.to_excel(writer, sheet_name="合并后", index=False)
except Exception as e:
return {"ok": False, "error": f"写入低价稽查结果失败: {e}"}
flagged = int((result["是否低价"] == "低价").sum()) if "是否低价" in result.columns else 0
return {
"ok": True,
"low_price_target_file": str(out),
"low_price_rows": int(len(result)),
"low_price_flagged_rows": flagged,
}
fastapi>=0.115.0
uvicorn[standard]>=0.32.0
pydantic-settings>=2.0.0
# 数据转换_团队.py
pandas>=2.0.0
openpyxl>=3.1.0
python-dateutil>=2.8.0
mysql-connector-python>=9.0.0
......@@ -31,6 +31,25 @@ def _unify_column_order(dfs: list[pd.DataFrame]) -> list[str]:
return order
def read_merged_dataframe(paths: Sequence[str]) -> pd.DataFrame:
"""读多路 xlsx 的「合并后」sheet,列对齐后纵向拼接(不落盘)。"""
ps = [str(p).strip() for p in paths if str(p).strip()]
if len(ps) < 1:
raise ValueError("至少需要 1 个结果文件路径")
dfs = [pd.read_excel(p, sheet_name=MERGED_SHEET_NAME, dtype=str) for p in ps]
cols = _unify_column_order(dfs)
aligned = [df.reindex(columns=cols).fillna("") for df in dfs]
return pd.concat(aligned, ignore_index=True)
def write_merged_dataframe(df: pd.DataFrame, output_path: str | Path) -> Path:
out = Path(output_path)
out.parent.mkdir(parents=True, exist_ok=True)
with pd.ExcelWriter(out, engine="openpyxl", mode="w") as writer:
df.to_excel(writer, sheet_name=MERGED_SHEET_NAME, index=False)
return out
def merge_clean_result_xlsx(paths: Sequence[str], output_path: str | Path) -> dict[str, Any]:
"""读取各路径工作簿的「合并后」sheet,列对齐后纵向拼接写入 output_path(1 个路径时等同复制为该汇总文件)。"""
ps = [str(p).strip() for p in paths if str(p).strip()]
......@@ -38,16 +57,11 @@ def merge_clean_result_xlsx(paths: Sequence[str], output_path: str | Path) -> di
return {"ok": False, "error": "至少需要 1 个结果文件路径"}
out = Path(output_path)
try:
dfs = [pd.read_excel(p, sheet_name=MERGED_SHEET_NAME, dtype=str) for p in ps]
merged = read_merged_dataframe(ps)
except Exception as e:
return {"ok": False, "error": f"读取待合并文件失败: {e}"}
cols = _unify_column_order(dfs)
aligned = [df.reindex(columns=cols).fillna("") for df in dfs]
merged = pd.concat(aligned, ignore_index=True)
out.parent.mkdir(parents=True, exist_ok=True)
try:
with pd.ExcelWriter(out, engine="openpyxl", mode="w") as writer:
merged.to_excel(writer, sheet_name=MERGED_SHEET_NAME, index=False)
write_merged_dataframe(merged, out)
except Exception as e:
return {"ok": False, "error": f"写入合并文件失败: {e}"}
return {
......
......@@ -17,6 +17,7 @@
| [api.md](api.md) | HTTP API、路由、统一响应与错误映射 |
| [team-conversion.md](team-conversion.md) | 团队宽表 URL → 窄表 → `合并后` sheet 流水线 |
| [utils.md](utils.md) | `utils/dates``utils/excel_http` 职责 |
| [database.md](database.md) | MySQL 环境变量、`settings` 与本地/生产约定 |
## 架构(逻辑分层)
......@@ -73,7 +74,7 @@ sequenceDiagram
## 需求 / 能力拆分(现状)
- **已落地**`department = 风控稽查数据清洗``team_url` / `puling_url` / `chengyu_url` 至少其一,各路分别落盘;可选 `year/month/day``team_target_path`(仅团队分支)。
- **汇总 merged**:同一次请求中只要有 ≥1 路清洗成功落盘,即生成 `code/cache/merged_{时间戳}.xlsx`(多路纵向拼接,单路等同一份「合并后」)。
- **汇总 merged + 低价稽查**:同一次请求中只要有 ≥1 路清洗成功落盘,内存合并后写 `code/cache/merged_{时间戳}.xlsx`,并用同一份数据写 `code/cache/low_price_{时间戳}.xlsx`(价盘来自库表 `bi_price_xx`,见 `api.md`)。
## 运行入口说明
......
......@@ -18,12 +18,12 @@
- **chengyu_url**:若提供 → 同一脚本 **`run_puling_conversion`**`yname=诚予``PRODUCT_GROUPS_CY`)。
- **team_target_path**:可选;仅团队分支使用;为空则 `default_team_target_path()``code/cache/team_{时间戳}.xlsx`
- **浦零 / 诚予** 输出路径:分别为 `default_puling_target_path()``default_chengyu_target_path()``code/cache/puling_*.xlsx``chengyu_*.xlsx`
- **汇总文件 merged_*.xlsx**:凡有 **≥1 路**清洗 `ok` 且带 `target_file`,即调用 `merge_clean_result_xlsx`:按 **团队 → 浦零 → 诚予** 顺序纳入成功分支,读各文件 **「合并后」** sheet;**多路**时列对齐后纵向拼接,**仅一路**时写入同一张「合并后」(便于前端始终读 `data.merged`)。路径为 `default_merged_target_path()``code/cache/merged_{时间戳}.xlsx`。若本次没有任何成功落盘(例如仅传了 URL 但分支返回无数据且未写文件)`data.merged``null`
- **汇总 merged_*.xlsx + 低价稽查 low_price_*.xlsx**:凡有 **≥1 路**清洗 `ok` 且带 `target_file`,先按 **团队 → 浦零 → 诚予** 顺序读各文件 **「合并后」** sheet,在内存中列对齐后纵向拼接,再写入 `default_merged_target_path()``code/cache/merged_{时间戳}.xlsx`(留档)。**不再次读盘**:同一份内存中的合并 `DataFrame` 交给 `py_/audit/point_sale/low_price.py``run_low_price_audit`:从 MySQL(`Settings` 指向的库,表 **`bi_price_xx`**,结构见 `code/sql_/bi_price_xx.sql`)拉取价盘字段 `bi_product / pro_weight / channel_type / low_price`,与合并表按原列位(产品系列/克重/渠道/价格)比对,写出 **`code/cache/low_price_{时间戳}.xlsx`**(「合并后」sheet,**全量行**与合并表一致,用「是否低价」「破价价差」等列标记低价问题)。若价盘库连接或查询失败 → HTTP **502**。若本次没有任何成功落盘`data.merged``null`
- **year / month / day**:可选;若均提供则拼为 `YYYYMMDD` 传入各清洗分支作为稽查日期线索。
请求体模型见 **`code/api/schemas.py`**`CleanRequestBody`)。
**成功时 `data` 形态**`{ "team": dict | null, "puling": dict | null, "chengyu": dict | null, "merged": dict | null }`。前三项仅对本次**非空 URL** 分支写入;`merged` 在写出汇总文件时为 `{ ok, merged_target_file, merged_rows, merged_from }`,否则 `null`
**成功时 `data` 形态**`{ "team": dict | null, "puling": dict | null, "chengyu": dict | null, "merged": dict | null }`。前三项仅对本次**非空 URL** 分支写入;`merged` 在写出汇总文件时为 `{ ok, merged_target_file, merged_rows, merged_from, low_price_target_file, low_price_rows, low_price_flagged_rows }`(后三项为低价稽查输出:`low_price_rows` 为写出文件的行数,与合并全量一致;`low_price_flagged_rows` 为其中「是否低价」= 低价的行数),否则 `null`
## 统一响应
......@@ -40,4 +40,5 @@
## 相关文件
- `routes_clean.py``schemas.py``response.py``exception_handlers.py``team_conversion_loader.py``chengyu_puling_loader.py`
- `routes_clean.py``schemas.py``response.py``exception_handlers.py``team_conversion_loader.py``chengyu_puling_loader.py``low_price_loader.py``settings.py`
- 数据库环境变量说明见 [database.md](database.md)
# 数据库配置(`code/api/settings.py`)
## 环境变量(同名,`.env` 或进程环境)
| 变量 | 说明 |
|------|------|
| `ENVIRONMENT` | `development`(本地/测试)或 `production`(线上) |
| `DB_HOST` | 主机 |
| `DB_PORT` | 端口(整数) |
| `DB_NAME` | 库名 |
| `DB_USER` | 账号 |
| `DB_PASSWORD` | 密码 |
## 使用方式
1. **本地**:将 `code/.env.example` 复制为 `code/.env`,填写测试库连接信息(`code/.env` 已被 git 忽略)。
2. **线上**:在部署环境配置上述变量指向生产库,**不要**把生产密码写进仓库。
加载:`from api.settings import get_settings``get_settings().mysql_connect_kwargs()` 供 MySQL 客户端使用。
应用启动时会执行一次 `get_settings()` 以载入配置。
......@@ -16,7 +16,9 @@
## `clean_output_merge.py`
- **`merge_clean_result_xlsx`**:读 1 个或多个 xlsx 的 **「合并后」** sheet(`dtype=str`),统一列序后 `concat`(仅 1 个输入时结果即该表),写入目标路径(单 sheet)。
- **`read_merged_dataframe`**:读 1 个或多个 xlsx 的 **「合并后」** sheet(`dtype=str`),统一列序后 `concat`,返回内存中的 `DataFrame`(不落盘)。
- **`write_merged_dataframe`**:将上述形态的 `DataFrame` 写入指定路径(单 sheet「合并后」)。
- **`merge_clean_result_xlsx`**`read_merged_dataframe` + `write_merged_dataframe` 一步封装(脚本或其它调用方可直接用)。
- **`default_merged_target_path`**`code/cache/merged_{时间戳}.xlsx`
**`api/routes_clean.py`** 在多路清洗成功后调用
**`api/routes_clean.py`** 在多路清洗成功后:先 `read_merged_dataframe` → 写 merged → 再对同一 `DataFrame` 跑低价稽查写 `low_price_*.xlsx`
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论