Back to Repositories

Testing DAG Processor Deployment Configuration in Apache Airflow

This test suite validates the Airflow DAG Processor functionality in Helm chart deployments. It covers configuration options, version compatibility, and deployment settings for the standalone DAG processor component.

Test Coverage Overview

The test suite provides comprehensive coverage of DAG processor deployment configurations and features. Key areas tested include:

  • Version compatibility checks for Airflow 2.2+
  • Deployment enabling/disabling functionality
  • Container configuration and resource settings
  • Volume mounting and persistence options
  • Security context and migration controls

Implementation Analysis

The testing approach uses pytest fixtures to validate Helm chart rendering and configuration. Tests employ jmespath queries to verify generated Kubernetes manifests, with parameterized tests checking version-specific behaviors and configuration combinations.

Key patterns include template validation, resource definition verification, and configuration override testing.

Technical Details

  • Testing Framework: pytest with parametrize
  • Template Rendering: Helm chart generator
  • Validation Tools: jmespath for YAML parsing
  • Configuration: Values.yaml overrides
  • Test Types: Unit tests for manifest generation

Best Practices Demonstrated

The test suite demonstrates several testing best practices:

  • Comprehensive version compatibility testing
  • Modular test organization by feature
  • Thorough configuration validation
  • Clear test case isolation
  • Effective use of parametrized testing

apache/airflow

helm_tests/airflow_core/test_dag_processor.py

            
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import jmespath
import pytest

from tests.charts.helm_template_generator import render_chart
from tests.charts.log_groomer import LogGroomerTestBase


