在 scala 中用 while 循环中的队列结果填充列表



我正在尝试在scala中以函数式方式编写while循环。我想做的是用队列中的消息填充列表(在这种情况下是 Kafka,但并不重要(。

我这样做是为了集成测试,并且由于 Kafka 在 CI 中运行时测试正在远程运行,因此测试有时会失败,因为 Kafka 不返回任何消息。所以我写了一个循环来查询 Kafka,直到我得到我期望的所有结果(否则测试将在一段时间后超时并失败(。我现在有这个:

var result = List[Int]()
while (result.size < expectedNumberOfMessages) {
result = result ++ kafkaConsumer.poll(Duration.ofSeconds(10)).records(KAFKA_TOPIC).iterator().toList.map(_.value.getPayload)
}

这工作正常,但对我来说看起来很可怕。另外,如果是生产代码,它也会效率低下。谁能提出更好的功能方法?

如果您打算保留while循环,我首先建议您使用scala.collection.mutable.ListBuffer而不是不可变的List。这将防止在每次迭代时在内存中复制整个列表。

如果你想要一种更"实用"的方式来编写上述代码,同时保留消费者 API(而不是 Kafka Streams API(,你可以手动定义一个 scalaStream如下所示:

import scala.util.Random
// mock Kafka's "poll", returns a random number of Ints (max 10)
def poll(): List[Int] = {
val size = Random.nextInt(10)
println("fetching messages")
Thread.sleep(1000)
(1 to size).map(_ => Random.nextInt(10)).toList
}
lazy val s: Stream[Int] = Stream.continually(poll()).flatten
// s is now a Stream that will be evaluated when a certain number of messages is requested
// for example, fetching 40 results:
/*
scala> s.take(40).toList
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
res0: List[Int] = List(3, 6, 2, 7, 7, 8, 0, 4, 6, 2, 0, 3, 8, 9, 5, 8, 2, 9, 2, 7, 9, 2, 6, 1, 6, 7, 2, 4, 4, 6, 6, 3, 5, 7, 2, 0, 9, 4, 9, 4)
*/

也许是这样的东西?

def pollKafka = kafkaConsumer.poll(Duration.ofSeconds(10)).records(KAFKA_TOPIC).iterator.map(_.value.getPayload)
Iterator
.continually(pollKafka)
.flatten
.take(expectedNumberOfMessages)
.toList

Iterator内部是可变的,但如果您使用其高级功能接口并且不重复使用Iterator恕我直言,这完全没问题。

如果你想一直向下移动函数流,你可以考虑像 fs2 这样的库。

最新更新