(perf): Distribute timestream write with executor #1715

Merged
merged 5 commits on Oct 27, 2022
Changes from 3 commits
12 changes: 5 additions & 7 deletions awswrangler/_data_types.py
@@ -779,14 +779,12 @@ def database_types_from_pandas(
return database_types


def timestream_type_from_pandas(df: pd.DataFrame) -> str:
def timestream_type_from_pandas(df: pd.DataFrame) -> List[str]:
"""Extract Amazon Timestream types from a Pandas DataFrame."""
pyarrow_types: Dict[str, Optional[pa.DataType]] = pyarrow_types_from_pandas(df=df, index=False, ignore_cols=[])
if len(pyarrow_types) != 1 or list(pyarrow_types.values())[0] is None:
raise RuntimeError(f"Invalid pyarrow_types: {pyarrow_types}")
pyarrow_type: pa.DataType = list(pyarrow_types.values())[0]
_logger.debug("pyarrow_type: %s", pyarrow_type)
return pyarrow2timestream(dtype=pyarrow_type)
return [
pyarrow2timestream(pyarrow_type)
for pyarrow_type in pyarrow_types_from_pandas(df=df, index=False, ignore_cols=[]).values()
]


def get_arrow_timestamp_unit(data_type: pa.lib.DataType) -> Any:
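
For illustration, a minimal sketch of how the reworked helper behaves on a two-column frame; the sample values and the resulting type strings are assumptions, not output taken from this PR:

import pandas as pd

from awswrangler._data_types import timestream_type_from_pandas

# One Timestream type string per column is now returned instead of a single scalar.
df = pd.DataFrame({"cpu": [0.5, 0.7], "host": ["h1", "h2"]})
print(timestream_type_from_pandas(df))  # e.g. something like ["DOUBLE", "VARCHAR"]
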
6 changes: 6 additions & 0 deletions awswrangler/_utils.py
@@ -408,6 +408,12 @@ def check_schema_changes(columns_types: Dict[str, str], table_input: Optional[Di
)


@engine.dispatch_on_engine
def split_pandas_frame(df: pd.DataFrame, splits: int) -> List[pd.DataFrame]:
"""Split a DataFrame into n chunks."""
return [sub_df for sub_df in np.array_split(df, splits) if not sub_df.empty] # type: ignore
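
For intuition, a small self-contained sketch of the splitting behaviour this helper relies on (np.array_split over the rows, dropping empty chunks); the frame contents are made up:

import numpy as np
import pandas as pd

df = pd.DataFrame({"a": range(5)})
# Row-wise split into at most 3 chunks; empty chunks are filtered out.
chunks = [sub_df for sub_df in np.array_split(df, 3) if not sub_df.empty]
print([len(chunk) for chunk in chunks])  # [2, 2, 1]
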


@engine.dispatch_on_engine
def table_refs_to_df(tables: List[pa.Table], kwargs: Dict[str, Any]) -> pd.DataFrame: # type: ignore
"""Build Pandas dataframe from list of PyArrow tables."""
14 changes: 11 additions & 3 deletions awswrangler/distributed/ray/_register.py
@@ -2,7 +2,7 @@
# pylint: disable=import-outside-toplevel
from awswrangler._data_types import pyarrow_types_from_pandas
from awswrangler._distributed import MemoryFormatEnum, engine, memory_format
from awswrangler._utils import is_pandas_frame, table_refs_to_df
from awswrangler._utils import is_pandas_frame, split_pandas_frame, table_refs_to_df
from awswrangler.distributed.ray import ray_remote
from awswrangler.lakeformation._read import _get_work_unit_results
from awswrangler.s3._delete import _delete_objects
@@ -13,6 +13,7 @@
from awswrangler.s3._write_dataset import _to_buckets, _to_partitions
from awswrangler.s3._write_parquet import _to_parquet
from awswrangler.s3._write_text import _to_text
from awswrangler.timestream import _write_batch, _write_df


def register_ray() -> None:
@@ -24,12 +25,18 @@ def register_ray() -> None:
_select_query,
_select_object_content,
_wait_object_batch,
_write_batch,
_write_df,
]:
engine.register_func(func, ray_remote(func))

if memory_format.get() == MemoryFormatEnum.MODIN:
from awswrangler.distributed.ray.modin._data_types import pyarrow_types_from_pandas_distributed
from awswrangler.distributed.ray.modin._utils import _arrow_refs_to_df, _is_pandas_or_modin_frame
from awswrangler.distributed.ray.modin._utils import (
_arrow_refs_to_df,
_is_pandas_or_modin_frame,
_split_modin_frame,
)
from awswrangler.distributed.ray.modin.s3._read_parquet import _read_parquet_distributed
from awswrangler.distributed.ray.modin.s3._read_text import _read_text_distributed
from awswrangler.distributed.ray.modin.s3._write_dataset import (
@@ -47,7 +54,8 @@ def register_ray() -> None:
_to_parquet: _to_parquet_distributed,
_to_partitions: _to_partitions_distributed,
_to_text: _to_text_distributed,
table_refs_to_df: _arrow_refs_to_df,
is_pandas_frame: _is_pandas_or_modin_frame,
split_pandas_frame: _split_modin_frame,
table_refs_to_df: _arrow_refs_to_df,
}.items():
engine.register_func(o_f, d_f) # type: ignore
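
As background for the registrations above, a toy, self-contained sketch of the dispatch-and-register pattern; this is not the awswrangler implementation, and the names and behaviour are simplified assumptions:

import functools
from typing import Any, Callable, Dict

_registry: Dict[Callable[..., Any], Callable[..., Any]] = {}


def dispatch_on_engine(func: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Route to the distributed implementation if one was registered.
        return _registry.get(func, func)(*args, **kwargs)

    return wrapper


def register_func(original: Callable[..., Any], distributed: Callable[..., Any]) -> None:
    _registry[original] = distributed


@dispatch_on_engine
def split_frame(df: Any, splits: int) -> str:
    return "local split"


# Registering a replacement changes behaviour without touching call sites.
register_func(split_frame.__wrapped__, lambda df, splits: "distributed split")
print(split_frame(None, 4))  # -> "distributed split"
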
6 changes: 6 additions & 0 deletions awswrangler/distributed/ray/modin/_utils.py
@@ -8,6 +8,7 @@
from modin.distributed.dataframe.pandas import from_partitions
from ray.data._internal.arrow_block import ArrowBlockAccessor, ArrowRow
from ray.data._internal.remote_fn import cached_remote_fn
from ray.types import ObjectRef

from awswrangler import exceptions
from awswrangler._arrow import _table_to_df
@@ -43,6 +44,11 @@ def _to_modin(
)


def _split_modin_frame(df: modin_pd.DataFrame, splits: int) -> List[ObjectRef[Any]]: # pylint: disable=unused-argument
Contributor Author: I am not 100% convinced that this is the best way to split a modin dataframe

object_refs: List[ObjectRef[Any]] = ray.data.from_modin(df).get_internal_block_refs()
return object_refs
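
Regarding the reservation above, one conceivable alternative (an assumption, not something this PR adopts) would be to take the row partitions through Modin's partition API rather than going through a Ray dataset first:

from typing import Any, List

import modin.pandas as modin_pd
from modin.distributed.dataframe.pandas import unwrap_partitions
from ray.types import ObjectRef


def _split_modin_frame_via_partitions(df: modin_pd.DataFrame) -> List[ObjectRef[Any]]:
    # One ObjectRef per row-axis partition, without materialising an
    # intermediate ray.data.Dataset.
    return unwrap_partitions(df, axis=0)
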


def _arrow_refs_to_df(arrow_refs: List[Callable[..., Any]], kwargs: Optional[Dict[str, Any]]) -> modin_pd.DataFrame:
return _to_modin(dataset=ray.data.from_arrow_refs(arrow_refs), to_pandas_kwargs=kwargs)

95 changes: 69 additions & 26 deletions awswrangler/timestream.py
@@ -1,6 +1,5 @@
"""Amazon Timestream Module."""

import concurrent.futures
import itertools
import logging
from datetime import datetime
@@ -11,10 +10,17 @@
from botocore.config import Config

from awswrangler import _data_types, _utils
from awswrangler._distributed import engine
from awswrangler._threading import _get_executor
from awswrangler.distributed.ray import ray_get

_logger: logging.Logger = logging.getLogger(__name__)


def _flatten_list(elements: List[List[Any]]) -> List[Any]:
return [item for sublist in elements for item in sublist]


def _df2list(df: pd.DataFrame) -> List[List[Any]]:
"""Extract Parameters."""
parameters: List[List[Any]] = df.values.tolist()
@@ -27,17 +33,17 @@ def _df2list(df: pd.DataFrame) -> List[List[Any]]:
return parameters


@engine.dispatch_on_engine
def _write_batch(
boto3_session: Optional[boto3.Session],
database: str,
table: str,
cols_names: List[str],
measure_cols_names: List[str],
measure_types: List[str],
version: int,
batch: List[Any],
boto3_primitives: _utils.Boto3PrimitivesType,
) -> List[Dict[str, str]]:
boto3_session: boto3.Session = _utils.boto3_from_primitives(primitives=boto3_primitives)
client: boto3.client = _utils.client(
service_name="timestream-write",
session=boto3_session,
@@ -85,6 +91,33 @@ def _write_batch(
return []


@engine.dispatch_on_engine
def _write_df(
df: pd.DataFrame,
executor: Any,
database: str,
table: str,
cols_names: List[str],
measure_cols_names: List[str],
measure_types: List[str],
version: int,
boto3_session: Optional[boto3.Session] = None,
) -> List[Dict[str, str]]:
batches: List[List[Any]] = _utils.chunkify(lst=_df2list(df=df), max_length=100)
Contributor Author: Here the split modin dataframe block reference id is received. I assume modin/ray is smart enough to avoid a shuffle (i.e. pulling a block from one worker to another) and would instead run the remote functions (_write_df and _write_batch) on the worker where the block already exists...

Contributor: The blocks would be broken down into batches and sent to workers, so unfortunately some shuffle, or rather copy, will inevitably happen. One thing I am afraid of is max_length=100: these tasks would be too fine-grained and might not be worth it because of the overhead.

Contributor Author: Fair point. The load test on 64,000 rows was fine, but let me check with an even larger one tomorrow.

_logger.debug("len(batches): %s", len(batches))
return executor.map( # type: ignore
_write_batch,
boto3_session,
itertools.repeat(database),
itertools.repeat(table),
itertools.repeat(cols_names),
itertools.repeat(measure_cols_names),
itertools.repeat(measure_types),
itertools.repeat(version),
batches,
)
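
For intuition, a standalone sketch of the fan-out pattern _write_df uses above: map over the batches while repeating every other argument; the worker function and data here are made up:

import itertools
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List


def write_one_batch(table: str, version: int, batch: List[Any]) -> int:
    # Stand-in for a per-batch write call; just report the batch size.
    return len(batch)


batches = [[1, 2], [3, 4], [5]]
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(
        executor.map(
            write_one_batch,
            itertools.repeat("my_table"),  # same table for every batch
            itertools.repeat(1),           # same version for every batch
            batches,                       # one task per batch
        )
    )
print(results)  # [2, 2, 1]
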


def _cast_value(value: str, dtype: str) -> Any: # pylint: disable=too-many-branches,too-many-return-statements
if dtype == "VARCHAR":
return value
@@ -173,14 +206,18 @@ def write(
measure_col: Union[str, List[str]],
dimensions_cols: List[str],
version: int = 1,
num_threads: int = 32,
use_threads: Union[bool, int] = True,
boto3_session: Optional[boto3.Session] = None,
) -> List[Dict[str, str]]:
"""Store a Pandas DataFrame into a Amazon Timestream table.

Note
----
In case `use_threads=True`, the number of threads from os.cpu_count() is used.

Parameters
----------
df: pandas.DataFrame
df : pandas.DataFrame
Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
database : str
Amazon Timestream database name.
@@ -195,8 +232,10 @@ def write(
version : int
Version number used for upserts.
Documentation https://docs.aws.amazon.com/timestream/latest/developerguide/API_WriteRecords.html.
num_threads : str
Number of thread to be used for concurrent writing.
use_threads : bool, int
True to enable concurrent writing, False to disable multiple threads.
If enabled, os.cpu_count() is used as the number of threads.
If integer is provided, specified number is used.
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 Session will be used if boto3_session receive None.

@@ -232,29 +271,33 @@
"""
measure_cols_names: List[str] = measure_col if isinstance(measure_col, list) else [measure_col]
_logger.debug("measure_cols_names: %s", measure_cols_names)
measure_types: List[str] = [
_data_types.timestream_type_from_pandas(df[[measure_col_name]]) for measure_col_name in measure_cols_names
]
measure_types: List[str] = _data_types.timestream_type_from_pandas(df.loc[:, measure_cols_names])
_logger.debug("measure_types: %s", measure_types)
cols_names: List[str] = [time_col] + measure_cols_names + dimensions_cols
_logger.debug("cols_names: %s", cols_names)
batches: List[List[Any]] = _utils.chunkify(lst=_df2list(df=df[cols_names]), max_length=100)
_logger.debug("len(batches): %s", len(batches))
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
res: List[List[Any]] = list(
executor.map(
_write_batch,
itertools.repeat(database),
itertools.repeat(table),
itertools.repeat(cols_names),
itertools.repeat(measure_cols_names),
itertools.repeat(measure_types),
itertools.repeat(version),
batches,
itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),
)
dfs = _utils.split_pandas_frame(df.loc[:, cols_names], _utils.ensure_cpu_count(use_threads=use_threads))
_logger.debug("len(dfs): %s", len(dfs))

executor = _get_executor(use_threads=use_threads)
errors = _flatten_list(
ray_get(
[
_write_df(
df=df,
executor=executor,
database=database,
table=table,
cols_names=cols_names,
measure_cols_names=measure_cols_names,
measure_types=measure_types,
version=version,
boto3_session=boto3_session,
)
for df in dfs
]
)
return [item for sublist in res for item in sublist]
)
return _flatten_list(ray_get(errors))
Contributor Author: Two _flatten_list(ray_get()) calls were required here because of the nested ray remote methods (_write_df and _write_batch). This is not needed in S3 Select, for instance, because we feed the ray reference ids from the first ray_get to a Ray dataset.
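
To make this concrete, a minimal sketch (assuming a local Ray runtime; the functions are made up) of why nested remote calls need two rounds of get-and-flatten:

import ray

ray.init(ignore_reinit_error=True)


@ray.remote
def inner(batch):
    # Pretend each batch yields a (possibly empty) list of rejected records.
    return [{"record": str(batch)}] if batch else []


@ray.remote
def outer(batches):
    # Returns ObjectRefs, not values, so the caller has to resolve them again.
    return [inner.remote(batch) for batch in batches]


outer_refs = [outer.remote([[1], []]), outer.remote([[2]])]
inner_refs = [ref for refs in ray.get(outer_refs) for ref in refs]        # first get + flatten
rejected = [item for sublist in ray.get(inner_refs) for item in sublist]  # second get + flatten
print(rejected)
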



def query(
67 changes: 67 additions & 0 deletions tests/load/test_database.py
@@ -0,0 +1,67 @@
from datetime import datetime

import pytest
import ray
from pyarrow import csv

import awswrangler as wr

from .._utils import ExecutionTimer


@pytest.mark.parametrize("benchmark_time", [180])
def test_real_csv_load_scenario(benchmark_time: int, timestream_database_and_table: str) -> None:
name = timestream_database_and_table
df = (
ray.data.read_csv(
"https://raw.githubusercontent.com/awslabs/amazon-timestream-tools/mainline/sample_apps/data/sample.csv",
**{
"read_options": csv.ReadOptions(
column_names=[
"ignore0",
"region",
"ignore1",
"az",
"ignore2",
"hostname",
"measure_kind",
"measure",
"ignore3",
"ignore4",
"ignore5",
]
)
},
)
.to_modin()
.loc[:, ["region", "az", "hostname", "measure_kind", "measure"]]
)

df["time"] = datetime.now()
df.reset_index(inplace=True, drop=False)
df_cpu = df[df.measure_kind == "cpu_utilization"]
df_memory = df[df.measure_kind == "memory_utilization"]

with ExecutionTimer("elapsed time of wr.timestream.write()") as timer:
rejected_records = wr.timestream.write(
df=df_cpu,
database=name,
table=name,
time_col="time",
measure_col="measure",
dimensions_cols=["index", "region", "az", "hostname"],
)
assert len(rejected_records) == 0
rejected_records = wr.timestream.write(
df=df_memory,
database=name,
table=name,
time_col="time",
measure_col="measure",
dimensions_cols=["index", "region", "az", "hostname"],
)
assert len(rejected_records) == 0
assert timer.elapsed_time < benchmark_time

df = wr.timestream.query(f'SELECT COUNT(*) AS counter FROM "{name}"."{name}"')
assert df["counter"].iloc[0] == 126_000