class TestDagProcessor:
    """Tests DAG processor."""

    @pytest.mark.parametrize(
        "airflow_version, num_docs",
        [
            ("2.2.0", 0),
            ("2.3.0", 1),
        ],
    )
    def test_only_exists_on_new_airflow_versions(self, airflow_version, num_docs):
        """Standalone Dag Processor was only added from Airflow 2.3 onwards."""
        docs = render_chart(
            values={
                "airflowVersion": airflow_version,
                "dagProcessor": {"enabled": True},
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert num_docs == len(docs)

    def test_can_be_disabled(self):
        """Standalone Dag Processor is disabled by default."""
        docs = render_chart(
            values={"dagProcessor": {"enabled": False}},
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert len(docs) == 0

    def test_disable_wait_for_migration(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "waitForMigrations": {"enabled": False},
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )
        actual = jmespath.search(
            "spec.template.spec.initContainers[?name=='wait-for-airflow-migrations']", docs[0]
        )
        assert actual is None

    def test_wait_for_migration_security_contexts_are_configurable(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "waitForMigrations": {
                        "enabled": True,
                        "securityContexts": {
                            "container": {
                                "allowPrivilegeEscalation": False,
                                "readOnlyRootFilesystem": True,
                            },
                        },
                    },
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("spec.template.spec.initContainers[0].securityContext", docs[0]) == {
            "allowPrivilegeEscalation": False,
            "readOnlyRootFilesystem": True,
        }

    def test_should_add_extra_containers(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "extraContainers": [
                        {"name": "{{ .Chart.Name }}", "image": "test-registry/test-repo:test-tag"}
                    ],
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("spec.template.spec.containers[-1]", docs[0]) == {
            "name": "airflow",
            "image": "test-registry/test-repo:test-tag",
        }

    def test_should_template_extra_containers(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "extraContainers": [{"name": "{{ .Release.Name }}-test-container"}],
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("spec.template.spec.containers[-1]", docs[0]) == {
            "name": "release-name-test-container"
        }

    def test_should_add_extra_init_containers(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "extraInitContainers": [
                        {"name": "test-init-container", "image": "test-registry/test-repo:test-tag"}
                    ],
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("spec.template.spec.initContainers[-1]", docs[0]) == {
            "name": "test-init-container",
            "image": "test-registry/test-repo:test-tag",
        }

    def test_should_template_extra_init_containers(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "extraInitContainers": [{"name": "{{ .Release.Name }}-test-init-container"}],
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("spec.template.spec.initContainers[-1]", docs[0]) == {
            "name": "release-name-test-init-container"
        }

    def test_should_add_extra_volume_and_extra_volume_mount(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "extraVolumes": [{"name": "test-volume-{{ .Chart.Name }}", "emptyDir": {}}],
                    "extraVolumeMounts": [
                        {"name": "test-volume-{{ .Chart.Name }}", "mountPath": "/opt/test"}
                    ],
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("spec.template.spec.volumes[1].name", docs[0]) == "test-volume-airflow"
        assert (
            jmespath.search("spec.template.spec.containers[0].volumeMounts[0].name", docs[0])
            == "test-volume-airflow"
        )
        assert (
            jmespath.search("spec.template.spec.initContainers[0].volumeMounts[0].name", docs[0])
            == "test-volume-airflow"
        )

    def test_should_add_global_volume_and_global_volume_mount(self):
        docs = render_chart(
            values={
                "dagProcessor": {"enabled": True},
                "volumes": [{"name": "test-volume", "emptyDir": {}}],
                "volumeMounts": [{"name": "test-volume", "mountPath": "/opt/test"}],
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("spec.template.spec.volumes[1].name", docs[0]) == "test-volume"
        assert (
            jmespath.search("spec.template.spec.containers[0].volumeMounts[0].name", docs[0]) == "test-volume"
        )
        assert (
            jmespath.search("spec.template.spec.initContainers[0].volumeMounts[0].name", docs[0])
            == "test-volume"
        )

    def test_should_add_extraEnvs(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "env": [
                        {"name": "TEST_ENV_1", "value": "test_env_1"},
                        {
                            "name": "TEST_ENV_2",
                            "valueFrom": {"secretKeyRef": {"name": "my-secret", "key": "my-key"}},
                        },
                        {
                            "name": "TEST_ENV_3",
                            "valueFrom": {"configMapKeyRef": {"name": "my-config-map", "key": "my-key"}},
                        },
                    ],
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert {"name": "TEST_ENV_1", "value": "test_env_1"} in jmespath.search(
            "spec.template.spec.containers[0].env", docs[0]
        )
        assert {
            "name": "TEST_ENV_2",
            "valueFrom": {"secretKeyRef": {"name": "my-secret", "key": "my-key"}},
        } in jmespath.search("spec.template.spec.containers[0].env", docs[0])
        assert {
            "name": "TEST_ENV_3",
            "valueFrom": {"configMapKeyRef": {"name": "my-config-map", "key": "my-key"}},
        } in jmespath.search("spec.template.spec.containers[0].env", docs[0])

    def test_should_add_extraEnvs_to_wait_for_migration_container(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "waitForMigrations": {
                        "env": [{"name": "TEST_ENV_1", "value": "test_env_1"}],
                    },
                }
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert {"name": "TEST_ENV_1", "value": "test_env_1"} in jmespath.search(
            "spec.template.spec.initContainers[0].env", docs[0]
        )

    def test_scheduler_name(self):
        docs = render_chart(
            values={"dagProcessor": {"enabled": True}, "schedulerName": "airflow-scheduler"},
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert (
            jmespath.search(
                "spec.template.spec.schedulerName",
                docs[0],
            )
            == "airflow-scheduler"
        )

    def test_should_create_valid_affinity_tolerations_and_node_selector(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "affinity": {
                        "nodeAffinity": {
                            "requiredDuringSchedulingIgnoredDuringExecution": {
                                "nodeSelectorTerms": [
                                    {
                                        "matchExpressions": [
                                            {"key": "foo", "operator": "In", "values": ["true"]},
                                        ]
                                    }
                                ]
                            }
                        }
                    },
                    "tolerations": [
                        {"key": "dynamic-pods", "operator": "Equal", "value": "true", "effect": "NoSchedule"}
                    ],
                    "nodeSelector": {"diskType": "ssd"},
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("kind", docs[0]) == "Deployment"
        assert (
            jmespath.search(
                "spec.template.spec.affinity.nodeAffinity."
                "requiredDuringSchedulingIgnoredDuringExecution."
                "nodeSelectorTerms[0]."
                "matchExpressions[0]."
                "key",
                docs[0],
            )
            == "foo"
        )
        assert (
            jmespath.search(
                "spec.template.spec.nodeSelector.diskType",
                docs[0],
            )
            == "ssd"
        )
        assert (
            jmespath.search(
                "spec.template.spec.tolerations[0].key",
                docs[0],
            )
            == "dynamic-pods"
        )

    def test_affinity_tolerations_topology_spread_constraints_and_node_selector_precedence(self):
        """When given both global and triggerer affinity etc, triggerer affinity etc is used."""
        expected_affinity = {
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {"key": "foo", "operator": "In", "values": ["true"]},
                            ]
                        }
                    ]
                }
            }
        }
        expected_topology_spread_constraints = {
            "maxSkew": 1,
            "topologyKey": "foo",
            "whenUnsatisfiable": "ScheduleAnyway",
            "labelSelector": {"matchLabels": {"tier": "airflow"}},
        }
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "affinity": expected_affinity,
                    "tolerations": [
                        {"key": "dynamic-pods", "operator": "Equal", "value": "true", "effect": "NoSchedule"}
                    ],
                    "topologySpreadConstraints": [expected_topology_spread_constraints],
                    "nodeSelector": {"type": "ssd"},
                },
                "affinity": {
                    "nodeAffinity": {
                        "preferredDuringSchedulingIgnoredDuringExecution": [
                            {
                                "weight": 1,
                                "preference": {
                                    "matchExpressions": [
                                        {"key": "not-me", "operator": "In", "values": ["true"]},
                                    ]
                                },
                            }
                        ]
                    }
                },
                "tolerations": [
                    {"key": "not-me", "operator": "Equal", "value": "true", "effect": "NoSchedule"}
                ],
                "topologySpreadConstraints": [
                    {
                        "maxSkew": 1,
                        "topologyKey": "not-me",
                        "whenUnsatisfiable": "ScheduleAnyway",
                        "labelSelector": {"matchLabels": {"tier": "airflow"}},
                    }
                ],
                "nodeSelector": {"type": "not-me"},
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert expected_affinity == jmespath.search("spec.template.spec.affinity", docs[0])
        assert (
            jmespath.search(
                "spec.template.spec.nodeSelector.type",
                docs[0],
            )
            == "ssd"
        )
        tolerations = jmespath.search("spec.template.spec.tolerations", docs[0])
        assert len(tolerations) == 1
        assert tolerations[0]["key"] == "dynamic-pods"
        assert expected_topology_spread_constraints == jmespath.search(
            "spec.template.spec.topologySpreadConstraints[0]", docs[0]
        )

    def test_should_create_default_affinity(self):
        docs = render_chart(show_only=["templates/scheduler/scheduler-deployment.yaml"])

        assert jmespath.search(
            "spec.template.spec.affinity.podAntiAffinity."
            "preferredDuringSchedulingIgnoredDuringExecution[0]."
            "podAffinityTerm.labelSelector.matchLabels",
            docs[0],
        ) == {"component": "scheduler"}

    def test_livenessprobe_values_are_configurable(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "livenessProbe": {
                        "initialDelaySeconds": 111,
                        "timeoutSeconds": 222,
                        "failureThreshold": 333,
                        "periodSeconds": 444,
                        "command": ["sh", "-c", "echo", "wow such test"],
                    },
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert (
            jmespath.search("spec.template.spec.containers[0].livenessProbe.initialDelaySeconds", docs[0])
            == 111
        )
        assert (
            jmespath.search("spec.template.spec.containers[0].livenessProbe.timeoutSeconds", docs[0]) == 222
        )
        assert (
            jmespath.search("spec.template.spec.containers[0].livenessProbe.failureThreshold", docs[0]) == 333
        )
        assert jmespath.search("spec.template.spec.containers[0].livenessProbe.periodSeconds", docs[0]) == 444

        assert jmespath.search("spec.template.spec.containers[0].livenessProbe.exec.command", docs[0]) == [
            "sh",
            "-c",
            "echo",
            "wow such test",
        ]

    @pytest.mark.parametrize(
        "airflow_version, probe_command",
        [
            ("2.4.9", "airflow jobs check --hostname $(hostname)"),
            ("2.5.0", "airflow jobs check --local"),
            ("2.5.2", "airflow jobs check --local --job-type DagProcessorJob"),
        ],
    )
    def test_livenessprobe_command_depends_on_airflow_version(self, airflow_version, probe_command):
        docs = render_chart(
            values={"airflowVersion": f"{airflow_version}", "dagProcessor": {"enabled": True}},
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )
        assert (
            probe_command
            in jmespath.search("spec.template.spec.containers[0].livenessProbe.exec.command", docs[0])[-1]
        )

    @pytest.mark.parametrize(
        "log_values, expected_volume",
        [
            ({"persistence": {"enabled": False}}, {"emptyDir": {}}),
            (
                {"persistence": {"enabled": False}, "emptyDirConfig": {"sizeLimit": "10Gi"}},
                {"emptyDir": {"sizeLimit": "10Gi"}},
            ),
            (
                {"persistence": {"enabled": True}},
                {"persistentVolumeClaim": {"claimName": "release-name-logs"}},
            ),
            (
                {"persistence": {"enabled": True, "existingClaim": "test-claim"}},
                {"persistentVolumeClaim": {"claimName": "test-claim"}},
            ),
        ],
    )
    def test_logs_persistence_changes_volume(self, log_values, expected_volume):
        docs = render_chart(
            values={
                "logs": log_values,
                "dagProcessor": {"enabled": True},
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("spec.template.spec.volumes[1]", docs[0]) == {
            "name": "logs",
            **expected_volume,
        }

    def test_resources_are_configurable(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "resources": {
                        "limits": {"cpu": "200m", "memory": "128Mi"},
                        "requests": {"cpu": "300m", "memory": "169Mi"},
                    },
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )
        assert jmespath.search("spec.template.spec.containers[0].resources.limits.memory", docs[0]) == "128Mi"
        assert jmespath.search("spec.template.spec.containers[0].resources.limits.cpu", docs[0]) == "200m"
        assert (
            jmespath.search("spec.template.spec.containers[0].resources.requests.memory", docs[0]) == "169Mi"
        )
        assert jmespath.search("spec.template.spec.containers[0].resources.requests.cpu", docs[0]) == "300m"

        assert (
            jmespath.search("spec.template.spec.initContainers[0].resources.limits.memory", docs[0])
            == "128Mi"
        )
        assert jmespath.search("spec.template.spec.initContainers[0].resources.limits.cpu", docs[0]) == "200m"
        assert (
            jmespath.search("spec.template.spec.initContainers[0].resources.requests.memory", docs[0])
            == "169Mi"
        )
        assert (
            jmespath.search("spec.template.spec.initContainers[0].resources.requests.cpu", docs[0]) == "300m"
        )

    def test_resources_are_not_added_by_default(self):
        docs = render_chart(
            values={"dagProcessor": {"enabled": True}},
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )
        assert jmespath.search("spec.template.spec.containers[0].resources", docs[0]) == {}

    @pytest.mark.parametrize(
        "strategy, expected_strategy",
        [
            (None, None),
            (
                {"rollingUpdate": {"maxSurge": "100%", "maxUnavailable": "50%"}},
                {"rollingUpdate": {"maxSurge": "100%", "maxUnavailable": "50%"}},
            ),
        ],
    )
    def test_strategy(self, strategy, expected_strategy):
        """Strategy should be used when we aren't using both LocalExecutor and workers.persistence."""
        docs = render_chart(
            values={
                "dagProcessor": {"enabled": True, "strategy": strategy},
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert expected_strategy == jmespath.search("spec.strategy", docs[0])

    def test_default_command_and_args(self):
        docs = render_chart(
            values={"dagProcessor": {"enabled": True}},
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("spec.template.spec.containers[0].command", docs[0]) is None
        assert jmespath.search("spec.template.spec.containers[0].args", docs[0]) == [
            "bash",
            "-c",
            "exec airflow dag-processor",
        ]

    @pytest.mark.parametrize(
        "revision_history_limit, global_revision_history_limit",
        [(8, 10), (10, 8), (8, None), (None, 10), (None, None)],
    )
    def test_revision_history_limit(self, revision_history_limit, global_revision_history_limit):
        values = {
            "dagProcessor": {
                "enabled": True,
            }
        }
        if revision_history_limit:
            values["dagProcessor"]["revisionHistoryLimit"] = revision_history_limit
        if global_revision_history_limit:
            values["revisionHistoryLimit"] = global_revision_history_limit
        docs = render_chart(
            values=values,
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )
        expected_result = revision_history_limit or global_revision_history_limit
        assert jmespath.search("spec.revisionHistoryLimit", docs[0]) == expected_result

    @pytest.mark.parametrize("command", [None, ["custom", "command"]])
    @pytest.mark.parametrize("args", [None, ["custom", "args"]])
    def test_command_and_args_overrides(self, command, args):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "command": command,
                    "args": args,
                }
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert command == jmespath.search("spec.template.spec.containers[0].command", docs[0])
        assert args == jmespath.search("spec.template.spec.containers[0].args", docs[0])

    def test_command_and_args_overrides_are_templated(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "command": ["{{ .Release.Name }}"],
                    "args": ["{{ .Release.Service }}"],
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert jmespath.search("spec.template.spec.containers[0].command", docs[0]) == ["release-name"]
        assert jmespath.search("spec.template.spec.containers[0].args", docs[0]) == ["Helm"]

    def test_dags_volume_mount_with_persistence_true(self):
        docs = render_chart(
            values={"dagProcessor": {"enabled": True}, "dags": {"gitSync": {"enabled": True}}},
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert "dags" in [
            vm["name"] for vm in jmespath.search("spec.template.spec.containers[0].volumeMounts", docs[0])
        ]
        assert "dags" in [vm["name"] for vm in jmespath.search("spec.template.spec.volumes", docs[0])]

    def test_dags_gitsync_sidecar_and_init_container(self):
        docs = render_chart(
            values={"dagProcessor": {"enabled": True}, "dags": {"gitSync": {"enabled": True}}},
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert "git-sync" in [c["name"] for c in jmespath.search("spec.template.spec.containers", docs[0])]
        assert "git-sync-init" in [
            c["name"] for c in jmespath.search("spec.template.spec.initContainers", docs[0])
        ]

    def test_dags_gitsync_with_persistence_no_sidecar_or_init_container(self):
        docs = render_chart(
            values={
                "dagProcessor": {"enabled": True},
                "dags": {"gitSync": {"enabled": True}, "persistence": {"enabled": True}},
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        # No gitsync sidecar or init container
        assert "git-sync" not in [
            c["name"] for c in jmespath.search("spec.template.spec.containers", docs[0])
        ]
        assert "git-sync-init" not in [
            c["name"] for c in jmespath.search("spec.template.spec.initContainers", docs[0])
        ]

    def test_no_airflow_local_settings(self):
        docs = render_chart(
            values={"dagProcessor": {"enabled": True}, "airflowLocalSettings": None},
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )
        volume_mounts = jmespath.search("spec.template.spec.containers[0].volumeMounts", docs[0])
        assert "airflow_local_settings.py" not in str(volume_mounts)
        volume_mounts_init = jmespath.search("spec.template.spec.containers[0].volumeMounts", docs[0])
        assert "airflow_local_settings.py" not in str(volume_mounts_init)

    def test_airflow_local_settings(self):
        docs = render_chart(
            values={"dagProcessor": {"enabled": True}, "airflowLocalSettings": "# Well hello!"},
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )
        volume_mount = {
            "name": "config",
            "mountPath": "/opt/airflow/config/airflow_local_settings.py",
            "subPath": "airflow_local_settings.py",
            "readOnly": True,
        }
        assert volume_mount in jmespath.search("spec.template.spec.containers[0].volumeMounts", docs[0])
        assert volume_mount in jmespath.search("spec.template.spec.initContainers[0].volumeMounts", docs[0])

    def test_should_add_component_specific_annotations(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "annotations": {"test_annotation": "test_annotation_value"},
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )
        assert "annotations" in jmespath.search("metadata", docs[0])
        assert jmespath.search("metadata.annotations", docs[0])["test_annotation"] == "test_annotation_value"

    @pytest.mark.parametrize(
        "webserver_config, should_add_volume",
        [
            ("CSRF_ENABLED = True", True),
            (None, False),
        ],
    )
    def test_should_add_webserver_config_volume_and_volume_mount_when_exists(
        self, webserver_config, should_add_volume
    ):
        expected_volume = {
            "name": "webserver-config",
            "configMap": {"name": "release-name-webserver-config"},
        }
        expected_volume_mount = {
            "name": "webserver-config",
            "mountPath": "/opt/airflow/webserver_config.py",
            "subPath": "webserver_config.py",
            "readOnly": True,
        }

        docs = render_chart(
            values={
                "dagProcessor": {"enabled": True},
                "webserver": {"webserverConfig": webserver_config},
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        created_volumes = jmespath.search("spec.template.spec.volumes", docs[0])
        created_volume_mounts = jmespath.search("spec.template.spec.containers[1].volumeMounts", docs[0])

        if should_add_volume:
            assert expected_volume in created_volumes
            assert expected_volume_mount in created_volume_mounts
        else:
            assert expected_volume not in created_volumes
            assert expected_volume_mount not in created_volume_mounts

    def test_validate_if_ssh_params_are_added_with_git_ssh_key(self):
        docs = render_chart(
            values={
                "dagProcessor": {"enabled": True},
                "dags": {
                    "gitSync": {
                        "enabled": True,
                        "sshKey": "my-ssh-key",
                    },
                    "persistence": {"enabled": False},
                },
            },
            show_only=["templates/dag-processor/dag-processor-deployment.yaml"],
        )

        assert {"name": "GIT_SSH_KEY_FILE", "value": "/etc/git-secret/ssh"} in jmespath.search(
            "spec.template.spec.containers[1].env", docs[0]
        )
        assert {"name": "GITSYNC_SSH_KEY_FILE", "value": "/etc/git-secret/ssh"} in jmespath.search(
            "spec.template.spec.containers[1].env", docs[0]
        )
        assert {"name": "GIT_SYNC_SSH", "value": "true"} in jmespath.search(
            "spec.template.spec.containers[1].env", docs[0]
        )
        assert {"name": "GITSYNC_SSH", "value": "true"} in jmespath.search(
            "spec.template.spec.containers[1].env", docs[0]
        )
        assert {"name": "GIT_KNOWN_HOSTS", "value": "false"} in jmespath.search(
            "spec.template.spec.containers[1].env", docs[0]
        )
        assert {"name": "GITSYNC_SSH_KNOWN_HOSTS", "value": "false"} in jmespath.search(
            "spec.template.spec.containers[1].env", docs[0]
        )
        assert {
            "name": "git-sync-ssh-key",
            "secret": {"secretName": "release-name-ssh-secret", "defaultMode": 288},
        } in jmespath.search("spec.template.spec.volumes", docs[0])


class TestDagProcessorLogGroomer(LogGroomerTestBase):
    """DAG processor log groomer."""

    obj_name = "dag-processor"
    folder = "dag-processor"


class TestDagProcessorServiceAccount:
    """Tests DAG processor service account."""

    def test_default_automount_service_account_token(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "serviceAccount": {"create": True},
                },
            },
            show_only=["templates/dag-processor/dag-processor-serviceaccount.yaml"],
        )
        assert jmespath.search("automountServiceAccountToken", docs[0]) is True

    def test_overridden_automount_service_account_token(self):
        docs = render_chart(
            values={
                "dagProcessor": {
                    "enabled": True,
                    "serviceAccount": {"create": True, "automountServiceAccountToken": False},
                },
            },
            show_only=["templates/dag-processor/dag-processor-serviceaccount.yaml"],
        )
        assert jmespath.search("automountServiceAccountToken", docs[0]) is False