如何使用Kafka、Alpakka-Kafka、Play Framework和Websocket处理POST请求



假设我有两个kafka主题,request_topic用于我的Post请求,response_topic用于我的响应。

以下是型号:

case class Request(requestId: String, body: String)
case class Response(responseId: String, body: String, requestId: String)

这是我的套接字处理程序

def socket = WebSocket.accept[String, String] { req =>
val requestId = ??? // Generate a unique requestId
val in: Sink[String, Future[Done]] = Sink.foreach[String]{ msg =>
val record = new ProducerRecord[String, Request]("request_topic", "key", Request(requestId, msg))
val producer: KafkaProducer[String, Request] = ???
Future { producer.send(record).get }
}
// Once produced, some stream processing apps will manage to process request and publish the reponse to response_topic
// The Request and Response object are linked by the requestId field.
val consumerSettings = ???
val out: Source[ConsumerRecord[String, Response], _] = Consumer
.plainSource(consumerSettings, Subscriptions.topics("response_topic"))
.filter(cr => cr.value.requestId == requestId)
.map(cr => someResponseString(cr.value))
Flow.formSinkAndSource(in, out)
}
def someResponseString(res: Response): String = ???

基本上,对于每个传入的消息,我都会向Kafka发布一个Request对象,然后由一些流处理应用程序(此处未显示(处理该请求,并希望将响应发布回Kafka。

我有一些担忧:

1-Alpakka Kafka连接器会为每个传入消息创建一个新的连接器实例吗?还是只要Play正在运行,它就会使用相同的实例?

2-根据单个requestId过滤响应是个好主意,还是我应该将整个流发送回每个客户端,让他们根据他们感兴趣的requestId来过滤响应。

3-我什么都错了吗?(我是Websocket的新手(

提前谢谢。

1(取决于您如何配置它。例如,在in: Sink正文中,您为每条消息创建一个新的KafkaProducer。相反,整个应用程序应该有一个生产者。

我不确定Akka/Play的线程模型是如何工作的,但大多数Web服务器为每个传入连接启动一个新线程,线程池中最多有固定数量的线程。

2( 我认为尽快过滤是首选,同时在服务器端尽可能多地进行过滤。这样可以节省回客户端的带宽。

此外,如果您只想将网络服务器上Kafka的数据向一个方向推送到客户端,那么您可能想要SSE,而不是Websocket

最新更新