Akka流动态接收器取决于Kafka主题中的消息



我有一个读取Message的Kafka消费者。每条消息都有一个ID和内容。

case class Message(id: String, content: String)

根据ID,我想将消息写入一个单独的接收器。具体到MongoDB集合中。Mongo提供了一个Sink,它将它写入指定集合的DB中。

val sink: Sink[Document, Future[Done]] = MongoSink.insertOne(collection(id))

问题是,在连接Kafka消费者源时,我需要指定接收器,但每个元素都定义了它应该进入哪个接收器。有没有一种方法可以在元素到达时动态使用特定的接收器。或者这是不可能的,例如,我应该为每个id使用不同的Kafka主题,并将每个源连接到一个单独的接收器?

在您的示例中,类型是如何排列的(例如DocumentMessage之间的关系(并不完全清楚,但您可以采取以下几种方法:

  • 如果有很多可能的集合,但无法提前知道,那么Akka Streams中最不糟糕的选择将是
Sink.foreachAsync[Message](parallelism) { msg =>
val document = documentFromMessage(msg)
val collection = collection(msg.id)
Source.single(document).runWith(MongoSink.insertOne(collection))
}

请注意,这将为每条消息使用一个新的Mongo接收器,这可能会带来效率问题。请注意,如果有一种更轻的方式(例如在reactivemongo驱动程序中?(,在插入单个文档后返回Future,但使用连接池之类的东西来减少单个文档插入的开销,那么这可能是更可取的。

  • 如果集合事先已知,则可以为每个集合预构建接收器,并使用PartitionGraphDSL定义一个包含预构建接收器的接收器
// collection0, etc. are predefined and encompass all of the collections which might be returned by collection(id)
val collections: Map[MongoCollection[Document], (Int, Sink[Document, Future[Done]])] = Map(
collection0 -> (0 -> MongoSink.insertOne(collection0)),
collection1 -> (1 -> MongoSink.insertOne(collection1)),
collection2 -> (2 -> MongoSink.insertOne(collection2)),
collection3 -> (3 -> MongoSink.insertOne(collection3))
)
val combinedSink = Sink.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val partition = builder.add(
Partition[Message](
collections.size,
{ msg => collections(collection(msg.id))._1 }
)
)
val toDocument = Flow[Message].map(documentFromMessage)
collections.foreach {
case (_, (n, sink)) =>
partition.out(n) ~> toDocument ~> sink
}
SinkShape.of(partition.in)
}

最新更新