Testing LatestOnlyOperator and TriggerRule Integration in Apache Airflow

This example DAG demonstrates how LatestOnlyOperator interacts with trigger rules in Apache Airflow. LatestOnlyOperator skips the tasks directly downstream of it on any run that is not the most recent scheduled run, and each later task's trigger rule determines whether that skip propagates to it.

Test Coverage Overview

The test coverage focuses on the interaction between LatestOnlyOperator and task trigger rules in an Airflow DAG.

  • Validates conditional execution based on schedule timing
  • Tests task dependency chains with LatestOnlyOperator
  • Verifies that task4, which uses TriggerRule.ALL_DONE, still runs when task1 is skipped
  • Covers parallel task execution paths

Implementation Analysis

The DAG combines one LatestOnlyOperator with four EmptyOperator tasks. Only task4 overrides the default trigger rule.

The DAG runs every 4 hours with catchup disabled, and its start date is a timezone-aware pendulum datetime. Dependencies are declared with the bitshift operator (>>): latest_only feeds task1, and both task1 and task2 fan out to task3 and task4.
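
The way a skip travels through this dependency graph can be sketched in plain Python, without Airflow installed. This is a simplified model, not Airflow's scheduler logic: it assumes no task ever fails, and the names simulate, UPSTREAM, and TRIGGER_RULE are hypothetical helpers introduced only for this illustration.

```python
from __future__ import annotations

# State names mirror Airflow's state strings; this model ignores failures.
SUCCESS, SKIPPED = "success", "skipped"

# Upstream edges copied from the example DAG.
UPSTREAM = {
    "latest_only": [],
    "task1": ["latest_only"],
    "task2": [],
    "task3": ["task1", "task2"],
    "task4": ["task1", "task2"],
}

# Only task4 overrides the default rule (all_success).
TRIGGER_RULE = {"task4": "all_done"}

# A topological order of the tasks above.
ORDER = ["latest_only", "task1", "task2", "task3", "task4"]


def simulate(is_latest_run: bool) -> dict[str, str]:
    """Return each task's final state under the simplified model."""
    states: dict[str, str] = {}
    for task in ORDER:
        upstream_states = [states[u] for u in UPSTREAM[task]]
        rule = TRIGGER_RULE.get(task, "all_success")
        if "latest_only" in UPSTREAM[task] and not is_latest_run:
            # Assumption: latest_only marks its direct downstream as skipped.
            states[task] = SKIPPED
        elif rule == "all_success" and SKIPPED in upstream_states:
            # With all_success, a skipped upstream cascades to this task.
            states[task] = SKIPPED
        else:
            # all_done only requires that upstream tasks have finished.
            states[task] = SUCCESS
    return states


if __name__ == "__main__":
    for latest in (True, False):
        print(latest, simulate(latest))
```

In this model, a run that is not the latest leaves task1 and task3 skipped, while task2 and task4 still succeed. On the latest run every task succeeds.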

Technical Details

  • Uses Airflow’s DAG and operator classes
  • Implements TriggerRule.ALL_DONE for task coordination
  • Sets the schedule with a datetime.timedelta of 4 hours
  • Uses pendulum to give the DAG a timezone-aware (UTC) start date
  • Employs EmptyOperator for task placeholders
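
To make the "latest run" idea concrete for a fixed 4-hour timedelta schedule, the following sketch compares the current time against the window that follows a run's logical date. It is a simplified model under stated assumptions: is_latest_run is a hypothetical helper, the standard library's datetime stands in for pendulum, and the real operator works from the DAG's timetable and has additional handling (for example, for manually triggered runs) that is not modeled here.

```python
from datetime import datetime, timedelta, timezone

# Matches schedule=datetime.timedelta(hours=4) in the example DAG.
SCHEDULE = timedelta(hours=4)


def is_latest_run(logical_date: datetime, now: datetime,
                  schedule: timedelta = SCHEDULE) -> bool:
    """Return True if `now` falls in the window right after this run's interval.

    Simplified assumption: a run counts as the latest while the current time
    lies after the end of its own interval and no later than the end of the
    following interval.
    """
    left_window = logical_date + schedule
    right_window = left_window + schedule
    return left_window < now <= right_window


if __name__ == "__main__":
    start = datetime(2021, 1, 1, tzinfo=timezone.utc)
    # One hour after the first interval closes: still the latest run.
    print(is_latest_run(start, start + timedelta(hours=5)))
    # Thirteen hours after the logical date: newer runs exist by now.
    print(is_latest_run(start, start + timedelta(hours=13)))
```

Under this model, a run being executed long after its interval ended (for instance during a backfill) is not the latest, which is the case in which downstream tasks would be skipped.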

Best Practices Demonstrated

The example follows Airflow best practices for DAG design and task organization.

  • Clear task dependency definition
  • Proper use of trigger rules for flow control
  • Explicit timezone handling with pendulum
  • Meaningful task naming conventions
  • Appropriate tagging for DAG organization

apache/airflow

airflow/example_dags/example_latest_only_with_trigger.py

            
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example LatestOnlyOperator and TriggerRule interactions
"""

from __future__ import annotations

# [START example]
import datetime

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="latest_only_with_trigger",
    schedule=datetime.timedelta(hours=4),
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example3"],
) as dag:
    latest_only = LatestOnlyOperator(task_id="latest_only")
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)

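    # task1 sits directly downstream of latest_only, so it is skipped on
    # every run except the latest one.
    latest_only >> task1 >> [task3, task4]
    # task2 does not depend on latest_only and runs in every scheduled period.
    # task3 (default all_success) is skipped whenever task1 is skipped;
    # task4 (all_done) runs once task1 and task2 have finished, skipped or not.
    task2 >> [task3, task4]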
# [END example]