This Flask snippet shows how to integrate Celery into a Flask app so that tasks have access to Flask's app context. But what if we don't want the Celery tasks to live in the Flask app's codebase? We can call Celery tasks without importing the task functions into Flask by giving each task a name and sending it with the celery.send_task method.
Here is an example of a Flask app (flask-app) that uses a Celery task that isn't available in its own codebase:
- flask-app
- app.py
Here is what an example app.py looks like:
import os
from flask import Flask
from celery import Celery
import celery.states as states

env = os.environ
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL', 'redis://localhost:6379')
CELERY_RESULT_BACKEND = env.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379')

# Client-side Celery app: it only needs the broker and backend, not the task code
celery = Celery('tasks',
                broker=CELERY_BROKER_URL,
                backend=CELERY_RESULT_BACKEND)

app = Flask(__name__)

# Send two numbers to add
@app.route('/add/<int:param1>/<int:param2>')
def add(param1, param2):
    # Dispatch by task name; the task function itself is not imported here
    task = celery.send_task('mytasks.add', args=[param1, param2])
    return task.id


# Check the status of the task with the id returned by the add view
@app.route('/check/<string:id>')
def check_task(id):
    res = celery.AsyncResult(id)
    return res.state if res.state == states.PENDING else str(res.result)


if __name__ == '__main__':
    # Environment values are strings, so parse DEBUG explicitly
    app.run(debug=env.get('DEBUG', 'true').lower() in ('1', 'true', 'yes'),
            port=int(env.get('PORT', 5000)),
            host=env.get('HOST', '0.0.0.0'))
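The check view treats every state other than PENDING as a finished result, so a failed task would come back as the string form of its exception. Celery also reports PENDING for task ids it has never seen. Below is a minimal sketch of a more explicit mapping from state to response text. The helper name describe_task is hypothetical (it is not part of Flask or Celery), and the state names are written as plain strings matching Celery's built-in constants so the sketch runs without a broker.

```python
# State names match Celery's built-in constants (celery.states.SUCCESS, etc.).
PENDING = "PENDING"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"


def describe_task(state, result=None):
    """Hypothetical helper: turn a task state and result into response text."""
    if state == SUCCESS:
        return str(result)
    if state == FAILURE:
        # For a failed task, AsyncResult.result holds the raised exception.
        return "FAILURE: {}".format(result)
    # PENDING, STARTED, RETRY and any custom state: report the state itself.
    return state
```

Inside check_task this could be used as return describe_task(res.state, res.result), assuming the rest of app.py stays as shown.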
Now the Celery tasks can run on a separate machine (or on several machines) with their own codebase.
- flask-celery
- tasks.py
import os
import time
from celery import Celery

env = os.environ
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL', 'redis://localhost:6379')
CELERY_RESULT_BACKEND = env.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379')

celery = Celery('tasks',
                broker=CELERY_BROKER_URL,
                backend=CELERY_RESULT_BACKEND)


# The name parameter is the key here: it must match the name passed to send_task
@celery.task(name='mytasks.add')
def add(x, y):
    time.sleep(5)  # let's sleep for a while before doing the gigantic addition task!
    return x + y
And to run the worker as always:
celery -A tasks worker --loglevel=info
We can now start as many workers as we want by cloning the flask-celery module onto as many servers as we need.
Dockerized Example to scale workers
A Dockerized example is available here.
To run the docker example:
docker-compose build
docker-compose up -d # run in detached mode
Now load http://your-dockermachine-ip:5000/add/2/3 in the browser. It should create a task and return a task id.
To check the status of the job, open http://your-dockermachine-ip:5000/check/taskid, replacing taskid with the returned id. It should show either PENDING or the result, 5.
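The same two requests can be scripted instead of made in the browser. The following is a rough sketch using only the standard library; the function names http_get and wait_for_result, the polling interval, and the timeout are all assumptions for illustration, and the host is the same placeholder used above.

```python
import time
from urllib.request import urlopen


def http_get(url):
    """Fetch a URL and return the response body as text."""
    with urlopen(url) as response:
        return response.read().decode("utf-8")


def wait_for_result(base_url, task_id, fetch=http_get, interval=1.0, timeout=30.0):
    """Poll /check/<task_id> until the app stops answering PENDING."""
    deadline = time.monotonic() + timeout
    while True:
        body = fetch("{}/check/{}".format(base_url, task_id))
        if body != "PENDING":
            return body
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "task {} still pending after {}s".format(task_id, timeout))
        time.sleep(interval)


# Example usage (requires the app to be running):
#   base = "http://your-dockermachine-ip:5000"
#   task_id = http_get(base + "/add/2/3")
#   print(wait_for_result(base, task_id))
```

The fetch parameter is there so the polling loop can be exercised with a stand-in function when no server is available.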
To monitor the workers with Flower, go to http://your-dockermachine-ip:5555. It should show one worker ready to serve.
To scale the workers, run docker-compose scale worker=5. This will create 4 more containers, each running a worker. http://your-dockermachine-ip:5555 should now show 5 workers waiting for jobs!