我已经考虑了一段时间,并且随着时间的推移,我对线程、执行器等的了解越来越多。我对执行器和线程有一个粗略的理解,但感觉有点卡住了。
这就是我要做的。
有命令,也有动作。例如,命令是命名的,用户可以任意调用它!播放歌曲!欢呼等。Action是一种将工作发送到服务的东西;例如,要求websocket客户端发送一条新消息,或要求IRC客户端发送新消息,等等。
当一个命令被执行时,它会依次执行其操作。
例如!欢呼指令可能有四个动作:
- 发出websocket请求,并等待成功响应(例如:在OBS中显示场景项(
- 发送IRC消息(例如:发送聊天消息(。一旦发送
- 等待1-3秒(例如:等待视频播放完毕(。一旦等待结束,那么
- 发出另一个websocket请求(例如:从步骤1隐藏场景项(
这些不仅必须按顺序执行,而且我们也不能让所有操作同时开始(操作1、2和4首先完成,然后操作3最后完成(;每个动作都取决于它的前一个动作是否首先完成。
最重要的是,客户端可以在任何时候任意提交命令,并且不能相互阻止。例如longcommand可以启动,但不会阻止!shortcommand启动(假设底层服务没有被阻止(。
我想做的是:
我知道我可以使用Future/Callable来阻止在给定线程上执行的挂起结果,所以每个Action在运行时都应该返回一个Future(Future来自它使用的相应服务(。然后,我可以简单地在命令上以这样的阻塞方式逐个调用操作,以确保它们按顺序执行,并且每个操作都等待另一个操作完成:
class ExecutableCommand implments Runnable {
// omitted for brevity
run() {
for(Action action:command.getActions()) {
action.run().get();
}
}
但是我该如何处理执行命令呢?我想我会通过一个执行器提交每个命令,也许是像这样的ThreadPoolExecutor,因为每个命令都是提交的?
class ExecutorServiceWrapper {
private final ExecutorService executorService = Executors.newThreadPoolExecutor(4);
void submit(ExecutableCommand command) {
executorService.submit(command)
}
}
然后,每个客户端ofc只需保留一个对ExecutorServiceWrapper的引用,并调用它来响应触发它们的事件:
class FromChatHandler() {
private final ExecutorServiceWrapper masterQueue;
onMessage(String message) {
Command command = // parse what command to lookup from message
masterQueue.submit(command)
}
}
@RestController // or whatever
class MyController() {
private final ExecutorServiceWrapper masterQueue;
@Post
executeCommandByName(String commandName) {
Command command = // lookup command
masterQueue.submit(command)
}
}
class directHandler() {
private final ExecutorServiceWrapper masterQueue;
handle(Command command) {
Command command = // build the command given the message
masterQueue.submit(command)
}
}
我假设,由于每个命令都被提交给执行器,所以每个命令都将转到自己的线程,这样它就不会阻塞其他命令。
但我不确定我是否应该像上面那样用ExecutableCommand执行命令,并像我一样执行命令中的每个动作。
此外,我不确定它是否能处理这种情况:线程池固定为5个线程。已经执行了5个命令。它们运行时间很长,使用不同的服务,但底层服务没有被阻止,仍然可以接受工作。有人试图执行第六个命令——不应该阻止它们,因为底层服务仍然可以接受工作。
有更好的方法吗?我走对了吗?
在这方面花了更多的时间之后,我提出了一些使用Executors或Futures的可能解决方案。还不确定哪一个会比另一个更好,但由于我知道我可以扩展ThreadPoolExecutor(比如说,添加一个暂停功能(,我可能会倾向于Executor。
否则,如果有人有意见,我们随时欢迎!
我现在把这两种解决方案都保存在GH中((,但我也会把它们放在下面。https://github.com/TinaTiel/concurrency-learning
期货实施
package futures;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CommandActionExample {
public static void main(String[] args) {
// Initialize some starting params
Random random = new Random();
int maxActions = 20;
int maxCommands = 5;
// Generate some commands, with a random number of actions.
// We'll use the indexes as the command and action names to keep it simple/readable
List<Command> commands = new ArrayList<>();
for(Integer c = 0; c < maxCommands; c++) {
Command command = new Command(String.format("%d", c+1));
for(Integer a = 0; a < random.nextInt(maxActions); a++) {
Action action = new Action(random, String.format("%d", a+1));
command.addAction(action);
}
commands.add(command);
}
// Print out the commands we'll execute, again to keep the results readable/understandable
System.out.println("Commands to execute: n" + commands.stream().map(Command::toString).collect(Collectors.joining("n")) + "n");
// Build a Future that tries to execute all commands (supplied as Futures) in an arbitrary order
try {
CompletableFuture.allOf(commands.stream()
.map((Function<Command, CompletableFuture<Void>>) CompletableFuture::runAsync)
.collect(Collectors.toList())
.toArray(CompletableFuture[]::new)
).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// commands.get(0).run(); // sanity check one of the command's actions run as expected
// When we execute the results, the actions should be executed in-order within a command at some point in the future
// (not started all at once), so something like:
// 0 Command-2:Action-1 scheduled at 34
// 0 Command-1:Action-1 scheduled at 21
// 0 Command-3:Action-1 scheduled at 4
// 4 Command-3:Action2 scheduled at ...
// 21 Command-1:Action-2 scheduled at ...
// 34 Command-1-Action-2 scheduled at ...
// ...
// Now how to test this...Maybe with JUnit inOrder.verify(...).run() ?
}
public static class Action implements Runnable {
private Command command;
private final Random random;
private final String name;
public Action(Random random, String name) {
this.random = random;
this.name = name;
}
public void setCommand(Command command) {
this.command = command;
}
@Override
public void run() {
// Simply sleep for a random period of time. This simulates pieces of work being done (network request, etc.)
long msTime = random.nextInt(1000);
System.out.println(new Timestamp(System.currentTimeMillis()) + ": Command-" + command.name + ":Action-" + name + " executing on Thread '" + Thread.currentThread().getName() + "' executing for " + msTime + "ms");
try {
Thread.sleep(msTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Action{" +
"name='" + name + ''' +
'}';
}
}
public static class Command implements Runnable {
private final String name;
private final List<Action> actions = new ArrayList<>();
public Command(String name) {
this.name = name;
}
public void addAction(Action action) {
action.setCommand(this);
actions.add(action);
}
@Override
public void run() {
// If there are no actions, then do nothing
if(actions.isEmpty()) return;
// Build up a chain of futures.
// Looks like we have to build them up in reverse order, so start with the first action...
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(actions.remove(0));
// ...And then reverse the list and build the rest of the chain
// (yes we could execute backwards...but it's not common and I/others probably don't like to reason about it)
Collections.reverse(actions);
for(int i=0; i< actions.size(); i++) {
completableFuture.thenRun(actions.get(i));
}
// Execute our chain
try {
completableFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Command{" +
"name='" + name + ''' +
", actions=" + actions +
'}';
}
}
}
结果
输出和计划如预期,但Futures似乎使用了ForkJoinPool。
Commands to execute:
Command{name='1', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}]}
Command{name='2', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
Command{name='3', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}, Action{name='7'}]}
Command{name='4', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
Command{name='5', actions=[Action{name='1'}, Action{name='2'}, Action{name='3'}, Action{name='4'}, Action{name='5'}, Action{name='6'}]}
2020-12-30 21:17:27.11: Command-2:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 207ms
2020-12-30 21:17:27.11: Command-4:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 930ms
2020-12-30 21:17:27.11: Command-1:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 948ms
2020-12-30 21:17:27.11: Command-3:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 173ms
2020-12-30 21:17:27.11: Command-5:Action-1 executing on Thread 'ForkJoinPool.commonPool-worker-11' executing for 348ms
2020-12-30 21:17:27.314: Command-3:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 890ms
2020-12-30 21:17:27.345: Command-2:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 178ms
2020-12-30 21:17:27.485: Command-5:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-11' executing for 702ms
2020-12-30 21:17:27.485: Command-5:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 161ms
2020-12-30 21:17:27.532: Command-2:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 201ms
2020-12-30 21:17:27.657: Command-5:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 257ms
2020-12-30 21:17:27.735: Command-2:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 518ms
2020-12-30 21:17:27.919: Command-5:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 258ms
2020-12-30 21:17:28.06: Command-4:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 926ms
2020-12-30 21:17:28.075: Command-1:Action-2 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 413ms
2020-12-30 21:17:28.184: Command-5:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-15' executing for 77ms
2020-12-30 21:17:28.216: Command-3:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 487ms
2020-12-30 21:17:28.263: Command-2:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 570ms
2020-12-30 21:17:28.497: Command-1:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-3' executing for 273ms
2020-12-30 21:17:28.716: Command-3:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 302ms
2020-12-30 21:17:28.833: Command-2:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-5' executing for 202ms
2020-12-30 21:17:28.992: Command-4:Action-3 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 733ms
2020-12-30 21:17:29.024: Command-3:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 756ms
2020-12-30 21:17:29.727: Command-4:Action-4 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 131ms
2020-12-30 21:17:29.78: Command-3:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 920ms
2020-12-30 21:17:29.858: Command-4:Action-5 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 305ms
2020-12-30 21:17:30.168: Command-4:Action-6 executing on Thread 'ForkJoinPool.commonPool-worker-9' executing for 612ms
2020-12-30 21:17:30.715: Command-3:Action-7 executing on Thread 'ForkJoinPool.commonPool-worker-7' executing for 330ms
执行器实现
package executors;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CommandActionExample {
public static void main(String[] args) {
// Initialize some starting params
Random random = new Random();
int maxActions = 20;
int maxCommands = 5;
// Generate some commands, with a random number of actions.
// We'll use the indexes as the command and action names to keep it simple/readable
List<Command> commands = new ArrayList<>();
for(Integer c = 0; c < maxCommands; c++) {
Command command = new Command(String.format("%d", c+1));
for(Integer a = 0; a < random.nextInt(maxActions); a++) {
Action action = new Action(random, String.format("%d", a+1));
command.addAction(action);
}
commands.add(command);
}
// Print out the commands we'll execute, again to keep the results readable/understandable
System.out.println("Commands to execute: n" + commands.stream().map(Command::toString).collect(Collectors.joining("n")) + "n");
ExecutorService executorService = Executors.newFixedThreadPool(20);
for(Command command:commands) executorService.submit(command);
// When we execute the results, the actions should be executed in-order within a command at some point in the future
// (not started all at once), so something like:
// 0 Command-2:Action-1 scheduled at 34
// 0 Command-1:Action-1 scheduled at 21
// 0 Command-3:Action-1 scheduled at 4
// 4 Command-3:Action2 scheduled at ...
// 21 Command-1:Action-2 scheduled at ...
// 34 Command-1-Action-2 scheduled at ...
// ...
// Now how to test this...Maybe with JUnit inOrder.verify(...).run() ?
}
public static class Action implements Runnable {
private Command command;
private final Random random;
private final String name;
public Action(Random random, String name) {
this.random = random;
this.name = name;
}
public void setCommand(Command command) {
this.command = command;
}
@Override
public void run() {
// Simply sleep for a random period of time. This simulates pieces of work being done (network request, etc.)
long msTime = random.nextInt(1000);
System.out.println(new Timestamp(System.currentTimeMillis()) + ": Command-" + command.name + ":Action-" + name + " executing on Thread '" + Thread.currentThread().getName() + "' executing for " + msTime + "ms");
try {
Thread.sleep(msTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Action{" +
"name='" + name + ''' +
'}';
}
}
public static class Command implements Runnable {
private final String name;
private final List<Action> actions = new ArrayList<>();
public Command(String name) {
this.name = name;
}
public void addAction(Action action) {
action.setCommand(this);
actions.add(action);
}
@Override
public void run() {
// If there are no actions, then do nothing
if(actions.isEmpty()) return;
ExecutorService executor = Executors.newSingleThreadExecutor(); // Because there is only one thread, this has the effect of executing sequentially and blocking
for(Action action:actions) executor.submit(action);
}
@Override
public String toString() {
return "Command{" +
"name='" + name + ''' +
", actions=" + actions +
'}';
}
}
}
结果
输出和时间表符合预期
2020-12-31 09:43:09.952: Command-3:Action-1 executing on Thread 'pool-4-thread-1' executing for 632ms
2020-12-31 09:43:09.952: Command-4:Action-1 executing on Thread 'pool-3-thread-1' executing for 932ms
2020-12-31 09:43:09.952: Command-2:Action-1 executing on Thread 'pool-6-thread-1' executing for 586ms
2020-12-31 09:43:09.952: Command-5:Action-1 executing on Thread 'pool-5-thread-1' executing for 987ms
2020-12-31 09:43:09.952: Command-1:Action-1 executing on Thread 'pool-2-thread-1' executing for 706ms
2020-12-31 09:43:10.562: Command-2:Action-2 executing on Thread 'pool-6-thread-1' executing for 329ms
2020-12-31 09:43:10.608: Command-3:Action-2 executing on Thread 'pool-4-thread-1' executing for 503ms
2020-12-31 09:43:10.891: Command-2:Action-3 executing on Thread 'pool-6-thread-1' executing for 443ms
2020-12-31 09:43:10.9: Command-4:Action-2 executing on Thread 'pool-3-thread-1' executing for 866ms
2020-12-31 09:43:10.955: Command-5:Action-2 executing on Thread 'pool-5-thread-1' executing for 824ms
2020-12-31 09:43:11.346: Command-2:Action-4 executing on Thread 'pool-6-thread-1' executing for 502ms
2020-12-31 09:43:11.766: Command-4:Action-3 executing on Thread 'pool-3-thread-1' executing for 638ms
2020-12-31 09:43:11.779: Command-5:Action-3 executing on Thread 'pool-5-thread-1' executing for 928ms
2020-12-31 09:43:11.848: Command-2:Action-5 executing on Thread 'pool-6-thread-1' executing for 179ms
2020-12-31 09:43:12.037: Command-2:Action-6 executing on Thread 'pool-6-thread-1' executing for 964ms
2020-12-31 09:43:12.412: Command-4:Action-4 executing on Thread 'pool-3-thread-1' executing for 370ms
2020-12-31 09:43:12.709: Command-5:Action-4 executing on Thread 'pool-5-thread-1' executing for 204ms
2020-12-31 09:43:12.783: Command-4:Action-5 executing on Thread 'pool-3-thread-1' executing for 769ms
2020-12-31 09:43:12.913: Command-5:Action-5 executing on Thread 'pool-5-thread-1' executing for 188ms
2020-12-31 09:43:13.102: Command-5:Action-6 executing on Thread 'pool-5-thread-1' executing for 524ms
2020-12-31 09:43:13.555: Command-4:Action-6 executing on Thread 'pool-3-thread-1' executing for 673ms
2020-12-31 09:43:13.634: Command-5:Action-7 executing on Thread 'pool-5-thread-1' executing for 890ms
2020-12-31 09:43:14.23: Command-4:Action-7 executing on Thread 'pool-3-thread-1' executing for 147ms
2020-12-31 09:43:14.527: Command-5:Action-8 executing on Thread 'pool-5-thread-1' executing for 538ms