基于CompletableFuture的主子任务并行处理架构实战:多渠道账单并发导入性能提升5倍的技术方案

内容分享2周前发布
0 0 0

概述

引言

在企业级财务对账系统的开发实践中,我们遇到了一个典型的性能瓶颈:批量处理15个不同支付渠道的账单数据时,串行执行模式导致单次导入耗时长达2.5小时。更严重的是,任一渠道处理失败都会中断整个流程,造成大量无效等待。

经过架构重构,我们设计了基于CompletableFuture的主子任务并行处理模型,配合精细化的线程池调优和异常隔离机制,最终将处理时间压缩至30分钟以内,性能提升达5倍,同时实现了故障隔离——单个渠道异常不再影响其他渠道的正常处理。

技术方案核心要点

架构设计:主任务-子任务双层模型,支持细粒度进度追踪与异常隔离并发优化:基于CompletableFuture + AtomicInteger实现真正的并行处理异常容错:子任务独立异常处理,单渠道失败不影响其他渠道执行前后端协同:Vue 3 + JSP混合架构下的异步任务状态实时同步机制线程池调优:针对不同部署场景的ThreadPoolExecutor参数最佳实践

技术栈

后端框架:Spring Boot 2.7.18 + MyBatis-Plus并发工具:Java 17 CompletableFuture + AtomicInteger + ThreadPoolTaskExecutor数据库:MySQL 8.0前端技术:Vue 3 + Element Plus视图层:JSP + JSTL


第一章 系统架构设计

1.1 整体架构流程

下图展示了从用户触发导入到后端并行处理的完整链路:

1.2 实际运行效果展示

下图为生产环境中一键导入15个渠道的真实运行截图。可以看到:

主任务(ID=1): 显示整体进度80%,已完成12/15个渠道,当前处于运行中状态子任务(ID=2-16): 各渠道独立执行,部分已成功完成(100%),部分仍在运行中进度可视化: 每个任务都有独立的进度条,清晰展示当前处理进度异常隔离: ID=8的美团外卖渠道处于运行中(50%),但不影响其他已完成渠道的结果
基于CompletableFuture的主子任务并行处理架构实战:多渠道账单并发导入性能提升5倍的技术方案

关键观察点:

主子任务分层: 第1行为主任务,展示”12/15个渠道”的整体进度;下方为各渠道子任务并行执行: 多个子任务同时处于”运行中”状态,证明真正实现了并行处理状态多样性: 包含”成功”、”运行中”等多种状态,真实反映复杂业务场景细粒度追踪: 每个渠道都有独立的任务名称、渠道类型、进度百分比

1.3 主子任务双层模型

系统采用两级任务体系实现分层管理:

任务类型 职责 关键字段 典型场景
主任务(MAIN) 代表整体批量导入流程 totalChannels、completedChannels、progressPercent 一键导入15个渠道
子任务(SUB) 对应单个渠道的数据处理 channelType、parentTaskId、errorMessage 处理渠道账单

双层架构的三个关键优势:

细粒度追踪: 可精确定位每个渠道的执行状态与异常详情
主任务显示”已完成9/15个渠道”,子任务显示”xxx渠道: 处理第xxxx行”

独立重试: 子任务失败后可单独重新执行,无需重跑整体流程

进度可视化: 前端可分别展示主任务整体进度与各子任务详细状态
主任务进度条显示60%,各子任务进度条分别显示不同完成度

1.3 异常隔离机制设计

传统串行处理的致命缺陷是”一个失败,全盘皆输”。我们通过以下设计实现异常隔离:


private CompletableFuture<Void> processSingleChannelAsync(...) {
    return CompletableFuture.runAsync(() -> {
        try {
            //  处理单个渠道逻辑
            billParserService.parseSingleChannelBills(...).join();
            // 更新子任务状态为成功
            billImportTaskService.updateTaskStatus(subTaskId, "SUCCESS", 100, ...);
        } catch (Exception e) {
            // ⚠️ 关键点1: 异常仅影响当前子任务,不抛出到外层
            billImportTaskService.updateTaskError(subTaskId, errorMessage);
            logger.error("渠道 {} 导入失败", channelEnum.getDescr(), e);
            // ⚠️ 关键点2: 不再重新抛出异常,让其他渠道继续执行
        }
    }, importTaskExecutor);
}

