
Testing Parameter Passing Implementation in Apache Airflow

This example DAG shows how parameters reach tasks when they are run with the airflow tasks test command. It covers three routes: params declared on the task itself, params supplied on the command line with -t, and environment variables supplied with --env-vars.

Test Coverage Overview

The example exercises the parameter passing mechanisms available to Apache Airflow tasks when they run in test mode.

Key areas covered include:
  • Parameter passing via test command arguments
  • Environment variable handling in test mode
  • Task parameter inheritance and override behavior
  • Integration between Python and Bash operators for parameter handling
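The two command-line routes named in the DAG's docstrings (-t for task params, --env-vars for environment variables) can be assembled programmatically. The following is a minimal sketch: build_test_argv is a hypothetical helper that is not part of Airflow, and the date is a placeholder standing in for the <date> argument.

```python
import json
import shlex


def build_test_argv(dag_id, task_id, logical_date, task_params=None, env_vars=None):
    """Assemble an `airflow tasks test` argument list (hypothetical helper)."""
    argv = ["airflow", "tasks", "test", dag_id, task_id, logical_date]
    if task_params is not None:
        # -t takes a JSON object, as shown in the run_this docstring.
        argv += ["-t", json.dumps(task_params)]
    if env_vars is not None:
        # --env-vars takes a JSON object, as shown in the env_var_test_task docstring.
        argv += ["--env-vars", json.dumps(env_vars)]
    return argv


argv = build_test_argv(
    "example_passing_params_via_test_command",
    "run_this",
    "2021-01-01",  # placeholder date
    task_params={"foo": "bar"},
)
print(shlex.join(argv))
# airflow tasks test example_passing_params_via_test_command run_this 2021-01-01 -t '{"foo": "bar"}'
```

Serializing with json.dumps and quoting with shlex.join avoids hand-escaping the JSON payload when the command is pasted into a shell.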

Implementation Analysis

The DAG relies on the Airflow CLI test command and defines its tasks in two ways: as Python functions wrapped with the @task decorator and as a BashOperator.

Technical patterns include:
  • Task decoration using @task decorator
  • Parameter templating in bash commands
  • Environment variable injection
  • Test mode conditional logic
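To make the templating and environment injection patterns concrete, the sketch below mimics how the env dictionary of the also_run_this task could resolve once params are known. It is an illustration only: a small regular expression stands in for Jinja, render_env is a hypothetical name, and the rule that command-line values win on conflict is an assumption of this sketch. It does not model how Airflow treats a missing key.

```python
import re

# Stand-in for Jinja: matches only the simple "{{ params.name }}" form.
_PARAM_REF = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render_env(env, declared_params, cli_params=None):
    """Merge params, then substitute them into each env value."""
    # Assumption for this sketch: values from the command line win on conflict.
    params = {**declared_params, **(cli_params or {})}
    return {
        name: _PARAM_REF.sub(lambda m: str(params[m.group(1)]), value)
        for name, value in env.items()
    }


# Same env mapping as the BashOperator in the example DAG.
env = {"FOO": "{{ params.foo }}", "MIFF": "{{ params.miff }}"}
rendered = render_env(env, {"miff": "agg"}, {"foo": "bar"})
print(rendered)
# {'FOO': 'bar', 'MIFF': 'agg'}
```

Here "miff" comes from the params declared on the task and "foo" from the value passed with -t, which matches the two echo lines in the bash command.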

Technical Details

Testing tools and configuration:
  • Airflow CLI test command interface
  • Python decorators for task definition
  • Environment variable configuration
  • BashOperator for shell command execution
  • Pendulum for datetime handling
  • Task parameter dictionary structures

Best Practices Demonstrated

The implementation showcases several testing best practices in Airflow DAG development.

Notable practices include:
  • Separation of parameter passing mechanisms
  • Clear test mode handling
  • Explicit environment variable management
  • Proper task dependency definition
  • Effective use of templated fields
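The test mode and environment variable practices can be checked without a running Airflow installation. The sketch below uses describe_env, a hypothetical plain-Python stand-in for the print_env_vars task that returns its lines instead of printing them, and sets the variable temporarily with unittest.mock.patch.dict from the standard library.

```python
import os
from unittest import mock


def describe_env(test_mode=None):
    """Return the lines print_env_vars would print (hypothetical stand-in)."""
    if not test_mode:
        # Outside test mode the task reports nothing.
        return []
    return [f"foo={os.environ.get('foo')}"]


# patch.dict restores os.environ when the block exits.
with mock.patch.dict(os.environ, {"foo": "bar"}):
    in_test_mode = describe_env(test_mode=True)
    outside_test_mode = describe_env(test_mode=False)

print(in_test_mode)
# ['foo=bar']
print(outside_test_mode)
# []
```

Scoping the variable to the with block keeps the change explicit and prevents it from leaking into other checks.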

apache/airflow

airflow/example_dags/example_passing_params_via_test_command.py

            
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of the params arguments in templated arguments."""

from __future__ import annotations

import datetime
import os
import textwrap

import pendulum

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator


@task(task_id="run_this")
def my_py_command(params, test_mode=None, task=None):
    """
    Print out the "foo" param passed in via
    `airflow tasks test example_passing_params_via_test_command run_this <date>
    -t '{"foo":"bar"}'`
    """
    if test_mode:
        print(
            f" 'foo' was passed in via test={test_mode} command : kwargs[params][foo] = {task.params['foo']}"
        )
    # Print out the value of "miff", passed in below via the Python Operator
    print(f" 'miff' was passed in via task params = {params['miff']}")
    return 1


@task(task_id="env_var_test_task")
def print_env_vars(test_mode=None):
    """
    Print out the "foo" environment variable passed in via
    `airflow tasks test example_passing_params_via_test_command env_var_test_task <date>
    --env-vars '{"foo":"bar"}'`
    """
    if test_mode:
        print(f"foo={os.environ.get('foo')}")
        print(f"AIRFLOW_TEST_MODE={os.environ.get('AIRFLOW_TEST_MODE')}")


with DAG(
    "example_passing_params_via_test_command",
    schedule="*/1 * * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=4),
    tags=["example"],
) as dag:
    run_this = my_py_command(params={"miff": "agg"})

    my_command = textwrap.dedent(
        """
        echo "'foo' was passed in via Airflow CLI Test command with value '$FOO'"
        echo "'miff' was passed in via BashOperator with value '$MIFF'"
        """
    )

    also_run_this = BashOperator(
        task_id="also_run_this",
        bash_command=my_command,
        params={"miff": "agg"},
        env={"FOO": "{{ params.foo }}", "MIFF": "{{ params.miff }}"},
    )

    env_var_test_task = print_env_vars()

    run_this >> also_run_this