Testing Task Mapping Implementation in Apache Airflow

This test utility module supports tests of task mapping in Apache Airflow. It provides a single helper, expand_mapped_task, which records a TaskMap row for an upstream task and then expands a mapped operator for a given DAG run, so that tests can exercise mapped-task expansion within Airflow’s DAG execution context.

Test Coverage Overview

The module covers the core task mapping operation in Airflow through one function, expand_mapped_task.

Key areas tested include:
  • TaskMap creation and persistence
  • Mapped task expansion logic
  • DAG ID and task ID handling
  • Run ID management
  • Session handling for database operations

Implementation Analysis

The helper keeps type-only imports behind a TYPE_CHECKING guard and performs all database work through a SQLAlchemy session supplied by the caller.

Key implementation patterns include:
  • Session-based database interaction
  • Type-safe operator mapping
  • Proper task expansion with run ID tracking
  • Integration with MappedOperator class

Technical Details

Testing tools and components:
  • SQLAlchemy ORM for database operations
  • Python type hints and TYPE_CHECKING
  • Airflow TaskMap model
  • MappedOperator class integration
  • Session management for transaction handling
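
The TYPE_CHECKING guard listed above can be illustrated with a minimal standard-library sketch. It is not taken from Airflow: the function double and the use of Decimal are stand-ins chosen for illustration. The sketch quotes its annotations explicitly, whereas the source file gets the same effect from `from __future__ import annotations`.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by static type checkers; this import never runs at runtime.
    from decimal import Decimal


def double(value: "Decimal") -> "Decimal":
    # The annotations are plain strings, so the name Decimal does not have to
    # be importable at runtime; any value that supports `* 2` works here.
    return value * 2


print(TYPE_CHECKING)           # False
print(double.__annotations__)  # {'value': 'Decimal', 'return': 'Decimal'}
print(double(21))              # 42
```

Guarding imports this way avoids paying for (or creating cycles through) modules that are needed only for annotations, which is presumably why the helper imports Session and MappedOperator under the guard.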

Best Practices Demonstrated

The test implementation showcases several testing best practices for Airflow components.

Notable practices include:
  • Proper type hinting and checking
  • Clean session management
  • Explicit parameter passing
  • Clear separation of mapping concerns
  • Structured database interaction patterns

apache/airflow

tests_common/test_utils/mapping.py

            
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.models.taskmap import TaskMap

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

    from airflow.models.mappedoperator import MappedOperator


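def expand_mapped_task(
    mapped: MappedOperator, run_id: str, upstream_task_id: str, length: int, session: Session
) -> None:
    """Record the upstream task's map length, then expand ``mapped`` for ``run_id``."""
    # Store a TaskMap row so the expansion below can look up how many
    # mapped task instances to create.
    session.add(
        TaskMap(
            dag_id=mapped.dag_id,
            task_id=upstream_task_id,
            run_id=run_id,
            map_index=-1,  # -1: the upstream task itself is not mapped
            length=length,
            keys=None,  # no dict keys recorded for the upstream value
        )
    )
    # Flush so the pending row is sent to the database within this session.
    session.flush()

    mapped.expand_mapped_task(run_id, session=session)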
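
The order of operations in the helper (add the TaskMap row, flush, then expand) can be traced without an Airflow installation. The sketch below is an illustration under stated assumptions: FakeTaskMap, FakeSession, FakeMappedOperator and expand_mapped_task_sketch are hypothetical stand-ins that only record calls, and they do not reproduce real Airflow or SQLAlchemy behaviour.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeTaskMap:
    """Hypothetical stand-in for airflow.models.taskmap.TaskMap."""
    dag_id: str
    task_id: str
    run_id: str
    map_index: int
    length: int
    keys: Optional[list]


@dataclass
class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session; it only records calls."""
    events: list = field(default_factory=list)

    def add(self, row):
        self.events.append(("add", row))

    def flush(self):
        self.events.append(("flush", None))


@dataclass
class FakeMappedOperator:
    """Hypothetical stand-in for MappedOperator."""
    dag_id: str

    def expand_mapped_task(self, run_id, *, session):
        session.events.append(("expand", run_id))


def expand_mapped_task_sketch(mapped, run_id, upstream_task_id, length, session):
    # Same three steps as the helper, written against the stand-ins.
    session.add(FakeTaskMap(mapped.dag_id, upstream_task_id, run_id, -1, length, None))
    session.flush()
    mapped.expand_mapped_task(run_id, session=session)


session = FakeSession()
expand_mapped_task_sketch(FakeMappedOperator("example_dag"), "run_1", "upstream", 3, session)
order = [name for name, _ in session.events]
print(order)  # ['add', 'flush', 'expand']
```

In a real test the same call would take an actual MappedOperator and a database session, with the upstream task ID and length chosen to match the scenario under test.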