celery 实践方面的内容记录

配置

去年的一个项目中,使用到了 celery 的部分功能,当时主要是作为一个异步调度来使用的。最近的项目中需要定时任务,消息发布等功能,我就在项目中用上了 celery,又一次看了它的文档,这里做一个记录。

首先要说的是,celery 的 import 规则和一般情况不一样,需要按照文档上的方法来编写代码。

一般的 celery 工程的项目结构都是一个目录中包含了所有的 celery 相关的信息,然后在外层使用相关命令进行调度。

1
2
3
4
tasks/
├── __init__.py
├── celery.py
└── younixue_tasks.py

上面这样就是一个最简单的 celery 目录,在运行时,可以使用 celery -A tasks worker -l debug 这样的命令进行运行。

先来看看 celery.py 中的内容。这里以定时任务为模板,进行的配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/usr/bin/env python
#-*-coding:utf-8-*-
from __future__ import absolute_import
from celery import Celery
from datetime import timedelta
import sys
sys.path.append('..')
import web.settings as settings
redis_port = settings.REDIS_PORT
redis_host = settings.REDIS_HOST
broker_uri = "redis://{host}:{port}/1".format(host=redis_host, port=redis_port)
app = Celery("auto_run",
broker=broker_uri,
include=["tasks.younixue_tasks"],
)
app.conf.update(
CELERYBEAT_SCHEDULE = {
"set_course_on": { # 上架课程
"task": "tasks.younixue_tasks.set_course_on",
"schedule": timedelta(seconds=30),
},
"set_course_off": { # 下架课程
"task": "tasks.younixue_tasks.set_course_off",
"schedule": timedelta(seconds=30),
},
"sync_visit_record": { # 同步 redis 访问记录
"task": "tasks.younixue_tasks.sync_visit_record",
"schedule": timedelta(seconds=60*60),
},
},
)
if __name__ == "__main__":
app.start()

在上面的代码中,第一行代码是必须要有的。后面的配置中,我使用的是 redis 作为存储后端。
这里重要的有两点:

  1. 定义 app;
  2. 配置 app;

像这里的,配置的就是自动任务的内容,自动任务的具体实现是写在 tasks.younixue_tasks.py 文件中的,这里在配置时,就具体指定到对应的函数中去了。后续扩展就可以以这个为模板进行配置。

来看一下具体的任务函数的编写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python
#-*-coding:utf-8-*-
import time
import sys
import redis
from celery import app
import web.settings as settings
import web.lock_torndb as torndb
tdb = torndb.Connection(host=settings.DBHOST, database=settings.DATABASE, user=settings.DBUSER, password=settings.DBPWD)
rclient = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB_NUM)
# 上架/下架程序
@app.task()
def set_course_on():
cur_time = int(time.time())
# 上架课程,条件是,起始时间大于等于当前时间,且结束时间大于起始时间,且课程 course_on=N 及 publish=N
sql = "select course_id from courses where start_time>=%s and end_time>start_time and course_on='N'"
ret = tdb.query(sql, cur_time)
if ret:
for cid in ret:
update_course(cid.course_id, 'Y', 'Y')
return ret

一般的开发模式即为这样的,使用修饰符对对应的函数进行修饰,对应的函数就可以变成 celery 可以调用的任务了。

如果要自动化任务运行,一般使用的命令是 celery -A tasks worker -B -l info 这里,多了个 -B 参数。

配合 supervisor

在默认的 supervisord.conf 配置文件中,可以增加如下内容

1
2
[include]
files=celeryd.conf

这里 celeryd.conf 的内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
; ==================================
; celery worker supervisor example
; ==================================
[program:celery]
; Set full path to celery program if using virtualenv
command=celery -A tasks worker -B --concurrency=1 --loglevel=INFO
directory=/Users/axel/coding_net/younixue/server/web/
user=axel
numprocs=1
stdout_logfile=/var/log/celery/celery.log
stderr_logfile=/var/log/celery/celery_err.log
autostart=true
autorestart=true
startsecs=10
; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600
; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true
; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998

这里需要注意的是, user 那里不要配置 root 用户。

其他内容

后续记录