A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Reproduction repository for the MDX 2021 Hybrid Demucs model

Submission This is the submission for MDX 2021 Track A, for Track B go to the track_b branch. Submission Summary Submission ID: 151378 Submitter: defo

Alexandre Défossez 62 Dec 18, 2022
RDFLib is a Python library for working with RDF, a simple yet powerful language for representing information.

RDFLib RDFLib is a pure Python package for working with RDF. RDFLib contains most things you need to work with RDF, including: parsers and serializers

RDFLib 1.8k Jan 02, 2023
🤖🧭Creates google-like navigation menu using python-telegram-bot wrapper

python telegram bot menu pagination Makes a google style pagination line for a list of items. In other words it builds a menu for navigation if you ha

Sergey Smirnov 9 Nov 27, 2022
Repository for DNN training, theory to practice, part of the Large Scale Machine Learning class at Mines Paritech

DNN Training, from theory to practice This repository is complementary to the deep learning training lesson given to les Mines ParisTech on the 11th o

Alexandre Défossez 6 Nov 14, 2022
Mute your mic while you're typing. An app for Ubuntu.

Hushboard Mute your microphone while typing, for Ubuntu. Install from kryogenix.org/code/hushboard/. Installation We recommend you install Hushboard t

Stuart Langridge 142 Jan 05, 2023
Abilian Core: an enterprise application development platform based on the Flask micro-framework, the SQLAlchemy ORM

About Abilian Core is an enterprise application development platform based on the Flask micro-framework, the SQLAlchemy ORM, good intentions and best

Abilian open source projects 47 Apr 14, 2022
Paintbot - Forward & Inverse Kinematics

PAINTBOT - FORWARD & INVERSE KINEMATICS: Overview: We built a simulation of a RRR robot shown in the figure below. The robot has 3 links and is connec

Alex Lin 1 Oct 21, 2021
The program converts Swiss notes into American notes

Informatik-Programmieren Einleitung: Das Programm rechnet Schweizer Noten in das Amerikanische Noten um. Der Benutzer kann seine Note eingeben und der

2 Dec 16, 2021
These are my solutions to Advent of Code problems.

Advent of Code These are my solutions to Advent of Code problems. If you want to join my leaderboard, the code is 540750-9589f56d. When I solve for sp

Sumner Evans 5 Dec 19, 2022
An Embedded Linux Project Build and Compile Tool -- An Bitbake UI Extension

Dianshao - An Embedded Linux Project Build and Compile Tool

0 Mar 27, 2022
Python Service for MISP Feed Management

Python Service for MISP Feed Management This set of scripts is designed to offer better reliability and more control over the fetching of feeds into M

Chris 7 Aug 24, 2022
A simple spyware in python.

Spyware-Python- Dependencies: Python 3.x OpenCV PyAutoGUI PyMongo (for mongodb connection) Flask (Web Server) Ngrok (helps us push our fla

Abubakar 3 Sep 07, 2022
Developing and Comparing Vision-based Algorithms for Vision-based Agile Flight

DodgeDrone: Vision-based Agile Drone Flight (ICRA 2022 Competition) Would you like to push the boundaries of drone navigation? Then participate in the

Robotics and Perception Group 115 Dec 10, 2022
Life Dynamics for python

Daphny_counter run command must be like this: /usr/bin/python3 /home/nmakagonov/Daphny/daphny_counter/Daphny_counter.py -o /home/nmakagonov/Daphny/out

12 Sep 05, 2022
Reference python implementation of Chia pool operations for pool operators

This repository provides a sample server written in python, which is meant to server as a basis for a Chia Pool. While this is a fully functional implementation, it requires some work in scalability

Chia Network 451 Dec 13, 2022
A webapp for taking fast notes, designed for business, school, and collaboration with groups.

JOTS Journal of the Session A webapp for taking fast notes, designed for business, school, and collaboration with groups.

Zebadiah S. Taylor 2 Jun 10, 2022
Trashselected - Plugin for fman.io to move files that has been selected in fman to trash

TrashSelected Plugin for fman.io to move files that has been selected in fman to

1 Feb 04, 2022
take home quiz

guess the correlation data inspection a pretty normal distribution train/val/test split splitting amount .dataset: 150000 instances ├─8

HR Wu 1 Nov 04, 2021
This is a repository containing the backend and the frontend of a simple pokédex.

Pokémon This is a repository containing the backend and the frontend of a simple pokédex. This is a work in progress project! Project Structure 🗂 pok

André Rato 1 Nov 28, 2021
Your copilot to studies and work (Pomodoro-timer, Translate and Notes app)

Copylot Your copilot to studies and work (Pomodoro-timer, Translate and Notes app) Copylot are three applications in one: Pomodoro Translate Notes Cop

Eduardo Mendes 20 Dec 16, 2022