Pyzmq-将消息发送到STREAM套接字



我试图在pyzmq中实现两个STREAM套接字之间连接的简单示例。

发件人.py

import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")
socket.connect("tcp://localhost:5556")
socket.send("message")

接收器.py

import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5556")
message = socket.recv()
print("Received -> [ %s ]" % (message))

输出

Received [ b'x00kx8bEg' ]
Received [ b'' ]

我想问一下在STREAM套接字之间发送消息的正确方式是什么。

您的socket.recv()ed数据与ZeroMQ规范完全匹配,尽管它们不一定会让您感到高兴,并且您怀疑为什么会得到这些数据,而不是很好地提供所发送消息的确切副本。

所以,要有耐心,继续阅读。

ZeroMQ最近添加的STREAM套接字原型非常具体

任何有几年ZeroMQ信令/消息传递工具经验的人都会告诉你,最近(v4.x)添加的STREAM原型并不是ZeroMQ进程与ZeroMQ进程互通需求的最佳选择。

为什么?ZeroMQ工具拥有并且必须是STREAM中的快捷方式,以便允许ZeroMQ套接字访问点能够与相反的套接字端点进程"对话",而该进程对ZeroMQ智能套接字高级协议一无所知。

本机模式

本机模式用于与TCP对等端通信,并允许在任一方向上进行异步请求和回复。ZMQ_STREAM

当使用tcp://传输时,类型为ZMQ_STREAM的套接字用于发送和接收来自非websphere MQ对等端的TCP数据。ZMQ_STREAM套接字可以充当客户端和/或服务器,异步发送和/或接收TCP数据。

当接收TCP数据时,ZMQ_STREAM套接字应在将消息传递给应用程序之前,将包含始发对等体身份的消息部分预先发送给消息。接收到的消息在所有连接的对等端之间公平排队。

发送TCP数据时,ZMQ_STREAM套接字应删除消息的第一部分,并使用它来确定消息应路由到的对等方的身份,不可更改的消息应导致EHOSTUNREACHEAGAIN错误。

若要打开与服务器的连接,请使用zmq_connect()调用,然后使用ZMQ_IDENTITYzmq_getsockopt()呼叫获取套接字标识。

要关闭特定连接,请发送标识帧,然后发送一条长度为零的消息(请参阅示例部分)。

当建立连接时,应用程序将收到一条长度为零的消息。类似地,当对等方断开连接(或连接丢失)时,应用程序将接收到一条长度为零的消息。

您必须先发送一个标识帧,然后再发送一个数据帧。ZMQ_SNDMORE标志对于标识帧是必需的,但对于数据帧则被忽略。

示例

void    *ctx = zmq_ctx_new ();
assert ( ctx );
/*                                             Create ZMQ_STREAM socket */
void    *socket = zmq_socket ( ctx, ZMQ_STREAM );
assert ( socket );
int      rc = zmq_bind ( socket, "tcp://*:8080" );
assert ( rc == 0 );
/*                                            Data structure to hold the ZMQ_STREAM ID */
uint8_t id [256];
size_t  id_size = 256;
/*                                            Data structure to hold the ZMQ_STREAM received data */
uint8_t raw [256];
size_t  raw_size = 256;
while ( 1 ) {
/*                                         Get HTTP request; ID frame and then request */
id_size  = zmq_recv ( socket, id, 256, 0 );
assert ( id_size >  0 );
do {
raw_size  = zmq_recv ( socket, raw, 256, 0 );
assert ( raw_size >= 0 );
} while (     raw_size == 256 );
/*                                         Prepares the response */
char http_response [] =
"HTTP/1.0 200 OKrn"
"Content-Type: text/plainrn"
"rn"
"Hello, World!";
/*                                         Sends the ID frame followed by the response */
zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
zmq_send ( socket, http_response, strlen ( http_response ), 0 );
/*                                         Closes the connection by sending the ID frame followed by a zero response */
zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
zmq_send ( socket, 0, 0, 0 );
}
zmq_close ( socket );
zmq_ctx_destroy ( ctx );

如果您在多连接套接字情况下遵循STREAM行为的描述,则发送方将碰巧在socket实例上接收到公平队列循环读取,该实例连接(1x通过.connect()+Nx通过.bind()N = < 0, +INF ))到多个端点,到目前为止,对通信对等体的计数或/和性质没有任何控制,但在CCD_ 22-s上具有公平排队循环机制。绝对不是一个安全的设计实践。

Summary of ZMQ_STREAM characteristics
Compatible peer sockets     none
Direction                   Bidirectional
Send/receive pattern        Unrestricted
Outgoing routing strategy   See text ( above )
Incoming routing strategy   Fair-queued
Action in mute state        EAGAIN

下面是使用问题中的单向连接pyzmq的简化示例。

发件人.py

import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.connect('tcp://localhost:5555')
id_sock = socket.getsockopt(zmq.IDENTITY)
socket.send(id_sock, zmq.SNDMORE)
socket.send(b'message')

接收器.py

import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind('tcp://*:5555')
id_sock = socket.recv()
assert not socket.recv()    # empty data here
assert socket.recv() == id_sock
message = socket.recv()
print('received:', message)

最新更新