-
Notifications
You must be signed in to change notification settings - Fork 952
Add PDS-DS Query 1 #19131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: branch-25.08
Are you sure you want to change the base?
Add PDS-DS Query 1 #19131
Conversation
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here. |
except AssertionError as e: | ||
validation_failures.append(q_id) | ||
print(f"❌ Query {q_id} failed validation!\n{e}") | ||
if args.validate and run_config.executor != "cpu": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sign post: There are no duckdb queries in this module to validate against, which is why we validate against CPU for pdsh.
.select(["c_customer_id"]) | ||
.sort("c_customer_id") | ||
.limit(100) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are more, and each individual query is longer than, the pdsh versions. I think we should make a module with one file per query and import them here.
that is, a directory structure: benchmarks/pdsds/{q1, ..., q99}.py
etc. That would also make it easier to navigate and review the individual queries.
for filename in os.listdir(dataset_path): | ||
if filename.endswith(".parquet"): | ||
table_name = filename.replace(".parquet", "") | ||
parquet_path = Path(dataset_path) / filename | ||
create_view_sql = f"CREATE VIEW {table_name} AS SELECT * FROM read_parquet('{parquet_path}');" | ||
create_statements.append(create_view_sql) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
statements = [
f"CREATE VIEW {table.stem} as SELECT * FROM read_parquet('{table.absolute()}')"
for table in Path(dataset_path).glob("*.parquet")
]
statements.append(query)
return conn.execute("\n".join(statements)).pl()
?
print(f"Completed {q_id} in {t1 - t0:.2f} seconds") | ||
|
||
if args.print_results: | ||
print(result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please track and store, in structured format, the timing and other relevant run config data. (this is done for the polars run below, I suppose we should do it for duckdb as well).
|
||
for q_id in run_config.queries: | ||
try: | ||
q = getattr(PDSDSPolarsQueries, f"q{q_id}")(run_config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the only line that is different from the equivalent function in pdsh.py. Let's refactor so we have a function run_polars(q: pl.LazyFrame, options: ...)
and can reuse that in both files.
That way we don't have two places where we have to remember to update config options for the executors.
color="green", | ||
): | ||
if run_config.executor == "cpu": | ||
return q.collect(new_streaming=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be:
return q.collect(new_streaming=True) | |
return q.collect(engine="streaming") |
|
||
else: | ||
raise RuntimeError( | ||
"Cannot provide debug information because cudf_polars is not installed." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this is code movement, but this is a weird error message. I think the message should just report that the requested engine is not supported?
if run_config.executor == "cpu": | ||
if args.explain_logical: | ||
print(f"\nQuery {q_id} - Logical plan\n") | ||
print(q.explain()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One can explain the streaming physical plan with q.show_graph(engine="streaming", plan_stage="physical")
(needs graphviz...)
Description
Contributes to #19125.
pdsds.py
pdsh.py
just move out common logic that it shares withpdsds.py
Checklist