我借助 Celery 和 Celery Beat 来运行我的函数。
我遇到了这样的错误(返回的对象不是 JSON 格式):
celery_1_a510f09c95a3 | [2022-08-30 01:30:00,069: ERROR/ForkPoolWorker-8] Task
parser_app.views.periodic_exctract_urls[d65eb6a3-187b-4f48-afe6-b7df101cdbcd] raised
unexpected: EncodeError(TypeError('Object of type AsyncResult is not JSON serializable'))
我的代码(第一个函数应返回 int 列表):
@shared_task()
def periodic_exctract_urls():
    """Queue a ``make_requests`` task for every search query.

    One task is queued per film-name query, plus one per (site, film) pair
    using a ``site:<url> <film>`` search string. Returns the list built from
    calling ``collect()`` on each queued result.
    """
    print(f"{datetime.datetime.now()} Start periodic_exctract_urls")
    celery_results = []
    # Queries are split on the is_site_search flag (presumably film names
    # versus site URLs, going by the variable names).
    film_name_queries = SearchQuery.objects.filter(is_site_search=False)
    url_queries = SearchQuery.objects.filter(is_site_search=True)
    for film in film_name_queries:
        # .delay() hands back a result handle, not the task's return value.
        celery_results.append(make_requests.delay(str(film.name)))
    for url in url_queries:
        for film in film_name_queries:
            celery_results.append(make_requests.delay(
                f"site:{url.name} {film.name}"
            ))
    task_ids = []
    # NOTE(review): per the Celery docs, AsyncResult.collect() is a generator
    # of (result, value) pairs, so this list appears to hold generator objects
    # rather than ints and would not be JSON-serializable -- likely related to
    # the EncodeError reported for this task; confirm.
    task_ids.extend([result.collect() for result in celery_results])
    return task_ids
@shared_task()
def make_requests(search_text) -> int:
    """Run one search and return the primary key of the created task record.

    The body is elided in this excerpt (``...``); only the final return is
    shown. ``search_text`` is the query string passed by the caller, either a
    film name or a ``site:<url> <film>`` string.
    """
    ...
    return int(created_task.pk)
调用 make_requests.delay 得到的是 AsyncResult 对象(参见文档),而 AsyncResult 无法被 JSON 序列化,因此任务返回时会报错。如果您想获得 make_requests 任务的返回值,可以使用 collect 方法。例如:
@shared_task()
def periodic_exctract_urls():
    """Queue ``make_requests`` subtasks and return their results.

    Returns:
        list: the values returned by the queued ``make_requests`` tasks
        (each is ``int(created_task.pk)``), which are JSON-serializable,
        unlike the ``AsyncResult`` handles returned by ``.delay()``.
    """
    print(f"{datetime.datetime.now()} Start periodic_exctract_urls")
    film_name_queries = SearchQuery.objects.filter(is_site_search=False)
    url_queries = SearchQuery.objects.filter(is_site_search=True)
    results = [make_requests.delay(str(film.name)) for film in film_name_queries]
    ...
    task_ids = []
    for result in results:
        # collect() is a generator of (AsyncResult, value) pairs: it must be
        # iterated, and only the values are kept so the return value stays
        # JSON-serializable. Note it also walks any child tasks of `result`.
        # disable_sync_subtasks=False is needed because we wait on subtasks
        # from inside a task; Celery otherwise refuses with a RuntimeError.
        # Waiting like this can deadlock if the worker pool is exhausted.
        task_ids.extend(
            value for _, value in result.collect(disable_sync_subtasks=False)
        )
    return task_ids
@shared_task()
def make_requests(search_text) -> int:
    """Run one search and return the primary key of the created task record.

    The body is elided in this excerpt (``...``); only the final return is
    shown. ``search_text`` is the query string passed by the caller.
    """
    ...
    return int(created_task.pk)