celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢?
celery可以支持多台不同的计算机执行不同的任务或者相同的任务。
如果要说celery的分布式应用的话,就要提到celery的消息路由机制,提到AMQP协议。
具体可以查看AMQP文档详细了解。
简单理解:
可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue,
而这是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定routiing_key,Exchange通过routing_key来吧消息路由(routes)到不同的"消息队列"中去。
如图:
exchange 对应 一个消息队列(queue),即:通过"消息路由"的机制使exchange对应queue,每个queue对应每个worker
写个例子:
vim demon3.py
1
2
3
4
5
6
7
8
9
10
11
12
|
from
celery
import
Celery
app
=
Celery()
app.config_from_object(
"celeryconfig"
)
@app
.task
def
taskA(x, y):
return
x
*
y
@app
.task
def
taskB(x, y, z):
return
x
+
y
+
z
@app
.task
def
add(x, y):
return
x
+
y
|
vim celeryconfig.py
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from
kombu
import
Queue
BORKER_URL
=
"redis://192.168.48.131:6379/1"
#1库
CELERY_RESULT_BACKEND
=
"redis://192.168.48.131:6379/2"
#2库
CELERY_QUEUES
=
{
Queue(
"default"
, Exchange(
"default"
), routing_key
=
"default"
),
Queue(
"for_task_A"
, Exchange(
"for_task_A"
), routing_key
=
"for_task_A"
),
Queue(
"for_task_B"
, Exchange(
"for_task_B"
), routing_key
=
"for_task_B"
)
}
#路由
CELERY_ROUTES
=
{
"demon3.taskA"
:{
"queue"
:
"for_task_A"
,
"routing_key"
:
"for_task_A"
},
"demon3.taskB"
:{
"queue"
:
"for_task_B"
,
"routing_key"
:
"for_task_B"
}
}
|
下面把两个脚本导入服务器:
指定taskA启动一个worker:
1
|
# celery -A demon3 worker -l info -n workerA.%h -Q for_task_A
|
同理:
1
|
# celery -A demon3 worker -l info -n workerB.%h -Q for_task_B
|
下面远程客户端调用:新文件
vim remote.py
1
2
3
4
5
6
7
8
9
10
11
12
|
from
demon3
import
*
r1
=
taskA.delay(
10
,
20
)
print
(r1.result)
print
(r1.status)
r2
=
taskB.delay(
10
,
20
,
30
)
time.sleep(
1
)
prnit (r2.result)
print
(r2.status)
#print (dir(r2))
r3
=
add.delay(
100
,
200
)
print
(r3.result)
print
(r3.status)
#PENDING
|
看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。
下面,我们来启动一个worker来执行celery队列中的任务
1
|
# celery -A tasks worker -l info -n worker.%h -Q celery ##默认的
|
可以看到这行的结果为success
print(re3.status) #SUCCESS
定时任务:
Celery 与 定时任务
在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。
下面我们接着在配置文件:celeryconfig.py,添加关于 CELERYBEAT_SCHEDULE 变量到脚本中去:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
CELERY_TIMEZONE
=
'UTC'
CELERYBEAT_SCHEDULE
=
{
'taskA_schedule'
: {
'task'
:
'tasks.taskA'
,
'schedule'
:
20
,
'args'
:(
5
,
6
)
},
'taskB_scheduler'
: {
'task'
:
"tasks.taskB"
,
"schedule"
:
200
,
"args"
:(
10
,
20
,
30
)
},
'add_schedule'
: {
"task"
:
"tasks.add"
,
"schedule"
:
10
,
"args"
:(
1
,
2
)
}
}
|
注意格式,否则会有问题
启动:
celery -A demon3 worker -l info -n workerA.%h -Q for_task_A
celery -A demon3 worker -l info -n workerB.%h -Q for_task_B
celery -A tasks worker -l info -n worker.%h -Q celery
celery -A demon3 beat