博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
一文搞懂Celery
阅读量:3959 次
发布时间:2019-05-24

本文共 6633 字,大约阅读时间需要 22 分钟。

一.Celery简介

Celery是一个功能完备即插即用的异步任务队列系统。它适用于异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。

文档:http://docs.jinkan.org/docs/celery/getting-started/index.html

Celery的特点是:

  • 简单,易于使用和维护,有丰富的文档。
  • 高效,单个celery进程每分钟可以处理数百万个任务。
  • 灵活,celery中几乎每个部分都可以自定义扩展。
任务队列是一种跨线程、跨机器工作的一种机制.任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。

Celery的架构

Celery的架构由三部分组成,消息队列(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

在这里插入图片描述

一个celery系统可以包含很多的worker和brokerCelery本身不提供消息队列功能,但是可以很方便地和第三方提供的消息中间件进行集成,包括RabbitMQ,Redis,MongoDB等

二.安装

pip install -U celery  #-U是update的意思,有就进行更新,没有就安装#后面单独将celery运行起来就可以了

也可从官方直接下载安装包:

tar xvfz celery-0.0.0.tar.gzcd celery-0.0.0python setup.py python setup.py install

使用

使用celery第一件要做的最为重要的事情是需要先创建一个Celery实例,我们一般叫做celery应用,或者更简单直接叫做一个app。app应用是我们使用celery所有功能的入口,比如创建任务,管理任务等,在使用celery的时候,app必须能够被其他的模块导入。

一般celery任务目录直接放在项目的根目录下即可,路径:

luffyapi/├── mycelery/    ├── config.py     # 配置文件    ├── __init__.py       ├── main.py       # 主程序    └── sms/          # 一个目录可以放置多个任务,该目录下存放当前任务执行时需要的模块或依赖,也可以每个任务单独一个目录        └── tasks.py  # 任务的文件,名称必须是这个!!!

main.py,代码:

# 主程序from celery import Celery# 创建celery实例对象app = Celery("luffy")# 通过app对象加载配置,文件路径app.config_from_object("mycelery.config")# 自动搜索并加载任务# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称# app.autodiscover_tasks(["任务1","任务2"])app.autodiscover_tasks(["mycelery.sms","mycelery.cache"]) #会自动识别sms目录下面的tasks.py文件中的任务,所以不需写成mycelery.sms.tasks# 启动Celery的命令# 强烈建议切换目录到项目的根目录下启动celery!!# celery -A mycelery.main worker --loglevel=info

配置文件config.py,代码:(文件形式,json形式,对象形式都行)

# 任务队列的链接地址(变量名必须叫这个)broker_url = 'redis://127.0.0.1:6379/14'  # 结果队列的链接地址(变量名必须叫这个)result_backend = 'redis://127.0.0.1:6379/15'

创建一个任务文件sms/tasks.py,并创建任务,代码:

# celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!from mycelery.main import app@app.task(name="send_sms")  # name表示设置任务的名称,如果不填写,则默认使用函数名(路径)做为任务名def send_sms():    print("发送短信!!!")@app.task  # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名def send_sms2():    print("发送短信任务2!!!")

接下来,我们运行celery,在终端,项目根目录下(也就是mycelery的外层目录里面)执行指令

celery -A mycelery.main worker --loglevel=info (或者直接写info也行) #-A是指定celery启动入口

效果如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fZ4cNNo3-1604837761033)(assets/1562037230098.png)]

- ** ---------- [config]- ** ---------- .> app:         __main__:0x10b24ba50- ** ---------- .> transport:   redis://127.0.0.1:6379/14- ** ---------- .> results:     redis://127.0.0.1:6379/15- *** --- * --- .> concurrency: 16 (prefork)  #表示它开启了16个线程准备来来执行任务,可以在后面执行任务的时候自行测试一下,一共可以有16个任务同时执行-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) #有没有开启其他的事件(比如事件监听等等一些东西)

运行起来之后,如果又添加了新的任务,需要重新启动celery。

然后执行任务,可以在mycelery下面创建一个py文件进行测试,名字随便起,比如叫做runtask.py文件,内容如下

#引入任务from mycelery.sms.tasks import send_sms  #执行任务send_sms.delay() #这就是将任务交给worker去执行了,这个任务在上面的时候已经加到队列中了,所以调用它的意思就是让worker去队列中找到send_sms这个任务去执行#然后运行我们这个文件,右键运行就行,celery会在后台一直运行着

去redis中查看,就能看到任务执行结果了

如果想获取任务结果可以通过get方法,或者AsyncResult这个类来拿

方式1:import timefrom mycelery.sms.tasks import send_smsfrom mycelery.mail.tasks import send_emailret = send_sms.delay()print(ret,type(ret))print(ret.ready())print(ret.id)# time.sleep(3)print(ret.ready())print(ret.get(timeout=1),)方式2import timefrom mycelery.sms.tasks import send_smsfrom mycelery.mail.tasks import send_emailfrom celery.result import AsyncResultret = send_sms.delay()  #执行的任务如果需要参数,那么就直接在delay方法里面写:send_sms(mobile,sms_code),执行时:delay(mobile,sms_code)async_task = AsyncResult(id=ret.id,app=send_sms)print(async_task.successful())result = async_task.get()print(result)

celery还有很多可配置的项,还可以拓展很多的方法,并且还能完成定时任务:定时备份数据库,定时分析日志文件等。关于这些,还是建议大家学习一下。

