我有一个Spark Streaming应用程序来分析来自Kafka代理的事件。我有如下规则,可以通过组合现有规则来生成新规则:
If this event type occurs raise an alert.
If this event type occurs more than 3 times in a 5-minute interval, raise an alert.
同时,我将每个传入的数据保存到 Cassandra。我喜欢做的是运行这个流媒体应用程序以获取来自 Cassandra 的历史数据。例如
<This rule> would have generated <these> alerts for <last week>.
有没有办法在 Spark 中做到这一点,还是在路线图中?例如,Apache Flink 具有事件时间处理功能。但是将现有代码库迁移到它似乎很困难,我想通过重用现有代码来解决这个问题。
这是相当简单的,有一些警告。首先,它有助于从卡夫卡方面理解它是如何工作的。
Kafka 管理所谓的偏移量 - Kafka 中的每条消息都有一个相对于其在分区中的位置的偏移量。(分区是主题的逻辑划分。分区中的第一条消息的偏移量为 0L
,第二条消息的偏移量为 1L
,依此类推。除此之外,由于日志滚动更新和可能的主题压缩,0L
并不总是分区中最早的偏移量。
您要做的第一件事是从一开始就收集要读取的所有分区的偏移量。下面是一个执行此操作的函数:
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
val time = kafka.api.OffsetRequest.LatestTime
val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo](
(new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000))
)
val req = new kafka.javaapi.OffsetRequest(
reqInfo, kafka.api.OffsetRequest.CurrentVersion, "test"
)
val resp = consumer.getOffsetsBefore(req)
val offsets = resp.offsets(topic, partition)
(offsets(offsets.size - 1), offsets(0))
}
你可以这样称呼它:
val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0)
有关您想知道的有关从 Kafka 检索偏移量的所有信息,请阅读此内容。至少可以说,这很神秘。(例如,当您完全理解要PartitionOffsetRequestInfo
的第二个参数时,请告诉我。
现在您已经firstOffset
并lastOffset
了要从历史中查看的分区,然后使用 createDirectStream
的 fromOffset
参数,其类型为:fromOffset: Map[TopicAndPartition, Long]
。您可以将Long
/值设置为从getOffsets()
获得firstOffset
。
至于nextOffset
- 您可以使用它来确定何时从处理历史数据移动到新数据。如果msg.offset == nextOffset
则表示正在处理分区中的第一个非历史记录。
现在,对于警告,直接来自文档:
- 启动上下文后,就不能进行新的流式计算设置或添加到其中。
- 一旦上下文停止,就不能重新 启动。
- 在一个 JVM 中,只有一个 StreamingContext 可以处于活动状态,位于同时。
- StreamingContext 上的 stop() 也会停止 SparkContext。自只停止 StreamingContext,设置 stop() 的可选参数将 stopSparkContext 称为 false。
- SparkContext可以重用于创建多个流上下文,只要前一个StreamingContext 已停止(不停止 SparkContext)在创建下一个 StreamingContext 之前。
正是因为这些警告,我和firstOffset
同时抓住nextOffset
- 所以我可以保持流,但将上下文从历史处理更改为当前处理。