Commit 0745e6a

Add ignore_null to read_parquet_metadata
1 parent bac44ea commit 0745e6a

3 files changed: +30 additions, -5 deletions

awswrangler/_data_types.py

Lines changed: 6 additions & 4 deletions
@@ -16,7 +16,7 @@
 _logger: logging.Logger = logging.getLogger(__name__)
 
 
-def pyarrow2athena(dtype: pa.DataType) -> str:  # pylint: disable=too-many-branches,too-many-return-statements
+def pyarrow2athena(dtype: pa.DataType, ignore_null: bool) -> Optional[str]:  # pylint: disable=too-many-branches,too-many-return-statements
     """Pyarrow to Athena data types conversion."""
     if pa.types.is_int8(dtype):
         return "tinyint"
@@ -53,6 +53,8 @@ def pyarrow2athena(dtype: pa.DataType) -> str:  # pylint: disable=too-many-branc
     if pa.types.is_map(dtype):
         return f"map<{pyarrow2athena(dtype=dtype.key_type)}, {pyarrow2athena(dtype=dtype.item_type)}>"
     if dtype == pa.null():
+        if ignore_null:
+            return None
         raise exceptions.UndetectedType("We can not infer the data type from an entire null object column")
     raise exceptions.UnsupportedType(f"Unsupported Pyarrow type: {dtype}")
 
@@ -585,14 +587,14 @@ def pyarrow_schema_from_pandas(
 
 
 def athena_types_from_pyarrow_schema(
-    schema: pa.Schema, partitions: Optional[pyarrow.parquet.ParquetPartitions]
+    schema: pa.Schema, partitions: Optional[pyarrow.parquet.ParquetPartitions], ignore_null: bool,
 ) -> Tuple[Dict[str, str], Optional[Dict[str, str]]]:
     """Extract the related Athena data types from any PyArrow Schema considering possible partitions."""
-    columns_types: Dict[str, str] = {str(f.name): pyarrow2athena(dtype=f.type) for f in schema}
+    columns_types: Dict[str, str] = {str(f.name): pyarrow2athena(dtype=f.type, ignore_null=ignore_null) for f in schema}
     _logger.debug("columns_types: %s", columns_types)
     partitions_types: Optional[Dict[str, str]] = None
     if partitions is not None:
-        partitions_types = {p.name: pyarrow2athena(p.dictionary.type) for p in partitions}
+        partitions_types = {p.name: pyarrow2athena(p.dictionary.type, ignore_null=ignore_null) for p in partitions}
         _logger.debug("partitions_types: %s", partitions_types)
     return columns_types, partitions_types
 
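With ignore_null=True, pyarrow2athena now returns None for an all-null column instead of raising UndetectedType, and athena_types_from_pyarrow_schema forwards the flag for both columns and partitions. A minimal sketch of the resulting behaviour (the schema is illustrative and assumes a build of awswrangler that includes this commit):

import pyarrow as pa
from awswrangler import _data_types

# An all-null column next to an int64 column.
schema = pa.schema([("c0", pa.null()), ("c1", pa.int64())])

# ignore_null=True maps the null column to None instead of raising UndetectedType.
columns_types, partitions_types = _data_types.athena_types_from_pyarrow_schema(
    schema=schema, partitions=None, ignore_null=True
)
# columns_types == {"c0": None, "c1": "bigint"}; partitions_types is None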

awswrangler/s3/_read_parquet.py

Lines changed: 11 additions & 1 deletion
@@ -60,6 +60,7 @@ def _read_parquet_metadata_file(
     s3_additional_kwargs: Optional[Dict[str, str]],
     use_threads: Union[bool, int],
     version_id: Optional[str] = None,
+    ignore_null=False,
     pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
 ) -> Optional[Dict[str, str]]:
     pyarrow_args = _set_default_pyarrow_additional_kwargs(pyarrow_additional_kwargs)
@@ -77,7 +78,7 @@ def _read_parquet_metadata_file(
     )
     if pq_file is None:
         return None
-    return _data_types.athena_types_from_pyarrow_schema(schema=pq_file.schema.to_arrow_schema(), partitions=None)[0]
+    return _data_types.athena_types_from_pyarrow_schema(schema=pq_file.schema.to_arrow_schema(), partitions=None, ignore_null=ignore_null)[0]
 
 
 def _read_schemas_from_files(
@@ -87,6 +88,7 @@ def _read_schemas_from_files(
     boto3_session: boto3.Session,
     s3_additional_kwargs: Optional[Dict[str, str]],
     version_ids: Optional[Dict[str, str]] = None,
+    ignore_null: bool = False,
     pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
 ) -> Tuple[Dict[str, str], ...]:
 
@@ -102,6 +104,7 @@ def _read_schemas_from_files(
                 s3_additional_kwargs=s3_additional_kwargs,
                 use_threads=use_threads,
                 version_id=version_ids.get(p) if isinstance(version_ids, dict) else None,
+                ignore_null=ignore_null,
                 pyarrow_additional_kwargs=pyarrow_additional_kwargs,
             )
             for p in paths
@@ -117,6 +120,7 @@ def _read_schemas_from_files(
                 itertools.repeat(s3_additional_kwargs),
                 itertools.repeat(use_threads),
                 versions,
+                itertools.repeat(ignore_null),
                 itertools.repeat(pyarrow_additional_kwargs),
             )
         )
@@ -175,6 +179,7 @@ def _read_parquet_metadata(
     path_suffix: Optional[str],
     path_ignore_suffix: Optional[str],
     ignore_empty: bool,
+    ignore_null: bool,
     dtype: Optional[Dict[str, str]],
     sampling: float,
     dataset: bool,
@@ -207,6 +212,7 @@ def _read_parquet_metadata(
         else {paths[0]: version_id}
         if isinstance(version_id, str)
        else None,
+        ignore_null=ignore_null,
         pyarrow_additional_kwargs=pyarrow_additional_kwargs,
     )
     columns_types: Dict[str, str] = _merge_schemas(schemas=schemas)
@@ -990,6 +996,7 @@ def read_parquet_metadata(
     path_suffix: Optional[str] = None,
     path_ignore_suffix: Optional[str] = None,
     ignore_empty: bool = True,
+    ignore_null: bool = False,
     dtype: Optional[Dict[str, str]] = None,
     sampling: float = 1.0,
     dataset: bool = False,
@@ -1030,6 +1037,8 @@ def read_parquet_metadata(
         If None, will try to read all files. (default)
     ignore_empty: bool
         Ignore files with 0 bytes.
+    ignore_null: bool
+        Ignore columns with null type.
     dtype : Dict[str, str], optional
         Dictionary of columns names and Athena/Glue types to be casted.
         Useful when you have columns with undetermined data types as partitions columns.
@@ -1083,6 +1092,7 @@ def read_parquet_metadata(
         path_suffix=path_suffix,
         path_ignore_suffix=path_ignore_suffix,
         ignore_empty=ignore_empty,
+        ignore_null=ignore_null,
         dtype=dtype,
         sampling=sampling,
         dataset=dataset,
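
From the user-facing API, the new flag is simply forwarded from read_parquet_metadata down to the per-file schema readers. A hedged usage sketch (the S3 path below is a placeholder):

import awswrangler as wr

# Default behaviour (ignore_null=False) raises UndetectedType when a column is entirely null;
# with ignore_null=True that column is kept in the result with a None type instead.
columns_types, partitions_types = wr.s3.read_parquet_metadata(
    path="s3://bucket/prefix/",  # placeholder path
    ignore_null=True,
)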

tests/test_s3_parquet.py

Lines changed: 13 additions & 0 deletions
@@ -28,6 +28,19 @@ def test_parquet_metadata_partitions_dataset(path, partition_cols):
     assert (columns_types.get("c1") == "bigint") or (partitions_types.get("c1") == "string")
 
 
+def test_read_parquet_metadata_nulls(path):
+    df = pd.DataFrame({"c0": [None, None, None], "c1": [1, 2, 3], "c2": ["a", "b", "c"]})
+    path = f"{path}df.parquet"
+    wr.s3.to_parquet(df, path)
+    with pytest.raises(wr.exceptions.UndetectedType):
+        wr.s3.read_parquet_metadata(path)
+    columns_types, _ = wr.s3.read_parquet_metadata(path, ignore_null=True)
+    assert len(columns_types) == len(df.columns)
+    assert columns_types.get("c0") == None
+    assert columns_types.get("c1") == "bigint"
+    assert columns_types.get("c2") == "string"
+
+
 @pytest.mark.parametrize("partition_cols", [None, ["c2"], ["value", "c2"]])
 def test_parquet_cast_string_dataset(path, partition_cols):
     df = pd.DataFrame({"id": [1, 2, 3], "value": ["foo", "boo", "bar"], "c2": [4, 5, 6], "c3": [7.0, 8.0, 9.0]})
