I am writing an Apache Flink (1.10) job that updates records in real time, as shown below:
public class WalletConsumeRealtimeHandler {

    public static void main(String[] args) throws Exception {
        walletConsumeHandler();
    }

    public static void walletConsumeHandler() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.initMQ();
        FlinkUtil.initEnv(env);
        DataStream<String> dataStreamSource = env.addSource(FlinkUtil.initDatasource("wallet.consume.report.realtime"));
        DataStream<ReportWalletConsumeRecord> consumeRecord =
                dataStreamSource.map(new MapFunction<String, ReportWalletConsumeRecord>() {
                    @Override
                    public ReportWalletConsumeRecord map(String value) throws Exception {
                        ObjectMapper mapper = new ObjectMapper();
                        ReportWalletConsumeRecord consumeRecord = mapper.readValue(value, ReportWalletConsumeRecord.class);
                        consumeRecord.setMergedRecordCount(1);
                        return consumeRecord;
                    }
                }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());

        consumeRecord.keyBy(
                new KeySelector<ReportWalletConsumeRecord, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> getKey(ReportWalletConsumeRecord value) throws Exception {
                        return Tuple2.of(value.getConsumeItem(), value.getTenantId());
                    }
                })
                .timeWindow(Time.seconds(5))
                .reduce(new SumField(), new CollectionWindow())
                .addSink(new SinkFunction<List<ReportWalletConsumeRecord>>() {
                    @Override
                    public void invoke(List<ReportWalletConsumeRecord> reportPumps, Context context) throws Exception {
                        WalletConsumeRealtimeHandler.invoke(reportPumps);
                    }
                });

        env.execute(WalletConsumeRealtimeHandler.class.getName());
    }

    private static class CollectionWindow extends ProcessWindowFunction<ReportWalletConsumeRecord,
            List<ReportWalletConsumeRecord>,
            Tuple2<String, Long>,
            TimeWindow> {
        public void process(Tuple2<String, Long> key,
                            Context context,
                            Iterable<ReportWalletConsumeRecord> minReadings,
                            Collector<List<ReportWalletConsumeRecord>> out) throws Exception {
            ArrayList<ReportWalletConsumeRecord> employees = Lists.newArrayList(minReadings);
            if (employees.size() > 0) {
                out.collect(employees);
            }
        }
    }

    private static class SumField implements ReduceFunction<ReportWalletConsumeRecord> {
        public ReportWalletConsumeRecord reduce(ReportWalletConsumeRecord d1, ReportWalletConsumeRecord d2) {
            Integer merged1 = d1.getMergedRecordCount() == null ? 1 : d1.getMergedRecordCount();
            Integer merged2 = d2.getMergedRecordCount() == null ? 1 : d2.getMergedRecordCount();
            d1.setMergedRecordCount(merged1 + merged2);
            d1.setConsumeNum(d1.getConsumeNum() + d2.getConsumeNum());
            return d1;
        }
    }

    public static void invoke(List<ReportWalletConsumeRecord> records) {
        WalletConsumeService service = FlinkUtil.InitRetrofit().create(WalletConsumeService.class);
        Call<ResponseBody> call = service.saveRecords(records);
        call.enqueue(new Callback<ResponseBody>() {
            @Override
            public void onResponse(Call<ResponseBody> call, Response<ResponseBody> response) {
            }

            @Override
            public void onFailure(Call<ResponseBody> call, Throwable t) {
                t.printStackTrace();
            }
        });
    }
}
Now I find that the Flink job triggers the sink only after it has received at least 2 records. Does the reduce operation require this?
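You need two records to trigger the window. It is not the reduce operation that requires this: Flink only knows when to close a window (and fire the subsequent computation) once it receives a watermark that has passed the end of the window.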
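In your case you are using BoundedOutOfOrdernessGenerator, which advances the watermark based on incoming records. It therefore produces a second watermark only after it has seen the second record.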
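To make the timing concrete, here is a minimal plain-Java model of that behaviour, with no Flink dependency. The class name, the 3.5-second out-of-orderness bound, and the sample timestamps are assumptions for illustration, since the asker's BoundedOutOfOrdernessGenerator is not shown. The firing rule (watermark reaches window end minus one millisecond) follows Flink's event-time trigger.

```java
/** Plain-Java model of an event-driven watermark and one 5-second tumbling window. */
class WatermarkWindowDemo {
    static final long WINDOW_SIZE_MS = 5_000L;
    // Assumed bound; the real value lives in the asker's generator, which is not shown.
    static final long MAX_OUT_OF_ORDERNESS_MS = 3_500L;

    // Start high enough that the subtraction in currentWatermark() cannot underflow.
    private long maxTimestampSeen = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;

    /** Records one event timestamp; the watermark can only move when this is called. */
    void onEvent(long timestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
    }

    /** Watermark trails the largest timestamp seen so far by the out-of-orderness bound. */
    long currentWatermark() {
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS_MS;
    }

    /** A window [start, start + size) fires once the watermark reaches its last millisecond. */
    static boolean windowFires(long windowStartMs, long watermarkMs) {
        long windowEndMs = windowStartMs + WINDOW_SIZE_MS;
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        WatermarkWindowDemo demo = new WatermarkWindowDemo();

        demo.onEvent(1_000L); // first record, belongs to window [0, 5000)
        System.out.println("after 1st record: fires="
                + windowFires(0L, demo.currentWatermark()));

        demo.onEvent(9_000L); // second record pushes the watermark to 5500
        System.out.println("after 2nd record: fires="
                + windowFires(0L, demo.currentWatermark()));
    }
}
```

In this model the first record alone leaves the watermark at -2500, so the window stays open. Only the second record moves it past 4999, and it must be late enough in event time to do so.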
You can use a different watermark generator. The troubleshooting training includes a watermark generator that also emits watermarks on a timeout.
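The generator from the training is not reproduced here. The following is only a rough sketch of the idea, written as plain Java so it runs without Flink. The class name, the constructor parameters, and the policy of advancing the watermark by the idle time are all assumptions. In a Flink 1.10 job the same logic would sit in a class implementing AssignerWithPeriodicWatermarks, whose extractTimestamp and getCurrentWatermark methods this sketch imitates.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch: a watermark source that keeps advancing while no records arrive. */
class IdleTimeoutWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier clock; // wall-clock source, injectable so it can be tested

    private long maxTimestampSeen = Long.MIN_VALUE;
    private long lastEventWallClockMs;
    private boolean seenEvent = false;
    private long lastWatermark = Long.MIN_VALUE;

    IdleTimeoutWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
    }

    /** Called once per record: remembers the event time and when the record arrived. */
    long extractTimestamp(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        lastEventWallClockMs = clock.getAsLong();
        seenEvent = true;
        return eventTimestampMs;
    }

    /** Called periodically: returns a watermark that never moves backwards. */
    long getCurrentWatermark() {
        if (!seenEvent) {
            return Long.MIN_VALUE; // nothing to base a watermark on yet
        }
        long idleForMs = clock.getAsLong() - lastEventWallClockMs;
        long candidate = maxTimestampSeen - maxOutOfOrdernessMs;
        if (idleForMs >= idleTimeoutMs) {
            // Assumed policy: while idle, let event time advance with wall-clock time
            // so that windows holding a single record can still close.
            candidate += idleForMs;
        }
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }

    public static void main(String[] args) {
        IdleTimeoutWatermarkSketch generator =
                new IdleTimeoutWatermarkSketch(3_500L, 1_000L, System::currentTimeMillis);
        generator.extractTimestamp(1_000L);
        System.out.println("watermark right after the record: " + generator.getCurrentWatermark());
    }
}
```

The trade-off of this policy is that a record arriving after an idle period may carry a timestamp below the already advanced watermark and would then be treated as late.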