-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathsimple-airflow-cluster-dags-cmap.yaml
More file actions
116 lines (109 loc) · 3.2 KB
/
simple-airflow-cluster-dags-cmap.yaml
File metadata and controls
116 lines (109 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
---
apiVersion: v1
kind: Secret
metadata:
name: simple-airflow-credentials
type: Opaque
stringData:
adminUser.username: airflow
adminUser.firstname: Airflow
adminUser.lastname: Admin
adminUser.email: airflow@airflow.com
adminUser.password: airflow
connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@airflow-postgresql.default.svc.cluster.local/airflow
# Only needed when using celery workers (instead of Kubernetes executors)
connections.celeryResultBackend: db+postgresql://airflow:airflow@airflow-postgresql.default.svc.cluster.local/airflow
connections.celeryBrokerUrl: redis://:redis@airflow-redis-master:6379/0
---
apiVersion: v1
kind: ConfigMap
metadata:
name: cm-dag
data:
test_airflow_dag.py: |
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
with DAG(
dag_id='test_airflow_dag',
schedule='0 0 * * *',
start_date=datetime(2021, 1, 1),
catchup=False,
dagrun_timeout=timedelta(minutes=60),
tags=['example', 'example2'],
params={"example_key": "example_value"},
) as dag:
run_this_last = DummyOperator(
task_id='run_this_last',
)
# [START howto_operator_bash]
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
)
# [END howto_operator_bash]
run_this >> run_this_last
for i in range(3):
task = BashOperator(
task_id='runme_' + str(i),
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
)
task >> run_this
# [START howto_operator_bash_template]
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last
# [START howto_operator_bash_skip]
this_will_skip = BashOperator(
task_id='this_will_skip',
bash_command='echo "hello world"; exit 99;',
dag=dag,
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last
if __name__ == "__main__":
dag.cli()
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
name: airflow-dags-cmap
spec:
image:
productVersion: 3.1.6
clusterConfig:
loadExamples: false
exposeConfig: false
credentialsSecret: simple-airflow-credentials
volumes:
- name: cm-dag
configMap:
name: cm-dag
volumeMounts:
- name: cm-dag
mountPath: /dags/test_airflow_dag.py
subPath: test_airflow_dag.py
webservers:
roleConfig:
listenerClass: external-unstable
roleGroups:
default:
envOverrides:
AIRFLOW__CORE__DAGS_FOLDER: "/dags"
replicas: 1
celeryExecutors:
roleGroups:
default:
envOverrides:
AIRFLOW__CORE__DAGS_FOLDER: "/dags"
replicas: 2
schedulers:
roleGroups:
default:
envOverrides:
AIRFLOW__CORE__DAGS_FOLDER: "/dags"
replicas: 1