提交 fa6c4c14 authored 作者: lidongxu's avatar lidongxu

新建项目

上级
# ========== 环境与敏感信息 ==========
.env
.env.local
.env.*.local
*.env
# ========== Python ==========
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# 虚拟环境
venv/
.venv/
env/
ENV/
# ========== 测试与覆盖率 ==========
.pytest_cache/
.coverage
htmlcov/
.tox/
.nox/
coverage.xml
*.cover
.hypothesis/
# ========== IDE / 编辑器 ==========
.idea/
.vscode/
*.swp
*.swo
*~
.project
.pydevproject
.settings/
# ========== 系统文件 ==========
.DS_Store
.DS_Store?
Thumbs.db
ehthumbs.db
Desktop.ini
# ========== 日志与临时 ==========
*.log
*.tmp
*.temp
.cache/
# ========== 其他 ==========
*.sql.backup
*.bak
# 数据清洗系统 - 项目说明文档
## 项目概述
本项目是一个使用 FastAPI 框架开发的数据清洗系统,支持从 Excel 文件中提取数据、进行数据清洗处理,并将最终结果保存到 MySQL 数据库。
### 核心功能
1. **Excel 数据解析**:从网络链接下载并解析 Excel 文件
2. **数据清洗处理**:对解析后的数据进行验证、清洗和去重
3. **进度反馈**:通过 HTTP 轮询方式向前端实时反馈数据清洗进度
4. **数据持久化**:将清洗后的数据保存到 MySQL 数据库
---
## 项目结构
```
clean_data/
├── index.py # 主程序入口
├── requirements.txt # 项目依赖列表
├── .env.example # 环境变量配置示例
├── README.md # 项目说明文档
├── core/ # 核心业务模块
│ ├── __init__.py
│ ├── excel_handler.py # Excel 文件处理
│ ├── data_cleaner.py # 数据清洗逻辑
│ ├── db_handler.py # 数据库交互
│ └── progress_manager.py # 进度管理
└── utils/ # 工具模块
├── __init__.py
├── exceptions.py # 自定义异常
└── validators.py # 数据验证
```
---
## 快速开始
### 1. 环境准备
```bash
# 克隆项目(如果需要)
cd clean_data
# 创建虚拟环境(推荐)
python -m venv venv
# 激活虚拟环境
# Windows:
venv\Scripts\activate
# Linux/Mac:
source venv/bin/activate
# 安装依赖
pip install -r requirements.txt
```
### 2. 配置环境变量
```bash
# 复制环境变量配置文件
cp .env.example .env
# 编辑 .env 文件,填写实际的配置信息
# 特别注意:
# - DB_HOST, DB_PORT, DB_USER, DB_PASSWORD 需要填写实际的数据库配置
# - DB_NAME 为要使用的数据库名称
```
### 3. 启动服务
```bash
# 方式一:使用 Python 直接运行
python index.py
# 方式二:使用 Uvicorn 运行(推荐)
uvicorn index:app --host 0.0.0.0 --port 8000 --reload
# 服务将在 http://0.0.0.0:8000 启动
# API 文档:http://localhost:8000/docs(Swagger UI)
```
---
## API 接口文档
### 1. 启动数据清洗任务
**请求**
```
POST /api/v1/clean
```
**请求体**
```json
{
"excel_url": "https://example.com/data.xlsx",
"department": "sales",
"description": "Q1销售数据清洗"
}
```
**响应**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "queued",
"message": "任务已创建,正在处理中...",
"data_preview": null
}
```
### 2. 获取数据清洗进度
**请求**
```
GET /api/v1/progress/{task_id}
```
**响应**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "processing",
"progress": 65,
"message": "已清洗 650/1000 行数据",
"timestamp": "2026-03-06T10:30:45.123456"
}
```
**状态说明**
- `queued`: 任务已创建,排队中
- `processing`: 数据正在处理中
- `completed`: 数据清洗完成
- `failed`: 清洗过程中出错
### 3. 获取清洗结果
**请求**
```
GET /api/v1/result/{task_id}
```
**响应**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "ready_to_save",
"message": "数据清洗完成,可进行保存",
"data_preview": [
{"产品": "产品A", "金额": 1000},
{"产品": "产品B", "金额": 2000}
],
"total_rows": 1000,
"department": "sales"
}
```
### 4. 保存清洗后的数据
**请求**
```
POST /api/v1/save
```
**请求体**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"table_name": "sales_data"
}
```
**响应**
```json
{
"task_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "saved",
"message": "数据已成功保存到数据库",
"affected_rows": 1000
}
```
### 5. 健康检查
**请求**
```
GET /api/v1/health
```
**响应**
```json
{
"status": "healthy",
"timestamp": "2026-03-06T10:30:45.123456",
"service": "数据清洗系统"
}
```
---
## 进度反馈机制
### HTTP 轮询方案(无需 WebSocket)
系统采用 **HTTP 轮询** 方式实现进度反馈,具有以下优势:
1. **无连接保持**:客户端主动请求,降低服务器负载
2. **兼容性强**:支持所有 HTTP 客户端
3. **易于部署**:无需 WebSocket 基础设施
4. **便于扩展**:易于部署到各种云环境
### 前端实现建议
```javascript
// 示例:React/Vue 前端逻辑
const pollProgress = async (taskId) => {
const interval = setInterval(async () => {
try {
const response = await fetch(`/api/v1/progress/${taskId}`);
const data = await response.json();
// 更新进度条
updateProgressBar(data.progress);
updateMessage(data.message);
// 任务完成时停止轮询
if (data.status === 'completed' || data.status === 'failed') {
clearInterval(interval);
}
} catch (error) {
console.error('获取进度失败:', error);
}
}, 1000); // 每秒轮询一次
};
```
---
## 数据清洗逻辑
### 清洗步骤
1. **下载**:从网络链接下载 Excel 文件
2. **解析**:使用 openpyxl 解析 Excel 内容
3. **验证**:验证数据类型和必填字段
4. **清洗**
- 移除首尾空格
- 处理空值
- 去重处理
5. **缓存**:将清洗后的数据存储在内存中
6. **保存**:前端确认后保存到数据库
### 自定义清洗规则
编辑 `core/data_cleaner.py` 中的 `_validate_required_fields` 方法来自定义不同部门的清洗规则:
```python
required_fields_map = {
'sales': ['产品', '金额', '销售日期'],
'inventory': ['SKU', '数量', '仓库'],
'finance': ['交易日期', '金额', '类别']
}
```
---
## 数据库配置
### MySQL 5.6+ 连接配置
编辑 `.env` 文件:
```ini
DB_HOST=localhost
DB_PORT=3306
DB_USER=root
DB_PASSWORD=your_password
DB_NAME=clean_data
```
### 创建目标表(示例)
```sql
CREATE TABLE sales_data (
id INT AUTO_INCREMENT PRIMARY KEY,
产品 VARCHAR(100),
金额 DECIMAL(10, 2),
销售日期 DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
---
## 异常处理
系统定义了多种自定义异常,便于错误追踪:
- **DataCleaningException**:数据清洗过程中的异常
- **DatabaseException**:数据库操作异常
- **ExcelParsingException**:Excel 解析异常
- **ValidationException**:数据验证异常
所有异常都会被记录到日志中,便于问题排查。
---
## 日志记录
系统使用 Python 标准 logging 模块记录所有操作,日志级别可在 `.env` 中配置:
```
LOG_LEVEL=INFO
LOG_FILE=./logs/app.log
```
日志记录内容包括:
- 任务创建和完成
- 数据处理进度
- 异常错误信息
- 数据库操作记录
---
## 性能优化建议
1. **批量插入**:数据库操作使用批量插入(默认每 1000 行为一批)
2. **异步处理**:使用 FastAPI 的后台任务避免阻塞响应
3. **进度缓存**:使用内存字典缓存进度数据和清洗结果
4. **连接池**:建议为数据库使用连接池(可扩展功能)
---
## 常见问题
### Q: 为什么不使用 WebSocket?
A: HTTP 轮询方案具有以下优势:
- 服务器不需要维持连接状态
- 更容易水平扩展
- 无需 WebSocket 库和基础设施
- 使用标准 HTTP 协议,兼容性更强
### Q: 清洗后的数据存储在哪里?
A: 清洗后的数据默认存储在:
- **短期**:服务器内存中(task_id 映射)
- **长期**:用户确认后保存到 MySQL 数据库
### Q: 如何处理大文件?
A: 可在 `.env` 中配置最大文件大小限制:
```
MAX_EXCEL_SIZE=52428800 # 50MB
```
---
## 扩展功能(可选)
1. **数据备份**:定期备份已保存的数据
2. **审计日志**:记录所有数据修改操作
3. **权限控制**:添加用户认证和授权机制
4. **缓存优化**:使用 Redis 替代内存缓存
5. **任务队列**:使用 Celery 处理大批量任务
---
## 部署建议
### 生产环境
1. 使用 Gunicorn + Uvicorn 运行应用
2. 配置反向代理(nginx)
3. 启用 HTTPS
4. 配置日志持久化
5. 设置监控告警
### Docker 部署
```dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "index:app", "--host", "0.0.0.0", "--port", "8000"]
```
---
## 技术栈
- **Web 框架**:FastAPI 0.104.1
- **ASGI 服务器**:Uvicorn 0.24.0
- **Excel 处理**:openpyxl 3.10.10
- **数据库驱动**:mysql-connector-python 8.2.0
- **数据验证**:Pydantic 2.5.0
- **HTTP 客户端**:requests 2.31.0
---
## License
MIT
---
## 支持
如有任何问题或建议,请联系开发团队。
"""
配置管理模块
负责读取和管理应用配置
"""
import os
from typing import Optional
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
class Config:
"""应用配置类"""
# 服务器配置
HOST: str = os.getenv("HOST", "0.0.0.0")
PORT: int = int(os.getenv("PORT", "8000"))
DEBUG: bool = os.getenv("DEBUG", "False").lower() == "true"
# 数据库配置
DB_HOST: str = os.getenv("DB_HOST", "localhost")
DB_PORT: int = int(os.getenv("DB_PORT", "3306"))
DB_USER: str = os.getenv("DB_USER", "root")
DB_PASSWORD: str = os.getenv("DB_PASSWORD", "")
DB_NAME: str = os.getenv("DB_NAME", "clean_data")
# 日志配置
LOG_LEVEL: str = os.getenv("LOG_LEVEL", "INFO")
LOG_FILE: Optional[str] = os.getenv("LOG_FILE")
# Excel 下载配置
EXCEL_DOWNLOAD_TIMEOUT: int = int(os.getenv("EXCEL_DOWNLOAD_TIMEOUT", "30"))
MAX_EXCEL_SIZE: int = int(os.getenv("MAX_EXCEL_SIZE", "52428800")) # 50MB
# 任务超时配置
TASK_TIMEOUT_SECONDS: int = int(os.getenv("TASK_TIMEOUT_SECONDS", "3600")) # 1小时
@classmethod
def get_db_config(cls) -> dict:
"""获取数据库配置字典"""
return {
'host': cls.DB_HOST,
'port': cls.DB_PORT,
'user': cls.DB_USER,
'password': cls.DB_PASSWORD,
'database': cls.DB_NAME,
}
# 创建全局配置实例
config = Config()
"""Core 业务模块"""
"""
数据清洗模块
负责数据的清洗和验证逻辑
"""
import logging
import asyncio
import pandas as pd
from typing import List, Dict, Any, Callable, Optional
logger = logging.getLogger(__name__)
# 各 department 对应的清洗策略注册表
# key: department 名称, value: (transform函数, 产品组配置, 稽查来源名称)
_DEPARTMENT_CLEANERS = {}
def _load_department_cleaners():
"""非专用清洗逻辑"""
global _DEPARTMENT_CLEANERS
if _DEPARTMENT_CLEANERS: # 如果部门清洗模块已加载,则直接返回
return
try:
# 加载部门清洗使用的工具
from core_py.数据转换_团队 import (
transform as _team_transform,
PRODUCT_GROUPS_JC,
) # PRODUCT_GROUPS_JC 风控稽查数据清洗配置数据
_DEPARTMENT_CLEANERS["风控稽查数据清洗"] = (_team_transform, PRODUCT_GROUPS_JC, "稽查团队")
logger.info("已加载部门清洗模块: 风控稽查数据清洗")
except ImportError as e:
logger.warning(f"加载团队清洗模块失败: {e}")
class DataCleaner:
"""数据清洗类"""
def __init__(self):
self.rules = {}
async def clean(
self,
raw_data: List[Dict[str, Any]],
department: str,
progress_callback: Optional[Callable[[float, str, Optional[int]], None]] = None,
audit_date: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""
清洗数据
Args:
raw_data: 原始数据列表(每行为 dict,key 为列名)
department: 业务部门名称,如 "团队"
progress_callback: 进度回调函数,接收 (progress: 0-1, message: str)
audit_date: 稽查日期字符串,格式 'yyyy-mm-dd';为 None 时由各清洗模块自动取上月1号
Returns:
List[Dict]: 清洗后的数据
"""
try:
logger.info(f"开始清洗数据,部门: {department},数据行数: {len(raw_data)}")
# ── 专项清洗路由 ──────────────────────────────────────────────
_load_department_cleaners()
if department in _DEPARTMENT_CLEANERS:
return await self._clean_by_department(
raw_data, department, progress_callback, audit_date=audit_date
)
# ─────────────────────────────────────────────────────────────
total_rows = len(raw_data)
cleaned_data = []
for idx, row in enumerate(raw_data):
try:
cleaned_row = await self._validate_and_convert(row, department)
if cleaned_row and not self._is_duplicate(
cleaned_row, cleaned_data
):
cleaned_data.append(cleaned_row)
if progress_callback and idx % max(1, total_rows // 10) == 0:
progress = idx / total_rows if total_rows > 0 else 0
progress_callback(progress, f"已清洗 {idx}/{total_rows} 行数据", len(cleaned_data))
except Exception as e:
logger.warning(f"第 {idx + 1} 行数据清洗失败: {str(e)}")
continue
if progress_callback:
progress_callback(1.0, f"清洗完成,共 {len(cleaned_data)} 行有效数据", len(cleaned_data))
logger.info(
f"数据清洗完成,原始行数: {total_rows},清洗后行数: {len(cleaned_data)}"
)
return cleaned_data
except Exception as e:
logger.error(f"clean 方法执行失败: {str(e)}")
raise
async def _clean_by_department(
self,
raw_data: List[Dict[str, Any]],
department: str,
progress_callback: Optional[Callable[[float, str, Optional[int]], None]] = None,
audit_date: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""
调用对应部门的专项 transform 函数进行清洗。
raw_data 来自 excel_handler(List[Dict],key 为列名),
transform 函数通过 iloc 按位置访问列,因此转换为 DataFrame 时
只要列顺序与原始 Excel 一致,iloc 索引就能正确对应。
"""
transform_fn, pg, yname = _DEPARTMENT_CLEANERS[department]
if progress_callback:
progress_callback(0.1, "正在转换数据格式", None)
# List[Dict] → DataFrame(保留原始列顺序,iloc 索引与 Excel 列位置对应)
df = pd.DataFrame(raw_data)
if progress_callback:
progress_callback(0.3, f"正在执行 {department} 数据清洗", None)
# transform 是同步函数,用 asyncio.to_thread 避免阻塞事件循环
records = await asyncio.to_thread(transform_fn, df, yname, pg, audit_date)
if progress_callback:
progress_callback(1.0, f"清洗完成,共 {len(records)} 行有效数据", len(records))
logger.info(f"[{department}] 专项清洗完成,共 {len(records)} 条记录")
return records
async def _validate_and_convert(
self, row: Dict[str, Any], department: str
) -> Optional[Dict[str, Any]]:
"""
验证和转换单行数据
Args:
row: 数据行
department: 业务部门名称
Returns:
转换后的数据行,若无效则返回 None
"""
try:
cleaned_row = {}
for key, value in row.items():
if value is None or (isinstance(value, str) and not value.strip()):
# 空值处理
cleaned_row[key] = None
continue
# 字符串数据清洗
if isinstance(value, str):
cleaned_row[key] = value.strip()
else:
cleaned_row[key] = value
# 验证必填字段(根据部门调整规则)
if not self._validate_required_fields(cleaned_row, department):
return None
return cleaned_row
except Exception as e:
logger.warning(f"_validate_and_convert 失败: {str(e)}")
return None
def _validate_required_fields(self, row: Dict[str, Any], department: str) -> bool:
"""
验证必填字段
Args:
row: 数据行
department: 业务部门
Returns:
bool: 是否通过验证
"""
# 示例:可根据部门定义不同的必填字段规则
required_fields_map = {
"sales": ["产品", "金额"],
"inventory": ["SKU", "数量"],
"finance": ["交易日期", "金额"],
}
required_fields = required_fields_map.get(department, [])
# 检查必填字段是否存在且非空
for field in required_fields:
if field not in row or row[field] is None:
return False
return True
def _is_duplicate(
self, row: Dict[str, Any], existing_data: List[Dict[str, Any]]
) -> bool:
"""
检查行是否为重复数据
Args:
row: 当前行
existing_data: 已有数据列表
Returns:
bool: 是否为重复
"""
# 简单的重复检查(可扩展为更复杂的逻辑)
for existing_row in existing_data:
if row == existing_row:
return True
return False
"""
数据库处理模块
负责与 MySQL 数据库的交互
"""
import logging
import mysql.connector
from typing import List, Dict, Any
import os
from contextlib import contextmanager
logger = logging.getLogger(__name__)
class DatabaseHandler:
"""数据库处理类"""
def __init__(self):
"""初始化数据库配置"""
self.db_config = {
'host': os.getenv('DB_HOST', 'localhost'),
'user': os.getenv('DB_USER', 'root'),
'password': os.getenv('DB_PASSWORD', ''),
'database': os.getenv('DB_NAME', 'clean_data'),
'port': int(os.getenv('DB_PORT', 3306)),
'autocommit': False,
'connection_timeout': 10
}
@contextmanager
def _get_connection(self):
"""
获取数据库连接的上下文管理器
Yields:
mysql.connector.MySQLConnection: 数据库连接
Raises:
Exception: 连接失败时抛出异常
"""
connection = None
try:
connection = mysql.connector.connect(**self.db_config)
logger.info("数据库连接成功")
yield connection
except mysql.connector.Error as e:
logger.error(f"数据库连接失败: {str(e)}")
raise
finally:
if connection and connection.is_connected():
connection.close()
logger.info("数据库连接已关闭")
async def insert_data(
self,
table_name: str,
data: List[Dict[str, Any]]
) -> int:
"""
将数据插入到指定的表
Args:
table_name: 目标表名
data: 数据列表
Returns:
int: 受影响的行数
Raises:
Exception: 插入失败时抛出异常
"""
if not data:
logger.warning("插入的数据为空")
return 0
try:
with self._get_connection() as connection:
cursor = connection.cursor()
# 获取字段名
columns = list(data[0].keys())
column_names = ', '.join([f'`{col}`' for col in columns])
placeholders = ', '.join(['%s'] * len(columns))
insert_sql = f"""
INSERT INTO `{table_name}` ({column_names})
VALUES ({placeholders})
"""
logger.info(f"准备插入 {len(data)} 行数据到表 {table_name}")
# 批量插入数据
for batch_start in range(0, len(data), 1000):
batch_end = min(batch_start + 1000, len(data))
batch_data = data[batch_start:batch_end]
# 准备批次数据
values_list = []
for row in batch_data:
values = tuple(row.get(col) for col in columns)
values_list.append(values)
# 执行批量插入
cursor.executemany(insert_sql, values_list)
logger.info(f"已插入 {batch_end} / {len(data)} 行数据")
connection.commit()
affected_rows = cursor.rowcount
cursor.close()
logger.info(f"成功插入 {affected_rows} 行数据到 {table_name}")
return affected_rows
except mysql.connector.Error as e:
logger.error(f"MySQL 错误: {str(e)}")
raise
except Exception as e:
logger.error(f"insert_data 失败: {str(e)}")
raise
async def test_connection(self) -> bool:
"""
测试数据库连接
Returns:
bool: 连接是否成功
"""
try:
with self._get_connection() as connection:
cursor = connection.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
cursor.close()
return True
except Exception as e:
logger.error(f"数据库连接测试失败: {str(e)}")
return False
async def create_table_if_not_exists(
self,
table_name: str,
schema: Dict[str, str]
) -> bool:
"""
如果表不存在则创建表
Args:
table_name: 表名
schema: 表架构定义 {列名: 列定义}
Returns:
bool: 是否创建成功或表已存在
"""
try:
with self._get_connection() as connection:
cursor = connection.cursor()
# 检查表是否存在
cursor.execute(f"""
SELECT TABLE_NAME FROM information_schema.TABLES
WHERE TABLE_SCHEMA = '{self.db_config['database']}'
AND TABLE_NAME = '{table_name}'
""")
if cursor.fetchone():
logger.info(f"表 {table_name} 已存在")
cursor.close()
return True
# 创建表
columns_sql = ', '.join([f'`{col}` {definition}' for col, definition in schema.items()])
create_sql = f"""
CREATE TABLE `{table_name}` (
id INT AUTO_INCREMENT PRIMARY KEY,
{columns_sql},
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_sql)
connection.commit()
cursor.close()
logger.info(f"成功创建表 {table_name}")
return True
except Exception as e:
logger.error(f"create_table_if_not_exists 失败: {str(e)}")
raise
"""
Excel 文件处理模块
负责从 URL 下载和解析 Excel 文件
"""
import aiohttp
import logging
from openpyxl import load_workbook
from io import BytesIO
from typing import List, Dict, Any
import os
import tempfile
logger = logging.getLogger(__name__)
class ExcelHandler:
"""Excel 文件处理类"""
def __init__(self):
self.timeout = aiohttp.ClientTimeout(total=30)
async def fetch_bytes(self, url: str) -> bytes:
"""
从 URL 下载文件,返回原始字节内容(供调用方自行用 pandas 解析)
Args:
url: 文件的网络链接
Returns:
bytes: 文件的原始二进制内容
"""
try:
logger.info(f"开始从 {url} 下载文件")
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(url) as response:
if response.status != 200:
raise Exception(f"下载失败,HTTP 状态码: {response.status}")
content = await response.read()
logger.info(f"下载完成,文件大小: {len(content)} 字节")
return content
except Exception as e:
logger.error(f"fetch_bytes 失败: {str(e)}")
raise
async def fetch_and_parse(self, excel_url: str) -> List[Dict[str, Any]]:
"""
从 URL 下载并解析 Excel 文件
Args:
excel_url: Excel 文件的网络链接
Returns:
List[Dict]: 解析后的数据,每行为一个字典
Raises:
Exception: 下载或解析失败时抛出异常
"""
try:
# 1. 下载文件
logger.info(f"开始从 {excel_url} 下载 Excel 文件")
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(excel_url) as response:
if response.status != 200:
raise Exception(f"下载失败,HTTP 状态码: {response.status}")
excel_content = await response.read()
logger.info(f"下载完成,文件大小: {len(excel_content)} 字节")
# 2. 解析 Excel
return self._parse_excel_content(excel_content)
except Exception as e:
logger.error(f"fetch_and_parse 失败: {str(e)}")
raise
def _parse_excel_content(self, excel_content: bytes) -> List[Dict[str, Any]]:
"""
解析 Excel 内容
Args:
excel_content: Excel 文件的二进制内容
Returns:
List[Dict]: 解析后的数据
"""
try:
# 使用 BytesIO 从内存中读取
excel_file = BytesIO(excel_content)
workbook = load_workbook(excel_file)
# 获取第一个工作表
worksheet = workbook.active
if not worksheet:
raise Exception("Excel 文件不包含有效的工作表")
# 获取标题行
headers = []
for cell in worksheet[1]:
headers.append(cell.value)
if not headers or all(h is None for h in headers):
raise Exception("Excel 文件不包含有效的标题行")
# 解析数据行
data = []
for row in worksheet.iter_rows(min_row=2, values_only=False):
row_data = {}
for idx, cell in enumerate(row):
if idx < len(headers):
row_data[headers[idx]] = cell.value
# 跳过空行
if any(v is not None for v in row_data.values()):
data.append(row_data)
logger.info(f"成功解析 Excel,共 {len(data)} 行数据")
return data
except Exception as e:
logger.error(f"_parse_excel_content 失败: {str(e)}")
raise
"""
进度管理模块
负责任务进度的记录和查询
"""
import logging
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import threading
logger = logging.getLogger(__name__)
class ProgressManager:
"""进度管理类"""
def __init__(self, timeout_seconds: int = 3600):
"""
初始化进度管理器
Args:
timeout_seconds: 任务进度的过期时间(秒),默认 1 小时
"""
self.progress_data: Dict[str, Dict[str, Any]] = {}
self.timeout_seconds = timeout_seconds
self.lock = threading.Lock()
def update_progress(
self,
task_id: str,
status: str,
progress: int,
message: str,
processed_count: Optional[int] = None
) -> None:
"""
更新任务进度
Args:
task_id: 任务唯一标识
status: 状态 (queued, processing, completed, failed)
progress: 进度百分比 (0-100)
message: 进度信息
processed_count: 已处理的数据条数,None 表示暂未统计
"""
with self.lock:
self.progress_data[task_id] = {
'task_id': task_id,
'status': status,
'progress': max(0, min(100, progress)),
'message': message,
'processed_count': processed_count,
'timestamp': datetime.now().isoformat(),
'created_at': datetime.now()
}
logger.debug(f"[{task_id}] 进度更新: {status} {progress}% - {message}")
def get_progress(self, task_id: str) -> Optional[Dict[str, Any]]:
"""
获取任务进度
Args:
task_id: 任务唯一标识
Returns:
Optional[Dict]: 进度信息,若任务不存在或已过期返回 None
"""
with self.lock:
if task_id not in self.progress_data:
return None
data = self.progress_data[task_id]
# 检查是否过期
if datetime.now() - data['created_at'] > timedelta(seconds=self.timeout_seconds):
logger.warning(f"任务 {task_id} 已过期,删除记录")
del self.progress_data[task_id]
return None
# 返回字典副本,移除 created_at(内部字段)
result = {k: v for k, v in data.items() if k != 'created_at'}
return result
def get_all_progress(self) -> Dict[str, Dict[str, Any]]:
"""
获取所有任务的进度信息
Returns:
Dict: 所有任务的进度信息
"""
with self.lock:
# 清理过期任务
expired_tasks = []
for task_id, data in self.progress_data.items():
if datetime.now() - data['created_at'] > timedelta(seconds=self.timeout_seconds):
expired_tasks.append(task_id)
for task_id in expired_tasks:
del self.progress_data[task_id]
logger.info(f"清理过期任务: {task_id}")
# 返回所有有效任务的进度
return {
task_id: {k: v for k, v in data.items() if k != 'created_at'}
for task_id, data in self.progress_data.items()
}
def clear_progress(self, task_id: str) -> None:
"""
清除任务进度记录
Args:
task_id: 任务唯一标识
"""
with self.lock:
if task_id in self.progress_data:
del self.progress_data[task_id]
logger.info(f"清除任务 {task_id} 的进度记录")
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta
# 文件路径
# TODO: 配置稽查月份(默认1号)
current_date = (datetime.now().replace(day=1) - relativedelta(months=1)).strftime("%Y-%m-01")
y_file = f"/王小卤/风控/代码-新/大日期{current_date}_2.xlsx"
p_file = f"/王小卤/风控/代码-新//线下价盘表2601版.xlsx"
# 保存回原文件(建议先保存为新文件以防覆盖)
output_file = f"/王小卤/风控/代码-新//低价大日期_2.xlsx"
# 读取Y表(稽查结果表)
df_y = pd.read_excel(y_file,sheet_name='合并后', dtype=str) # 先以字符串读入避免格式问题,后续转数字
# 读取P表(价盘表)
df_p = pd.read_excel(p_file, dtype=str)
# 清理列名(去除前后空格等)
df_y.columns = df_y.columns.str.strip()
df_p.columns = df_p.columns.str.strip()
# 将关键字段转换为统一格式(去除空格、统一大小写等,便于匹配)
def clean_str(s):
if pd.isna(s):
return ""
return str(s).strip().upper()
# 对Y表的关键列清洗
df_y['产品系列_clean'] = df_y.iloc[:, 14].apply(clean_str) # O列:产品系列
df_y['产品克重_clean'] = df_y.iloc[:, 16].apply(clean_str) # Q列:产品克重
df_y['渠道类型_clean'] = df_y.iloc[:, 13].apply(clean_str) # N列:渠道类型(稽查源提供)
# 对P表的关键列清洗
df_p['产品系统_clean'] = df_p.iloc[:, 0].apply(clean_str) # A列:产品系统
df_p['产品克重_p_clean'] = df_p.iloc[:, 2].apply(clean_str) # C列:产品克重
df_p['渠道_p_clean'] = df_p.iloc[:, 3].apply(clean_str) # D列:渠道
# 将价格列转为数值类型(注意处理非数字情况)
df_y['产品价格_num'] = pd.to_numeric(df_y.iloc[:, 17], errors='coerce') # R列:产品价格
df_p['低价_num'] = pd.to_numeric(df_p.iloc[:, 4], errors='coerce') # E列:低价
# 构建P表的唯一键(产品系统 + 产品克重 + 渠道)
df_p['match_key'] = df_p['产品系统_clean'] + '|' + df_p['产品克重_p_clean'] + '|' + df_p['渠道_p_clean']
# 构建Y表的匹配键(产品系列 + 产品克重 + 渠道类型)
df_y['match_key'] = df_y['产品系列_clean'] + '|' + df_y['产品克重_clean'] + '|' + df_y['渠道类型_clean']
# 将P表转为字典:key -> 低价
price_map = df_p.set_index('match_key')['低价_num'].to_dict()
# 初始化Y表的目标列(S: 是否低价, T: 破价价差)
df_y['是否低价'] = '正常' # 默认值
df_y['破价价差'] = None
# 遍历Y表每一行进行匹配和判断
for idx, row in df_y.iterrows():
key = row['match_key']
y_price = row['产品价格_num']
p_low_price = price_map.get(key, None)
if pd.notna(y_price) and pd.notna(p_low_price):
if y_price < p_low_price:
df_y.at[idx, '是否低价'] = '低价'
df_y.at[idx, '破价价差'] = round(p_low_price - y_price, 2)
df_y.at[idx, '低价整改状态'] = '未整改'
else:
df_y.at[idx, '是否低价'] = '正常'
df_y.at[idx, '破价价差'] = None
else:
# 无法匹配或价格缺失,保留默认或标记
df_y.at[idx, '是否低价'] = None
df_y.at[idx, '破价价差'] = None
# 只保留原始列(不保留清洗用的辅助列)
original_columns = df_y.columns.tolist()
output_columns = [col for col in original_columns if not col.endswith('_clean') and col not in ['产品价格_num', 'match_key']]
df_y[output_columns].to_excel(output_file, index=False)
print(f"处理完成!结果已保存至:{output_file}")
\ No newline at end of file
差异被折叠。
差异被折叠。
差异被折叠。
fastapi==0.104.1
uvicorn==0.24.0
python-multipart==0.0.6
openpyxl==3.1.5
requests==2.31.0
aiohttp==3.9.1
mysql-connector-python==8.2.0
pydantic==2.4.2
python-dotenv==1.0.0
pandas>=2.0.0
python-dateutil>=2.8.2
\ No newline at end of file
/*
Navicat MySQL Data Transfer
Source Server : t100_dev
Source Server Version : 50744
Source Host : 192.168.100.39:25301
Source Database : market_bi
Target Server Type : MYSQL
Target Server Version : 50744
File Encoding : 65001
Date: 2026-03-09 18:13:42
*/
SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for risk_audit_visit
-- ----------------------------
DROP TABLE IF EXISTS `risk_audit_visit`;
CREATE TABLE `risk_audit_visit` (
`rav_id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`audit_date` date DEFAULT NULL COMMENT '稽查日期',
`source` varchar(20) DEFAULT NULL COMMENT '稽查来源',
`region_name` varchar(20) DEFAULT NULL COMMENT '大区',
`district_name` varchar(20) DEFAULT NULL COMMENT '战区',
`dealer_code` varchar(10) DEFAULT NULL COMMENT '经销商编码',
`dealer_name` varchar(100) DEFAULT NULL COMMENT '经销商名称',
`store_code` varchar(20) DEFAULT NULL COMMENT '门店编码',
`store_name` varchar(100) DEFAULT NULL COMMENT '勤策门店',
`f_emp_no` varchar(20) DEFAULT NULL COMMENT '客户经理工号',
`f_emp_name` varchar(100) DEFAULT NULL COMMENT '客户经理名称',
`qin_ce_type_large` varchar(20) DEFAULT NULL COMMENT '勤策渠道大类',
`jh_channel_type` varchar(20) DEFAULT NULL COMMENT '稽查渠道类型',
`city` varchar(30) DEFAULT NULL COMMENT '城市',
`channel_type` varchar(30) DEFAULT NULL COMMENT '渠道类型(稽查源提供)',
`series` varchar(20) DEFAULT NULL COMMENT '产品系列',
`taste` varchar(20) DEFAULT NULL COMMENT '产品口味',
`weight` varchar(20) DEFAULT NULL COMMENT '产品克重',
`price` decimal(10,2) DEFAULT NULL COMMENT '产品价格',
`low_price` varchar(20) DEFAULT NULL COMMENT '是否低价:低价,正常',
`low_price_diff` decimal(10,2) DEFAULT NULL COMMENT '价差',
`low_price_status` varchar(20) DEFAULT NULL COMMENT '低价整改状态',
`low_price_rectify` varchar(100) DEFAULT NULL COMMENT '低价整改说明',
`production_month` date DEFAULT NULL COMMENT '产品生产月份',
`near_month_num` int(11) DEFAULT NULL COMMENT '临期月份数',
`near_month_status` varchar(20) DEFAULT NULL COMMENT '临期状态',
`fresh_status` varchar(20) DEFAULT NULL COMMENT '新鲜度',
`large_date_status` varchar(20) DEFAULT NULL COMMENT '大日期整改状态',
`large_date_rectify` varchar(100) DEFAULT NULL COMMENT '大日期整改说明',
PRIMARY KEY (`rav_id`),
KEY `audit` (`audit_date`),
KEY `dealer` (`dealer_code`,`dealer_name`),
KEY `product_index` (`series`,`taste`,`weight`),
KEY `regiondistrict` (`region_name`,`district_name`),
KEY `type_small` (`jh_channel_type`),
KEY `weight_index` (`weight`)
) ENGINE=InnoDB AUTO_INCREMENT=493621 DEFAULT CHARSET=utf8mb4 COMMENT='稽查走访价格大日期表';
"""
API 测试脚本
用于快速测试 API 的各个端点
"""
import asyncio
import httpx
import json
from datetime import datetime
BASE_URL = "http://localhost:8000"
class APITester:
"""API 测试类"""
def __init__(self, base_url: str = BASE_URL):
self.base_url = base_url
self.task_id: str = None
async def test_health_check(self):
"""测试健康检查接口"""
print("\n" + "="*50)
print("测试:健康检查接口")
print("="*50)
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"{self.base_url}/api/v1/health")
print(f"状态码: {response.status_code}")
print(f"响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}")
except Exception as e:
print(f"错误: {str(e)}")
async def test_start_cleaning(self):
"""测试启动清洗任务接口"""
print("\n" + "="*50)
print("测试:启动数据清洗任务")
print("="*50)
payload = {
"excel_url": "https://example.com/test_data.xlsx",
"department": "sales",
"description": "测试数据清洗"
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/api/v1/clean",
json=payload
)
print(f"状态码: {response.status_code}")
data = response.json()
print(f"响应: {json.dumps(data, indent=2, ensure_ascii=False)}")
if response.status_code == 200:
self.task_id = data.get('task_id')
print(f"\n✓ 任务创建成功,Task ID: {self.task_id}")
except Exception as e:
print(f"错误: {str(e)}")
async def test_get_progress(self):
"""测试获取进度接口"""
if not self.task_id:
print("跳过:需要先创建任务")
return
print("\n" + "="*50)
print("测试:获取数据清洗进度")
print("="*50)
try:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/api/v1/progress/{self.task_id}"
)
print(f"状态码: {response.status_code}")
print(f"响应: {json.dumps(response.json(), indent=2, ensure_ascii=False, default=str)}")
except Exception as e:
print(f"错误: {str(e)}")
async def test_get_result(self):
"""测试获取清洗结果接口"""
if not self.task_id:
print("跳过:需要先创建任务")
return
print("\n" + "="*50)
print("测试:获取清洗结果")
print("="*50)
try:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/api/v1/result/{self.task_id}"
)
print(f"状态码: {response.status_code}")
data = response.json()
print(f"响应: {json.dumps(data, indent=2, ensure_ascii=False, default=str)}")
except Exception as e:
print(f"错误: {str(e)}")
async def test_save_data(self):
"""测试保存数据接口"""
if not self.task_id:
print("跳过:需要先创建任务")
return
print("\n" + "="*50)
print("测试:保存清洗后的数据")
print("="*50)
payload = {
"task_id": self.task_id,
"table_name": "sales_data"
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/api/v1/save",
json=payload
)
print(f"状态码: {response.status_code}")
print(f"响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}")
except Exception as e:
print(f"错误: {str(e)}")
async def run_all_tests(self):
"""运行所有测试"""
print("\n")
print("╔" + "="*48 + "╗")
print("║" + " "*10 + "数据清洗系统 API 测试" + " "*16 + "║")
print("║" + f" "*10 + f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + " "*15 + "║")
print("╚" + "="*48 + "╝")
await self.test_health_check()
await asyncio.sleep(1)
await self.test_start_cleaning()
await asyncio.sleep(2)
await self.test_get_progress()
await asyncio.sleep(1)
await self.test_get_result()
await asyncio.sleep(1)
print("\n" + "="*50)
print("所有测试完成!")
print("="*50 + "\n")
async def main():
"""主函数"""
tester = APITester()
await tester.run_all_tests()
if __name__ == "__main__":
print("\n提示:确保 FastAPI 服务已在 http://localhost:8000 运行中\n")
asyncio.run(main())
"""Utils 工具模块"""
from utils.response import BizCode, ApiResponse, ok_resp, fail_resp
__all__ = ["BizCode", "ApiResponse", "ok_resp", "fail_resp"]
"""
异常定义模块
"""
class DataCleaningException(Exception):
"""数据清洗异常"""
pass
class DatabaseException(Exception):
"""数据库异常"""
pass
class ExcelParsingException(Exception):
"""Excel 解析异常"""
pass
class ValidationException(Exception):
"""验证异常"""
pass
"""
统一响应格式封装模块
所有接口统一返回: { code: 业务状态码, msg: 消息, data: 数据 }
"""
from enum import IntEnum
from typing import Any
from fastapi.responses import JSONResponse
from pydantic import BaseModel
class BizCode(IntEnum):
"""业务逻辑状态码"""
SUCCESS = 200 # 通用成功
TASK_QUEUED = 201 # 任务已入队(异步场景)
TASK_PROCESSING = 202 # 任务处理中
BAD_REQUEST = 400 # 请求参数错误
NOT_FOUND = 404 # 资源不存在
TASK_FAILED = 422 # 任务执行失败(业务层)
SERVER_ERROR = 500 # 服务器内部错误
DB_ERROR = 501 # 数据库错误
EXCEL_ERROR = 502 # Excel 解析错误
class ApiResponse(BaseModel):
"""统一 API 响应体"""
code: int
msg: str
data: Any = None
def ok_resp(data: Any = None, msg: str = "success") -> JSONResponse:
"""返回成功的 JSONResponse(HTTP 200)"""
return JSONResponse(
status_code=200,
content=ApiResponse(code=BizCode.SUCCESS, msg=msg, data=data).model_dump()
)
def fail_resp(
biz_code: BizCode,
msg: str,
http_status: int = 400,
data: Any = None
) -> JSONResponse:
"""返回失败的 JSONResponse"""
return JSONResponse(
status_code=http_status,
content=ApiResponse(code=biz_code, msg=msg, data=data).model_dump()
)
"""
数据验证模块
"""
import re
import logging
logger = logging.getLogger(__name__)
def validate_excel_url(url: str) -> bool:
"""
验证 Excel URL 的有效性
Args:
url: URL 字符串
Returns:
bool: 是否为有效的 Excel URL
"""
if not url or not isinstance(url, str):
return False
# 检查 URL 格式
url_pattern = r'^https?://.*\.(xlsx|xls|csv)$'
if not re.match(url_pattern, url, re.IGNORECASE):
logger.warning(f"URL 格式无效: {url}")
return False
return True
def sanitize_filename(filename: str) -> str:
"""
清理文件名,移除不安全的字符
Args:
filename: 原始文件名
Returns:
str: 清理后的文件名
"""
# 移除不安全字符
sanitized = re.sub(r'[<>:"/\\|?*]', '', filename)
return sanitized[:255] # 限制长度
def validate_table_name(table_name: str) -> bool:
"""
验证数据库表名的有效性
Args:
table_name: 表名
Returns:
bool: 是否为有效的表名
"""
if not table_name or not isinstance(table_name, str):
return False
# MySQL 表名规则:以字母、数字或下划线开头,不包含特殊字符
table_name_pattern = r'^[a-zA-Z_][a-zA-Z0-9_]{0,63}$'
if not re.match(table_name_pattern, table_name):
logger.warning(f"表名格式无效: {table_name}")
return False
return True
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论