如何在运行后台程序时不断检查输入线程



我有一个小的服务器和客户端Python脚本,其中客户端发送一个字符串,服务器用相反的响应。当客户端输入退出字符串时,客户端退出,然后服务器退出。

我希望服务器的"接收,反向,发送"过程在后台运行,同时程序不断检查stdin是否有退出字符串。

我试过使用threading,但由于阻塞,许多套接字调用导致它不能正常工作。

这样你就可以知道我已经做了什么。

server.py :

import socket
from time import sleep
sock = socket.socket()
sock.bind(("127.0.0.1",12346))
sock.listen(3)
print "Waiting on connection"
conn = sock.accept() 
print "Client connected"
while True:
    m = conn[0].recv(4096)
    if m == "exit":
        sleep(1)
        break
    else:
        conn[0].send(m[::-1])
sock.shutdown(socket.SHUT_RDWR)
sock.close()

client.py :

import socket
sock = socket.socket()
sock.connect(("127.0.0.1",12346))
while True:
    s = raw_input("message: ")
    sock.send(s)
    if s == "exit":
        print "Quitting"
        break
    print sock.recv(4096)
sock.shutdown(socket.SHUT_RDWR)
sock.close()

由于您希望服务器进程能够处理客户端,同时接收来自服务器stdin的输入,因此您可以将整个当前服务器代码放在Thread中,然后等待来自stdin的输入。

import socket
from time import sleep
import threading
def process():
    sock = socket.socket()
    sock.bind(("127.0.0.1",12346))
    sock.listen(3)
    print "Waiting on connection"
    conn = sock.accept()
    print "Client connected"
    while True:
        m = conn[0].recv(4096)
        conn[0].send(m[::-1])
    sock.shutdown(socket.SHUT_RDWR)
    sock.close()
thread = threading.Thread(target=process)
thread.daemon = True
thread.start()
while True:
    exit_signal = raw_input('Type "exit" anytime to stop servern')
    if exit_signal == 'exit':
        break

,你可以删除"exit" checkin client

在这段代码中,服务器将在客户端断开连接后不做任何事情,但是,它将等待在stdin中键入"exit"。您可能希望扩展代码以使服务器能够接受新的客户机,因为您不希望客户机具有关闭服务器的能力。在这种情况下,您可以在conn = sock.accept()sock.close()之间放置另一个while循环。

正如@usmcs所建议的,如果您没有任何其他命令要发送到服务器,那么使用CTRL-C (KeyboardInterrupt)代替会更好,因此您不需要线程,而它仍然可以优雅地结束服务器(意味着由于CTRL-C报告没有错误),使用以下代码:

import socket
from time import sleep
import threading
sock = socket.socket()
sock.bind(("127.0.0.1",12346))
sock.listen(3)
print "Waiting on connection"
conn = sock.accept()
print "Client connected"
while True:
    try:
        m = conn[0].recv(4096)
        conn[0].send(m[::-1])
    except KeyboardInterrupt:
        break
sock.close()

这是一个非阻塞套接字接收的例子。如果没有数据接收,套接字将抛出异常。

import sys
import socket
import fcntl, os
import errno
from time import sleep
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1',9999))
fcntl.fcntl(s, fcntl.F_SETFL, os.O_NONBLOCK)
while True:
    try:
        msg = s.recv(4096)
    except socket.error, e:
        err = e.args[0]
        if err == errno.EAGAIN or err == errno.EWOULDBLOCK:
            sleep(1)
            print 'No data available'
            continue
        else:
            # a "real" error occurred
            print e
            sys.exit(1)
    else:
        # got a message, do something :)

下面是一个非阻塞的stdin read:

import sys
import select
# If there's input ready, do something, else do something
# else. Note timeout is zero so select won't block at all.
while sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
  line = sys.stdin.readline()
  if line:
    something(line)
  else: # an empty line means stdin has been closed
    print('eof')
    exit(0)
else:
  something_else()

基本上,您希望将它们组合在一起,并且可以添加一些超时来强制在多个连接的情况下定期读取stdin。

我参考了之前发表的用Python构建预分叉JSON-RPC服务器的要点,并修改了代码来解决这个问题。要点:https://gist.github.com/matthewstory/4547282

$ python server.py localhost 9999 5
exit
$

更多地了解为什么这个工作。主分支产生N许多分支(在上面的5示例中),每个分支都进入一个accept循环:

# simple pre-fork server, fork before accept
for i in range(int(argv[2])):
    # fork our current process
    pid = os.fork()
    # if we are the child fork ...
    if 0 == pid:
        # die without unhandled exception
        for signum in ( signal.SIGINT, signal.SIGTERM, ):
            signal.signal(signum, _gogentle)
        # under the hood, this calls `socket.accept`
        s.serve_forever()
        os._exit(0)
    # if we are the papa fork
    else:
        _PIDS.append(pid)

这些子分支将处理任何传入localhost:9999的请求。然后主分支进入一个select/waitpid组合循环:

# setup signal relaying for INT and TERM
for signum in ( signal.SIGINT, signal.SIGTERM, ):
    signal.signal(signum, _kronos)
# wait on the kids
while len(_PIDS):
    # 1s timeout here means we're checking for exiting children at most
    # 1x per second, prevents a busy loop
    reads, _, _ = select.select([sys.stdin], [], [], 1)
    if sys.stdin in reads:
        # blocking, read 1 line
        cmd = sys.stdin.readline()
        # kill ourselves ... kronos will propegate
        if cmd.strip() == 'exit':
            os.kill(os.getpid(), signal.SIGTERM)
    # check for exited children, non-blocking
    while True:
        pid, rc = os.waitpid(-1, os.WNOHANG)
        if not pid:
            break
        _PIDS.remove(pid)

select将指示stdin已准备好读取,在这种情况下,我们将从stdin中读取1行,或者它将在最多1s之后超时,在这种情况下,它将直接通过检查任何退出的子节点(使用os.waitpidWNOHANG标志)。

相关内容

  • 没有找到相关文章

最新更新