如题,这个场景实则还挺常见的,有一些任务需要异步处理,但必须保证它们的执行顺序。当然,这些任务根据不同的业务ID进行组织,要进入不同的队列并行处理,每个队列有序。
捋一下思路,我们需要一个Map来存储不同的任务队列,每个队列需要一个单线程的任务执行器来处理。据此可以写出如下的代码
public class TaskProcessor {
// 队列ID与对应队列的映射
private final Map<String, Queue<Runnable>> queues = new HashMap<>();
// 队列ID与对应执行器的映射
private final Map<String, ExecutorService> executors = new HashMap<>();
// 向指定队列添加任务
public void addTask(String queueId, Runnable task) {
// 获取或创建指定ID的队列
Queue<Runnable> queue = queues.computeIfAbsent(queueId, k -> new ConcurrentLinkedQueue<>());
// 获取或创建指定ID队列的执行器(单线程,保证顺序执行)
ExecutorService executor = executors.computeIfAbsent(queueId, k -> Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setName("QueueProcessor-" + queueId);
thread.setDaemon(true);
return thread;
}));
// 将任务添加到队列
queue.add(task);
// 提交一个任务处理器到执行器,处理队列中的所有任务
executor.submit(() -> {
Runnable nextTask;
// 循环处理队列中的任务,直到队列为空
while ((nextTask = queue.poll()) != null) {
try {
nextTask.run();
} catch (Exception e) {
System.err.println("Error processing task in queue " + queueId + ": " + e.getMessage());
e.printStackTrace();
}
}
});
}
}
写一个调用示例看一下
public static void main(String[] args) {
TaskProcessor processor = new TaskProcessor();
processor.addTask("biz1", () -> {
// do something
System.out.println("执行biz1 -> 任务1");
});
processor.addTask("biz1", () -> {
// do something
System.out.println("执行biz1 -> 任务2");
});
processor.addTask("biz2", () -> {
// do something
System.out.println("执行biz2 -> 任务1");
});
processor.addTask("biz2", () -> {
// do something
System.out.println("执行biz2 -> 任务2");
});
}
当然,还可以做的更好一些,列如提取一个Task类,加一些任务控制的方法,多个队列用线程池管理调度等等。
这里,queueId是与业务强关联的,列如,队列代表一次会话,那么queueId就是dialogId,顺序执行这次会话的各流程模块,多个会话可以并行执行。
这是一种比较直接的处理方案,另外还可以借助消息队列MQ来处理这种问题,需要根据实际情况来,但要注意是否值得引入复杂性。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
相关文章
暂无评论...


