Celery 셀러리 사용법 공부중

https://docs.celeryproject.org/en/latest/getting-started/

Broker 선택

셀러리로 메세지를 보내거나 받기 위해서는 Message Broker가 필요하다.

브로커의 종류

RabbitMQ

설치하기

apt-get install rabbitmq-server

사용자 추가하기

rabbitmqctl add_user <사용자이름> <비밀번호>

시작하기

rabbitmqctl start_app 

끄기

rabbitmqctl stop

docker에서 실행시키려면

docker run -d -p 5462:5462 rabbitmq

Redis

redis

Redis 는 데이터 검색을 위해 Database 에 접근하기 전 메모리에서 Cache 를 가져다 쓴다는 점에서 속도가 빠른 장점이 있다. (2020.01.07, https://medium.com/towncompany-engineering/celery-를-이용한-비동기-분산-처리로-api-속도-개선-8c9bcf0f119c)

설치

pip3 install --upgrade celery[redis]
wget http://download.redis.io/redis-stable.tar.gz
tar xvzf redis-stable.tar.gz
cd redis-stable
make
make test
make install
redis-server --daemonize yes
redis-server # redis 실행
redis-cli ping # 정상 설치되었는지 확인

redis background 실행

redis-server --daemonize yes

redis 사용법

docker 에서 실행시키려면

docker run -d -p 6379:6379 redis

AmazonSQS#skip

Celery 설치

셀러리는 PIP를 통해서 설치할 수 있다.

pip3 install celery

celery 가동

실행시킬때 celery instance가 있는 모듈을 골라야함

celery -A <moduel.celery> worker --loglevel=info
celery -A myapp.pub_api.agent.celery worker --loglevel=info concurrency=1 # 하나의 워커

tasks.py

from celery import Celery
#RabbitMQ version
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app = Celery('tasks',backend='redis://localhost:6379', broker='redis://localhost:6379')
@app.task
def add(x, y):
    return x + y

tasks를 실행시키는 법

ex) from tasks import add

add.delay(4,4)

여기서 return 값을 띄우려면 result backend 를 설정해야 한다.

app = Celery(‘tasks’, backend=’amqp’, broker=’pyamqp://guest@localhost//’)

프로젝트에 셀러리 연동하는법

조건

  1. Redis가 구동되어 있어야 한다.
  2. Celery worker가 실행되어 있어야 한다.

위 조건을 만족하는 상태에서 아래와 같이 celery 로 수행할 작업을 @celery.task decorator를 붙인다.

e_mail.py

from celery import Celery
from flask import Flask,render_template
from flask_mail import Mail, Message
from instance.config import *

app = Flask(__name__)
app.config.from_object('config')
app.config.from_pyfile('instance/config.py')
mail = Mail(app)

celery = Celery(app.import_name ,broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def send_async_email(to, subject, template, **kwargs):
    msg = Message(MAIL_SUBJECT_PREFIX + ' ' + subject,
                  sender=MAIL_SENDER, recipients=[to])
    with app.app_context():
        msg.body = render_template(template + '.txt', **kwargs)
        msg.html = render_template(template + '.html', **kwargs)
        mail.send(msg)
    print("Mail Sent")

함수를 실행할때는

함수명.delay(인자)

이런식으로 함수명에 .delay를 붙이고 ()안에 인자를 넣어주면 정상적으로 실행된다.

ex)e_mail.send_async_email.delay(22************@na***.com">“pe**********@na***.com“, “Current Air Stat”, “email/inform”, list=self.values)

웹서비스 실행시키기;

docker restart rweb
docker exec -it rweb  sh -c "cd /data && /bin/bash"           
redis-server --daemonize yes                     
celery -A myapp.e_mail.celery worker --loglevel=info
docker attach rweb
python3 manage.py runserver

Celery worker별로 task 수행 시키는법

Celery 에서 worker별로 task를 부여하는 방법최근 개인적으로 준비하고 있는 프로젝트가 하나 있는데, 그 프로젝트에서 돌고 돌다가 결국 Celery를 도입하게 되었다. (나중에 다른 글을 통해서 공개할 수 있으면 공개하도록 하겠다.) 지금 진행하고 있는 프로젝트에는 task가 2개가 있고 이를 concurrency를 이용해서 10개의 worker를 돌리고 있지만 이 과정에서 문제가 발생하였다.

Celery 에서 worker별로 task를 부여하는 방법 정리해놓은 다른 블로그

https://iam.namjun.kim/celery/2018/09/09/celery-routing/

핵심:

app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
	Queue('slow_tasks', routing_key='slow.#'),
	Queue('quick_tasks', routing_key='quick.#'),
)