处理Apache Beam中来自多租户Kafka主题的无序事件窗口



我一直在思考如何解决Beam中的一个给定问题,并认为我应该向更多的受众寻求一些建议。目前,事情似乎是稀疏工作,我很好奇是否有人可以提供一个声音板,看看这个工作流程是否有意义。

主要的高级目标是从Kafka读取可能无序的记录,需要根据记录上的另一个属性在事件时间中打开窗口,并最终发出这些窗口的内容并将它们写出来给GCS。

当前的管道看起来大致如下:

val partitionedEvents = pipeline
.apply("Read Events from Kafka",
KafkaIO
.read<String, Log>()
.withBootstrapServers(options.brokerUrl)
.withTopic(options.incomingEventsTopic)
.withKeyDeserializer(StringDeserializer::class.java)
.withValueDeserializerAndCoder(
SpecificAvroDeserializer<Log>()::class.java,
AvroCoder.of(Log::class.java)
)
.withReadCommitted()
.commitOffsetsInFinalize()
// Set the watermark to use a specific field for event time
.withTimestampPolicyFactory { _, previousWatermark -> WatermarkPolicy(previousWatermark) }
.withConsumerConfigUpdates(
ImmutableMap.of<String, Any?>(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
"schema.registry.url", options.schemaRegistryUrl
)
).withoutMetadata()
)
.apply("Logging Incoming Logs", ParDo.of(Events.log()))
.apply("Rekey Logs by Tenant", ParDo.of(Events.key()))
.apply("Partition Logs by Source",
// This is a custom function that will partition incoming records by a specific
// datasource field
Partition.of(dataSources.size, Events.partition<KV<String, Log>>(dataSources))
)
dataSources.forEach { dataSource ->
// Store a reference to the data source name to avoid serialization issues
val sourceName = dataSource.name
val tempDirectory = Directories.resolveTemporaryDirectory(options.output)
// Grab all of the events for this specific partition and apply the source-specific windowing
// strategies
partitionedEvents[dataSource.partition]
.apply(
"Building Windows for $sourceName",
SourceSpecificWindow.of<KV<String, Log>>(dataSource)
)
.apply("Group Windowed Logs by Key for $sourceName", GroupByKey.create())
.apply("Log Events After Windowing for $sourceName", ParDo.of(Events.logAfterWindowing()))
.apply(
"Writing Windowed Logs to Files for $sourceName",
FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
.withNumShards(1)
.by { row -> "${row.key}/${sourceName}" }
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(SerializableFunction { logs -> Files.stringify(logs.value) }), TextIO.sink())
.to(options.output)
.withNaming { partition -> Files.name(partition)}
.withTempDirectory(tempDirectory)
)
}

在更简单的项目符号形式中,它看起来像这样:

  • 从单个Kafka主题读取记录
  • 将其租户的所有记录密钥
  • 用另一个事件正确分区流
  • 在上一步中遍历已知分区
  • 为每个分区应用自定义窗口规则(与数据源相关,自定义窗口规则)
  • 按键(租户)分组窗口项
  • 通过FileIO将租户密钥对组写入GCP

问题是传入的Kafka主题包含跨多个租户的乱序数据(例如,tenant1的事件现在可能正在流式传输,但几分钟后你会在同一分区中获得tenant2的事件,等等)。这将导致水印在时间上来回反弹,因为每个传入的记录不能保证不断增加,这听起来像是一个问题,但我不确定。当然,当数据流经时,有些文件根本没有被发出。

自定义窗口函数非常简单,目的是在允许的延迟和窗口持续时间经过后发出单个窗口:

object SourceSpecificWindow {
fun <T> of(dataSource: DataSource): Window<T> {
return Window.into<T>(FixedWindows.of(dataSource.windowDuration()))
.triggering(Never.ever())
.withAllowedLateness(dataSource.allowedLateness(), Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes()
}
}

然而,这似乎不一致,因为我们看到日志记录在窗口关闭后输出,但不一定是文件被写入到GCS。

这种方法有什么明显的错误或不正确的地方吗?由于数据可能在源内无序(即现在,2小时前,从现在起5分钟)并且涵盖多个租户的数据,但目的是尝试并确保一个租户保持最新不会淹没过去可能到来的租户。

我们是否可能需要另一个Beam应用程序或其他东西来"拆分"?这个单一的事件流进入子流,每个子流被独立处理(这样每个水印处理自己)?这就是SplittableDoFn的作用吗?因为我在SparkRunner上运行,它似乎不支持这一点-但它似乎是一个有效的用例。

任何建议都将非常感激,甚至只是另一双眼睛。我很乐意提供我所能提供的更多细节。

环境
  • 当前运行在SparkRunner

虽然这可能不是最有帮助的回答,但我将对最终结果保持透明。最终,这个特定用例所需的逻辑扩展远远超出了Apache Beam的内置功能,主要是在窗口/时间治理方面。

解决方案是将首选流技术从Apache Beam切换到Apache Flink你可以想象这是一个相当大的飞跃。Flink以状态为中心的特性使我们能够更容易地处理我们的用例,围绕窗口定义自定义的退出标准(和排序),同时在它上面失去一层抽象。

最新更新