airflow - XCOM returns "None" - Stack Overflow

I have the following problem with my Airflow task. I'm new to this topic, and even though I've already read up on it, I just can't solve it.

One task retrieves the access token from Spotify. The request successfully returns a valid access token, which I can see in the task's logs. At least I know that part of the task is working correctly. However, I'm unsure if the token is being stored properly in XCom. When I check under Admin > XCom in the Airflow UI, nothing is stored there.

The next task is supposed to use this access token to fetch information from a playlist. However, it only gets None as the access token from XCom, which causes the request to fail.
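
A toy model of the push/pull round trip may help pin down where the None comes from. This is a hypothetical sketch, not Airflow's implementation, and ToyXComStore, run_bash_like_task and render_token_assignment are made-up names. It assumes three things: when pushing is enabled, the last line of the bash command's output becomes the XCom value (as the BashOperator docs describe); pulling an entry that was never pushed returns None; and the template in Task 2 renders that None as the literal text None, which is what the shell then sees.

```python
from typing import Dict, Optional, Tuple


class ToyXComStore:
    """Hypothetical in-memory stand-in for Airflow's XCom table (not Airflow code)."""

    def __init__(self) -> None:
        self._values: Dict[Tuple[str, str], str] = {}

    def push(self, task_id: str, value: str, key: str = "return_value") -> None:
        self._values[(task_id, key)] = value

    def pull(self, task_id: str, key: str = "return_value") -> Optional[str]:
        # Like xcom_pull, a missing entry comes back as None instead of raising.
        return self._values.get((task_id, key))


def run_bash_like_task(
    store: ToyXComStore, task_id: str, output: str, push_enabled: bool
) -> str:
    """Keep the last line of the command's output; push it only if pushing is enabled."""
    lines = output.splitlines()
    last_line = lines[-1].rstrip() if lines else ""
    if push_enabled:
        store.push(task_id, last_line)
    return last_line


def render_token_assignment(store: ToyXComStore) -> str:
    """Mimic the Task 2 template line; str(None) turns a missing value into "None"."""
    return f'ACCESS_TOKEN="{store.pull("get_access_token")}"'


if __name__ == "__main__":
    # "+ echo abc123" stands in for the set -x trace line; "abc123" is a fake token.
    fake_output = "+ echo abc123\nabc123\n"
    store = ToyXComStore()

    run_bash_like_task(store, "get_access_token", fake_output, push_enabled=False)
    print(render_token_assignment(store))  # ACCESS_TOKEN="None"

    run_bash_like_task(store, "get_access_token", fake_output, push_enabled=True)
    print(render_token_assignment(store))  # ACCESS_TOKEN="abc123"
```

In this model an empty Admin > XCom page and a None in the next task are the same symptom: nothing was ever pushed for get_access_token.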

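Here's my code for the DAG and its two tasks:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

# DEFAULT_PLAYLIST_ID and encoded_credentials are defined elsewhere in the file (not shown)

dag = DAG(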
    'spotify_etl_pipeline',
    start_date=datetime(2024, 11, 11),
    schedule_interval=None,
    params={"playlist_id": DEFAULT_PLAYLIST_ID}
)

#Task1
get_access_token = BashOperator(
    task_id='get_access_token',
    bash_command=f"""
    set -x
    ACCESS_TOKEN=$(curl -X POST "https://accounts.spotify/api/token" \
        -H "Authorization: Basic {encoded_credentials}" \
        -H "Content-Type: application/x-www-form-urlencoded" \
        -d "grant_type=client_credentials" \
        | jq -r '.access_token')
    echo $ACCESS_TOKEN
    """,
    
    do_xcom_push=True,
    dag=dag
)

# Task 2
get_playlist_tracks = BashOperator(
    task_id='get_playlist_tracks',
    bash_command="""
    ACCESS_TOKEN="{{ task_instance.xcom_pull(task_ids='get_access_token') }}"
    echo "ACCESS TOKEN FROM XCom: $ACCESS_TOKEN"
    curl -X "GET" "https://api.spotify/v1/playlists/{{ dag_run.conf['playlist_id'] }}/tracks" \
        -H "Accept: application/json" \
        -H "Content-Type: application/json" \
        -H "Authorization: Bearer $ACCESS_TOKEN" \
        | jq '[.items[].track]' > /user/hadoop/spotify/track_data/raw/playlist_tracks.json
    """,  
    dag=dag 
)
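
Both ways the token can go missing still hand the shell an ordinary, non-empty string: a template that renders an XCom value of None produces the text None, and jq -r prints null when .access_token isn't present in the response. A small guard like the hypothetical check_token below (for example inside a PythonOperator callable; the function name and the marker set are assumptions) fails fast instead of sending "Bearer None" to the API.

```python
# Strings that show up in place of a real token (assumed list; extend as needed):
#   ""     -> nothing was captured at all
#   "None" -> a template rendered an XCom value of None
#   "null" -> jq -r '.access_token' ran on a response without that field
MISSING_TOKEN_MARKERS = {"", "None", "null"}


def check_token(raw: str) -> str:
    """Return the stripped token, or raise if it is one of the known placeholders."""
    token = raw.strip()
    if token in MISSING_TOKEN_MARKERS:
        raise ValueError(f"access token looks missing (got {raw!r})")
    return token


if __name__ == "__main__":
    print(check_token("abc123\n"))  # abc123
    try:
        check_token("None")
    except ValueError as err:
        print(err)  # access token looks missing (got 'None')
```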
    

After starting the Docker container through Visual Studio, I manually trigger the DAG in the Airflow web interface. The Docker container running the task is based on an older Airflow version (1.X.X), so don't be surprised that the code reflects this outdated version.

I’m not very experienced with building Dockerfiles and simply wanted to use the resources provided for this assignment. At this point, the code has also been revised by ChatGPT and Copilot, which I turned to in desperation. As a result, there might be even more issues now.

I’ve spent hours trying to fix the problem, but I just can’t make any progress. You don’t need to provide me with a pre-made solution or anything like that, but an explanation of what the issue is and how I can improve it would be great.

asked Nov 19, 2024 at 13:25 by Holly2207

1 Answer


So, first piece of feedback: folks are going to struggle to give you guidance on Airflow 1.x, since it's a super old version of the tool. I'd look into moving to a newer version of Airflow, both so folks on here are more responsive and so documentation is easier to find.

  1. I looked up the docs for 1.10.15, and the first thing that sticks out is that, according to those docs, do_xcom_push is called xcom_push in 1.10.15. I'd double-check which version you're using and verify you have the correct parameter.
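
If the same DAG file has to work on both major versions, one option is to choose the keyword from the version string. This is a hypothetical helper, bash_xcom_kwargs is a made-up name, and it encodes only the assumption stated in point 1: Airflow 1.x (e.g. 1.10.15) names the flag xcom_push, while Airflow 2.x names it do_xcom_push.

```python
from typing import Dict


def bash_xcom_kwargs(airflow_version: str) -> Dict[str, bool]:
    """Return the BashOperator keyword that enables pushing the last output line to XCom.

    Assumption, following the answer's reading of the docs: Airflow 1.x (e.g. 1.10.15)
    calls the flag xcom_push, while Airflow 2.x calls it do_xcom_push.
    """
    major = int(airflow_version.split(".")[0])
    return {"xcom_push": True} if major < 2 else {"do_xcom_push": True}


if __name__ == "__main__":
    print(bash_xcom_kwargs("1.10.15"))  # {'xcom_push': True}
    print(bash_xcom_kwargs("2.10.3"))   # {'do_xcom_push': True}
```

In the DAG file it would be spread into the operator call after import airflow, e.g. BashOperator(task_id='get_access_token', bash_command=..., dag=dag, **bash_xcom_kwargs(airflow.__version__)). On an install you know is 1.x, simply writing xcom_push=True amounts to the same thing.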

I did the following and managed to push to and pull from XComs using your general code. Note that this was tested successfully on Airflow 2.10.3.

from airflow.models import DAG
from datetime import datetime

from airflow.operators.bash import BashOperator

encoded_credentials="foo"

dag = DAG(
    'spotify_etl_pipeline',
    start_date=datetime(2024, 11, 11),
    schedule_interval=None,
    params={"playlist_id": "1010101"}
)

# Task1
get_access_token = BashOperator(
    task_id='get_access_token',
    bash_command=f"""
    set -x
    ACCESS_TOKEN=$(curl -X GET "http://date.jsontest" \
        | jq -r '.date')
    echo $ACCESS_TOKEN
    """,
    do_xcom_push=True,
    dag=dag
)

# Task 2
get_playlist_tracks = BashOperator(
    task_id='get_playlist_tracks',
    bash_command="""
    ACCESS_TOKEN="{{ task_instance.xcom_pull(task_ids='get_access_token') }}"
    echo "ACCESS TOKEN FROM XCom: $ACCESS_TOKEN"
    """,
    dag=dag
)

get_access_token >> get_playlist_tracks
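
The code above ends with get_access_token >> get_playlist_tracks, a line the question's DAG doesn't contain. Without it the two tasks have no declared order, so get_playlist_tracks can start before get_access_token has pushed anything, and its xcom_pull comes back empty. The sketch below uses Python's standard graphlib module (3.9+) as a rough stand-in for the scheduler's "are all upstream tasks done?" check; first_runnable is a made-up helper, not an Airflow API.

```python
from graphlib import TopologicalSorter
from typing import Dict, Set


def first_runnable(dependencies: Dict[str, Set[str]]) -> Set[str]:
    """Return the tasks that may start immediately, given {task: {upstream tasks}}."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    return set(sorter.get_ready())


tasks = ["get_access_token", "get_playlist_tracks"]

# Question's DAG: no dependency declared, so both tasks are ready at once.
without_edge = {task: set() for task in tasks}
# Answer's DAG: get_access_token >> get_playlist_tracks.
with_edge = {"get_access_token": set(), "get_playlist_tracks": {"get_access_token"}}

if __name__ == "__main__":
    print(sorted(first_runnable(without_edge)))  # ['get_access_token', 'get_playlist_tracks']
    print(sorted(first_runnable(with_edge)))     # ['get_access_token']
```

With the edge in place, the pulling task can only become ready after the pushing task has finished, which is the ordering the XCom round trip depends on.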

Again, on 1.x you'd probably need to change do_xcom_push to xcom_push. Here's the XCom value as shown in the Airflow UI:

[Screenshot: XCom value shown in Airflow UI]

If you do move to a newer version, it even ships with example DAGs that show pushing to and pulling from XComs with a BashOperator.
