1. 项目背景与需求分析
在企业日常运营中,及时准确的消息推送是保障信息传递效率的关键。基于企业微信的发消息功能,我们需要构建一个灵活可靠的消息推送系统,支持三种发送规则:
立即发送:创建规则后立即发送消息定时发送:在指定时间点发送消息周期发送:按照cron表达式定期发送消息
目标受众可以是企业微信中的部门、标签或指定用户。系统需要处理的核心挑战包括:
规则变更时的状态一致性发场景下的任务执行消息发送的可靠性和可追溯性系统扩展性和维护性
2. 核心设计思想:事件溯源与任务解耦
2.1 什么是事件溯源?
事件溯源是一种架构模式,其核心思想是:不存储当前状态,而是存储导致状态变化的一系列事件。与传统CRUD系统中直接更新当前状态不同,ES通过持久化事件流来重建状态。
2.2 传统设计的挑战
在传统的消息推送系统中,通常会将规则与执行紧密耦合,这会导致:
状态管理复杂:规则变更时需要更新已生成的任务状态并发冲突:需要复杂的分布式锁机制来保证数据一致性审计困难:难以追溯完整的规则变更和执行历史
2.3 我们的解决方案
借鉴**事件溯源(Event Sourcing)**架构思想,通过引入任务层实现规则与执行的解耦:在我们的设计中,wecom_msg_send_task表实际上承担了”事件存储”的部分角色:
-- 每个任务代表一个"消息发送事件"
INSERT INTO wecom_msg_send_task (
rule_id, type, planned_execute_time, status, created_at
) VALUES (?, ?, ?, 1, NOW()); -- 状态1:待执行,相当于事件已存储但未处理
规则创建/更新 → 生成任务 → 执行任务 → 记录日志
核心设计原则:
任务只增不改:避免并发写冲突。这一块的设计是重点,借鉴事件溯源思想中的只增不改的想法,规则的变更,通过新增任务实现,避免了复杂的更新逻辑规则变更通过新任务体现:乐观并发控制:降低分布式锁的复杂度
3. 系统架构设计
3.1 核心组件架构
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 规则管理器 │ │ 规则调度器 │ │ 任务调度器 │
│ │ │ │ │ │
│ - 创建/更新规则 │───▶│ - 扫描周期规则 │───▶│ - 扫描待执行任务 │
│ - 生成立即任务 │ │ - 生成周期任务 │ │ - 并发执行任务 │
│ - 生成定时任务 │ │ - 排除时间检查 │ │ - 乐观锁控制 │
└─────────────────┘ └──────────────────┘ └──────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 规则表 │ │ 任务表 │ │ 发送日志表 │
└─────────────────┘ └──────────────────┘ └──────────────────┘
3.2 数据库表结构概览
-- 规则表:存储消息发送规则配置
CREATE TABLE "public"."wecom_msg_rule" (
"id" bigserial PRIMARY KEY,
"name" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"type" int2 NOT NULL DEFAULT nextval('wecom_msg_rule_id_seq'::regclass),
"to_party" varchar(1000) COLLATE "pg_catalog"."default" NOT NULL,
"to_tag" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"to_user" varchar(20000) COLLATE "pg_catalog"."default" NOT NULL,
"content_id" int8 NOT NULL,
"send_time" timestamp(6) NOT NULL,
"cron_expression" json NOT NULL,
"status" int2 NOT NULL,
"operator_id" int8 NOT NULL,
"created_at" timestamp(6) NOT NULL,
"updated_at" timestamp(6) NOT NULL,
"expected_receiver_count" int8 NOT NULL,
"operator_name" varchar(255) COLLATE "pg_catalog"."default" NOT NULL
)
;
COMMENT ON COLUMN "public"."wecom_msg_rule"."id" IS '主键id';
COMMENT ON COLUMN "public"."wecom_msg_rule"."name" IS '规则名';
COMMENT ON COLUMN "public"."wecom_msg_rule"."type" IS '发送类型1:立即发送 2:定时发送 3:周期发送';
COMMENT ON COLUMN "public"."wecom_msg_rule"."to_party" IS '目标部门ID列表,使用”|“分隔';
COMMENT ON COLUMN "public"."wecom_msg_rule"."to_tag" IS '目标标签ID列表,使用”|“分隔';
COMMENT ON COLUMN "public"."wecom_msg_rule"."to_user" IS '目标用户ID列表,使用”|“分隔';
COMMENT ON COLUMN "public"."wecom_msg_rule"."content_id" IS '消息内容ID';
COMMENT ON COLUMN "public"."wecom_msg_rule"."send_time" IS '定时发送时间';
COMMENT ON COLUMN "public"."wecom_msg_rule"."cron_expression" IS '周期发送规则';
COMMENT ON COLUMN "public"."wecom_msg_rule"."status" IS '状态 1:启用 2:禁用 3:已推送';
COMMENT ON COLUMN "public"."wecom_msg_rule"."operator_id" IS '操作者id';
COMMENT ON COLUMN "public"."wecom_msg_rule"."created_at" IS '创建时间';
COMMENT ON COLUMN "public"."wecom_msg_rule"."updated_at" IS '更新时间';
COMMENT ON COLUMN "public"."wecom_msg_rule"."expected_receiver_count" IS '预计推送人数';
COMMENT ON COLUMN "public"."wecom_msg_rule"."operator_name" IS '操作者姓名';
-- 任务表:规则与执行之间的缓冲层
CREATE TABLE "public"."wecom_msg_send_log" (
"id" bigserial PRIMARY KEY,
"rule_id" int8 NOT NULL,
"content_id" int8 NOT NULL,
"wecom_msg_id" varchar COLLATE "pg_catalog"."default" NOT NULL,
"send_time" timestamp(6) NOT NULL,
"send_type" int2 NOT NULL,
"total_users" int8 NOT NULL,
"read_count" int8 NOT NULL,
"status" int2 NOT NULL,
"created_at" timestamp(6) NOT NULL,
"updated_at" timestamp(6) NOT NULL,
"rule_name" varchar(255) COLLATE "pg_catalog"."default" NOT NULL DEFAULT ''::character varying,
"operator_name" varchar(255) COLLATE "pg_catalog"."default" NOT NULL DEFAULT ''::character varying,
"last_send_time" timestamp(6) NOT NULL,
"to_party" varchar(2000) COLLATE "pg_catalog"."default" NOT NULL,
"to_tag" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"to_user" varchar(20000) COLLATE "pg_catalog"."default" NOT NULL,
"cron_expression" json NOT NULL,
"content_type" varchar(20) COLLATE "pg_catalog"."default" NOT NULL,
"content" json NOT NULL
)
;
COMMENT ON COLUMN "public"."wecom_msg_send_log"."id" IS '主键id';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."rule_id" IS '推送消息的规则id';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."content_id" IS '推送消息内容id';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."wecom_msg_id" IS '企业微信返回的消息 ID(用于回调关联)';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."send_time" IS '实际发送时间';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."send_type" IS '发送类型1:立即发送 2:定时发送 3:周期发送';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."total_users" IS '总推送人数';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."read_count" IS '已读人数';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."status" IS '发送状态 1.已推送 2:推送撤回 3:已取消';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."created_at" IS '创建时间';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."updated_at" IS '更新时间';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."rule_name" IS '冗余存储规则名称';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."operator_name" IS '操作人';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."last_send_time" IS '消息最新发送时间,在未读再次发送时更新';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."to_party" IS '目标部门ID列表快照';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."to_tag" IS '目标标签ID列表快照';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."to_user" IS '目标用户ID列表快照';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."cron_expression" IS '周期发送规则快照';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."content_type" IS '内容类型快照';
COMMENT ON COLUMN "public"."wecom_msg_send_log"."content" IS '内容快照';
-- 发送日志表:记录执行结果
CREATE TABLE "public"."wecom_msg_send_task" (
"id" bigserial PRIMARY KEY,
"rule_id" int8 NOT NULL,
"type" int2 NOT NULL,
"planned_execute_time" timestamp(6) NOT NULL,
"actual_execute_time" timestamp(6) NOT NULL,
"status" int2 NOT NULL,
"send_log_id" int8 NOT NULL,
"error_message" text COLLATE "pg_catalog"."default" NOT NULL,
"created_at" timestamp(6) NOT NULL,
"updated_at" timestamp(6) NOT NULL
)
;
COMMENT ON COLUMN "public"."wecom_msg_send_task"."id" IS '主键id';
COMMENT ON COLUMN "public"."wecom_msg_send_task"."rule_id" IS '关联规则id';
COMMENT ON COLUMN "public"."wecom_msg_send_task"."type" IS '任务类型 1:立即任务 2:定时任务 3:周期任务';
COMMENT ON COLUMN "public"."wecom_msg_send_task"."planned_execute_time" IS '计划执行时间';
COMMENT ON COLUMN "public"."wecom_msg_send_task"."actual_execute_time" IS '实际执行时间';
COMMENT ON COLUMN "public"."wecom_msg_send_task"."status" IS '状态 1:待执行 2:执行中 3:成功 4:失败 5:取消(后台操作人员取消的) 6:已失效(规则发生变更,已经生成的task要失效)';
COMMENT ON COLUMN "public"."wecom_msg_send_task"."send_log_id" IS '关联的发送记录ID''';
COMMENT ON COLUMN "public"."wecom_msg_send_task"."error_message" IS '错误信息';
COMMENT ON COLUMN "public"."wecom_msg_send_task"."created_at" IS '创建时间';
COMMENT ON COLUMN "public"."wecom_msg_send_task"."updated_at" IS '更新时间';
4. 核心流程实现
4.1 规则调度器:周期任务生成
规则调度器专门处理周期规则的任务生成,确保每个周期规则在适当的时间生成执行任务。
// RuleScheduler 规则调度器 - 处理周期规则的任务生成
func (s *Service) RuleScheduler(ctx context.Context) {
// 查询所有启用的周期规则
rules, _, err := s.wecomMsgRuleRepo.List(ctx, nil, "*", true,
&define.WecomMsgRuleListFilter{
Type: []int{define.SendTypePeriodic},
Status: []int{define.WecomMsgRuleStatusEnabled},
})
for _, rule := range rules {
// 为每个周期规则创建任务
domainRule, _ := s.ConvertRepoRuleToDomainRule(rule)
s.CreatePeriodicTask(ctx, domainRule)
}
}
// CreatePeriodicTask 创建周期任务
func (s *Service) CreatePeriodicTask(ctx context.Context, rule *domaindefine.WecomMsgRule) error {
// 计算下一个执行时间(考虑排除日期)
nextTime, err := s.NextExecutionTimeWithExclude(ctx, rule.CronExpression, time.Time{})
if err != nil {
// 排除日期导致的错误属于正常情况
if strings.Contains(err.Error(), "排除日期") {
return nil // 静默处理
}
return err
}
// 创建新的周期任务
task := &define.WecomMsgSendTask{
RuleID: rule.ID,
Type: define.SendTypePeriodic,
PlannedExecuteTime: nextTime,
Status: define.TaskStatusPending,
}
_, err = s.wecomMsgTaskRepo.Create(ctx, nil, task)
return err
}
// NextExecutionTimeWithExclude 计算考虑排除日期的下一个执行时间
func (s *Service) NextExecutionTimeWithExclude(ctx context.Context, cronExpression domaindefine.CronExpression, after time.Time) (time.Time, error) {
// 计算基础执行时间
nextTime, err := timeutil.NextExecutionTimeAfter(cronExpr, after)
if err != nil {
return time.Time{}, err
}
// 检查是否在排除日期中
if s.isExcluded(nextTime, cronExpression.ExcludeDates) {
return time.Time{}, fmt.Errorf("下一个执行时间在排除日期列表中")
}
return nextTime, nil
}
4.2 任务调度器:任务执行引擎
任务调度器负责执行所有待处理的任务,采用乐观锁保证并发安全。
// TaskScheduler 任务调度器 - 执行待处理任务
func (s *Service) TaskScheduler(ctx context.Context) {
// 查询所有待执行的任务
tasks, _, _ := s.wecomMsgTaskRepo.List(ctx, nil, "*", false,
&define.WecomMsgSendTaskListFilter{
Status: []int{define.TaskStatusPending},
PlannedExecuteEndTime: time.Now(), // 计划时间已到的任务
})
// 并发执行任务
for _, task := range tasks {
go func(task define.WecomMsgSendTask) {
s.ExecuteTask(ctx, task)
}(*task)
}
}
// ExecuteTask 执行单个任务
func (s *Service) ExecuteTask(ctx context.Context, task define.WecomMsgSendTask) error {
// 1. 使用乐观锁获取任务执行权
rowsAffected, err := s.wecomMsgTaskRepo.SetRunning(ctx, s.db, task.ID)
if rowsAffected == 0 {
return errors.New("任务已被其他实例执行") // 乐观锁冲突
}
// 2. 查询关联规则并校验任务有效性
ruleDetail, err := s.BackRuleGet(ctx, task.RuleID)
isValid, _, err := s.CheckTaskIsValid(ctx, task, ruleDetail)
if !isValid {
// 任务失效,设置状态并退出
s.wecomMsgTaskRepo.SetExpired(ctx, nil, task.ID)
return nil
}
// 3. 执行消息发送流程
return s.executeMessageSend(ctx, task, ruleDetail)
}
4.3 任务有效性校验:防止规则变更导致的无效执行
在任务执行前进行严格的校验,确保任务仍然有效。
// CheckTaskIsValid 校验任务是否有效
func (s *Service) CheckTaskIsValid(ctx context.Context, task define.WecomMsgSendTask, rule *domaindefine.BackRuleGetRes) (bool, time.Time, error) {
// 1. 检查任务类型与规则类型一致性
if rule.Type != task.Type {
return false, time.Time{}, errors.New("任务类型与规则类型不一致")
}
// 2. 对于定时规则,检查规则是否变更
if rule.Type == define.SendTypeScheduled && rule.CreatedAt != rule.UpdatedAt {
updatedAt, _ := time.Parse("2006-01-02 15:04:05", rule.Rule.UpdatedAt)
if task.CreatedAt.Before(updatedAt) {
return false, time.Time{}, fmt.Errorf("任务创建时间早于规则修改时间")
}
}
// 3. 检查规则状态
if rule.Status == define.WecomMsgRuleStatusDisabled {
return false, time.Time{}, errors.New("规则已被禁用")
}
// 4. 校验计划时间与规则配置一致性
return s.checkTaskTime(ctx, task.PlannedExecuteTime, rule.Rule)
}
// checkTaskTime 校验任务时间与规则配置是否一致
func (s *Service) checkTaskTime(ctx context.Context, plannedExecuteTime time.Time, rule domaindefine.Rule) (bool, time.Time, error) {
switch rule.Type {
case define.SendTypeImmediate:
return true, plannedExecuteTime, nil
case define.SendTypeScheduled:
// 比较计划时间与规则配置时间
sendTime, _ := time.Parse("2006-01-02 15:04:05", rule.SendTime)
plannedFormatted := plannedExecuteTime.Format("2006-01-02 15:04:05")
sendFormatted := sendTime.Format("2006-01-02 15:04:05")
if plannedFormatted != sendFormatted {
return false, sendTime, fmt.Errorf("计划执行时间与规则配置时间不一致")
}
return true, plannedExecuteTime, nil
case define.SendTypePeriodic:
// 重新计算执行时间并比较
nextExecuteTime, err := s.NextExecutionTimeWithExclude(ctx, rule.CronExpression, time.Now().Add(-1*time.Hour))
if err != nil {
return false, time.Time{}, err
}
plannedFormatted := plannedExecuteTime.Format("2006-01-02 15:04:05")
nextFormatted := nextExecuteTime.Format("2006-01-02 15:04:05")
if nextFormatted != plannedFormatted {
return false, nextExecuteTime, fmt.Errorf("计划执行时间与规则计算时间不一致")
}
return true, plannedExecuteTime, nil
}
return false, time.Time{}, errors.New("未知的发送类型")
}
4.4 消息发送执行:核心业务逻辑
// executeMessageSend 执行消息发送
func (s *Service) executeMessageSend(ctx context.Context, task define.WecomMsgSendTask, rule *domaindefine.BackRuleGetRes) error {
// 1. 组装企业微信消息
message, err := s.ConvertDomainContentToWecomMsg(ctx, &rule.Content, rule.ToTagIds, rule.ToPartyIds, rule.ToUserIds)
if err != nil {
return s.markTaskFailed(task, "转换消息内容失败", time.Now())
}
// 2. 调用企业微信接口
res, err := s.wecomBaseService.PushMessage(ctx, message)
if err != nil || res.Errcode != 0 {
return s.markTaskFailed(task, "发送消息失败", time.Now())
}
// 3. 创建发送日志
sendLogID, err := s.createSendLog(ctx, task, rule, res.Msgid, time.Now())
if err != nil {
return s.markTaskFailed(task, "创建发送日志失败", time.Now())
}
// 4. 记录接收者信息
s.recordReceivers(ctx, sendLogID, rule)
// 5. 标记任务成功
return s.markTaskSuccess(task, sendLogID, time.Now())
}
5. 并发控制与一致性保障
5.1 乐观锁实现
// 设置任务为执行中状态(乐观锁)
func (r *taskRepo) SetRunning(ctx context.Context, db *sql.DB, taskID int64) (int64, error) {
result, err := db.ExecContext(ctx, `
UPDATE wecom_msg_send_task
SET status = $1, actual_execute_time = $2
WHERE id = $3 AND status = $4 -- 只有待执行状态才能更新为执行中
`, define.TaskStatusRunning, time.Now(), taskID, define.TaskStatusPending)
rowsAffected, _ := result.RowsAffected()
return rowsAffected, err
}
5.2 任务状态流转
待执行(1) → 执行中(2) → 成功(3)
→ 失败(4)
→ 取消(5) [规则变更时]
6. 设计亮点与优势
6.1 架构优势
规则与执行解耦:规则变更通过生成新任务体现,不影响历史任务并发安全:乐观锁机制避免分布式锁的复杂性容错性强:任务失效时自动重新生成,保证最终一致性审计完整:完整的任务流和日志记录,便于问题追溯
6.2 业务价值
高可靠性:即使规则频繁变更,消息发送也能保证正确性易于扩展:规则调度器和任务调度器可以独立扩展运维友好:详细的状态流转和日志记录,便于监控和排查问题业务灵活:支持复杂的周期规则和排除日期配置
7. 总结
本设计通过引入任务中间层,成功实现了规则与执行的解耦,主要优势包括:
降低并发复杂度:通过任务只增不改和乐观锁避免分布式锁提高系统稳定性:规则变更不影响已生成任务的执行良好的扩展性:组件职责清晰,便于独立扩展和维护
这种设计在保证系统可靠性的同时,提供了良好的扩展性和维护性。通过借鉴事件溯源思想,我们在不引入完整ES架构复杂性的前提下,获得了其核心优势。
向完整ES架构的演进路径
如果需要向完整ES架构演进,可以:
**第一步 **:将规则表的更新改为事件驱动
**第二步 **:建立完整的事件存储
**第三步 **:引入消息队列驱动任务生成
**第四步 **:实现CQRS分离读写模型