我有一个相当简单的要求,围绕凤凰城构建的 2 个服务(目前):
ServiceA
负责注册用户。注册用户后,ServiceA
广播一条消息,其中包含有关新创建用户的信息。目前正在控制器操作中使用以下代码完成此操作:
ServiceA.Endpoint.broadcast("activity:all", "new:user", %{email: "test@test.com"})
ServiceB
负责监听所有这些活动广播并用它们做一些事情(本质上是建立活动源)。
我遇到了一个绊脚石,因为我可以看到ServiceA
将消息广播到 Redis(使用 Phoenix.PubSub.Redis
),但不完全了解如何让订阅者ServiceB
处理它......
以下代码段是我设法获得的,它在广播消息然后引发异常时执行某些操作。
部分订户模块
defmodule ServiceB.UserSubscriber do
def start_link do
sub = spawn_link &(process_feed/0)
ServiceB.Endpoint.subscribe(:user_pubsub, "activity:all")
{:ok, sub}
end
def process_feed do
receive do
params ->
IO.inspect "processing goes here..."
end
process_feed
end
end
例外
[error] GenServer :user_pubsub terminating
** (FunctionClauseError) no function clause matching in Phoenix.PubSub.RedisServer.handle_info/2
我猜我在某个地方错过了一大堆GenServer
工作,但似乎在网上找不到任何表明在哪里的东西。
问题(正如预期的那样)是我的订阅者模块没有作为GenServer实现,但我试图复制相同的功能(而且很糟糕!按如下方式更新我的订阅者模型已经完成了技巧:
defmodule SubscriberService.ActivitySubscriber do
use GenServer
def start_link(channel) do
GenServer.start_link(__MODULE__, channel)
end
def init(channel) do
pid = self
ref = SubscriberService.Endpoint.subscribe(pid, channel)
{:ok, {pid, channel, ref}}
end
def handle_info(%{event: "new:user"} = message, state) do
IO.inspect "#######################"
IO.inspect "New User - Received Message:"
IO.inspect message
IO.inspect "#######################"
{:noreply, state}
end
def handle_info(message, state) do
IO.inspect "#######################"
IO.inspect "Catch All - Received Message:"
IO.inspect message
IO.inspect "#######################"
{:noreply, state}
end
end
如您所见,init/1
触发订阅,handle_info/2
函数接收传入的消息。
如果您想了解它的所有荣耀(发布服务器和订阅服务器服务)的工作方式,请查看存储库。