Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
P
promotion-service
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
promotion
promotion-service
Commits
73825b3b
提交
73825b3b
authored
11月 13, 2025
作者:
吕本才
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(activity): 实现随机任务开始发送 WebSocket通知功能
上级
1a630300
显示空白字符变更
内嵌
并排
正在显示
5 个修改的文件
包含
79 行增加
和
110 行删除
+79
-110
TemporaryActivityTaskClockMessageType.java
...enums/activity/TemporaryActivityTaskClockMessageType.java
+11
-16
MessageBean.java
.../java/com/wangxiaolu/promotion/websocket/MessageBean.java
+0
-27
TemporaryActivityTaskClockSocketHandler.java
...on/websocket/TemporaryActivityTaskClockSocketHandler.java
+19
-4
MessageBean.java
.../com/wangxiaolu/promotion/websocket/pojo/MessageBean.java
+21
-0
ActivityStautsHandler.java
...angxiaolu/promotion/xxljobtask/ActivityStautsHandler.java
+28
-63
没有找到文件。
src/main/java/com/wangxiaolu/promotion/
websocket/TemporaryActivityTaskClock
Type.java
→
src/main/java/com/wangxiaolu/promotion/
enums/activity/TemporaryActivityTaskClockMessage
Type.java
浏览文件 @
73825b3b
package
com
.
wangxiaolu
.
promotion
.
websocket
;
package
com
.
wangxiaolu
.
promotion
.
enums
.
activity
;
public
enum
TemporaryActivityTaskClockType
{
RANDOM_TASK_TART
(
"randomTaskStart"
,
"随机任务开始"
),
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
import
lombok.Getter
;
@Getter
@AllArgsConstructor
public
enum
TemporaryActivityTaskClockMessageType
{
RANDOM_TASK_START
(
"randomTaskStart"
,
"随机任务开始通知"
),
RANDOM_TASK_CLOCK
(
"randomTaskClock"
,
"随机任务打卡"
),
RANDOM_TASK_CLOCK
(
"randomTaskClock"
,
"随机任务打卡"
),
POS_CLOCK
(
"posClock"
,
"Pos机任务打卡"
),
POS_CLOCK
(
"posClock"
,
"Pos机任务打卡"
),
TASK_CLOCK_QUERY
(
"taskClockQuery"
,
"任务信息查询"
),
TASK_CLOCK_QUERY
(
"taskClockQuery"
,
"任务信息查询"
),
...
@@ -13,19 +21,6 @@ public enum TemporaryActivityTaskClockType {
...
@@ -13,19 +21,6 @@ public enum TemporaryActivityTaskClockType {
private
String
name
;
private
String
name
;
TemporaryActivityTaskClockType
(
String
type
,
String
name
)
{
this
.
type
=
type
;
this
.
name
=
name
;
}
public
String
getName
(
String
type
)
{
for
(
TemporaryActivityTaskClockType
value
:
values
())
{
if
(
value
.
type
.
equals
(
type
))
{
return
value
.
name
;
}
}
return
""
;
}
}
}
src/main/java/com/wangxiaolu/promotion/websocket/MessageBean.java
deleted
100644 → 0
浏览文件 @
1a630300
package
com
.
wangxiaolu
.
promotion
.
websocket
;
import
com.fasterxml.jackson.annotation.JsonProperty
;
public
class
MessageBean
{
@JsonProperty
(
"msgType"
)
private
String
type
;
@JsonProperty
(
"data"
)
private
String
content
;
// Getter and Setter
public
String
getType
()
{
return
type
;
}
public
void
setType
(
String
type
)
{
this
.
type
=
type
;
}
public
String
getContent
()
{
return
content
;
}
public
void
setContent
(
String
content
)
{
this
.
content
=
content
;
}
}
src/main/java/com/wangxiaolu/promotion/websocket/TemporaryActivityTaskClockSocketHandler.java
浏览文件 @
73825b3b
package
com
.
wangxiaolu
.
promotion
.
websocket
;
package
com
.
wangxiaolu
.
promotion
.
websocket
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.wangxiaolu.promotion.websocket.pojo.MessageBean
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
import
org.springframework.web.socket.CloseStatus
;
import
org.springframework.web.socket.CloseStatus
;
import
org.springframework.web.socket.TextMessage
;
import
org.springframework.web.socket.TextMessage
;
import
org.springframework.web.socket.WebSocketMessage
;
import
org.springframework.web.socket.WebSocketSession
;
import
org.springframework.web.socket.WebSocketSession
;
import
org.springframework.web.socket.handler.TextWebSocketHandler
;
import
org.springframework.web.socket.handler.TextWebSocketHandler
;
import
java.io.IOException
;
import
java.io.IOException
;
...
@@ -67,10 +69,10 @@ public class TemporaryActivityTaskClockSocketHandler extends TextWebSocketHandle
...
@@ -67,10 +69,10 @@ public class TemporaryActivityTaskClockSocketHandler extends TextWebSocketHandle
MessageBean
messageBean
=
objectMapper
.
readValue
(
payload
,
MessageBean
.
class
);
MessageBean
messageBean
=
objectMapper
.
readValue
(
payload
,
MessageBean
.
class
);
handleMessageType
(
messageBean
);
handleMessageType
(
messageBean
);
// 业务消息处理(示例:广播消息)
// 业务消息处理(示例:广播消息)
broadcast
(
"用户["
+
userId
+
"]:"
+
payload
);
//
broadcast("用户[" + userId + "]:" + payload);
}
}
private
void
handleMessageType
(
MessageBean
messageBean
)
{
private
void
handleMessageType
(
MessageBean
messageBean
)
{
switch
(
messageBean
.
getType
())
{
switch
(
messageBean
.
get
Msg
Type
())
{
case
"ACTIVITY_START"
:
case
"ACTIVITY_START"
:
// startActivity(messageBean.getContent());
// startActivity(messageBean.getContent());
break
;
break
;
...
@@ -78,7 +80,7 @@ public class TemporaryActivityTaskClockSocketHandler extends TextWebSocketHandle
...
@@ -78,7 +80,7 @@ public class TemporaryActivityTaskClockSocketHandler extends TextWebSocketHandle
// endActivity(messageBean.getContent());
// endActivity(messageBean.getContent());
break
;
break
;
default
:
default
:
log
.
warn
(
"未知消息类型: {}"
,
messageBean
.
getType
());
log
.
warn
(
"未知消息类型: {}"
,
messageBean
.
get
Msg
Type
());
}
}
}
}
/**
/**
...
@@ -128,10 +130,23 @@ public class TemporaryActivityTaskClockSocketHandler extends TextWebSocketHandle
...
@@ -128,10 +130,23 @@ public class TemporaryActivityTaskClockSocketHandler extends TextWebSocketHandle
/**
/**
* 向指定用户发送消息
* 向指定用户发送消息
*/
*/
public
void
sendToUser
(
String
userId
,
String
message
)
throws
IOException
{
public
Integer
sendToUser
(
String
userId
,
String
message
)
throws
IOException
{
WebSocketSession
session
=
sessions
.
get
(
userId
);
WebSocketSession
session
=
sessions
.
get
(
userId
);
if
(
session
!=
null
&&
session
.
isOpen
())
{
if
(
session
!=
null
&&
session
.
isOpen
())
{
// String jsonMessage = objectMapper.writeValueAsString(message);
session
.
sendMessage
(
new
TextMessage
(
message
));
session
.
sendMessage
(
new
TextMessage
(
message
));
return
1
;
}
}
return
0
;
}
public
Integer
sendToUser
(
String
userId
,
MessageBean
messageBean
)
throws
IOException
{
WebSocketSession
session
=
sessions
.
get
(
userId
);
if
(
session
!=
null
&&
session
.
isOpen
())
{
String
jsonMessage
=
objectMapper
.
writeValueAsString
(
messageBean
);
session
.
sendMessage
(
new
TextMessage
(
jsonMessage
));
return
1
;
}
return
0
;
}
}
}
}
src/main/java/com/wangxiaolu/promotion/websocket/pojo/MessageBean.java
0 → 100644
浏览文件 @
73825b3b
package
com
.
wangxiaolu
.
promotion
.
websocket
.
pojo
;
import
cn.hutool.json.JSONObject
;
import
com.fasterxml.jackson.annotation.JsonProperty
;
import
lombok.Data
;
/**
* 消息bean
* @author : lvbencai
* @date : 2025/11/13
*/
@Data
public
class
MessageBean
{
@JsonProperty
(
"msgType"
)
private
String
msgType
;
@JsonProperty
(
"data"
)
private
Object
data
;
}
src/main/java/com/wangxiaolu/promotion/xxljobtask/ActivityStautsHandler.java
浏览文件 @
73825b3b
...
@@ -6,18 +6,23 @@ import cn.hutool.core.date.DateUtil;
...
@@ -6,18 +6,23 @@ import cn.hutool.core.date.DateUtil;
import
com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
;
import
com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
;
import
com.wangxiaolu.promotion.domain.activity.mapper.entity.TemporaryActivityTaskClockDO
;
import
com.wangxiaolu.promotion.domain.activity.mapper.entity.TemporaryActivityTaskClockDO
;
import
com.wangxiaolu.promotion.enums.activity.ActivityClockTaskStatus
;
import
com.wangxiaolu.promotion.enums.activity.ActivityClockTaskStatus
;
import
com.wangxiaolu.promotion.enums.activity.ActivityPhotoType
;
import
com.wangxiaolu.promotion.enums.activity.TemporaryActivityTaskClockMessageType
;
import
com.wangxiaolu.promotion.pojo.user.dto.WxTemporaryInfoDto
;
import
com.wangxiaolu.promotion.pojo.user.dto.WxTemporaryInfoDto
;
import
com.wangxiaolu.promotion.service.activity.manage.ActivityTypeQueryService
;
import
com.wangxiaolu.promotion.service.activity.manage.ActivityTypeQueryService
;
import
com.wangxiaolu.promotion.service.activity.manage.EmployeeCoreTemporaryInfoService
;
import
com.wangxiaolu.promotion.service.activity.manage.EmployeeCoreTemporaryInfoService
;
import
com.wangxiaolu.promotion.service.activity.temporary.TemporaryActivityTaskClockService
;
import
com.wangxiaolu.promotion.service.activity.temporary.TemporaryActivityTaskClockService
;
import
com.wangxiaolu.promotion.service.activity.temporary.impl.SubscribeMessageService
;
import
com.wangxiaolu.promotion.service.activity.temporary.impl.SubscribeMessageService
;
import
com.wangxiaolu.promotion.utils.DateUtils
;
import
com.wangxiaolu.promotion.utils.DateUtils
;
import
com.wangxiaolu.promotion.websocket.TemporaryActivityTaskClockSocketHandler
;
import
com.wangxiaolu.promotion.websocket.pojo.MessageBean
;
import
com.xxl.job.core.handler.annotation.XxlJob
;
import
com.xxl.job.core.handler.annotation.XxlJob
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.poi.ss.formula.functions.T
;
import
org.apache.poi.ss.formula.functions.T
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
import
java.io.IOException
;
import
java.text.SimpleDateFormat
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.HashMap
;
...
@@ -39,7 +44,8 @@ public class ActivityStautsHandler {
...
@@ -39,7 +44,8 @@ public class ActivityStautsHandler {
private
TemporaryActivityTaskClockService
taskClockService
;
private
TemporaryActivityTaskClockService
taskClockService
;
@Autowired
@Autowired
private
SubscribeMessageService
messageService
;
private
SubscribeMessageService
messageService
;
@Autowired
private
TemporaryActivityTaskClockSocketHandler
taskClockSocketHandler
;
@Autowired
@Autowired
private
EmployeeCoreTemporaryInfoService
temporaryInfoService
;
private
EmployeeCoreTemporaryInfoService
temporaryInfoService
;
...
@@ -47,20 +53,33 @@ public class ActivityStautsHandler {
...
@@ -47,20 +53,33 @@ public class ActivityStautsHandler {
public
void
sendSubscribeMessage
()
{
public
void
sendSubscribeMessage
()
{
DateTime
dateTime
=
DateUtil
.
offsetMinute
(
new
Date
(),
10
);
DateTime
dateTime
=
DateUtil
.
offsetMinute
(
new
Date
(),
10
);
// 1. 查询订单信息和用户订阅记录
// 1. 查询订单信息和用户订阅记录
List
<
TemporaryActivityTaskClockDO
>
list
=
taskClockService
.
list
(
new
LambdaQueryWrapper
<
TemporaryActivityTaskClockDO
>()
List
<
TemporaryActivityTaskClockDO
>
list
=
taskClockService
.
list
(
new
LambdaQueryWrapper
<
TemporaryActivityTaskClockDO
>()
.
eq
(
TemporaryActivityTaskClockDO:
:
getTaskStatus
,
ActivityClockTaskStatus
.
TO_BE_START
)
.
eq
(
TemporaryActivityTaskClockDO:
:
getTaskStatus
,
ActivityClockTaskStatus
.
TO_BE_START
.
getType
()
)
// .eq(TemporaryActivityTaskClockDO::getSubscribeStatus, 1
)
.
eq
(
TemporaryActivityTaskClockDO:
:
getTaskType
,
ActivityPhotoType
.
RANDOM_TASK
.
getType
()
)
// .
eq(TemporaryActivityTaskClockDO::getIsSendSubscribe, 0
)
// .
le(TemporaryActivityTaskClockDO::getRequiredlockTime, dateTime
)
.
le
(
TemporaryActivityTaskClockDO:
:
getRequiredlockTime
,
dateTime
)
// .ge(TemporaryActivityTaskClockDO::getRequiredlockTime, new Date()
)
.
ge
(
TemporaryActivityTaskClockDO:
:
getRequiredlockTime
,
new
Date
())
);
);
for
(
TemporaryActivityTaskClockDO
taskClockDO
:
list
)
{
for
(
TemporaryActivityTaskClockDO
taskClockDO
:
list
)
{
if
(
taskClockDO
.
getSubscribeStatus
()
!=
1
)
{
if
(
taskClockDO
.
getSubscribeStatus
()
!=
1
)
{
// 发送websocket 通知用户
try
{
// taskClockSocketHandler.handleMessage(taskClockDO.getTemporaryId().toString(), "订阅通知发送成功");
// taskClockSocketHandler.sendToUser(taskClockDO.getTemporaryId().toString(), "订阅通知发送成功");
MessageBean
msgBean
=
new
MessageBean
();
msgBean
.
setMsgType
(
TemporaryActivityTaskClockMessageType
.
RANDOM_TASK_START
.
getType
());
msgBean
.
setData
(
taskClockDO
);
taskClockSocketHandler
.
sendToUser
(
taskClockDO
.
getTemporaryId
().
toString
(),
msgBean
);
log
.
info
(
"用户{}未订阅活动打卡通知"
,
taskClockDO
.
getTemporaryName
());
log
.
info
(
"用户{}未订阅活动打卡通知"
,
taskClockDO
.
getTemporaryName
());
taskClockDO
.
setIsSendSubscribe
(
1
);
// taskClockDO.setIsSendSubscribe(1);
taskClockService
.
updateById
(
taskClockDO
);
// taskClockService.updateById(taskClockDO);
}
catch
(
IOException
e
)
{
// 添加异常处理
log
.
error
(
"发送Websocket订阅通知失败"
,
e
);
throw
new
RuntimeException
(
e
);
}
continue
;
continue
;
}
}
...
@@ -100,60 +119,6 @@ public class ActivityStautsHandler {
...
@@ -100,60 +119,6 @@ public class ActivityStautsHandler {
}
}
@XxlJob
(
"sendWebsocketMessage"
)
public
void
sendWebsocketMessage
()
{
DateTime
dateTime
=
DateUtil
.
offsetMinute
(
new
Date
(),
10
);
/**
* 10分钟后结束打卡的,(已经进行中5分钟了)
* 发送websocket消息
*/
// 1. 查询订单信息和用户订阅记录
List
<
TemporaryActivityTaskClockDO
>
list
=
taskClockService
.
list
(
new
LambdaQueryWrapper
<
TemporaryActivityTaskClockDO
>()
.
eq
(
TemporaryActivityTaskClockDO:
:
getTaskStatus
,
ActivityClockTaskStatus
.
STARTING
)
.
eq
(
TemporaryActivityTaskClockDO:
:
getSubscribeStatus
,
1
)
.
eq
(
TemporaryActivityTaskClockDO:
:
getIsSendSubscribe
,
0
)
.
le
(
TemporaryActivityTaskClockDO:
:
getRequiredlockTime
,
dateTime
)
.
ge
(
TemporaryActivityTaskClockDO:
:
getRequiredlockTime
,
new
Date
()));
for
(
TemporaryActivityTaskClockDO
taskClockDO
:
list
)
{
// 2. 构建模板数据(根据小程序订阅模板的字段定义)
Map
<
String
,
String
>
data
=
new
HashMap
<>();
// 模板中的字段1
data
.
put
(
"thing1"
,
"随机任务已开启"
);
// 模板中的字段2
data
.
put
(
"time2"
,
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm"
).
format
(
new
Date
()));
// 模板中的字段3
data
.
put
(
"requiredlockTime"
,
DateUtil
.
format
(
taskClockDO
.
getRequiredlockTime
(),
"yyyy-MM-dd HH:mm"
));
// 3. 发送通知
// 获取openid
WxTemporaryInfoDto
wxTemporaryInfoDto
=
temporaryInfoService
.
selectById
(
taskClockDO
.
getTemporaryId
());
String
openid
=
wxTemporaryInfoDto
.
getOpenId
();
// 3. 发送通知
String
templateId
=
SUBSCRIBE_MESSAGE_TEMPLATE_ID
;
boolean
success
=
messageService
.
sendSubscribeMessage
(
openid
,
templateId
,
// 跳转页面
"/pages/order/detail?"
,
data
);
if
(
success
)
{
log
.
info
(
"促销员{}订阅通知发送成功"
,
taskClockDO
.
getTemporaryName
());
taskClockDO
.
setSubscribeTime
(
new
Date
());
}
// 修改状态 待开始 -> 进行中
taskClockDO
.
setTaskStatus
(
ActivityClockTaskStatus
.
STARTING
.
getType
());
taskClockService
.
updateById
(
taskClockDO
);
}
}
}
}
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论