我有一个服务器应用程序,它处理 2 个作业(文件处理(,它们是完全不同的流。 我调用客户端提供 job1 或作业 2,因此在某个时间间隔内可能是这 2 个中的任何一个.由于文件是从文件夹中挑选的,我想同步作业,即例如,如果我在第一次触发作业 2 后再次快速触发作业 2;其中的文件可能没有被处理,所以我不应该再拿起它们.而且我宁愿等待第一个执行器(执行器服务(完成,然后再触发另一个作业/执行器。
我尝试使用存储作业和执行器的映射并尝试同步映射.但我不确定如何继续。为了清楚起见,省略了一些代码。
服务器类:
public class ProcessServer {
public static void main(String[] args) {
try {
Integer port = Integer.parseInt(Configuration.getProperty("Environment", "PORT"));
ServerSocket serverSocket = new ServerSocket(port);
LOG.info("Process Server listening on PORT: " + port);
while (true) {
Socket socket = serverSocket.accept();
new Thread(new ProcessEvent(socket)).start();
}
} catch (Throwable th) {
LOG.error("Exception occured starting server.", th);
}
}
}
进程事件类:
public class ProcessEvent implements Runnable {
//code to extract argument(event/jobtype) from socket stream
private void processEvent(Event event) {
switch (event.getType()) {
case 1:
new ProcessJob1().execute(MAP1);
break;
case 2:
new ProcessJob2().execute(MAP2);
break;
default:
break;
}
}
工作类别 1:
public class ProcessJob1 extends Job {
private static Map<String, ExecutorService> isRunning = new HashMap<String, ExecutorService>();
@Override
public void execute(Map<String, Object> jobData) {
String txn = (String)jobData.get(TYPE);
ExecutorService executor = null;
synchronized (isRunning) {
executor = isRunning.get(type);
if (executor != null && !executor.isTerminated()) {
return;
}
executor = Executors.newFixedThreadPool(MAX_THREAD_CNT);
isRunning.put(type, executor);
}
File[] inputFiles = getValidFiles();
if (inputFiles.length > 0) {
for (File inputFile : inputFiles) {
executor.execute(new ProcessFileTask1(inputFile));
}
}
executor.shutdown();
}
}
工作类别 2:
public class ProcessJob2 extends Job {
private static ExecutorService executor = null;
@Override
public void execute(Map<String, Object> jobData) {
if (executor != null && !executor.isTerminated()) {
return;
}
executor = Executors.newFixedThreadPool(2);
File[] inputFiles = getValidFiles();
if (inputFiles.length > 0) {
ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD_CNT);
for (File inputFile : inputFiles) {
executor.execute(new ProcessFileTask2(inputFile));
}
executor.shutdown();
}
}
}
而不是每次都必须使用单线程方法使用一个执行程序时创建新的执行程序。Executors#newSingleThreadExecutor()
public class ProcessJob1 extends Job {
private static Map<String, ExecutorService> isRunning = new HashMap<String, ExecutorService>();
private static ExecutorService executor = Executors.newFixedThreadPool(MAX);
private static CountDownLatch countDownLatch = new CountDownLatch(0);
@Override
public void execute(Map<String, Object> jobData) {
File[] inputFiles = getValidFiles();
countDownLatch.await();
if (inputFiles.length > 0) {
countDownLatch = new CountDownLatch(inputFiles.length);
for (File inputFile : inputFiles) {
case 1:
executor.execute(new ProcessFileTask1(inputFile,countDownLatch));
break;
case 2:
executor.execute(new ProcessFileTask2(inputFile,countDownLatch));
break;
default:
break;
}
}
}
}
您可以使用CountDownLatch来实现此目的。
最初它将为零。如果你调用等待,它将返回。
之后,您将闩锁值重置为文件的大小。
将闩锁传递给ProcessFileTask1
,并在完成后调用 latch.countDown((。
下一个作业将进入 latch.await((,它将等待所有任务完成。如果已经完成,它将立即从等待中出来。
与其在上面分支,不如在这里分支。因此,您可以更好地控制同步线程