Creating a remote Celery worker for Flask with a separate codebase

This Flask snippet shows how to integrate Celery into a Flask app so that tasks have access to Flask's application context. But what if we don't want the Celery tasks to live in the Flask app's codebase? We can call Celery tasks without having access to the task functions in Flask by giving each task a name and dispatching it with the celery.send_task method.
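
For instance, with a shared codebase the producer imports the task function and calls it directly; with a separate codebase it only needs the registered name. A minimal sketch of the difference, assuming a worker somewhere registers a task named mytasks.add:

from celery import Celery

celery = Celery('tasks',
                broker='redis://localhost:6379',
                backend='redis://localhost:6379')

# Shared codebase: the task function must be importable by the producer.
# from mytasks import add
# result = add.delay(2, 3)

# Separate codebase: only the task's registered name is needed.
result = celery.send_task('mytasks.add', args=[2, 3])
print(result.id)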

Here is an example of a Flask app using a Celery task that isn't available in the Flask codebase:

- flask-app
    - app.py

Here is what the example app.py looks like:

import os

from flask import Flask

from celery import Celery
import celery.states as states

env = os.environ
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL', 'redis://localhost:6379')
CELERY_RESULT_BACKEND = env.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379')

celery = Celery('tasks',
                broker=CELERY_BROKER_URL,
                backend=CELERY_RESULT_BACKEND)

app = Flask(__name__)

# Send two numbers to add
@app.route('/add/<int:param1>/<int:param2>')
def add(param1, param2):
    task = celery.send_task('mytasks.add', args=[param1, param2])
    return task.id

# Check the status of a task using the id returned by the add endpoint
@app.route('/check/<string:id>')
def check_task(id):
    res = celery.AsyncResult(id)
    return res.state if res.state == states.PENDING else str(res.result)

if __name__ == '__main__':
    app.run(debug=env.get('DEBUG', True),
            port=int(env.get('PORT', 5000)),
            host=env.get('HOST', '0.0.0.0'))
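
Nothing here is specific to Flask, by the way: any process that can reach the broker can queue work by name. A minimal standalone producer sketch (the blocking get call and its timeout are just for illustration):

from celery import Celery

celery = Celery('tasks',
                broker='redis://localhost:6379',
                backend='redis://localhost:6379')

# Queue the task by name and block until a worker returns the result.
result = celery.send_task('mytasks.add', args=[2, 3])
print(result.get(timeout=10))  # prints 5 once a worker has picked it up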

Celery tasks can now run on a separate machine (or on multiple machines) with their own codebase.

- flask-celery
    - tasks.py

import os
import time

from celery import Celery

env = os.environ
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL', 'redis://localhost:6379')
CELERY_RESULT_BACKEND = env.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379')

celery = Celery('tasks',
                broker=CELERY_BROKER_URL,
                backend=CELERY_RESULT_BACKEND)

# The name parameter is the key here
@celery.task(name='mytasks.add')
def add(x, y):
    time.sleep(5)  # let's sleep for a while before doing the gigantic addition task!
    return x + y
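
Without an explicit name, Celery derives the task name from the module and function (here it would be tasks.add), so the producer and the worker would have to agree on the module layout. A small sketch with two hypothetical tasks to show the difference:

# Implicit name: Celery registers this as 'tasks.subtract' (module.function),
# so renaming or moving this module breaks producers that send by that name.
@celery.task
def subtract(x, y):
    return x - y

# Explicit name: stable regardless of where the function lives.
@celery.task(name='mytasks.multiply')
def multiply(x, y):
    return x * y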

And run the worker as usual:

celery -A tasks worker --loglevel=info  
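
Since both scripts read the broker and backend URLs from the environment, a worker on another machine only needs those variables pointed at the shared Redis instance before it starts (the Redis host below is a placeholder):

CELERY_BROKER_URL=redis://your-redis-host:6379 \
CELERY_RESULT_BACKEND=redis://your-redis-host:6379 \
celery -A tasks worker --loglevel=info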

We can now start as many workers as we want by cloning the flask-celery module to as many servers as we like.

Dockerized example to scale workers

A Dockerized example is available here.

To run the docker example:

docker-compose build  
docker-compose up -d # run in detached mode

Now load http://your-dockermachine-ip:5000/add/2/3 in the browser. It should create a task and return a task id.

To check the status of the job, hit http://your-dockermachine-ip:5000/check/taskid with the task id from the previous step. It should show either PENDING or the result, 5.

To monitor the workers with Flower, go to http://your-dockermachine-ip:5555. It should show one worker ready to serve.

To scale the workers, run docker-compose scale worker=5. This will create four more containers, each running a worker. http://your-dockermachine-ip:5555 should now show 5 workers waiting for jobs!
