提交 3b2e71ec,作者:lidongxu

新版本_完成第一步团队数据的清洗测试

上级 c34ee4a3
__pycache__/
*.py[cod]
*$py.class
.Python
*.so
.venv/
venv/
.env
# 团队转换默认输出目录
code/cache/
# 数据清洗系统 - 环境变量配置
# 通过 ENV 区分环境:不设或 ENV=development 为开发,ENV=production 为生产
# 当前环境:development | production(不写则默认为开发)
ENV=development
# 服务器配置
HOST=0.0.0.0
PORT=8000
DEBUG=False
# ---------- 开发环境数据库(当前 ENV=development 时使用) ----------
DB_HOST=192.168.100.39
DB_PORT=25301
DB_USER=root
DB_PASSWORD="Zt%68Dsuv&M"
DB_NAME=market_bi
# ---------- 生产环境数据库(ENV=production 时使用) ----------
PROD_DB_HOST=rm-2ze28qp55mrm34g8bbo.mysql.rds.aliyuncs.com
PROD_DB_PORT=3306
PROD_DB_USER=sfabus
PROD_DB_PASSWORD=Wxl@325Pa91
PROD_DB_NAME=market_bi
# 日志配置
LOG_LEVEL=INFO
LOG_FILE=./logs/app.log
# Excel 下载配置
EXCEL_DOWNLOAD_TIMEOUT=30
MAX_EXCEL_SIZE=52428800 # 50MB
# 任务超时配置
TASK_TIMEOUT_SECONDS=3600 # 1小时
# ========== Python ==========
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# 虚拟环境
venv/
.venv/
env/
ENV/
# ========== 测试与覆盖率 ==========
.pytest_cache/
.coverage
htmlcov/
.tox/
.nox/
coverage.xml
*.cover
.hypothesis/
# ========== IDE / 编辑器 ==========
.idea/
.vscode/
*.swp
*.swo
*~
.project
.pydevproject
.settings/
# ========== 系统文件 ==========
.DS_Store
.DS_Store?
Thumbs.db
ehthumbs.db
Desktop.ini
# ========== 日志与临时 ==========
*.log
*.tmp
*.temp
.cache/
# ========== 其他 ==========
*.sql.backup
*.bak
# 数据清洗系统 - 项目说明文档
## 项目概述
本项目是一个使用 FastAPI 框架开发的数据清洗系统,支持从 Excel 文件中提取数据、进行数据清洗处理,并将最终结果保存到 MySQL 数据库。
### 核心功能
1. **Excel 数据解析**:从网络链接下载并解析 Excel 文件
2. **数据清洗处理**:对解析后的数据进行验证、清洗和去重
3. **进度反馈**:通过 HTTP 轮询方式向前端实时反馈数据清洗进度
4. **数据持久化**:将清洗后的数据保存到 MySQL 数据库
---
## 项目结构
```
clean_data/
├── index.py # 主程序入口
├── requirements.txt # 项目依赖列表
├── .env.example # 环境变量配置示例
├── README.md # 项目说明文档
├── core/ # 核心业务模块
│ ├── __init__.py
│ ├── excel_handler.py # Excel 文件处理
│ ├── data_cleaner.py # 数据清洗逻辑
│ ├── db_handler.py # 数据库交互
│ └── progress_manager.py # 进度管理
└── utils/ # 工具模块
├── __init__.py
├── exceptions.py # 自定义异常
└── validators.py # 数据验证
```
---
## 快速开始
### 1. 环境准备
```bash
# 克隆项目(如果需要)
cd clean_data
# 创建虚拟环境(推荐)
python -m venv venv
# 激活虚拟环境
# Windows:
venv\Scripts\activate
# Linux/Mac:
source venv/bin/activate
# 安装依赖
pip install -r requirements.txt
```
### 2. 配置环境变量
```bash
# 复制环境变量配置文件
cp .env.example .env
# 编辑 .env 文件,填写实际的配置信息
# 特别注意:
# - DB_HOST, DB_PORT, DB_USER, DB_PASSWORD 需要填写实际的数据库配置
# - DB_NAME 为要使用的数据库名称
```
### 3. 启动服务
```bash
# 方式一:使用 Python 直接运行
python index.py
# 方式二:使用 Uvicorn 运行(推荐)
uvicorn index:app --host 0.0.0.0 --port 8000 --reload
# 服务将在 http://0.0.0.0:8000 启动
# API 文档:http://localhost:8000/docs(Swagger UI)
```
---
## API 接口文档
### 1. 启动数据清洗任务
**请求**
```
POST /api/v1/clean
```
**请求体**
```json
{
"excel_url": "https://example.com/data.xlsx",
"department": "sales",
"description": "Q1销售数据清洗"
}
```
**响应**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "queued",
"message": "任务已创建,正在处理中...",
"data_preview": null
}
```
### 2. 获取数据清洗进度
**请求**
```
GET /api/v1/progress/{task_id}
```
**响应**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "processing",
"progress": 65,
"message": "已清洗 650/1000 行数据",
"timestamp": "2026-03-06T10:30:45.123456"
}
```
**状态说明**
- `queued`: 任务已创建,排队中
- `processing`: 数据正在处理中
- `completed`: 数据清洗完成
- `failed`: 清洗过程中出错
### 3. 获取清洗结果
**请求**
```
GET /api/v1/result/{task_id}
```
**响应**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "ready_to_save",
"message": "数据清洗完成,可进行保存",
"data_preview": [
{"产品": "产品A", "金额": 1000},
{"产品": "产品B", "金额": 2000}
],
"total_rows": 1000,
"department": "sales"
}
```
### 4. 保存清洗后的数据
**请求**
```
POST /api/v1/save
```
**请求体**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"table_name": "sales_data"
}
```
**响应**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "saved",
"message": "数据已成功保存到数据库",
"affected_rows": 1000
}
```
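
上述四个接口通常按「发起清洗 → 轮询进度 → 预览结果 → 确认保存」的顺序配合使用。下面是一段示意性的 Python 客户端脚本(假设服务运行在 http://localhost:8000、已安装 requests,字段以本节文档中的响应示例为准,仅用于演示调用顺序):

```python
import time
import requests

BASE = "http://localhost:8000"

# 1. 发起清洗任务
resp = requests.post(f"{BASE}/api/v1/clean", json={
    "excel_url": "https://example.com/data.xlsx",
    "department": "sales",
    "description": "Q1销售数据清洗",
})
task_id = resp.json()["task_id"]

# 2. 轮询进度,直到 completed / failed
while True:
    progress = requests.get(f"{BASE}/api/v1/progress/{task_id}").json()
    print(progress["progress"], progress["message"])
    if progress["status"] in ("completed", "failed"):
        break
    time.sleep(1)

# 3. 预览清洗结果
result = requests.get(f"{BASE}/api/v1/result/{task_id}").json()
print(result["total_rows"], result["data_preview"][:2])

# 4. 确认无误后保存入库
saved = requests.post(f"{BASE}/api/v1/save", json={
    "task_id": task_id,
    "table_name": "sales_data",
}).json()
print(saved["message"], saved["affected_rows"])
```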
### 5. 健康检查
**请求**
```
GET /api/v1/health
```
**响应**
```json
{
"status": "healthy",
"timestamp": "2026-03-06T10:30:45.123456",
"service": "数据清洗系统"
}
```
---
## 进度反馈机制
### HTTP 轮询方案(无需 WebSocket)
系统采用 **HTTP 轮询** 方式实现进度反馈,具有以下优势:
1. **无连接保持**:客户端主动请求,降低服务器负载
2. **兼容性强**:支持所有 HTTP 客户端
3. **易于部署**:无需 WebSocket 基础设施
4. **便于扩展**:易于部署到各种云环境
### 前端实现建议
```javascript
// 示例:React/Vue 前端逻辑
const pollProgress = async (taskId) => {
const interval = setInterval(async () => {
try {
const response = await fetch(`/api/v1/progress/${taskId}`);
const data = await response.json();
// 更新进度条
updateProgressBar(data.progress);
updateMessage(data.message);
// 任务完成时停止轮询
if (data.status === 'completed' || data.status === 'failed') {
clearInterval(interval);
}
} catch (error) {
console.error('获取进度失败:', error);
}
}, 1000); // 每秒轮询一次
};
```
---
## 数据清洗逻辑
### 清洗步骤
1. **下载**:从网络链接下载 Excel 文件
2. **解析**:使用 openpyxl 解析 Excel 内容
3. **验证**:验证数据类型和必填字段
4. **清洗**
- 移除首尾空格
- 处理空值
- 去重处理
5. **缓存**:将清洗后的数据存储在内存中
6. **保存**:前端确认后保存到数据库(各步骤的串联方式见下方示意代码)
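
以下是把上述六个步骤串联起来的最小示意代码(假设按前文项目结构从 core/ 导入各模块;实际服务中这段逻辑由 FastAPI 后台任务驱动,并通过 ProgressManager 上报进度、缓存结果):

```python
from core.excel_handler import ExcelHandler
from core.data_cleaner import DataCleaner
from core.db_handler import DatabaseHandler


async def run_pipeline(excel_url: str, department: str, table_name: str):
    raw_rows = await ExcelHandler().fetch_and_parse(excel_url)  # 1-2. 下载并解析
    cleaned = await DataCleaner().clean(raw_rows, department)   # 3-5. 验证、清洗、去重
    # 6. 前端确认后再保存;insert_data 返回 (提交行数, MySQL 原始 rowcount)
    return await DatabaseHandler().insert_data(table_name, cleaned)


# import asyncio; asyncio.run(run_pipeline("https://example.com/data.xlsx", "sales", "sales_data"))
```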
### 自定义清洗规则
编辑 `core/data_cleaner.py` 中的 `_validate_required_fields` 方法来自定义不同部门的清洗规则:
```python
required_fields_map = {
'sales': ['产品', '金额', '销售日期'],
'inventory': ['SKU', '数量', '仓库'],
'finance': ['交易日期', '金额', '类别']
}
```
---
## 数据库配置
### MySQL 5.6+ 连接配置
编辑 `.env` 文件:
```ini
DB_HOST=localhost
DB_PORT=3306
DB_USER=root
DB_PASSWORD=your_password
DB_NAME=clean_data
```
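
这些变量由 config.py 统一读取并组装成连接参数。下面是一段示意代码,用 Config.get_db_config()(见下文 config.py)做一次最简单的连通性检查(假设已安装 mysql-connector-python):

```python
import mysql.connector

from config import config

# 读取 .env 中的数据库配置并测试连接
conn = mysql.connector.connect(**config.get_db_config())
cur = conn.cursor()
cur.execute("SELECT VERSION()")
print("MySQL 版本:", cur.fetchone()[0])
cur.close()
conn.close()
```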
### 创建目标表(示例)
```sql
CREATE TABLE sales_data (
id INT AUTO_INCREMENT PRIMARY KEY,
产品 VARCHAR(100),
金额 DECIMAL(10, 2),
销售日期 DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
---
## 异常处理
系统定义了多种自定义异常,便于错误追踪:
- **DataCleaningException**:数据清洗过程中的异常
- **DatabaseException**:数据库操作异常
- **ExcelParsingException**:Excel 解析异常
- **ValidationException**:数据验证异常
所有异常都会被记录到日志中,便于问题排查。
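
下面是一段示意用法,演示在业务代码中抛出并捕获这些自定义异常(异常类定义见下文 utils/exceptions.py):

```python
import logging

from utils.exceptions import DataCleaningException

logger = logging.getLogger(__name__)

try:
    # 清洗过程中发现不可恢复的问题时,抛出自定义异常
    raise DataCleaningException("第 3 行缺少必填字段「金额」")
except DataCleaningException as e:
    logger.error(f"数据清洗失败: {e}")
```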
---
## 日志记录
系统使用 Python 标准 logging 模块记录所有操作,日志级别可在 `.env` 中配置:
```
LOG_LEVEL=INFO
LOG_FILE=./logs/app.log
```
日志记录内容包括:
- 任务创建和完成
- 数据处理进度
- 异常错误信息
- 数据库操作记录
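
下面是一段与上述配置对应的日志初始化示意(仅为最小示例,直接使用标准库 logging 和 config.py 中读取到的 LOG_LEVEL / LOG_FILE,实际项目可能采用不同的初始化方式):

```python
import logging
import os

from config import config

handlers = [logging.StreamHandler()]
if config.LOG_FILE:
    # 确保日志目录存在,再追加文件输出
    log_dir = os.path.dirname(config.LOG_FILE)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    handlers.append(logging.FileHandler(config.LOG_FILE, encoding="utf-8"))

logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL.upper(), logging.INFO),
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
    handlers=handlers,
)
```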
---
## 性能优化建议
1. **批量插入**:数据库操作使用批量插入(默认每 1000 行为一批)
2. **异步处理**:使用 FastAPI 的后台任务避免阻塞响应
3. **进度缓存**:使用内存字典缓存进度数据和清洗结果
4. **连接池**:建议为数据库使用连接池(可扩展功能)
---
## 常见问题
### Q: 为什么不使用 WebSocket?
A: HTTP 轮询方案具有以下优势:
- 服务器不需要维持连接状态
- 更容易水平扩展
- 无需 WebSocket 库和基础设施
- 使用标准 HTTP 协议,兼容性更强
### Q: 清洗后的数据存储在哪里?
A: 清洗后的数据默认存储在:
- **短期**:服务器内存中(task_id 映射)
- **长期**:用户确认后保存到 MySQL 数据库
### Q: 如何处理大文件?
A: 可在 `.env` 中配置最大文件大小限制:
```
MAX_EXCEL_SIZE=52428800 # 50MB
```
---
## 扩展功能(可选)
1. **数据备份**:定期备份已保存的数据
2. **审计日志**:记录所有数据修改操作
3. **权限控制**:添加用户认证和授权机制
4. **缓存优化**:使用 Redis 替代内存缓存
5. **任务队列**:使用 Celery 处理大批量任务
---
## 部署建议
### 生产环境
1. 使用 Gunicorn + Uvicorn 运行应用
2. 配置反向代理(nginx)
3. 启用 HTTPS
4. 配置日志持久化
5. 设置监控告警
### Docker 部署
```dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "index:app", "--host", "0.0.0.0", "--port", "8000"]
```
---
## 技术栈
- **Web 框架**:FastAPI 0.104.1
- **ASGI 服务器**:Uvicorn 0.24.0
- **Excel 处理**:openpyxl 3.1.5
- **数据库驱动**:mysql-connector-python 8.2.0
- **数据验证**:Pydantic 2.4.2
- **HTTP 客户端**:requests 2.31.0
---
## License
MIT
---
## 支持
如有任何问题或建议,请联系开发团队。
# HTTP 路由与子模块
"""将异常与校验失败统一为 { code, data, msg } 响应体。"""
from typing import Any
from fastapi import Request
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import HTTPException, RequestValidationError
from fastapi.responses import JSONResponse
def _msg_from_detail(detail: str | dict | list | None) -> str:
if detail is None:
return "请求失败"
if isinstance(detail, str):
return detail
if isinstance(detail, dict):
return str(detail.get("error") or detail.get("msg") or detail.get("message") or "请求失败")
return "参数校验失败"
def _data_from_detail(detail: str | dict | list | None) -> Any:
if isinstance(detail, (dict, list)):
return jsonable_encoder(detail)
return None
async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
body = {
"code": exc.status_code,
"data": _data_from_detail(exc.detail),
"msg": _msg_from_detail(exc.detail),
}
return JSONResponse(status_code=exc.status_code, content=body)
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
errors = jsonable_encoder(exc.errors())
return JSONResponse(
status_code=422,
content={"code": 422, "data": errors, "msg": "参数校验失败"},
)
"""统一 API 响应:code=0 成功,非 0 为逻辑/业务错误码;data 为载荷;msg 为说明文案。"""
from typing import Any
from pydantic import BaseModel, Field
class ApiEnvelope(BaseModel):
code: int = Field(..., description="0 成功,非 0 失败")
data: Any = None
msg: str = ""
model_config = {"json_schema_extra": {"example": {"code": 0, "data": {}, "msg": "成功"}}}
def ok(data: Any = None, msg: str = "") -> ApiEnvelope:
return ApiEnvelope(code=0, data=data, msg=msg)
"""清洗相关 HTTP 路由:校验入参、调用团队转换、映射业务错误到 HTTP 状态码。"""
from fastapi import APIRouter, HTTPException
from api.response import ApiEnvelope, ok
from api.schemas import CleanRequestBody
from api.team_conversion_loader import default_team_target_path, run_team_conversion
DEPARTMENT_RISK_AUDIT_CLEAN = "风控稽查数据清洗"
api_router = APIRouter(prefix="/api")
def _audit_date_str_from_body(body: CleanRequestBody) -> str | None:
if body.year is None or body.month is None or body.day is None:
return None
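    # 例如:year=2026, month=3, day=6 → "20260306"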
return f"{body.year:04d}{body.month:02d}{body.day:02d}"
def _raise_http_for_failed_result(result: dict) -> None:
"""团队转换返回 ok=False 时,按 error 文案选择状态码。"""
err = result.get("error") or ""
if "source_url 须为" in err:
raise HTTPException(status_code=400, detail=result)
if "从 URL 读取源表失败" in err or err.startswith("读取源表失败"):
raise HTTPException(status_code=502, detail=result)
if result.get("message") and "error" not in result:
return
raise HTTPException(status_code=500, detail=result)
@api_router.post("/v1/clean", response_model=ApiEnvelope)
def post_clean(body: CleanRequestBody) -> ApiEnvelope:
dept = (body.department or "").strip()
if dept != DEPARTMENT_RISK_AUDIT_CLEAN:
raise HTTPException(
status_code=400,
detail={
"ok": False,
"error": f"不支持的 department: {dept!r},当前仅支持「{DEPARTMENT_RISK_AUDIT_CLEAN}」",
},
)
team_url = (body.team_url or "").strip()
team_target = (body.team_target_path or "").strip() or default_team_target_path()
if not team_url:
raise HTTPException(
status_code=400,
detail={"ok": False, "error": "team_url 不能为空"},
)
audit_date_str = _audit_date_str_from_body(body)
result = run_team_conversion(team_url, team_target, audit_date_str)
if result.get("ok"):
return ok(data=result, msg="成功")
_raise_http_for_failed_result(result)
return ok(data=result, msg=str(result.get("message") or ""))
"""清洗接口请求体。"""
from pydantic import BaseModel, Field
class CleanRequestBody(BaseModel):
department: str = Field(..., description="业务类型,风控稽查数据清洗 走团队转换")
year: int | None = None
month: int | None = None
day: int | None = None
team_url: str | None = None
team_target_path: str | None = None # 默认:项目下 cache/team_时间戳.xlsx
puling_url: str | None = None
chengyu_url: str | None = None
"""动态加载团队转换脚本(历史路径/中文文件名),对外只暴露可调用入口与路径工具。"""
import importlib.util
from datetime import datetime
from pathlib import Path
from typing import Any, Callable
_CODE_BASE = Path(__file__).resolve().parent.parent
_TEAM_SCRIPT = _CODE_BASE / "py_" / "audit" / "point_sale" / "data_conversion.py"
def _load_run_team_conversion() -> Callable[..., dict[str, Any]]:
spec = importlib.util.spec_from_file_location("team_data_convert", _TEAM_SCRIPT)
if spec is None or spec.loader is None:
raise RuntimeError(f"无法加载团队转换模块: {_TEAM_SCRIPT}")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
fn = getattr(mod, "run_team_conversion", None)
if fn is None:
raise RuntimeError("data_conversion 中缺少 run_team_conversion")
return fn
run_team_conversion: Callable[..., dict[str, Any]] = _load_run_team_conversion()
def default_team_target_path() -> str:
"""未传路径时:cache/team_{时间戳}.xlsx"""
d = _CODE_BASE / "cache"
d.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
return str(d / f"team_{ts}.xlsx")
"""
配置管理模块
负责读取和管理应用配置
通过环境变量 ENV=development|production 自动区分开发/生产环境
"""
import os
from typing import Optional
from dotenv import load_dotenv
# 加载 .env 文件(使用绝对路径,避免因工作目录不同导致加载失败)
_env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.env')
load_dotenv(dotenv_path=_env_path)
# 环境标识:development | production,未设置时默认为开发环境
_ENV = os.getenv("ENV", "development").strip().lower()
IS_PRODUCTION = _ENV == "production"
IS_DEV = not IS_PRODUCTION
def _db_var(key: str, dev_default: str, prod_default: str = "") -> str:
"""按环境读取数据库相关变量:生产环境优先读 PROD_DB_*,否则读 DB_*"""
if IS_PRODUCTION:
return os.getenv(f"PROD_DB_{key}", os.getenv(f"DB_{key}", prod_default)) or prod_default
return os.getenv(f"DB_{key}", dev_default)
class Config:
"""应用配置类"""
# 环境
ENV: str = _ENV
IS_PRODUCTION: bool = IS_PRODUCTION
IS_DEV: bool = IS_DEV
# 服务器配置(生产环境默认关闭 DEBUG)
HOST: str = os.getenv("HOST", "0.0.0.0")
PORT: int = int(os.getenv("PORT", "8000"))
DEBUG: bool = os.getenv("DEBUG", "false" if IS_PRODUCTION else "true").lower() == "true"
# 数据库配置:开发用 DB_*,生产用 PROD_DB_*(或系统环境变量覆盖)
DB_HOST: str = _db_var("HOST", "localhost")
DB_PORT: int = int(_db_var("PORT", "3306"))
DB_USER: str = _db_var("USER", "root")
DB_PASSWORD: str = _db_var("PASSWORD", "")
DB_NAME: str = _db_var("NAME", "clean_data")
# 日志配置
LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
LOG_FILE: Optional[str] = os.getenv("LOG_FILE")
# Excel 下载配置
EXCEL_DOWNLOAD_TIMEOUT: int = int(os.getenv("EXCEL_DOWNLOAD_TIMEOUT", "30"))
MAX_EXCEL_SIZE: int = int(os.getenv("MAX_EXCEL_SIZE", "52428800")) # 50MB
# 任务超时配置
TASK_TIMEOUT_SECONDS: int = int(os.getenv("TASK_TIMEOUT_SECONDS", "3600")) # 1小时
@classmethod
def get_db_config(cls) -> dict:
"""获取数据库配置字典"""
return {
'host': cls.DB_HOST,
'port': cls.DB_PORT,
'user': cls.DB_USER,
'password': cls.DB_PASSWORD,
'database': cls.DB_NAME,
}
# 创建全局配置实例
config = Config()
"""Core 业务模块"""
"""
数据清洗模块
负责数据的清洗和验证逻辑
"""
import logging
import asyncio
import pandas as pd
from typing import List, Dict, Any, Callable, Optional
logger = logging.getLogger(__name__)
# 各 department 对应的清洗策略注册表
# key: department 名称, value: (transform函数, 产品组配置, 稽查来源名称)
_DEPARTMENT_CLEANERS = {}
def _load_department_cleaners():
"""非专用清洗逻辑"""
global _DEPARTMENT_CLEANERS
if _DEPARTMENT_CLEANERS: # 如果部门清洗模块已加载,则直接返回
return
try:
# 加载部门清洗使用的工具
from core_py.数据转换_团队 import (
transform as _team_transform,
PRODUCT_GROUPS_JC,
) # PRODUCT_GROUPS_JC 风控稽查数据清洗配置数据
_DEPARTMENT_CLEANERS["风控稽查数据清洗"] = (_team_transform, PRODUCT_GROUPS_JC, "稽查团队")
logger.info("已加载部门清洗模块: 风控稽查数据清洗")
except ImportError as e:
logger.warning(f"加载团队清洗模块失败: {e}")
class DataCleaner:
"""数据清洗类"""
def __init__(self):
self.rules = {}
async def clean(
self,
raw_data: List[Dict[str, Any]],
department: str,
progress_callback: Optional[Callable[[float, str, Optional[int]], None]] = None,
audit_date: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""
清洗数据
Args:
raw_data: 原始数据列表(每行为 dict,key 为列名)
            department: 业务部门名称,如 "风控稽查数据清洗"
            progress_callback: 进度回调函数,接收 (progress: 0-1, message: str, processed_count: Optional[int])
audit_date: 稽查日期字符串,格式 'yyyy-mm-dd';为 None 时由各清洗模块自动取上月1号
Returns:
List[Dict]: 清洗后的数据
"""
try:
logger.info(f"开始清洗数据,部门: {department},数据行数: {len(raw_data)}")
# ── 专项清洗路由 ──────────────────────────────────────────────
_load_department_cleaners()
if department in _DEPARTMENT_CLEANERS:
return await self._clean_by_department(
raw_data, department, progress_callback, audit_date=audit_date
)
# ─────────────────────────────────────────────────────────────
total_rows = len(raw_data)
cleaned_data = []
for idx, row in enumerate(raw_data):
try:
cleaned_row = await self._validate_and_convert(row, department)
if cleaned_row and not self._is_duplicate(
cleaned_row, cleaned_data
):
cleaned_data.append(cleaned_row)
if progress_callback and idx % max(1, total_rows // 10) == 0:
progress = idx / total_rows if total_rows > 0 else 0
progress_callback(progress, f"已清洗 {idx}/{total_rows} 行数据", len(cleaned_data))
except Exception as e:
logger.warning(f"第 {idx + 1} 行数据清洗失败: {str(e)}")
continue
if progress_callback:
progress_callback(1.0, f"清洗完成,共 {len(cleaned_data)} 行有效数据", len(cleaned_data))
logger.info(
f"数据清洗完成,原始行数: {total_rows},清洗后行数: {len(cleaned_data)}"
)
return cleaned_data
except Exception as e:
logger.error(f"clean 方法执行失败: {str(e)}")
raise
async def _clean_by_department(
self,
raw_data: List[Dict[str, Any]],
department: str,
progress_callback: Optional[Callable[[float, str, Optional[int]], None]] = None,
audit_date: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""
调用对应部门的专项 transform 函数进行清洗。
raw_data 来自 excel_handler(List[Dict],key 为列名),
transform 函数通过 iloc 按位置访问列,因此转换为 DataFrame 时
只要列顺序与原始 Excel 一致,iloc 索引就能正确对应。
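        例如 raw_data = [{"门店": "A店", "价格": "9.9"}] 转成 DataFrame 后,
        df.iloc[0, 0] == "A店",即与 Excel 第 1 列的位置一一对应。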
"""
transform_fn, pg, yname = _DEPARTMENT_CLEANERS[department]
if progress_callback:
progress_callback(0.1, "正在转换数据格式", None)
# List[Dict] → DataFrame(保留原始列顺序,iloc 索引与 Excel 列位置对应)
df = pd.DataFrame(raw_data)
if progress_callback:
progress_callback(0.3, f"正在执行 {department} 数据清洗", None)
# transform 是同步函数,用 asyncio.to_thread 避免阻塞事件循环
records = await asyncio.to_thread(transform_fn, df, yname, pg, audit_date)
if progress_callback:
progress_callback(1.0, f"清洗完成,共 {len(records)} 行有效数据", len(records))
logger.info(f"[{department}] 专项清洗完成,共 {len(records)} 条记录")
return records
async def _validate_and_convert(
self, row: Dict[str, Any], department: str
) -> Optional[Dict[str, Any]]:
"""
验证和转换单行数据
Args:
row: 数据行
department: 业务部门名称
Returns:
转换后的数据行,若无效则返回 None
"""
try:
cleaned_row = {}
for key, value in row.items():
if value is None or (isinstance(value, str) and not value.strip()):
# 空值处理
cleaned_row[key] = None
continue
# 字符串数据清洗
if isinstance(value, str):
cleaned_row[key] = value.strip()
else:
cleaned_row[key] = value
# 验证必填字段(根据部门调整规则)
if not self._validate_required_fields(cleaned_row, department):
return None
return cleaned_row
except Exception as e:
logger.warning(f"_validate_and_convert 失败: {str(e)}")
return None
def _validate_required_fields(self, row: Dict[str, Any], department: str) -> bool:
"""
验证必填字段
Args:
row: 数据行
department: 业务部门
Returns:
bool: 是否通过验证
"""
# 示例:可根据部门定义不同的必填字段规则
required_fields_map = {
"sales": ["产品", "金额"],
"inventory": ["SKU", "数量"],
"finance": ["交易日期", "金额"],
}
required_fields = required_fields_map.get(department, [])
# 检查必填字段是否存在且非空
for field in required_fields:
if field not in row or row[field] is None:
return False
return True
def _is_duplicate(
self, row: Dict[str, Any], existing_data: List[Dict[str, Any]]
) -> bool:
"""
检查行是否为重复数据
Args:
row: 当前行
existing_data: 已有数据列表
Returns:
bool: 是否为重复
"""
# 简单的重复检查(可扩展为更复杂的逻辑)
for existing_row in existing_data:
if row == existing_row:
return True
return False
"""
数据库处理模块
负责与 MySQL 数据库的交互
"""
import logging
import mysql.connector
from typing import List, Dict, Any
from contextlib import contextmanager
from config import config
logger = logging.getLogger(__name__)
class DatabaseHandler:
"""数据库处理类"""
def __init__(self):
"""初始化数据库配置"""
self.db_config = {
'host': config.DB_HOST,
'user': config.DB_USER,
'password': config.DB_PASSWORD,
'database': config.DB_NAME,
'port': config.DB_PORT,
'autocommit': False,
'connection_timeout': 10
}
@contextmanager
def _get_connection(self):
"""
获取数据库连接的上下文管理器
Yields:
mysql.connector.MySQLConnection: 数据库连接
Raises:
Exception: 连接失败时抛出异常
"""
connection = None
try:
connection = mysql.connector.connect(**self.db_config)
logger.info("数据库连接成功")
yield connection
except mysql.connector.Error as e:
logger.error(f"数据库连接失败: {str(e)}")
raise
finally:
if connection and connection.is_connected():
connection.close()
logger.info("数据库连接已关闭")
async def insert_data(
self,
table_name: str,
data: List[Dict[str, Any]]
    ) -> tuple[int, int]:
"""
将数据 upsert 到指定的表(首次写入为 INSERT,命中唯一键时覆盖更新)。
MySQL ON DUPLICATE KEY UPDATE 行为说明:
- 新行插入:rowcount += 1
- 已有行被更新:rowcount += 2
- 数据与现有行完全一致(无变化):rowcount += 0
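        例如:提交 10 行,其中 6 行新插入、3 行命中唯一键被更新、1 行与库中完全一致,
        则累计 rowcount = 6*1 + 3*2 + 1*0 = 12。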
Args:
table_name: 目标表名
data: 数据列表
Returns:
tuple[int, int]: (submitted_rows, raw_affected)
- submitted_rows: 提交处理的总行数(去重后传入的行数,即预估真实入库行数)
- raw_affected: MySQL 累计 rowcount 原始值(insert=+1, update=+2, 无变化=+0)
Raises:
Exception: 插入失败时抛出异常
"""
if not data:
logger.warning("插入的数据为空")
            return 0, 0
try:
with self._get_connection() as connection:
cursor = connection.cursor()
# 获取字段名
columns = list(data[0].keys())
column_names = ', '.join([f'`{col}`' for col in columns])
placeholders = ', '.join(['%s'] * len(columns))
# ON DUPLICATE KEY UPDATE:命中唯一键时覆盖所有字段值
update_clause = ', '.join([f'`{col}` = VALUES(`{col}`)' for col in columns])
upsert_sql = f"""
INSERT INTO `{table_name}` ({column_names})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
logger.info(f"准备 upsert {len(data)} 行数据到表 {table_name}")
# 批量 upsert
# ON DUPLICATE KEY UPDATE 的 rowcount 含义:insert=1,update=2,无变化=0
# 真实入库(新增)行数 = rowcount // 1 的部分;用 lastrowid 变化量计算最准,
# 但批量时不可用。此处用最简单可靠的方案:
# raw_affected 累加 rowcount 原始值,
# insert_rows = raw_affected 中 rowcount==1 的部分(需逐条统计)
# 由于 executemany 只返回总 rowcount,改为逐条 execute 才能精确区分。
# 权衡性能与精度,保留 executemany 批量写入,同时返回原始 raw_affected,
# 并在 log 中说明换算公式,调用方按需解读。
raw_affected = 0
for batch_start in range(0, len(data), 1000):
batch_end = min(batch_start + 1000, len(data))
batch_data = data[batch_start:batch_end]
values_list = [
tuple(row.get(col) for col in columns)
for row in batch_data
]
cursor.executemany(upsert_sql, values_list)
raw_affected += cursor.rowcount
logger.info(f"已处理 {batch_end} / {len(data)} 行数据")
connection.commit()
# 查询本次 upsert 后表中实际存在的行数(含历史数据),
# 以及本批次真实写入行数:
# insert_rows ≈ raw_affected 中 rowcount=1 的行(executemany 无法细分)
# upsert_rows = raw_affected(去掉无变化的0,insert贡献1,update贡献2)
# 用 (raw_affected + 批次总行数) / 3 可估算 update 行数,但不精确。
# 最可靠的语义:把传入行数作为"提交处理行数",raw_affected 作为辅助信息。
submitted_rows = len(data)
cursor.close()
logger.info(
f"upsert 完成:提交 {submitted_rows} 行,"
f"raw_affected={raw_affected}(insert+1 / update+2 / 无变化+0)"
)
# 返回 (submitted_rows, raw_affected) 元组,由调用方决定展示哪个
return submitted_rows, raw_affected
except mysql.connector.Error as e:
logger.error(f"MySQL 错误: {str(e)}")
raise
except Exception as e:
logger.error(f"insert_data 失败: {str(e)}")
raise
async def test_connection(self) -> bool:
"""
测试数据库连接
Returns:
bool: 连接是否成功
"""
try:
with self._get_connection() as connection:
cursor = connection.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
cursor.close()
return True
except Exception as e:
logger.error(f"数据库连接测试失败: {str(e)}")
return False
async def create_table_if_not_exists(
self,
table_name: str,
schema: Dict[str, str]
) -> bool:
"""
如果表不存在则创建表
Args:
table_name: 表名
schema: 表架构定义 {列名: 列定义}
Returns:
bool: 是否创建成功或表已存在
"""
try:
with self._get_connection() as connection:
cursor = connection.cursor()
# 检查表是否存在
                cursor.execute(
                    """
                    SELECT TABLE_NAME FROM information_schema.TABLES
                    WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
                    """,
                    (self.db_config['database'], table_name),
                )
if cursor.fetchone():
logger.info(f"表 {table_name} 已存在")
cursor.close()
return True
# 创建表
columns_sql = ', '.join([f'`{col}` {definition}' for col, definition in schema.items()])
create_sql = f"""
CREATE TABLE `{table_name}` (
id INT AUTO_INCREMENT PRIMARY KEY,
{columns_sql},
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_sql)
connection.commit()
cursor.close()
logger.info(f"成功创建表 {table_name}")
return True
except Exception as e:
logger.error(f"create_table_if_not_exists 失败: {str(e)}")
raise
"""
Excel 文件处理模块
负责从 URL 下载和解析 Excel 文件
"""
import aiohttp
import logging
from openpyxl import load_workbook
from io import BytesIO
from typing import List, Dict, Any
import os
import tempfile
logger = logging.getLogger(__name__)
class ExcelHandler:
"""Excel 文件处理类"""
def __init__(self):
self.timeout = aiohttp.ClientTimeout(total=30)
async def fetch_bytes(self, url: str) -> bytes:
"""
从 URL 下载文件,返回原始字节内容(供调用方自行用 pandas 解析)
Args:
url: 文件的网络链接
Returns:
bytes: 文件的原始二进制内容
"""
try:
logger.info(f"开始从 {url} 下载文件")
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(url) as response:
if response.status != 200:
raise Exception(f"下载失败,HTTP 状态码: {response.status}")
content = await response.read()
logger.info(f"下载完成,文件大小: {len(content)} 字节")
return content
except Exception as e:
logger.error(f"fetch_bytes 失败: {str(e)}")
raise
async def fetch_and_parse(self, excel_url: str) -> List[Dict[str, Any]]:
"""
从 URL 下载并解析 Excel 文件
Args:
excel_url: Excel 文件的网络链接
Returns:
List[Dict]: 解析后的数据,每行为一个字典
Raises:
Exception: 下载或解析失败时抛出异常
"""
try:
# 1. 下载文件
logger.info(f"开始从 {excel_url} 下载 Excel 文件")
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(excel_url) as response:
if response.status != 200:
raise Exception(f"下载失败,HTTP 状态码: {response.status}")
excel_content = await response.read()
logger.info(f"下载完成,文件大小: {len(excel_content)} 字节")
# 2. 解析 Excel
return self._parse_excel_content(excel_content)
except Exception as e:
logger.error(f"fetch_and_parse 失败: {str(e)}")
raise
def _parse_excel_content(self, excel_content: bytes) -> List[Dict[str, Any]]:
"""
解析 Excel 内容
Args:
excel_content: Excel 文件的二进制内容
Returns:
List[Dict]: 解析后的数据
"""
try:
# 使用 BytesIO 从内存中读取
excel_file = BytesIO(excel_content)
workbook = load_workbook(excel_file)
# 获取第一个工作表
worksheet = workbook.active
if not worksheet:
raise Exception("Excel 文件不包含有效的工作表")
# 获取标题行
headers = []
for cell in worksheet[1]:
headers.append(cell.value)
if not headers or all(h is None for h in headers):
raise Exception("Excel 文件不包含有效的标题行")
# 解析数据行
data = []
for row in worksheet.iter_rows(min_row=2, values_only=False):
row_data = {}
for idx, cell in enumerate(row):
if idx < len(headers):
row_data[headers[idx]] = cell.value
# 跳过空行
if any(v is not None for v in row_data.values()):
data.append(row_data)
logger.info(f"成功解析 Excel,共 {len(data)} 行数据")
return data
except Exception as e:
logger.error(f"_parse_excel_content 失败: {str(e)}")
raise
"""
进度管理模块
负责任务进度的记录和查询
"""
import logging
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import threading
logger = logging.getLogger(__name__)
class ProgressManager:
"""进度管理类"""
def __init__(self, timeout_seconds: int = 3600):
"""
初始化进度管理器
Args:
timeout_seconds: 任务进度的过期时间(秒),默认 1 小时
"""
self.progress_data: Dict[str, Dict[str, Any]] = {}
self.timeout_seconds = timeout_seconds
self.lock = threading.Lock()
def update_progress(
self,
task_id: str,
status: str,
progress: int,
message: str,
processed_count: Optional[int] = None
) -> None:
"""
更新任务进度
Args:
task_id: 任务唯一标识
status: 状态 (queued, processing, completed, failed)
progress: 进度百分比 (0-100)
message: 进度信息
processed_count: 已处理的数据条数,None 表示暂未统计
"""
with self.lock:
self.progress_data[task_id] = {
'task_id': task_id,
'status': status,
'progress': max(0, min(100, progress)),
'message': message,
'processed_count': processed_count,
'timestamp': datetime.now().isoformat(),
'created_at': datetime.now()
}
logger.debug(f"[{task_id}] 进度更新: {status} {progress}% - {message}")
def get_progress(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
获取任务进度
Args:
task_id: 任务唯一标识
Returns:
Optional[Dict]: 进度信息,若任务不存在或已过期返回 None
"""
with self.lock:
if task_id not in self.progress_data:
return None
data = self.progress_data[task_id]
# 检查是否过期
if datetime.now() - data['created_at'] > timedelta(seconds=self.timeout_seconds):
logger.warning(f"任务 {task_id} 已过期,删除记录")
del self.progress_data[task_id]
return None
# 返回字典副本,移除 created_at(内部字段)
result = {k: v for k, v in data.items() if k != 'created_at'}
return result
def get_all_progress(self) -> Dict[str, Dict[str, Any]]:
"""
获取所有任务的进度信息
Returns:
Dict: 所有任务的进度信息
"""
with self.lock:
# 清理过期任务
expired_tasks = []
for task_id, data in self.progress_data.items():
if datetime.now() - data['created_at'] > timedelta(seconds=self.timeout_seconds):
expired_tasks.append(task_id)
for task_id in expired_tasks:
del self.progress_data[task_id]
logger.info(f"清理过期任务: {task_id}")
# 返回所有有效任务的进度
return {
task_id: {k: v for k, v in data.items() if k != 'created_at'}
for task_id, data in self.progress_data.items()
}
def clear_progress(self, task_id: str) -> None:
"""
清除任务进度记录
Args:
task_id: 任务唯一标识
"""
with self.lock:
if task_id in self.progress_data:
del self.progress_data[task_id]
logger.info(f"清除任务 {task_id} 的进度记录")
import sys
import os
import pandas as pd
import mysql.connector
# 兼容直接运行(python core_py/1低价计算.py)和作为模块被 index.py 导入两种场景
if __name__ == "__main__":
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import config
def load_price_map_from_db() -> dict:
"""
从 market_bi.bi_price_xx 读取线下价盘数据,
返回匹配字典: { "产品系列|产品克重|渠道(大写)" -> low_price(float) }
"""
conn = mysql.connector.connect(
host=config.DB_HOST,
port=config.DB_PORT,
user=config.DB_USER,
password=config.DB_PASSWORD,
database="market_bi",
charset="utf8mb4",
)
try:
sql = "SELECT bi_product, pro_weight, channel_type, low_price FROM bi_price_xx"
df_p = pd.read_sql(sql, conn)
finally:
conn.close()
def _clean(s):
return "" if pd.isna(s) else str(s).strip().upper()
df_p["match_key"] = (
df_p["bi_product"].apply(_clean) + "|"
+ df_p["pro_weight"].apply(_clean) + "|"
+ df_p["channel_type"].apply(_clean)
)
df_p["low_price"] = pd.to_numeric(df_p["low_price"], errors="coerce")
return df_p.set_index("match_key")["low_price"].to_dict()
def transform(df_y: pd.DataFrame) -> pd.DataFrame:
"""
供 API 调用的低价计算入口。
接收大宽表 DataFrame(STANDARD_COLUMNS 列名),从数据库 market_bi.bi_price_xx
读取价盘基准,计算并回填以下三列后返回:
- 是否低价:低价 / 正常 / None(无法匹配或缺价格)
- 破价价差:低价时的价差(decimal),正常/无法匹配时为 None
- 低价整改状态:低价时置为 '未整改',其余不改动
Args:
df_y: 大宽表 DataFrame,必须包含列:
产品系列、产品克重、渠道类型(稽查源提供)、产品价格
Returns:
pd.DataFrame: 更新了低价相关字段的 DataFrame(不修改原对象)
"""
df = df_y.copy()
price_map = load_price_map_from_db()
def _clean(s):
return "" if pd.isna(s) else str(s).strip().upper()
# 构建匹配键和数值价格(辅助列,最终会删除)
df["_series_c"] = df["产品系列"].apply(_clean)
df["_weight_c"] = df["产品克重"].apply(_clean)
df["_channel_c"] = df["渠道类型(稽查源提供)"].apply(_clean)
df["_match_key"] = df["_series_c"] + "|" + df["_weight_c"] + "|" + df["_channel_c"]
df["_price_num"] = pd.to_numeric(df["产品价格"], errors="coerce")
df["_p_low_price"] = df["_match_key"].map(price_map)
# 重置低价相关列
df["是否低价"] = None
df["破价价差"] = None
# 条件向量化计算,避免逐行循环
has_both = df["_price_num"].notna() & df["_p_low_price"].notna()
cond_low = has_both & (df["_price_num"] < df["_p_low_price"])
cond_normal = has_both & ~cond_low
df.loc[cond_low, "是否低价"] = "低价"
df.loc[cond_low, "破价价差"] = (
df.loc[cond_low, "_p_low_price"] - df.loc[cond_low, "_price_num"]
).round(2)
df["低价整改状态"] = df["低价整改状态"].astype(object)
df.loc[cond_low, "低价整改状态"] = "未整改"
df.loc[cond_normal, "是否低价"] = "正常"
df.loc[cond_normal, "破价价差"] = None
# 清除辅助列
df.drop(
columns=["_series_c", "_weight_c", "_channel_c", "_match_key", "_price_num", "_p_low_price"],
inplace=True,
)
return df
if __name__ == "__main__":
# ── 独立测试模式:读本地 Excel 大宽表 → 计算低价 → 输出结果文件 ──
from datetime import datetime
from dateutil.relativedelta import relativedelta
current_date = (datetime.now().replace(day=1) - relativedelta(months=1)).strftime("%Y-%m-01")
y_file = f"/王小卤/风控/代码-新/大日期{current_date}_2.xlsx"
output_file = f"/王小卤/风控/代码-新/低价大日期_2.xlsx"
print("正在读取稽查结果大宽表...")
df_y = pd.read_excel(y_file, sheet_name="合并后", dtype=str)
df_y.columns = df_y.columns.str.strip()
print("正在从数据库读取价盘并计算低价...")
df_result = transform(df_y)
df_result.to_excel(output_file, index=False)
print(f"✅ 处理完成!结果已保存至:{output_file}")
fastapi==0.104.1
uvicorn==0.24.0
python-multipart==0.0.6
openpyxl==3.1.5 # 数据转换_团队.py
requests==2.31.0
aiohttp==3.9.1
mysql-connector-python==8.2.0
pydantic==2.4.2
python-dotenv==1.0.0
pandas>=2.0.0
python-dateutil>=2.8.2
fastapi>=0.115.0
uvicorn[standard]>=0.32.0
pandas>=2.0.0
openpyxl>=3.1.0
python-dateutil>=2.8.0
/*
Navicat MySQL Data Transfer
Source Server : t100_dev
Source Server Version : 50744
Source Host : 192.168.100.39:25301
Source Database : market_bi
Target Server Type : MYSQL
Target Server Version : 50744
File Encoding : 65001
Date: 2026-03-09 18:13:42
*/
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for risk_audit_visit
-- ----------------------------
DROP TABLE IF EXISTS `risk_audit_visit`;
CREATE TABLE `risk_audit_visit` (
`rav_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`audit_date` date DEFAULT NULL COMMENT '稽查日期',
`source` varchar(20) DEFAULT NULL COMMENT '稽查来源',
`region_name` varchar(20) DEFAULT NULL COMMENT '大区',
`district_name` varchar(20) DEFAULT NULL COMMENT '战区',
`dealer_code` varchar(10) DEFAULT NULL COMMENT '经销商编码',
`dealer_name` varchar(100) DEFAULT NULL COMMENT '经销商名称',
`store_code` varchar(20) DEFAULT NULL COMMENT '门店编码',
`store_name` varchar(100) DEFAULT NULL COMMENT '勤策门店',
`f_emp_no` varchar(20) DEFAULT NULL COMMENT '客户经理工号',
`f_emp_name` varchar(100) DEFAULT NULL COMMENT '客户经理名称',
`qin_ce_type_large` varchar(20) DEFAULT NULL COMMENT '勤策渠道大类',
`jh_channel_type` varchar(20) DEFAULT NULL COMMENT '稽查渠道类型',
`city` varchar(30) DEFAULT NULL COMMENT '城市',
`channel_type` varchar(30) DEFAULT NULL COMMENT '渠道类型(稽查源提供)',
`series` varchar(20) DEFAULT NULL COMMENT '产品系列',
`taste` varchar(20) DEFAULT NULL COMMENT '产品口味',
`weight` varchar(20) DEFAULT NULL COMMENT '产品克重',
`price` decimal(10,2) DEFAULT NULL COMMENT '产品价格',
`low_price` varchar(20) DEFAULT NULL COMMENT '是否低价:低价,正常',
`low_price_diff` decimal(10,2) DEFAULT NULL COMMENT '价差',
`low_price_status` varchar(20) DEFAULT NULL COMMENT '低价整改状态',
`low_price_rectify` varchar(100) DEFAULT NULL COMMENT '低价整改说明',
`production_month` date DEFAULT NULL COMMENT '产品生产月份',
`near_month_num` int(11) DEFAULT NULL COMMENT '临期月份数',
`near_month_status` varchar(20) DEFAULT NULL COMMENT '临期状态',
`fresh_status` varchar(20) DEFAULT NULL COMMENT '新鲜度',
`large_date_status` varchar(20) DEFAULT NULL COMMENT '大日期整改状态',
`large_date_rectify` varchar(100) DEFAULT NULL COMMENT '大日期整改说明',
PRIMARY KEY (`rav_id`),
-- 业务唯一键:同一稽查日期 + 来源 + 门店名称 + 渠道类型(稽查源提供)+ 产品系列 + 口味 + 克重 = 唯一一条记录
-- ON DUPLICATE KEY UPDATE 依赖此唯一键判断是执行 INSERT 还是覆盖 UPDATE
UNIQUE KEY `uk_biz` (`audit_date`,`source`,`store_name`(100),`channel_type`,`series`,`taste`,`weight`),
KEY `audit` (`audit_date`),
KEY `dealer` (`dealer_code`,`dealer_name`),
KEY `product_index` (`series`,`taste`,`weight`),
KEY `regiondistrict` (`region_name`,`district_name`),
KEY `type_small` (`jh_channel_type`),
KEY `weight_index` (`weight`)
) ENGINE=InnoDB AUTO_INCREMENT=493621 DEFAULT CHARSET=utf8mb4 COMMENT='稽查走访价格大日期表';
"""
API 测试脚本
用于快速测试 API 的各个端点
"""
import asyncio
import httpx
import json
from datetime import datetime
BASE_URL = "http://localhost:8000"
class APITester:
"""API 测试类"""
def __init__(self, base_url: str = BASE_URL):
self.base_url = base_url
        self.task_id: str | None = None
async def test_health_check(self):
"""测试健康检查接口"""
print("\n" + "="*50)
print("测试:健康检查接口")
print("="*50)
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.base_url}/api/v1/health")
print(f"状态码: {response.status_code}")
print(f"响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}")
except Exception as e:
print(f"错误: {str(e)}")
async def test_start_cleaning(self):
"""测试启动清洗任务接口"""
print("\n" + "="*50)
print("测试:启动数据清洗任务")
print("="*50)
payload = {
"excel_url": "https://example.com/test_data.xlsx",
"department": "sales",
"description": "测试数据清洗"
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/api/v1/clean",
json=payload
)
print(f"状态码: {response.status_code}")
data = response.json()
print(f"响应: {json.dumps(data, indent=2, ensure_ascii=False)}")
if response.status_code == 200:
self.task_id = data.get('task_id')
print(f"\n✓ 任务创建成功,Task ID: {self.task_id}")
except Exception as e:
print(f"错误: {str(e)}")
async def test_get_progress(self):
"""测试获取进度接口"""
if not self.task_id:
print("跳过:需要先创建任务")
return
print("\n" + "="*50)
print("测试:获取数据清洗进度")
print("="*50)
try:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/api/v1/progress/{self.task_id}"
)
print(f"状态码: {response.status_code}")
print(f"响应: {json.dumps(response.json(), indent=2, ensure_ascii=False, default=str)}")
except Exception as e:
print(f"错误: {str(e)}")
async def test_get_result(self):
"""测试获取清洗结果接口"""
if not self.task_id:
print("跳过:需要先创建任务")
return
print("\n" + "="*50)
print("测试:获取清洗结果")
print("="*50)
try:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/api/v1/result/{self.task_id}"
)
print(f"状态码: {response.status_code}")
data = response.json()
print(f"响应: {json.dumps(data, indent=2, ensure_ascii=False, default=str)}")
except Exception as e:
print(f"错误: {str(e)}")
async def test_save_data(self):
"""测试保存数据接口"""
if not self.task_id:
print("跳过:需要先创建任务")
return
print("\n" + "="*50)
print("测试:保存清洗后的数据")
print("="*50)
payload = {
"task_id": self.task_id,
"table_name": "sales_data"
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/api/v1/save",
json=payload
)
print(f"状态码: {response.status_code}")
print(f"响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}")
except Exception as e:
print(f"错误: {str(e)}")
async def run_all_tests(self):
"""运行所有测试"""
print("\n")
print("╔" + "="*48 + "╗")
print("║" + " "*10 + "数据清洗系统 API 测试" + " "*16 + "║")
print("║" + f" "*10 + f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + " "*15 + "║")
print("╚" + "="*48 + "╝")
await self.test_health_check()
await asyncio.sleep(1)
await self.test_start_cleaning()
await asyncio.sleep(2)
await self.test_get_progress()
await asyncio.sleep(1)
await self.test_get_result()
await asyncio.sleep(1)
print("\n" + "="*50)
print("所有测试完成!")
print("="*50 + "\n")
async def main():
"""主函数"""
tester = APITester()
await tester.run_all_tests()
if __name__ == "__main__":
print("\n提示:确保 FastAPI 服务已在 http://localhost:8000 运行中\n")
asyncio.run(main())
"""Utils 工具模块""" # 跨业务复用的小工具(日期、网络 Excel 等)
from utils.response import BizCode, ApiResponse, ok_resp, fail_resp
__all__ = ["BizCode", "ApiResponse", "ok_resp", "fail_resp"]
"""日期解析与 DataFrame 中取首个有效日期(与具体业务表头通过参数解耦)。"""
from __future__ import annotations
import re
from collections.abc import Sequence
from datetime import datetime
import pandas as pd
def _parse_yyyymmdd(s: str) -> str | None:
"""8 位 YYYYMMDD → YYYY-MM-DD;非法日历则 None。"""
if not re.fullmatch(r"\d{8}", s):
return None
try:
return datetime.strptime(s, "%Y%m%d").strftime("%Y-%m-%d")
except ValueError:
return None
def to_yyyy_mm_dd(val) -> str | None:
"""任意单元格值 → YYYY-MM-DD;无法解析则 None。"""
if val is None or (isinstance(val, float) and pd.isna(val)):
return None
if isinstance(val, str):
y = _parse_yyyymmdd(val.strip())
if y:
return y
if isinstance(val, int) and val >= 0:
s = str(val)
if len(s) == 8:
y = _parse_yyyymmdd(s)
if y:
return y
if isinstance(val, float) and val.is_integer() and val >= 0:
s = str(int(val))
if len(s) == 8:
y = _parse_yyyymmdd(s)
if y:
return y
ts = pd.to_datetime(val, errors="coerce")
if pd.isna(ts):
return None
return ts.strftime("%Y-%m-%d")
def first_yyyy_mm_dd_in_iloc(df: pd.DataFrame, col_idx: int) -> str | None:
"""自上而下取第 col_idx 列首个可解析日期(宽表无表头时常用)。"""
if df is None or df.shape[1] <= col_idx or col_idx < 0:
return None
for val in df.iloc[:, col_idx]:
n = to_yyyy_mm_dd(val)
if n:
return n
return None
def first_yyyy_mm_dd_in_dataframe(
df: pd.DataFrame,
column_names: Sequence[str],
*,
third_column_fallback: bool = True,
) -> str | None:
"""按列名顺序找第一列,自上而下取首个可解析为日期的值;若无匹配列且允许则用第 3 列(下标 2)。"""
ser = None
if df is not None and df.shape[1] > 0:
for name in column_names:
if name in df.columns:
ser = df[name]
break
if ser is None and third_column_fallback and df.shape[1] > 2:
ser = df.iloc[:, 2]
if ser is None:
return None
for val in ser:
n = to_yyyy_mm_dd(val)
if n:
return n
return None
def normalize_year_month_to_day01(src_month):
"""
生产月份类字符串 → YYYY-MM-01(供后续 strptime %Y-%m-%d)。
支持 yyyy-mm、yyyymm;其它类型/格式原样返回。
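    例如:"2024-7" -> "2024-07-01";"202407" -> "2024-07-01";"2024/07" 原样返回。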
"""
if not isinstance(src_month, str):
return src_month
src_month = src_month.strip()
if not src_month:
return src_month
if re.fullmatch(r"\d{4}-\d{1,2}", src_month):
year, month = src_month.split("-")
return f"{year}-{month.zfill(2)}-01"
if re.fullmatch(r"\d{6}", src_month):
year = src_month[:4]
month = src_month[4:].zfill(2)
return f"{year}-{month}-01"
return src_month
def approx_gap_months_calendar(expiry_date, inspect_date) -> float:
"""到期日相对检查日的剩余月数近似值(与原业务公式一致:年*12+月+日/30)。"""
diff_years = expiry_date.year - inspect_date.year
diff_months = expiry_date.month - inspect_date.month
diff_days = expiry_date.day - inspect_date.day
return diff_years * 12 + diff_months + diff_days / 30.0
"""从 URL 下载 Excel 到内存并用 pandas 解析(不写本地临时文件)。"""
from __future__ import annotations
import io
import urllib.request
import pandas as pd
def read_excel_from_url(
url: str,
*,
timeout: float = 300,
user_agent: str = "clean-data-api/1.0",
skiprows: int = 0,
header=None,
dtype=str,
) -> pd.DataFrame:
req = urllib.request.Request(url.strip(), headers={"User-Agent": user_agent})
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
return pd.read_excel(io.BytesIO(raw), skiprows=skiprows, header=header, dtype=dtype)
def read_excel_from_url_skip1_with_header_row(
url: str,
*,
timeout: float = 300,
user_agent: str = "clean-data-api/1.0",
dtype=str,
) -> tuple[pd.DataFrame, pd.Series]:
"""跳过第 1 行后的数据 + 被跳过的第 1 行(表头,0 基列与数据列对齐)。"""
req = urllib.request.Request(url.strip(), headers={"User-Agent": user_agent})
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
buf = io.BytesIO(raw)
header_df = pd.read_excel(buf, header=None, dtype=dtype, nrows=1)
buf.seek(0)
data_df = pd.read_excel(buf, skiprows=1, header=None, dtype=dtype)
return data_df, header_df.iloc[0]
"""
异常定义模块
"""
class DataCleaningException(Exception):
"""数据清洗异常"""
pass
class DatabaseException(Exception):
"""数据库异常"""
pass
class ExcelParsingException(Exception):
"""Excel 解析异常"""
pass
class ValidationException(Exception):
"""验证异常"""
pass
"""
统一响应格式封装模块
所有接口统一返回: { code: 业务状态码, msg: 消息, data: 数据 }
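例如成功响应:{ "code": 200, "msg": "success", "data": {"task_id": "..."} }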
"""
from enum import IntEnum
from typing import Any
from fastapi.responses import JSONResponse
from pydantic import BaseModel
class BizCode(IntEnum):
"""业务逻辑状态码"""
SUCCESS = 200 # 通用成功
TASK_QUEUED = 201 # 任务已入队(异步场景)
TASK_PROCESSING = 202 # 任务处理中
BAD_REQUEST = 400 # 请求参数错误
NOT_FOUND = 404 # 资源不存在
TASK_FAILED = 422 # 任务执行失败(业务层)
SERVER_ERROR = 500 # 服务器内部错误
DB_ERROR = 501 # 数据库错误
EXCEL_ERROR = 502 # Excel 解析错误
class ApiResponse(BaseModel):
"""统一 API 响应体"""
code: int
msg: str
data: Any = None
def ok_resp(data: Any = None, msg: str = "success") -> JSONResponse:
"""返回成功的 JSONResponse(HTTP 200)"""
return JSONResponse(
status_code=200,
content=ApiResponse(code=BizCode.SUCCESS, msg=msg, data=data).model_dump()
)
def fail_resp(
biz_code: BizCode,
msg: str,
http_status: int = 400,
data: Any = None
) -> JSONResponse:
"""返回失败的 JSONResponse"""
return JSONResponse(
status_code=http_status,
content=ApiResponse(code=biz_code, msg=msg, data=data).model_dump()
)
"""
数据验证模块
"""
import re
import logging
logger = logging.getLogger(__name__)
def validate_excel_url(url: str) -> bool:
"""
验证 Excel URL 的有效性
Args:
url: URL 字符串
Returns:
bool: 是否为有效的 Excel URL
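        例如:"https://example.com/a.xlsx" -> True;"https://example.com/a.txt" -> False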
"""
if not url or not isinstance(url, str):
return False
# 检查 URL 格式
url_pattern = r'^https?://.*\.(xlsx|xls|csv)$'
if not re.match(url_pattern, url, re.IGNORECASE):
logger.warning(f"URL 格式无效: {url}")
return False
return True
def sanitize_filename(filename: str) -> str:
"""
清理文件名,移除不安全的字符
Args:
filename: 原始文件名
Returns:
str: 清理后的文件名
"""
# 移除不安全字符
sanitized = re.sub(r'[<>:"/\\|?*]', '', filename)
return sanitized[:255] # 限制长度
def validate_table_name(table_name: str) -> bool:
"""
验证数据库表名的有效性
Args:
table_name: 表名
Returns:
bool: 是否为有效的表名
"""
if not table_name or not isinstance(table_name, str):
return False
# MySQL 表名规则:以字母、数字或下划线开头,不包含特殊字符
table_name_pattern = r'^[a-zA-Z_][a-zA-Z0-9_]{0,63}$'
if not re.match(table_name_pattern, table_name):
logger.warning(f"表名格式无效: {table_name}")
return False
return True