database - Detach QuestDB partition with airflow dynamically - Stack Overflow

I am using Apache Airflow to convert old partitions to parquet and detach them using the PostgresOperator.

This works fine, but at the moment I am effectively hardcoding the day, because my queries run relative to now(). How could I make this work with backfilling, so that the queries use the date I pass as a parameter in Airflow?

This is my relevant code right now:

# Task to convert partition to Parquet
convert_to_parquet = PostgresOperator(
    task_id='convert_to_parquet',
    postgres_conn_id='questdb',
    sql="""
    alter table ecommerce_stats convert partition to parquet where ts = dateadd('d', -7, systimestamp())
    """,
    dag=dag,
)

# Task to detach the partition
detach_partition = PostgresOperator(
    task_id='detach_partition',
    postgres_conn_id='questdb',
    sql="ALTER TABLE ecommerce_stats DETACH PARTITION WHERE ts = dateadd('d', -7, systimestamp()) ",
    dag=dag,
)

asked Jan 29 at 17:05 by Javier Ramirez

1 Answer

In Airflow we can use template variables to access the execution/logical date and, if needed, the start and end of the interval.

The code in the question can be replaced by:

# Task to convert partition to Parquet
convert_to_parquet = PostgresOperator(
    task_id='convert_to_parquet',
    postgres_conn_id='questdb',
    sql="""
    alter table ecommerce_stats convert partition to parquet where ts = '{{ (execution_date - macros.timedelta(days=7)) | ds }}'
    """,
    dag=dag,
)

# Task to detach the partition
detach_partition = PostgresOperator(
    task_id='detach_partition',
    postgres_conn_id='questdb',
    sql="ALTER TABLE ecommerce_stats DETACH PARTITION WHERE ts = '{{ (execution_date - macros.timedelta(days=7)) | ds }}' ",
    dag=dag,
)

I am combining a variable (execution_date), a macro (macros.timedelta), and a formatting filter (ds), so the rendered result has the "YYYY-MM-DD" format expected by QuestDB. If you need time resolution, check the Airflow templates reference, as that can also be done.
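To make the rendering concrete, here is a minimal plain-Python sketch of the same arithmetic, with no Airflow involved. The helper names `partition_day` and `detach_sql` are hypothetical and exist only for illustration; the sketch assumes the `ds` filter formats a date as `%Y-%m-%d`.

```python
from datetime import datetime, timedelta


def partition_day(logical_date: datetime, days_back: int = 7) -> str:
    """Mimic '{{ (execution_date - macros.timedelta(days=N)) | ds }}'.

    Hypothetical helper: subtracts the offset and formats as YYYY-MM-DD,
    which is the shape the `ds` filter is assumed to produce.
    """
    return (logical_date - timedelta(days=days_back)).strftime("%Y-%m-%d")


def detach_sql(logical_date: datetime, table: str = "ecommerce_stats") -> str:
    """Build the statement the detach task would send for a given run date."""
    day = partition_day(logical_date)
    return f"ALTER TABLE {table} DETACH PARTITION WHERE ts = '{day}'"


# A backfilled run whose logical date is 2025-01-29 targets the 2025-01-22 partition.
example_sql = detach_sql(datetime(2025, 1, 29))
print(example_sql)
```

Because the date now comes from the run rather than from the database clock, re-running or backfilling the same logical date always produces the same statement.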

We need to be careful, as template variables are not rendered everywhere, only in the operator's templated fields (for the PostgresOperator, sql is one of them). I spent a few minutes debugging this a while ago before I realised.

Also, by default Airflow only starts a run after its interval has passed, so the execution_date lags behind the moment the run actually executes. Depending on what we need, we might have to go back 8 days rather than 7 when doing the date arithmetic. Relevant info at this StackOverflow post.
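The one-day difference can be checked with plain date arithmetic. This is a sketch only, assuming a daily schedule and a scheduled (not manually triggered) run that starts right when its interval ends; all variable names are illustrative.

```python
from datetime import datetime, timedelta

# Assumption: a daily schedule, so each run covers a one-day interval.
schedule_interval = timedelta(days=1)

# The date the templates see for this run.
logical_date = datetime(2025, 1, 29)

# Assumption: the run starts as soon as its interval has passed.
earliest_start = logical_date + schedule_interval

# "7 days back" measured from the logical date versus from the wall clock.
relative_to_logical = (logical_date - timedelta(days=7)).date()
relative_to_wall_clock = (earliest_start - timedelta(days=7)).date()

print(relative_to_logical, relative_to_wall_clock)
```

The two results are one day apart, so the offset in the template should be chosen according to which of the two reference points the original now()-based query was meant to follow.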
