# Mastering XComs in Apache Airflow: Cross-Task Communication Without the Pain

One of the first surprises when learning Airflow is that tasks run in isolation from each other. You can't just set `task_2.data = task_1.data`. So how do you pass a value from one task to another? XComs.

## The Classic API: `xcom_push` and `xcom_pull`

The traditional approach pushes a value explicitly from inside a `PythonOperator` callable:

```python
from airflow.operators.python import PythonOperator

def push_function(**context):
    # Push a small value under an explicit key
    context['ti'].xcom_push(key='user_id', value=123)
```
A downstream task pulls it back by task id and key:

```python
def pull_function(**context):
    user_id = context['ti'].xcom_pull(task_ids='push_task', key='user_id')
    print(f"Received user_id: {user_id}")

push = PythonOperator(task_id='push_task', python_callable=push_function)
pull = PythonOperator(task_id='pull_task', python_callable=pull_function)

push >> pull
```
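Classic operators can also pull XComs declaratively via Jinja templating in any templated field, so you don't always need a Python callable just to read a value. A small sketch (the `echo_user` task id is illustrative):

```python
from airflow.operators.bash import BashOperator

# The template is rendered at runtime, after push_task has run
echo_user = BashOperator(
    task_id='echo_user',
    bash_command="echo {{ ti.xcom_pull(task_ids='push_task', key='user_id') }}",
)

push >> echo_user
```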
## The TaskFlow Way

With the TaskFlow API, every value a `@task` function returns is pushed as an XCom automatically (under the key `return_value`), and passing it as an argument to another task pulls it for you. Here is the same push/pull flow rewritten with decorators:

```python
from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def xcom_demo():

    @task
    def push() -> int:
        return 123  # auto-pushed as an XCom

    @task
    def pull(user_id: int):
        print(f"Received user_id: {user_id}")  # auto-pulled

    pull(push())

xcom_demo()
```
## Pattern 1: Passing an ID from a Query to a Processing Task

```python
@task
def get_latest_record_id() -> int:
    # Imagine a SQL query here
    return 42

@task
def process_record(record_id: int):
    print(f"Processing record {record_id}")

process_record(get_latest_record_id())
```
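XComs are also what make dynamic task mapping work: each mapped task instance writes its own XCom, and a downstream task can gather them all at once. A minimal fan-out/fan-in sketch (requires Airflow 2.3+; the `square` and `aggregate` names are illustrative):

```python
from datetime import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def mapping_demo():

    @task
    def square(x: int) -> int:
        return x * x  # each mapped instance pushes its own XCom

    @task
    def aggregate(results: list[int]):
        # Receives all mapped XComs as a single list
        print(f"Sum of squares: {sum(results)}")

    # Fan out: one task instance per element of the input list
    aggregate(square.expand(x=[1, 2, 3]))

mapping_demo()
```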
Here, each mapped task gets its own XCom value, and `aggregate` receives a list of all results.

## Common Pitfalls

❌ **Passing large data**

```python
# BAD – will bloat the metadata DB
@task
def bad_task():
    return large_dataframe.to_dict()  # can be MB/GB
```

✅ **Better:** store the data in S3/GCS and pass the path as an XCom (see the sketch after this list).

❌ **Pulling from a task that hasn't run**

```python
@task
def step_one():
    return 1

@task
def step_two(x):
    # If step_one failed or was skipped, this will raise an error
    return x + 1
```
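What does the "path, not payload" pattern look like in practice? A minimal sketch, assuming the Amazon provider package is installed and using placeholder names (`my-bucket`, the `aws_default` connection):

```python
import pandas as pd
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@task
def extract() -> str:
    # Write the heavy payload to object storage...
    df = pd.DataFrame({'user_id': range(1_000_000)})
    local_path = '/tmp/users.parquet'
    df.to_parquet(local_path)
    S3Hook(aws_conn_id='aws_default').load_file(
        filename=local_path,
        key='staging/users.parquet',
        bucket_name='my-bucket',
        replace=True,
    )
    # ...and let XCom carry only the lightweight pointer
    return 's3://my-bucket/staging/users.parquet'

@task
def transform(path: str):
    print(f'Reading input from {path}')

transform(extract())
```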
## Final Takeaway

XComs are for coordination, not data transfer. They are Airflow's glue: they turn a set of isolated tasks into a coherent pipeline. Use them for small control signals, IDs, and results. Keep them light. And when you're tempted to pass a big blob of data, stop and ask yourself: should this be in object storage instead?