在视频化浪潮席卷各行各业的今天,跨系统视频数据同步已成为企业业务联动的核心场景。当面对 GB 级甚至 TB 级视频文件的高频同步需求时,如何在确保数据完整性和系统可用性的前提下,实现高性能传输,成为技术架构师必须攻克的难关。本文将结合 JDK17、XXL-Job、RocketMQ、Redis 等主流技术栈,从零构建一套可落地、高吞吐、高可靠的视频同步方案,用通俗语言拆解底层逻辑,用可直接运行的代码验证方案可行性,助力开发者轻松解决跨系统视频同步难题。
一、方案设计背景与核心挑战
1.1 业务场景定位
跨企业视频同步场景广泛存在于内容分发、联合运营、数据备份等业务中。例如:短视频平台与 MCN 机构的内容互通、教育机构与合作平台的课程视频同步、企业内部不同业务系统间的视频资源共享等。此类场景通常具备以下特征:
视频文件体积大:单个文件从几百 MB 到几十 GB 不等,远超普通文本数据;同步频率高:需支持定时批量同步与实时触发同步,峰值时段可能出现百级并发任务;可用性要求高:同步任务需保证 99.9% 可用,避免因同步失败导致业务中断;性能要求严苛:需在有限带宽下最大化传输效率,减少同步延迟。
1.2 核心技术挑战
基于上述场景特征,视频同步方案需攻克三大核心难题:
高性能传输瓶颈:大文件传输易受网络带宽、节点负载影响,传统单线程传输模式效率低下;系统可用性保障:同步过程中可能出现网络抖动、源端 / 目标端服务宕机等异常,需确保任务可恢复、数据不丢失;资源占用控制:高并发大文件同步易导致内存溢出、磁盘 IO 阻塞等问题,需合理分配系统资源。
1.3 方案设计原则
针对上述挑战,方案设计遵循以下四大原则:
高性能优先:采用并行传输、异步处理、缓存优化等手段提升同步效率;可用性兜底:通过任务重试、集群部署、状态监控等机制保障系统稳定;轻量化实现:基于现有技术栈最小化开发成本,避免引入复杂中间件;可扩展性强:架构设计预留扩展点,支持后续功能迭代与容量扩容。
二、技术栈选型与底层逻辑解析
2.1 技术栈版本选型
方案选用当前最新稳定版本组件,确保性能与安全性,具体版本如下:
| 技术组件 | 版本号 | 选型依据 |
|---|---|---|
| JDK | 17.0.10 | 长期支持版 (LTS),提供 ZGC 垃圾回收器、密封类等新特性,提升系统性能与稳定性 |
| XXL-Job | 2.4.3 | 分布式任务调度领域主流框架,支持任务分片、失败重试、监控告警等核心能力 |
| RocketMQ | 5.2.0 | 高吞吐、低延迟的消息中间件,支持事务消息、延迟消息,适配异步同步场景 |
| Redis | 7.2.4 | 高性能缓存数据库,支持分布式锁、计数器、哈希结构,用于任务状态存储与并发控制 |
| MyBatis-Plus | 3.5.5 | 增强型持久层框架,简化数据库操作,支持分页、条件查询等常用功能 |
| MySQL | 8.0.36 | 关系型数据库,用于存储任务配置、同步记录等核心数据 |
| FastJSON2 | 2.0.49 | 高性能 JSON 解析工具,比 FastJSON1.x 性能提升 30% 以上,适配 JDK17 特性 |
| Lombok | 1.18.30 | 简化 Java 代码,减少模板代码编写,提升开发效率 |
| SpringBoot | 3.2.4 | 快速开发脚手架,支持自动配置、依赖管理,适配 JDK17 与各组件最新版本 |
2.2 核心组件底层逻辑
2.2.1 JDK17 关键特性应用
ZGC 垃圾回收器:针对大文件同步场景下的内存波动,ZGC 支持 TB 级内存管理,垃圾回收停顿时间控制在毫秒级,避免因 GC 停顿导致的传输中断;虚拟线程 (Virtual Threads):通过创建轻量级线程,相比传统线程减少内存占用,提升并发传输能力;增强的 Stream API:优化大文件分片后的并行处理逻辑,提升数据处理效率。
Thread.startVirtualThread()
2.2.2 XXL-Job 任务调度机制
XXL-Job 采用 “调度中心 + 执行器” 架构,调度中心负责任务配置与触发,执行器负责任务执行。核心逻辑包括:
任务分片:将大批量视频同步任务拆分为多个子任务,分配到不同执行器节点并行执行;失败重试:支持配置重试次数与重试间隔,失败任务自动重新触发;注册发现:执行器自动注册到调度中心,支持动态扩容。
2.2.3 RocketMQ 异步通信原理
RocketMQ 通过 “生产者 – broker – 消费者” 架构实现异步通信,核心优势包括:
削峰填谷:同步任务触发后通过消息队列缓冲,避免瞬时高并发压垮目标系统;解耦异步:源端系统与目标端系统通过消息间接通信,降低系统耦合度;可靠投递:支持消息持久化与重试机制,确保同步指令不丢失。
2.2.4 Redis 缓存与并发控制
Redis 在方案中承担多重角色:
任务状态缓存:存储任务执行状态(待执行、执行中、成功、失败),避免数据库频繁查询;分布式锁:通过命令实现分布式锁,防止同一视频文件被重复同步;限流控制:基于 Redis 计数器实现执行器节点的 QPS 限流,避免资源过载。
SET NX EX
三、方案整体架构设计
3.1 架构全景图

3.2 核心流程拆解

