Window data hourly (clock-aligned) in Apache Beam



I am trying to aggregate streaming data by the hour (e.g. 12:00 to 12:59 and 01:00 to 01:59) in a Dataflow/Apache Beam job.
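Beam's `FixedWindows` are aligned to the Unix epoch, so `FixedWindows.of(Duration.standardHours(1))` produces clock-aligned UTC hours such as [12:00, 13:00), which matches the buckets described here. The plain-Java sketch below (not Beam code; the class and method names are hypothetical, and it uses only `java.time`) shows that alignment arithmetic. In a time zone whose UTC offset is a whole number of hours, the same boundaries also fall on full local hours.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical stdlib sketch (not Beam code): computes the clock-aligned
 * one-hour window [start, end) that an event timestamp falls into, using
 * epoch alignment with no offset, as FixedWindows does by default.
 */
class HourlyWindowSketch {
    static final long HOUR_MS = Duration.ofHours(1).toMillis();

    /** Inclusive start of the hour window containing the event time. */
    static Instant windowStart(Instant eventTime) {
        long ms = eventTime.toEpochMilli();
        // floorMod also handles timestamps before 1970 correctly
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, HOUR_MS));
    }

    /** Exclusive end of the hour window containing the event time. */
    static Instant windowEnd(Instant eventTime) {
        return windowStart(eventTime).plusMillis(HOUR_MS);
    }

    public static void main(String[] args) {
        Instant order = Instant.parse("2020-05-01T12:59:59.999Z");
        System.out.println(windowStart(order) + " to " + windowEnd(order));
    }
}
```

An order stamped 12:59:59.999 still belongs to the 12:00 window, while one stamped exactly 13:00 starts the next window.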

Here is my use case:

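The data is streamed from Pub/Sub and carries a timestamp (the order date). I want to count the number of orders in each hour, and I also want to allow 5 hours of lateness. Below is the sample code I am using: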

LOG.info("Start Running Pipeline");
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);

PCollection<String> directShipmentFeedData = pipeline.apply("Get Direct Shipment Feed Data",
        PubsubIO.readStrings().fromSubscription(directShipmentFeedSubscription));
PCollection<String> tibcoRetailOrderConfirmationFeedData = pipeline.apply("Get Tibco Retail Order Confirmation Feed Data",
        PubsubIO.readStrings().fromSubscription(tibcoRetailOrderConfirmationFeedSubscription));
PCollection<String> flattenData = PCollectionList.of(directShipmentFeedData)
        .and(tibcoRetailOrderConfirmationFeedData)
        .apply("Flatten Data from PubSub", Flatten.<String>pCollections());

flattenData
        .apply(ParDo.of(new DataParse())).setCoder(SerializableCoder.of(SalesAndUnits.class))
        // Adding Window
        .apply(Window.<SalesAndUnits>into(
                SlidingWindows.of(Duration.standardMinutes(15))
                        .every(Duration.standardMinutes(1))))
        // Data Enrich with Dimensions
        .apply(ParDo.of(new DataEnrichWithDimentions()))
        // Group And Hourly Sum
        .apply(new GroupAndSumSales())
        .apply(ParDo.of(new SQLWrite())).setCoder(SerializableCoder.of(SalesAndUnits.class));
pipeline.run();
LOG.info("Finish Running Pipeline");
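For comparison, `SlidingWindows.of(Duration.standardMinutes(15)).every(Duration.standardMinutes(1))` in the code above does not produce hourly buckets: every element is assigned to 15 overlapping 15-minute windows. The plain-Java sketch below (not Beam code; the class and method names are hypothetical) mimics that assignment, assuming the default zero offset, where window starts fall on multiples of the period.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stdlib sketch (not Beam code): lists the start of every
 * sliding window [start, start + size) that contains a timestamp, with
 * window starts aligned to multiples of the period since the epoch.
 */
class SlidingWindowSketch {
    static List<Instant> windowStarts(Instant ts, Duration size, Duration period) {
        long t = ts.toEpochMilli();
        long s = size.toMillis();
        long p = period.toMillis();
        // Latest period boundary at or before the timestamp
        long lastStart = t - Math.floorMod(t, p);
        List<Instant> starts = new ArrayList<>();
        // Walk backwards while the window still covers the timestamp
        for (long start = lastStart; start > t - s; start -= p) {
            starts.add(Instant.ofEpochMilli(start));
        }
        return starts;
    }

    public static void main(String[] args) {
        Instant order = Instant.parse("2020-05-01T12:34:56Z");
        List<Instant> starts = windowStarts(order, Duration.ofMinutes(15), Duration.ofMinutes(1));
        System.out.println(starts.size() + " windows, earliest starting at " + starts.get(starts.size() - 1));
    }
}
```

With a size equal to the period (for example one hour and one hour), the same logic returns exactly one window per element, which is the fixed-window case.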

I would use a window that matches your requirement, something like:

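Window.<SalesAndUnits>into(FixedWindows.of(Duration.standardHours(1)))
        .withAllowedLateness(Duration.standardHours(5))
        // An accumulation mode must be set once allowed lateness is > 0;
        // accumulating panes re-emit the updated total when late data arrives
        .accumulatingFiredPanes()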

possibly followed by a count, since as I understand it that is what you need.
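In the pipeline, that count could be, for example, `Count.<SalesAndUnits>globally().withoutDefaults()` (`withoutDefaults()` is needed because `Combine.globally` cannot emit default values for non-global windows), or `Count.perKey()` if the orders are keyed. To illustrate what the result means, here is a plain-Java sketch (not Beam code; the class and method names are hypothetical) that counts orders per UTC hour and skips orders whose window has already expired. It assumes a single fixed watermark and the 5 hours of allowed lateness from the question, and it approximates Beam's drop rule using the window end rather than the window's last millisecond.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical stdlib sketch (not Beam code): hourly fixed windows followed
 * by a count, with a simplified allowed-lateness check against one watermark.
 */
class HourlyCountSketch {
    static final long HOUR_MS = Duration.ofHours(1).toMillis();
    static final long ALLOWED_LATENESS_MS = Duration.ofHours(5).toMillis();

    /** Number of orders per clock-aligned UTC hour, keyed by window start. */
    static Map<Instant, Long> countPerHour(List<Instant> orderTimes, Instant watermark) {
        Map<Instant, Long> counts = new TreeMap<>();
        for (Instant t : orderTimes) {
            long ms = t.toEpochMilli();
            long start = ms - Math.floorMod(ms, HOUR_MS);
            // Skip the order if the watermark is past window end + allowed lateness
            if (watermark.toEpochMilli() > start + HOUR_MS + ALLOWED_LATENESS_MS) {
                continue;
            }
            counts.merge(Instant.ofEpochMilli(start), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Instant> orders = List.of(
                Instant.parse("2020-05-01T12:00:00Z"),
                Instant.parse("2020-05-01T12:59:59Z"),
                Instant.parse("2020-05-01T13:00:00Z"),
                Instant.parse("2020-05-01T06:30:00Z"));
        System.out.println(countPerHour(orders, Instant.parse("2020-05-01T12:45:00Z")));
    }
}
```

With a watermark of 12:45, the 06:30 order is skipped because its window (06:00 to 07:00) stopped accepting data at 12:00, while the other three orders are counted as two in the 12:00 hour and one in the 13:00 hour.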

Hope this helps.
