
Commit a91ded1

(fix) partition block overwriting (#1695)
* Fix blocks overwriting
* Add test case
1 parent 07ecbd0 commit a91ded1

2 files changed: 27 additions, 4 deletions

awswrangler/distributed/ray/modin/s3/_write_dataset.py

Lines changed: 5 additions & 3 deletions
@@ -138,7 +138,7 @@ def _to_partitions_distributed(  # pylint: disable=unused-argument
         func = engine.dispatch_func(func, PandasDataFrame)
 
         @ray_remote
-        def write_partitions(df: pd.DataFrame) -> Tuple[List[str], Dict[str, List[str]]]:
+        def write_partitions(df: pd.DataFrame, block_index: int) -> Tuple[List[str], Dict[str, List[str]]]:
             paths, partitions_values = _to_partitions_func(
                 # Passing a copy of the data frame because data in ray object store is immutable
                 # and that leads to "ValueError: buffer source array is read-only" during df.groupby()
@@ -154,7 +154,7 @@ def write_partitions(df: pd.DataFrame) -> Tuple[List[str], Dict[str, List[str]]]
                 table_type=table_type,
                 transaction_id=transaction_id,
                 bucketing_info=None,
-                filename_prefix=filename_prefix,
+                filename_prefix=f"{filename_prefix}_{block_index:05d}",
                 partition_cols=partition_cols,
                 partitions_types=partitions_types,
                 boto3_session=None,
@@ -163,7 +163,9 @@ def write_partitions(df: pd.DataFrame) -> Tuple[List[str], Dict[str, List[str]]]
             return paths, partitions_values
 
         block_object_refs = ray.data.from_modin(df).get_internal_block_refs()
-        result = ray_get([write_partitions(object_ref) for object_ref in block_object_refs])
+        result = ray_get(
+            [write_partitions(object_ref, block_index) for block_index, object_ref in enumerate(block_object_refs)]
+        )
         paths = [path for row in result for path in row[0]]
         partitions_values = {
            partition_key: partition_value for row in result for partition_key, partition_value in row[1].items()
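
Note on the fix: previously every Ray block was written with the same filename_prefix, so blocks that targeted the same partition directory could emit identical object keys and overwrite one another. Suffixing the prefix with a zero-padded block index keeps each block's keys distinct. A minimal, standalone sketch of that naming scheme (unique_prefixes is a hypothetical helper for illustration, not part of awswrangler):

def unique_prefixes(filename_prefix: str, num_blocks: int) -> list:
    # One prefix per Ray block: the zero-padded index guarantees distinct
    # object keys even when several blocks write into the same partition directory.
    return [f"{filename_prefix}_{i:05d}" for i in range(num_blocks)]

print(unique_prefixes("part", 3))  # ['part_00000', 'part_00001', 'part_00002']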

tests/load/test_s3.py

Lines changed: 22 additions & 1 deletion
@@ -1,4 +1,4 @@
-import pandas as pd
+import modin.pandas as pd
 import pytest
 import ray
 
@@ -31,6 +31,13 @@ def big_modin_df():
     return frame
 
 
+def _modin_repartition(df: pd.DataFrame, num_blocks: int) -> pd.DataFrame:
+    """Repartition modin dataframe into n blocks"""
+    dataset = ray.data.from_modin(df)
+    dataset = dataset.repartition(num_blocks)
+    return dataset.to_modin()
+
+
 @pytest.mark.repeat(1)
 @pytest.mark.parametrize("benchmark_time", [180])
 def test_s3_select(benchmark_time):
@@ -90,6 +97,20 @@ def test_s3_write_parquet_dataset(df_s, path, partition_cols, bucketing_info, be
     assert timer.elapsed_time < benchmark_time
 
 
+@pytest.mark.parametrize("benchmark_time", [200])
+@pytest.mark.parametrize("partition_cols", [None, ["payment_type"]])
+@pytest.mark.parametrize("num_blocks", [None, 1, 5])
+def test_s3_write_parquet_blocks(df_s, path, partition_cols, num_blocks, benchmark_time):
+    dataset = True if partition_cols else False
+    if num_blocks:
+        df_s = _modin_repartition(df_s, num_blocks)
+    with ExecutionTimer(f"elapsed time of wr.s3.to_parquet() with repartitioning into {num_blocks} blocks") as timer:
+        wr.s3.to_parquet(df_s, path=path, dataset=dataset, partition_cols=partition_cols)
+    df = wr.s3.read_parquet(path=path, dataset=dataset)
+    assert df.shape == df_s.shape
+    assert timer.elapsed_time < benchmark_time
+
+
 @pytest.mark.parametrize("benchmark_time", [5])
 def test_s3_delete_objects(path, path2, benchmark_time):
     df = pd.DataFrame({"id": [1, 2, 3]})
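
The new load test hinges on controlling how many Ray blocks back the Modin frame before writing. A minimal, self-contained sketch of that repartitioning step, using the same ray.data.from_modin / repartition / to_modin round trip as the _modin_repartition helper above (the sample data and block count are illustrative, not taken from the test suite):

import modin.pandas as pd
import ray

ray.init(ignore_reinit_error=True)

# Build a small Modin frame and force it into 5 Ray blocks, mirroring what
# _modin_repartition does before wr.s3.to_parquet is benchmarked.
df = pd.DataFrame({"id": range(100), "payment_type": ["cash", "card"] * 50})
df_5_blocks = ray.data.from_modin(df).repartition(5).to_modin()

assert df_5_blocks.shape == df.shape  # same rows and columns, different block layout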