异常隔离的三层保障:

子任务内部完全捕获异常: catch块消化所有异常,仅更新自身状态为FAILED不向外层抛出异常: 避免中断
CompletableFuture.allOf()
的等待数据库记录详细错误信息: 便于后续问题排查与定向重试

实际效果对比:

处理模式 饿了么失败 美团失败 其他13个渠道 整体结果
串行处理 未执行 ❌ 全部失败
并行+异常隔离 ✅ 全部成功 ⚠️ 部分成功(13/15)

第二章 数据模型设计

2.1 任务实体核心字段


@Data
@TableName("bill_import_task")
public class BillImportTaskEntity {
    
    @TableId(type = IdType.AUTO)
    private Long id;
    
    private String taskName;              // 任务名称
    private String channelType;           // 渠道类型(如ele、meituan)
    private String folderPath;            // 文件夹路径
    private String status;                // 状态:PENDING/RUNNING/SUCCESS/FAILED
    private Integer progressPercent;      // 进度百分比(0-100)
    private String currentMessage;        // 当前处理消息
    private String errorMessage;          // 错误信息
    
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private LocalDateTime startTime;      // 开始时间
    
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private LocalDateTime endTime;        // 结束时间
    
    // ========== 主子任务关联字段 ==========
    private Long parentTaskId;            // 父任务ID(仅子任务有值)
    private String taskType;              // 任务类型:MAIN/SUB
    private String channelName;           // 渠道名称(中文)
    private Integer totalChannels;        // 总渠道数(仅主任务有值)
    private Integer completedChannels;    // 已完成渠道数(仅主任务有值)
    
    // ========== 冗余字段-不存储到数据库 ==========
    @TableField(exist = false)
    private List<BillImportTaskEntity> subTasks;  // 子任务列表
    
    @TableField(exist = false)
    private String parentTaskName;        // 父任务名称
    
    // 标准审计字段
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
    private String createBy;
    private String updateBy;
    private Integer isDeleted;            // 软删除标识
}

字段设计要点:

parentTaskId: 建立主子任务关联关系,支持树形查询taskType: 区分MAIN/SUB,便于分层查询与统计totalChannels & completedChannels: 支持主任务进度计算(completedChannels/totalChannels * 100)errorMessage: 存储异常堆栈信息,支持后续定向重试

2.2 数据库表结构设计


