Python分布式异步任务框架Celery使用教程 环球通讯

目录
一、Celery架构介绍二、Celery简单使用三、Celery包结构四、Celery延迟任务五、Celery定时任务六、Django中集成Celery

一、Celery架构介绍

Celery:芹菜?(跟翻译没有任何关系),分布式异步任务框架(跟其他web框架无关)


(相关资料图)

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.(不支持windows)

celery服务为其他项目服务提供异步解决任务需求的。

架构:

分为三部分

broker:任务中间件,用户提交的任务,存在这个里面(redis,rabbitmq)worker:任务执行者,消费者,真正执行任务的进程(真正干活的人)backend:任务结果存储,任务执行后的结果(redis,rabbitmq)

celery能够做的事:

异步任务(区分同步任务)延迟任务定时任务(其他框架做)

怎么更好的理解celery?

会有两个服务同时运行,一个是项目服务(django服务),一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求。打个比方,人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)。正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题,人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求。

注:python有自己的定时任务,感兴趣的了解下apscheduler

二、Celery简单使用

安装:pip install celery==5.1.2

使用:

1.配置celery

from celery import Celery
# app=Celery("test",)
# backend="redis://:密码@127.0.0.1:6379/1"  如果有密码,这么写
broker = "redis://127.0.0.1:6379/1"  # redis地址
backend = "redis://127.0.0.1:6379/2"  # redis地址
# 1 实例化得到celery对象
app = Celery(__name__, backend=backend, broker=broker)
# 2 写一堆任务(计算a+b,挖井,砍树),函数
# 使用装饰器包裹任务(函数)
@app.task()
def add(a, b):
    import time
    time.sleep(2)
    return a + b

2.提交任务

# from celery_task import app
import celery_task
# 1 同步执行
# res = celery_task.add(2, 3)  # 普通的同步任务,同步执行任务
# print(res)

2 异步任务:

第一步:提交(使用任务名.apply_async(参数))

结果是任务id号,唯一标识这个任务

# res = celery_task.add.apply_async(args=[2, 3])
res = celery_task.add.apply_async(kwargs={"a":2,"b":3})
print(res)  # abab1ad3-0e58-4faa-bc05-14d157dc8217

第二步:让worker执行—>结果存到redis

通过命令启动,非windows:

5.x之前这么启动

命令:celery worker -A celery_task -l info

5.x以后

命令:celery -A celery_task worker -l info

windows:

pip3 install eventlet

5.x之前这么启动

命令:celery worker -A celery_task -l info -P eventlet

5.x以后

命令:celery -A celery_task worker -l info -P eventlet

3.查看任务执行结果

from celery_task import app
from celery.result import AsyncResult
id = "abab1ad3-0e58-4faa-bc05-14d157dc8217"
if __name__ == "__main__":
    a = AsyncResult(id=id, app=app)
    if a.successful():
        print("任务执行成功了")
        result = a.get()  # 异步任务执行的结果
        print(result)
    elif a.failed():
        print("任务失败")
    elif a.status == "PENDING":
        print("任务等待中被执行")
    elif a.status == "RETRY":
        print("任务异常后正在重试")
    elif a.status == "STARTED":
        print("任务已经开始被执行")

三、Celery包结构

目录结构:

-celery_task # 包名
__init__.py
celery.py # app所在py文件
course_task.py # 任务
order_task.py # 任务
user_task.py # 任务
提交任务.py # 提交任务
查看结果.py # 查看结果

创建多个任务:

celery_task /celery.py

from celery import Celery
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# include  是一个列表,放被管理的task 的py文件
app = Celery(__name__, backend=backend, broker=broker,include=[
    "celery_task.course_task",
    "celery_task.order_task",
    "celery_task.user_task",
])
# 原来,任务写在这个py文件中
# 后期任务非常多,可能有用户相关任务,课程相关任务,订单相关任务。。。

celery_task /任务.py

user_task.py

import time
from .celery import app
# 发送短信任务
@app.task()
def send_sms(phone, code):
    time.sleep(3)  # 模拟发送短信延迟
    print("短信发送成功,手机号是:%s,验证码是:%s" % (phone, code))
    return "短信发送成功"

order_task.py

from .celery import app
# 生成订单任务
@app.task()
def make_order():
    with open(r"D:\py18\luffy_api\script\2 celery的包结构\celery_task\order.txt", "a", encoding="utf-8") as f:
        f.write("生成一条订单\n")
    return True

course_task.py

from .celery import app
@app.task()
def add(a,b):
    return a+b

提交多个任务:

from celery_task import user_task,order_task
# 提交一个发送短信任务
# res = user_task.send_sms.apply_async(args=["18972374345", "8888"])
# print(res)
# 提交一个生成订单任务
# res=order_task.make_order.apply_async()
# print(res)

查看结果:

from celery_task.celery import app
from celery.result import AsyncResult
id = "0f283e22-e8d0-40a6-a8ed-8998038bc7a3"
if __name__ == "__main__":
    a = AsyncResult(id=id, app=app)
    print(app.conf)
    if a.successful():
        print("任务执行成功了")
        result = a.get()  # 异步任务执行的结果
        print(result)
    elif a.failed():
        print("任务失败")
    elif a.status == "PENDING":
        print("任务等待中被执行")
    elif a.status == "RETRY":
        print("任务异常后正在重试")
    elif a.status == "STARTED":
        print("任务已经开始被执行")

四、Celery延迟任务

# 添加延迟任务方式一:
# from datetime import datetime, timedelta
# datetime.utcnow()  获取当前的utc时间
# eta=datetime.utcnow() + timedelta(seconds=50) #  50s后的utc时间
# 10s后,发送短信
res=user_task.send_sms.apply_async(args=("12345566677", "8888"), eta=eta)
print(res)
# 使用第二种方式执行异步任务(两者传参不同;不写时间,就表示立即执行):
res=user_task.send_sms.delay("12345566677", "8888")
print(res)

