django 使用celery定时任务完成邮件,信息,钉钉的发送

it2024-08-07  51

django 使用celery定时任务完成邮件,信息,钉钉的发送

celery定时任务

​ 电商项目有这样的需求,每天中午12点进行秒杀活动,对于有过预约的用户,在11:50进行短信提醒。最初接触定时任务是在Linux系统的计划任务部分。Celery已经提供了这样的一种功能。

Settings配置

将任务写在celery独立的App里的tasks里

celery常用的定时方法

Crontab

​ Crontab(hour=”*/2”) 每2小时执行一次

​ Crontab(minute=0,hour=”*/3”) 每3个小时的0分执行一次

​ Crontab(minute=0,hour=”*/3,8-12”) 每3小时或者8-12,0分点执行一次

​ Crontab(month_of_year=”*/3”) 每3个月执行一次

​ Crontab(minute=0,hour=0,day_of_month=”2-31/2”) 偶数天的0时0分执行

​ Crontab(0,0,day_of_month=”1”,month_of_year=”5”) 每年五月一号执行

timedelta

​ timedelta(seconds=1) 每秒执行一次

邮件

163

qq

方法一:

import smtplib #登陆邮件服务器,进行邮件发送 from email.mime.text import MIMEText #负责构建邮件格式 @ task subject = "老边的学习邮件" content = "孩子不学习,多半是欠的,抄五遍就好了" sender = "3x92x9x511@qq.com" recver = """3xx27x9x1x@qq.com, 2x5xx8x97@qq.com, 773xx3x59@qq.com, 912xx5x70@qq.com, 152xx25x04@qq.com, 130xxx80x51@qq.com, 7217xxx7x41@qq.com, 3303xx6x12@qq.com, 7107xx910@qq.com, 3296xxx91@qq.com, 6269xx318@qq.com, 419xxx2@qq.com, 16xxxx36x@qq.com, 32xxx391@qq.com, 12xxx108@qq.com, 32xxx391@qq.com, 12xxxxx08@qq.com""" password = "svhbjrvepdoqdbfi" #授权码 message = MIMEText(content,"plain","utf-8") message["Subject"] = subject message["To"] = recver message["From"] = sender smtp = smtplib.SMTP_SSL("smtp.qq.com",465) smtp.login(sender,password) smtp.sendmail(sender,recver.split(",\n"),message.as_string()) smtp.close()

方法二【Django一如既往的对发送邮件也进行了封装】:

①Settings配置

②视图使用

信息

发送短信(接口请求形式)

短信发送都需要借助短信发送平台,使用平台发送短信,通常是收费。

今天要使用的平台是互亿平台。http://www.ihuyi.com/

代码:

import requests url = "http://106.ihuyi.com/webservice/sms.php?method=Submit" account = "C85050877" password = "9c14def972fa00acf877b04cc827fa8a" mobile = "133xxxx360" content = "您的验证码是:201981。请不要把验证码泄露给其他人。" #定义请求的头部 headers = { "Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain" } #定义请求的数据 data = { "account": account, "password": password, "mobile": mobile, "content": content, } #发起数据 response = requests.post(url,headers = headers,data=data) #url 请求的地址 #headers 请求头部 #data 请求的数据 print(response.content.decode())

钉钉

#settings from celery.schedules import crontab from celery.schedules import timedelta CELERYBEAT_SCHEDULE = { # 定时器策略 u'钉钉': { # 每隔3s运行一次 'task': 'CeleryTask.tasks.dingTalk', #'schedule': crontab(minute='*/1'), 'schedule':timedelta(seconds=3), 'args': (), }, } #tasks from __future__ import absolute_import from Djangoshop.celery import app #在成功安装celery框架之后,django新生成的模块 import json import requests @app.task def dingTalk(): url = 'https://oapi.dingtalk.com/robot/send?access_token=ab6d36689780c0fc9b82a12b5e04f445e188379fcc569e8e460173b044a3432' headers = { "Content-Type": "application/json", "Chartset": "utf-8" } requests_data = { "msgtype": "text", "text": { "content": "未认证" }, "at": { "atMobiles": [ ], }, "isAtAll": True } sendData = json.dumps(requests_data) response = requests.post(url, headers=headers, data=sendData) content = response.json() print(content)
最新回复(0)