我正在使用python3学习分布式编程
有两个python文件,一个名称是main.py,它分发信息,另一个操作数据,名称为worker.py。
当我在一台计算机中运行这两个文件[设置服务器地址= 127.0.0.1,port = 5000]
时,一切顺利但是,当我在单独的计算机中运行这两个文件时,它们无法彼此连接,并且timeouterror被包含在内。
我不知道为什么。一台计算机在我的家中是win10,另一台是我所面临的linux云服务器。
代码在一台计算机中起作用。但是,当我在linux中运行main.py,然后运行worker.py {将服务器更改为linux的IP地址},然后在Win10中,Worker.py遇到了timeouterror
我对Linux一无所知,我需要打开或关闭一些安全设置吗?
"""main.py"""
import queue
from multiprocessing.managers import BaseManager
import datetime
import time
TASK_QUEUE = queue.Queue()
RESULT_QUEUE = queue.Queue()
def get_task_queue():
"""set TASK_QUEUE as a function"""
global TASK_QUEUE
return TASK_QUEUE
def receive_result_queue():
"""set RESULT_QUEUE as a function"""
global RESULT_QUEUE
return RESULT_QUEUE
class QueueManager(BaseManager):
"""inherit BaseManager from multiprocessing.managers"""
pass
if __name__ == '__main__':
QueueManager.register('distribute_task_queue', callable=get_task_queue)
QueueManager.register('receive_result_queue', callable=receive_result_queue)
# bind port 5000, set verification code = 'abc'
MANAGER = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
# start manager
MANAGER.start()
TASK = MANAGER.distribute_task_queue()
RESULT = MANAGER.receive_result_queue()
# put each line into manager`enter code here`
with open("C:/Users/dayia/Desktop/log.20170817") as f:
for line in f:
TASK.put(line)
# try receive result
while 1:
try:
r = RESULT.get(timeout=1)
if r[0] == r[1] and r[0] == "done":
break
else:
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),"line %s's length is %s" % (r[0], r[1]))
except queue.Empty:
print('result queue is empty.')
# """worker.py"""
import datetime
from multiprocessing.managers import BaseManager
import queue
import time
class QueueManager(BaseManager):
"""inherit BaseManager from multiprocessing.managers"""
pass
QueueManager.register('distribute_task_queue')
QueueManager.register('receive_result_queue')
server_addr = '127.0.0.1'
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'Connect to server %s...' % server_addr)
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
TASK = m.distribute_task_queue()
RESULT = m.receive_result_queue()
def parse_line(line):
return len(line)
C = 0
while not TASK.empty():
try:
n = TASK.get(timeout=1)
r = parse_line(n)
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'running line %s, length is %s' % (C+1, r))
C += 1
RESULT.put([r, C])
except queue.Empty:
print('task queue is empty.')
RESULT.put(["done", "done"])
enter code here
print('worker exit')
地址127.0.0.1
非常具体地指代码正在运行的同一计算机(用网络术语:127.0.0.1
是localhost
的IP地址)。