CREATE TABLE `bill_import_task` (
  `id` BIGINT AUTO_INCREMENT COMMENT '自增ID' PRIMARY KEY,
  `task_name` VARCHAR(255) NOT NULL COMMENT '任务名称',
  `channel_type` VARCHAR(50) DEFAULT NULL COMMENT '渠道类型',
  `folder_path` VARCHAR(500) DEFAULT NULL COMMENT '文件夹路径',
  `status` VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '任务状态',
  `progress_percent` INT DEFAULT 0 COMMENT '进度百分比',
  `current_message` VARCHAR(500) DEFAULT NULL COMMENT '当前处理消息',
  `error_message` TEXT DEFAULT NULL COMMENT '错误信息',
  `start_time` TIMESTAMP NULL DEFAULT NULL COMMENT '开始时间',
  `end_time` TIMESTAMP NULL DEFAULT NULL COMMENT '结束时间',
  
  -- 主子任务关联字段
  `parent_task_id` BIGINT DEFAULT NULL COMMENT '父任务ID',
  `task_type` VARCHAR(20) DEFAULT 'MAIN' COMMENT '任务类型:MAIN/SUB',
  `channel_name` VARCHAR(50) DEFAULT NULL COMMENT '渠道名称',
  `total_channels` INT DEFAULT NULL COMMENT '总渠道数',
  `completed_channels` INT DEFAULT 0 COMMENT '已完成渠道数',
  
  -- 标准审计字段
  `create_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `create_by` VARCHAR(50) DEFAULT '' COMMENT '创建者',
  `update_by` VARCHAR(50) DEFAULT '' COMMENT '更新者',
  `is_deleted` INT DEFAULT 0 COMMENT '软删除标识',
  
  INDEX `idx_parent_task_id` (`parent_task_id`),
  INDEX `idx_status` (`status`),
  INDEX `idx_task_type` (`task_type`),
  INDEX `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='账单导入任务表';

索引设计策略:

idx_parent_task_id: 支持快速查询子任务列表idx_status: 支持按状态筛选(RUNNING/SUCCESS/FAILED)idx_task_type: 支持分层查询(仅查主任务或子任务)idx_create_time: 支持按时间范围筛选任务


第三章 并发处理核心实现

3.1 线程池配置策略

线程池是并发处理的基础设施,参数配置直接影响系统吞吐量与稳定性。


@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "importTaskExecutor")
    public Executor importTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数设置为6,适配实际硬件资源
        executor.setCorePoolSize(6);
        // 最大线程数设置为10,支持峰值并发
        executor.setMaxPoolSize(10);
        // 队列容量设置为20,缓冲等待任务
        executor.setQueueCapacity(20);
        executor.setThreadNamePrefix("ImportTask-");
        executor.setRejectedExecutionHandler(
            new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()
        );
        executor.initialize();
        return executor;
    }
}
线程池参数调优策略

线程池参数的选择需要综合考虑硬件资源、业务特点和部署环境。基于生产实践,我们总结了以下经验公式:


// 基础计算公式
corePoolSize = min(CPU核数 * 1.5, 并发任务数)
maxPoolSize = corePoolSize * 1.5 ~ 2
queueCapacity = 预期高峰任务数 - corePoolSize

实战案例分析:

假设服务器配置为8核CPU,业务需要处理15个渠道的并发导入任务。按照公式计算:


corePoolSize = min(8 * 1.5, 15) = 12
maxPoolSize = 12 * 1.5 = 18  
queueCapacity = 20

但在实际部署中,考虑到服务器还需运行数据库、Web容器等组件,我们将参数适当降低:


corePoolSize = 6   // 保留40%计算资源给其他组件
maxPoolSize = 10   // 应对突发流量
queueCapacity = 20 // 任务缓冲区

这样的配置在保证并发处理能力的同时,避免了系统资源竞争导致的整体性能下降。

ThreadPoolExecutor工作流程图解

15个渠道任务的实际执行流程:

提交前6个任务时,直接分配给6个核心线程并行执行第7-26个任务进入队列等待(队列容量20)队列满后,创建额外线程(最多到10个)处理新任务超出最大线程数+队列容量的任务触发拒绝策略

3.2 并行处理主流程实现


