提交 ea335516 authored 作者: lidongxu's avatar lidongxu

包一层文件夹

上级 fa6c4c14
差异被折叠。
......@@ -7,8 +7,9 @@ import os
from typing import Optional
from dotenv import load_dotenv
# 加载 .env 文件
load_dotenv()
# 加载 .env 文件(使用绝对路径,避免因工作目录不同导致加载失败)
_env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.env')
load_dotenv(dotenv_path=_env_path)
class Config:
"""应用配置类"""
......
......@@ -6,8 +6,8 @@
import logging
import mysql.connector
from typing import List, Dict, Any
import os
from contextlib import contextmanager
from config import config
logger = logging.getLogger(__name__)
......@@ -17,11 +17,11 @@ class DatabaseHandler:
def __init__(self):
"""初始化数据库配置"""
self.db_config = {
'host': os.getenv('DB_HOST', 'localhost'),
'user': os.getenv('DB_USER', 'root'),
'password': os.getenv('DB_PASSWORD', ''),
'database': os.getenv('DB_NAME', 'clean_data'),
'port': int(os.getenv('DB_PORT', 3306)),
'host': config.DB_HOST,
'user': config.DB_USER,
'password': config.DB_PASSWORD,
'database': config.DB_NAME,
'port': config.DB_PORT,
'autocommit': False,
'connection_timeout': 10
}
......
import sys
import os
import pandas as pd
import mysql.connector
# 兼容直接运行(python core_py/1低价计算.py)和作为模块被 index.py 导入两种场景
if __name__ == "__main__":
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import config
def load_price_map_from_db() -> dict:
"""
从 market_bi.bi_price_xx 读取线下价盘数据,
返回匹配字典: { "产品系列|产品克重|渠道(大写)" -> low_price(float) }
"""
conn = mysql.connector.connect(
host=config.DB_HOST,
port=config.DB_PORT,
user=config.DB_USER,
password=config.DB_PASSWORD,
database="market_bi",
charset="utf8mb4",
)
try:
sql = "SELECT bi_product, pro_weight, channel_type, low_price FROM bi_price_xx"
df_p = pd.read_sql(sql, conn)
finally:
conn.close()
def _clean(s):
return "" if pd.isna(s) else str(s).strip().upper()
df_p["match_key"] = (
df_p["bi_product"].apply(_clean) + "|"
+ df_p["pro_weight"].apply(_clean) + "|"
+ df_p["channel_type"].apply(_clean)
)
df_p["low_price"] = pd.to_numeric(df_p["low_price"], errors="coerce")
return df_p.set_index("match_key")["low_price"].to_dict()
def transform(df_y: pd.DataFrame) -> pd.DataFrame:
"""
供 API 调用的低价计算入口。
接收大宽表 DataFrame(STANDARD_COLUMNS 列名),从数据库 market_bi.bi_price_xx
读取价盘基准,计算并回填以下三列后返回:
- 是否低价:低价 / 正常 / None(无法匹配或缺价格)
- 破价价差:低价时的价差(decimal),正常/无法匹配时为 None
- 低价整改状态:低价时置为 '未整改',其余不改动
Args:
df_y: 大宽表 DataFrame,必须包含列:
产品系列、产品克重、渠道类型(稽查源提供)、产品价格
Returns:
pd.DataFrame: 更新了低价相关字段的 DataFrame(不修改原对象)
"""
df = df_y.copy()
price_map = load_price_map_from_db()
def _clean(s):
return "" if pd.isna(s) else str(s).strip().upper()
# 构建匹配键和数值价格(辅助列,最终会删除)
df["_series_c"] = df["产品系列"].apply(_clean)
df["_weight_c"] = df["产品克重"].apply(_clean)
df["_channel_c"] = df["渠道类型(稽查源提供)"].apply(_clean)
df["_match_key"] = df["_series_c"] + "|" + df["_weight_c"] + "|" + df["_channel_c"]
df["_price_num"] = pd.to_numeric(df["产品价格"], errors="coerce")
df["_p_low_price"] = df["_match_key"].map(price_map)
# 重置低价相关列
df["是否低价"] = None
df["破价价差"] = None
# 条件向量化计算,避免逐行循环
has_both = df["_price_num"].notna() & df["_p_low_price"].notna()
cond_low = has_both & (df["_price_num"] < df["_p_low_price"])
cond_normal = has_both & ~cond_low
df.loc[cond_low, "是否低价"] = "低价"
df.loc[cond_low, "破价价差"] = (
df.loc[cond_low, "_p_low_price"] - df.loc[cond_low, "_price_num"]
).round(2)
df["低价整改状态"] = df["低价整改状态"].astype(object)
df.loc[cond_low, "低价整改状态"] = "未整改"
df.loc[cond_normal, "是否低价"] = "正常"
df.loc[cond_normal, "破价价差"] = None
# 清除辅助列
df.drop(
columns=["_series_c", "_weight_c", "_channel_c", "_match_key", "_price_num", "_p_low_price"],
inplace=True,
)
return df
if __name__ == "__main__":
# ── 独立测试模式:读本地 Excel 大宽表 → 计算低价 → 输出结果文件 ──
from datetime import datetime
from dateutil.relativedelta import relativedelta
current_date = (datetime.now().replace(day=1) - relativedelta(months=1)).strftime("%Y-%m-01")
y_file = f"/王小卤/风控/代码-新/大日期{current_date}_2.xlsx"
output_file = f"/王小卤/风控/代码-新/低价大日期_2.xlsx"
print("正在读取稽查结果大宽表...")
df_y = pd.read_excel(y_file, sheet_name="合并后", dtype=str)
df_y.columns = df_y.columns.str.strip()
print("正在从数据库读取价盘并计算低价...")
df_result = transform(df_y)
df_result.to_excel(output_file, index=False)
print(f"✅ 处理完成!结果已保存至:{output_file}")
差异被折叠。
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta
# 文件路径
# TODO: 配置稽查月份(默认1号)
current_date = (datetime.now().replace(day=1) - relativedelta(months=1)).strftime("%Y-%m-01")
y_file = f"/王小卤/风控/代码-新/大日期{current_date}_2.xlsx"
p_file = f"/王小卤/风控/代码-新//线下价盘表2601版.xlsx"
# 保存回原文件(建议先保存为新文件以防覆盖)
output_file = f"/王小卤/风控/代码-新//低价大日期_2.xlsx"
# 读取Y表(稽查结果表)
df_y = pd.read_excel(y_file,sheet_name='合并后', dtype=str) # 先以字符串读入避免格式问题,后续转数字
# 读取P表(价盘表)
df_p = pd.read_excel(p_file, dtype=str)
# 清理列名(去除前后空格等)
df_y.columns = df_y.columns.str.strip()
df_p.columns = df_p.columns.str.strip()
# 将关键字段转换为统一格式(去除空格、统一大小写等,便于匹配)
def clean_str(s):
if pd.isna(s):
return ""
return str(s).strip().upper()
# 对Y表的关键列清洗
df_y['产品系列_clean'] = df_y.iloc[:, 14].apply(clean_str) # O列:产品系列
df_y['产品克重_clean'] = df_y.iloc[:, 16].apply(clean_str) # Q列:产品克重
df_y['渠道类型_clean'] = df_y.iloc[:, 13].apply(clean_str) # N列:渠道类型(稽查源提供)
# 对P表的关键列清洗
df_p['产品系统_clean'] = df_p.iloc[:, 0].apply(clean_str) # A列:产品系统
df_p['产品克重_p_clean'] = df_p.iloc[:, 2].apply(clean_str) # C列:产品克重
df_p['渠道_p_clean'] = df_p.iloc[:, 3].apply(clean_str) # D列:渠道
# 将价格列转为数值类型(注意处理非数字情况)
df_y['产品价格_num'] = pd.to_numeric(df_y.iloc[:, 17], errors='coerce') # R列:产品价格
df_p['低价_num'] = pd.to_numeric(df_p.iloc[:, 4], errors='coerce') # E列:低价
# 构建P表的唯一键(产品系统 + 产品克重 + 渠道)
df_p['match_key'] = df_p['产品系统_clean'] + '|' + df_p['产品克重_p_clean'] + '|' + df_p['渠道_p_clean']
# 构建Y表的匹配键(产品系列 + 产品克重 + 渠道类型)
df_y['match_key'] = df_y['产品系列_clean'] + '|' + df_y['产品克重_clean'] + '|' + df_y['渠道类型_clean']
# 将P表转为字典:key -> 低价
price_map = df_p.set_index('match_key')['低价_num'].to_dict()
# 初始化Y表的目标列(S: 是否低价, T: 破价价差)
df_y['是否低价'] = '正常' # 默认值
df_y['破价价差'] = None
# 遍历Y表每一行进行匹配和判断
for idx, row in df_y.iterrows():
key = row['match_key']
y_price = row['产品价格_num']
p_low_price = price_map.get(key, None)
if pd.notna(y_price) and pd.notna(p_low_price):
if y_price < p_low_price:
df_y.at[idx, '是否低价'] = '低价'
df_y.at[idx, '破价价差'] = round(p_low_price - y_price, 2)
df_y.at[idx, '低价整改状态'] = '未整改'
else:
df_y.at[idx, '是否低价'] = '正常'
df_y.at[idx, '破价价差'] = None
else:
# 无法匹配或价格缺失,保留默认或标记
df_y.at[idx, '是否低价'] = None
df_y.at[idx, '破价价差'] = None
# 只保留原始列(不保留清洗用的辅助列)
original_columns = df_y.columns.tolist()
output_columns = [col for col in original_columns if not col.endswith('_clean') and col not in ['产品价格_num', 'match_key']]
df_y[output_columns].to_excel(output_file, index=False)
print(f"处理完成!结果已保存至:{output_file}")
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论