Skip to content

[airflow] Avoid implicit DAG schedule (AIR301) #14581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR301.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from airflow import DAG, dag
from airflow.timetables.trigger import CronTriggerTimetable

DAG(dag_id="class_default_schedule")

DAG(dag_id="class_schedule", schedule="@hourly")

DAG(dag_id="class_timetable", timetable=CronTriggerTimetable())

DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")


@dag()
def decorator_default_schedule():
pass


@dag(schedule="0 * * * *")
def decorator_schedule():
pass


@dag(timetable=CronTriggerTimetable())
def decorator_timetable():
pass


@dag(schedule_interval="0 * * * *")
def decorator_schedule_interval():
pass
7 changes: 5 additions & 2 deletions crates/ruff_linter/src/checkers/ast/analyze/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use ruff_text_size::Ranged;
use crate::checkers::ast::Checker;
use crate::registry::Rule;
use crate::rules::{
flake8_2020, flake8_async, flake8_bandit, flake8_boolean_trap, flake8_bugbear, flake8_builtins,
flake8_comprehensions, flake8_datetimez, flake8_debugger, flake8_django,
airflow, flake8_2020, flake8_async, flake8_bandit, flake8_boolean_trap, flake8_bugbear,
flake8_builtins, flake8_comprehensions, flake8_datetimez, flake8_debugger, flake8_django,
flake8_future_annotations, flake8_gettext, flake8_implicit_str_concat, flake8_logging,
flake8_logging_format, flake8_pie, flake8_print, flake8_pyi, flake8_pytest_style, flake8_self,
flake8_simplify, flake8_tidy_imports, flake8_type_checking, flake8_use_pathlib, flynt, numpy,
Expand Down Expand Up @@ -1061,6 +1061,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) {
if checker.enabled(Rule::UnrawRePattern) {
ruff::rules::unraw_re_pattern(checker, call);
}
if checker.enabled(Rule::AirflowDagNoScheduleArgument) {
airflow::rules::dag_no_schedule_argument(checker, expr);
}
}
Expr::Dict(dict) => {
if checker.any_enabled(&[
Expand Down
1 change: 1 addition & 0 deletions crates/ruff_linter/src/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,7 @@ pub fn code_to_rule(linter: Linter, code: &str) -> Option<(RuleGroup, Rule)> {

// airflow
(Airflow, "001") => (RuleGroup::Stable, rules::airflow::rules::AirflowVariableNameTaskIdMismatch),
(Airflow, "301") => (RuleGroup::Preview, rules::airflow::rules::AirflowDagNoScheduleArgument),

// perflint
(Perflint, "101") => (RuleGroup::Stable, rules::perflint::rules::UnnecessaryListCast),
Expand Down
1 change: 1 addition & 0 deletions crates/ruff_linter/src/rules/airflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod tests {
use crate::{assert_messages, settings};

#[test_case(Rule::AirflowVariableNameTaskIdMismatch, Path::new("AIR001.py"))]
#[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR301.py"))]
fn rules(rule_code: Rule, path: &Path) -> Result<()> {
let snapshot = format!("{}_{}", rule_code.noqa_code(), path.to_string_lossy());
let diagnostics = test_path(
Expand Down
113 changes: 113 additions & 0 deletions crates/ruff_linter/src/rules/airflow/rules/dag_schedule_argument.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use ruff_diagnostics::{Diagnostic, Violation};
use ruff_macros::{derive_message_formats, violation};
use ruff_python_ast::Expr;
use ruff_python_ast::{self as ast, Keyword};
use ruff_text_size::Ranged;

use crate::checkers::ast::Checker;

/// ## What it does
/// Checks that the `DAG()` class or a `@dag()` decorator has an explicit
/// `schedule` parameter.
///
/// ## Why is this bad?
/// The default `schedule` value on Airflow 2 is `timedelta(days=1)`, which is
/// almost never what a user is looking for. Airflow 3 changes this the default
/// to *None*, and would break existing DAGs using the implicit default.
Comment on lines +16 to +17
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we tell a bit more about how it breaks existing DAGS that use the implicit default. Will it fail with an exception or is it just that the default is different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the default is different—your DAGs will simply stop doing anything silently. I’ll add a paragraph to call this out.

///
/// Airflow 2 also provides alternative arguments `schedule_interval` and
/// `timetable` to specify the DAG schedule. They existed for backward
/// compatibility, and have been removed from Airflow 3.
///
/// ## Example
/// ```python
/// from airflow import DAG
///
///
/// # Using the implicit default schedule.
/// dag1 = DAG(dag_id="my_dag_1")
///
/// # Using a deprecated argument to set schedule.
/// dag2 = DAG(dag_id="my_dag_2", schedule_interval="@daily")
/// ```
///
/// Use instead:
/// ```python
/// from datetime import timedelta
///
/// from airflow import DAG
///
///
/// dag1 = DAG(dag_id="my_dag_1", schedule=timedelta(days=1))
/// dag2 = DAG(dag_id="my_dag_2", schedule="@daily")
/// ```
#[violation]
pub struct AirflowDagNoScheduleArgument {
deprecated_argument: Option<String>,
}

impl Violation for AirflowDagNoScheduleArgument {
#[derive_message_formats]
fn message(&self) -> String {
let AirflowDagNoScheduleArgument {
deprecated_argument,
} = self;
match deprecated_argument {
Some(argument) => {
format!("argument `{argument}` is deprecated; use `schedule` instead")
}
None => "DAG should have an explicit `schedule` argument".to_string(),
}
}
}

/// AIR301
pub(crate) fn dag_no_schedule_argument(checker: &mut Checker, expr: &Expr) {
// Don't check non-call expressions.
let Expr::Call(ast::ExprCall {
func, arguments, ..
}) = expr
else {
return;
};

// We don't do anything unless this is a `DAG` (class) or `dag` (decorator
// function) from Airflow.
if !checker
.semantic()
.resolve_qualified_name(func)
.is_some_and(|qualname| matches!(qualname.segments(), ["airflow", .., "DAG" | "dag"]))
{
return;
}

// If there's a `schedule` keyword argument, we are good.
if arguments.find_keyword("schedule").is_some() {
return;
}

// Produce a diagnostic on either a deprecated schedule keyword argument,
// or no schedule-related keyword arguments at all.
let diagnostic = if let Some(keyword) = arguments.keywords.iter().find(|keyword| {
let Keyword { arg, .. } = keyword;
arg.as_ref()
.is_some_and(|arg| arg == "timetable" || arg == "schedule_interval")
}) {
// A deprecated argument is used.
Diagnostic::new(
AirflowDagNoScheduleArgument {
deprecated_argument: keyword.arg.as_ref().map(ToString::to_string),
},
keyword.range(),
)
} else {
// The implicit default is used.
Diagnostic::new(
AirflowDagNoScheduleArgument {
deprecated_argument: None,
},
expr.range(),
)
};
checker.diagnostics.push(diagnostic);
}
2 changes: 2 additions & 0 deletions crates/ruff_linter/src/rules/airflow/rules/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub(crate) use dag_schedule_argument::*;
pub(crate) use task_variable_name::*;

mod dag_schedule_argument;
mod task_variable_name;
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
---
source: crates/ruff_linter/src/rules/airflow/mod.rs
---
AIR301.py:4:1: AIR301 DAG should have an explicit `schedule` argument
|
2 | from airflow.timetables.trigger import CronTriggerTimetable
3 |
4 | DAG(dag_id="class_default_schedule")
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR301
5 |
6 | DAG(dag_id="class_schedule", schedule="@hourly")
|

AIR301.py:8:31: AIR301 argument `timetable` is deprecated; use `schedule` instead
|
6 | DAG(dag_id="class_schedule", schedule="@hourly")
7 |
8 | DAG(dag_id="class_timetable", timetable=CronTriggerTimetable())
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR301
9 |
10 | DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
|

AIR301.py:10:39: AIR301 argument `schedule_interval` is deprecated; use `schedule` instead
|
8 | DAG(dag_id="class_timetable", timetable=CronTriggerTimetable())
9 |
10 | DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR301
|

AIR301.py:13:2: AIR301 DAG should have an explicit `schedule` argument
|
13 | @dag()
| ^^^^^ AIR301
14 | def decorator_default_schedule():
15 | pass
|

AIR301.py:23:6: AIR301 argument `timetable` is deprecated; use `schedule` instead
|
23 | @dag(timetable=CronTriggerTimetable())
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR301
24 | def decorator_timetable():
25 | pass
|

AIR301.py:28:6: AIR301 argument `schedule_interval` is deprecated; use `schedule` instead
|
28 | @dag(schedule_interval="0 * * * *")
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR301
29 | def decorator_schedule_interval():
30 | pass
|
3 changes: 3 additions & 0 deletions ruff.schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading