
feat(frontend): add source backfill progress tracking #22217

Merged 12 commits on Jun 17, 2025
2 changes: 2 additions & 0 deletions proto/expr.proto
@@ -386,6 +386,8 @@ message TableFunction {
MYSQL_QUERY = 21;
// Internal backfill table function
INTERNAL_BACKFILL_PROGRESS = 30;
// Internal source backfill table function
INTERNAL_SOURCE_BACKFILL_PROGRESS = 31;
// User defined table function
USER_DEFINED = 100;
}
6 changes: 6 additions & 0 deletions src/common/src/catalog/internal_table.rs
@@ -41,6 +41,12 @@ pub fn is_backfill_table(table_name: &str) -> bool {
parts_len >= 2 && parts[parts_len - 2] == "streamscan"
}

pub fn is_source_backfill_table(table_name: &str) -> bool {
let parts: Vec<&str> = table_name.split('_').collect();
let parts_len = parts.len();
parts_len >= 2 && parts[parts_len - 2] == "sourcebackfill"
}

pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
dist_key_indices: &[I],
pk_indices: &[I],
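A standalone, self-contained sketch (not part of this diff) of how the new predicate behaves; the table names are hypothetical and only follow the convention checked above, where the second-to-last `_`-separated segment must be `sourcebackfill`:

fn is_source_backfill_table(table_name: &str) -> bool {
    // Same logic as the helper added in internal_table.rs above.
    let parts: Vec<&str> = table_name.split('_').collect();
    let parts_len = parts.len();
    parts_len >= 2 && parts[parts_len - 2] == "sourcebackfill"
}

fn main() {
    // Hypothetical internal state-table names, for illustration only.
    assert!(is_source_backfill_table("__internal_mv_example_3_sourcebackfill_7"));
    // Plain stream-scan backfill state tables are matched by `is_backfill_table` instead.
    assert!(!is_source_backfill_table("__internal_mv_example_3_streamscan_7"));
    assert!(!is_source_backfill_table("plain_table"));
}
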
9 changes: 9 additions & 0 deletions src/frontend/src/binder/expr/function/mod.rs
@@ -362,6 +362,15 @@ impl Binder {
self.ensure_table_function_allowed()?;
return Ok(TableFunction::new_internal_backfill_progress().into());
}
// `internal_source_backfill_progress` table function
if func_name.eq("internal_source_backfill_progress") {
reject_syntax!(
arg_list.variadic,
"`VARIADIC` is not allowed in table function call"
);
self.ensure_table_function_allowed()?;
return Ok(TableFunction::new_internal_source_backfill_progress().into());
}
// UDTF
if let Some(ref udf) = udf
&& udf.kind.is_table()
14 changes: 14 additions & 0 deletions src/frontend/src/expr/table_function.rs
@@ -600,6 +600,20 @@ impl TableFunction {
}
}

pub fn new_internal_source_backfill_progress() -> Self {
TableFunction {
args: vec![],
return_type: DataType::Struct(StructType::new(vec![
("job_id".to_owned(), DataType::Int32),
("fragment_id".to_owned(), DataType::Int32),
("backfill_state_table_id".to_owned(), DataType::Int32),
("backfill_progress".to_owned(), DataType::Jsonb),
])),
function_type: TableFunctionType::InternalSourceBackfillProgress,
user_defined: None,
}
}

pub fn to_protobuf(&self) -> PbTableFunction {
PbTableFunction {
function_type: self.function_type as i32,
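With the struct above, the new function should be queryable much like the existing `internal_backfill_progress()` helper, roughly `SELECT * FROM internal_source_backfill_progress();`, returning integer `job_id`, `fragment_id` and `backfill_state_table_id` columns plus a JSONB `backfill_progress` column for each backfilling source fragment.
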
12 changes: 12 additions & 0 deletions src/frontend/src/optimizer/logical_optimization.rs
@@ -137,6 +137,8 @@ static TABLE_FUNCTION_CONVERT: LazyLock<OptimizationStage> = LazyLock::new(|| {
TableFunctionToFileScanRule::create(),
// Apply internal backfill progress rule first
TableFunctionToInternalBackfillProgressRule::create(),
// Apply internal source backfill progress rule next
TableFunctionToInternalSourceBackfillProgressRule::create(),
// Apply postgres query rule next
TableFunctionToPostgresQueryRule::create(),
// Apply mysql query rule next
@@ -181,6 +183,15 @@ static TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS: LazyLock<OptimizationStage>
)
});

static TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS: LazyLock<OptimizationStage> =
LazyLock::new(|| {
OptimizationStage::new(
"Table Function To Internal Source Backfill Progress",
vec![TableFunctionToInternalSourceBackfillProgressRule::create()],
ApplyOrder::TopDown,
)
});

static VALUES_EXTRACT_PROJECT: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Values Extract Project",
@@ -757,6 +768,7 @@ impl LogicalOptimizer {
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_POSTGRES_QUERY)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_MYSQL_QUERY)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_BACKFILL_PROGRESS)?;
plan = plan.optimize_by_rules(&TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS)?;
// In order to unnest a table function, we need to convert it into a `project_set` first.
plan = plan.optimize_by_rules(&TABLE_FUNCTION_CONVERT)?;

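Ordering note: the new rule is registered both inside the TABLE_FUNCTION_CONVERT stage and as its own TABLE_FUNCTION_TO_INTERNAL_SOURCE_BACKFILL_PROGRESS stage, which runs right after the stream-scan backfill counterpart and before TABLE_FUNCTION_CONVERT, so the call is rewritten into a plan over the source backfill state tables before any remaining table functions are unnested into `project_set`.
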
7 changes: 5 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_source_scan.rs
@@ -49,6 +49,9 @@ pub struct StreamSourceScan {
impl_plan_tree_node_for_leaf! { StreamSourceScan }

impl StreamSourceScan {
pub const BACKFILL_PROGRESS_COLUMN_NAME: &str = "backfill_progress";
pub const PARTITION_ID_COLUMN_NAME: &str = "partition_id";

pub fn new(core: generic::Source) -> Self {
let base = PlanBase::new_stream_with_core(
&core,
@@ -84,11 +87,11 @@ impl StreamSourceScan {

let key = Field {
data_type: DataType::Varchar,
name: "partition_id".to_owned(),
name: Self::PARTITION_ID_COLUMN_NAME.to_owned(),
};
let value = Field {
data_type: DataType::Jsonb,
name: "backfill_progress".to_owned(),
name: Self::BACKFILL_PROGRESS_COLUMN_NAME.to_owned(),
};

let ordered_col_idx = builder.add_column(&key);
36 changes: 28 additions & 8 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -48,6 +48,12 @@ pub struct StreamTableScan {
}

impl StreamTableScan {
pub const BACKFILL_FINISHED_COLUMN_NAME: &str = "backfill_finished";
pub const EPOCH_COLUMN_NAME: &str = "epoch";
pub const IS_EPOCH_FINISHED_COLUMN_NAME: &str = "is_epoch_finished";
pub const ROW_COUNT_COLUMN_NAME: &str = "row_count";
pub const VNODE_COLUMN_NAME: &str = "vnode";

pub fn new_with_stream_scan_type(
core: generic::TableScan,
stream_scan_type: StreamScanType,
@@ -174,7 +180,10 @@ impl StreamTableScan {

// We use vnode as primary key in state table.
// If `Distribution::Single`, vnode will just be `VirtualNode::default()`.
catalog_builder.add_column(&Field::with_name(VirtualNode::RW_TYPE, "vnode"));
catalog_builder.add_column(&Field::with_name(
VirtualNode::RW_TYPE,
Self::VNODE_COLUMN_NAME,
));
catalog_builder.add_order_column(0, OrderType::ascending());

match stream_scan_type {
@@ -190,22 +199,33 @@
}

// `backfill_finished` column
catalog_builder
.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
catalog_builder.add_column(&Field::with_name(
DataType::Boolean,
Self::BACKFILL_FINISHED_COLUMN_NAME,
));

// `row_count` column
catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
catalog_builder.add_column(&Field::with_name(
DataType::Int64,
Self::ROW_COUNT_COLUMN_NAME,
));
}
StreamScanType::SnapshotBackfill | StreamScanType::CrossDbSnapshotBackfill => {
// `epoch` column
catalog_builder.add_column(&Field::with_name(DataType::Int64, "epoch"));
catalog_builder
.add_column(&Field::with_name(DataType::Int64, Self::EPOCH_COLUMN_NAME));

// `row_count` column
catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
catalog_builder.add_column(&Field::with_name(
DataType::Int64,
Self::ROW_COUNT_COLUMN_NAME,
));

// `is_finished` column
catalog_builder
.add_column(&Field::with_name(DataType::Boolean, "is_epoch_finished"));
catalog_builder.add_column(&Field::with_name(
DataType::Boolean,
Self::IS_EPOCH_FINISHED_COLUMN_NAME,
));

// pk columns
for col_order in self.core.primary_key() {
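The column names that were previously inlined as string literals ("vnode", "backfill_finished", "row_count", "epoch", "is_epoch_finished") are promoted to associated constants on StreamTableScan, so the backfill-progress optimizer rule further below can look the columns up via the same constants instead of repeating the literals.
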
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
@@ -248,6 +248,7 @@ mod source_to_iceberg_scan_rule;
mod source_to_kafka_scan_rule;
mod table_function_to_file_scan_rule;
mod table_function_to_internal_backfill_progress;
mod table_function_to_internal_source_backfill_progress;
mod table_function_to_mysql_query_rule;
mod table_function_to_postgres_query_rule;
mod values_extract_project_rule;
@@ -261,6 +262,7 @@ pub use source_to_iceberg_scan_rule::*;
pub use source_to_kafka_scan_rule::*;
pub use table_function_to_file_scan_rule::*;
pub use table_function_to_internal_backfill_progress::*;
pub use table_function_to_internal_source_backfill_progress::*;
pub use table_function_to_mysql_query_rule::*;
pub use table_function_to_postgres_query_rule::*;
pub use values_extract_project_rule::*;
@@ -329,6 +331,7 @@ macro_rules! for_all_rules {
, { TableFunctionToPostgresQueryRule }
, { TableFunctionToMySqlQueryRule }
, { TableFunctionToInternalBackfillProgressRule }
, { TableFunctionToInternalSourceBackfillProgressRule }
, { ApplyLimitTransposeRule }
, { CommonSubExprExtractRule }
, { BatchProjectMergeRule }
src/frontend/src/optimizer/rule/table_function_to_internal_backfill_progress.rs
@@ -29,6 +29,7 @@ use crate::expr::{AggCall, ExprImpl, InputRef, Literal, OrderBy, TableFunctionTy
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
LogicalAgg, LogicalProject, LogicalScan, LogicalTableFunction, LogicalUnion, LogicalValues,
StreamTableScan,
};
use crate::optimizer::{OptimizerContext, PlanRef};
use crate::utils::{Condition, GroupBy};
@@ -47,7 +48,6 @@ impl FallibleRule for TableFunctionToInternalBackfillProgressRule {
}

let reader = plan.ctx().session_ctx().env().catalog_reader().read_guard();
// TODO(kwannoel): Make sure it reads from source tables as well.
let backfilling_tables = get_backfilling_tables(reader);
let plan = Self::build_plan(plan.ctx(), backfilling_tables)?;
ApplyResult::Ok(plan)
@@ -190,12 +190,20 @@ impl BackfillInfo {
let Some(job_id) = table.job_id.map(|id| id.table_id) else {
bail!("`job_id` column not found in backfill table");
};
let Some(row_count_column_index) =
table.columns.iter().position(|c| c.name() == "row_count")
let Some(row_count_column_index) = table
.columns
.iter()
.position(|c| c.name() == StreamTableScan::ROW_COUNT_COLUMN_NAME)
else {
bail!("`row_count` column not found in backfill table");
bail!(
"`{}` column not found in backfill table",
StreamTableScan::ROW_COUNT_COLUMN_NAME
);
};
let epoch_column_index = table.columns.iter().position(|c| c.name() == "epoch");
let epoch_column_index = table
.columns
.iter()
.position(|c| c.name() == StreamTableScan::EPOCH_COLUMN_NAME);
let fragment_id = table.fragment_id;
let table_id = table.id.table_id;

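A simplified, self-contained sketch (with stand-in types, not the real catalog structures) of the lookup pattern used above: well-known backfill state-table columns are resolved through the shared constants rather than repeated string literals.

struct ColumnDesc {
    name: String,
}

struct StateTableCatalog {
    columns: Vec<ColumnDesc>,
}

// Stand-ins for StreamTableScan::ROW_COUNT_COLUMN_NAME / EPOCH_COLUMN_NAME above.
const ROW_COUNT_COLUMN_NAME: &str = "row_count";
const EPOCH_COLUMN_NAME: &str = "epoch";

fn backfill_column_indices(table: &StateTableCatalog) -> Result<(usize, Option<usize>), String> {
    let row_count_column_index = table
        .columns
        .iter()
        .position(|c| c.name == ROW_COUNT_COLUMN_NAME)
        .ok_or_else(|| format!("`{}` column not found in backfill table", ROW_COUNT_COLUMN_NAME))?;
    // `epoch` is only present for snapshot-backfill state tables, so it stays optional.
    let epoch_column_index = table
        .columns
        .iter()
        .position(|c| c.name == EPOCH_COLUMN_NAME);
    Ok((row_count_column_index, epoch_column_index))
}

fn main() {
    let table = StateTableCatalog {
        columns: vec![
            ColumnDesc { name: "vnode".to_owned() },
            ColumnDesc { name: "backfill_finished".to_owned() },
            ColumnDesc { name: "row_count".to_owned() },
        ],
    };
    assert_eq!(backfill_column_indices(&table), Ok((2, None)));
}
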