在这里我有一个视图CrawlerHomeView
,用于从表单创建任务对象,现在我想用芹菜定期安排此任务。
我想通过任务对象search_frequency和检查一些任务对象字段来安排此CrawlerHomeView
过程。
任务模型
class Task(models.Model):
INITIAL = 0
STARTED = 1
COMPLETED = 2
task_status = (
(INITIAL, 'running'),
(STARTED, 'running'),
(COMPLETED, 'completed'),
(ERROR, 'error')
)
FREQUENCY = (
('1', '1 hrs'),
('2', '2 hrs'),
('6', '6 hrs'),
('8', '8 hrs'),
('10', '10 hrs'),
)
name = models.CharField(max_length=255)
scraping_end_date = models.DateField(null=True, blank=True)
search_frequency = models.CharField(max_length=5, null=True, blank=True, choices=FREQUENCY)
status = models.IntegerField(choices=task_status)
tasks.py
如果任务状态为 0 或 1 并且没有超过任务抓取结束日期,我想运行以下定期发布的视图 [period=(任务的search_frequency时间]。但我被困在这里了。我该怎么做?
@periodic_task(run_every=crontab(hour="task.search_frequency")) # how to do with task search_frequency value
def schedule_task(pk):
task = Task.objects.get(pk=pk)
if task.status == 0 or task.status == 1 and not datetime.date.today() > task.scraping_end_date:
# perform the crawl function ---> def crawl() how ??
if task.scraping_end_date == datetime.date.today():
task.status = 2
task.save() # change the task status as complete.
views.py
我想定期运行此视图。我该怎么做?
class CrawlerHomeView(LoginRequiredMixin, View):
login_url = 'users:login'
def get(self, request, *args, **kwargs):
# all_task = Task.objects.all().order_by('-id')
frequency = Task()
categories = Category.objects.all()
targets = TargetSite.objects.all()
keywords = Keyword.objects.all()
form = CreateTaskForm()
context = {
'targets': targets,
'keywords': keywords,
'frequency': frequency,
'form':form,
'categories': categories,
}
return render(request, 'index.html', context)
def post(self, request, *args, **kwargs):
form = CreateTaskForm(request.POST)
if form.is_valid():
# try:
unique_id = str(uuid4()) # create a unique ID.
obj = form.save(commit=False)
# obj.keywords = keywords
obj.created_by = request.user
obj.unique_id = unique_id
obj.status = 0
obj.save()
form.save_m2m()
keywords = ''
# for keys in ast.literal_eval(obj.keywords.all()): #keywords change to csv
for keys in obj.keywords.all():
if keywords:
keywords += ', ' + keys.title
else:
keywords += keys.title
# tasks = request.POST.get('targets')
# targets = ['thehimalayantimes', 'kathmandupost']
# print('$$$$$$$$$$$$$$$ keywords', keywords)
task_ids = [] #one Task/Project contains one or multiple scrapy task
settings = {
'spider_count' : len(obj.targets.all()),
'keywords' : keywords,
'unique_id': unique_id, # unique ID for each record for DB
'USER_AGENT': 'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)'
}
# res = ast.literal_eval(ini_list)
for site_url in obj.targets.all():
domain = urlparse(site_url.address).netloc # parse the url and extract the domain
spider_name = domain.replace('.com', '')
task = scrapyd.schedule('default', spider_name, settings=settings, url=site_url.address, domain=domain, keywords=keywords)
# task = scrapyd.schedule('default', spider_name , settings=settings, url=obj.targets, domain=domain, keywords=obj.keywords)
return redirect('crawler:task-list')
# except:
# return render(request, 'index.html', {'form':form})
return render(request, 'index.html', {'form':form, 'errors':form.errors})
对这个问题有什么建议或答案吗?
在以15k 任务/秒的设置与 Celery 战斗了 5 年后,我强烈建议您切换到 Dramatiq,它有一个理智、可靠、高性能的代码库,不会拆分在多个复杂的包中,并且到目前为止在我的两个新项目中完美运行。
来自作者的动机
我多年来一直专业使用芹菜,我对它越来越感到沮丧,这是我开发 dramatiq 的原因之一。以下是Dramatiq,Celery和RQ之间的一些主要区别:
还有一个 Django 帮助包:https://github.com/Bogdanp/django_dramatiq
当然,你不会有一个内置的 celerybeat,但调用 python 任务的 cron 无论如何都更健壮,我们丢失了大量数据,因为 celerybeat 决定定期停滞:)
有两个项目旨在添加定期任务创建:https://gitlab.com/bersace/periodiq 和 https://apscheduler.readthedocs.io/en/stable/
我还没有使用这些包,你可以尝试使用 periodiq 选择你的数据库条目,循环访问这些条目并为每个包定义一个周期性任务(但这需要定期重新启动 periodiq worker 来获取更改(:
# tasks.py
from dramatiq import get_broker
from periodiq import PeriodiqMiddleware, cron
broker = get_broker()
broker.add_middleware(PeriodiqMiddleware(skip_delay=30))
for obj in Task.objects.all():
@dramatiq.actor(periodic=cron(obj.frequency))
def hourly(obj=obj):
# import logic based on obj.name
# Do something each hour…
对于错误,
Exception Type: EncodeError
Exception Value:
Object of type timedelta is not JSON serializable
而不是在 django 设置中定义以下变量,
CELERY_BEAT_SCHEDULE = {
'task-first': {
'task': 'scheduler.tasks.create_task',
'schedule': timedelta(minutes=1)
},
您可以在芹菜文件中尝试以下内容:
app.conf.beat_schedule = {
'task-first': {
'task': 'scheduler.tasks.create_task',
'schedule': crontab(minute='*/1')
}
}
鉴于芹菜服务器已启动并运行,这对我有用。
除此之外,为什么要在每个任务后重定向到'list_tasks'
,它到底有什么作用?另外,您已经从视图add_task_celery.delay(name,date,freq)
调用了芹菜任务,除了使用芹菜节拍定义的定期任务之外,这只是添加任务的另一种方法吗?
编辑 1:
我的结构如下所示:
settings.py
CELERY_TIMEZONE = 'Asia/Kolkata'
CELERY_BROKER_URL = 'amqp://localhost'
celery.py
app.conf.beat_schedule = {
'task1': {
'task': '<app_name>.tasks.random_task',
'schedule': crontab(minute=0, hour=0)
},
}
在这里你应该注意,我的应用程序文件夹中有一个名为任务的文件,在那里我编写了一个共享任务,如下所示:
@shared_task
def random_task(total):
...
此外,除此之外,您应该启动芹菜节拍以及芹菜工作过程,如下所示:
celery -A <project_name>.celery worker -l error
celery -A <project_name>.celery beat -l error --scheduler django_celery_beat.schedulers:DatabaseScheduler
你可以任何你想要的调度程序,在生产中我使用DatabaseScheduler
.要进行测试,您可以尝试使用以下命令:
celery -A <project_name> beat -l info -S django
你应该从 Django 项目的项目文件夹中运行所有这些命令
我相信问题出在任务定义中的第 2 和第 3 个参数上,即freq
和date
.尽管从错误中,您发布了Object of type timedelta is not JSON serializable
,但看起来它正在谈论freq
返回 timedelta 对象的 DurationField 类型的字段。
理想情况下,在传递给任务之前,必须序列化这两个字段。 一个简单的方法是——
1(您可以显式序列化这些字段并传递给任务,并在任务中再次将其转换为日期时间/时间增量对象。
或者,如果项目太多,您可以转储整个数据字典。
add_task_celery.delay(json.dumps(form.cleaned_data))
,
然后在任务中执行 ->json.loads(...)
2(您可以尝试的另一件事是在参数中显式传递序列化程序。(使用apply_async
代替delay
(
add_task_celery.apply_async((name, date, freq), serializer='json')
3(您还可以设置值(如果尚未设置(以设置CELERY_TASK_SERIALIZER = 'json'
(默认值为'pickle'
(。