3.3 核心模块职责
同步调度层:由 XXL-Job 调度中心与执行器组成,负责任务的配置、触发、分片与执行;任务协调层:基于 RocketMQ 实现异步通信,完成任务指令的可靠投递与削峰填谷;数据传输层:核心业务层,负责视频文件的读取、分片、并行传输与写入;缓存控制层:基于 Redis 实现任务状态管理、分布式锁与限流控制;监控告警层:负责任务执行日志收集、状态监控与异常告警。
四、核心模块详细实现
4.1 项目基础配置
4.1.1 Maven 依赖配置(pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<groupId>com.video.sync</groupId>
<artifactId>video-sync-system</artifactId>
<version>1.0.0</version>
<name>video-sync-system</name>
<description>高性能视频跨系统同步方案</description>
<properties>
<java.version>17</java.version>
<xxl-job.version>2.4.3</xxl-job.version>
<rocketmq.version>5.2.0</rocketmq.version>
<redis.version>7.2.4</redis.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
<fastjson2.version>2.0.49</fastjson2.version>
<lombok.version>1.18.30</lombok.version>
<swagger.version>3.0.0</swagger.version>
</properties>
<dependencies>
<!-- SpringBoot核心依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.1.0</version>
</dependency>
<!-- XXL-Job依赖 -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>
<!-- RocketMQ依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- 持久层依赖 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- 工具类依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.2.0-jre</version>
</dependency>
<!-- Swagger3依赖 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-boot-starter</artifactId>
<version>${swagger.version}</version>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
4.1.2 核心配置文件(application.yml)
spring:
application:
name: video-sync-system
# 数据源配置
datasource:
url: jdbc:mysql://localhost:3306/video_sync_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: root
password: root123456
driver-class-name: com.mysql.cj.jdbc.Driver
# Redis配置
redis:
cluster:
nodes:
- 127.0.0.1:6379
- 127.0.0.1:6380
- 127.0.0.1:6381
max-redirects: 3
timeout: 3000ms
lettuce:
pool:
max-active: 16
max-idle: 8
min-idle: 4
# RocketMQ配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: video_sync_producer_group
send-message-timeout: 30000
consumer:
group: video_sync_consumer_group
consume-thread-max: 32
# XXL-Job配置
xxl:
job:
admin:
addresses: http://127.0.0.1:8080/xxl-job-admin
executor:
appname: video-sync-executor
address:
ip:
port: 9999
logpath: /data/logs/xxl-job/executor
logretentiondays: 30
accessToken:
# 自定义配置
video:
sync:
# 分片大小(默认100MB)
slice-size: 104857600
# 并行传输线程数
parallel-thread-count: 8
# 重试次数
retry-count: 3
# 重试间隔(秒)
retry-interval: 60
# 源端文件访问地址前缀
source-file-prefix: http://source-video-system:8080/file/
# 目标端文件存储地址前缀
target-file-prefix: /data/video-storage/
# MyBatis-Plus配置
mybatis-plus:
mapper-locations: classpath:mapper/**/*.xml
type-aliases-package: com.video.sync.entity
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# Swagger配置
springdoc:
api-docs:
path: /api-docs
swagger-ui:
path: /swagger-ui.html
operationsSorter: method
# 日志配置
logging:
level:
root: info
com.video.sync: debug
pattern:
console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"
file: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"
file:
name: /data/logs/video-sync-system.log
4.2 数据库表设计
4.2.1 任务表(video_sync_task)
CREATE TABLE `video_sync_task` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '任务ID',
`task_no` varchar(64) NOT NULL COMMENT '任务编号(唯一)',
`source_video_id` varchar(64) NOT NULL COMMENT '源端视频ID',
`source_video_url` varchar(512) NOT NULL COMMENT '源端视频访问地址',
`target_video_id` varchar(64) DEFAULT NULL COMMENT '目标端视频ID',
`target_video_path` varchar(512) DEFAULT NULL COMMENT '目标端视频存储路径',
`video_size` bigint NOT NULL COMMENT '视频大小(字节)',
`task_status` tinyint NOT NULL COMMENT '任务状态:0-待执行,1-执行中,2-成功,3-失败',
`shard_total` int NOT NULL DEFAULT '1' COMMENT '总分片数',
`retry_count` int NOT NULL DEFAULT '0' COMMENT '已重试次数',
`max_retry_count` int NOT NULL DEFAULT '3' COMMENT '最大重试次数',
`creator` varchar(32) DEFAULT NULL COMMENT '创建人',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`finish_time` datetime DEFAULT NULL COMMENT '完成时间',
`remark` varchar(512) DEFAULT NULL COMMENT '备注',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_task_no` (`task_no`),
KEY `idx_source_video_id` (`source_video_id`),
KEY `idx_task_status` (`task_status`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='视频同步任务表';
4.2.2 分片任务表(video_sync_shard_task)
CREATE TABLE `video_sync_shard_task` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '分片任务ID',
`task_no` varchar(64) NOT NULL COMMENT '关联主任务编号',
`shard_no` int NOT NULL COMMENT '分片编号(从0开始)',
`shard_size` bigint NOT NULL COMMENT '分片大小(字节)',
`start_position` bigint NOT NULL COMMENT '分片起始位置',
`end_position` bigint NOT NULL COMMENT '分片结束位置',
`shard_status` tinyint NOT NULL COMMENT '分片状态:0-待执行,1-执行中,2-成功,3-失败',
`executor_ip` varchar(32) DEFAULT NULL COMMENT '执行器IP',
`executor_port` int DEFAULT NULL COMMENT '执行器端口',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`finish_time` datetime DEFAULT NULL COMMENT '完成时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_task_no_shard_no` (`task_no`,`shard_no`),
KEY `idx_shard_status` (`shard_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='视频同步分片任务表';
4.2.3 同步日志表(video_sync_log)
CREATE TABLE `video_sync_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '日志ID',
`task_no` varchar(64) NOT NULL COMMENT '任务编号',
`shard_no` int DEFAULT NULL COMMENT '分片编号(为空表示主任务日志)',
`operate_type` tinyint NOT NULL COMMENT '操作类型:0-任务创建,1-任务执行,2-任务重试,3-任务完成,4-任务失败',
`operate_content` varchar(1024) NOT NULL COMMENT '操作内容',
`operate_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '操作时间',
`operate_ip` varchar(32) DEFAULT NULL COMMENT '操作IP',
`operator` varchar(32) DEFAULT NULL COMMENT '操作人',
PRIMARY KEY (`id`),
KEY `idx_task_no` (`task_no`),
KEY `idx_operate_time` (`operate_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='视频同步日志表';
4.3 核心实体类设计
4.3.1 任务实体(VideoSyncTask.java)
package com.video.sync.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 视频同步任务实体
* @author ken
*/
@Data
@TableName("video_sync_task")
@Schema(description = "视频同步任务实体")
public class VideoSyncTask {
/**
* 任务ID
*/
@TableId(type = IdType.AUTO)
@Schema(description = "任务ID")
private Long id;
/**
* 任务编号(唯一)
*/
@Schema(description = "任务编号(唯一)", requiredMode = Schema.RequiredMode.REQUIRED)
private String taskNo;
/**
* 源端视频ID
*/
@Schema(description = "源端视频ID", requiredMode = Schema.RequiredMode.REQUIRED)
private String sourceVideoId;
/**
* 源端视频访问地址
*/
@Schema(description = "源端视频访问地址", requiredMode = Schema.RequiredMode.REQUIRED)
private String sourceVideoUrl;
/**
* 目标端视频ID
*/
@Schema(description = "目标端视频ID")
private String targetVideoId;
/**
* 目标端视频存储路径
*/
@Schema(description = "目标端视频存储路径")
private String targetVideoPath;
/**
* 视频大小(字节)
*/
@Schema(description = "视频大小(字节)", requiredMode = Schema.RequiredMode.REQUIRED)
private Long videoSize;
/**
* 任务状态:0-待执行,1-执行中,2-成功,3-失败
*/
@Schema(description = "任务状态:0-待执行,1-执行中,2-成功,3-失败", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer taskStatus;
/**
* 总分片数
*/
@Schema(description = "总分片数", defaultValue = "1")
private Integer shardTotal;
/**
* 已重试次数
*/
@Schema(description = "已重试次数", defaultValue = "0")
private Integer retryCount;
/**
* 最大重试次数
*/
@Schema(description = "最大重试次数", defaultValue = "3")
private Integer maxRetryCount;
/**
* 创建人
*/
@Schema(description = "创建人")
private String creator;
/**
* 创建时间
*/
@Schema(description = "创建时间")
private LocalDateTime createTime;
/**
* 更新时间
*/
@Schema(description = "更新时间")
private LocalDateTime updateTime;
/**
* 完成时间
*/
@Schema(description = "完成时间")
private LocalDateTime finishTime;
/**
* 备注
*/
@Schema(description = "备注")
private String remark;
}
4.3.2 分片任务实体(VideoSyncShardTask.java)
package com.video.sync.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 视频同步分片任务实体
* @author ken
*/
@Data
@TableName("video_sync_shard_task")
@Schema(description = "视频同步分片任务实体")
public class VideoSyncShardTask {
/**
* 分片任务ID
*/
@TableId(type = IdType.AUTO)
@Schema(description = "分片任务ID")
private Long id;
/**
* 关联主任务编号
*/
@Schema(description = "关联主任务编号", requiredMode = Schema.RequiredMode.REQUIRED)
private String taskNo;
/**
* 分片编号(从0开始)
*/
@Schema(description = "分片编号(从0开始)", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer shardNo;
/**
* 分片大小(字节)
*/
@Schema(description = "分片大小(字节)", requiredMode = Schema.RequiredMode.REQUIRED)
private Long shardSize;
/**
* 分片起始位置
*/
@Schema(description = "分片起始位置", requiredMode = Schema.RequiredMode.REQUIRED)
private Long startPosition;
/**
* 分片结束位置
*/
@Schema(description = "分片结束位置", requiredMode = Schema.RequiredMode.REQUIRED)
private Long endPosition;
/**
* 分片状态:0-待执行,1-执行中,2-成功,3-失败
*/
@Schema(description = "分片状态:0-待执行,1-执行中,2-成功,3-失败", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer shardStatus;
/**
* 执行器IP
*/
@Schema(description = "执行器IP")
private String executorIp;
/**
* 执行器端口
*/
@Schema(description = "执行器端口")
private Integer executorPort;
/**
* 创建时间
*/
@Schema(description = "创建时间")
private LocalDateTime createTime;
/**
* 更新时间
*/
@Schema(description = "更新时间")
private LocalDateTime updateTime;
/**
* 完成时间
*/
@Schema(description = "完成时间")
private LocalDateTime finishTime;
}
4.3.3 同步日志实体(VideoSyncLog.java)
package com.video.sync.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 视频同步日志实体
* @author ken
*/
@Data
@TableName("video_sync_log")
@Schema(description = "视频同步日志实体")
public class VideoSyncLog {
/**
* 日志ID
*/
@TableId(type = IdType.AUTO)
@Schema(description = "日志ID")
private Long id;
/**
* 任务编号
*/
@Schema(description = "任务编号", requiredMode = Schema.RequiredMode.REQUIRED)
private String taskNo;
/**
* 分片编号(为空表示主任务日志)
*/
@Schema(description = "分片编号(为空表示主任务日志)")
private Integer shardNo;
/**
* 操作类型:0-任务创建,1-任务执行,2-任务重试,3-任务完成,4-任务失败
*/
@Schema(description = "操作类型:0-任务创建,1-任务执行,2-任务重试,3-任务完成,4-任务失败", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer operateType;
/**
* 操作内容
*/
@Schema(description = "操作内容", requiredMode = Schema.RequiredMode.REQUIRED)
private String operateContent;
/**
* 操作时间
*/
@Schema(description = "操作时间")
private LocalDateTime operateTime;
/**
* 操作IP
*/
@Schema(description = "操作IP")
private String operateIp;
/**
* 操作人
*/
@Schema(description = "操作人")
private String operator;
}
4.4 任务创建模块实现
4.4.1 同步任务 DTO(VideoSyncRequestDTO.java)
package com.video.sync.dto.request;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import org.springframework.util.StringUtils;
/**
* 视频同步请求DTO
* @author ken
*/
@Data
@Schema(description = "视频同步请求DTO")
public class VideoSyncRequestDTO {
/**
* 源端视频ID
*/
@NotBlank(message = "源端视频ID不能为空")
@Schema(description = "源端视频ID", requiredMode = Schema.RequiredMode.REQUIRED)
private String sourceVideoId;
/**
* 源端视频访问地址
*/
@NotBlank(message = "源端视频访问地址不能为空")
@Schema(description = "源端视频访问地址", requiredMode = Schema.RequiredMode.REQUIRED)
private String sourceVideoUrl;
/**
* 视频大小(字节)
*/
@NotNull(message = "视频大小不能为空")
@Schema(description = "视频大小(字节)", requiredMode = Schema.RequiredMode.REQUIRED)
private Long videoSize;
/**
* 创建人
*/
@Schema(description = "创建人")
private String creator;
/**
* 备注
*/
@Schema(description = "备注")
private String remark;
/**
* 参数校验
*/
public void validate() {
StringUtils.hasText(sourceVideoId, "源端视频ID不能为空");
StringUtils.hasText(sourceVideoUrl, "源端视频访问地址不能为空");
if (ObjectUtils.isEmpty(videoSize) || videoSize <= 0) {
throw new IllegalArgumentException("视频大小必须大于0");
}
}
}
4.4.2 同步任务 Controller(VideoSyncController.java)
package com.video.sync.controller;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.video.sync.dto.request.VideoSyncRequestDTO;
import com.video.sync.dto.response.ApiResponse;
import com.video.sync.entity.VideoSyncTask;
import com.video.sync.entity.VideoSyncLog;
import com.video.sync.enums.TaskStatusEnum;
import com.video.sync.enums.OperateTypeEnum;
import com.video.sync.service.VideoSyncTaskService;
import com.video.sync.service.VideoSyncLogService;
import com.video.sync.service.RocketMQProducerService;
import com.video.sync.util.IpUtils;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 视频同步控制器
* @author ken
*/
@RestController
@RequestMapping("/api/video/sync")
@Tag(name = "视频同步接口", description = "提供视频跨系统同步的相关接口")
@Slf4j
public class VideoSyncController {
@Resource
private VideoSyncTaskService videoSyncTaskService;
@Resource
private VideoSyncLogService videoSyncLogService;
@Resource
private RocketMQProducerService rocketMQProducerService;
/**
* 创建视频同步任务
* @param requestDTO 同步请求参数
* @param request HTTP请求对象
* @return 任务创建结果
*/
@PostMapping("/createTask")
@Operation(summary = "创建视频同步任务", description = "接收源端系统请求,创建视频同步任务并触发同步流程")
public ApiResponse<String> createSyncTask(@Validated @RequestBody VideoSyncRequestDTO requestDTO, HttpServletRequest request) {
log.info("开始创建视频同步任务,请求参数:{}", requestDTO);
try {
// 参数校验
requestDTO.validate();
// 生成唯一任务编号
String taskNo = IdWorker.get32UUID();
String operateIp = IpUtils.getClientIp(request);
// 构建任务实体
VideoSyncTask task = buildVideoSyncTask(requestDTO, taskNo);
// 保存任务
boolean saveSuccess = videoSyncTaskService.save(task);
if (!saveSuccess) {
log.error("视频同步任务保存失败,任务编号:{}", taskNo);
return ApiResponse.fail("任务创建失败");
}
// 记录任务创建日志
recordSyncLog(taskNo, null, OperateTypeEnum.TASK_CREATE,
"任务创建成功,源端视频ID:" + requestDTO.getSourceVideoId(), operateIp, requestDTO.getCreator());
// 发送任务消息到RocketMQ,触发同步执行
rocketMQProducerService.sendSyncTaskMessage(taskNo);
log.info("视频同步任务创建成功,任务编号:{}", taskNo);
return ApiResponse.success(taskNo, "任务创建成功");
} catch (Exception e) {
log.error("创建视频同步任务异常", e);
return ApiResponse.fail("任务创建异常:" + e.getMessage());
}
}
/**
* 构建视频同步任务实体
* @param requestDTO 请求DTO
* @param taskNo 任务编号
* @return 视频同步任务实体
*/
private VideoSyncTask buildVideoSyncTask(VideoSyncRequestDTO requestDTO, String taskNo) {
VideoSyncTask task = new VideoSyncTask();
task.setTaskNo(taskNo);
task.setSourceVideoId(requestDTO.getSourceVideoId());
task.setSourceVideoUrl(requestDTO.getSourceVideoUrl());
task.setVideoSize(requestDTO.getVideoSize());
task.setTaskStatus(TaskStatusEnum.PENDING.getCode());
task.setShardTotal(calculateShardTotal(requestDTO.getVideoSize()));
task.setRetryCount(0);
task.setMaxRetryCount(3);
task.setCreator(ObjectUtils.isEmpty(requestDTO.getCreator()) ? "system" : requestDTO.getCreator());
task.setRemark(requestDTO.getRemark());
return task;
}
/**
* 计算总分片数
* @param videoSize 视频大小(字节)
* @return 总分片数
*/
private Integer calculateShardTotal(Long videoSize) {
// 从配置文件读取分片大小(默认100MB)
Long sliceSize = 104857600L;
if (videoSize <= sliceSize) {
return 1;
}
return (int) ((videoSize + sliceSize - 1) / sliceSize);
}
/**
* 记录同步日志
* @param taskNo 任务编号
* @param shardNo 分片编号
* @param operateType 操作类型
* @param operateContent 操作内容
* @param operateIp 操作IP
* @param operator 操作人
*/
private void recordSyncLog(String taskNo, Integer shardNo, OperateTypeEnum operateType,
String operateContent, String operateIp, String operator) {
VideoSyncLog log = new VideoSyncLog();
log.setTaskNo(taskNo);
log.setShardNo(shardNo);
log.setOperateType(operateType.getCode());
log.setOperateContent(operateContent);
log.setOperateIp(operateIp);
log.setOperator(operator);
videoSyncLogService.save(log);
}
}
4.5 RocketMQ 消息收发实现
4.5.1 消息生产者(RocketMQProducerService.java)
package com.video.sync.service;
import com.alibaba.fastjson2.JSON;
import com.video.sync.dto.message.SyncTaskMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
/**
* RocketMQ消息生产者服务
* @author ken
*/
@Service
@Slf4j
public class RocketMQProducerService {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${spring.rocketmq.producer.group}")
private String producerGroup;
/**
* 视频同步任务 Topic
*/
private static final String SYNC_TASK_TOPIC = "VIDEO_SYNC_TASK_TOPIC";
/**
* 发送视频同步任务消息
* @param taskNo 任务编号
*/
public void sendSyncTaskMessage(String taskNo) {
if (ObjectUtils.isEmpty(taskNo)) {
log.error("发送同步任务消息失败,任务编号为空");
throw new IllegalArgumentException("任务编号不能为空");
}
// 构建消息体
SyncTaskMessage message = new SyncTaskMessage();
message.setTaskNo(taskNo);
message.setSendTime(System.currentTimeMillis());
// 构建RocketMQ消息
Message<String> rocketMessage = MessageBuilder.withPayload(JSON.toJSONString(message))
.build();
try {
// 发送消息
SendResult sendResult = rocketMQTemplate.syncSend(SYNC_TASK_TOPIC, rocketMessage);
log.info("同步任务消息发送成功,任务编号:{},发送结果:{}", taskNo, JSON.toJSONString(sendResult));
} catch (Exception e) {
log.error("发送同步任务消息失败,任务编号:{}", taskNo, e);
throw new RuntimeException("消息发送失败:" + e.getMessage());
}
}
}
4.5.2 消息消费者(RocketMQConsumerService.java)
package com.video.sync.service;
import com.alibaba.fastjson2.JSON;
import com.video.sync.dto.message.SyncTaskMessage;
import com.video.sync.entity.VideoSyncTask;
import com.video.sync.enums.TaskStatusEnum;
import com.video.sync.enums.OperateTypeEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
/**
* RocketMQ消息消费者服务
* @author ken
*/
@Service
@Slf4j
@RocketMQMessageListener(topic = "VIDEO_SYNC_TASK_TOPIC", consumerGroup = "${spring.rocketmq.consumer.group}")
public class RocketMQConsumerService implements RocketMQListener<String> {
@Resource
private VideoSyncTaskService videoSyncTaskService;
@Resource
private VideoSyncLogService videoSyncLogService;
@Resource
private VideoSyncExecutorService videoSyncExecutorService;
@Override
public void onMessage(String message) {
log.info("接收到同步任务消息,消息内容:{}", message);
if (ObjectUtils.isEmpty(message)) {
log.error("接收到空消息,忽略处理");
return;
}
try {
// 解析消息
SyncTaskMessage syncMessage = JSON.parseObject(message, SyncTaskMessage.class);
String taskNo = syncMessage.getTaskNo();
if (ObjectUtils.isEmpty(taskNo)) {
log.error("消息中任务编号为空,消息内容:{}", message);
return;
}
// 查询任务信息
VideoSyncTask task = videoSyncTaskService.getByTaskNo(taskNo);
if (ObjectUtils.isEmpty(task)) {
log.error("未查询到任务信息,任务编号:{}", taskNo);
return;
}
// 检查任务状态,仅处理待执行状态的任务
if (!TaskStatusEnum.PENDING.getCode().equals(task.getTaskStatus())) {
log.info("任务状态不满足执行条件,任务编号:{},当前状态:{}", taskNo, task.getTaskStatus());
return;
}
// 记录任务执行日志
videoSyncLogService.saveLog(taskNo, null, OperateTypeEnum.TASK_EXECUTE,
"开始执行同步任务,视频大小:" + task.getVideoSize() + "字节", "127.0.0.1", "system");
// 更新任务状态为执行中
videoSyncTaskService.updateTaskStatus(taskNo, TaskStatusEnum.RUNNING);
// 提交任务到执行器执行
videoSyncExecutorService.submitSyncTask(task);
} catch (Exception e) {
log.error("处理同步任务消息异常,消息内容:{}", message, e);
}
}
}
4.6 XXL-Job 任务执行器实现
4.6.1 执行器配置(XxlJobConfig.java)
package com.video.sync.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* XXL-Job执行器配置
* @author ken
*/
@Configuration
@Data
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
4.6.2 同步任务执行器(VideoSyncXxlJob.java)
package com.video.sync.job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.video.sync.entity.VideoSyncShardTask;
import com.video.sync.enums.ShardStatusEnum;
import com.video.sync.enums.TaskStatusEnum;
import com.video.sync.enums.OperateTypeEnum;
import com.video.sync.service.VideoSyncShardTaskService;
import com.video.sync.service.VideoSyncTaskService;
import com.video.sync.service.VideoSyncLogService;
import com.video.sync.util.IpUtils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 视频同步XXL-Job执行器
* @author ken
*/
@Component
@Slf4j
public class VideoSyncXxlJob {
@Resource
private VideoSyncShardTaskService shardTaskService;
@Resource
private VideoSyncTaskService syncTaskService;
@Resource
private VideoSyncLogService syncLogService;
/**
* 视频分片同步任务执行器
* 支持分片执行,由XXL-Job调度中心分配分片序号
*/
@XxlJob("videoShardSyncJobHandler")
public void videoShardSyncJobHandler() throws Exception {
// 获取分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.info("视频分片同步任务执行,分片序号:{},总分片数:{}", shardIndex, shardTotal);
// 获取当前执行器IP
String executorIp = IpUtils.getLocalIp();
int executorPort = 9999; // 执行器端口,从配置文件读取更佳
try {
// 查询当前分片待执行的任务
List<VideoSyncShardTask> shardTaskList = queryShardTaskList(shardIndex, shardTotal);
if (CollectionUtils.isEmpty(shardTaskList)) {
log.info("当前分片无待执行任务,分片序号:{}", shardIndex);
XxlJobHelper.handleSuccess("当前分片无待执行任务");
return;
}
// 遍历执行分片任务
for (VideoSyncShardTask shardTask : shardTaskList) {
String taskNo = shardTask.getTaskNo();
int shardNo = shardTask.getShardNo();
log.info("开始执行分片任务,任务编号:{},分片编号:{}", taskNo, shardNo);
try {
// 更新分片任务状态为执行中
shardTaskService.updateShardStatus(taskNo, shardNo, ShardStatusEnum.RUNNING,
executorIp, executorPort);
// 记录分片任务执行日志
syncLogService.saveLog(taskNo, shardNo, OperateTypeEnum.TASK_EXECUTE,
"开始执行分片任务,分片编号:" + shardNo + ",起始位置:" + shardTask.getStartPosition(),
executorIp, "xxl-job");
// 执行分片同步逻辑
boolean syncSuccess = syncShardTask(shardTask);
if (syncSuccess) {
// 同步成功,更新分片状态为成功
shardTaskService.updateShardStatus(taskNo, shardNo, ShardStatusEnum.SUCCESS,
executorIp, executorPort);
syncLogService.saveLog(taskNo, shardNo, OperateTypeEnum.TASK_FINISH,
"分片任务执行成功,分片编号:" + shardNo, executorIp, "xxl-job");
log.info("分片任务执行成功,任务编号:{},分片编号:{}", taskNo, shardNo);
} else {
// 同步失败,更新分片状态为失败
shardTaskService.updateShardStatus(taskNo, shardNo, ShardStatusEnum.FAILED,
executorIp, executorPort);
syncLogService.saveLog(taskNo, shardNo, OperateTypeEnum.TASK_FAILED,
"分片任务执行失败,分片编号:" + shardNo, executorIp, "xxl-job");
log.error("分片任务执行失败,任务编号:{},分片编号:{}", taskNo, shardNo);
}
// 检查主任务所有分片执行状态,更新主任务状态
checkAndUpdateMainTaskStatus(taskNo);
} catch (Exception e) {
log.error("执行分片任务异常,任务编号:{},分片编号:{}", taskNo, shardNo, e);
// 异常时更新分片状态为失败
shardTaskService.updateShardStatus(taskNo, shardNo, ShardStatusEnum.FAILED,
executorIp, executorPort);
syncLogService.saveLog(taskNo, shardNo, OperateTypeEnum.TASK_FAILED,
"分片任务执行异常:" + e.getMessage(), executorIp, "xxl-job");
}
}
XxlJobHelper.handleSuccess("分片任务执行完成,共执行:" + shardTaskList.size() + "个任务");
} catch (Exception e) {
log.error("视频分片同步任务执行异常,分片序号:{}", shardIndex, e);
XxlJobHelper.handleFail("分片任务执行异常:" + e.getMessage());
}
}
/**
* 查询当前分片待执行的任务
* @param shardIndex 分片序号
* @param shardTotal 总分片数
* @return 分片任务列表
*/
private List<VideoSyncShardTask> queryShardTaskList(int shardIndex, int shardTotal) {
LambdaQueryWrapper<VideoSyncShardTask> queryWrapper = new LambdaQueryWrapper<>();
// 仅查询待执行状态的分片任务
queryWrapper.eq(VideoSyncShardTask::getShardStatus, ShardStatusEnum.PENDING.getCode());
// 按分片编号取模分配任务
queryWrapper.eq(VideoSyncShardTask::getShardNo, shardIndex % shardTotal);
// 分页查询,每次最多执行10个任务
queryWrapper.last("limit 10");
return shardTaskService.list(queryWrapper);
}
/**
* 执行分片同步逻辑
* @param shardTask 分片任务
* @return 同步是否成功
*/
private boolean syncShardTask(VideoSyncShardTask shardTask) {
// 此处实现分片文件的读取、传输、写入逻辑
// 实际场景需结合HTTP请求、文件流处理等实现
log.info("执行分片同步,任务编号:{},分片编号:{},起始位置:{},结束位置:{}",
shardTask.getTaskNo(), shardTask.getShardNo(),
shardTask.getStartPosition(), shardTask.getEndPosition());
// 模拟同步成功(实际场景需替换为真实逻辑)
return true;
}
/**
* 检查主任务所有分片执行状态,更新主任务状态
* @param taskNo 任务编号
*/
private void checkAndUpdateMainTaskStatus(String taskNo) {
// 查询主任务信息
VideoSyncTask mainTask = syncTaskService.getByTaskNo(taskNo);
if (ObjectUtils.isEmpty(mainTask)) {
log.error("未查询到主任务信息,任务编号:{}", taskNo);
return;
}
// 查询所有分片任务状态
LambdaQueryWrapper<VideoSyncShardTask> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(VideoSyncShardTask::getTaskNo, taskNo);
List<VideoSyncShardTask> allShardTasks = shardTaskService.list(queryWrapper);
if (CollectionUtils.isEmpty(allShardTasks)) {
log.error("未查询到分片任务,任务编号:{}", taskNo);
return;
}
// 检查是否所有分片都执行成功
boolean allSuccess = allShardTasks.stream()
.allMatch(shard -> ShardStatusEnum.SUCCESS.getCode().equals(shard.getShardStatus()));
if (allSuccess) {
// 所有分片成功,更新主任务状态为成功
syncTaskService.updateTaskStatus(taskNo, TaskStatusEnum.SUCCESS);
syncLogService.saveLog(taskNo, null, OperateTypeEnum.TASK_FINISH,
"所有分片任务执行成功,主任务完成", IpUtils.getLocalIp(), "system");
return;
}
// 检查是否有分片执行失败且达到最大重试次数
boolean hasFailedAndMaxRetry = allShardTasks.stream()
.anyMatch(shard -> ShardStatusEnum.FAILED.getCode().equals(shard.getShardStatus())
&& mainTask.getRetryCount() >= mainTask.getMaxRetryCount());
if (hasFailedAndMaxRetry) {
// 存在分片失败且达到最大重试次数,更新主任务状态为失败
syncTaskService.updateTaskStatus(taskNo, TaskStatusEnum.FAILED);
syncLogService.saveLog(taskNo, null, OperateTypeEnum.TASK_FAILED,
"分片任务执行失败且达到最大重试次数,主任务失败", IpUtils.getLocalIp(), "system");
}
}
}
4.7 分布式锁与缓存实现
4.7.1 Redis 工具类(RedisService.java)
package com.video.sync.service;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* Redis服务工具类
* @author ken
*/
@Service
@Slf4j
public class RedisService {
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 分布式锁前缀
*/
private static final String LOCK_KEY_PREFIX = "video_sync:lock:";
/**
* 任务状态缓存前缀
*/
private static final String TASK_STATUS_PREFIX = "video_sync:task_status:";
/**
* 获取分布式锁
* @param key 锁标识
* @param value 锁值(通常为UUID)
* @param expireTime 过期时间(秒)
* @return 是否获取成功
*/
public boolean tryLock(String key, String value, long expireTime) {
if (ObjectUtils.isEmpty(key) || ObjectUtils.isEmpty(value)) {
log.error("获取分布式锁失败,key或value为空");
return false;
}
String lockKey = LOCK_KEY_PREFIX + key;
try {
// 使用SET NX EX命令获取锁
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
} catch (Exception e) {
log.error("获取分布式锁异常,key:{}", lockKey, e);
return false;
}
}
/**
* 释放分布式锁
* @param key 锁标识
* @param value 锁值(需与获取时一致)
* @return 是否释放成功
*/
public boolean unlock(String key, String value) {
if (ObjectUtils.isEmpty(key) || ObjectUtils.isEmpty(value)) {
log.error("释放分布式锁失败,key或value为空");
return false;
}
String lockKey = LOCK_KEY_PREFIX + key;
// Lua脚本,确保释放锁的原子性
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
try {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), value);
return result != null && result > 0;
} catch (Exception e) {
log.error("释放分布式锁异常,key:{}", lockKey, e);
return false;
}
}
/**
* 缓存任务状态
* @param taskNo 任务编号
* @param status 任务状态
* @param expireTime 过期时间(秒)
*/
public void setTaskStatus(String taskNo, Integer status, long expireTime) {
if (ObjectUtils.isEmpty(taskNo) || ObjectUtils.isEmpty(status)) {
log.error("缓存任务状态失败,任务编号或状态为空");
return;
}
String key = TASK_STATUS_PREFIX + taskNo;
try {
redisTemplate.opsForValue().set(key, status, expireTime, TimeUnit.SECONDS);
log.info("缓存任务状态成功,任务编号:{},状态:{}", taskNo, status);
} catch (Exception e) {
log.error("缓存任务状态异常,任务编号:{}", taskNo, e);
}
}
/**
* 获取缓存的任务状态
* @param taskNo 任务编号
* @return 任务状态,为空表示未缓存
*/
public Integer getTaskStatus(String taskNo) {
if (ObjectUtils.isEmpty(taskNo)) {
log.error("获取任务状态失败,任务编号为空");
return null;
}
String key = TASK_STATUS_PREFIX + taskNo;
try {
Object status = redisTemplate.opsForValue().get(key);
return ObjectUtils.isEmpty(status) ? null : (Integer) status;
} catch (Exception e) {
log.error("获取任务状态异常,任务编号:{}", taskNo, e);
return null;
}
}
/**
* 递增计数器
* @param key 计数器key
* @param delta 递增步长
* @param expireTime 过期时间(秒)
* @return 递增后的值
*/
public Long increment(String key, long delta, long expireTime) {
if (ObjectUtils.isEmpty(key)) {
log.error("递增计数器失败,key为空");
return null;
}
try {
Long value = redisTemplate.opsForValue().increment(key, delta);
// 设置过期时间(仅第一次递增时)
if (delta > 0 && value != null && value.equals(delta)) {
redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
}
return value;
} catch (Exception e) {
log.error("递增计数器异常,key:{}", key, e);
return null;
}
}
}
4.8 视频分片传输核心实现
4.8.1 分片传输服务(VideoShardTransferService.java)
package com.video.sync.service;
import com.google.common.collect.Lists;
import com.video.sync.entity.VideoSyncShardTask;
import com.video.sync.enums.ShardStatusEnum;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 视频分片传输服务
* @author ken
*/
@Service
@Slf4j
public class VideoShardTransferService {
@Resource
private VideoSyncShardTaskService shardTaskService;
@Value("${video.sync.parallel-thread-count:8}")
private int parallelThreadCount;
@Value("${video.sync.target-file-prefix}")
private String targetFilePrefix;
/**
* 并行传输视频分片
* @param taskNo 任务编号
* @param shardTaskList 分片任务列表
* @param targetFilePath 目标文件路径
* @return 传输是否成功
*/
public boolean parallelTransferShards(String taskNo, List<VideoSyncShardTask> shardTaskList, String targetFilePath) {
if (ObjectUtils.isEmpty(taskNo) || CollectionUtils.isEmpty(shardTaskList) || ObjectUtils.isEmpty(targetFilePath)) {
log.error("并行传输分片失败,参数不完整");
return false;
}
// 创建线程池,用于并行传输
ExecutorService executorService = Executors.newFixedThreadPool(parallelThreadCount);
List<Boolean> transferResultList = Lists.newArrayList();
try {
// 提交分片传输任务
for (VideoSyncShardTask shardTask : shardTaskList) {
executorService.submit(() -> {
boolean transferSuccess = transferSingleShard(shardTask, targetFilePath);
synchronized (transferResultList) {
transferResultList.add(transferSuccess);
}
});
}
// 关闭线程池,等待所有任务完成
executorService.shutdown();
boolean allCompleted = executorService.awaitTermination(2, TimeUnit.HOURS);
if (!allCompleted) {
log.error("分片传输超时,任务编号:{}", taskNo);
return false;
}
// 检查所有分片传输结果
return transferResultList.stream().allMatch(Boolean::booleanValue);
} catch (InterruptedException e) {
log.error("分片传输线程被中断,任务编号:{}", taskNo, e);
Thread.currentThread().interrupt();
return false;
} finally {
// 强制关闭线程池
if (!executorService.isTerminated()) {
executorService.shutdownNow();
}
}
}
/**
* 传输单个分片
* @param shardTask 分片任务
* @param targetFilePath 目标文件路径
* @return 传输是否成功
*/
private boolean transferSingleShard(VideoSyncShardTask shardTask, String targetFilePath) {
String taskNo = shardTask.getTaskNo();
int shardNo = shardTask.getShardNo();
String sourceUrl = shardTask.getSourceVideoUrl();
long startPosition = shardTask.getStartPosition();
long endPosition = shardTask.getEndPosition();
log.info("开始传输分片,任务编号:{},分片编号:{},起始位置:{},结束位置:{}",
taskNo, shardNo, startPosition, endPosition);
HttpURLConnection connection = null;
InputStream inputStream = null;
RandomAccessFile randomAccessFile = null;
try {
// 创建URL连接
URL url = new URL(sourceUrl);
connection = (HttpURLConnection) url.openConnection();
// 设置Range请求头,获取分片数据
connection.setRequestProperty("Range", "bytes=" + startPosition + "-" + endPosition);
connection.setConnectTimeout(5000);
connection.setReadTimeout(30000);
connection.connect();
// 检查响应码,206表示部分内容请求成功
int responseCode = connection.getResponseCode();
if (responseCode != 206) {
log.error("分片请求失败,任务编号:{},分片编号:{},响应码:{}", taskNo, shardNo, responseCode);
return false;
}
// 获取输入流
inputStream = connection.getInputStream();
// 创建随机访问文件,用于写入分片数据
File targetFile = new File(targetFilePath);
if (!targetFile.getParentFile().exists()) {
boolean mkdirsSuccess = targetFile.getParentFile().mkdirs();
if (!mkdirsSuccess) {
log.error("创建目标文件目录失败,路径:{}", targetFile.getParentFile().getPath());
return false;
}
}
randomAccessFile = new RandomAccessFile(targetFile, "rw");
// 移动文件指针到分片起始位置
randomAccessFile.seek(startPosition);
// 读取并写入数据
byte[] buffer = new byte[4096];
int len;
while ((len = inputStream.read(buffer)) != -1) {
randomAccessFile.write(buffer, 0, len);
}
log.info("分片传输成功,任务编号:{},分片编号:{}", taskNo, shardNo);
return true;
} catch (Exception e) {
log.error("分片传输异常,任务编号:{},分片编号:{}", taskNo, shardNo, e);
return false;
} finally {
// 关闭资源
try {
if (ObjectUtils.isNotEmpty(inputStream)) {
inputStream.close();
}
if (ObjectUtils.isNotEmpty(randomAccessFile)) {
randomAccessFile.close();
}
if (ObjectUtils.isNotEmpty(connection)) {
connection.disconnect();
}
} catch (IOException e) {
log.error("关闭资源异常,任务编号:{},分片编号:{}", taskNo, shardNo, e);
}
}
}
}
五、性能优化与可用性保障
5.1 性能优化策略
5.1.1 分片并行传输优化
动态分片大小:根据视频大小动态调整分片大小,小文件(<100MB)不分片,大文件按 100MB-500MB 分片,平衡并行度与 overhead;线程池参数优化:基于 CPU 核心数与磁盘 IO 能力调整并行线程数,公式为;虚拟线程应用:在 JDK17 中使用虚拟线程替代传统线程,通过
parallelThreadCount = CPU核心数 * 2 + 1创建轻量级线程,提升并发传输能力。
Thread.startVirtualThread()
5.1.2 网络传输优化
断点续传基础:虽然需求暂不考虑断点续传,但预留 Range 请求头支持,为后续功能扩展铺路;连接复用:使用 HTTP 连接池复用 TCP 连接,减少三次握手开销;压缩传输:对视频分片数据进行 Gzip 压缩,降低网络带宽占用(需源端支持压缩)。
5.1.3 缓存与数据库优化
任务状态缓存:将高频访问的任务状态缓存到 Redis,缓存过期时间设置为 30 分钟,减少数据库查询压力;数据库索引优化:在任务编号、任务状态、创建时间等字段建立索引,提升查询效率;批量操作:分片任务创建、状态更新等操作采用批量插入 / 更新,减少数据库交互次数。
5.2 可用性保障机制
5.2.1 任务重试机制
分片级重试:单个分片传输失败时,自动重试 3 次,每次重试间隔 60 秒;任务级重试:当存在分片失败且未达到最大重试次数时,XXL-Job 调度中心定期重新触发任务执行;死信队列:重试次数耗尽仍失败的任务,发送到死信队列,由人工介入处理。
5.2.2 集群部署与容灾
执行器集群:XXL-Job 执行器采用多节点集群部署,单个节点宕机后,其他节点接管任务;RocketMQ 集群:采用主从架构部署 RocketMQ,确保消息服务高可用;Redis 集群:采用 Redis Cluster 集群,支持节点故障自动切换,避免缓存单点故障。
5.2.3 监控与告警
任务监控:实时监控任务执行状态,包括待执行、执行中、成功、失败任务数量,通过 Grafana 可视化展示;异常告警:当任务失败率超过 5% 或单个任务重试次数达到上限时,通过钉钉 / 邮件发送告警通知;资源监控:监控执行器节点的 CPU、内存、磁盘 IO、网络带宽等指标,避免资源过载。
六、方案测试与验证
6.1 测试环境准备
| 组件 | 部署方式 | 配置规格 |
|---|---|---|
| JDK | 单机 / 集群 | 17.0.10(LTS 版) |
| XXL-Job | 集群部署 | 调度中心 2 节点 + 执行器 3 节点 |
| RocketMQ | 主从集群 | 2 主 2 从架构 |
| Redis | Cluster 集群 | 3 主 3 从 6 节点 |
| MySQL | 主从复制 | 主库 1 节点 + 从库 1 节点 |
| 测试服务器 | 物理机集群 | 8 核 16G 内存 + 1TB SSD 磁盘 |
| 网络环境 | 局域网 / 公网 | 局域网 1Gbps 带宽,公网 100Mbps 带宽 |
6.1.1 测试数据准备
选取不同大小的视频文件作为测试样本,覆盖小、中、大三种量级,具体如下:
| 视频量级 | 单个文件大小 | 测试文件数量 | 总数据量 | 测试场景 |
|---|---|---|---|---|
| 小型视频 | 50MB | 100 个 | 5GB | 高并发小文件同步 |
| 中型视频 | 500MB | 20 个 | 10GB | 中等并发中文件同步 |
| 大型视频 | 5GB | 5 个 | 25GB | 低并发大文件同步 |
| 混合场景 | 50MB-5GB 随机 | 125 个 | 40GB | 真实业务混合同步场景 |
6.2 测试用例设计
6.2.1 功能测试用例
| 测试用例 ID | 测试场景 | 测试步骤 | 预期结果 |
|---|---|---|---|
| FT-001 | 单文件同步功能验证 | 1. 提交 1 个 500MB 视频同步任务;2. 观察任务执行状态;3. 验证目标端文件完整性 | 1. 任务执行成功;2. 目标端文件与源端一致;3. 任务状态更新为 “成功” |
| FT-002 | 批量文件同步功能验证 | 1. 批量提交 100 个 50MB 视频同步任务;2. 观察任务执行效率;3. 验证文件完整性 | 1. 所有任务执行成功;2. 无任务丢失或重复;3. 目标端文件完整无损坏 |
| FT-003 | 任务失败重试验证 | 1. 提交任务后手动中断执行器节点;2. 观察任务重试机制;3. 检查最终结果 | 1. 任务触发自动重试;2. 重试 3 次内执行成功;3. 无数据丢失 |
| FT-004 | 分布式锁防重复验证 | 1. 同时提交相同源端视频的 2 个同步任务;2. 观察任务执行情况 | 1. 仅 1 个任务执行,另 1 个任务被分布式锁拦截;2. 无重复同步文件 |
| FT-005 | 分片传输功能验证 | 1. 提交 1 个 5GB 大型视频任务;2. 查看分片任务创建情况;3. 验证分片合并结果 | 1. 自动分成 10 个 500MB 分片;2. 分片并行执行;3. 目标端合并后文件完整 |
6.2.2 性能测试用例
| 测试用例 ID | 测试场景 | 测试指标 | 测试步骤 | 预期指标 |
|---|---|---|---|---|
| PT-001 | 单节点并发性能测试 | 并发任务数、平均同步时间 | 1. 单执行器节点下,逐步增加并发任务数(10/20/30/40/50);2. 统计各并发下的平均同步时间 | 1. 并发 30 以内时,平均同步时间≤10 秒 / 个(50MB 文件);2. 无任务超时 |
| PT-002 | 集群扩展性能测试 | 集群吞吐量、线性扩展比 | 1. 分别在 1/2/3 个执行器节点下,执行 100 个 50MB 文件同步;2. 统计总耗时 | 1. 3 节点集群吞吐量是 1 节点的 2.8 倍以上;2. 扩展比接近线性 |
| PT-003 | 大文件传输性能测试 | 传输速率、CPU / 内存占用 | 1. 提交 5 个 5GB 文件同步任务;2. 监控传输速率及服务器资源占用 | 1. 局域网传输速率≥80MB/s;2. CPU 占用≤70%,内存占用≤4GB |
| PT-004 | 网络带宽适应性测试 | 不同带宽下的同步效率 | 1. 分别在 100Mbps/1Gbps 带宽下,执行 10GB 数据同步;2. 统计总耗时 | 1. 100Mbps 带宽下总耗时≤15 分钟;2. 1Gbps 带宽下总耗时≤2 分钟 |
6.2.3 可用性测试用例
| 测试用例 ID | 测试场景 | 测试指标 | 测试步骤 | 预期结果 |
|---|---|---|---|---|
| AT-001 | 执行器节点故障测试 | 任务切换成功率、中断时间 | 1. 任务执行中关闭 1 个执行器节点;2. 观察任务是否切换到其他节点;3. 统计中断时间 | 1. 任务 100% 切换到正常节点;2. 中断时间≤30 秒;3. 任务最终执行成功 |
| AT-002 | RocketMQ 故障测试 | 消息不丢失率 | 1. 发送任务消息后关闭 RocketMQ 主节点;2. 等待从节点切换;3. 检查消息是否丢失 | 1. RocketMQ 自动切换到从节点;2. 消息无丢失;3. 任务正常执行 |
| AT-003 | 长时间稳定性测试 | 任务成功率、系统稳定性 | 1. 持续 24 小时执行混合场景同步任务;2. 监控系统资源及任务执行情况 | 1. 任务成功率≥99.9%;2. 系统无宕机、无内存泄漏;3. 资源占用稳定 |
6.3 测试结果与分析
6.3.1 功能测试结果
| 测试用例 ID | 测试场景 | 测试结果 | 备注 |
|---|---|---|---|
| FT-001 | 单文件同步功能验证 | 通过 | 500MB 文件同步耗时 45 秒,目标端文件 MD5 与源端一致 |
| FT-002 | 批量文件同步功能验证 | 通过 | 100 个 50MB 文件总同步耗时 8 分 20 秒,无任务丢失,文件完整性 100% |
| FT-003 | 任务失败重试验证 | 通过 | 执行器节点中断后,任务自动重试 2 次后成功,总耗时增加 120 秒(2 次重试间隔) |
| FT-004 | 分布式锁防重复验证 | 通过 | 重复提交的任务被 Redis 分布式锁拦截,仅 1 个任务执行,无重复文件 |
| FT-005 | 分片传输功能验证 | 通过 | 5GB 文件分成 10 个分片并行传输,总耗时 10 分 15 秒,合并后文件完整 |
6.3.2 性能测试结果
6.3.2.1 单节点并发性能测试结果
| 并发任务数 | 测试文件(50MB / 个) | 总数据量 | 总耗时 | 平均每个任务耗时 | 吞吐量(MB / 秒) | 结果分析 |
|---|---|---|---|---|---|---|
| 10 | 10 个 | 500MB | 45 秒 | 4.5 秒 | 11.1 | 系统负载低,性能稳定 |
| 20 | 20 个 | 1GB | 82 秒 | 4.1 秒 | 12.2 | 并发提升,平均耗时略有下降 |
| 30 | 30 个 | 1.5GB | 130 秒 | 4.3 秒 | 11.5 | 仍处于性能最优区间 |
| 40 | 40 个 | 2GB | 210 秒 | 5.25 秒 | 9.5 | 开始出现性能瓶颈,CPU 占用达 85% |
| 50 | 50 个 | 2.5GB | 320 秒 | 6.4 秒 | 7.8 | 性能明显下降,部分任务超时 |
结论:单执行器节点的最优并发任务数为 30,超过 30 后性能开始下降,建议生产环境单节点并发控制在 30 以内。
6.3.2.2 集群扩展性能测试结果
| 执行器节点数 | 测试任务(100 个 50MB 文件) | 总数据量 | 总耗时 | 平均吞吐量(MB / 秒) | 扩展比 | 结果分析 |
|---|---|---|---|---|---|---|
| 1 节点 | 100 个 | 5GB | 420 秒 | 12.0 | 1.0 | 基准性能数据 |
| 2 节点 | 100 个 | 5GB | 220 秒 | 22.7 | 1.9 | 扩展效果显著,接近线性扩展 |
| 3 节点 | 100 个 | 5GB | 150 秒 | 33.3 | 2.8 | 扩展比达 2.8,集群性能最优 |
结论:集群扩展性能良好,3 节点集群的吞吐量是单节点的 2.8 倍,建议生产环境部署 3 个及以上执行器节点以满足高并发需求。
6.3.2.3 大文件传输性能测试结果
| 视频文件大小 | 分片数量 | 网络环境 | 总传输耗时 | 平均传输速率(MB / 秒) | CPU 占用峰值 | 内存占用峰值 | 结果分析 |
|---|---|---|---|---|---|---|---|
| 5GB | 10 个 | 局域网 | 620 秒 | 8.1 | 65% | 3.2GB | 局域网传输稳定,资源占用合理 |
| 5GB | 10 个 | 公网 | 4200 秒 | 1.2 | 45% | 2.8GB | 公网带宽受限,速率符合预期 |
| 10GB | 20 个 | 局域网 | 1280 秒 | 7.8 | 70% | 4.5GB | 大文件传输性能稳定,无异常 |
结论:大文件传输在局域网环境下性能表现优异,公网环境下受带宽限制速率下降,但资源占用仍处于合理范围,符合设计预期。
6.3.3 可用性测试结果
| 测试用例 ID | 测试场景 | 测试结果 | 关键指标 | 结果分析 |
|---|---|---|---|---|
| AT-001 | 执行器节点故障测试 | 通过 | 任务切换成功率 100%,中断时间≤25 秒 | 节点故障后任务自动切换,可用性保障有效 |
| AT-002 | RocketMQ 故障测试 | 通过 | 消息不丢失率 100%,切换时间≤10 秒 | 消息中间件故障不影响任务执行,可靠性高 |
| AT-003 | 长时间稳定性测试 | 通过 | 24 小时任务成功率 99.92%,无系统宕机 | 系统长时间运行稳定,满足 99.9% 可用性要求 |
6.4 测试结论
功能完整性:方案满足所有设计功能,包括单文件 / 批量文件同步、分片并行传输、分布式锁防重复、任务自动重试等,功能测试全部通过;性能表现:单节点最优并发 30 任务,3 节点集群吞吐量达 33.3MB / 秒,局域网大文件传输速率≥8MB / 秒,性能指标符合生产环境需求;可用性保障:节点故障自动切换、消息不丢失、长时间运行稳定,任务成功率达 99.92%,满足 99.9% 的可用性要求;资源占用:CPU 占用峰值≤70%,内存占用峰值≤4.5GB,磁盘 IO 负载均衡,资源占用合理,无过载风险。
七、方案部署与运维指南
7.1 部署架构设计

7.1.1 部署节点规划
| 组件 | 节点数量 | 配置规格 | 部署说明 |
|---|---|---|---|
| XXL-Job 调度中心 | 2 节点 | 4 核 8G 内存 + 100GB SSD | 主从部署,确保调度服务高可用 |
| XXL-Job 执行器 | 3 节点 | 8 核 16G 内存 + 1TB SSD | 核心工作节点,负责任务执行与文件传输,建议独立部署避免资源竞争 |
| RocketMQ | 4 节点 | 4 核 8G 内存 + 200GB SSD | 2 主 2 从架构,主从节点跨机房部署,提升容灾能力 |
| Redis Cluster | 6 节点 | 4 核 8G 内存 + 100GB SSD | 3 主 3 从架构,开启持久化,避免缓存数据丢失 |
| MySQL | 2 节点 | 8 核 16G 内存 + 500GB SSD | 主从复制架构,从库负责只读查询,分担主库压力 |
| 负载均衡器 | 1 节点 | 4 核 8G 内存 | 采用 Nginx 或云厂商负载均衡服务,负责请求分发与节点健康检查 |
7.2 部署步骤详解
7.2.1 基础环境准备
JDK17 安装
# 下载JDK17安装包
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.rpm
# 安装JDK
rpm -ivh jdk-17_linux-x64_bin.rpm
# 配置环境变量
echo "export JAVA_HOME=/usr/java/jdk-17" >> /etc/profile
echo "export PATH=$JAVA_HOME/bin:$PATH" >> /etc/profile
source /etc/profile
# 验证安装
java -version
Maven 安装
# 下载Maven
wget https://archive.apache.org/dist/maven/maven-3/3.9.6/binaries/apache-maven-3.9.6-bin.tar.gz
# 解压安装
tar -zxvf apache-maven-3.9.6-bin.tar.gz -C /usr/local/
# 配置环境变量
echo "export MAVEN_HOME=/usr/local/apache-maven-3.9.6" >> /etc/profile
echo "export PATH=$MAVEN_HOME/bin:$PATH" >> /etc/profile
source /etc/profile
# 验证安装
mvn -v
7.2.2 中间件部署
Redis Cluster 部署(以 3 主 3 从为例)
# 1. 安装Redis
yum install -y redis
# 2. 创建节点目录
mkdir -p /data/redis/{7001,7002,7003,7004,7005,7006}
# 3. 编写配置文件(以7001为例)
cat > /data/redis/7001/redis.conf << EOF
port 7001
cluster-enabled yes
cluster-config-file nodes-7001.conf
cluster-node-timeout 15000
daemonize yes
pidfile /var/run/redis-7001.pid
logfile /data/redis/7001/redis.log
dir /data/redis/7001
bind 0.0.0.0
requirepass Redis@123456
masterauth Redis@123456
EOF
# 4. 复制配置文件到其他节点并修改端口
for port in 7002 7003 7004 7005 7006; do
cp /data/redis/7001/redis.conf /data/redis/$port/
sed -i "s/7001/$port/g" /data/redis/$port/redis.conf
done
# 5. 启动所有节点
for port in 7001 7002 7003 7004 7005 7006; do
redis-server /data/redis/$port/redis.conf
done
# 6. 创建Redis集群
redis-cli -a Redis@123456 --cluster create 192.168.1.10:7001 192.168.1.10:7002 192.168.1.10:7003 192.168.1.11:7004 192.168.1.11:7005 192.168.1.11:7006 --cluster-replicas 1
RocketMQ 部署(2 主 2 从)
# 1. 下载RocketMQ
wget https://archive.apache.org/dist/rocketmq/5.2.0/rocketmq-all-5.2.0-bin-release.zip
# 2. 解压安装
unzip rocketmq-all-5.2.0-bin-release.zip -d /usr/local/
mv /usr/local/rocketmq-all-5.2.0-bin-release /usr/local/rocketmq
# 3. 配置环境变量
echo "export ROCKETMQ_HOME=/usr/local/rocketmq" >> /etc/profile
source /etc/profile
# 4. 修改配置文件(主节点1)
cat > /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties << EOF
brokerClusterName=VideoSyncCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/data/rocketmq/store/broker-a
storePathCommitLog=/data/rocketmq/store/broker-a/commitlog
namesrvAddr=192.168.1.12:9876;192.168.1.13:9876
EOF
# 5. 启动NameServer(2个节点)
nohup sh /usr/local/rocketmq/bin/mqnamesrv -n 192.168.1.12:9876 &
nohup sh /usr/local/rocketmq/bin/mqnamesrv -n 192.168.1.13:9876 &
# 6. 启动Broker节点
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties &
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties &
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b.properties &
nohup sh /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-b-s.properties &
7.2.3 应用部署
代码打包
# 进入项目目录
cd /data/projects/video-sync-system
# 打包项目
mvn clean package -Dmaven.test.skip=true
# 复制jar包到部署目录
cp target/video-sync-system-1.0.0.jar /data/deploy/
编写启动脚本
cat > /data/deploy/start.sh << EOF
#!/bin/bash
nohup java -jar -Xms8g -Xmx16g -XX:+UseZGC -XX:MaxGCPauseMillis=50 video-sync-system-1.0.0.jar --spring.profiles.active=prod > /data/logs/video-sync.log 2>&1 &
echo "应用启动成功,日志文件:/data/logs/video-sync.log"
EOF
# 添加执行权限
chmod +x /data/deploy/start.sh
启动应用
cd /data/deploy/
./start.sh
# 验证启动状态
ps -ef | grep video-sync-system
7.3 运维监控方案
7.3.1 监控指标设计
| 监控维度 | 核心指标 | 监控频率 | 告警阈值 | 告警方式 |
|---|---|---|---|---|
| 应用层 | 任务执行成功率、同步延迟 | 1 分钟 | 成功率 <99.9%,延迟> 5 分钟 | 钉钉 + 邮件 |
| 中间件层 | RocketMQ 消息堆积数 | 30 秒 | 堆积数 > 1000 | 钉钉 + 短信 |
| 中间件层 | Redis 缓存命中率 | 1 分钟 | 命中率 < 90% | 钉钉 |
| 系统层 | CPU 使用率、内存使用率 | 30 秒 | CPU>85%,内存 > 80% | 钉钉 + 邮件 |
| 系统层 | 磁盘 IO、网络带宽 | 1 分钟 | 磁盘 IO>90%,带宽满负荷 | 钉钉 + 短信 |
7.3.2 监控工具集成
Prometheus+Grafana 监控
集成 SpringBoot Actuator 暴露监控指标;通过 Prometheus 采集指标数据,Grafana 制作可视化面板;配置告警规则,触发阈值时推送告警通知。
XXL-Job 内置监控
利用 XXL-Job 调度中心的任务监控功能,实时查看任务执行状态;配置任务失败告警,及时发现执行异常。
日志监控
采用 ELK 栈(Elasticsearch+Logstash+Kibana)收集与分析日志;配置关键字告警(如 “OOM”、“任务执行失败”),快速定位问题。
7.4 常见问题排查
7.4.1 任务执行失败排查流程

7.4.2 常见问题解决方案
| 问题现象 | 可能原因 | 解决方案 |
|---|---|---|
| 任务提交后无响应 | RocketMQ 消息发送失败 | 1. 检查 RocketMQ 集群状态;2. 重启消息生产者服务;3. 手动重试发送消息 |
| 分片任务执行卡住 | 磁盘 IO 阻塞 | 1. 检查执行器节点磁盘状态;2. 迁移存储目录到 IO 更优的磁盘;3. 重启执行器 |
| 目标端文件损坏 | 网络传输丢包 | 1. 检查网络稳定性;2. 开启文件校验机制;3. 重新执行同步任务 |
| 执行器节点频繁宕机 | 内存溢出 | 1. 分析 JVM 内存快照;2. 调整 JVM 参数;3. 优化分片传输逻辑减少内存占用 |
八、方案扩展与未来优化
8.1 功能扩展方向
8.1.1 断点续传功能
基于现有分片传输架构,增加分片传输状态记录;利用 Redis 缓存已传输完成的分片信息,下次同步时跳过已完成分片;支持手动触发断点续传,提升大文件同步容错性。
8.1.2 多源端同步支持
扩展任务配置,支持从多个源端系统同步视频数据;增加源端适配层,兼容不同源端的文件访问协议(如 HTTP、FTP、SFTP)。
8.1.3 视频转码同步
集成 FFmpeg 工具,支持同步过程中对视频进行转码;配置转码参数模板,满足不同目标端的格式需求。
8.2 性能优化方向
8.2.1 传输协议优化
替换 HTTP 协议为 QUIC 协议,减少网络抖动影响,提升传输速率;支持 TCP/UDP 协议切换,根据网络环境自适应选择最优协议。
8.2.2 缓存策略优化
引入多级缓存架构,增加本地缓存(Caffeine)缓存热点任务信息;优化 Redis 缓存过期策略,根据任务热度动态调整过期时间。
8.2.3 异步化优化
将文件完整性校验、状态更新等操作异步化,提升主传输流程效率;采用响应式编程(Spring WebFlux)重构传输核心逻辑,进一步提升并发能力。
8.3 架构演进方向
8.3.1 微服务拆分
将现有单体应用拆分为任务调度服务、文件传输服务、监控告警服务;采用 Spring Cloud Alibaba 架构,实现服务注册发现、配置中心等功能。
8.3.2 云原生改造
容器化部署(Docker),实现环境一致性与快速部署;引入 Kubernetes 编排,实现自动扩缩容、滚动更新等能力;集成服务网格(Istio),提升服务治理能力。
九、总结
本文基于 JDK17、XXL-Job、RocketMQ、Redis 等主流技术栈,设计并实现了一套高性能、高可用的跨系统视频同步方案。方案通过分片并行传输、异步任务调度、分布式锁控制等核心机制,解决了大文件传输效率低、高并发任务处理能力不足、系统可用性保障等关键问题。