提交 8c70b4da authored 作者: 吕本才's avatar 吕本才

1、定时任务分页从旺店通查询

上级 c8907c66
......@@ -2,11 +2,9 @@ package com.sfa.job.domain.order.dao;
import com.sfa.job.domain.order.entity.CollectOrderLogInfo;
import java.util.Date;
public interface CollectOrderLogInfoDao {
// 插入一条记录
void insert(CollectOrderLogInfo collectOrderLogInfo);
Date selectOrderSyncLatest(Integer syncType);
CollectOrderLogInfo selectOrderSyncLatest(Integer syncType);
}
......@@ -9,7 +9,6 @@ import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.Date;
import java.util.List;
@DS("Bi")
......@@ -25,7 +24,7 @@ public class CollectOrderLogInfoDaoImpl implements CollectOrderLogInfoDao {
}
@Override
public Date selectOrderSyncLatest(Integer syncType) {
public CollectOrderLogInfo selectOrderSyncLatest(Integer syncType) {
// 查询最新发货日期
List<CollectOrderLogInfo> orderList = logInfoMapper.selectList(
new LambdaQueryWrapper<CollectOrderLogInfo>()
......@@ -36,7 +35,7 @@ public class CollectOrderLogInfoDaoImpl implements CollectOrderLogInfoDao {
.last("LIMIT 1"));
if(ObjectUtils.isNotEmpty(orderList)){
// 最新的数据日期
return orderList.get(0).getLatestTime();
return orderList.get(0);
}
return null;
}
......
......@@ -41,56 +41,50 @@ public class CollectOrderLogInfo implements Serializable {
/**
* 采集批次信息,可存储长度不超过 30 个字符,使用 utf8mb4 字符集和 utf8mb4_0900_ai_ci 校对规则
*/
//@TableField("batch_no")
private String batchNo;
/**
* 采集的最新时间,代表上一次采集的结束时间,存储为日期时间类型
*/
//@TableField("latest_time")
private Date latestTime;
// 2025年01月22日17:24:37增加,用于分批次查询
private Integer pageNo;
/**
* 是否删除的标志,'0' 表示未删除,'1' 表示已删除,使用 utf8mb3 字符集和 utf8mb3_general_ci 校对规则
*/
//@TableField("del_flag")
private String delFlag;
/**
* 创建者信息,存储为长度不超过 20 个字符的字符串
*/
//@TableField("create_by")
private String createBy;
/**
* 创建人的用户 ID,存储为长整型
*/
//@TableField("create_user_id")
private Long createUserId;
/**
* 创建时间,存储为日期时间类型,使用数据库的当前时间作为默认值
*/
//@TableField("create_time")
private Date createTime;
/**
* 更新者信息,存储为长度不超过 20 个字符的字符串
*/
//@TableField("update_by")
private String updateBy;
/**
* 修改人的用户 ID,存储为长整型
*/
//@TableField("update_user_id")
private Long updateUserId;
/**
* 更新时间,存储为日期时间类型,更新时自动更新为当前时间
*/
//@TableField("update_time")
private Date updateTime;
private Integer syncType;
......
package com.sfa.job.pojo.request;
import lombok.Data;
/**
* @author lvbencai
* @date 2025-01-22 21:15:39
* @Description: 旺店通查询订单请求参数,可以按照旺店通的来定义
*/
@Data
public class WangdiantongQueryVO {
private String start_time;
private String end_time;
}
package com.sfa.job.pojo.response;
import com.sfa.job.domain.order.entity.FinanceOrder;
import com.sfa.job.domain.order.entity.FinanceOrderDetail;
import lombok.Data;
import java.util.Date;
import java.util.List;
/**
* 财务同步旺店通返回数据Dto
* 财务同步旺店通接口-返回数据Dto
*/
@Data
public class FinanceSyncOrderDetailDto {
private Integer orderCount;
private Integer orderDetailCount;
private List<FinanceOrder> orders;
private List<FinanceOrderDetail> orderDetails;
// private List<FinanceOrder> orders;
// private List<FinanceOrderDetail> orderDetails;
private Date startTime;
private Date endTime;
private Integer syncType;
private String batchNo;
private Integer pageNo;
}
......@@ -11,6 +11,7 @@ import com.sfa.job.domain.order.dao.CollectOrderLogInfoDao;
import com.sfa.job.domain.order.dao.IProductDao;
import com.sfa.job.domain.order.entity.*;
import com.sfa.job.domain.order.mapper.FinanceOrderMapper;
import com.sfa.job.pojo.request.WangdiantongQueryVO;
import com.sfa.job.pojo.response.FinanceSyncOrderDetailDto;
import com.sfa.job.service.order.FinanceBaseProductService;
import com.sfa.job.service.order.FinanceBaseZbjTypeService;
......@@ -26,6 +27,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 旺店通订单Service业务层处理
......@@ -64,37 +66,55 @@ public class FinanceOrderSyncServiceImpl extends ServiceImpl<FinanceOrderMapper,
public FinanceSyncOrderDetailDto syncWangdiantongOrder(Date startTime, Date endTime, Integer syncType) {
FinanceSyncOrderDetailDto detailDto = new FinanceSyncOrderDetailDto();
String batchNo = syncType + DateUtils.dateTimeNow() + Thread.currentThread().getId();
AtomicInteger beginPageNo = new AtomicInteger(0);
// 继续下一页的标识 pageNo =-1 标识终止分页 pageNo >0 标识继续分页
Boolean nextPageFlag = true;
try {
// startTime 为空,默认是定时任务调用
if (ObjectUtils.isEmpty(startTime)) {
Date latestTime = null;
// 查询最新的采集时间
Date date = orderLogInfoDao.selectOrderSyncLatest(syncType);
if (ObjectUtils.isNotEmpty(date)) {
startTime = date;
CollectOrderLogInfo collectOrderLogInfo = orderLogInfoDao.selectOrderSyncLatest(syncType);
if (ObjectUtils.isNotEmpty(collectOrderLogInfo)) {
latestTime = collectOrderLogInfo.getLatestTime();
Integer pageNoExist = collectOrderLogInfo.getPageNo();
beginPageNo.set( pageNoExist > 0? collectOrderLogInfo.getPageNo() : 0);
nextPageFlag = pageNoExist > 0;
} else {
// 默认上个月的第一天 00:00:00
startTime = cn.hutool.core.date.DateUtil.beginOfDay(cn.hutool.core.date.DateUtil.beginOfMonth(cn.hutool.core.date.DateUtil.lastMonth()));
}
// 调用查询旺店通接口api 获取最新日期前的一个小时
startTime = DateUtils.addMinutes(startTime, -3);
} else {
// 调用查询旺店通接口api 获取最新日期前的一个小时
Date currentLatest = DateUtils.addHours(new Date(), -2);
if (currentLatest.compareTo(startTime) < 0) {
throw new ServiceException("开始时间不能大于当前时间");
if (nextPageFlag) {
startTime = DateUtils.addMinutes(latestTime, -60);
} else {
// 调用查询旺店通接口api 获取最新日期前的一个小时
startTime = DateUtils.addMinutes(latestTime, -3);
}
}
// 调用查询旺店通接口api 获取最新日期前的一个小时
Date currentLatest = DateUtils.addHours(new Date(), -2);
if (currentLatest.compareTo(startTime) < 0) {
throw new ServiceException("开始时间不能大于当前时间");
}
if (ObjectUtils.isEmpty(endTime)) {
endTime = DateUtils.addMinutes(startTime, 60);
}
log.info("查询订单参数:开始时间{},结束时间{}", DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime));
log.info("查询订单参数:开始时间{},结束时间{},当前页:{}", DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime),beginPageNo.get());
WangdiantongQueryVO wangdiantongQueryVO = new WangdiantongQueryVO();
wangdiantongQueryVO.setStart_time(com.alibaba.fastjson2.util.DateUtils.format(startTime));
wangdiantongQueryVO.setEnd_time(com.alibaba.fastjson2.util.DateUtils.format(endTime));
JSONArray orderAllArray = wangdiantongUtil.queryWithDetail(startTime, endTime);
// 旺店通入参 和其他入参分开传入 旺店通入参bean转map
JSONArray orderAllArray = wangdiantongUtil.queryWithDetail(wangdiantongQueryVO, beginPageNo);
if (ObjectUtils.isEmpty(orderAllArray)) {
throw new ServiceException("旺店通没有查询到订单数据");
}
//转化成orderList
// List<FinanceOrder> orders = orderAllArray.toJavaList(FinanceOrder.class);
List<FinanceOrder> orders = JSONArray.parseArray(orderAllArray.toString(), FinanceOrder.class, JSONReader.Feature.SupportSmartMatch);
// 基础数据 直播间分类数据+成本、规格、口味
......@@ -162,15 +182,15 @@ public class FinanceOrderSyncServiceImpl extends ServiceImpl<FinanceOrderMapper,
orderDetail.setFlavorErp(ObjectUtils.isNotEmpty(baseProductMap.get(orderDetail.getGoodsNo())) ? baseProductMap.get(orderDetail.getGoodsNo()).getFlavor() : "");
orderDetail.setSpecNameErp(ObjectUtils.isNotEmpty(baseProductMap.get(orderDetail.getGoodsNo())) ? baseProductMap.get(orderDetail.getGoodsNo()).getSpec() : "");
orderDetail.setActualCost(ObjectUtils.isNotEmpty(baseProductMap.get(orderDetail.getGoodsNo())) ?
baseProductMap.get(orderDetail.getGoodsNo()).getActualCost():
new BigDecimal(0)
baseProductMap.get(orderDetail.getGoodsNo()).getActualCost() :
new BigDecimal(0)
// (ObjectUtils.isNotEmpty(order.getGoodsTypeCount()) && (order.getGoodsTypeCount().signum()!= 0) ? order.getGoodsCost().divide(order.getGoodsTypeCount()).setScale(4, BigDecimal.ROUND_HALF_UP) : new BigDecimal(0))
);
);
orderDetail.setStandardCost(ObjectUtils.isNotEmpty(baseProductMap.get(orderDetail.getGoodsNo())) ?
baseProductMap.get(orderDetail.getGoodsNo()).getStandardCost() :
new BigDecimal(0)
baseProductMap.get(orderDetail.getGoodsNo()).getStandardCost() :
new BigDecimal(0)
// (ObjectUtils.isNotEmpty(order.getGoodsTypeCount()) && (order.getGoodsTypeCount().signum()!= 0) ? order.getGoodsCost().divide(order.getGoodsTypeCount()).setScale(4, BigDecimal.ROUND_HALF_UP) : new BigDecimal(0))
);
);
}
mergeList.addAll(orderDetails);
}
......@@ -178,13 +198,7 @@ public class FinanceOrderSyncServiceImpl extends ServiceImpl<FinanceOrderMapper,
// 批量插入
detailService.saveOrUpdateBatch(mergeList);
detailDto.setOrders(orders);
detailDto.setOrderCount(orders.size());
detailDto.setOrderDetailCount(mergeList.size());
detailDto.setOrderDetails(mergeList);
detailDto.setStartTime(startTime);
detailDto.setEndTime(endTime);
detailDto.setBatchNo(batchNo);
log.info("插入订单和订单详情完成,批次{}开始时间{},结束时间{},订单数量:{},详情数量:{}", batchNo, DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime), orders.size(), mergeList.size());
......@@ -194,7 +208,16 @@ public class FinanceOrderSyncServiceImpl extends ServiceImpl<FinanceOrderMapper,
collectOrderLogInfo.setOrderDetailCount(mergeList.size());
collectOrderLogInfo.setBatchNo(batchNo);
collectOrderLogInfo.setLatestTime(endTime);
collectOrderLogInfo.setPageNo(beginPageNo.get());
orderLogInfoDao.insert(collectOrderLogInfo);
// 接口测试使用
detailDto.setOrderCount(orders.size());
detailDto.setOrderDetailCount(mergeList.size());
detailDto.setStartTime(finalStartTime);
detailDto.setPageNo(beginPageNo.get());
detailDto.setEndTime(finalEndTime);
detailDto.setBatchNo(batchNo);
} catch (Exception e) {
log.error(e.getMessage(), e);
// 记录错误日志表 collect_error_info
......
package com.sfa.job.util;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.util.DateUtils;
import com.lark.oapi.okhttp.*;
import com.sfa.common.core.exception.ServiceException;
import com.sfa.job.pojo.request.WangdiantongQueryVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
......@@ -22,13 +24,17 @@ public class WangdiantongUtil {
/**
* 卖家账号/接口账号/盐/接口名称
*/
public JSONArray queryWithDetail(Date startTime, Date endTime) {
// List<FinanceOrder> orderAllList = new ArrayList<>();
public JSONArray queryWithDetail(WangdiantongQueryVO wangdiantongQuery, AtomicInteger beginPageNo) {
JSONArray allArray = new JSONArray();
try {
int total = 0;
// 最大处理量,超过了,不再查询
int maxDealCount = 500;
int currentDealTotal = 0;
int size = 0;
int page_no = 0;
int pageNo = beginPageNo.get();
int maxPage = 0;
do {
String url = "http://wdt.wangdian.cn/openapi";
String sid = "wxl3";
......@@ -39,15 +45,16 @@ public class WangdiantongUtil {
String v = "1.0";
HashMap<String, Object> params = new HashMap<>();
params.put("start_time", DateUtils.format(startTime));
params.put("end_time", DateUtils.format(endTime));
// params.put("status", "110");
// params.put("time_type", 2);
// params.put("start_time", wangdiantongQuery.getStart_time());
// params.put("end_time", wangdiantongQuery.getEnd_time());
// bean 转map
Map<String, Object> stringObjectMap = BeanUtil.beanToMap(wangdiantongQuery);
params.putAll(stringObjectMap);
List<HashMap<String, Object>> paramsList = Arrays.asList(params);
String signParams = appsecret + "body" + JSONObject.toJSONString(paramsList) + "calc_total1" + "key" + key
+ "method" + method + "page_no"+ page_no + "page_size200" + "salt" + salt + "sid" + sid +
+ "method" + method + "page_no"+ pageNo + "page_size200" + "salt" + salt + "sid" + sid +
"timestamp" + (System.currentTimeMillis() / 1000 - 1325347200) + "v" + v + appsecret;
String sign = DigestUtils.md5DigestAsHex(signParams.toString().getBytes());
log.debug(signParams);
......@@ -62,7 +69,7 @@ public class WangdiantongUtil {
.append("&timestamp=").append(System.currentTimeMillis() / 1000 - 1325347200)
.append("&sign=").append(sign)
.append("&page_size=").append(200)
.append("&page_no=").append(page_no)
.append("&page_no=").append(pageNo)
.append("&calc_total=").append(1);
log.info("查询订单数据url:" + url + "?" + urlParams);
......@@ -74,23 +81,28 @@ public class WangdiantongUtil {
JSONObject responseJson = JSONObject.parseObject(execute.body().string());
JSONObject dataR = responseJson.getJSONObject("data");
JSONObject messageJson = responseJson.getJSONObject("message");
if(ObjectUtils.isNotEmpty(messageJson)){
log.error("访问旺店通接口错误"+messageJson.toString());
throw new ServiceException("访问旺店通接口错误"+messageJson.toString());
if (ObjectUtils.isNotEmpty(messageJson)) {
log.error("访问旺店通接口错误" + messageJson.toString());
throw new ServiceException("访问旺店通接口错误" + messageJson.toString());
}
Integer totalCount = dataR.getInteger("total_count");
JSONArray orderJsonArray = dataR.getJSONArray("order");
total = totalCount;
currentDealTotal = totalCount;
allArray.addAll(orderJsonArray);
size = allArray.size();
page_no ++;
log.info("当前时间段{}-{},查询返回的order条数:{},总条数totalCount:{},page_no:{}", DateUtils.format(startTime),DateUtils.format(endTime), orderJsonArray.size(),totalCount,page_no);
} while ( size < total);
pageNo++;
log.info("当前时间段{}-{},查询返回的order条数:{},总条数totalCount:{},page_no:{}", wangdiantongQuery.getStart_time(), wangdiantongQuery.getEnd_time(), orderJsonArray.size(), totalCount, pageNo);
} while (size < maxDealCount && size < currentDealTotal);
maxPage = (currentDealTotal / 200);
if (pageNo >= maxPage) {
beginPageNo.set(-1);
} else {
beginPageNo.set(pageNo);
}
return allArray;
} catch (Exception e) {
// 记录异常日志
log.error("异常信息:{}"+e.getMessage(),e);
log.error("异常信息:{}" + e.getMessage(), e);
return allArray;
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论