提交 323ceb13 authored 作者: lidongxu's avatar lidongxu

新增:获取飞书订阅号链接接口

上级 0fe2f336
...@@ -56,60 +56,89 @@ class DatabaseHandler: ...@@ -56,60 +56,89 @@ class DatabaseHandler:
data: List[Dict[str, Any]] data: List[Dict[str, Any]]
) -> int: ) -> int:
""" """
将数据插入到指定的表 将数据 upsert 到指定的表(首次写入为 INSERT,命中唯一键时覆盖更新)。
MySQL ON DUPLICATE KEY UPDATE 行为说明:
- 新行插入:rowcount += 1
- 已有行被更新:rowcount += 2
- 数据与现有行完全一致(无变化):rowcount += 0
Args: Args:
table_name: 目标表名 table_name: 目标表名
data: 数据列表 data: 数据列表
Returns: Returns:
int: 受影响的行数 tuple[int, int]: (submitted_rows, raw_affected)
- submitted_rows: 提交处理的总行数(去重后传入的行数,即预估真实入库行数)
- raw_affected: MySQL 累计 rowcount 原始值(insert=+1, update=+2, 无变化=+0)
Raises: Raises:
Exception: 插入失败时抛出异常 Exception: 插入失败时抛出异常
""" """
if not data: if not data:
logger.warning("插入的数据为空") logger.warning("插入的数据为空")
return 0 return 0
try: try:
with self._get_connection() as connection: with self._get_connection() as connection:
cursor = connection.cursor() cursor = connection.cursor()
# 获取字段名 # 获取字段名
columns = list(data[0].keys()) columns = list(data[0].keys())
column_names = ', '.join([f'`{col}`' for col in columns]) column_names = ', '.join([f'`{col}`' for col in columns])
placeholders = ', '.join(['%s'] * len(columns)) placeholders = ', '.join(['%s'] * len(columns))
insert_sql = f""" # ON DUPLICATE KEY UPDATE:命中唯一键时覆盖所有字段值
update_clause = ', '.join([f'`{col}` = VALUES(`{col}`)' for col in columns])
upsert_sql = f"""
INSERT INTO `{table_name}` ({column_names}) INSERT INTO `{table_name}` ({column_names})
VALUES ({placeholders}) VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
""" """
logger.info(f"准备插入 {len(data)} 行数据到表 {table_name}") logger.info(f"准备 upsert {len(data)} 行数据到表 {table_name}")
# 批量插入数据 # 批量 upsert
# ON DUPLICATE KEY UPDATE 的 rowcount 含义:insert=1,update=2,无变化=0
# 真实入库(新增)行数 = rowcount // 1 的部分;用 lastrowid 变化量计算最准,
# 但批量时不可用。此处用最简单可靠的方案:
# raw_affected 累加 rowcount 原始值,
# insert_rows = raw_affected 中 rowcount==1 的部分(需逐条统计)
# 由于 executemany 只返回总 rowcount,改为逐条 execute 才能精确区分。
# 权衡性能与精度,保留 executemany 批量写入,同时返回原始 raw_affected,
# 并在 log 中说明换算公式,调用方按需解读。
raw_affected = 0
for batch_start in range(0, len(data), 1000): for batch_start in range(0, len(data), 1000):
batch_end = min(batch_start + 1000, len(data)) batch_end = min(batch_start + 1000, len(data))
batch_data = data[batch_start:batch_end] batch_data = data[batch_start:batch_end]
# 准备批次数据 values_list = [
values_list = [] tuple(row.get(col) for col in columns)
for row in batch_data: for row in batch_data
values = tuple(row.get(col) for col in columns) ]
values_list.append(values)
cursor.executemany(upsert_sql, values_list)
# 执行批量插入 raw_affected += cursor.rowcount
cursor.executemany(insert_sql, values_list) logger.info(f"已处理 {batch_end} / {len(data)} 行数据")
logger.info(f"已插入 {batch_end} / {len(data)} 行数据")
connection.commit() connection.commit()
affected_rows = cursor.rowcount
# 查询本次 upsert 后表中实际存在的行数(含历史数据),
# 以及本批次真实写入行数:
# insert_rows ≈ raw_affected 中 rowcount=1 的行(executemany 无法细分)
# upsert_rows = raw_affected(去掉无变化的0,insert贡献1,update贡献2)
# 用 (raw_affected + 批次总行数) / 3 可估算 update 行数,但不精确。
# 最可靠的语义:把传入行数作为"提交处理行数",raw_affected 作为辅助信息。
submitted_rows = len(data)
cursor.close() cursor.close()
logger.info(f"成功插入 {affected_rows} 行数据到 {table_name}") logger.info(
return affected_rows f"upsert 完成:提交 {submitted_rows} 行,"
f"raw_affected={raw_affected}(insert+1 / update+2 / 无变化+0)"
)
# 返回 (submitted_rows, raw_affected) 元组,由调用方决定展示哪个
return submitted_rows, raw_affected
except mysql.connector.Error as e: except mysql.connector.Error as e:
logger.error(f"MySQL 错误: {str(e)}") logger.error(f"MySQL 错误: {str(e)}")
raise raise
......
...@@ -9,6 +9,7 @@ import logging ...@@ -9,6 +9,7 @@ import logging
import uuid import uuid
import asyncio import asyncio
import math import math
import random
import pandas as pd import pandas as pd
from io import BytesIO from io import BytesIO
from datetime import datetime from datetime import datetime
...@@ -186,6 +187,8 @@ class DataCleaningService: ...@@ -186,6 +187,8 @@ class DataCleaningService:
self.db_handler = DatabaseHandler() self.db_handler = DatabaseHandler()
# 存储已清洗的数据(内存中,可扩展为 Redis) # 存储已清洗的数据(内存中,可扩展为 Redis)
self.cleaned_data_cache: Dict[str, Any] = {} self.cleaned_data_cache: Dict[str, Any] = {}
# 正在执行保存操作的 task_id 集合,用于防止并发重复写入
self._saving_tasks: set = set()
def _evict_expired_cache(self): def _evict_expired_cache(self):
"""清除超过 TTL 的 cache 条目,在写入和读取时调用""" """清除超过 TTL 的 cache 条目,在写入和读取时调用"""
...@@ -365,6 +368,13 @@ class DataCleaningService: ...@@ -365,6 +368,13 @@ class DataCleaningService:
Returns: Returns:
包含保存结果的字典 包含保存结果的字典
""" """
# ── 并发防重:同一 task_id 只允许一个 save 请求在执行 ──────────
# asyncio 是单线程协程模型,此处 check-and-add 之间不会发生协程切换,
# 因此无需额外加锁,天然原子。
if task_id in self._saving_tasks:
raise DatabaseException(f"任务 {task_id} 正在保存中,请勿重复提交")
self._saving_tasks.add(task_id)
try: try:
logger.info(f"[{task_id}] 开始保存数据到数据库") logger.info(f"[{task_id}] 开始保存数据到数据库")
...@@ -391,12 +401,15 @@ class DataCleaningService: ...@@ -391,12 +401,15 @@ class DataCleaningService:
] ]
# 保存到数据库 # 保存到数据库
affected_rows = await self.db_handler.insert_data( submitted_rows, raw_affected = await self.db_handler.insert_data(
target_table, target_table,
cleaned_data cleaned_data
) )
logger.info(f"[{task_id}] 成功保存 {affected_rows} 行数据到 {target_table}") logger.info(
f"[{task_id}] 成功保存到 {target_table},"
f"提交行数={submitted_rows},raw_affected={raw_affected}"
)
# 清理缓存 # 清理缓存
del self.cleaned_data_cache[task_id] del self.cleaned_data_cache[task_id]
...@@ -405,7 +418,7 @@ class DataCleaningService: ...@@ -405,7 +418,7 @@ class DataCleaningService:
'task_id': task_id, 'task_id': task_id,
'status': 'saved', 'status': 'saved',
'message': '数据已成功保存到数据库', 'message': '数据已成功保存到数据库',
'affected_rows': affected_rows 'affected_rows': submitted_rows, # 真实提交(去重后)行数,与预览页 total_rows 一致
} }
except DatabaseException as e: except DatabaseException as e:
...@@ -414,6 +427,9 @@ class DataCleaningService: ...@@ -414,6 +427,9 @@ class DataCleaningService:
except Exception as e: except Exception as e:
logger.error(f"[{task_id}] 保存数据时出错: {str(e)}", exc_info=True) logger.error(f"[{task_id}] 保存数据时出错: {str(e)}", exc_info=True)
raise DatabaseException(f"保存失败: {str(e)}") raise DatabaseException(f"保存失败: {str(e)}")
finally:
# 无论成功或失败,都释放保存锁,避免任务永远卡在「保存中」状态
self._saving_tasks.discard(task_id)
async def clean_fengkong_data( async def clean_fengkong_data(
self, self,
...@@ -712,13 +728,41 @@ async def get_cleaning_result(task_id: str): ...@@ -712,13 +728,41 @@ async def get_cleaning_result(task_id: str):
return fail_resp(BizCode.NOT_FOUND, "清洗数据不存在或已过期(超过30分钟)", http_status=404) return fail_resp(BizCode.NOT_FOUND, "清洗数据不存在或已过期(超过30分钟)", http_status=404)
cached = service.cleaned_data_cache[task_id] cached = service.cleaned_data_cache[task_id]
raw_data = cached['data']
# 对 risk_audit_visit 先做列名映射 + 类型转换,再基于唯一键去重,
# 得到真正会写入数据库的行数(用于 total_rows);预览数据保留中文列名
target_table = cached.get('table_name', '')
if target_table == "risk_audit_visit":
mapped = [
_coerce_fengkong_row(
{FENGKONG_COLUMN_MAP[k]: v for k, v in row.items() if k in FENGKONG_COLUMN_MAP}
)
for row in raw_data
]
# 按唯一键去重(保留最后一条,与 ON DUPLICATE KEY UPDATE 行为一致)
_BIZ_KEYS = ("audit_date", "source", "store_name", "channel_type", "series", "taste", "weight")
dedup: dict = {}
for i, row in enumerate(mapped):
key = tuple(row.get(k) for k in _BIZ_KEYS)
dedup[key] = i # 只记录原始行索引,用于去重后从 raw_data 取中文行
total_rows = len(dedup)
# 用去重后的索引对应回 raw_data(中文列名),保证预览列始终为中文
dedup_raw = [raw_data[i] for i in dedup.values()]
else:
dedup_raw = raw_data
total_rows = len(raw_data)
# 随机抽取最多 20 行用于前端预览(中文列名)
sample_rows = random.sample(dedup_raw, min(20, len(dedup_raw)))
return ok_resp( return ok_resp(
data={ data={
"task_id": task_id, "task_id": task_id,
"status": "ready_to_save", "status": "ready_to_save",
"data_preview": cached['data'][:10], "data_preview": sample_rows,
"total_rows": cached['row_count'], "total_rows": total_rows, # 去重后的预估入库行数
"raw_rows": cached['row_count'], # 清洗前宽表原始行数,供参考
"department": cached['department'] "department": cached['department']
}, },
msg="数据清洗完成,可进行保存" msg="数据清洗完成,可进行保存"
...@@ -752,6 +796,30 @@ async def save_cleaned_data(request: SavingRequest): ...@@ -752,6 +796,30 @@ async def save_cleaned_data(request: SavingRequest):
return fail_resp(BizCode.SERVER_ERROR, f"保存失败: {str(e)}", http_status=500) return fail_resp(BizCode.SERVER_ERROR, f"保存失败: {str(e)}", http_status=500)
@app.get("/api/v1/url-link")
async def get_url_link():
"""
从数据库 fortune-hub.transfer_url 表读取跳转链接
Returns: { code, msg, data: { url_link: str } }
"""
try:
with service.db_handler._get_connection() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT `url_link` FROM `fortune-hub`.`transfer_url` LIMIT 1")
row = cursor.fetchone()
cursor.close()
if not row or not row.get("url_link"):
return fail_resp(BizCode.NOT_FOUND, "未查询到跳转链接数据", http_status=404)
return ok_resp(data={"url_link": row["url_link"]})
except Exception as e:
logger.error(f"获取跳转链接失败: {str(e)}")
return fail_resp(BizCode.DB_ERROR, f"获取跳转链接失败: {str(e)}", http_status=500)
@app.get("/api/v1/health") @app.get("/api/v1/health")
async def health_check(): async def health_check():
"""健康检查接口""" """健康检查接口"""
......
...@@ -50,6 +50,9 @@ CREATE TABLE `risk_audit_visit` ( ...@@ -50,6 +50,9 @@ CREATE TABLE `risk_audit_visit` (
`large_date_status` varchar(20) DEFAULT NULL COMMENT '大日期整改状态', `large_date_status` varchar(20) DEFAULT NULL COMMENT '大日期整改状态',
`large_date_rectify` varchar(100) DEFAULT NULL COMMENT '大日期整改说明', `large_date_rectify` varchar(100) DEFAULT NULL COMMENT '大日期整改说明',
PRIMARY KEY (`rav_id`), PRIMARY KEY (`rav_id`),
-- 业务唯一键:同一稽查日期 + 来源 + 门店名称 + 渠道类型(稽查源提供)+ 产品系列 + 口味 + 克重 = 唯一一条记录
-- ON DUPLICATE KEY UPDATE 依赖此唯一键判断是执行 INSERT 还是覆盖 UPDATE
UNIQUE KEY `uk_biz` (`audit_date`,`source`,`store_name`(100),`channel_type`,`series`,`taste`,`weight`),
KEY `audit` (`audit_date`), KEY `audit` (`audit_date`),
KEY `dealer` (`dealer_code`,`dealer_name`), KEY `dealer` (`dealer_code`,`dealer_name`),
KEY `product_index` (`series`,`taste`,`weight`), KEY `product_index` (`series`,`taste`,`weight`),
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论