Akka流kafka,到达日志末端时完成流



我正在使用akka streams kafka,我正在寻找一种方法来执行以下操作:

  • 偏移x的启动流
  • 依次消耗项目xx+1x+2 ..直到最后一项
  • 一旦消耗了最后一项,完成流

代码看起来像

Consumer
  .plainSource(consumerSettings, subscription)
  .runForeach(println("got record!"))
  .onComplete {
    case Success(_) => // all items read
    case Failure(error) => // error
  }

读取最后一个元素后将完成。也许这不是该库的使用方式。我该如何实现?

akka消费者以"拉动"方式工作,除非发生与经纪人的错误发生,否则它将永远活着。但是,您什么时候认为流已经结束?可以将KAFKA视为分布式日志,从您读取给定偏移的消息。只要您的客户端连接到经纪人,您的客户端就会启动并运行...如果您考虑到没有来自Kafka的kafka的流终止(例如(时间间隔(例如(,您可以使用 idleteletimeout

  Consumer
    .plainSource(consumerSettings, subscription)
    .idleTimeout(10 seconds)
    .runForeach(e => println("E"))
    .onComplete {
      case Success(_) => // all items read
      case Failure(error) =>
      // TimeoutException if no element in ten seconds the stream stops throwing this exception
    }

另一种可能性可能是使用粉丝阶段,特别是 MergePreferred 。我们可以创建另一个在时间间隔发出事件的tick源。Kafka源将具有偏好,因此,只要元素来自Kafka,舞台总是会从该来源中提取元素。如果某个间隔中没有元素,则将将"超时"字符串推向下游。类似:

  implicit val actorSystem = ActorSystem("test-actor-system")
  implicit val streamMaterializer = ActorMaterializer()
  implicit val ec = actorSystem.dispatcher
  val consumer =
  Consumer
    .plainSource(consumerSettings, subscription)
    .map(_.value())
  val tick = Source.tick(50 millis, 30 seconds, "Timeout")
  val source = GraphDSL.create(consumer, tick)(Keep.both) { implicit b ⇒
    (r1, r2) ⇒
      val merge = b.add(MergePreferred[String](1, false))
      r2 ~> merge.in(0)
      r1 ~> merge.preferred
      SourceShape(merge.out)
  }
  Source
    .fromGraph(source)
    .takeWhile(el => el != "Timeout")
    .runForeach(msg => println(msg))
  .onComplete{
    case Success(_) => println("Stream ended")
    case Failure(error) => println("There was an error")
  }

随着流程的活跃,同时有Kafka的元素。

这只是一种方法。Akka流具有许多不同的阶段,并且以更优雅的方式面对这些情况的图形API。

相关内容

  • 没有找到相关文章

最新更新