@@ -84,6 +84,20 @@ fn create_airflow_config(
8484 log_config : & AutomaticContainerLogConfig ,
8585 log_dir : & str ,
8686 resolved_product_image : & ResolvedProductImage ,
87+ ) -> String {
88+ if resolved_product_image. product_version . starts_with ( "2." )
89+ || resolved_product_image. product_version . starts_with ( "3.0." )
90+ {
91+ create_airflow_stdlib_config ( log_config, log_dir, resolved_product_image)
92+ } else {
93+ create_airflow_structlog_config ( log_config, log_dir)
94+ }
95+ }
96+
97+ fn create_airflow_stdlib_config (
98+ log_config : & AutomaticContainerLogConfig ,
99+ log_dir : & str ,
100+ resolved_product_image : & ResolvedProductImage ,
87101) -> String {
88102 let loggers_config = log_config
89103 . loggers
@@ -176,3 +190,99 @@ LOGGING_CONFIG['root'] = {{
176190 . to_python_expression( ) ,
177191 )
178192}
193+
194+ fn create_airflow_structlog_config (
195+ log_config : & AutomaticContainerLogConfig ,
196+ log_dir : & str ,
197+ ) -> String {
198+ let loggers_config = log_config
199+ . loggers
200+ . iter ( )
201+ . filter ( |( name, _) | name. as_str ( ) != AutomaticContainerLogConfig :: ROOT_LOGGER )
202+ . fold ( String :: new ( ) , |mut output, ( name, config) | {
203+ let _ = writeln ! (
204+ output,
205+ "
206+ LOGGING_CONFIG['loggers'].setdefault('{name}', {{ 'propagate': True }})
207+ LOGGING_CONFIG['loggers']['{name}']['level'] = {level}
208+ " ,
209+ level = config. level. to_python_expression( )
210+ ) ;
211+ output
212+ } ) ;
213+
214+ format ! (
215+ "\
216+ import logging
217+ import os
218+ from airflow.config_templates import airflow_local_settings
219+
220+ os.makedirs('{log_dir}', exist_ok=True)
221+
222+ LOGGING_CONFIG = {{
223+ 'filters': {{
224+ 'mask_secrets_core': {{
225+ '()': 'airflow._shared.secrets_masker._secrets_masker',
226+ }}
227+ }},
228+ 'formatters': {{
229+ 'airflow': {{
230+ 'format': '%(asctime)s logLevel=%(levelname)s logger=%(name)s - %(message)s',
231+ 'class': 'airflow.utils.log.timezone_aware.TimezoneAware',
232+ }},
233+ 'json': {{
234+ '()': 'airflow.utils.log.json_formatter.JSONFormatter',
235+ 'json_fields': ['asctime', 'levelname', 'message', 'name']
236+ }}
237+ }},
238+ 'handlers': {{
239+ 'default': {{
240+ 'level': {console_log_level}
241+ }},
242+ 'file': {{
243+ 'class': 'logging.handlers.RotatingFileHandler',
244+ 'level': {file_log_level},
245+ 'formatter': 'json',
246+ 'filename': '{log_dir}/{LOG_FILE}',
247+ 'maxBytes': 1048576,
248+ 'backupCount': 1
249+ }},
250+ 'task': {{
251+ 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
252+ 'formatter': 'airflow',
253+ 'base_log_folder': '{log_dir}',
254+ 'filters': ['mask_secrets_core']
255+ }}
256+ }},
257+ 'loggers': {{
258+ 'airflow.task': {{
259+ 'handlers': ['task'],
260+ 'level': logging.INFO,
261+ 'propagate': True,
262+ 'filters': ['mask_secrets_core']
263+ }}
264+ }},
265+ 'root': {{
266+ 'handlers': ['default', 'file'],
267+ 'level': {root_log_level},
268+ 'propagate': True
269+ }}
270+ }}
271+ {loggers_config}
272+ REMOTE_TASK_LOG = airflow_local_settings.REMOTE_TASK_LOG
273+ " ,
274+ console_log_level = log_config
275+ . console
276+ . as_ref( )
277+ . and_then( |console| console. level)
278+ . unwrap_or_default( )
279+ . to_python_expression( ) ,
280+ file_log_level = log_config
281+ . file
282+ . as_ref( )
283+ . and_then( |file| file. level)
284+ . unwrap_or_default( )
285+ . to_python_expression( ) ,
286+ root_log_level = log_config. root_log_level( ) . to_python_expression( ) ,
287+ )
288+ }
0 commit comments