Hyperledger结构:使用java网关sdk的异步/并行事务



我试图使用java网关sdk将异步事务发送到Fabric网络,但收到错误Channel [CHANNEL NAME] has been shutdown

这里有一些示例代码:

Gateway.Builder builder = Gateway.createBuilder()
.discovery(true)
.identity(wallet, user.getName())
.networkConfig([PATH TO CONNECTION PROFILE]);
try(Gateway gateway = builder.connect()) {
Network channel = gateway.getNetwork(CHANNEL_NAME);
Contract someChaincode = channel.getContract(CHAINCODE_NAME);
int coresNumber = (Runtime.getRuntime().availableProcessors());
ExecutorService executor = Executors.newFixedThreadPool(coresNumber);
for(String elemt : elements) {                                                                          
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try{
//Exception thrown here
byte[] res = someChaincode.submitTransaction("someFunction", elemt);
return new String(res);     
} catch (ContractException e) {
e.printStackTrace();
}
}, executor);
}
} catch (Exception e) {
// Handle Exception
}

这里有一个例外:

java.util.concurrent.ExecutionException: org.hyperledger.fabric.gateway.GatewayRuntimeException: org.hyperledger.fabric.sdk.exception.InvalidArgumentException: Channel [CHANNEL NAME] has been shutdown.

确切地说,异常是在方法checkChannelState()中抛出的。我感觉我没有正确处理多线程。

您不需要等待在代码片段中创建的未来的完成。因此,您正在调度事务调用,以便在不同的线程上执行,但在执行此代码之前,退出了关闭用于连接的Gateway实例的try-with-resources块。关闭网关会导致关闭所有相关的资源和连接,包括底层通道。因此,当您的事务调用真正运行时,您已经关闭了执行它们所需的连接和资源。

在关闭Gateway实例之前,您需要从您创建的Future对象中获取结果;换句话说,在退出创建网关的try-with-resources块之前。大致是这样的:

Collection<Callable<String>> tasks = elements.stream()
.map(element -> new Callable<String>() {
public String call() throws ContractException, TimeoutException, InterruptedException {
byte[] result = contract.submitTransaction("someFunction", element);
return new String(result);
}
}).collect(Collectors.toList());
try {
Collection<String> results = new ArrayList<>();
Collection<Future<String>> futures = executor.invokeAll(tasks, timeout, timeUnit);
for (Future<String> future : futures) {
try {
String result = future.get(timeout, timeUnit);
results.add(result);
} catch (CancellationException | InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
}
System.out.println("Results: " + results);
} catch (InterruptedException e ) {
e.printStackTrace();
}

最新更新