提交 a8339062 authored 作者: 吕本才's avatar 吕本才

增加Sharding分表,并且分表配置从nacos读取

上级 16ac5365
......@@ -93,6 +93,13 @@
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<!-- 排除低版本 snakeyaml -->
<exclusions>
<exclusion>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
......@@ -140,6 +147,24 @@
<groupId>com.taobao</groupId>
<artifactId>taobao-sdk-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-sharding-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<!-- 显式指定高版本 snakeyaml(确保与 ShardingSphere 兼容) -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
</dependencies>
<build>
......
package com.sfa.job.config;
import org.apache.shardingsphere.sharding.api.sharding.standard.PreciseShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.RangeShardingValue;
import org.apache.shardingsphere.sharding.api.sharding.standard.StandardShardingAlgorithm;
import java.time.LocalDateTime;
import java.util.*;
/**
* ShardingSphere 5.4.1 按 pay_time 年份分表算法
*/
public class PayTimeYearShardingAlgorithm implements StandardShardingAlgorithm<LocalDateTime> {
/**
* 精确分片(处理 = 条件)
* 例:pay_time = '2023-05-01 10:00:00' → 路由到 t_order_2023
*/
/**
* 范围分片(处理 >、<、BETWEEN 等条件)
* 例:pay_time BETWEEN '2023-01-01' AND '2024-12-31' → 路由到 t_order_2023、t_order_2024
*/
// 初始化方法(可留空)
@Override
public void init(Properties props) {
StandardShardingAlgorithm.super.init(props);
}
// 获取分片算法类型(自定义名称)
@Override
public String getType() {
return "PAY_TIME_YEAR";
}
@Override
public Collection<Object> getTypeAliases() {
return StandardShardingAlgorithm.super.getTypeAliases();
}
@Override
public boolean isDefault() {
return StandardShardingAlgorithm.super.isDefault();
}
@Override
public String doSharding(Collection<String> collection, PreciseShardingValue<LocalDateTime> preciseShardingValue) {
// 获取精确查询的时间值
LocalDateTime payTime = preciseShardingValue.getValue();
int year = payTime.getYear();
// 生成目标表名(逻辑表名_年份)
String targetTable = preciseShardingValue.getLogicTableName() + "_" + year;
// 校验表是否存在
if (collection.contains(targetTable)) {
return targetTable;
}
throw new IllegalArgumentException("未找到匹配的表:" + targetTable);
}
@Override
public Collection<String> doSharding(Collection<String> collection, RangeShardingValue<LocalDateTime> shardingValue) {
Set<String> result = new HashSet<>();
// 遍历所有分片条件(通常只有一个)
// for (RangeShardingValue<LocalDateTime> shardingValue : 、) {
String logicTableName = shardingValue.getLogicTableName();
// 获取范围查询的上下限
LocalDateTime lower = shardingValue.getValueRange().lowerEndpoint();
LocalDateTime upper = shardingValue.getValueRange().upperEndpoint();
int startYear = lower.getYear();
int endYear = upper.getYear();
// 生成所有符合条件的表名
for (int year = startYear; year <= endYear; year++) {
String targetTable = logicTableName + "_" + year;
if (collection.contains(targetTable)) {
result.add(targetTable);
}
}
// }
return result;
}
@Override
public Optional<String> getAlgorithmStructure(String dataNodePrefix, String shardingColumn) {
return StandardShardingAlgorithm.super.getAlgorithmStructure(dataNodePrefix, shardingColumn);
}
}
package com.sfa.job.config;
import com.alibaba.cloud.nacos.NacosConfigAutoConfiguration;
import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.baomidou.dynamic.datasource.provider.AbstractDataSourceProvider;
import com.baomidou.dynamic.datasource.provider.DynamicDataSourceProvider;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DataSourceProperty;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceProperties;
import com.sfa.job.util.NacosConfigiServiceUtils;
import org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
import org.apache.shardingsphere.driver.jdbc.core.driver.ShardingSphereURLProvider;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;
@Configuration
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
@EnableConfigurationProperties({DataSourceProperties.class})
@ConditionalOnClass(NacosConfigAutoConfiguration.class)
public class ShardingConfiguration {
@Resource
private DynamicDataSourceProperties properties;
@Resource
private NacosConfigProperties nacosConfigProperties;
@Value("${spring.shardindsphereUrl}")
private String shardingsphereUrl;
@Bean
@Primary
public DynamicDataSourceProvider dynamicDataSourceProvider() {
NacosConfigiServiceUtils.init(nacosConfigProperties);
Map<String, DataSourceProperty> datasourceMap = properties.getDatasource();
return new AbstractDataSourceProvider() {
@Override
public Map<String, DataSource> loadDataSources() {
Map<String, DataSource> dataSourceMap = createDataSourceMap(datasourceMap);
//SPI机制
Collection<ShardingSphereURLProvider> provider = ShardingSphereServiceLoader.getServiceInstances(ShardingSphereURLProvider.class);
provider.forEach(item -> {
if (item.accept(shardingsphereUrl)) {
try {
DataSource dataSource = YamlShardingSphereDataSourceFactory.createDataSource(item.getContent(shardingsphereUrl,"jdbc:shardingsphere:"));
dataSourceMap.put("sharding", dataSource);
} catch (SQLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
});
return dataSourceMap;
}
};
}
}
package com.sfa.job.controller.order;
import com.sfa.job.pojo.response.SyncOrderDetailDto;
import com.sfa.job.service.order.IWdtQimenOrderPayTimeSyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* @author : liqiulin
* @date : 2025-07-08 13
* @describe : 订单 - 发货单物流查询
*/
@RestController
@RequestMapping("/order/qimen/")
public class WdtQimenOrderPayTimeController {
@Autowired
private IWdtQimenOrderPayTimeSyncService orderQimenSyncService;
@GetMapping("/syncByPayTime")
public Object sync(@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date startTime,
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date endTime,
@RequestParam(value = "pageSize", required = false, defaultValue = "200") Long pageSize
) {
SyncOrderDetailDto syncOrderDetailDto = orderQimenSyncService.syncWdtQimenOrder(startTime, endTime, 1, pageSize);
return syncOrderDetailDto;
}
@GetMapping("/payTime/syncByTradeNo")
public Object syncByTradeNo(@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date startTime,
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date endTime,
@RequestParam(value = "tradeNo", required = false) String tradeNo,
@RequestParam(value = "pageNo", required = false, defaultValue = "1") Long pageNo,
@RequestParam(value = "pageSize", required = false, defaultValue = "200") Long pageSize
) {
SyncOrderDetailDto syncOrderDetailDto = orderQimenSyncService.syncWdtQimenOrderByTradeNo(startTime, endTime, 1, tradeNo, pageNo, pageSize);
return syncOrderDetailDto;
}
@GetMapping("/payTime/test")
public Object syncByPayTime(@RequestParam(value = "year", required = false) Integer year) {
SyncOrderDetailDto syncOrderDetailDto = orderQimenSyncService.test(year);
return syncOrderDetailDto;
}
}
package com.sfa.job.domain.order.dao;
import com.sfa.job.domain.order.entity.WdtQimenOrderDetailPayTime;
import java.util.List;
public interface WdtQimenOrderDetailPayTimeDao {
void saveOrUpdateBatch(List<WdtQimenOrderDetailPayTime> mergeList);
}
package com.sfa.job.domain.order.dao;
import com.sfa.job.domain.order.entity.WdtQimenOrderPayTime;
import java.util.List;
public interface WdtQimenOrderPayTimeDao {
void saveOrUpdateBatch(List<WdtQimenOrderPayTime> mergeList);
}
package com.sfa.job.domain.order.dao.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.sfa.job.domain.order.dao.WdtQimenOrderDetailPayTimeDao;
import com.sfa.job.domain.order.entity.WdtQimenOrderDetailPayTime;
import com.sfa.job.domain.order.mapper.WdtQimenOrderDetailPayTimeMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.List;
@DS("sharding")
@Repository
public class WdtQimenOrderDetailPayTimeDaoImpl implements WdtQimenOrderDetailPayTimeDao {
private static final int BATCH_SIZE = 1000;
@Autowired
private WdtQimenOrderDetailPayTimeMapper wdtQimenOrderDetailMapper;
@Override
public void saveOrUpdateBatch(List<WdtQimenOrderDetailPayTime> mergeList) {
for (int i = 0; i < mergeList.size(); i += BATCH_SIZE) {
int toIndex = Math.min(i + BATCH_SIZE, mergeList.size());
List<WdtQimenOrderDetailPayTime> batchLists = mergeList.subList(i, toIndex);
wdtQimenOrderDetailMapper.saveOrUpdateBatch(batchLists);
}
}
}
package com.sfa.job.domain.order.dao.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.sfa.job.domain.order.dao.WdtQimenOrderPayTimeDao;
import com.sfa.job.domain.order.entity.WdtQimenOrderPayTime;
import com.sfa.job.domain.order.mapper.WdtQimenOrderPayTimeMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import java.util.List;
@DS("sharding")
@Repository
public class WdtQimenOrderPayTimeDaoImpl implements WdtQimenOrderPayTimeDao {
private static final int BATCH_SIZE = 1000;
@Autowired
private WdtQimenOrderPayTimeMapper wdtQimenOrderPayTimeMapper;
@DS("sharding")
@Override
public void saveOrUpdateBatch(List<WdtQimenOrderPayTime> mergeList) {
for (int i = 0; i < mergeList.size(); i += BATCH_SIZE) {
int toIndex = Math.min(i + BATCH_SIZE, mergeList.size());
List<WdtQimenOrderPayTime> batchLists = mergeList.subList(i, toIndex);
wdtQimenOrderPayTimeMapper.saveOrUpdateBatch(batchLists);
}
}
}
package com.sfa.job.domain.order.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.sfa.job.domain.order.entity.WdtQimenOrderDetailPayTime;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface WdtQimenOrderDetailPayTimeMapper extends BaseMapper<WdtQimenOrderDetailPayTime> {
void saveOrUpdateBatch(@Param(value = "list") List<WdtQimenOrderDetailPayTime> mergeList);
}
package com.sfa.job.domain.order.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.sfa.job.domain.order.entity.WdtQimenOrderPayTime;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper
public interface WdtQimenOrderPayTimeMapper extends BaseMapper<WdtQimenOrderPayTime> {
void saveOrUpdateBatch(List<WdtQimenOrderPayTime> batchLists);
}
package com.sfa.job.pojo.order.response;
import lombok.Data;
@Data
public class WdtQimenOrderPayTimeBodyDto {
private static final long serialVersionUID = 4632382792982436956L;
private WdtQimenOrderPayTimeResponseDto response;
private String message;
private Integer status;
}
package com.sfa.job.pojo.order.response;
import com.sfa.job.domain.order.entity.WdtQimenOrderPayTime;
import lombok.Data;
import java.util.List;
@Data
public class WdtQimenOrderPayTimeDto {
private static final long serialVersionUID = 4632382792982436956L;
private List<WdtQimenOrderPayTime> order;
private Integer totalCount;
}
package com.sfa.job.pojo.order.response;
import lombok.Data;
@Data
public class WdtQimenOrderPayTimeResponseDto {
private static final long serialVersionUID = 4632382792982436956L;
private WdtQimenOrderPayTimeDto data;
private String message;
private Integer status;
}
package com.sfa.job.service.order;
import com.sfa.job.pojo.response.SyncOrderDetailDto;
import java.util.Date;
public interface IWdtQimenOrderPayTimeSyncService {
SyncOrderDetailDto syncWdtQimenOrder(Date startTime, Date endTime, Integer syncType, Long pageSize);
SyncOrderDetailDto syncWdtQimenOrderByTradeNo(Date startTime, Date endTime, Integer syncType, String tradeNo, Long pageNo, Long pageSize);
SyncOrderDetailDto test(int year);
}
package com.sfa.job.spi;
import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.shaded.com.google.common.base.Preconditions;
import com.sfa.common.core.utils.StringUtils;
import com.sfa.job.util.NacosConfigiServiceUtils;
import lombok.SneakyThrows;
import org.apache.shardingsphere.driver.jdbc.core.driver.ShardingSphereURLProvider;
import java.nio.charset.StandardCharsets;
public final class ShardingJdbcNacosURLProvider implements ShardingSphereURLProvider {
private static final String CLASSPATH_TYPE = "nacos:";
private static final String URL_PREFIX = "jdbc:shardingsphere:";
@Override
public boolean accept(String url) {
return StringUtils.isNotEmpty(url) && url.contains(CLASSPATH_TYPE);
}
@SneakyThrows
@Override
public byte[] getContent(String url, String urlPrefix) {
String dataId = url.substring(CLASSPATH_TYPE.length() + URL_PREFIX.length());
Preconditions.checkArgument(!dataId.isEmpty(), "Nacos namespace is required in ShardingSphere dataId.");
NacosConfigProperties nacosConfigProperties = NacosConfigiServiceUtils.getNacosConfigProperties();
ConfigService configService = nacosConfigProperties.configServiceInstance();
String content = configService.getConfig(dataId, nacosConfigProperties.getGroup(), nacosConfigProperties.getTimeout());
return content.getBytes(StandardCharsets.UTF_8);
}
}
package com.sfa.job.util;
import com.alibaba.cloud.nacos.NacosConfigProperties;
public class NacosConfigiServiceUtils {
private static NacosConfigProperties nacosConfigProperties;
public static void init(NacosConfigProperties properties){
nacosConfigProperties = properties;
}
public static NacosConfigProperties getNacosConfigProperties() {
return nacosConfigProperties;
}
}
package com.sfa.job.config;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.sfa.job.domain.order.dao.WdtQimenOrderPayTimeDao;
import com.sfa.job.domain.order.entity.WdtQimenOrderPayTime;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.junit4.SpringRunner;
import java.math.BigDecimal;
import java.util.Calendar;
import java.util.Date;
import java.util.Random;
/**
* ShardingSphere分表功能测试类
* 用于验证YearBasedShardingAlgorithm分片算法是否正确工作
*/
@ExtendWith(SpringExtension.class)
@SpringBootTest
// 关键注解:指定profile为test
@ActiveProfiles("test")
@Slf4j
public class ShardingTableTest {
@Autowired
private WdtQimenOrderPayTimeDao wdtQimenOrderPayTimeDao;
/**
* 测试插入不同年份的数据是否能正确路由到对应的分表
*/
@Test
public void testShardingInsert() {
log.info("开始测试分表插入功能...");
// 创建并插入2023年的订单
insertOrderWithSpecificYear(2023);
// 创建并插入2024年的订单
insertOrderWithSpecificYear(2024);
// 创建并插入2025年的订单
insertOrderWithSpecificYear(2025);
System.out.println("分表插入测试完成,请检查各年份表数据!");
}
/**
* 创建并插入指定年份的订单数据
* @param year 年份
*/
private void insertOrderWithSpecificYear(int year) {
WdtQimenOrderPayTime order = new WdtQimenOrderPayTime();
// 设置基本字段
order.setTradeId(System.currentTimeMillis());
order.setTradeNo("TEST_" + year + "_" + new Random().nextInt(10000));
order.setShopId(1);
order.setShopNo("SHOP001");
order.setPlatformId(11);
order.setShopName("测试店铺");
order.setTradeStatus(10); // 未付款状态
order.setTradeType(1); // 网店销售
// order.setPayTime(LocalDateTimeUtil.now());
order.setCreated(String.valueOf(System.currentTimeMillis()));
order.setCreatedDate(new Date());
order.setDelFlag(0);
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
order.setDeliveryTerm(1);
order.setRefundStatus(1);
order.setFenxiaoType(11);
order.setBadReason(1);
order.setGoodsTypeCount(new BigDecimal(1));
order.setGoodsCount(new BigDecimal(1));
order.setGoodsAmount(new BigDecimal(1));
order.setPostAmount(new BigDecimal(1));
order.setOtherAmount(new BigDecimal(1));
order.setDiscount(new BigDecimal(1));
order.setDiscount(new BigDecimal(1));
order.setReceivable(new BigDecimal(1));
order.setCodAmount(new BigDecimal(1));
order.setExtCodFee(new BigDecimal(1));
order.setGoodsCost(new BigDecimal(1));
order.setPostCost(new BigDecimal(1));
order.setWeight(new BigDecimal(1));
order.setProfit(new BigDecimal(1));
order.setTax(new BigDecimal(1));
order.setTaxRate(new BigDecimal(1));
order.setCommission(new BigDecimal(1));
order.setInvoiceType(1);
order.setTradeFrom(1);
order.setRawGoodsCount(new BigDecimal(1));
order.setRawGoodsTypeCount(1);
order.setCurrency("CNY");
order.setCreated("1");
order.setCreatedDate(new Date());
order.setTradeMask(1);
order.setPackageCost(new BigDecimal(1));
order.setPaid(new BigDecimal(1));
order.setLargeType(1);
order.setGiftMask(1);
order.setOtherCost(new BigDecimal(1));
order.setIsSealed(true);
order.setCustomerType(1);
order.setDelFlag(1);
order.setUpdateTime(new Date());
order.setBatchNo("TEST_" + year);
// 设置不同年份的支付时间,这是分片的关键字段
Calendar calendar = Calendar.getInstance();
calendar.set(year, 0, 1, 0, 0, 0);
order.setPayTime(LocalDateTimeUtil.of(calendar.getTime()));
log.info("插入" + year + "年订单,payTime: " + order.getPayTime());
// 由于是测试环境,这里可能需要特殊处理批量保存
// 注意:实际环境中应该使用业务层方法进行保存
wdtQimenOrderPayTimeDao.saveOrUpdateBatch(java.util.Collections.singletonList(order));
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论