在Yaws/Elang中使用streamcontent_from_pid进行数据流传输



我想yaws将数据流传输到我的彗星应用程序,我已经阅读并努力理解它,但yaws的例子对我来说似乎有点复杂(我是Erlang的新手)。我就是想不通。。。

这是偏航的例子(我修改了一点):

out(A) ->
%% Create a random number
{_A1, A2, A3} = now(),
random:seed(erlang:phash(node(), 1),
erlang:phash(A2, A3),
A3),
Sz = random:uniform(1),
Pid = spawn(fun() ->
%% Read random junk
S="Hello World",
P = open_port({spawn, S}, [binary,stream, eof]),
rec_loop(A#arg.clisock, P)
end),
[{header, {content_length, Sz}},
{streamcontent_from_pid, "text/html; charset=utf-8", Pid}].

rec_loop(Sock, P) ->
receive
{discard, YawsPid} ->
yaws_api:stream_process_end(Sock, YawsPid);
{ok, YawsPid} ->
rec_loop(Sock, YawsPid, P)
end,
port_close(P),
exit(normal).
rec_loop(Sock, YawsPid, P) ->
receive
{P, {data, BinData}} ->
yaws_api:stream_process_deliver(Sock, BinData),
rec_loop(Sock, YawsPid, P);
{P, eof} ->
yaws_api:stream_process_end(Sock, YawsPid)
end.

我需要的是将上面的脚本转换为可以与以下脚本组合的脚本

mysql:start_link(p1, "127.0.0.1", "root", "azzkikr", "mydb"),
{data, Results}  = mysql:fetch(p1, "SELECT*FROM messages WHERE id > " ++ LASTID),
{mysql_result, FieldNames, FieldValues, NoneA, NoneB} = Results,
parse_data(FieldValues, [], [], [], [], [])

其中parse_data(FieldValues, [], [], [], [], [])返回条目的JSON字符串。。Combined这个脚本应该不断地检查数据库中是否有新的条目,如果有,它应该像comet一样获取。

谢谢你们,愿你们都去天堂!

正如这个答案所解释的,有时您需要运行一个独立于任何传入HTTP请求的进程。对于您的案例,您可以使用发布/订阅的形式:

  • Publisher:当您的Erlang节点启动时,启动某种数据库客户端进程或一个此类进程池,执行查询并独立于Yaws运行
  • 订阅服务器:当Yaws接收到HTTP请求并将其发送到您的代码时,您的代码就会订阅发布服务器。当发布者向订阅者发送数据时,订阅者会将数据流式传输回HTTP客户端

在这里详细说明完整的解决方案是不切实际的,但一般步骤是:

  • 当数据库客户端进程启动时,它们会将自己注册到pg2组或类似的组中。使用类似poolboy的东西,而不是滚动自己的进程池,因为它们很难正确处理。每个数据库客户端都可以是运行查询、接收数据库结果以及处理订阅请求调用的gen_server的实例
  • 当您的Yaws代码接收到请求时,它会查找数据库客户端发布程序进程并对其进行订阅。订阅需要调用数据库客户端模块中的一个函数,该函数反过来使用gen_server:call/2,3与实际的gen_server发布程序进程进行通信。订阅者使用Yaws流功能(或SSE或WebSocket)来完成与HTTP客户端的连接,并向其发送任何所需的响应标头
  • 发布者存储订阅服务器的进程ID,并在订阅服务器上建立一个监视器,以便在订阅服务器意外死亡或退出时清理订阅
  • 发布者在发送给订阅服务器的消息中使用监视器的引用作为唯一ID,因此订阅函数将该引用返回给订阅服务器。订阅服务器使用引用来匹配来自发布服务器的传入消息
  • 当发布者从数据库中获得新的查询结果时,它会将数据发送给它的每个订阅者。这可以通过普通的Erlang消息来完成
  • 订阅者使用Yaws流功能(或SSE或WebSocket功能)将查询结果发送到HTTP客户端
  • 当HTTP客户端断开连接时,订阅者会调用另一个发布者函数来取消订阅

最新更新