Commit 41c79fc

Tests passing locally, ready for CB submission
1 parent 0985d48 commit 41c79fc

5 files changed: +151 -276 lines changed


awswrangler/s3/_read_parquet.py

Lines changed: 18 additions & 2 deletions
@@ -246,7 +246,7 @@ def _read_parquet_chunked(
     use_threads: Union[bool, int],
     arrow_kwargs: Dict[str, Any],
 ) -> Iterator[pd.DataFrame]:
-    batch_size = None if isinstance(chunked, bool) else chunked
+    batch_size = None if chunked is True else chunked
     tables = _utils.batches_to_table(
         pieces=pieces,
         schema=schema,
@@ -255,8 +255,24 @@ def _read_parquet_chunked(
         use_threads=use_threads,
         batch_size=batch_size,
     )
+
+    next_slice: Optional[pd.DataFrame] = None
     for table in tables:
-        yield _table_to_df(table=table, kwargs=arrow_kwargs)
+        df = _table_to_df(table=table, kwargs=arrow_kwargs)
+        if chunked is True:
+            yield df
+        else:
+            if next_slice is not None:
+                df = pd.concat(objs=[next_slice, df], sort=False, copy=False)
+            while len(df.index) >= chunked:
+                yield df.iloc[:chunked, :].copy()
+                df = df.iloc[chunked:, :]
+            if df.empty:
+                next_slice = None
+            else:
+                next_slice = df
+    if next_slice is not None:
+        yield next_slice
 
 
 def _read_parquet(
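
The interesting part of the diff is the re-batching loop: when chunked is an integer, each Arrow table is converted to a DataFrame, concatenated with whatever rows were carried over from the previous batch, sliced into exact chunked-row pieces, and the remainder is carried forward and flushed after the loop. Below is a minimal standalone sketch of that logic; the rebatch name, the frames iterable, and the chunk_size parameter are illustrative stand-ins, not awswrangler's API.

from typing import Iterable, Iterator, Optional

import pandas as pd


def rebatch(frames: Iterable[pd.DataFrame], chunk_size: int) -> Iterator[pd.DataFrame]:
    """Yield DataFrames of exactly chunk_size rows, carrying remainders forward."""
    leftover: Optional[pd.DataFrame] = None
    for df in frames:
        # Prepend rows left over from the previous frame, mirroring the
        # pd.concat(objs=[next_slice, df], ...) call in the diff above.
        if leftover is not None:
            df = pd.concat(objs=[leftover, df], sort=False, copy=False)
        # Slice off full chunks while enough rows remain.
        while len(df.index) >= chunk_size:
            yield df.iloc[:chunk_size, :].copy()
            df = df.iloc[chunk_size:, :]
        leftover = None if df.empty else df
    # Flush the final partial chunk, if any.
    if leftover is not None:
        yield leftover

For example, three 4-row frames passed through rebatch(frames, chunk_size=5) come back as chunks of 5, 5, and 2 rows, matching the behaviour the diff gives an integer chunked value: fixed-size chunks with a possibly smaller final one.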
