运维人

如何使用Celery(芹菜)异步神器执行后台任务

关于异步的知识网上很多,这里就直接上代码,目前结合Flask这个Python框架实现后台任务的执行操作;

1.需要了解的知识点:

  • 了解生产消费模型或者发布订阅模式来实现消息队列
  • 了解异步、同步之间的差别

2.实现过程

(1)目录结构

├── LICENSE
├── README.md
├── app                     // Flask APP应用
│   ├── __init__.py 
│   ├── auth
│   ├── email.py
│   ├── main
│   ├── models.py
│   ├── salt
│   ├── static
│   └── templates
├── config.py               // 通用配置
├── config.pyc
├── manager.py
├── migrations
│   ├── README
│   ├── alembic.ini
│   ├── env.py
│   └── versions
└── requirements.txt

(2)指定broker/backend(此处使用redis)

vim app/__init__.py

// 部分代码
from celery import Celery

app = Flask(__name__)

broker = 'redis://127.0.0.1:6379'
backend = 'redis://127.0.0.1:6379/0'

celery = Celery(app.name, broker=broker, backend=backend)

// 部分代码

以上引入了Celery,指定了Celery的生产消费端都为我们的redis

(3)这里我将任务写到了视图函数中

vim app/main/views.py

  • (1 创建一个任务函数, 并且将这个任务绑定上了celery的标记
### 部分代码
@celery.task(bind=True)
def update_cbt_resource(self):
    '''
    @summary: 创建一个需要后台去执行的任务,我这里只是举个栗子
    '''
    result=commands.getoutput("sleep 20 && echo 'ok'")
    return result
  • (2 创建一个执行任务请求函数
### 部分代码
@main.route('/execute_task/<update_env>', methods=['GET', 'POST'])
def execute_task(update_env):
    """
    @summary: 请求函数入口
    """
    # 这里用最笨的办法执行了在我们web中点击执行任务之前去检查celery这个服务是否存在,如果不存在会提示用户    
    result = commands.getoutput("ps -ef | grep celery | grep -v grep")
    if not result:
        return jsonify({"result":False,"message":u'未发现Celery进程,请检查该服务是否正常启动'})

     # 将前面写的任务函数直接调用apply_async属性
    task = update_cbt_resource.apply_async()
  
     # 这里就可以获取到这个任务一开始执行就返回的一些属性
     # 比如任务ID, 任务状态
    data = {
        "task_id": task.id,
        "task_status": task.status
    }
    
    # 将以上的信息作为这请求函数的返回
    result = {"result":True,"data":data,"message":u'执行开始'}
    return jsonify(result)
  • (3 创建一个任务状态查询的函数

一般任务触发之后我们想要知道这个任务的执行状态,是处于哪个阶段的,各个状态已经在官网详细说明了;有兴趣的直接看官网

### 部分的代码
@main.route('/task_result', methods=['GET'])
@login_required
def task_result():
    '''
    @summary: 任务的状态是通过task_id来获取,在触发了任务之后,通过前端js轮询请求这个函数,就可以得到该任务的当前执行状态
    '''
    # 点击执行按钮前端会传一个task_id到后端,这里使用form的方式获取到task_id
    task_id = json.loads(request.form.get('data'))['task_id']
    
    # AsyncResult,它的作用是被用来检查任务状态,等待任务执行完毕或获取任务结果,如果任务失败,它会返回异常信息或者调用栈。
    the_task = update_cbt_resource.AsyncResult(task_id)
    
    print("任务:{0} 当前的 state 为:{1}".format(task_id, the_task.state))

     # 执the_task.state
    if  the_task.state  == 'PROGRESS':
        print the_task.info.get('i', 0)
        result = {'state': 'progress',"result_data":the_task.result}
    elif  the_task.state  == 'SUCCESS':
        result = {'state': "success", "result_data":the_task.result}
    elif  the_task.state  == 'PENDING':
        result = {'state': 'waitting',"result_data":the_task.result}
    elif  the_task.state  == 'REVOKED':
        result = { 'state': 'revoke', "result_data":the_task.result}
        print the_task.result
    else:
        result = {'state': the_task.state,'progress':0,"result_data":the_task.result}
    return jsonify(result)
  • (4 尝试在前端页面点击执行任务

    Oops! 这就是上面进行celery进程判断之后得到的提示信息,那下面就把celery启动起来吧~~~

  • (5 启动Celey服务

celery worker -A manager.celery -l debug
### 我这里是本地使用的virtualenv环境~~~


ok, 现在启动了celery服务之后我们在前台执行一下:

通过点击执行后我们看日志:

任务执行完毕了;

再看看Flask的日志,这就是通过js轮询请求这个task_resul函数的结果

再来看看redis中的内容,这就将结果保存到了backend中

(4)如何撤销一个任务呢?

还是通过task_id来实现。视图函数中编写一个任务的函数

### 部分代码
@main.route('/cancel_task/', methods=['GET', 'POST'])
@login_required
def cancel_task():
     # 通过前端的取消按钮来获取到这个任务的id
    task_id = json.loads(request.form.get('data'))['task_id']
    try:
        celery.control.revoke(task_id, terminate=True, signal='SIGKILL')
        return Response(json.dumps({'result':True,"message": "取消任务完成" }))
    except Exception,e:
        return Response(json.dumps({'result': True, "message": u'取消任务失败.{0}'.format(e)}))

执行过程如下:



需要注意的是,在上面视图函数中的这段代码其实有也可以撤销

task = update_cbt_resource.apply_async()
task.revoke() 

但是这种方式只是撤销,如果任务已经在执行撤销则无效;所以我这里可以使用下面的方法来撤销

# 通过task_id撤销
celery.control.revoke(task_id)
# 撤销正在执行的任务,默认使用TERM信号
celery.control.revoke(task_id, terminate=True)
# 撤销正在执行的任务,默认使用KILL信号
celery.control.revoke(task_id, terminate=True, signal='SIGKILL')
#在官网文档中也可以将多个task_id组成列表形式然后同时撤销多个任务
celery.control.revoke([task_id1,task_id2,task_id3,task_id4.......])

ok,以上就是Celery 的基本使用。

    分享到:
码字很辛苦,转载请注明来自运维人《如何使用Celery(芹菜)异步神器执行后台任务》

评论