如何设置 Phoenix PubSub 订阅者回调



我有一个相当简单的要求,围绕凤凰城构建的 2 个服务(目前):

ServiceA负责注册用户。注册用户后,ServiceA广播一条消息,其中包含有关新创建用户的信息。目前正在控制器操作中使用以下代码完成此操作:

ServiceA.Endpoint.broadcast("activity:all", "new:user", %{email: "test@test.com"})

ServiceB负责监听所有这些活动广播并用它们做一些事情(本质上是建立活动源)。

我遇到了一个绊脚石,因为我可以看到ServiceA将消息广播到 Redis(使用 Phoenix.PubSub.Redis),但不完全了解如何让订阅者ServiceB处理它......

以下代码段是我设法获得的,它在广播消息然后引发异常时执行某些操作

部分订户模块

defmodule ServiceB.UserSubscriber do
  def start_link do
    sub = spawn_link &(process_feed/0)
    ServiceB.Endpoint.subscribe(:user_pubsub, "activity:all")
    {:ok, sub}
  end
  def process_feed do
    receive do
      params ->
        IO.inspect "processing goes here..."
    end
    process_feed
  end
end

例外

[error] GenServer :user_pubsub terminating
** (FunctionClauseError) no function clause matching in Phoenix.PubSub.RedisServer.handle_info/2

我猜我在某个地方错过了一大堆GenServer工作,但似乎在网上找不到任何表明在哪里的东西。

问题(正如预期的那样)是我的订阅者模块没有作为GenServer实现,但我试图复制相同的功能(而且很糟糕!按如下方式更新我的订阅者模型已经完成了技巧:

defmodule SubscriberService.ActivitySubscriber do
  use GenServer
  def start_link(channel) do
    GenServer.start_link(__MODULE__, channel)
  end
  def init(channel) do
    pid = self
    ref = SubscriberService.Endpoint.subscribe(pid, channel)
    {:ok, {pid, channel, ref}}
  end
  def handle_info(%{event: "new:user"} = message, state) do
    IO.inspect "#######################"
    IO.inspect "New User - Received Message:"
    IO.inspect message
    IO.inspect "#######################"
    {:noreply, state}
  end
  def handle_info(message, state) do
    IO.inspect "#######################"
    IO.inspect "Catch All - Received Message:"
    IO.inspect message
    IO.inspect "#######################"
    {:noreply, state}
  end
end

如您所见,init/1触发订阅,handle_info/2函数接收传入的消息。

如果您想了解它的所有荣耀(发布服务器和订阅服务器服务)的工作方式,请查看存储库。

最新更新