其他参考文档:

三.django和celery进行结合使用

在main.py主程序中对django的配置文件进行加载

# 主程序import osfrom celery import Celery# 创建celery实例对象app = Celery("luffy") #celery对象可以创建多个,所以我们最好给我们当前的celery应用起个名字,比如叫做luffy# 把celery和django进行组合,需要识别和加载django的配置文件import osos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')#如果只是使用了logging日志功能的话可以不写以下两句,因为logging是python提供的模块,但是将来可能使用celery来执行其他的django任务,所以我们先写上import djangodjango.setup()# 通过app对象加载配置app.config_from_object("mycelery.config")# 加载任务# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称# app.autodiscover_tasks(["任务1","任务2"])app.autodiscover_tasks(["mycelery.sms","mycelery.mail"])# 启动Celery的命令# 切换目录到mycelery根目录下启动# celery -A mycelery.main worker --loglevel=info

在需要使用django配置的任务中,直接加载配置,所以我们把注册的短信发送功能,整合成一个任务函数,代码:

from mycelery.main import appfrom luffyapi.libs.yuntongxun.sms import CCPfrom luffyapi.settings import constantsimport logginglog = logging.getLogger("django")@app.task(name="send_sms")def send_sms(mobile, sms_code):    """发送短信"""    ccp = CCP()    ret = ccp.send_template_sms(mobile, [sms_code, constants.SMS_EXPIRE_TIME//60], constants.SMS_TEMPLATE_ID)    if not ret:        log.error("用户注册短信发送失败!手机号:%s" % mobile)

在这个任务中,我们需要加载短信发送的sdk和相关的配置常量,所以我们可以直接把django中的短信发送模块和相关的常量配置文件直接剪切到当前sms任务目录中

mycelery/├── config.py├── __init__.py├── main.py└── sms/    ├── __init__.py    ├── tasks.py

再次启动项目即可。

最终在django里面,我们调用Celery来异步执行任务。需要完成2个步骤:

# 1. 声明一个和celery一模一样的任务函数,但是我们可以导包来解决from mycelery.sms.tasks import send_sms# 2. 调用任务函数,发布任务send_sms.delay(mobile,sms_code)# send_sms.delay() 如果调用的任务函数没有参数,则不需要填写任何内容

改完之后的views.py

class SMSAPIView(APIView):    def get(self,request,mobile):        # todo 1. 判断手机号是否在60秒曾经发送过短信        redis_conn = get_redis_connection('sms_code')        ret = redis_conn.get("mobile_%s" % mobile)        if ret is not None:            return Response({
'msg':'60秒内已经发送过短信了'},status=status.HTTP_400_BAD_REQUEST) # 2. 创建验证码 sms_code = "%06d" % random.randint(1,999999) pipe = redis_conn.pipeline() pipe.multi() pipe.setex('sms_%s' % mobile, constants.SMS_EXPIRE_TIME , sms_code) pipe.setex("mobile_%s" % mobile,constants.SMS_INTERVAL_TIME,'_') pipe.execute() #执行事务 try: from mycelery.sms.tasks import send_sms ret = send_sms.delay(mobile,sms_code) #执行任务 #其实短信发送,没有必要获取它的返回结果,日志中已经记录了它发送成功与否的状态,如果我们想获取结果,那么可以使用我笔记里面获取celery任务结果的方法。 print('ret>>>',ret.get()) # ccp = CCP() # #由于短信发送那个有效期是分钟为单位的,所以我们SMS_EXPIRE_TIME//60 # ret = ccp.send_template_sms(mobile,[sms_code,constants.SMS_EXPIRE_TIME//60],constants.SMS_TEMPLATE_ID) # if not ret: # logger = logging.getLogger('django') # logger.error('用户注册短信发送失败,手机号为%s' % mobile) # return Response({'msg':'短信发送错误!'}) except: return Response({
'msg':'发送短信失败'},status=status.HTTP_500_INTERNAL_SERVER_ERROR) return Response({
'msg':'发送短信成功'})

转载地址:http://mgqzi.baihongyu.com/

你可能感兴趣的文章
一行代码更改密码
查看>>
非插件实现cookie版Typecho文章阅读次数统计功能
查看>>
非插件实现Typecho语法高亮
查看>>
windows 下 netsh 实现 端口映射(端口转发)
查看>>
两个好用的命令行工具 watch 和 rsync
查看>>
信安入门神级书单
查看>>
【IPFS指南】IPFS的竞争对手们(一)
查看>>
docker更换国内镜像
查看>>
CentOS 下 tree命令用法详解
查看>>
docker上传镜像至Registry时https报错解决方法
查看>>
安装 docker-compose (实测可用,妈妈再也不用担心被墙了)
查看>>
docker下删除none的images
查看>>
Linux提权获取敏感信息方法
查看>>
Ubuntu 16.04开机A start job is running for Raise network interface(5min 4s)解决方法
查看>>
Ubuntu 16.04开机隐藏菜单缩短时间
查看>>
Ubuntu 更换国内源
查看>>
Ubuntu16.04下Docker pull connection refused 解决办法
查看>>
通过 三大机制 揭秘 IPFS 工作原理
查看>>
Ubuntu 16.04卸载PostgresQL
查看>>
华为路由器交换机配置命令
查看>>