[AIRFLOW-1840] Make celery configuration congruent with Celery 4 #2806

Closed
11 changes: 11 additions & 0 deletions UPDATING.md
@@ -3,6 +3,17 @@
This file documents any backwards-incompatible changes in Airflow and
assists people when migrating to a new version.

## Airflow 1.9.1

### Celery config

To make the Airflow configuration consistent with Celery, some properties have been renamed:
```
celeryd_concurrency -> worker_concurrency
celery_result_backend -> result_backend
```
The parameters now carry the same names as in Celery 4, which makes the configuration more transparent.
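
For existing deployments, the rename can be applied to `airflow.cfg` in place. A minimal Python 3 sketch, not part of the PR, assuming the default config location under `$AIRFLOW_HOME` (back the file up first):

```python
# Hypothetical migration helper for this rename; not part of the PR.
import os
from configparser import ConfigParser

RENAMES = {
    'celeryd_concurrency': 'worker_concurrency',
    'celery_result_backend': 'result_backend',
}

path = os.path.join(
    os.environ.get('AIRFLOW_HOME', os.path.expanduser('~/airflow')),
    'airflow.cfg')

# interpolation=None so '%' characters in connection URLs survive untouched
config = ConfigParser(interpolation=None)
config.read(path)

for old, new in RENAMES.items():
    if config.has_option('celery', old):
        # carry the value over to the new key, then drop the old one
        config.set('celery', new, config.get('celery', old))
        config.remove_option('celery', old)

with open(path, 'w') as f:
    config.write(f)
```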

## Airflow 1.9

### SSH Hook updates, along with new SSH Operator & SFTP Operator
18 changes: 15 additions & 3 deletions airflow/config_templates/default_airflow.cfg
@@ -280,7 +280,7 @@ celery_app_name = airflow.executors.celery_executor
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
-celeryd_concurrency = 16
+worker_concurrency = 16

# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
@@ -292,10 +292,16 @@ worker_log_server_port = 8793
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow

-# Another key Celery setting
-celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
+# The Celery result_backend. When a job finishes, it needs to update the
+# metadata of the job. Therefore it will post a message on a message bus,
+# or insert it into a database (depending on the backend).
+# This status is used by the scheduler to update the state of the task.
+# The use of a database is highly recommended.
+# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings
+result_backend = db+mysql://airflow:airflow@localhost:3306/airflow

# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it `airflow flower`. This defines the IP that Celery Flower runs on
@@ -317,6 +323,12 @@ celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
# ETA you're planning to use. Especially important in case of using Redis or SQS
visibility_timeout = 21600

# In case of using SSL
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =

[dask]
# This section only applies if you are using the DaskExecutor in
# [core] section above
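A quick way to confirm the renamed options resolve after upgrading is to read them back through `airflow.configuration`, the same accessors the template in default_celery.py below relies on (assumes a working Airflow environment):

```python
# Sanity check that the renamed options are picked up from airflow.cfg.
from airflow import configuration

print(configuration.get('celery', 'RESULT_BACKEND'))
print(configuration.getint('celery', 'WORKER_CONCURRENCY'))
```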
21 changes: 10 additions & 11 deletions airflow/config_templates/default_celery.py
@@ -14,8 +14,8 @@

import ssl

-from airflow.exceptions import AirflowConfigException, AirflowException
from airflow import configuration
+from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log
@@ -27,34 +27,33 @@
DEFAULT_CELERY_CONFIG = {
    'accept_content': ['json', 'pickle'],
    'event_serializer': 'json',
    'result_serializer': 'pickle',
    'worker_prefetch_multiplier': 1,
    'task_acks_late': True,
    'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
    'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
    'broker_url': configuration.get('celery', 'BROKER_URL'),
    'broker_transport_options': {'visibility_timeout': broker_transport_options},
-    'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'),
-    'worker_concurrency': configuration.getint('celery', 'CELERYD_CONCURRENCY'),
+    'result_backend': configuration.get('celery', 'RESULT_BACKEND'),
+    'worker_concurrency': configuration.getint('celery', 'WORKER_CONCURRENCY'),
}

celery_ssl_active = False
try:
-    celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
+    celery_ssl_active = configuration.getboolean('celery', 'SSL_ACTIVE')
except AirflowConfigException as e:
    log.warning("Celery Executor will run without SSL")

try:
    if celery_ssl_active:
-        broker_use_ssl = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'),
-                          'certfile': configuration.get('celery', 'CELERY_SSL_CERT'),
-                          'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'),
+        broker_use_ssl = {'keyfile': configuration.get('celery', 'SSL_KEY'),
+                          'certfile': configuration.get('celery', 'SSL_CERT'),
+                          'ca_certs': configuration.get('celery', 'SSL_CACERT'),
                          'cert_reqs': ssl.CERT_REQUIRED}
        DEFAULT_CELERY_CONFIG['broker_use_ssl'] = broker_use_ssl
except AirflowConfigException as e:
-    raise AirflowException('AirflowConfigException: CELERY_SSL_ACTIVE is True, '
-                           'please ensure CELERY_SSL_KEY, '
-                           'CELERY_SSL_CERT and CELERY_SSL_CACERT are set')
+    raise AirflowException('AirflowConfigException: SSL_ACTIVE is True, '
+                           'please ensure SSL_KEY, '
+                           'SSL_CERT and SSL_CACERT are set')
except Exception as e:
    raise AirflowException('Exception: There was an unknown Celery SSL Error. '
                           'Please ensure you want to use '
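For context on why these dict keys must match Celery's own setting names: the executor hands the dict straight to the Celery app. Roughly, paraphrasing airflow/executors/celery_executor.py (not part of this diff; the app name actually comes from the celery_app_name option):

```python
# Paraphrase of how the executor consumes DEFAULT_CELERY_CONFIG; see
# airflow/executors/celery_executor.py for the real wiring.
from celery import Celery

from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

# Celery 4 reads settings from config_source verbatim, so the keys above
# must use Celery 4 names such as result_backend and worker_concurrency.
app = Celery('airflow.executors.celery_executor',
             config_source=DEFAULT_CELERY_CONFIG)
```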
2 changes: 1 addition & 1 deletion airflow/configuration.py
@@ -116,7 +116,7 @@ class AirflowConfigParser(ConfigParser):
        ('core', 'sql_alchemy_conn'),
        ('core', 'fernet_key'),
        ('celery', 'broker_url'),
-        ('celery', 'celery_result_backend')
+        ('celery', 'result_backend')
    }

    def __init__(self, *args, **kwargs):
2 changes: 1 addition & 1 deletion docs/configuration.rst
@@ -35,7 +35,7 @@ You can also derive the connection string at run time by appending ``_cmd`` to t
[core]
sql_alchemy_conn_cmd = bash_command_to_run

-But only three such configuration elements namely sql_alchemy_conn, broker_url and celery_result_backend can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows -
+But only three such configuration elements, namely sql_alchemy_conn, broker_url and result_backend, can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows -

1. environment variable
2. configuration in airflow.cfg
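To illustrate the `_cmd` mechanism described above: the resolved value is simply the stdout of the configured command. A hedged sketch (the secret path is hypothetical; the real logic lives in airflow/configuration.py):

```python
# Illustration only: how a *_cmd option turns a shell command into a value.
import subprocess

def resolve_cmd_option(command):
    # Airflow-style behavior: run the command, use stripped stdout as the value
    return subprocess.check_output(command, shell=True).decode('utf-8').strip()

# Equivalent of setting in airflow.cfg (hypothetical path):
#   [celery]
#   result_backend_cmd = cat /run/secrets/result_backend
result_backend = resolve_cmd_option('cat /run/secrets/result_backend')
```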
3 changes: 1 addition & 2 deletions scripts/ci/airflow_travis.cfg
@@ -46,7 +46,7 @@ celery_app_name = airflow.executors.celery_executor
celeryd_concurrency = 16
worker_log_server_port = 8793
broker_url = amqp://guest:guest@localhost:5672/
-celery_result_backend = db+mysql://root@localhost/airflow
+result_backend = db+mysql://root@localhost/airflow
flower_port = 5555
default_queue = default

@@ -55,4 +55,3 @@ job_heartbeat_sec = 1
scheduler_heartbeat_sec = 5
authenticate = true
max_threads = 2

8 changes: 3 additions & 5 deletions tests/executors/test_celery_executor.py
@@ -11,20 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import unittest
import sys
+import unittest
+from celery.contrib.testing.worker import start_worker

-from airflow.executors.celery_executor import app
from airflow.executors.celery_executor import CeleryExecutor
+from airflow.executors.celery_executor import app
from airflow.utils.state import State
-from celery.contrib.testing.worker import start_worker

# leave this; it is used by the test worker
import celery.contrib.testing.tasks


class CeleryExecutorTest(unittest.TestCase):

    def test_celery_integration(self):
        executor = CeleryExecutor()
        executor.start()
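The test body is truncated in this diff. For reference, `start_worker` is Celery's own testing helper, and the bare `celery.contrib.testing.tasks` import above registers the `ping` task that `start_worker` uses for its readiness check. An illustrative sketch of the pattern (not the exact test code):

```python
# Illustrative only: the start_worker pattern used by test_celery_integration.
import sys

# registers the ping task start_worker needs for its readiness check
import celery.contrib.testing.tasks  # noqa
from celery.contrib.testing.worker import start_worker

from airflow.executors.celery_executor import app

with start_worker(app=app, logfile=sys.stdout, loglevel='info'):
    # tasks sent through `app` run on the embedded worker inside this block
    pass
```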