Testing Performance Monitoring Implementation in Apache Airflow

This test suite provides performance-testing utilities for Apache Airflow components, including memory tracing, Python profiling, and SQL query analysis tools. The perf kit offers decorators and context managers for measuring and optimizing the performance of various Airflow components.

Test Coverage Overview

The test suite provides comprehensive coverage for performance-critical aspects of Airflow:

  • Memory usage tracking and tracing
  • Python profiling and execution timing
  • SQL query analysis and counting
  • Timeout and repetition testing capabilities

Integration points include SQLAlchemy database interactions and Python runtime performance monitoring.

Implementation Analysis

The testing approach utilizes decorators and context managers for modular and reusable performance testing. The implementation leverages Python’s introspection capabilities and SQLAlchemy’s event system to track resource usage and performance metrics.

Key patterns include decorator-based profiling, context manager-based resource tracking, and automated query counting.
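The dual decorator/context-manager pattern described above can be sketched with the standard library alone. The names `timing` and `timed` below are illustrative, not the actual perf_kit API; the point is that one context manager can back both usage styles:

```python
import functools
import time
from contextlib import contextmanager


@contextmanager
def timing(label="block"):
    """Context manager that prints the wall-clock time of the enclosed block."""
    start = time.monotonic()
    try:
        yield
    finally:
        print(f"{label}: {time.monotonic() - start:.4f} s")


def timed(func):
    """Decorator built on the same context manager, so a single
    implementation serves both usage patterns."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with timing(func.__qualname__):
            return func(*args, **kwargs)

    return wrapper
```

A test author can then wrap a whole test function with `@timed`, or time just a suspicious fragment with `with timing("query phase"):`, without duplicating the measurement logic.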

Technical Details

Testing tools and libraries include:

  • py-spy for Python profiling
  • SQLAlchemy event listeners for query tracking
  • Custom timing decorators for performance measurement
  • Memory tracing utilities
  • Configurable timeout mechanisms
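The memory-tracing utility in the list above can be approximated with the standard library's `tracemalloc` module. This is a hedged sketch, not the perf_kit implementation (which lives in `perf_kit.memory.trace_memory`); `trace_memory` and its `human_readable` flag are names chosen here for illustration:

```python
import tracemalloc
from contextlib import contextmanager


@contextmanager
def trace_memory(human_readable=True):
    """Trace allocations made inside the block and report peak usage on exit."""
    tracemalloc.start()
    try:
        yield
    finally:
        _current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        if human_readable:
            print(f"Peak memory: {peak / 1024:.1f} KiB")
        else:
            print(f"Peak memory: {peak} B")
```

Wrapping a block such as `with trace_memory(): dag_bag.collect_dags()` surfaces allocation spikes that plain timing would miss.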

Best Practices Demonstrated

The test suite exemplifies excellent testing practices through modular design and comprehensive documentation. Notable features include:

  • Clear separation of concerns between different performance aspects
  • Extensive docstrings and usage examples
  • Flexible implementation supporting both decorator and context manager patterns
  • Integration with existing test frameworks
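As one concrete illustration of a configurable timeout mechanism usable inside existing test frameworks, here is a stdlib-only sketch based on `signal` (Unix only). The names `timeout` and `TimeoutException` are assumptions for this example; the perf_kit equivalent is `repeat_and_time.timeout`:

```python
import signal
from contextlib import contextmanager


class TimeoutException(Exception):
    """Raised when the guarded block exceeds its time budget."""


@contextmanager
def timeout(seconds=1.0):
    """Abort the enclosed block with TimeoutException after `seconds`."""

    def handler(signum, frame):
        raise TimeoutException(f"Timed out after {seconds} s")

    old_handler = signal.signal(signal.SIGALRM, handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Always cancel the timer and restore the previous handler,
        # even if the block raised.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, old_handler)
```

Because the timer is cancelled in `finally`, the guard composes safely with pytest fixtures and other context managers.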

apache/airflow

tests_common/test_utils/perf/perf_kit/__init__.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Perf-kit.

Useful decorators and context managers used when testing the performance of various Airflow components.

Content
========

The following decorators and context managers are included.

.. autofunction:: tests.utils.perf.perf_kit.memory.trace_memory

.. autofunction:: tests.utils.perf.perf_kit.python.pyspy

.. autofunction:: tests.utils.perf.perf_kit.python.profiled

.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.timing

.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.repeat

.. autofunction:: tests.utils.perf.perf_kit.repeat_and_time.timeout

.. autofunction:: tests.utils.perf.perf_kit.sqlalchemy.trace_queries

.. autofunction:: tests.utils.perf.perf_kit.sqlalchemy.count_queries

Documentation for each function is provided in the function docstrings. Each module also has an example in
the main section of the module.

Examples
========

If you want to run the example for ``tests.utils.perf.perf_kit.sqlalchemy``, you can run the
following command.

.. code-block:: bash

    python -m tests.utils.perf.perf_kit.sqlalchemy

If you want to know how to use these functions, it is worth familiarizing yourself with these examples.

Use in tests
============

If you need to, you can easily instrument a single test using a context manager.

Suppose we have the following fragment of the file with tests.

.. code-block:: python

    prev = dag.previous_schedule(_next)
    prev_local = local_tz.convert(prev)

    assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00"
    assert prev.isoformat() == "2018-03-24T02:00:00+00:00"


    def test_bulk_write_to_db(self):
        clear_db_dags()
        dags = [DAG(f"dag-bulk-sync-{i}", start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(0, 4)]

        with assert_queries_count(3):
            DAG.bulk_write_to_db(dags)

You can add a code snippet before the method definition, then run only that one test and trace the
queries it makes.

.. code-block:: python
   :emphasize-lines: 6-8

    prev = dag.previous_schedule(_next)
    prev_local = local_tz.convert(prev)

    assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00"
    assert prev.isoformat() == "2018-03-24T02:00:00+00:00"

    from tests.utils.perf.perf_kit.sqlalchemy import trace_queries


    @trace_queries
    def test_bulk_write_to_db(self):
        clear_db_dags()
        dags = [DAG(f"dag-bulk-sync-{i}", start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(0, 4)]

        with assert_queries_count(3):
            DAG.bulk_write_to_db(dags)

To run the test, execute the following command.

.. code-block:: bash

    pytest tests/models/test_dag.py -k test_bulk_write_to_db -s

This is not an elegant solution, but it allows you to quickly check an arbitrary piece of code.

Keeping a separate file for various test cases can be helpful.
"""