
[SPARK-52450][CONNECT] Improve performance of schema deepcopy #51157


Open · wants to merge 3 commits into master

Conversation

@xi-db (Contributor) commented Jun 11, 2025

What changes were proposed in this pull request?

In Spark Connect, DataFrame.schema returns a deep copy of the schema to prevent unexpected behavior caused by user modifications to the returned schema object. However, if a user accesses df.schema repeatedly on a DataFrame with a complex schema, it can lead to noticeable performance degradation.

The performance issue can be reproduced with the code snippet below. Since copy.deepcopy is known to be slow on complex, deeply nested objects, this PR replaces it with pickle-based serialization/deserialization to improve the performance of df.schema access. Given the limitations of pickle, the implementation falls back to deepcopy in cases where pickling fails.

from pyspark.sql.types import StructType, StructField, StringType

def make_nested_struct(level, max_level, fields_per_level):
    if level == max_level - 1:
        return StructType(
            [StructField(f"f{level}_{i}", StringType(), True) for i in range(fields_per_level)])
    else:
        return StructType(
            [StructField(f"s{level}_{i}",
                         make_nested_struct(level + 1, max_level, fields_per_level), True) for i in
             range(fields_per_level)])

# Create a 4-level nested schema with 10,000 leaf fields in total
schema = make_nested_struct(0, 4, 10)

The existing approach needs 21.9s to copy the schema 100 times:

import copy
import timeit

timeit.timeit(lambda: copy.deepcopy(schema), number=100)
# 21.9

The updated approach needs only 2.0s to copy it 100 times:

from pyspark.serializers import CPickleSerializer
cached_schema_serialized = CPickleSerializer().dumps(schema)

timeit.timeit(lambda: CPickleSerializer().loads(cached_schema_serialized), number=100)
# 2.0
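
For illustration, a minimal sketch of the pattern described above: serialize the schema once, deserialize it on every access, and fall back to deepcopy if pickling ever fails. The class and attribute names here are assumptions made for the sketch, not the exact code in this PR.

import copy
from typing import Optional

from pyspark.serializers import CPickleSerializer
from pyspark.sql.types import StructType

class _CachedSchemaSketch:
    """Illustrative only: hands out an independent schema copy on every access."""

    def __init__(self, schema: StructType):
        self._cached_schema = schema
        self._cached_schema_serialized: Optional[bytes] = None
        try:
            # Pay the pickling cost once; loads() is much cheaper than deepcopy.
            self._cached_schema_serialized = CPickleSerializer().dumps(schema)
        except Exception:
            # Not expected for nested Spark type classes, but keep deepcopy as a safety net.
            pass

    @property
    def schema(self) -> StructType:
        if self._cached_schema_serialized is not None:
            return CPickleSerializer().loads(self._cached_schema_serialized)
        return copy.deepcopy(self._cached_schema)

Because each access returns a fresh object, user modifications to the returned schema still cannot affect the cached one.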

Why are the changes needed?

It improves performance when df.schema is called many times on a DataFrame with a complex schema.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests and new tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@xi-db (Contributor Author) commented Jun 11, 2025

Hi @hvanhovell @vicennial, could you take a look at this PR? Thanks.

try:
    self._cached_schema_serialized = CPickleSerializer().dumps(self._schema)
except Exception as e:
    logger.warn(f"DataFrame schema pickle dumps failed with exception: {e}.")
Contributor

In what cases do we think this will happen?

Contributor Author
I think it never happens, because the schema consists only of nested Spark type classes, so it shouldn't hit any of the special types that pickle doesn't support (link). That said, maybe we should still handle the exception just in case. What do you think?

@heyihong (Contributor) commented Jun 12, 2025

It would be nice to add some comments to the code to make it easier for future readers to understand. Currently, without looking at the corresponding pull request, it's not very clear why a CPickleSerializer is used or why an error is being handled.

Also, it may be clearer to create a function called _fast_cached_schema_deepcopy that caches the serialized schema and then deserializes it.
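
For illustration, a sketch of the kind of helper being suggested here; only the name _fast_cached_schema_deepcopy comes from the comment above, and the module-level cache keyed by object id is a simplification for the sketch rather than how the PR stores the cached bytes.

import copy
from typing import Dict, Optional

from pyspark.serializers import CPickleSerializer
from pyspark.sql.types import StructType

# Hypothetical cache: schema object id -> pickled bytes (None if pickling failed once).
_schema_pickle_cache: Dict[int, Optional[bytes]] = {}

def _fast_cached_schema_deepcopy(schema: StructType) -> StructType:
    """Return a fresh copy of the schema, reusing cached pickled bytes when possible."""
    key = id(schema)
    if key not in _schema_pickle_cache:
        try:
            _schema_pickle_cache[key] = CPickleSerializer().dumps(schema)
        except Exception:
            _schema_pickle_cache[key] = None
    data = _schema_pickle_cache[key]
    return CPickleSerializer().loads(data) if data is not None else copy.deepcopy(schema)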

@@ -1836,11 +1839,26 @@ def _schema(self) -> StructType:
    if self._cached_schema is None:
        query = self._plan.to_proto(self._session.client)
        self._cached_schema = self._session.client.schema(query)
Contributor

How about we store the proto response instead of doing the pickle? In that case we are guaranteed it will always work. Does that have the same performance problems?

Contributor Author

@hvanhovell Just tested: types.proto_schema_to_pyspark_data_type took 6.1s to transform the proto schema 100 times. Compared with that, pickle is faster at 2.0s.

Contributor

works for me. thanks for looking into it!

@grundprinzip (Contributor) left a comment

Before merging, it would be great to test @hvanhovell's proposal of always reconstructing the schema from the proto response and see what the performance impact is.
