I am trying to write a function, shown below, that returns a date:

    from datetime import datetime, timedelta

    def date_fn():
        # days_to_subtract is assumed to be defined elsewhere
        d = datetime.today() - timedelta(days=days_to_subtract)
        return d

I then want to pass this "d" to my DAG and its dependent DAG:

    from airflow import DAG
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    # constants
    SCHEDULE = "0 8 * * 4"

    with DAG(
        dag_id="processthe_files",
        start_date=datetime(2024, 10, 8),
        schedule_interval=SCHEDULE,
    ) as directed_acyclic_graph:
        file_processing = Job(
            # this calls a python callable which runs a databricks job
        ).to_task

        trigger_processtables = TriggerDagRunOperator(
            task_id="trigger_processtables",
            trigger_dag_id="processthe_tables",  # must match the dag_id of the DAG below
            wait_for_completion=True,
        )

        file_processing >> trigger_processtables

    with DAG(
        dag_id="processthe_tables",
        start_date=datetime(2024, 10, 8),
        schedule_interval=None,
    ) as directed_acyclic_graph:
        processtables = Job(
            # this calls a python callable which runs a databricks job
        ).to_task

        processtables

I want to pass the value of d as a variable to all the tasks in the processthe_files DAG and also to the dependent DAG. I looked into XComs but could not fully understand the concept. Can anyone please help me achieve this?

Asked Mar 20 at 12:49 by Aviator

1 Answer

Airflow provides a solution for your use case. You don't have to calculate the date yourself explicitly.
You can use Airflow's template variables, such as ds and ds_nodash, and subtract days from the date when you use them.
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
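
As a rough sketch of the idea above, the date can be written as a Jinja template string that Airflow renders at run time, and forwarded to the triggered DAG through the `conf` argument of `TriggerDagRunOperator` (a templated field). The offset `DAYS_TO_SUBTRACT = 7`, the key name `target_date`, and the local `ds_add` helper are assumptions made for illustration; they are not in the question. The block uses only the standard library so the date arithmetic can be checked without an Airflow installation.

```python
from datetime import datetime, timedelta

# Hypothetical offset: the question does not show the value of days_to_subtract.
DAYS_TO_SUBTRACT = 7

# Jinja template that Airflow renders at run time in templated fields.
# macros.ds_add(ds, n) shifts the run's logical date (YYYY-MM-DD) by n days.
TARGET_DATE_TEMPLATE = "{{ macros.ds_add(ds, -%d) }}" % DAYS_TO_SUBTRACT

# Keyword arguments for TriggerDagRunOperator in the processthe_files DAG.
# "target_date" is a hypothetical key name chosen for this sketch.
TRIGGER_KWARGS = {
    "task_id": "trigger_processtables",
    "trigger_dag_id": "processthe_tables",
    "wait_for_completion": True,
    "conf": {"target_date": TARGET_DATE_TEMPLATE},
}

# Template a task in the triggered DAG could use to read the value back.
DOWNSTREAM_TEMPLATE = "{{ dag_run.conf['target_date'] }}"


def ds_add(ds: str, days: int) -> str:
    """Plain-Python stand-in for the date shift, for checking values locally."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


if __name__ == "__main__":
    print(TARGET_DATE_TEMPLATE)
    print(ds_add("2024-10-10", -DAYS_TO_SUBTRACT))
```

Inside the first DAG this would be used as `TriggerDagRunOperator(**TRIGGER_KWARGS)`. The template strings are only rendered when they reach a templated field, for example `op_kwargs` of a `PythonOperator`; whether the custom `Job(...).to_task` wrapper passes its arguments through such a field is not shown in the question, so that part remains an assumption.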