如何在Python中使用套接字创建通道



我已经开始使用Python几次了,现在,我正在创建一个套接字服务器。我已经让服务器运行多个线程和多个客户端(万岁!)但我正在寻找的功能,我不能调用(我甚至不知道它是否存在),我想创建一种通道,客户端可以发送不同类型的消息。

一个例子,我创建了一个通道INFO,如果服务器接收到这种类型的套接字,它只是打印

我创建了另一个通道DEBUG,在那里我可以发送自定义命令,服务器将执行

在非编程语言中,它会这样做:

def socketDebug(command):
     run command
def socketInfo(input):
     print input
if socket == socketDebug:
     socketDebug(socket.rcv)
else:
   if socket == socketInfo:
     socketInfo(socket.rcv)

这是一个非常简单的Channel类实现。它创建一个套接字来接受来自客户端的连接和发送消息。它本身也是一个客户,接收来自其他Channel实例的消息(例如,在单独的进程中)。

通信是在两个线程中完成的,这是非常糟糕的(我会使用async io)。当接收到消息后,它调用接收线程中的注册函数可能导致一些线程问题。

每个Channel实例都创建自己的套接字,但它更易于扩展通过单个实例复用通道"topics"

一些现有的库提供了"通道"功能,如nanomsg。

这里的代码是用于教育目的,如果它能帮助…

import socket
import threading
class ChannelThread(threading.Thread):
  def __init__(self):
    threading.Thread.__init__(self)
    self.clients = []
    self.chan_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    self.chan_sock.bind(('',0))  
    _, self.port = self.chan_sock.getsockname()
    self.chan_sock.listen(5)
    self.daemon=True
    self.start()
  def run(self):
    while True:
      new_client = self.chan_sock.accept()
      if not new_client:
        break
      self.clients.append(new_client)
  def sendall(self, msg):
    for client in self.clients:
      client[0].sendall(msg)
class Channel(threading.Thread):
  def __init__(self):
    threading.Thread.__init__(self)
    self.daemon = True
    self.channel_thread = ChannelThread()
  def public_address(self):
    return "tcp://%s:%d" % (socket.gethostname(), self.channel_thread.port)
  def register(self, channel_address, update_callback):
    host, s_port = channel_address.split("//")[-1].split(":")
    port = int(s_port)
    self.peer_chan_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)   
    self.peer_chan_sock.connect((host, port))
    self._callback = update_callback
    self.start()
  def deal_with_message(self, msg):
    self._callback(msg)
  def run(self):
    data = ""
    while True:
      new_data = self.peer_chan_sock.recv(1024)
      if not new_data:
        # connection reset by peer
        break
      data += new_data
      msgs = data.split("nn")
      if msgs[-1]:
        data = msgs.pop()
      for msg in msgs:
        self.deal_with_message(msg)
  def send_value(self, channel_value):
    self.channel_thread.sendall("%snn" % channel_value)
<标题>用法:

进程A:

c = Channel()
c.public_address()

进程B:

def msg_received(msg):
  print "received:", msg
c = Channel()
c.register("public_address_string_returned_in_process_A", msg_received)

进程A:

c.send_value("HELLO")

进程B:

received: HELLO

相关内容

  • 没有找到相关文章

最新更新