GenStage:如何处理制作人无法提供活动的情况



以下场景:GenStage生产者处理Twitter流(使用Stream API和ExTwitter),并向GenStage消费者提供一组推文(最大为消费者要求的需求)。然后消费者只需打印它们。

以下问题:我正在寻找特定的推文,所以并不总是有新的推文可用。如果GenStage制作人返回了一个空的事件列表,消费者将停止询问。请参阅本期,JoséValims回复了解更多信息。

我不知道如何解决这个问题。非常感谢您的帮助。这就是我目前所拥有的:

defmodule MyApp.TwitterProducer do
use GenStage
alias MyApp.TwitterStream
def start_link(:ok) do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
# This creates a regular Elixir Stream
# I use this as the state so that not every
# time the consumer asks for new data
# a new stream is initiated
stream = TwitterStream.get_stream
{:producer, stream}
end
def handle_demand(demand, stream) do
# Take tweets from the stream and 
# turn them into a list. Then return 
# them to the consumer
chunk = Stream.take(stream, demand)
events = Enum.to_list(chunk)
{:noreply, events, stream}
end

def handle_info(_msg, state) do
# I as getting an "wrong message" error 
# before I implemented this function myself
# It does nothing special to my case
{:noreply, [], state}
end
end
defmodule MyApp.TwitterConsumer do
use GenStage
def start_link() do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
{:consumer, :the_state_does_not_matter}
end
def handle_events(events, _from, state) do
Process.sleep(3000)
IO.inspect(events)
# We are a consumer, so we would never emit items.
{:noreply, [], state}
end
end
# Let's fire this thing up
{:ok, p} = GenStage.start_link(MyApp.TwitterProducer, :ok, name: MyApp.TwitterProducer)
{:ok, c} = GenStage.start_link(MyApp.TwitterConsumer, :ok, name: MyApp.TwitterConsumer)
GenStage.sync_subscribe(c, to: p, max_demand: 3)

实际情况是:这种情况会持续一段时间,然后停止。据我所知,只要制片人返回了一个空的事件列表。

编辑:有趣的是:如果我将需求设置为1,它就会继续运行。但它比直接查询TwitterStream API要慢得多。这意味着我收到的推文减少了十倍。我的理论是,这是由于重复的Stream.take调用,而不是仅对整个流调用Enum.to_list。但我还是觉得很困惑。你知道我缺少什么吗?

关于GenStage.handle_demand/2:的文档中有一句重要的话(但遗憾的是没有用粗体表示)

生产者必须存储需求或返回请求的事件

也就是说,与其在Stream.take上进行阻塞,不如明确地意识到任务可能正在进行阻塞并处理该情况,在这种情况下使用具有合理超时的Task.await/2收集需求(也许Task.yield/2可以用于更复杂的检查,但在这里它似乎是一种过度处理。)

来自文件:

如果您不希望任务失败,那么您必须以与没有异步调用时相同的方式更改heavy_fun/0代码。例如,返回{:ok, val} | :error结果,或者在更极端的情况下,使用try/rescue

不过,文档中缺少示例。OTOH,在这里,只返回空列表并忘记收集需求可能会更容易:

def handle_demand(demand, stream) do
try do
task = Task.async(fn ->
stream
|> Stream.take(demand)
|> Enum.to_list()
end)
Task.await(task, 1000) # one sec
catch
:exit, {:timeout, {Task, :await, [_, 1000]}} ->
{:noreply, [], stream}
else
events when is_list(events) ->
{:noreply, events, stream}
end
end

相关内容

最新更新