Celery

  • Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统
  • 它是一个专注于实时处理的任务队列,同时也支持任务调度
  • 中文官网:http://docs.jinkan.org/docs/celery/
#在线安装
pip3 install -U Celery

#离线安装

tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python3 setup.py build
python3 setup.py install
  • broker : 消息传输的中间件,生产者一旦有消息发送,将发至broker;【RQ,redis】
  • backend : 用于存储消息/任务结果,如果需要跟踪和查询任务状态,则需添加要配置相关
  • worker : 工作者 - 消费/执行broker中消息/任务的进程

1566233784792.png

使用Celery

创建worker

  • 创建 tasks.py 文件
from celery import Celery
#初始化celery, 指定broker
app = Celery('chaogege', broker='redis://:@127.0.0.1:6379/1')

# 如果redis有密码,可添加password
# app = Celery('chaogege', broker='redis://:password@127.0.0.1:6379/1')

# 创建任务函数
@app.task
def task_test():
    print("task is running....") 
  • Ubuntu 终端中, tasks.py文件同级目录下执行
celery -A tasks worker --loglevel=info
#执行后终端显示如下,证明成功!

1572965411915.png

创建生产者 - 推送任务

  • 在tasks.py文件的同级目录进入ipython3执行如下代码
from tasks import task_test
task_test.delay()
#执行后,worker终端中现如如下

1572965545212.png

存储执行结果

Celery提供存储任务执行结果的方案,需借助redis或mysql或Memcached等

from celery import Celery
app = Celery('demo',
             broker='redis://@127.0.0.1:6379/1',
             backend='redis://@127.0.0.1:6379/2',
             )

# 创建任务函数
@app.task
def task_test(a, b):
    print("task is running")
    return a + b
  • tasks_result.py 同级目录终端中-启动celery worker
celery -A tasks_result worker --loglevel=info

1572966007255.png

  • 在相同目录下打开终端创建生产者,执行成功后,可调用如下方法取得执行结果
from tasks_result import task_test
s = task_test.delay(10,100)

Django + Celery

  • 创建项目+应用
#常规命令
django-admin startproject test_celery
cd test_celery
python3 manage.py startapp user
# 并在settings.py中添加应用
  • 在settings.py同级目录下 创建celery.py文件
from celery import Celery
from django.conf import settings
import os


# 为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'test_celery.settings')

# 创建应用
app = Celery("test_celery", broker='redis://@127.0.0.1:6379/1')

# 设置app自动加载任务
app.autodiscover_tasks(settings.INSTALLED_APPS)
  • 在应用模块user目录下创建tasks.py文件
from test_celery.celery import app
import time

@app.task
def task_test():
    print("task begin....")
    time.sleep(10)
    print("task over....")
  • 应用视图编写
from django.http import HttpResponse
from .tasks import task_test
import datetime

def test_celery(request):
    task_test.delay()
    now = datetime.datetime.now()
    html = "return at %s"%(now.strftime('%H:%M:%S'))
    return HttpResponse(html)
  • 设置路由
# test_celery.urls
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urlcs),
    path('v1/users/', include('user.urls')),
]

# user.urls
from django.urls import path
from . import views

urlpatterns = [
    path('test_celery', views.test_celery),
]
  • 启动django和创建celery worker
python3 manage.py runserver
celery -A test_celery worker -l info
  • 浏览器中执行对应url

1572968646168.png

  • worker终端中显示

1572968687718.png

生产环境启动

  • 默认并发采用prefork,推荐采用gevent模式 - 协程模式
celery -A proj worker -P gevent -c 1000
# P POOL Pool implementation: 支持 perfork or eventlet or gevent
# C CONCURRENCY 并发数