我有一个流列表,在调用它们的next()
时,它们将随机休眠一段时间,然后从不同的来源读取一个字符。
我正在尝试编写一个使用者,该使用者将一直调用这些流直到EOF
,并在运行时构建这些流的通用字典。
到目前为止,我正在使用字典的ConcurrentHashMap
,并简单地为每个流使用者创建一个新线程。
虽然我的解决方案有效,但它似乎很幼稚,我想知道流媒体库(如monix
或fs2
(是否有更好的用途
根据问题的描述和随后的评论,我假设存在多个Iterator[Char]
来源:
val allSources : Iterable[Iterator[Char]] = ???
问题是:如何同时从这些迭代器中收集String
值以形成要计数的字符串映射。
基于流的解决方案
首先,我们需要将每个迭代器转换为基于分隔符的字符串值迭代器:
trait Word {
val data : String
}
object EmptyWord extends Word {
override val data = ""
}
case class PartialWord(val data : String) extends Word
case class WholeWord(val data : String) extends Word
val appendToWord : Char => (Word, Char) => Word =
(separator) => (originalWord, appendChar) => originalWord match {
case PartialWord(d) =>
if(appendChar == separator)
WholeWord(d)
else
PartialWord(d + appendChar)
case _ => PartialWord(appendChar.toString)
}
val isWholeWord : Word => Boolean = (_ : Word) match {
case _ : WholeWord => true
case _ => false
}
//using space as separator
val convertCharIterator : Iterator[Char] => Iterator[String] =
(_ : Iterator[Char])
.scanLeft(EmptyWord)(appendToWord(' '))
.filter(isWholeWord)
.map(_.data)
现在,我们可以转换所有迭代器以生成字符串,并且可以将所有迭代器组合到单个迭代器中:
val allWordSource : Iterator[String] =
allSources.map(convertCharIterator)
.reduceOption( _ ++ _)
.getOrElse(Iterator.empty[String])
此迭代器现在可以是 akka 流的源,它将计算您的计数:
val addToCounter : (Map[String, Int], String) => Map[String, Int] =
(counter, word) =>
counter.updated(word, counter.getOrElse(word, 0) + 1)
val counter : Future[Map[String, Int]] =
Source
.fromIterator( () => allWordSource)
.runFold(Map.empty[String, Int])(addToCounter)