ApacheFlink:在MapReduce()中正确地进行异步Web服务调用



我有一个带有以下mapPartition函数的程序:

public void mapPartition(Iterable<Tuple> values, Collector<Tuple2<Integer, String>> out)

我从输入的CCD_ 2&将它们发送到web服务进行转换。我将结果添加回out集合。

为了加快进程,我通过使用Executors调用了web服务async。这就产生了问题,要么我得到了taskManager释放的异常,要么是AskTimeoutException。我增加了记忆力&暂停,但无济于事。有相当多的输入数据。我相信这导致了大量的工作与ExecutorService&因此占用了大量的内存。

对此,最好的方法是什么?

我还查看了taskManager与taskSlot的配置,但对两者之间的差异有点困惑(我想它们类似于进程与线程?)。不确定在什么时候我应该增加taskManagers和taskSlots?例如,如果我有三台机器,每台机器有4cpus,那么我的taskManager=3taskSlot=4应该是一样的吗?

我还考虑单独增加mapPartition的并行度,比如说10,以获得更多的线程访问web服务。意见或建议?

您应该查看Flink Asyncio,它将使您能够在流应用程序中以异步方式查询Web服务。

需要注意的一点是,Asyncio函数不是多线程调用的,而是按顺序为每个分区的每个记录调用一次,因此您的web应用程序需要确定地返回并可能快速返回,以避免作业被搁置。

此外,潜在的更高数量的分区将有助于您的情况,但您的Web服务需要足够快地满足这些请求

Flinks网站的示例代码块:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {
// issue the asynchronous request, receive a future for result
Future<String> resultFuture = client.query(str);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the collector
resultFuture.thenAccept( (String result) -> {
asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));
});
}
}
// create the original stream (In your case the stream you are mappartitioning)
DataStream<String> stream = ...;
// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

编辑:

由于用户想要创建大小为100的批,而asyncio目前特定于流API,因此最好的方法是创建大小为100%的计数窗口。

此外,为了清除可能没有100个事件的最后一个窗口,自定义触发器可以与计数触发器和基于时间的触发器组合使用,以便触发器在元素计数后或每隔几分钟触发一次。

Flink Mailing List上有一个很好的后续信息,用户"Kostya"创建了一个自定义触发器,该触发器在上可用

最新更新