So I have a bunch of logs in JSON, and a stream that validates them and filters out the JSON I need, and that part works fine!
Now I want to use Async I/O to do a database lookup from the filtered JSON, but asyncInvoke seems to be executing on every input of the stream rather than on the filtered results.
DataStream<String> stringInputStream = env.addSource(flinkKafkaConsumer);

stringInputStream
    .flatMap(stringToJsonObject()) // Make sure only JSON logs go through.
    .returns(JsonObject.class)
    .filter(filterLogs("my-app")) // Filter logs for my-app.
    .flatMap(jsonStringToJsonObject("someJsonEncodedStringField"))
    .returns(JsonObject.class)
    .filter(filterSpecificEvent()); // This stream works as expected; putting print() here only prints filtered events.

DataStream<JsonObject> lookupCarrierCodeStream =
    AsyncDataStream.orderedWait(stringInputStream, lookupCodesInDB(), 3000, TimeUnit.MILLISECONDS, 100);
private static RichAsyncFunction<String, JsonObject> lookupCodesInDB() {
    return new RichAsyncFunction<String, JsonObject>() {
        @Override
        public void asyncInvoke(String input, ResultFuture<JsonObject> resultFuture) throws Exception {
            // This seems to receive all events, rather than the filtered ones.
            System.out.println("Input: " + input);
            resultFuture.complete(Collections.singleton(new JsonObject(input)));
        }
    };
}
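As an aside, the asyncInvoke above completes its ResultFuture synchronously, so nothing actually runs asynchronously. The usual pattern, as in Flink's Async I/O documentation, is to complete the future from a callback. A minimal sketch, where dbClient and its lookup(...) method are hypothetical stand-ins for whatever non-blocking database client is in use (requires java.util.concurrent.CompletableFuture):

// Sketch only: `dbClient.lookup` is a hypothetical call returning a JsonObject.
CompletableFuture
    .supplyAsync(() -> dbClient.lookup(input)) // run the lookup off the caller's thread
    .thenAccept(result -> resultFuture.complete(Collections.singleton(result))); // complete Flink's future from the callback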
Update
If I split the stream up like this, it seems to work...
DataStream<String> kafkaStringInput = env.addSource(flinkKafkaConsumer);
DataStream<JsonObject> jsonLogsInput = ...;
DataStream<JsonObject> appLogsInput = ...;
DataStream<JsonObject> evenInput = ...;
DataStream<JsonObject> lookupStream = AsyncDataStream.orderedWait(evenInput, ...);
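(The elided steps are presumably just the original chain split at each assignment; reconstructed from the code above, not verified:)

DataStream<JsonObject> jsonLogsInput = kafkaStringInput
    .flatMap(stringToJsonObject())
    .returns(JsonObject.class);
DataStream<JsonObject> appLogsInput = jsonLogsInput
    .filter(filterLogs("my-app"));
DataStream<JsonObject> evenInput = appLogsInput
    .flatMap(jsonStringToJsonObject("someJsonEncodedStringField"))
    .returns(JsonObject.class)
    .filter(filterSpecificEvent());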
Not sure why it doesn't work when written fluently, but never mind.
Applying a function to a stream, as in

eventStream
    .flatMap()

does not modify the event stream; it returns a new stream.
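A minimal sketch of the pitfall (hypothetical source and predicate):

DataStream<String> source = env.addSource(consumer);

// The stream returned by filter() is discarded here; `source` itself is unchanged.
source.filter(s -> s.contains("my-app"));

// Capture the returned stream instead; only `filtered` carries the filtered events.
DataStream<String> filtered = source.filter(s -> s.contains("my-app"));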
So you want to do something like this:
DataStream<JsonObject> filteredStream = stringInputStream
    .flatMap(stringToJsonObject())
    .returns(JsonObject.class)
    .filter(filterLogs("my-app"))
    .flatMap(jsonStringToJsonObject("someJsonEncodedStringField"))
    .returns(JsonObject.class)
    .filter(filterSpecificEvent());

DataStream<JsonObject> lookupCarrierCodeStream =
    AsyncDataStream.orderedWait(filteredStream, lookupCodesInDB(), 3000, TimeUnit.MILLISECONDS, 100);
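One thing to watch: filteredStream carries JsonObject elements, while lookupCodesInDB() in the question is declared as RichAsyncFunction<String, JsonObject>, so this won't compile as-is; the function's input type parameter must match the element type of the stream passed to orderedWait. A minimal adjustment of the function from the question (sketch only, with the pass-through standing in for the real lookup):

private static RichAsyncFunction<JsonObject, JsonObject> lookupCodesInDB() {
    return new RichAsyncFunction<JsonObject, JsonObject>() {
        @Override
        public void asyncInvoke(JsonObject input, ResultFuture<JsonObject> resultFuture) {
            // Now receives only the filtered events; replace the pass-through
            // below with the actual carrier-code lookup.
            resultFuture.complete(Collections.singleton(input));
        }
    };
}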