[AIRFLOW-2843] Add flag in ExternalTaskSensor to check if external DAG/task exists #4547

Merged 1 commit on Jan 17, 2019


@XD-DENG XD-DENG commented Jan 17, 2019

Jira

Description

Background

ExternalTaskSensor keeps waiting even if the specified external DAG/task doesn't exist at all (which makes sense in some scenarios).

It would still be useful to offer an option to stop waiting immediately when the specified external DAG/task doesn't exist.

Proposal

Add a flag, check_existence. When set to True, the sensor checks whether the external DAG/task exists and stops waiting immediately if it does not.

  • Case 1, waiting for an external DAG: check that the DAG exists in the DB AND that the DAG file exists in the file system.
  • Case 2, waiting for an external task: the DAG checks from Case 1 are performed as well. In addition, the DAG file is re-scanned to check that the task is in the DAG.

The default value is False (no check is performed and waiting never stops early), so existing DAGs and user expectations are not affected.
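
To make the effect of the flag concrete, here is a minimal, self-contained sketch. It does not use Airflow: `ToyExternalSensor`, `ExternalTargetNotFound`, and the two callables are hypothetical stand-ins invented for illustration, and only the flag name `check_existence` and its default of False come from this proposal.

```python
from typing import Callable


class ExternalTargetNotFound(Exception):
    """Hypothetical error for a missing external DAG/task (not an Airflow class)."""


class ToyExternalSensor:
    """Toy stand-in for a sensor, used only to illustrate the flag."""

    def __init__(
        self,
        target_exists: Callable[[], bool],
        target_done: Callable[[], bool],
        check_existence: bool = False,  # default keeps the old behaviour
    ) -> None:
        self.target_exists = target_exists
        self.target_done = target_done
        self.check_existence = check_existence

    def poke(self) -> bool:
        """Return True when the wait is over, False to keep waiting."""
        if self.check_existence and not self.target_exists():
            # Fail fast instead of waiting for something that cannot finish.
            raise ExternalTargetNotFound("external DAG/task does not exist")
        return self.target_done()


# Default: a missing target just looks "not done yet", so the sensor keeps waiting.
patient = ToyExternalSensor(lambda: False, lambda: False)
print(patient.poke())

# Opt-in: a missing target stops the wait at once.
strict = ToyExternalSensor(lambda: False, lambda: False, check_existence=True)
try:
    strict.poke()
except ExternalTargetNotFound as err:
    print("stopped:", err)
```

With the flag left at its default, the sketch behaves exactly as if the existence check were absent, which is the backward-compatibility property the proposal relies on.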

This is a follow-up on #3688

…G/task exists

In ExternalTaskSensor, it may be good to provide
an option to cease waiting immediately if the external
DAG/task specified doesn't exist.

Add an argument "check_existence". Set it to True to check
if the external DAG/task exists, and immediately cease waiting
if the external DAG/task does not exist.

The default value is set to False (no check or ceasing
will happen) so it will not affect any existing DAGs or
current user expectation.
@@ -26,7 +29,8 @@

 class ExternalTaskSensor(BaseSensorOperator):
     """
-    Waits for a different DAG or a task in in a different DAG to complete
+    Waits for a different DAG or a task in a different DAG to complete for a

@XD-DENG XD-DENG Jan 17, 2019

Fixed a typo.


if self.external_task_id:
    refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)

@XD-DENG XD-DENG Jan 17, 2019

  1. include_examples for DagBag is deliberately left at its default value, True, since external_dag_id may refer to an example DAG.

  2. The exact file path is passed to DagBag, so it scans only a single DAG file. This keeps the overhead low.


if self.external_task_id:

If ExternalTaskSensor is waiting for a DAG, we check the database dag table and the file system.

If it's waiting for a task, an ADDITIONAL check verifies that the task exists in the latest DAG file.
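
The layered checks described in this comment can be sketched roughly as follows. This is a simplified model under stated assumptions, not the code in this PR: `DagRow` stands in for a row of the dag table, `parse_task_ids` is a hypothetical callable standing in for re-scanning the DAG file, and `ExternalTargetMissing` and `verify_external_target` are names invented for illustration.

```python
import os
import tempfile
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Set


class ExternalTargetMissing(Exception):
    """Hypothetical error type; not an Airflow exception."""


@dataclass
class DagRow:
    """Hypothetical stand-in for a row in the metadata DB's dag table."""
    dag_id: str
    fileloc: str


def verify_external_target(
    dag_table: Dict[str, DagRow],
    parse_task_ids: Callable[[str], Set[str]],  # stand-in for re-scanning one file
    external_dag_id: str,
    external_task_id: Optional[str] = None,
) -> None:
    # Check 1: the DAG must be recorded in the (modelled) dag table.
    row = dag_table.get(external_dag_id)
    if row is None:
        raise ExternalTargetMissing(f"DAG {external_dag_id} does not exist")
    # Check 2: the DAG file must still be present on the file system.
    if not os.path.exists(row.fileloc):
        raise ExternalTargetMissing(f"DAG {external_dag_id} was deleted")
    # Check 3 (task case only): re-scan that single file and look for the task.
    if external_task_id is not None:
        if external_task_id not in parse_task_ids(row.fileloc):
            raise ExternalTargetMissing(
                f"task {external_task_id} not found in DAG {external_dag_id}"
            )


with tempfile.TemporaryDirectory() as tmp:
    dag_file = os.path.join(tmp, "upstream_dag.py")
    open(dag_file, "w").close()  # an empty file is enough for the sketch
    table = {"upstream": DagRow("upstream", dag_file)}

    def fake_parse(_path: str) -> Set[str]:
        # Pretend the file defines a single task named "load".
        return {"load"}

    verify_external_target(table, fake_parse, "upstream")          # DAG only
    verify_external_target(table, fake_parse, "upstream", "load")  # DAG and task
    try:
        verify_external_target(table, fake_parse, "upstream", "missing_task")
    except ExternalTargetMissing as err:
        print("stopped:", err)
```

Passing the parser only the stored file location mirrors the point made in the earlier review comment: a single file is re-scanned rather than the whole DAG folder.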


XD-DENG commented Jan 17, 2019

Hi @feng-tao, as discussed in #3688, PTAL.

@codecov-io

Codecov Report

Merging #4547 into master will increase coverage by <.01%.
The diff coverage is 94.44%.


@@            Coverage Diff             @@
##           master    #4547      +/-   ##
==========================================
+ Coverage   74.07%   74.08%   +<.01%     
==========================================
  Files         421      421              
  Lines       27649    27663      +14     
==========================================
+ Hits        20481    20494      +13     
- Misses       7168     7169       +1
Impacted Files Coverage Δ
airflow/sensors/external_task_sensor.py 96.29% <94.44%> (-1.21%) ⬇️

Last update 3cc9f75...b082b76.

@feng-tao

LGTM overall for the logic, will take a closer look once in the office.

@feng-tao

LGTM, thanks @XD-DENG

@feng-tao feng-tao merged commit ee63d0d into apache:master Jan 17, 2019

XD-DENG commented Jan 17, 2019

Thanks @feng-tao

@XD-DENG XD-DENG deleted the airflow_2843 branch January 17, 2019 23:52
ashb pushed a commit that referenced this pull request Mar 21, 2019
ashb pushed a commit that referenced this pull request Mar 22, 2019
wmorris75 pushed a commit to modmed/incubator-airflow that referenced this pull request Jul 29, 2019