Mounting DAGs
DAGs can be mounted by using a ConfigMap or git-sync.
This is best illustrated with an example of each, shown in the sections below.
Via ConfigMap
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-dag (1)
data:
  test_airflow_dag.py: | (2)
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.dummy import DummyOperator
    with DAG(
        dag_id='test_airflow_dag',
        schedule_interval='0 0 * * *',
        start_date=datetime(2021, 1, 1),
        catchup=False,
        dagrun_timeout=timedelta(minutes=60),
        tags=['example', 'example2'],
        params={"example_key": "example_value"},
    ) as dag:
        run_this_last = DummyOperator(
            task_id='run_this_last',
        )
        # [START howto_operator_bash]
        run_this = BashOperator(
            task_id='run_after_loop',
            bash_command='echo 1',
        )
        # [END howto_operator_bash]
        run_this >> run_this_last
        for i in range(3):
            task = BashOperator(
                task_id='runme_' + str(i),
                bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
            )
            task >> run_this
        # [START howto_operator_bash_template]
        also_run_this = BashOperator(
            task_id='also_run_this',
            bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
        )
        # [END howto_operator_bash_template]
        also_run_this >> run_this_last
    # [START howto_operator_bash_skip]
    this_will_skip = BashOperator(
        task_id='this_will_skip',
        bash_command='echo "hello world"; exit 99;',
        dag=dag,
    )
    # [END howto_operator_bash_skip]
    this_will_skip >> run_this_last
    if __name__ == "__main__":
        dag.cli()---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: 2.10.5
  clusterConfig:
    loadExamples: false
    exposeConfig: false
    credentialsSecret: simple-airflow-credentials
    volumes:
      - name: cm-dag (3)
        configMap:
          name: cm-dag (4)
    volumeMounts:
      - name: cm-dag (5)
        mountPath: /dags/test_airflow_dag.py (6)
        subPath: test_airflow_dag.py (7)
  webservers:
    roleConfig:
      listenerClass: external-unstable
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 1
  celeryExecutors:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 2
  schedulers:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 1
| 1 | The name of the ConfigMap | 
| 2 | The name of the DAG (this is a renamed copy of the example_bash_operator.pyfrom the Airflow examples) | 
| 3 | The volume backed by the ConfigMap | 
| 4 | The name of the ConfigMap referenced by the Airflow cluster | 
| 5 | The name of the mounted volume | 
| 6 | The path of the mounted resource. Note that should map to a single DAG. | 
| 7 | The resource has to be defined using subPath: this is to prevent the versioning of ConfigMap elements which may cause a conflict with how Airflow propagates DAGs between its components. | 
| 8 | If the mount path described above is anything other than the standard location (the default is $AIRFLOW_HOME/dags), then the location should be defined using the relevant environment variable. | 
| If a DAG mounted via ConfigMap consists of modularized files then using the standard location is mandatory as Python uses this as a "root" directory when looking for referenced files. | 
The advantage of this approach is that DAGs are provided "in-line". However, handling multiple DAGs this way becomes cumbersome, as each must be mapped individually. For multiple DAGs, it is easier to expose them via a mounted volume, as shown below.
Via git-sync
git-sync is a command that pulls a git repository into a local directory and is supplied as a sidecar container for use within Kubernetes. The Stackable Airflow images already ship with git-sync included, and the operator takes care of calling the tool and mounting volumes, so that only the repository and synchronization details are required:
git-sync usage example
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: "2.10.5"
  clusterConfig:
    loadExamples: false
    exposeConfig: false
    credentialsSecret: test-airflow-credentials (1)
    dagsGitSync: (2)
      - repo: https://github.com/stackabletech/airflow-operator (3)
        branch: "main" (4)
        gitFolder: "tests/templates/kuttl/mount-dags-gitsync/dags" (5)
        depth: 10 (6)
        wait: 20s (7)
        credentialsSecret: git-credentials (8)
        gitSyncConf: (9)
          --rev: HEAD (10)
          # --rev: git-sync-tag # N.B. tag must be covered by "depth" (the number of commits to clone)
          # --rev: 39ee3598bd9946a1d958a448c9f7d3774d7a8043 # N.B. commit must be covered by "depth"
          --git-config: http.sslCAInfo:/tmp/ca-cert/ca.crt (11)
  webservers:
    ...| 1 | A Secret used for accessing database and admin user details (included here to illustrate where different credential secrets are defined) | 
| 2 | The git-gync configuration block that contains list of git-sync elements | 
| 3 | The repository to clone (required) | 
| 4 | The branch name (defaults to main) | 
| 5 | The location of the DAG folder, relative to the synced repository root.
It can optionally start with /, however, no trailing slash is recommended.
An empty string (`) or slash (/`) corresponds to the root folder in Git.
Defaults to "/". | 
| 6 | The depth of syncing i.e. the number of commits to clone (defaults to 1) | 
| 7 | The synchronisation interval in seconds, e.g. 20sor1h(defaults to "20s") | 
| 8 | The name of the Secret used to access the repository if it is not public.
This should include two fields: userandpassword(which can be either a password — which is not recommended — or a GitHub token, as described here) | 
| 9 | A map of optional configuration settings that are listed in this configuration section (and the ones that follow on that link) | 
| 10 | An example showing how to specify a target revision (the default is HEAD).
The revision can also be a tag or a commit, though this assumes that the target hash is contained within the number of commits specified by depth.
If a tag or commit hash is specified, then git-sync recognizes this and does not perform further cloning. | 
| 11 | Git-sync settings can be provided inline, although some of these ( --dest,--root) are specified internally in the operator and are ignored if provided by the user.
Git-config settings can also be specified, although a warning is logged ifsafe.directoryis specified as this is defined internally, and should not be defined by the user. | 
| The example shows a list of git-sync definitions, with a single element. This is to avoid breaking-changes in future releases. Currently, only one such git-sync definition is considered and processed. | 
| git-sync can be used with DAGs that make use of Python modules, as Python is configured to use the git-sync target folder as the "root" location when looking for referenced files. See the Applying Custom Resources example for more details. |