@PostMapping("/importAllBills")
@ResponseBody
public Map<String, Object> importAllBills(@RequestParam String folderPath) {
    Map<String, Object> result = new HashMap<>();

    // 前置校验
    ImportValidationUtil.ValidationResult validation = 
        ImportValidationUtil.validateImportAllFolder(folderPath);
    if (!validation.isValid()) {
        result.put("success", false);
        result.put("message", validation.getMessage());
        return result;
    }

    try {
        // ========== 步骤1: 创建主任务 ==========
        String taskName = "一键导入所有账单";
        Long mainTaskId = billImportTaskService.createMainTask(taskName, folderPath);

        // ========== 步骤2: 获取所有渠道文件夹 ==========
        List<String> channelFolders = getChannelFolders(folderPath);

        // ========== 步骤3: 为每个渠道创建子任务 ==========
        List<Long> subTaskIds = new ArrayList<>();
        for (String channelFolder : channelFolders) {
            ChannelEnum channelEnum = getChannelTypeByFolderName(channelFolder);
            if (channelEnum != null) {
                Long subTaskId = billImportTaskService.createSubTask(
                    mainTaskId,
                    channelEnum.getDescr() + "账单导入",
                    channelEnum.getCode(),
                    folderPath + "/" + channelFolder
                );
                subTaskIds.add(subTaskId);
            }
        }

        // ========== 步骤4: 更新主任务的总渠道数 ==========
        billImportTaskService.updateMainTaskChannels(mainTaskId, channelFolders.size());

        result.put("success", true);
        result.put("message", "导入任务已创建成功,主任务ID: " + mainTaskId);
        result.put("mainTaskId", mainTaskId);
        result.put("subTaskCount", subTaskIds.size());

        // ========== 步骤5: 启动异步任务处理(并行执行) ==========
        CompletableFuture.runAsync(() -> {
            billImportTaskService.updateTaskStatus(mainTaskId, "RUNNING", 0, "开始导入...");

            // 使用原子计数器跟踪已完成的任务数
            AtomicInteger completedCount = new AtomicInteger(0);
            int totalTasks = channelFolders.size();

            // 并行处理所有渠道,为每个渠道创建独立的CompletableFuture
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < channelFolders.size(); i++) {
                String channelFolder = channelFolders.get(i);
                Long subTaskId = subTaskIds.get(i);
                ChannelEnum channelEnum = getChannelTypeByFolderName(channelFolder);

                if (channelEnum != null) {
                    CompletableFuture<Void> future = 
                        processSingleChannelAsync(mainTaskId, subTaskId, channelFolder, folderPath, channelEnum)
                        .whenComplete((voidResult, throwable) -> {
                            // 无论成功或失败,都更新主任务进度
                            int completed = completedCount.incrementAndGet();
                            billImportTaskService.updateMainTaskProgress(
                                mainTaskId, completed, totalTasks
                            );
                        });
                    futures.add(future);
                }
            }

            // 等待所有渠道处理完成
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .whenComplete((voidResult, throwable) -> {
                    // 所有任务完成后,更新主任务状态
                    billImportTaskService.updateTaskStatus(
                        mainTaskId, "SUCCESS", 100, "所有渠道导入完成"
                    );
                })
                .join();

        }, importTaskExecutor);

    } catch (Exception e) {
        result.put("success", false);
        result.put("message", "创建导入任务失败:" + e.getMessage());
        logger.error("创建一键导入任务失败", e);
    }

    return result;
}

3.3 AtomicInteger原子计数器机制

在多线程并发环境中,普通的
int
变量无法保证线程安全。
AtomicInteger
基于CAS(Compare-And-Swap)机制实现无锁并发。

CAS机制原理图解

CAS执行流程:

读取内存位置的当前值 V计算新值 V’ = V + 1尝试将内存值更新为V’,但仅当内存值仍等于V时才执行如果CAS失败(内存值已被其他线程修改),重复步骤1-3

优势对比:

同步机制 实现原理 性能 适用场景
synchronized 加锁,悲观策略 低(上下文切换) 复杂临界区操作
AtomicInteger CAS,乐观策略 高(无锁) 简单计数器操作
实际应用代码解析

// 使用原子计数器跟踪已完成的任务数
AtomicInteger completedCount = new AtomicInteger(0);

// 每个子任务完成时回调
future.whenComplete((voidResult, throwable) -> {
    // ⚠️ 关键点: incrementAndGet()保证原子性递增
    int completed = completedCount.incrementAndGet();
    // 更新主任务进度: completed / totalTasks * 100
    billImportTaskService.updateMainTaskProgress(mainTaskId, completed, totalTasks);
});

常用原子操作方法:


AtomicInteger count = new AtomicInteger(0);

// 1. 自增并返回新值
int newValue = count.incrementAndGet(); // ++i

// 2. 自增并返回旧值
int oldValue = count.getAndIncrement(); // i++

