如果使用了多个装饰器那么需要task装饰器在最后即在最上面,一般情况使用的是从celeryapp中引入的app作为的装饰器:app.task(),如果是django那种在app中定义的task则需要使用@shared_task
from __future__ import absolute_import from celery import shared_task @shared_task def add(x, y): #这里可以把x+y结果写入数据库 return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers) @task(ignore_result=True,max_retries=1,default_retry_delay=10) #创建测试用的taskdef just_print(): print "Print from celery task"只需要找到方法,用方法加delay就可。
比如上面的文件:
from demoapp.tasks import add r = add.delay(3,5) # 执行这一行就是再下发任务在后台执行了,结果到时候在取就可以了, 使用技巧,一般会创建一个标识健,存到数据库,然后传到我们要执行的方法,在执行方法里面写上功能,把执行结果保存到指定数据库,等执行成功后,就用我们之前的标识键去查。使用其实就是修改数据库,修改后django-celery会实时推送到celery-beat里生效。所以只要再开发一个页面去配置djcelery_periodictask及其它表就可以了。
djcelery提供了一些Model(定义在djcelery/models.py文件),数据库模型如下,
periodictask描述定时任务。重要字段有:
name: 字符串,标识符
task,字符串,任务函数/类所在的路径,一般是celery_imports + function name。
interval:外键指向intervalschedule,表示每隔多少时间执行
crontab:外键指向crontabschedule,表示在某一时刻执行。
enabled:是否生效
expires: 任务过期时间
intervalschedule表示时间间隔,有两个参数:
every:正数;period间隔单位。比如intervalschedule(every=2, period='day')表示每隔2天
crontabschedule表示某一时刻,有minute、hour、day_of-week、day_of_month、day_of_year它们的组合意义,参见cron时间表示法,比如0 0 * 10 * 表示每个月的10号凌晨。说明:
任务和定时任务的区别:定时任务 = 任务 + intervalschedule/crontabschedule 。两个定时任务可以执行同一个任务。
任务没有相应的Model,用字符串表示,即periodictask模型的task字段
定时任务有相应的Model即periodictask。
通过它提供的Model Query API来操作,同平常的数据库查询一样。
from djcelery import models as celery_models celery_models.PeriodicTask.objects.create(...) celery_models.PeriodicTask.ojects.get(name='add') ....admin毕竟是给后台管理人员使用的,它所有的参数都暴露给使用者了。下面是一个实际使用的例子。
需求:ajax实现月度定时任务monthly_reading_task的执行和控制,即
每个月的某一天执行该任务;可以选择开启或者关闭该定时任务;能够选择任务在哪一天(1-28日)执行。
界面看起来是这样的:
基本上就是Model的增删改查。就不是通过admin来操作了。
查询任务信息
def read(self, request, *args, **kwargs): try: task = celery_models.PeriodicTask.objects.get(name=self.TASK_NAME) if task.enabled: return { 'enabled': True, 'day_of_month': int(task.crontab.day_of_month), 'last_run_at': task.last_run_at if task.last_run_at else '0' } else: return {'enabled': False} except celery_models.PeriodicTask.DoesNotExist: return {'enabled': False}
更新日期
def create(self, request, *args, **kwargs): enabled = request.POST.get('enabled', None) if enabled not in [self.ENABLED_POST_VALUE, self.DISABLED_POST_VALUE]: return self.operate_fail('无效参数') if enabled == self.DISABLED_POST_VALUE: self.disable_task(self.TASK_NAME) return self.operate_success() else: try: day_of_month = int(request.POST.get('day_of_month', '')) if day_of_month > 28 or day_of_month < 1: return self.operate_fail('日期必须在1-28日之间') task, created = celery_models.PeriodicTask.objects.get_or_create(name="monthly_reading", task="mrs_app.my_celery.tasks.monthly_reading_task") if created: crontab = celery_models.CrontabSchedule.objects.create(day_of_month=day_of_month, hour=0, minute=0) crontab.save() task.crontab = crontab task.enabled = True task.save() else: task.crontab.day_of_month = day_of_month task.crontab.save() task.enabled = True task.save() return self.operate_success() except ValueError: return self.operate_fail('抄表日不能为空')新建定时任务
def celery_get_tag(request): name = 'test' task = 'home_application.celery_tasks.async_task' task_args ={"x":1, "y":1} crontab_time = {'month_of_year':'*','day_of_month':'*','day_of_week':'*', 'hour':'*','minute':'*'} create_task(name, task , task_args, crontab_time) result = return_result(status=True, code=200, message="添加任务成功") return result #创建任务 def create_task(name, task, task_args, crontab_time): ''' name # 任务名字 task # 执行的任务 "myapp.tasks.add" task_args # 任务参数 {"x":1, "Y":1} crontab_time # 定时任务时间 格式: { 'month_of_year': 9 # 月份 'day_of_month': 5 # 日期 'hour': 01 # 小时 'minute':05 # 分钟 } ''' # task任务, created是否定时创建 task, created = celery_models.PeriodicTask.objects.get_or_create(name=name,task=task) # 获取 crontab crontab = celery_models.CrontabSchedule.objects.filter(**crontab_time).first() if crontab is None: # 如果没有就创建,有的话就继续复用之前的crontab crontab = celery_models.CrontabSchedule.objects.create(**crontab_time) task.crontab = crontab # 设置crontab task.enabled = True # 开启task task.kwargs = json.dumps(task_args) # 传入task参数 #expiration = timezone.now() + datetime.timedelta(day=1) #task.expires = expiration # 设置任务过期时间为现在时间的一天以后 task.save() result = return_result(status=True, code=200, message="添加任务成功")关闭定时
def disable_task(self, name): try: task = celery_models.PeriodicTask.objects.get(name=name) task.enabled = False task.save() return True except celery_models.PeriodicTask.DoesNotExist: return Truedjcelery在初始化中主要完成两件:
在settings.CELERY_IMPORTS定义下的模块搜索所有任务。这个对数据库没有任何改变,只是用Admin添加定时任务时periodictask.task字段变成选择框,列出了所有定义的任务。
从settings.CELERYBEAT_SCHEDULE创建定时任务,这个会创建数据记录,相当于celery_models.PeriodicTask.objects.create(..)语句。
所以在settings文件中可添加配置信息,例如:
CELERYBEAT_SCHEDULE = { 'add-every-3-minutes': { 'task': 'mrs_app.my_celery.tasks.monthly_reading_task', 'schedule': timedelta(minutes=3) # 'schedule': crontab(minute=u'40', hour=u'17',), }, } celerybeat_schedule定义任务,上面注释表示每隔3分钟执行monthly_reading_task任务
schedule就是执行计划,可以用crontab格式,用这种配置,会自动更新数据库。一般不用这种方法。
启动 python manage.py celery worker -l info
如果有定时任务的话,还需要启动心跳
另开一个cmd窗口 python manage.py celery beat (windows下-B选项不可用)
或者后台: /home/python3/bin/python3 /project/manage.py celery worker --loglevel=info >/dev/null 2>&1 & /home/python3/bin/python3 /project/manage.py celery beat >> /project/celery.log 2>&1 &(参考)官网和https://my.oschina.net/kinegratii/blog/292395
转载于:https://www.cnblogs.com/CGCong/p/9391436.html
相关资源:各显卡算力对照表!