airflow - How to pass a variable from a function to dependent dag tasks - Stack Overflow


I am trying to write a function, as below, that returns a date:

from datetime import datetime, timedelta

def date_fn():
    # days_to_subtract is defined elsewhere in my module
    d = datetime.today() - timedelta(days=days_to_subtract)
    return d

I then want to pass this "d" to my DAG and its dependent DAG:

# constants
SCHEDULE = "0 8 * * 4"

with DAG(
    dag_id="processthe_files",
    start_date=datetime(2024, 10, 8),
    schedule_interval=SCHEDULE,
) as directed_acyclic_graph:
    file_processing = Job(
        # this calls a python callable which runs a databricks job
    ).to_task

    trigger_processtables = TriggerDagRunOperator(
        task_id="trigger_processtables",
        trigger_dag_id="processthe_tables",
        wait_for_completion=True,
    )

    file_processing >> trigger_processtables


with DAG(
    dag_id="processthe_tables",
    start_date=datetime(2024, 10, 8),
    schedule_interval=None,
) as directed_acyclic_graph:
    processtables = Job(
        # this calls a python callable which runs a databricks job
    ).to_task

I want to pass the value of d as a variable to all the tasks in the processthe_files DAG and also to the dependent DAG. I looked into the XCom concept but was unable to understand it fully. Can anyone please help me achieve this?


asked Mar 20 at 12:49 by Aviator

1 Answer


Airflow provides a solution to your use case. You don't have to calculate the date in your code explicitly.

You can use Airflow's template variables: ds and ds_nodash give you the DAG run's logical date, and you can subtract days from it with macros such as macros.ds_add.

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