// 3. 按指定值增加
int result = count.addAndGet(5); // count += 5

// 4. CAS更新
boolean success = count.compareAndSet(10, 20); // if(count==10) count=20;

// 5. 获取当前值
int current = count.get();

3.4 CompletableFuture高级用法

whenComplete vs exceptionally

// 方法一: whenComplete(同时处理成功和异常)
future.whenComplete((result, throwable) -> {
    if (throwable != null) {
        logger.error("任务执行失败", throwable);
    } else {
        logger.info("任务执行成功");
    }
    // 无论成功或失败,都更新计数器
    completedCount.incrementAndGet();
});

// 方法二: exceptionally(仅处理异常)
future.exceptionally(throwable -> {
    logger.error("任务执行失败", throwable);
    return null; // 返回默认值
});

选型建议:

需要同时处理成功和异常场景: 使用
whenComplete
仅需处理异常场景: 使用
exceptionally
本项目中需要无论成功或失败都更新计数器,故采用
whenComplete

allOf vs anyOf

// allOf: 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
    .whenComplete((voidResult, throwable) -> {
        logger.info("所有任务已完成");
    })
    .join();

// anyOf: 仅等待任意一个任务完成
CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]))
    .whenComplete((result, throwable) -> {
        logger.info("有任务已完成");
    });

应用场景:

allOf: 批量任务必须全部完成(本项目采用)anyOf: 竞速场景,多个数据源取最快响应者


第四章 服务层实现

4.1 核心方法实现

4.1.1 主任务进度更新

@Override
public void updateMainTaskProgress(Long mainTaskId, int completedChannels, int totalChannels) {
    BillImportTaskEntity task = getById(mainTaskId);
    if (task != null) {
        task.setCompletedChannels(completedChannels);
        task.setTotalChannels(totalChannels);
        
        // ⚠️ 关键计算: 进度百分比 = (已完成子任务数 / 总子任务数) × 100
        int progressPercent = totalChannels > 0 ? (completedChannels * 100 / totalChannels) : 0;
        task.setProgressPercent(progressPercent);
        
        if (completedChannels >= totalChannels) {
            task.setStatus("SUCCESS");
            task.setEndTime(LocalDateTime.now());
            task.setCurrentMessage("所有渠道处理完成");
        } else {
            task.setCurrentMessage("已完成 " + completedChannels + "/" + totalChannels + " 个渠道");
        }
        
        task.setUpdateTime(LocalDateTime.now());
        task.setUpdateBy("system");
        updateById(task);
    }
}

进度计算示例:

15个渠道,完成9个


progressPercent = (9 / 15) × 100 = 60%
4.1.2 子任务状态更新

@Override
public void updateTaskStatus(Long taskId, String status, Integer progressPercent, String message) {
    BillImportTaskEntity task = getById(taskId);
    if (task != null) {
        task.setStatus(status);
        if (progressPercent != null) {
            task.setProgressPercent(progressPercent);
        }
        task.setCurrentMessage(message);
        task.setUpdateTime(LocalDateTime.now());
        task.setUpdateBy("system");
        
        // 首次进入RUNNING状态时记录开始时间
        if ("RUNNING".equals(status) && task.getStartTime() == null) {
            task.setStartTime(LocalDateTime.now());
        }
        
        // 任务结束时记录结束时间
        if ("SUCCESS".equals(status) || "FAILED".equals(status)) {
            task.setEndTime(LocalDateTime.now());
        }
        
        updateById(task);
    }
}

状态机转换图:


第五章 前端实现

5.1 一键导入页面


