• 28-Dec-2022
Lightrun Team
This is a glossary of common issues in Apache Airflow.

Troubleshooting Common Issues in Apache Airflow


Project Description

 

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It was created at Airbnb and open-sourced in 2015. Airflow is most often used for data pipelines: sets of tasks that extract data from a source, transform it, and load it into a destination. Tasks are defined in Python, usually as instances of “operators” or as decorated Python functions, and are grouped into workflows called “DAGs” (Directed Acyclic Graphs).
Airflow also provides a web-based user interface and command-line tools for managing and monitoring workflows. Its features include integrations with popular data storage and processing systems, scheduling of tasks on a recurring basis, and automatic retries for failed tasks. Airflow is widely used in the data engineering community and has a large, active user base, in large part because it combines a rich feature set with a high degree of extensibility.


The following are the most commonly reported issues for this project:

Tasks stuck in queued state

 

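If tasks stay in the queued state and you need a quick way to get them started, the following snippet re-submits every queued task instance to the default executor. Run it from a Python shell in an environment that has access to your Airflow configuration and metadata database. It is a workaround and does not address whatever caused the tasks to get stuck: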

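from airflow import models, settings
from airflow.executors.executor_loader import ExecutorLoader

# Start the default executor once and reuse it for every queued task instance
executor = ExecutorLoader.get_default_executor()
executor.job_id = "manual"
executor.start()

session = settings.Session()
dagbag = models.DagBag()
# Find every task instance that is currently in the "queued" state
tis = session.query(models.TaskInstance).filter(models.TaskInstance.state == "queued")
for ti in tis:
    dag = dagbag.get_dag(ti.dag_id)
    task = dag.get_task(ti.task_id)
    # Copy the task's settings (queue, pool, and so on) onto the task instance
    ti.refresh_from_task(task)
    executor.queue_task_instance(ti, ignore_all_deps=False, ignore_task_deps=False, ignore_ti_state=False)
    # Trigger the executor so the queued task instance is dispatched
    executor.heartbeat()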

 

Access specific values within an `XCom` value using Taskflow API

 

In Apache Airflow, an XCom (short for cross-communication) is a way for tasks to share data with each other. An XCom value is a piece of data that is stored under a key, by default in Airflow’s metadata database, and can be retrieved by other tasks.

To access specific values within an XCom value, first pull the whole value with the xcom_pull method of the running task instance. This method takes the task_id of the task that set the XCom value (as task_ids) and the key of the XCom value as arguments, and returns the stored value. With the TaskFlow API, a task declared with @task(multiple_outputs=True) that returns a dictionary has each dictionary key stored as its own XCom, so a downstream task can be passed a single entry directly.

For example, suppose you have a DAG with two tasks, task_A and task_B, and task_A sets an XCom value with the key "output" and the value {"a": 1, "b": 2}. To access the value of the XCom in task_B, pull it through the task instance that Airflow passes to task_B’s Python callable:

def run_task_b(ti):  # Airflow injects the running task instance as ti
    output = ti.xcom_pull(task_ids="task_A", key="output")
    print(output)  # {"a": 1, "b": 2}

You can then access specific values within the XCom value using standard Python syntax. For example:

a = output["a"]  # 1
b = output["b"]  # 2
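
When the pulled value may be missing or may not be a dictionary, it can help to wrap the lookup in a small helper. The following is a minimal sketch, not part of Airflow: pull_xcom_field and FakeTaskInstance are hypothetical names introduced here, and the fake class only stands in for a real task instance so that the example runs without an Airflow installation. It assumes the task instance exposes xcom_pull(task_ids=..., key=...) as described above.

```python
from typing import Any


def pull_xcom_field(ti: Any, task_id: str, field: str,
                    key: str = "return_value", default: Any = None) -> Any:
    """Pull an XCom value and return one field of it, or a default.

    "return_value" is the key Airflow uses for a task's returned value.
    """
    value = ti.xcom_pull(task_ids=task_id, key=key)
    # The XCom may be absent (None) or may hold something other than a dict
    if not isinstance(value, dict):
        return default
    return value.get(field, default)


class FakeTaskInstance:
    """Hypothetical stand-in for an Airflow task instance, for local runs only."""

    def __init__(self, stored: dict):
        # Maps (task_id, key) to the value that task pushed
        self._stored = stored

    def xcom_pull(self, task_ids: str, key: str = "return_value") -> Any:
        return self._stored.get((task_ids, key))


ti = FakeTaskInstance({("task_A", "output"): {"a": 1, "b": 2}})
print(pull_xcom_field(ti, "task_A", "a", key="output"))             # 1
print(pull_xcom_field(ti, "task_A", "c", key="output", default=0))  # 0
```

Inside a real task, the ti argument would be the task instance that Airflow passes to the callable, and the fake class would not be needed.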

 

on_failure_callback is not called when task is terminated externally

 

In Apache Airflow, on_failure_callback is a function that is called when a task fails. It receives the task’s context and can be used to perform cleanup or send notifications.
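
As a rough illustration of what such a callback can look like, the sketch below formats a short message from the context dictionary that Airflow passes to the callback. The function names build_failure_message and notify_on_failure are hypothetical, and the sketch assumes the context contains a "task_instance" entry and, for failures, an "exception" entry. It uses print where a real setup would send an email or chat message.

```python
from typing import Any, Dict


def build_failure_message(context: Dict[str, Any]) -> str:
    """Format a one-line failure summary from an Airflow task context."""
    ti = context["task_instance"]
    # "exception" may be absent, so fall back to None instead of raising
    exception = context.get("exception")
    return (
        f"Task {ti.task_id} in DAG {ti.dag_id} failed "
        f"on try {ti.try_number}: {exception!r}"
    )


def notify_on_failure(context: Dict[str, Any]) -> None:
    """Callback intended to be passed as on_failure_callback."""
    # Replace print with a real notification channel (email, chat, pager)
    print(build_failure_message(context))
```

The callback would be attached to a task with on_failure_callback=notify_on_failure, either on a single operator or through the DAG’s default_args.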

However, on_failure_callback may not be called if a task is terminated externally, such as by killing the task’s process or stopping the worker that is running it. In these cases, the task is marked as “failed” in the Airflow UI (Airflow has no separate “killed” state), but the callback is not invoked.

If you want to perform certain actions when a task is terminated externally, you can use the operator’s on_kill method instead. Airflow calls on_kill when a running task is asked to stop, either manually or automatically by Airflow. It cannot run if the process is killed outright with SIGKILL, because that signal cannot be intercepted.

on_kill is not a constructor parameter. It is a method defined on the base operator class, so you set it by subclassing PythonOperator (or another operator class) and overriding it. For example, with an illustrative subclass name:

from airflow.operators.python import PythonOperator

class CleanupPythonOperator(PythonOperator):
    def on_kill(self):
        # Perform cleanup or notification actions when the task is killed
        print("Task was killed")

task = CleanupPythonOperator(
    task_id="task_A",
    python_callable=my_function,
    # other operator parameters
)