(enhancement): Apply modin repartitioning where required only #1701

Merged · 3 commits · Oct 25, 2022
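In short: this PR stops wrapping the top-level writers (to_csv, to_json, to_parquet) with modin_repartition at engine-registration time and instead applies the decorator directly to the distributed dataset writers (_to_buckets_distributed and _to_partitions_distributed), so Modin frames are repartitioned only on the code paths that actually need it. For orientation, here is a minimal sketch of what such a decorator can look like, assuming "repartitioning" means collapsing column-wise splits so the frame is chunked along rows only; this is illustrative, not awswrangler's actual implementation:

# Hedged sketch of a row-axis repartitioning decorator (illustrative only).
from functools import wraps
from typing import Any, Callable

import modin.pandas as pd
from modin.distributed.dataframe.pandas import from_partitions, unwrap_partitions


def modin_repartition(function: Callable[..., Any]) -> Callable[..., Any]:
    @wraps(function)
    def wrapper(df: pd.DataFrame, *args: Any, **kwargs: Any) -> Any:
        # Unwrap one future per row partition, then rebuild the frame from
        # those blocks, discarding any column-wise splits (assumes `df` is
        # the first positional argument of the wrapped writer).
        blocks = unwrap_partitions(df, axis=0)
        df = from_partitions(blocks, axis=0)
        return function(df, *args, **kwargs)

    return wrapper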
awswrangler/distributed/ray/_register.py: 3 additions & 7 deletions
@@ -3,16 +3,16 @@
 from awswrangler._data_types import pyarrow_types_from_pandas
 from awswrangler._distributed import MemoryFormatEnum, engine, memory_format
 from awswrangler._utils import is_pandas_frame, table_refs_to_df
-from awswrangler.distributed.ray._core import ray_remote
+from awswrangler.distributed.ray import ray_remote
 from awswrangler.lakeformation._read import _get_work_unit_results
 from awswrangler.s3._delete import _delete_objects
 from awswrangler.s3._read_parquet import _read_parquet, _read_parquet_metadata_file
 from awswrangler.s3._read_text import _read_text
 from awswrangler.s3._select import _select_object_content, _select_query
 from awswrangler.s3._wait import _wait_object_batch
 from awswrangler.s3._write_dataset import _to_buckets, _to_partitions
-from awswrangler.s3._write_parquet import _to_parquet, to_parquet
-from awswrangler.s3._write_text import _to_text, to_csv, to_json
+from awswrangler.s3._write_parquet import _to_parquet
+from awswrangler.s3._write_text import _to_text


 def register_ray() -> None:
@@ -28,7 +28,6 @@ def register_ray() -> None:
         engine.register_func(func, ray_remote(func))

     if memory_format.get() == MemoryFormatEnum.MODIN:
-        from awswrangler.distributed.ray.modin._core import modin_repartition
         from awswrangler.distributed.ray.modin._data_types import pyarrow_types_from_pandas_distributed
         from awswrangler.distributed.ray.modin._utils import _arrow_refs_to_df, _is_pandas_or_modin_frame
         from awswrangler.distributed.ray.modin.s3._read_parquet import _read_parquet_distributed
@@ -48,9 +47,6 @@ def register_ray() -> None:
             _to_parquet: _to_parquet_distributed,
             _to_partitions: _to_partitions_distributed,
             _to_text: _to_text_distributed,
-            to_csv: modin_repartition(to_csv),
-            to_json: modin_repartition(to_json),
-            to_parquet: modin_repartition(to_parquet),
             table_refs_to_df: _arrow_refs_to_df,
             is_pandas_frame: _is_pandas_or_modin_frame,
         }.items():
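For context on the hunk above (a hedged sketch, not the real class): engine.register_func maps an original function to a distributed replacement, so deleting the three to_csv/to_json/to_parquet entries from the mapping is all it takes to stop wrapping the top-level writers.

# Sketch of the register/dispatch idea; the real engine lives in
# awswrangler._distributed, and everything here besides the register_func
# name is an assumption.
from typing import Any, Callable, Dict


class Engine:
    _registry: Dict[str, Callable[..., Any]] = {}

    @classmethod
    def register_func(cls, source: Callable[..., Any], destination: Callable[..., Any]) -> None:
        # After registration, dispatching `source` runs `destination` instead.
        cls._registry[source.__name__] = destination

    @classmethod
    def dispatch_func(cls, source: Callable[..., Any]) -> Callable[..., Any]:
        # Fall back to the original single-node implementation.
        return cls._registry.get(source.__name__, source)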
awswrangler/distributed/ray/modin/_data_types.py: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
 import ray

 from awswrangler._data_types import pyarrow_types_from_pandas
-from awswrangler.distributed.ray._core import ray_get, ray_remote
+from awswrangler.distributed.ray import ray_get, ray_remote


 def pyarrow_types_from_pandas_distributed(
awswrangler/distributed/ray/modin/s3/_write_dataset.py: 3 additions & 0 deletions
@@ -9,6 +9,7 @@

 from awswrangler._distributed import engine
 from awswrangler.distributed.ray import ray_get, ray_remote
+from awswrangler.distributed.ray.modin import modin_repartition
 from awswrangler.s3._write_concurrent import _WriteProxy
 from awswrangler.s3._write_dataset import _delete_objects, _get_bucketing_series, _to_partitions

@@ -22,6 +23,7 @@ def _retrieve_paths(values: Union[str, List[Any]]) -> Iterator[str]:
     yield values


+@modin_repartition
 def _to_buckets_distributed(  # pylint: disable=unused-argument
     df: pd.DataFrame,
     func: Callable[..., List[str]],
@@ -109,6 +111,7 @@ def _write_partitions_distributed(
     return prefix, df_group.name, paths


+@modin_repartition
 def _to_partitions_distributed(  # pylint: disable=unused-argument
     df: pd.DataFrame,
     func: Callable[..., List[str]],
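Moving the decorator to the definition site is plain function wrapping; what changes is scope. Previously every top-level to_csv/to_json/to_parquet call went through modin_repartition; now only the two distributed dataset writers do, which is the "where required only" of the title. Using the sketch from above:

# Decorating at the definition site...
@modin_repartition
def _to_partitions_distributed(df, func, *args, **kwargs):
    ...

# ...is equivalent to wrapping after the fact, which is what _register.py
# previously did to the top-level writers:
# to_csv = modin_repartition(to_csv)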
tests/load/test_s3.py: 29 additions & 0 deletions
@@ -1,6 +1,11 @@
 import math

+import modin.config as cfg
+import modin.pandas as pd
+import numpy as np
 import pytest
+import ray
+from modin.distributed.dataframe.pandas import unwrap_partitions

 import awswrangler as wr

@@ -192,3 +197,27 @@ def test_wait_object_not_exists(path: str, benchmark_time: int) -> None:
     wr.s3.wait_objects_not_exist(file_paths, parallelism=16)

     assert timer.elapsed_time < benchmark_time
+
+
+@pytest.mark.parametrize("size", [(5000, 5000), (1, 5000), (5000, 1), (1, 1)])
+def test_wide_df(size, path) -> None:
+    df = pd.DataFrame(np.random.randint(0, 100, size=size))
+    df.columns = df.columns.map(str)
+
+    num_rows = size[0]  # size is (rows, columns); the added columns need one value per row
+    df["int"] = np.random.choice(["1", "2", None], num_rows)
+    df["decimal"] = np.random.choice(["1.0", "2.0", None], num_rows)
+    df["date"] = np.random.choice(["2020-01-01", "2020-01-02", None], num_rows)
+    df["par0"] = np.random.choice(["A", "B"], num_rows)
+
+    partitions_shape = np.array(unwrap_partitions(df)).shape
+    assert partitions_shape[1] == min(math.ceil(len(df.columns) / cfg.MinPartitionSize.get()), cfg.NPartitions.get())
+
+    dtype = {
+        "int": "tinyint",
+        "decimal": "double",
+        "date": "date",
+    }
+
+    result = wr.s3.to_csv(df=df, path=path, dataset=True, dtype=dtype, partition_cols=["par0"])
+    assert len(result["paths"]) == partitions_shape[0] * len(df["par0"].unique())
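To make the first assertion concrete: Modin splits a frame column-wise once blocks exceed MinPartitionSize columns, capped at NPartitions overall. A worked example under assumed config values (MinPartitionSize of 32 and NPartitions equal to a CPU count of 8; both vary by environment):

# Expected column-partition count for the (5000, 5000) case above.
import math

n_cols = 5000 + 4        # random block plus the four added columns
min_block_width = 32     # assumed cfg.MinPartitionSize.get()
n_partitions = 8         # assumed cfg.NPartitions.get()

col_parts = min(math.ceil(n_cols / min_block_width), n_partitions)
print(col_parts)  # min(157, 8) -> 8 column partitions before repartitioning

The final assertion then checks that to_csv writes one file per row partition and par0 value, i.e. that the column-wise splits were collapsed before the dataset write.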
tests/unit/test_s3_text.py: 1 addition & 2 deletions
@@ -163,11 +163,10 @@ def test_csv_dataset_header_modes(path, mode, glue_database, glue_table):
             index=False,
             header=True,
         )
-    dfs_conc = pd.concat(dfs)
     df_res = wr.s3.read_csv(path=path0)

     if mode == "append":
-        assert len(df_res) == len(dfs_conc)
+        assert len(df_res) == sum([len(df) for df in dfs])
     else:
         assert df_res.equals(dfs[-1])
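A note on this last tweak (my reading; the PR does not state a rationale): summing per-frame lengths asserts the same row count as before without concatenating the frames first, since the length of a default concat equals the sum of the input lengths.

# Equivalence of the old and new assertions, in plain pandas:
import pandas as pd

dfs = [pd.DataFrame({"a": range(3)}), pd.DataFrame({"a": range(5)})]
assert len(pd.concat(dfs)) == sum(len(df) for df in dfs)  # both are 8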