python 异步任务框架 Celery 入门,速看! | 您所在的位置:网站首页 › python调用os › python 异步任务框架 Celery 入门,速看! |
Celery 是使用 python 编写的分布式任务调度框架。 它有几个主要的概念: celery 应用 用户编写的代码脚本,用来定义要执行的任务,然后通过 broker 将任务发送到消息队列中broker 代理,通过消息队列在客户端和 worker 之间进行协调。celery 本身并不包含消息队列,它支持以下消息队列RabbitMQRdisAmazon SQSZookeeper更多关于 Broker 见往期文章backend 数据库,用来存储任务返回的结果。worker 工人,用来执行 broker 分派的任务。任务 任务,定义的需要执行的任务版本要求 Celery5.1 要求: python(3.6,3.7,3.8)Celery 是一个资金最少的项目,所以我们不支持 Microsoft Windows。 更多更详细的版本要求见往期文章 安装 使用 pip 安装: pip install -U Celery捆绑包 Celery 还定义了一组包,用于安装 Celery 和给定的依赖项。 可以在 pip 命令中实现中括号来指定这些依赖项。 pip install "celery[librabbitmq]" pip install "celery[librabbitmq,redis,auth,msgpack]"具体支持的依赖包见往期文章 1. 选择一个 broker 使用 celery 首先需要选择一个消息队列。安装任意你熟悉的前面提到的 celery 支持的消息队列。 2. 编写一个 celery 应用 首先我们需要编写一个 celery 应用,它用来创建任务和管理 wokers,它要能够被其他的模块导入。 创建一个tasks.py 文件: from celery import Celery app = Celery('tasks', broker='redis://localhost:6379/0') @app.task def add(x, y): return x + y第一个参数tasks是当前模块的名称,它可以省略,建议以当前模块名为名称。 第二个关键字参数 broker='redis://localhost:6379/0'指定我们使用 Redis 作为消息队列,并指定连接地址。 3.运行 celery 的 worker 服务 cd 到 tasks.py 所在目录,然后运行下面的命令来启动 worker 服务 celery -A tasks worker --loglevel=INFO4. 调用任务 >>> from tasks import add >>> add.delay(4,4)通过调用任务的 delay 来执行对应的任务。celery 会把执行命令发送到 broker,broker 再将消息发送给 worker 服务来执行,如果一切正常你将会在 worker 服务的日志中看到接收任务和执行任务的日志。 5. 保存结果 如果你想要跟踪任务的状态以及保存任务的返回结果,celery 需要把它发送到某个地方。celery 提供多种结果后端。 我们这里以 reids 为例,修改 tasks.py中的代码,添加一个 Redis 后端。 app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')更多结果后端见往期文章 重新启动 worker 服务,重新打开 python 解释器 >>> from tasks import add >>> result = add.delay(4,4)ready()方法返回任务是否执行完成: >>> result.ready() False还可以等待结果完成,但很少使用这种方法,因为它将异步调用转换为同步调用 >>> result.get(timeout=1) 803在应用中使用 celery 创建项目 项目结构: proj/__init__.py /celery.py /tasks.pyproj/celery.py from celery import Celery app = Celery('proj', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1', include=['proj.tasks'] )# 配置 app.conf.update( result_expires=3600, # 结果过期时间 )在这个模块中我们创建了一个 Celery 模块。要在你的项目中使用 celery 只需要导入此实例。 proj/tasks.py from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.tas kdef xsum(numbers) return sum(numbers)启动 worker celery -A proj worker -l INFO调用任务 >>> from proj.tasks import add >>> add.delay(2, 2)04在 django 中使用celery要在你的 django 项目中使用 celery,首先需要定义一个 Celery 的实例。 如果你又 django 项目如下: - proj/ - manage.py - proj/ - __init__.py - settings.py - urls.py那么推荐的方法是创建一个新的proj/proj/celery.py模块来定义芹菜实例:file:proj/proj/celery.py import os from celery import Celery # 为`celery`设置默认的django设置模块 os.environ.setdefault('DJANGO_SETTINGS_MODULE','proj.settings') app = Celery('proj') # 设置配置来源 app.config_from_object('django.conf:settings',namespace='CELERY') # 加载所有的已注册django应用中的任务 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')然后你需要在你的 proj/proj/__init__.py模块中导入这个应用程序。这样就可以保证 Django 启动时加载应用程序,以便于 @shared_task 装饰器的使用。 proj/proj/__init__.py: from .celery import app as celery_app __all__ = ('celery_app',)请注意,此示例项目布局适用于较大的项目,对于简单的项目,可以使用包含定义应用程序和任务的单个模块。 接下来我们来解释一下 celery.py 中的代码,首先,我们设置celery命令行程序的环境变量DJANGO_SETTINGS_MODULE的默认值: os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')这一行的作用是加载当前 django 项目的环境设置,特别是当需要在异步任务中用到 ORM。它必须在创建应用程序实例之前。 app = Celery('proj')我们还添加了 Django 设置模块作为 Celery 的配置源。这意味着我们不必使用多个配置文件,而是直接在 Django 的配置文件中配置 Celery。 app.config_from_object('django.conf:settings', namespace='CELERY')大写命名空间意味着所有Celery配置项必须以大写指定,并以 CELERY_ 开头,因此例如broker_url 设置变为 CELERY_BROKER_URL。 例如,Django 项目的配置文件可能包括: settings.py CELERY_TIMEZONE = "Asia/Shanghai" CELERY_TASK_TRACK_STARTED = True CELERY_TASK_TIME_LIMIT = 30*60接下来,可重用应用程序的常见做法是在单独的tasks.py模块中定义所有任务Celery有一种方法可以自动发现这些模块: app.autodiscover_tasks()使用上面的行,Celery 将按照tasks.py 约定自动从所有已安装的应用程序中发现任务: - app1/ - tasks.py - models.py - app2/ - tasks.py - models.py这样就不必手动将各个模块添加到CELERY_IMPORTS 设置中。 使用 @shared_task 装饰器 我们编写的任务可能会存在于可重用的应用程序中,而可重用的应用程序不能依赖与项目本身,因此无法直接导入 celery 应用实例。 @shared_task装饰器可以让我们无需任何具体的 celery 实例创建任务:demoapp/tasks.py # Create your tasks here from demoapp.models import Widget from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers) @shared_task def count_widgets(): return Widget.objects.count() @shared_task def rename_widget(widget_id, name): w = Widget.objects.get(id=widget_id) w.name = name w.save()学习的路上除了理论知识还离不开动手实践,今天给大家精心准备了几个项目,有需要的朋友可以关注并私信我关键词“资料”免费获取 |
CopyRight 2018-2019 实验室设备网 版权所有 |