1.安装包
pip install celery
pip install django-
celery
pip install pymysql
2.创建一个django项目
- proj/
- proj/
__init__.py
- proj/
settings.py
- proj/
urls.py
- manage.py
3.修改__init__.py
import pymysql
pymysql.install_as_MySQLdb()
4.修改settings.py,加celery配置
# 是否启用celery任务
IS_USE_CELERY =
True
# 本地开发的 celery 的消息队列(RabbitMQ)信息
# BROKER_URL_DEV = 'amqp://guest:guest@127.0.0.1:5672/'
BROKER_URL_DEV =
'redis://127.0.0.1:6379/1'
# TOCHANGE 调用celery任务的文件路径, List of modules to import when celery starts.
CELERY_IMPORTS =
(
'celerytest.task',
)
# ===============================================================================
# CELERY 配置
# ===============================================================================
if IS_USE_CELERY:
try:
import djcelery
import sys
INSTALLED_APPS +=
(
'djcelery',
# djcelery
)
djcelery.setup_loader()
CELERY_ENABLE_UTC =
False
CELERYBEAT_SCHEDULER =
"djcelery.schedulers.DatabaseScheduler"
if "celery" in sys.argv:
DEBUG =
False
# celery 的消息队列(redis)信息
BROKER_URL = os.environ.get(
'BK_BROKER_URL', BROKER_URL_DEV)
#BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672/'
except:
pass
5.修改settings.py,改数据库配置
# ===============================================================================
# 数据库设置, 本地开发数据库设置
# ===============================================================================
DATABASES =
{
'default': {
'ENGINE':
'django.db.backends.mysql',
# 默认用mysql
'NAME':
'celerytest',
# 数据库名 (默认与APP_ID相同)
'USER':
'root',
# 你的数据库user
'PASSWORD':
'123456',
# 你的数据库password
'HOST':
'127.0.0.1',
# 开发的时候,使用localhost
'PORT':
'3306',
# 默认3306
},
}
6.修改urls.py,添加链接
from django.conf.urls
import include, url
from django.contrib
import admin
import test
urlpatterns =
[
url(r'^admin/', include(admin.site.urls)),
url(r'^aa/$', test.test),
url(r'^add/$', test.addtest),
url(r'^del/$', test.deltest),
url(r'^tt/$', test.startandstop),
]
7.添加task.py,celery任务
# -*- coding: utf-8 -*-
import django
#测试不加如这两行启动worker要报错,而且需要加在最前
django.setup()
import time
from celery
import task,shared_task
#直接执行
@task()
def sayhello():
print(
'hello ...')
time.sleep(2
)
print(
'world ...')
#定时任务
@shared_task(name=
"celerytest")
def everysay(**
kwargs):
"""定时任务调用执行作业"""
sayadd(**
kwargs)
def sayadd(**
kwargs):
print(
'add1 ...')
time.sleep(2
)
print(int(kwargs[
'x'])+int(kwargs[
'y']))
8.添加test.py,调用任务
# -*- coding: utf-8 -*-
from django.shortcuts
import render,HttpResponse
from task
import *
from djcelery
import models as djmodels
import jsondef getcrontabtime(minute,hour,day_of_month,month_of_year,day_of_week):
mycrontab =
{}
mycrontab['minute'] =
minute
mycrontab['hour'] =
hour
mycrontab['day_of_month'] =
day_of_month
mycrontab['month_of_year'] =
month_of_year
mycrontab['day_of_week'] =
day_of_week
return mycrontab
def test(request):
sayhello.delay()#如果有参数sayhello.delay(x,y,z)
return HttpResponse(
"hello world")
def addtest(request):
'''添加定时任务'''
task, created = djmodels.PeriodicTask.objects.get_or_create(name=
'123456', task=
'celerytest')
#task对应的是task文件里的函数方法
mycrontab = getcrontabtime(5,
'*',
'*',
'*',
'*')
crontab = djmodels.CrontabSchedule.objects.filter(**
mycrontab).first()
if crontab
is None:
crontab = djmodels.CrontabSchedule.objects.create(**
mycrontab)
task.crontab =
crontab
task.enabled =
True
param_dic =
{}
param_dic['x'] = 5
param_dic['y'] = 2
# print param_dic
# if type(param_dic) == list:
# task.args = param_dic
if type(param_dic) ==
dict:
task.kwargs =
json.dumps(param_dic)
task.save()
return HttpResponse(
"添加任务")
def deltest(request):
'''删除定时任务'''
try:
djmodels.PeriodicTask.objects.filter(name=
'123456').delete()
except:
return HttpResponse(
"删除定时任务失败")
return HttpResponse(
"删除定时任务成功")
def startandstop(request):
'''启停定时作业'''
status = request.GET.get(
'status',
'0')
task, created = djmodels.PeriodicTask.objects.get_or_create(name=
'123456', task=
'celerytest')
if status ==
'0':
task.enabled =
False
task.save()
return HttpResponse(
"停用成功")
elif status ==
'1':
task.enabled =
True
task.save()
return HttpResponse(
"启用成功")
else:
return HttpResponse(
"系统错误")
9.项目架构
10.运行前准备
#配置数据库迁移,生成celery需要的数据表
python manage.py migrate
#启动Redis
sudo redis-server /etc/redis/
redis.conf
#启动worker
python manage.py celery worker --loglevel=
info
#启动心跳
python manage.py celery beat --max-interval=10 --loglevel=
INFO
--max-interval=10 :每十秒侦测一次任务 --loglevel=INFO:日志等级是INFO
11.启动测试
#启动web
python manage.py runserver 0.0.0.0:9999
#直接执行
http://127.0.0.1:9999/aa/
#添加定时服务
http://127.0.0.1:9999/add/
#删除定时服务
http://127.0.0.1:9999/add/
#暂停执行定时服务
http://127.0.0.1:9999/tt/
http://127.0.0.1:9999/tt/?status=
0
#启动执行定时服务
http://127.0.0.1:9999/tt/?status=1
转载于:https://www.cnblogs.com/CGCong/p/10191182.html