Elixir流提供可迭代对象,但我找不到任何关于可观察对象的信息(谷歌在这里没有帮助)。如果有人能给我指出同样的资源,我将不胜感激。
你可以组合Stream和Enum来编写可观察风格的代码。下面是一个以可观察方式编写的echo服务器示例:
IO.stream(:stdio, :line)
|> Stream.map(&String.upcase/1)
|> Enum.each(&IO.write(&1))
基本上,对于您发送到标准输入的每一行,它将被转换为大写,然后打印回标准输出。这是一个简单的例子,但关键是你所需要的组成一个可观察对象已经可以通过Stream和Enum获得。
Elixir中的流是对函数组合的抽象。最后,你得到的只是一个函数,调用它将循环遍历输入流并对其进行转换。
为了构建像Twitter4j中的示例那样的有状态流(在一秒钟内缓冲新的twitter法规并将它们全部发送到一个列表中),您需要使用可以具有状态的构建块。在Elixir中,通常将状态封装在进程中。
示例可能如下所示
tweetsPerSecond =
twitterStream
|> SS.buffer({1, :second})
|> SS.map(&length(&1))
SS.subscribe(tweetsPerSecond, fn n -> IO.puts "Got #{n} tweets in the last second" end)
SS.subscribe(tweetsPerSecond, fn n -> IO.puts "Second subscriber" end)
SS
是一个新的模块,我们需要编写来实现可观察功能。核心思想(据我所知)是能够订阅流而不修改它。
为了使其工作,twitterStream
本身应该是一个发出事件供其他进程使用的进程。在这种情况下,你不能使用Stream
,因为它有"阻塞拉"语义,即你将无法中断等待流中的下一个元素后,一些固定的时间已经过去了。
要在Elixir中实现相同的功能,请查看GenEvent
模块。它提供了发出和订阅事件的能力。但是它没有类似流的界面,据我所知没有。
我已经建立了一个Pub-Sub系统的PoC,我遵循了一种"可观察模式":http://mendrugory.weebly.com/blog/pub-sub-system-in-elixir.
为了保持状态(什么进程必须被告知),我使用了Agent