|
17 | 17 |
|
18 | 18 | use super::*;
|
19 | 19 | use insta::assert_snapshot;
|
20 |
| -use regex::Regex; |
21 | 20 | use rstest::rstest;
|
22 | 21 |
|
23 | 22 | use datafusion::config::ConfigOptions;
|
@@ -57,42 +56,41 @@ async fn explain_analyze_baseline_metrics() {
|
57 | 56 |
|
58 | 57 | println!("Query Output:\n\n{formatted}");
|
59 | 58 |
|
60 |
| - let re = Regex::new(r"\|[^|]*\|([^|]*)\|").unwrap(); |
61 |
| - let actual = formatted |
62 |
| - .lines() |
63 |
| - .map(|line| re.replace_all(line, "$1").trim_end().to_string()) |
64 |
| - .filter(|line| !line.is_empty() && !line.starts_with('+')) |
65 |
| - .collect::<Vec<_>>() |
66 |
| - .join("\n"); |
67 |
| - insta::with_settings!({filters => vec![ |
68 |
| - (r"\d+\.?\d*[µmn]?s", "[TIME]"), |
69 |
| - (r"\[\[[^\]]*?/(testing/data/csv/[^/]+\.csv)\]\]", "[[$1]]"), |
70 |
| - ]}, { |
71 |
| - insta::assert_snapshot!(actual,@r#" |
72 |
| - plan |
73 |
| - CoalescePartitionsExec: fetch=3, metrics=[output_rows=3, elapsed_compute=[TIME]] |
74 |
| - UnionExec, metrics=[output_rows=3, elapsed_compute=[TIME]] |
75 |
| - ProjectionExec: expr=[count(Int64(1))@0 as cnt], metrics=[output_rows=1, elapsed_compute=[TIME]] |
76 |
| - AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], metrics=[output_rows=1, elapsed_compute=[TIME]] |
77 |
| - CoalescePartitionsExec, metrics=[output_rows=3, elapsed_compute=[TIME]] |
78 |
| - AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], metrics=[output_rows=3, elapsed_compute=[TIME]] |
79 |
| - ProjectionExec: expr=[], metrics=[output_rows=5, elapsed_compute=[TIME]] |
80 |
| - AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[], metrics=[output_rows=5, elapsed_compute=[TIME], spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=50592] |
81 |
| - CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=5, elapsed_compute=[TIME]] |
82 |
| - RepartitionExec: partitioning=Hash([c1@0], 3), input_partitions=3, metrics=[fetch_time=[TIME], repartition_time=[TIME], send_time=[TIME]] |
83 |
| - AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[], metrics=[output_rows=5, elapsed_compute=[TIME], spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=52216] |
84 |
| - CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=99, elapsed_compute=[TIME]] |
85 |
| - FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0], metrics=[output_rows=99, elapsed_compute=[TIME]] |
86 |
| - RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1, metrics=[fetch_time=[TIME], repartition_time=[TIME], send_time=[TIME]] |
87 |
| - DataSourceExec: file_groups={1 group: [[testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], file_type=csv, has_header=true, metrics=[output_rows=100, elapsed_compute=[TIME], file_open_errors=0, file_scan_errors=0, time_elapsed_opening=[TIME], time_elapsed_processing=[TIME], time_elapsed_scanning_total=[TIME], time_elapsed_scanning_until_data=[TIME]] |
88 |
| - ProjectionExec: expr=[1 as cnt], metrics=[output_rows=1, elapsed_compute=[TIME]] |
89 |
| - PlaceholderRowExec, metrics=[] |
90 |
| - ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt], metrics=[output_rows=1, elapsed_compute=[TIME]] |
91 |
| - BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted], metrics=[output_rows=1, elapsed_compute=[TIME]] |
92 |
| - ProjectionExec: expr=[1 as c1], metrics=[output_rows=1, elapsed_compute=[TIME]] |
93 |
| - PlaceholderRowExec, metrics=[] |
94 |
| - "#); |
95 |
| - }); |
| 59 | + assert_metrics!( |
| 60 | + &formatted, |
| 61 | + "AggregateExec: mode=Partial, gby=[]", |
| 62 | + "metrics=[output_rows=3, elapsed_compute=" |
| 63 | + ); |
| 64 | + assert_metrics!( |
| 65 | + &formatted, |
| 66 | + "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]", |
| 67 | + "metrics=[output_rows=5, elapsed_compute=" |
| 68 | + ); |
| 69 | + assert_metrics!( |
| 70 | + &formatted, |
| 71 | + "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", |
| 72 | + "metrics=[output_rows=99, elapsed_compute=" |
| 73 | + ); |
| 74 | + assert_metrics!( |
| 75 | + &formatted, |
| 76 | + "ProjectionExec: expr=[]", |
| 77 | + "metrics=[output_rows=5, elapsed_compute=" |
| 78 | + ); |
| 79 | + assert_metrics!( |
| 80 | + &formatted, |
| 81 | + "CoalesceBatchesExec: target_batch_size=4096", |
| 82 | + "metrics=[output_rows=5, elapsed_compute" |
| 83 | + ); |
| 84 | + assert_metrics!( |
| 85 | + &formatted, |
| 86 | + "UnionExec", |
| 87 | + "metrics=[output_rows=3, elapsed_compute=" |
| 88 | + ); |
| 89 | + assert_metrics!( |
| 90 | + &formatted, |
| 91 | + "WindowAggExec", |
| 92 | + "metrics=[output_rows=1, elapsed_compute=" |
| 93 | + ); |
96 | 94 |
|
97 | 95 | fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
|
98 | 96 | use datafusion::physical_plan;
|
|
0 commit comments