五、Celery定时任务

第一步:celery.py中写入

# 第一步,在包(celery_task)下的celey.py中写入
###修改celery的配置信息    app.conf整个celery的配置信息
# 时区
app.conf.timezone = "Asia/Shanghai"
# 是否使用UTC
app.conf.enable_utc = False
####配置定时任务
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    "send_sms_every_3_seconds": {
        "task": "celery_task.user_task.send_sms",  # 指定执行的是哪个任务
        "schedule": timedelta(seconds=3),
        # "schedule": crontab(hour=8, day_of_week=1),  # 每周一早八点
        "args": ("18953675221", "8888"),
    },
    "make_order_every_5_seconds": {
        "task": "celery_task.order_task.make_order",  # 指定执行的是哪个任务
        "schedule": timedelta(seconds=5),
    },
    "add_every_1_seconds": {
        "task": "celery_task.course_task.add",  # 指定执行的是哪个任务
        "schedule": crontab(hour=8, day_of_week=1),  # 每周一早八点
        "args": (3, 5),
    },
}

第二步:启动worker

# celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task -l info -P eventlet

如果beat没有启动,worker是没有活干的,需要启动beat,worker才能干活,和beat启动顺序无先后

第三步:启动beat

# celery beat -A celery_task -l
celery -A celery_task beat -l info

六、Django中集成Celery

第一种方式使用django-celery(了解):

第三方把django和celery集成起来,方便我们使用,但是,第三方写的包的版本,跟celery和django版本完全对应。

我们自己使用包结构集成到django中:

第一步,把写好的包,直接复制到项目根路径

第二步,在视图类中(函数中)

from celery_task.user_task import send_sms
def test(request):
    mobile = request.GET.get("mobile")
    code = "9999"
    res = send_sms.delay(mobile, code)  # 同步发送假设3分支钟,异步发送,直接就返回id了,是否成功不知道,后期通过id查询
    print(res)
    return HttpResponse(res)

到此这篇关于Python分布式异步任务框架Celery使用教程的文章就介绍到这了,更多相关Python Celery内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

关键词:

Python分布式异步任务框架Celery使用教程 环球通讯

Celery是由Python编写的简单,灵活,可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统

脚本之家 2023-05-17

欧盟碳关税已成为法律 马上生效|世界今热点

欧盟碳关税已成为法律马上生效2023年5月16日,欧盟宣布将碳边境调节机制(CBAM,即碳关税)法案文本在欧盟

环球零碳 2023-05-17

恢复向好态势延续 发展新动能继续壮大——透视前四月经济数据亮点

原标题:恢复向好态势延续发展新动能继续壮大透视前四月经济数据亮点  5月16日,4月国民经济运行情况数据

经济参考报 2023-05-17

福州首轮10幅地块土拍时间调整至5月18日下午_今亮点

观点网讯:5月15日,福州市自然资源和规划局发布关于《福州市自然资源和规划局关于2023年第一次公开出让国

观点机构 2023-05-17

学校2015歌曲ost百度云 学校2015歌曲

今天来聊聊关于学校2015歌曲ost百度云,学校2015歌曲的文章,现在就为大家来简单介绍下学校2015歌曲ost百度云

城市网 2023-05-17

2023云南高考完多久填报志愿 具体填报时间|全球资讯

2023年云南高考志愿预计6月下旬开始填报,云南省每年高考志愿填报时间大概在考试结束后约20天左右的时间,

高三网 2023-05-17

天亮了!76人正式解雇里弗斯,因哈登老李关系破裂,德安东尼或接班

天亮了!76人正式解雇里弗斯,因哈登老李关系破裂,德安东尼或接班,恩比德,76人队,意大利篮球,道格·里弗斯,詹

818体育 2023-05-17

Python分布式异步任务框架Celery使用教程 环球通讯

Celery是由Python编写的简单,灵活,可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统

脚本之家 2023-05-17

欧盟碳关税已成为法律 马上生效|世界今热点

欧盟碳关税已成为法律马上生效2023年5月16日,欧盟宣布将碳边境调节机制(CBAM,即碳关税)法案文本在欧盟

环球零碳 2023-05-17

恢复向好态势延续 发展新动能继续壮大——透视前四月经济数据亮点

原标题:恢复向好态势延续发展新动能继续壮大透视前四月经济数据亮点  5月16日,4月国民经济运行情况数据

经济参考报 2023-05-17

福州首轮10幅地块土拍时间调整至5月18日下午_今亮点

观点网讯:5月15日,福州市自然资源和规划局发布关于《福州市自然资源和规划局关于2023年第一次公开出让国

观点机构 2023-05-17

学校2015歌曲ost百度云 学校2015歌曲

今天来聊聊关于学校2015歌曲ost百度云,学校2015歌曲的文章,现在就为大家来简单介绍下学校2015歌曲ost百度云

城市网 2023-05-17

2023云南高考完多久填报志愿 具体填报时间|全球资讯

2023年云南高考志愿预计6月下旬开始填报,云南省每年高考志愿填报时间大概在考试结束后约20天左右的时间,

高三网 2023-05-17

天亮了!76人正式解雇里弗斯,因哈登老李关系破裂,德安东尼或接班

天亮了!76人正式解雇里弗斯,因哈登老李关系破裂,德安东尼或接班,恩比德,76人队,意大利篮球,道格·里弗斯,詹

818体育 2023-05-17
x 广告
x 广告
x 广告

Copyright   2015-2022 华南仓储网版权所有  备案号:粤ICP备18025786号-52   联系邮箱: 954 29 18 82 @qq.com