我们需要异步处理不同类型的对象。每种/类型的对象都使用 API 密钥进行处理。
每个 API 密钥对并发使用都有自己的限制(例如一个 API 密钥的并行会话不超过 5 个)。
我们对工作线程计数(CPU 限制)有全局限制。
我们希望在工作线程限制内进行尽可能多的 API 调用。
可能的解决方案:
2 tasks with KEY1 (max 2 session) - total 3 workers
5 tasks with KEY2 (max 3 session) -/
是:
1. worker1: KEY2, worker2: KEY2, worker3: KEY2 (in queue: 2x KEY1, 2x KEY2)
2. worker1: KEY1, worker2: KEY2, worker3: KEY2 (in queue: 1x KEY1, 3x KEY2)
3. worker1: KEY1, worker2: KEY1, worker3: KEY2 (in queue: 4x KEY2)
可能的解决方案:
3 tasks with KEY1 (max 1 session) & 3 workers
是:
1. worker1: KEY1, worker2: IDLE, worker3: IDLE, (in queue 2x KEY1)
执行顺序无关紧要(但我们希望听到接近先进先出的策略),最大吞吐量是最重要的。
目前尚不清楚选择哪种实施策略。
ThreadExecutor
任何队列都不够,因为您需要知道ThreadExecutor
当前使用了哪些 API 密钥。
我不确定我是否正确回答了这个问题,但您需要的是每个 API 密钥的Semaphore
。
Semaphore key1Semaphore = new Semaphore(2);
Semaphore key2Semaphore = new Semaphore(3);
您可以检查key1Semaphore
是否有许可证,并通过致电key1Semaphore.tryAcquire()
获取许可证(如果有)。这是非阻塞的,因此如果它失败并返回 false,您可以尝试从另一个 API 密钥获取信号量并从中提交任务。
重要的是,在使用其中一个 API 密钥的任务结束时,信号量许可证将被释放回来。
您可能需要一个额外的对象来与wait()
和notify()
同步,以便在任务完成时通知正在调度任务的主线程再次检查信号量。
因此,从本质上讲,您得到的是,您的任务调度程序将向您ExecutorService
3 个工作人员提交 5 个任务,然后在其中一个信号量许可证发布之前,它将无法再提交任何任务。
当任务完成并释放许可证时,调度程序会收到通知,因此它取消阻止等待,并再次按顺序检查信号量并将任务提交给ExecutorService
。
此解决方案有点偏向于第一个 API 密钥,但您可以通过检查每个密钥的任务长度并更公平地分配它们来进一步完善它。您甚至可以旋转索引,以便每次循环都将索引递增 1,以便第一次从 API KEY 1 开始,第二次从 API KEY 2 开始,依此类推。
我可能会创建一个维护的服务
- 包含由任务和相应键组成的条目的单个
Queue
, - 一个带有密钥和该密钥的已运行线程 (
Map<String,AtomicInteger>
的Map
),以及 - 具有全局允许的线程计数的
ThreadPoolExecutor
。
如果全局线程计数已满并且已提交任务,则会将其置于队列的末尾。
如果全局线程计数未满,则检查与请求键对应的映射值的键线程限制;如果达到,则任务将放回队列中,否则提交到执行器服务。
"提交到执行器服务"不会直接提交任务,但会增加关键线程计数,并将任务包装到一个Runnable
中,这将另外 1. 减少映射中的关键线程计数和 2. 触发队列的重新评估,以便提交新任务(如果适用)。
也可以在BlockingQueue
中创建"每个键的活动计数"逻辑,该逻辑将返回first()
包含未达到最大计数的键任务的下一个元素,并将其作为管理队列传递给ThreadPoolExecutor
构造函数;但我确信这会破坏队列合约并且使用起来并不完全安全。