Celery调用不同的函数并继续链接过程



我在一个链中有三个任务,分别是fetch_page、check_source和存储页面

def update_page_info(**headers):
    chain=fetch_page.s(headers['key']) | check_source.s(headers['key_1']) | store_info.s()
    chain().apply_async()

fetch_page获取页面并收集需要收集的内容:

@app.task(bind=True)
def fetch_page(self,url):
    #fetch_page here and return a tuple so that it can be unpacked
    # dosomething

现在获取页面后,它将在下一个任务check_source中检查源。

@app.task(bind=True)
def check_source(self,page_and_url,handle):
    try:
        #unpack your stuffs here
        page,url=page_and_url
        get_result={}
        if handle=='first_option':
            get_result=select_first_option(one,two)
            return get_result
        elif handle=='second_option':
            get_result=select_second_option(one,two)
            return (get_result)
        elif handle=='third_option':
            get_result=select_third_option(one,two)
            return (get_result)
        else:
            return "IGNORE FOR NOW"
    except Exception as exc:
        pass

所以困惑的是,我可以从这里调用一些其他任务吗??会有任何不一致的地方吗?或者工人在做这件事时会陷入僵局吗?

最后,它应该执行store_info(),它只存储从check_source()返回的东西

@app.task(bind=True)
def store_info(self,result):
    print ("store_info ")
    try:
        #store the fetched pages
    except Exception as exc:
        #dosomething
    finally:
        pass

我遵循的是这种只需要少量修改的方法http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-启动同步子任务。

有人能建议我应该如何做,以及我需要更加小心的事情吗?

这一切都应该像阅读(和交流)一样工作。这三项任务将按顺序执行,不会出现任何"不一致"

如果您调用update_page_info一次,则三个链接的子任务将以独占方式运行。也就是说,这种设置仍然存在死锁的可能性。如果您调用update_page_info,而上次调用它时的前几项任务正在运行,则可以同时运行多个任务。这将引入死锁的可能性,这取决于您的任务共享资源的方式。

如果您的任务共享资源,我建议使用redis或memcached之类的东西作为跨工作者的锁定系统。

编辑:我现在看到的代码是完全好的,因为结果作为参数传递给下一个任务。

最新更新