使用ApacheFlink流处理缓冲转换后的消息(例如,1000计数)



我正在使用Apache Flink进行流处理。

在从源订阅消息(例如:Kafka、AWS Kinesis Data Streams(,然后使用Flink运算符对流数据应用转换、聚合等之后,我想缓冲最终消息(例如1000条(,并在单个请求中将每个批次发布到外部REST API。

如何在ApacheFlink中实现缓冲机制(将每1000条记录创建为一批(?

Flink pipileine:流式源-->使用运算符转换/减少-->缓冲1000条消息-->发布到REST API

感谢你的帮助!

我会创建一个具有状态的接收器,该接收器将保留传入的消息。当计数足够高时(1000(,接收器将发送批处理。状态可以在内存中(例如,包含消息的ArrayList的实例变量(,但您应该使用检查点,以便在发生某种故障时恢复该状态。

当你的接收器具有检查点状态时,它需要实现CheckpointedFunction(在org.apache.flink.streaming.api.checkpoint中(,这意味着你需要向你的接收器添加两个方法:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
// HttpSinkStateItem is a user-written class 
// that just holds a collection of messages (Strings, in this case)
//
// Buffer is declared as ArrayList<String>
checkpointedState.add(new HttpSinkStateItem(buffer));
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// Mix and match different kinds of states as needed:
//   - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
//        - types are list and union        
//   - Use context.getKeyedStateStore() to get state for the current key (only for processing keyed streams)
//        - types are value, list, reducing, aggregating and map
//   - Distinguish between state data using state name (e.g. "HttpSink-State")      
ListStateDescriptor<HttpSinkStateItem> descriptor =
new ListStateDescriptor<>(
"HttpSink-State",
HttpSinkStateItem.class);
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (HttpSinkStateItem item: checkpointedState.get()) {
buffer = new ArrayList<>(item.getPending());  
}
}       
}

如果计数没有达到您的阈值,您也可以使用接收器中的计时器(如果输入流是键控/分区的(定期发送。

相关内容

  • 没有找到相关文章

最新更新