diff --git a/UPDATING.md b/UPDATING.md index 3c7d5494acdce..2fec5e9a6ccee 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -3,6 +3,17 @@ This file documents any backwards-incompatible changes in Airflow and assists people when migrating to a new version. +## Airflow 1.9.1 + +### Celery config + +To make the config of Airflow compatible with Celery, some properties have been renamed: +``` +celeryd_concurrency -> worker_concurrency +celery_result_backend -> result_backend +``` +This will result in the same config parameters as Celery 4 and will make it more transparent. + ## Airflow 1.9 ### SSH Hook updates, along with new SSH Operator & SFTP Operator diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index a02bb3908ae41..e75cf9a05906d 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -280,7 +280,7 @@ celery_app_name = airflow.executors.celery_executor # "airflow worker" command. This defines the number of task instances that # a worker will take, so size up your workers based on the resources on # your worker box and the nature of your tasks -celeryd_concurrency = 16 +worker_concurrency = 16 # When you start an airflow worker, airflow starts a tiny web server # subprocess to serve the workers local log files to the airflow main @@ -292,10 +292,16 @@ worker_log_server_port = 8793 # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally # a sqlalchemy database. Refer to the Celery documentation for more # information. +# http://docs.celeryproject.org/en/latest/userguide/configuration.html#broker-settings broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow -# Another key Celery setting -celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow +# The Celery result_backend. When a job finishes, it needs to update the +# metadata of the job. 
Therefore it will post a message on a message bus, +# or insert it into a database (depending on the backend) +# This status is used by the scheduler to update the state of the task +# The use of a database is highly recommended +# http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-result-backend-settings +result_backend = db+mysql://airflow:airflow@localhost:3306/airflow # Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start # it `airflow flower`. This defines the IP that Celery Flower runs on @@ -317,6 +323,12 @@ celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_C # ETA you're planning to use. Especially important in case of using Redis or SQS visibility_timeout = 21600 +# In case of using SSL +ssl_active = False +ssl_key = +ssl_cert = +ssl_cacert = + [dask] # This section only applies if you are using the DaskExecutor in # [core] section above diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index 4f8c92dab0da1..3309cbe168219 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -14,8 +14,8 @@ import ssl -from airflow.exceptions import AirflowConfigException, AirflowException from airflow import configuration +from airflow.exceptions import AirflowConfigException, AirflowException from airflow.utils.log.logging_mixin import LoggingMixin log = LoggingMixin().log @@ -27,34 +27,33 @@ DEFAULT_CELERY_CONFIG = { 'accept_content': ['json', 'pickle'], 'event_serializer': 'json', - 'result_serializer': 'pickle', 'worker_prefetch_multiplier': 1, 'task_acks_late': True, 'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'), 'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'), 'broker_url': configuration.get('celery', 'BROKER_URL'), 'broker_transport_options': {'visibility_timeout': broker_transport_options}, - 'result_backend': configuration.get('celery', 
'CELERY_RESULT_BACKEND'), - 'worker_concurrency': configuration.getint('celery', 'CELERYD_CONCURRENCY'), + 'result_backend': configuration.get('celery', 'RESULT_BACKEND'), + 'worker_concurrency': configuration.getint('celery', 'WORKER_CONCURRENCY'), } celery_ssl_active = False try: - celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE') + celery_ssl_active = configuration.getboolean('celery', 'SSL_ACTIVE') except AirflowConfigException as e: log.warning("Celery Executor will run without SSL") try: if celery_ssl_active: - broker_use_ssl = {'keyfile': configuration.get('celery', 'CELERY_SSL_KEY'), - 'certfile': configuration.get('celery', 'CELERY_SSL_CERT'), - 'ca_certs': configuration.get('celery', 'CELERY_SSL_CACERT'), + broker_use_ssl = {'keyfile': configuration.get('celery', 'SSL_KEY'), + 'certfile': configuration.get('celery', 'SSL_CERT'), + 'ca_certs': configuration.get('celery', 'SSL_CACERT'), 'cert_reqs': ssl.CERT_REQUIRED} DEFAULT_CELERY_CONFIG['broker_use_ssl'] = broker_use_ssl except AirflowConfigException as e: - raise AirflowException('AirflowConfigException: CELERY_SSL_ACTIVE is True, ' - 'please ensure CELERY_SSL_KEY, ' - 'CELERY_SSL_CERT and CELERY_SSL_CACERT are set') + raise AirflowException('AirflowConfigException: SSL_ACTIVE is True, ' + 'please ensure SSL_KEY, ' + 'SSL_CERT and SSL_CACERT are set') except Exception as e: raise AirflowException('Exception: There was an unknown Celery SSL Error. 
' 'Please ensure you want to use ' diff --git a/airflow/configuration.py b/airflow/configuration.py index 84913ffb93b57..ed639521517ed 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -116,7 +116,7 @@ class AirflowConfigParser(ConfigParser): ('core', 'sql_alchemy_conn'), ('core', 'fernet_key'), ('celery', 'broker_url'), - ('celery', 'celery_result_backend') + ('celery', 'result_backend') } def __init__(self, *args, **kwargs): diff --git a/docs/configuration.rst b/docs/configuration.rst index 35616f214071e..51984e0da4990 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -35,7 +35,7 @@ You can also derive the connection string at run time by appending ``_cmd`` to t [core] sql_alchemy_conn_cmd = bash_command_to_run -But only three such configuration elements namely sql_alchemy_conn, broker_url and celery_result_backend can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows - +But only three such configuration elements namely sql_alchemy_conn, broker_url and result_backend can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows - 1. environment variable 2. 
configuration in airflow.cfg diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg index 6a8db93c9d078..b71947e2e650f 100644 --- a/scripts/ci/airflow_travis.cfg +++ b/scripts/ci/airflow_travis.cfg @@ -46,7 +46,7 @@ celery_app_name = airflow.executors.celery_executor -celeryd_concurrency = 16 +worker_concurrency = 16 worker_log_server_port = 8793 broker_url = amqp://guest:guest@localhost:5672/ -celery_result_backend = db+mysql://root@localhost/airflow +result_backend = db+mysql://root@localhost/airflow flower_port = 5555 default_queue = default @@ -55,4 +55,3 @@ job_heartbeat_sec = 1 scheduler_heartbeat_sec = 5 authenticate = true max_threads = 2 - diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 1c411e7e8c97e..9abc60c62703e 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -11,20 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import unittest import sys +import unittest +from celery.contrib.testing.worker import start_worker -from airflow.executors.celery_executor import app from airflow.executors.celery_executor import CeleryExecutor +from airflow.executors.celery_executor import app from airflow.utils.state import State -from celery.contrib.testing.worker import start_worker # leave this it is used by the test worker import celery.contrib.testing.tasks - class CeleryExecutorTest(unittest.TestCase): - def test_celery_integration(self): executor = CeleryExecutor() executor.start()