对ZeroMQ来说相当新。我有一个简单的 REQ/REP 队列,如下所示。我正在使用PHP,但这并不重要,因为任何语言绑定对我来说都很好。 这是客户端请求任务
$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
$req->send("Export Data as Zip");
echo $i . ":" . $req->recv().PHP_EOL;
这是一个实际执行任务的工人。
$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");
echo "Server is started at port $port" . PHP_EOL;
while(true)
{
$msg = $srvr->recv();
echo "Message = " . $msg . PHP_EOL;
// Do the work here, takes 10 min, knows the count of lines added and remaining
$srvr->send($msg . " is exported as zip file" . date('H:i:s'));
}
由于导出数据的任务大约需要 10 分钟,因此我想从其他客户端连接到服务器并完成任务的进度/百分比。 我想知道这是否是一种有效的方法。
我尝试了这种方法,其中 REQ/REP 部分有效,但我在 PUB/SUB 部分中一无所获
服务器部分
$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");
// add PUB socket to publish progress
$c = new ZMQContext();
$p = new ZMQSocket($c, ZMQ::SOCKET_PUB);
$p->bind("tcp://*:5460");
echo "Server is started at port 5454" . PHP_EOL;
$prog = 0;
while(true)
{
$p->send($prog++ . '%'); // this part doesn't get to the progress client
$msg = $srvr->recv();
echo "Message = " . $msg . PHP_EOL;
sleep(2);// some long task
$srvr->send($msg . " Done zipping " . date('H:i:s'));
}
进度客户端
$ctx = new ZMQContext();
$stat = new ZMQSocket($ctx, ZMQ::SOCKET_SUB);
$stat->connect('tcp://localhost:5460');
while (true){
echo $stat->recv() . PHP_EOL; //nothing shows here
}
请求客户端
$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
for($i=0;$i<100;$i++){
$req->send("$i : Zip the file please");
echo $i . ":" . $req->recv().PHP_EOL; //works and get the output
}
这个概念是可行的,需要进行一些调整:
所有 PUB 交易对手都必须设置任何非默认订阅,通过,至少是一个空订阅.setsockopt( ZMQ_SUBSCRIBE, "" )
这意味着接收所有主题(没有"过滤"(。
接下来,应该.setsockopt( ZMQ_CONFLATE, 1 )
配置PUB端和SUB端,因为一旦唯一的值在"最后一个",最新的消息中,就没有价值来填充所有中间值并将其馈送到en-queue/de-queue管道中。
始终,应该首选 ZeroMQ 调用的非阻塞模式(.recv( ..., flags = ZMQ_NOBLOCK )
等人(,或者应该使用Poller.poll()
预测试来首先嗅探消息的(不存在(,然后再花费更多精力从 ZeroMQ 上下文管理器读取其上下文。简而言之,在生产级系统中,阻塞模式服务调用可能很好地发挥作用的情况并不多。
此外,一些进一步的调整可能有助于PUB方面,以防更大规模的"攻击"来自不受限制的SUB方实体池,并且PUB必须为每个(不受限制的(交易对手创建/管理/维护资源。
只有当有多个客户端想要接收相同的进度更新时,才需要使用 PUB/SUB。只需使用 PUSH/PULL 即可通过 tcp 进行简单的点对点传输。
哲学讨论
对于要解决的问题,有两种方法。
- 使用其他套接字传达其他消息类型,
- 仅使用两个套接字,但通过它们传达多种消息类型
你说的是做 1(。可能值得考虑 2(,尽管我必须强调我对 PHP 几乎一无所知,所以不知道是否有语言功能鼓励人们拥有单独的请求和进度客户端。
如果这样做,原始客户端需要一个循环(在发送请求后(来接收多条消息,无论是进度更新消息还是最终结果。您的服务器在执行 10 分钟查找时,将定期发送进度更新消息,并在最后发送最终结果消息。您可能会使用PUSH/PULL客户端到服务器,并且对于从服务器返回到客户端的进度/结果也是如此。
遵循 2( 在体系结构上更灵活。一旦您有通过单个套接字发送两种或多种消息类型并在接收端解码它们的方法,您就可以发送更多消息。例如,您可以决定将"取消"消息从客户端添加到服务器,或将部分结果消息从服务器添加回客户端。这比仅仅因为要在客户机和服务器之间添加另一个消息流而继续向体系结构添加更多套接字要容易得多。同样,我对PHP的了解还不够多,无法说这绝对是使用该语言的正确方法。这在C,C++中当然很有意义。
我发现像谷歌协议缓冲区(我更喜欢ASN.1(这样的东西对于这种事情非常有用。这些允许您定义要发送的消息类型,并且(至少使用 GPB(将它们组合在一个"一个"中(在 ASN.1 中使用标记来区分不同的消息(。GPB 和 ASN.1 很方便,因为这样您就可以在系统中使用不同的语言、操作系统和平台,而不必担心它正在发送什么。作为二进制(而不是文本(,它们在网络连接中效率更高。