如何使用API java.util.concurrent.Future而不是在java中显式创建线程



我有两个线程在java程序中并行运行,如下所示:

// Threading
new Thread(new Runnable() {
@Override
public void run() {
try {
gpTableCount   = getGpTableCount();
} catch (SQLException e) {
e.printStackTrace();
} catch(Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
hiveTableCount = getHiveTableCount();
} catch (SQLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
while(!(gpTableCount != null && gpTableCount.size() > 0 && hiveTableCount != null && hiveTableCount.size() > 0)) {
Thread.sleep(5000);
}
// Threading

两者具有相同的功能。下面是getHiveTableCount((中的代码。另一种方法与下面的方法略有不同(一两行(,但功能保持不变。

public Map<String, String> getHiveTableCount() throws IOException, SQLException {
hiveDataMap     = new HashMap<String, String>();
hiveTableErrs   = new HashMap<String, String>();
Iterator<String> hiveIterator = filteredList.iterator();
Connection hiveConnection = DbManager.getHiveConnection();
PreparedStatement hive_pstmnt = null;
String hiveExcpnMsg;
String ssn;
String hiveMaxUpdTms;
Long hiveCount;
String gpHiveRec;
String[] hiveArray;
String[] hiveDetails;
String hiveQuery;
while(hiveIterator.hasNext()) {
gpHiveRec   = hiveIterator.next();      
hiveArray   = gpHiveRec.split(",");     
hiveDetails = hiveArray[1].split("\.");
hiveQuery   = "select '" + hiveDetails[1] + "' as TableName, count(*) as Count, source_system_name, max(xx_last_update_tms) from " + hiveArray[1] + " where source_system_name='" + hiveArray[2] + "' group by source_system_name";
try {
hive_pstmnt             = hiveConnection.prepareStatement(hiveQuery);
ResultSet hiveCountRs   = hive_pstmnt.executeQuery();
while(hiveCountRs.next()) {
hiveCount     = hiveCountRs.getLong(2);
ssn           = hiveCountRs.getString(3);
hiveMaxUpdTms = hiveCountRs.getTimestamp(4).toString();
hiveDataMap.put(hiveDetails[1] + "," + ssn, hiveCount + "," + hiveMaxUpdTms);
}
} catch(org.postgresql.util.PSQLException e) {
hiveExcpnMsg = e.getMessage();
hiveTableErrs.put(hiveDetails[1] + ": for the SSN: " + hiveArray[2], hiveExcpnMsg + "n");
} catch(SQLException e) {
hiveExcpnMsg = e.getMessage();
hiveTableErrs.put(hiveDetails[1] + ": for the SSN: " + hiveArray[2], hiveExcpnMsg + "n");
} catch(Exception e) {
hiveExcpnMsg = e.getMessage();
hiveTableErrs.put(hiveDetails[1] + ": for the SSN: " + hiveArray[2], hiveExcpnMsg + "n");
}
}
return hiveDataMap;
}

这两个线程同时运行。我最近在网上读到:

Future类表示异步计算的未来结果–在处理完成。

我从理论上理解了这个概念,但我不知道如何将java.util.concurrent.Futureapi应用于上述相同的代码,而不是显式地创建线程。有人能告诉我如何在方法上实现多线程吗?getGpTableCount() & getHiveTableCount使用java.util.courrent.Future-api,而不是创建线程创建新线程,比如new Thread(new Runnable(?

您使用Runnable接口提交任务,该接口不允许线程在计算结束时返回值(并导致您使用共享变量gpTableCounthiveTableCount(。

Callable接口是稍后添加的,它允许您的任务返回一个值(在您的情况下为Map<String, String>(。

作为直接使用线程的替代方案,并发API引入ExecutorService作为一个更高级别的对象,它管理线程池并能够异步执行任务。

当将类型为Callable的任务提交给ExecutorService时,您希望该任务产生一个值,但由于提交点和计算结束没有耦合,ExecutorService将返回Future,允许您获得该值,如果该值不可用,则进行阻止。因此,Future可以用于在不同的线程之间进行同步。

作为ExecutorService的替代方案,您还可以查看FutureTask<V>,它是RunnableFuture<V>:的实现

这个类提供了Future的基本实现,具有启动和取消计算、查询计算是否完成以及检索计算结果的方法

FutureTask可用于包装Callable或Runnable对象。

如果您使用的是Java 8+,您可以使用CompletableFuture.supplyAsync来实现这一点,简称为:

import static java.util.concurrent.CompletableFuture.supplyAsync;
.....
Future<Map<String, String>> f= supplyAsync(()->{
try{
return getHiveTableCount();
} catch(Exception e) {
throw new RuntimeException(e);
}
}

默认情况下,CompletableFuture.supplyAsync将使用ForkJoinPool.commonPool()运行它。如果你想使用自己的参数,Executor也有另一个重叠:

public class CompletableFuture<T>
extends Object
implements Future<T>, CompletionStage<T>

事实确实如此。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) 

首先,创建最适合您需求的执行器服务,例如:

ExecutorService ex = Executors.newFixedThreadPool(2);

(更多关于遗嘱执行人的信息:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html)

使用Callable而不是Runnable对象,它类似于Runnable,但返回一个值(更多关于Callable的信息:https://docs.oracle.com/javase/8/docs/api/index.html?java/util/concurrent/Callable.html):

Callable<Map<String, String>> callable1 = // your Callable class 

类型参数应与您希望作为结果返回的类型相同。

接下来创建一个任务列表:

List<Callable<Map<String, String>>> tasks = new LinkedList<>();
tasks.add(callable1);
tasks.add(callable2);

并执行它们:

List<Future<Map<String, String>>> results = ex.invokeAll(tasks);

上面的方法在所有任务完成时返回(如果我正确理解您的情况,这就是您想要实现的(,但是完成的任务可能已经正常终止或抛出异常。

最后关闭执行器服务:

ex.shutdown();

最新更新