我正在尝试使用multiprocessing
模块来实现一个简单的网络流量转发器。
我的应用程序监听一个端口,当它接收到一个入站连接时,它向另一个服务器发出一个TCP连接,然后在两个连接之间来回传输数据。
我一直在尝试使用nc
实用程序测试我的代码,但似乎我的应用程序进入recv_bytes()
调用和块,即使那里有数据。
这是我的转发代码的简化版本(测试完全可运行):
from multiprocessing import Process
from multiprocessing.connection import Listener, Client, wait
def start_serving(listen_port, outbound_port):
with Listener(('', listen_port)) as server:
print(f"Waiting for connections on port {listen_port}")
with server.accept() as inbound_conn:
print(f"Connection accepted from {server.last_accepted}")
outbound_conn = Client(('localhost', outbound_port))
print(f"Connected to port {outbound_port}")
readers = [inbound_conn, outbound_conn]
print(f"inbound_reader = {inbound_conn}")
print(f"outbound_reader = {outbound_conn}")
while readers:
for r in wait(readers):
try:
print(f"Calling recv_bytes with reader {r}")
data = r.recv_bytes() # This blocks even when there's data
print(f"Out of recv_bytes with reader {r}")
except EOFError:
readers.remove(r)
else:
fwd_to_conn = None
if r is inbound_conn:
fwd_to_conn = outbound_conn
print("read from inbound connection")
elif r is outbound_conn:
fwd_to_conn = outbound_conn
print("read from outbound connection")
if fwd_to_conn is not None:
print(f"Forwarding {len(bytes)} bytes")
fwd_to_conn.send_bytes(data)
forwarder = Process(target=start_serving, daemon=True, args=(19001, 19002))
forwarder.start()
forwarder.join()
我运行这个脚本,并在一个单独的控制台运行:
$ echo "Hi from outbound" | nc -l -p 19002
和
$ echo "Hi from inbound" | nc localhost 19001
这是我得到的输出:
Waiting for connections on port 19001
Connection accepted from ('127.0.0.1', 56874)
Connected to port 19002
inbound_reader = <multiprocessing.connection.Connection object at 0x7ff3730e2850>
outbound_reader = <multiprocessing.connection.Connection object at 0x7ff3730e2a60>
Calling recv_bytes with reader <multiprocessing.connection.Connection object at 0x7ff3730e2850>
正如您所看到的,应用程序被阻塞在入站连接的recv_bytes()
调用中,即使那里有数据。
似乎是一个非常简单的应用程序,所以我希望这里有一个明显的解决方案。
编辑
深入multiprocessing.connection.Connection
代码,似乎当使用recv_bytes()
时,代码期望消息的大小在前4个字节。但是,我不希望它假设任何类型的格式,只是读取尽可能多的数据,而不尝试解释它。
谢谢!
看起来你并不是真的想要或需要mp.Pipe
或mp.connection.Connection
对象的功能,所以在这里跳过它是有意义的,只使用socket.socket
,也许socketserver
(这可以用socket
来完成,但有一个很好的功能基础预先为你写的)。
下面是一个使用socketserver
侦听的示例(从文档中复制粘贴了相当数量的内容),并在连接处理程序中创建到另一个端口的新连接以转发数据。
import socketserver
import socket
class MyTCPServer(socketserver.TCPServer):
def __init__(self, *args, outbound_port=None, **kwargs):
self.outbound_port = outbound_port
super().__init__(*args, **kwargs)
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print("{} wrote:".format(self.client_address[0]))
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())
#also forward the data to a second server (nc)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Connect to server and send data
sock.connect(("localhost", self.server.outbound_port))
sock.sendall(self.data.upper() + b"n")
# Receive data from the server and shut down
received = sock.recv(1024)
print(received)
if __name__ == "__main__":
HOST, PORT = "localhost", 9999
# Create the server, binding to localhost on port 9999
with MyTCPServer((HOST, PORT), MyTCPHandler, outbound_port=9998) as server:
# Activate the server; this will keep running until you
# interrupt the program with Ctrl-C
server.serve_forever()
使用相同的nc
命令(不同的端口)应该给出您期望的输出,我相信。
如果你做实际上,我想要发送比原始字节更多的结构化数据的功能,这里有一个类似布局的例子,但使用mp.connection
:
import multiprocessing as mp
from multiprocessing.connection import Listener, Client
import sys
def server_A():
listen_port, outbound_port = 9999, 9998
with Listener(('', listen_port)) as listener:
while True: #serve forever (until keyboardinterrupt) right in main...
try:
conn1 = listener.accept()
except mp.AuthenticationError:
print("auth error")
except KeyboardInterrupt:
break
obj = conn1.recv() #get from client
forward = ("object recieved: server A", obj)
with Client(('', outbound_port)) as conn2:
conn2.send(forward) #forward to B
response = conn2.recv() #recv from B
conn1.send(response) #send response from B back to Client
print(conn1.recv_bytes())
print("but sending bytes can have an overhead benefit for large blocks of data (arrays for example)")
conn1.close() #because we're not using contextmanager for conn1
def server_B():
listen_port = 9998
with Listener(('', listen_port)) as listener:
while True: #serve forever (until keyboardinterrupt) right in main...
try:
conn = listener.accept()
except mp.AuthenticationError:
print("auth error")
except KeyboardInterrupt:
break
obj = conn.recv()
response = ("object recieved: server B", obj)
conn.send(response)
conn.close() #because we're not using contextmanager
def client():
with Client(('', 9999)) as conn:
conn.send({"some":["data"]}) #connections are useful for sending more structured data than bytes.
print(conn.recv())
conn.send_bytes(b"binary data isn't sent totally raw because there's a "protocol" for communication")
if __name__ == "__main__":
if "A" in sys.argv[1]:
server_A()
elif "B" in sys.argv[1]:
server_B()
elif "C" in sys.argv[1]:
client()
*注意:这两个例子都是在ubuntu 20.04 python 3.8.5上测试的