为Python脚本创建共享消息流的最佳方法是什么?



我想做的事情:我需要一个简单的消息流,因此有些脚本可以将结果发送到此处,而另一个脚本可以取结果并不同步地进行一些工作。

主要问题:我想看看发生了什么,因此,如果某件事破裂 - 我可以快速修复它。我尝试使用芹菜 兔子(可以看到带有args的工人,使用花,但安排太复杂)和多处理。


我所做的事情:我尝试使用MongoDB限制收藏品并运行多个过程的Popen进行反应。一些脚本将SMTH写入集合,下面的脚本会监视它,如果满足某些条件 - 运行另一个脚本。

主要问题: subprocess.popen()从内部多处理.process()看起来不自然(仍然可以做工作),所以我试图找到更好的或//和//和/和/更稳定的解决方案:)


听众脚本:

from pymongo import MongoClient, CursorType
from time import sleep
from datetime import datetime
from multiprocessing import Process
import subprocess
def worker_email(keyword):
     subprocess.Popen(["python", "worker_email.py", str(keyword)])
def worker_checker(keyword):
     subprocess.Popen(["python", "worker_checker.py", str(keyword)])
if __name__ == '__main__':
    #DB connect
    client = MongoClient('mongodb://localhost:27017/')
    db = client.admetric
    coll = db.my_collection
    cursor = coll.find(cursor_type = CursorType.TAILABLE_AWAIT)
    #Script start UTC time
    utc_run = datetime.utcnow()
    while cursor.alive:
        try:
            doc = cursor.next()
            #Print doc name/args to see in command line, while Listener runs
            print(doc)
            #Filter docs without 'created' data
            if 'created' in doc.keys():
                #Ignore docs older than script
                if doc['created'] > utc_run:
                    #Filter docs without 'type' data
                    if 'type' in doc.keys():
                        #Check type
                        if doc['type'] == 'send_email':
                            #Create process and run external script
                            p = Process(target=worker_email, args=(doc['message'],))
                            p.start()
                            p.join()
                        #Check type
                        elif doc['type'] == 'check_data':
                            #Create process and run external script
                            p = Process(target=worker_checker, args=(doc['message'],))
                            p.start()
                            p.join()
        except StopIteration:
            sleep(1)

只要您控制了 worker_emailworker_checker逻辑,就无需在单独的解释器中执行。

只是在两个模块中公开一个入口点,然后通过multiprocessing.Process运行它们。

worker_email.py

def email_job(message):
    # start processing the message here

worker_checker.py

def check_job(message):
    # start checking the message here

listerer_script.py

# you are not going to pollute the listener namespace
# as the only names you import are the entry points of the scripts
# therefore, encapsulation is preserved
from worker_email import email_job
from worker_checker import check_job
email_process = Process(target=email_job, args=[message])
check_process = Process(target=check_job, args=[message])

如果您不能从工人模块中曝光一个入口点,则只需运行subprocess.Popen即可。您将它们包装在Process中没有好处。

相关内容

  • 没有找到相关文章

最新更新