Skip to content

(fix) partition block overwriting #1695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 19, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions awswrangler/distributed/ray/modin/s3/_write_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def _to_partitions_distributed( # pylint: disable=unused-argument
func = engine.dispatch_func(func, PandasDataFrame)

@ray_remote
- def write_partitions(df: pd.DataFrame) -> Tuple[List[str], Dict[str, List[str]]]:
+ def write_partitions(df: pd.DataFrame, block_index: int) -> Tuple[List[str], Dict[str, List[str]]]:
paths, partitions_values = _to_partitions_func(
# Passing a copy of the data frame because data in ray object store is immutable
# and that leads to "ValueError: buffer source array is read-only" during df.groupby()
Expand All @@ -154,7 +154,7 @@ def write_partitions(df: pd.DataFrame) -> Tuple[List[str], Dict[str, List[str]]]
table_type=table_type,
transaction_id=transaction_id,
bucketing_info=None,
- filename_prefix=filename_prefix,
+ filename_prefix=f"{filename_prefix}_{block_index:05d}",
partition_cols=partition_cols,
partitions_types=partitions_types,
boto3_session=None,
Expand All @@ -163,7 +163,9 @@ def write_partitions(df: pd.DataFrame) -> Tuple[List[str], Dict[str, List[str]]]
return paths, partitions_values

block_object_refs = ray.data.from_modin(df).get_internal_block_refs()
- result = ray_get([write_partitions(object_ref) for object_ref in block_object_refs])
+ result = ray_get(
+ [write_partitions(object_ref, block_index) for block_index, object_ref in enumerate(block_object_refs)]
+ )
paths = [path for row in result for path in row[0]]
partitions_values = {
partition_key: partition_value for row in result for partition_key, partition_value in row[1].items()
Expand Down