Celery
- Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统
- 它是一个专注于实时处理的任务队列,同时也支持任务调度
- 中文官网:http://docs.jinkan.org/docs/celery/
#在线安装
pip3 install -U Celery
#离线安装
tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python3 setup.py build
python3 setup.py install
- broker : 消息传输的中间件,生产者一旦有消息发送,将发至broker;【RQ,redis】
- backend : 用于存储消息/任务结果,如果需要跟踪和查询任务状态,则需添加要配置相关
- worker : 工作者 - 消费/执行broker中消息/任务的进程
使用Celery
创建worker
- 创建 tasks.py 文件
from celery import Celery
#初始化celery, 指定broker
app = Celery('chaogege', broker='redis://:@127.0.0.1:6379/1')
# 如果redis有密码,可添加password
# app = Celery('chaogege', broker='redis://:password@127.0.0.1:6379/1')
# 创建任务函数
@app.task
def task_test():
print("task is running....")
- Ubuntu 终端中,
tasks.py
文件同级目录下执行
celery -A tasks worker --loglevel=info
#执行后终端显示如下,证明成功!
创建生产者 - 推送任务
- 在tasks.py文件的同级目录进入ipython3执行如下代码
from tasks import task_test
task_test.delay()
#执行后,worker终端中现如如下
存储执行结果
Celery提供存储任务执行结果的方案,需借助redis或mysql或Memcached等
- 创建tasks_result.py
- http://docs.celeryproject.org/en/latest/reference/celery.result.html#module-celery.result
from celery import Celery
app = Celery('demo',
broker='redis://@127.0.0.1:6379/1',
backend='redis://@127.0.0.1:6379/2',
)
# 创建任务函数
@app.task
def task_test(a, b):
print("task is running")
return a + b
- tasks_result.py 同级目录终端中-启动celery worker
celery -A tasks_result worker --loglevel=info
- 在相同目录下打开终端创建生产者,执行成功后,可调用如下方法取得执行结果
from tasks_result import task_test
s = task_test.delay(10,100)
Django + Celery
- 创建项目+应用
#常规命令
django-admin startproject test_celery
cd test_celery
python3 manage.py startapp user
# 并在settings.py中添加应用
- 在settings.py同级目录下 创建celery.py文件
from celery import Celery
from django.conf import settings
import os
# 为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test_celery.settings')
# 创建应用
app = Celery("test_celery", broker='redis://@127.0.0.1:6379/1')
# 设置app自动加载任务
app.autodiscover_tasks(settings.INSTALLED_APPS)
- 在应用模块user目录下创建tasks.py文件
from test_celery.celery import app
import time
@app.task
def task_test():
print("task begin....")
time.sleep(10)
print("task over....")
- 应用视图编写
from django.http import HttpResponse
from .tasks import task_test
import datetime
def test_celery(request):
task_test.delay()
now = datetime.datetime.now()
html = "return at %s"%(now.strftime('%H:%M:%S'))
return HttpResponse(html)
- 设置路由
# test_celery.urls
from django.urls import path, include
urlpatterns = [
path('admin/', admin.site.urlcs),
path('v1/users/', include('user.urls')),
]
# user.urls
from django.urls import path
from . import views
urlpatterns = [
path('test_celery', views.test_celery),
]
- 启动django和创建celery worker
python3 manage.py runserver
celery -A test_celery worker -l info
- 浏览器中执行对应url
- worker终端中显示
生产环境启动
- 默认并发采用prefork,推荐采用gevent模式 - 协程模式
celery -A proj worker -P gevent -c 1000
# P POOL Pool implementation: 支持 perfork or eventlet or gevent
# C CONCURRENCY 并发数
最后一次更新于2023-03-27 10:41
0 条评论