<%@ page contentType="text/html;charset=UTF-8" %>
<%@ include file="/WEB-INF/views/taglib.jsp" %>
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <title>一键导入所有账单</title>
    <link rel="stylesheet" href="/css/vip.css" type="text/css">
    <%@ include file="/WEB-INF/views/header.jsp" %>
    <style>
        #importAllApp.vue-app {
            padding: 0 !important;
            width: 100% !important;
        }
        
        .import-all-container {
            background: #ffffff !important;
            border-radius: 8px !important;
            box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1) !important;
            padding: 20px !important;
        }
        
        .import-all-container::before {
            content: '';
            position: absolute;
            top: 0;
            left: 0;
            right: 0;
            height: 3px;
            background: linear-gradient(90deg, #6b7280 0%, #9ca3af 50%, #6b7280 100%);
            border-radius: 8px 8px 0 0;
        }
    </style>
</head>
<body>
    <div class="vue-app">
        <div class="import-all-container">
            <!-- 表单区域 -->
            <div class="form-section">
                <el-form :model="importForm" @submit.prevent="handleImport">
                    <el-form-item label="账单文件夹路径" required>
                        <el-input 
                            v-model="importForm.folderPath" 
                            placeholder="例如:D:\账单数据\2025年7月"
                            size="large"
                            clearable>
                        </el-input>
                    </el-form-item>
                    <el-form-item>
                        <el-button 
                            type="default" 
                            size="large"
                            :loading="isImporting"
                            @click="handleImport">
                            导入
                        </el-button>
                    </el-form-item>
                </el-form>
            </div>

            <!-- 状态消息 -->
            <div v-if="statusMessage" class="status-section">
                <el-alert 
                    :title="statusMessage"
                    :type="statusType"
                    :closable="false"
                    show-icon>
                </el-alert>
            </div>
        </div>
    </div>

    <script>
        const { createApp, ref, reactive } = Vue;
        const { ElMessage } = ElementPlus;
        const zhCn = window.ElementPlusZhCn;
        
        createApp({
            setup() {
                const importForm = reactive({
                    folderPath: ''
                });
                
                const isImporting = ref(false);
                const statusMessage = ref('');
                const statusType = ref('');
                const currentTaskId = ref(null);
                
                const showStatus = (message, type = 'info') => {
                    statusMessage.value = message;
                    statusType.value = type;
                };
                
                const handleImport = () => {
                    if (!importForm.folderPath.trim()) {
                        ElMessage.warning('请输入账单文件夹路径!');
                        return;
                    }
                    
                    if (isImporting.value) {
                        ElMessage.warning('正在导入中,请勿重复操作!');
                        return;
                    }
                    
                    isImporting.value = true;
                    showStatus('正在创建导入任务...', 'info');

                    // 使用fetch API提交请求
                    const formData = new URLSearchParams();
                    formData.append('folderPath', importForm.folderPath);
                    
                    fetch('/bill/importAllBills', {
                        method: 'POST',
                        headers: {
                            'Content-Type': 'application/x-www-form-urlencoded'
                        },
                        body: formData
                    })
                    .then(response => response.json())
                    .then(data => {
                        if (data.success) {
                            currentTaskId.value = data.mainTaskId;
                            showStatus(
                                '导入任务已创建成功,任务ID: ' + data.mainTaskId + 
                                ',共创建 ' + data.subTaskCount + ' 个子任务', 
                                'success'
                            );
                            ElMessage.success('请到"任务进度"菜单查看执行状态');
                            
                            // 通知父窗口开始任务进度监控
                            try {
                                if (window.parent && window.parent !== window) {
                                    window.parent.postMessage({ type: 'IMPORT_STARTED' }, '*');
                                }
                            } catch (e) {
                                console.warn('无法向父窗口发送消息:', e);
                            }
                        } else {
                            showStatus('导入出错: ' + data.message, 'error');
                            ElMessage.error(data.message);
                        }
                    })
                    .catch(error => {
                        showStatus('网络错误: 无法连接到服务器', 'error');
                        ElMessage.error('网络错误');
                        console.error('请求失败:', error);
                    })
                    .finally(() => {
                        isImporting.value = false;
                    });
                };
                
                return {
                    importForm,
                    isImporting,
                    statusMessage,
                    statusType,
                    currentTaskId,
                    handleImport
                };
            }
        }).use(ElementPlus, {
            locale: zhCn
        }).mount('#importAllApp');
    </script>
</body>
</html>

关键技术点:

Fetch API替代传统AJAX

使用现代浏览器原生支持的fetch API支持Promise链式调用,代码更简洁通过then/catch处理成功与失败

postMessage跨窗口通信


window.parent.postMessage({ type: 'IMPORT_STARTED' }, '*');

通知父窗口(菜单页面)开始任务进度监控触发父页面中”任务进度”菜单项的闪烁提示实现跨iframe通信

Vue 3 Composition API

使用setup函数组织响应式数据与方法ref/reactive创建响应式变量避免this上下文混乱问题

5.2 任务列表展示页面(核心片段)


<div class="data-section">
    <table border="1" cellpadding="0" cellspacing="0">
        <thead>
            <tr class="table-header-bg">
                <th width="5%">ID</th>
                <th width="15%">任务名称</th>
                <th width="8%">任务类型</th>
                <th width="10%">所属主任务</th>
                <th width="8%">渠道类型</th>
                <th width="8%">状态</th>
                <th width="10%">进度</th>
                <th width="15%">当前消息</th>
                <th width="13%">创建时间</th>
                <th width="8%">完成时间</th>
            </tr>
        </thead>
        <tbody>
            <c:forEach items="${taskList}" var="task">
                <tr>
                    <td>${task.id}</td>
                    <td>${task.taskName}</td>
                    <td>
                        <c:choose>
                            <c:when test="${task.taskType == 'MAIN'}">主任务</c:when>
                            <c:when test="${task.taskType == 'SUB'}">子任务</c:when>
                            <c:otherwise>独立任务</c:otherwise>
                        </c:choose>
                    </td>
                    <td>${task.parentTaskName != null ? task.parentTaskName : '-'}</td>
                    <td>${task.channelName != null ? task.channelName : '-'}</td>
                    <td>
                        <span class="status-badge status-${fn:toLowerCase(task.status)}">
                            <c:choose>
                                <c:when test="${task.status == 'PENDING'}">待处理</c:when>
                                <c:when test="${task.status == 'RUNNING'}">运行中</c:when>
                                <c:when test="${task.status == 'SUCCESS'}">成功</c:when>
                                <c:when test="${task.status == 'FAILED'}">失败</c:when>
                                <c:otherwise>${task.status}</c:otherwise>
                            </c:choose>
                        </span>
                    </td>
                    <td>
                        <div class="progress-bar">
                            <div class="progress-fill"></div>
                        </div>
                        <div class="task-details">
                            ${task.progressPercent}%
                            <c:if test="${task.taskType == 'MAIN' && task.totalChannels != null}">
                                (${task.completedChannels}/${task.totalChannels})
                            </c:if>
                        </div>
                    </td>
                    <td>${task.currentMessage != null ? task.currentMessage : '-'}</td>
                    <td>
                        <fmt:formatDate value="${task.createTime}" pattern="yyyy-MM-dd HH:mm:ss" />
                    </td>
                    <td>
                        <c:choose>
                            <c:when test="${task.endTime != null}">
                                <fmt:formatDate value="${task.endTime}" pattern="HH:mm:ss" />
                            </c:when>
                            <c:otherwise>-</c:otherwise>
                        </c:choose>
                    </td>
                </tr>
                
                <!-- 显示错误信息(如果有) -->
                <c:if test="${task.errorMessage != null && !empty task.errorMessage}">
                    <tr>
                        <td colspan="10" class="error-message">
                            错误信息: ${task.errorMessage}
                        </td>
                    </tr>
                </c:if>
            </c:forEach>
        </tbody>
    </table>
</div>

JSP + Vue混合架构特点:

服务端渲染表格数据

使用JSTL标签
<c:forEach>
遍历后端传递的
taskList
避免前端二次请求,减少网络开销适配传统JSP技术栈,降低迁移成本

进度条可视化


<div class="progress-bar">
    <div class="progress-fill"></div>
</div>

CSS宽度百分比直接映射数据库进度字段线性渐变背景色增强视觉效果


第六章 核心技术深度剖析

6.1 线程池拒绝策略

当线程池无法接受新任务时,会触发拒绝策略。Java提供了四种内置策略:

策略 行为 适用场景
CallerRunsPolicy 调用者线程执行任务 任务不能丢失,允许降低吸吐量
AbortPolicy 抛出RejectedExecutionException 需要感知任务拒绝并处理
DiscardPolicy 静默丢弃任务 任务可丢失,不需要反馈
DiscardOldestPolicy 丢弃最旧的任务,提交新任务 新任务优先级高于旧任务

本项目采用CallerRunsPolicy:


executor.setRejectedExecutionHandler(
    new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()
);

原因:

账单导入任务不允许丢失当线程池满载时,由主线程同步执行任务,起到限流作用

6.3 大文件流式解析

单个Excel文件如果超过10MB,采用EasyExcel流式读取:


public abstract class AbstractBaseExcelParser<T> implements BillFileParser {
    
    protected void processAndSaveDirectly(String filePath, Consumer<List<T>> batchConsumer) {
        try {
            EasyExcel.read(filePath, getEntityClass(), new PageReadListener<T>(dataList -> {
                // 每读取1000行回调一次
                List<T> processedList = dataList.stream()
                    .map(this::processRow)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
                
                if (!processedList.isEmpty()) {
                    batchConsumer.accept(processedList); // 分批入库
                }
            }, 1000)).sheet().doRead();
        } catch (Exception e) {
            throw new RuntimeException("解析Excel文件失败: " + filePath, e);
        }
    }
}

内存优化特点:

不一次性加载整个文件到内存每批次处理1000行,处理完立即释放内存占用始终维持在100-200MB之间


第七章 最佳实践总结

7.1 架构设计原则

任务粒度划分

主任务代表业务流程,子任务代表执行单元子任务间互不依赖,支持并行执行通过parentTaskId建立层级关系

异常隔离设计

子任务内部完全捕获异常,不向外抛出异常信息记录到数据库,便于后续排查主任务通过子任务状态统计判断整体执行情况

进度反馈机制

子任务级别:通过ProgressCallback实时上报处理进度主任务级别:通过AtomicInteger统计已完成子任务数前端轮询:定时读取数据库任务状态更新UI

7.2 并发编程要点

线程池参数选型

corePoolSize根据CPU核数和并发任务数综合考虑maxPoolSize预留缓冲空间应对突发流量queueCapacity避免过大导致内存溢出

CompletableFuture使用规范

明确指定执行线程池,避免使用默认ForkJoinPool使用whenComplete处理成功和异常两种情况allOf等待所有任务完成,join()阻塞主线程

原子类使用场景

多线程共享计数器:使用AtomicInteger复杂状态管理:使用AtomicReference布尔标志位:使用AtomicBoolean

7.3 性能优化检查清单

线程池参数是否与硬件资源匹配 数据库连接池大小是否足够 Excel文件是否采用流式解析 数据库入库是否采用批量插入 关键表是否建立索引


结语

本文基于真实项目实战,系统性阐述了如何构建一套完整的主子任务并行处理架构,并通过Java 17的CompletableFuture、AtomicInteger等并发工具实现了高性能的多渠道账单数据导入。核心成果包括:

性能突破:相比串行处理,整体耗时从2.5小时降低至30分钟,提升5倍稳定性增强:通过异常隔离机制,单渠道失败不影响其他渠道执行可观测性:主子任务双层设计,支持细粒度进度追踪与错误定位可扩展性:线程池参数化配置,轻松适配不同部署环境

该架构不仅适用于账单导入场景,还可广泛应用于批量文件处理、数据迁移、报表生成等需要高并发、高可靠的企业级业务场景。希望本文的技术方案与实战经验能为读者的系统设计与优化工作提供参考价值。

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
none
暂无评论...