异步任务处理场景——多个队列分别顺序执行

如题,这个场景实则还挺常见的,有一些任务需要异步处理,但必须保证它们的执行顺序。当然,这些任务根据不同的业务ID进行组织,要进入不同的队列并行处理,每个队列有序。

捋一下思路,我们需要一个Map来存储不同的任务队列,每个队列需要一个单线程的任务执行器来处理。据此可以写出如下的代码

public class TaskProcessor {
    // 队列ID与对应队列的映射
    private final Map<String, Queue<Runnable>> queues = new HashMap<>();
    // 队列ID与对应执行器的映射
    private final Map<String, ExecutorService> executors = new HashMap<>();

    // 向指定队列添加任务
    public void addTask(String queueId, Runnable task) {
        // 获取或创建指定ID的队列
        Queue<Runnable> queue = queues.computeIfAbsent(queueId, k -> new ConcurrentLinkedQueue<>());
        // 获取或创建指定ID队列的执行器(单线程,保证顺序执行)
        ExecutorService executor = executors.computeIfAbsent(queueId, k -> Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName("QueueProcessor-" + queueId);
            thread.setDaemon(true);
            return thread;
        }));

        // 将任务添加到队列
        queue.add(task);

        // 提交一个任务处理器到执行器,处理队列中的所有任务
        executor.submit(() -> {
            Runnable nextTask;
            // 循环处理队列中的任务,直到队列为空
            while ((nextTask = queue.poll()) != null) {
                try {
                    nextTask.run();
                } catch (Exception e) {
                    System.err.println("Error processing task in queue " + queueId + ": " + e.getMessage());
                    e.printStackTrace();
                }
            }
        });
    }
}

写一个调用示例看一下

public static void main(String[] args) {
    TaskProcessor processor = new TaskProcessor();

    processor.addTask("biz1", () -> {
        // do something
        System.out.println("执行biz1 -> 任务1");
    });
    processor.addTask("biz1", () -> {
        // do something
        System.out.println("执行biz1 -> 任务2");
    });
    processor.addTask("biz2", () -> {
        // do something
        System.out.println("执行biz2 -> 任务1");
    });
    processor.addTask("biz2", () -> {
        // do something
        System.out.println("执行biz2 -> 任务2");
    });
}

当然,还可以做的更好一些,列如提取一个Task类,加一些任务控制的方法,多个队列用线程池管理调度等等。

这里,queueId是与业务强关联的,列如,队列代表一次会话,那么queueId就是dialogId,顺序执行这次会话的各流程模块,多个会话可以并行执行。

这是一种比较直接的处理方案,另外还可以借助消息队列MQ来处理这种问题,需要根据实际情况来,但要注意是否值得引入复杂性。

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
none
暂无评论...