与 websocket 连接时,使用 akka-stream-kafka 从 kafka 主题获取最后一条消息



是否可以使用 Akka Streams Kafka 获取有关 Kafka 主题的最后一条消息?我正在创建一个侦听 Kafka 主题的 websocket,但目前它在我连接时检索所有以前的未红消息。这可能会增加相当多的消息,所以我只在最后一条消息 + 任何未来的消息中休息。(或仅将来的消息(

资料来源:

def source(): Flow[Any, String, NotUsed] = {
  val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
  Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value)
}

使用者设置:

  @Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
  val deserializer = new StringDeserializer()
  val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
    .getOrElse(Configuration.empty)
  ConsumerSettings(config.underlying, deserializer, deserializer)
    .withBootstrapServers(kafkaUrl)
    .withGroupId(GroupId)
}

我尝试添加设置ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

这应该"自动将偏移量重置为最新的偏移量",但它似乎没有任何效果。

我能够避免在客户端连接时使用David van Geest在这里非常简洁地描述的方法获取任何上游数据

它归结为在消费者上拥有一个BroadcastHub:

val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
.toMat(BroadcastHub.sink)(Keep.right)
.run()

并连接静态消费者以吃掉所有上游数据

liveSource.to(Sink.ignore).run()

接下来,这让我让 WebSocket 客户端订阅消费者接收的所有数据:

def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}

或者基于KafkaTopic(或其他任何你想要的东西(进行过滤

def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {
  Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({
    x =>
      (Json.parse(x)  "topic").asOpt[String] match {
        case Some(str) => str.equals(kafkaTopic)
        case None => false
      }
  }))
}

这并不能解决首次连接时向用户提供 x 量数据的问题,但我预见我们会为任何历史数据添加一个简单的数据库查询,让 WebSocket 连接只关注实时流数据。

相关内容

  • 没有找到相关文章

最新更新