Add Athena UNLOAD #1038

Merged
9 commits merged on Jan 6, 2022

3 changes: 2 additions & 1 deletion awswrangler/athena/__init__.py
@@ -1,6 +1,6 @@
"""Amazon Athena Module."""

from awswrangler.athena._read import read_sql_query, read_sql_table # noqa
from awswrangler.athena._read import read_sql_query, read_sql_table, unload # noqa
from awswrangler.athena._utils import ( # noqa
create_athena_bucket,
describe_table,
@@ -26,5 +26,6 @@
"show_create_table",
"start_query_execution",
"stop_query_execution",
"unload",
"wait_query",
]
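
With the added import and `__all__` entry, the new UNLOAD helper is exposed on the public `awswrangler.athena` namespace, so both access paths below resolve to the same function (a quick sanity check, assuming a build that includes this change):

>>> import awswrangler as wr
>>> from awswrangler.athena import unload
>>> unload is wr.athena.unload
True
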
271 changes: 267 additions & 4 deletions awswrangler/athena/_read.py
@@ -339,6 +339,54 @@ def _resolve_query_without_cache_ctas(
)


def _resolve_query_without_cache_unload(
sql: str,
file_format: str,
compression: Optional[str],
field_delimiter: Optional[str],
partitioned_by: Optional[List[str]],
database: Optional[str],
data_source: Optional[str],
s3_output: str,
keep_files: bool,
chunksize: Union[int, bool, None],
categories: Optional[List[str]],
encryption: Optional[str],
kms_key: Optional[str],
wg_config: _WorkGroupConfig,
use_threads: Union[bool, int],
s3_additional_kwargs: Optional[Dict[str, Any]],
boto3_session: boto3.Session,
pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
query_metadata = _unload(
sql,
s3_output,
file_format,
compression,
field_delimiter,
partitioned_by,
wg_config,
database,
encryption,
kms_key,
boto3_session,
data_source,
)
if file_format == "PARQUET":
return _fetch_parquet_result(
query_metadata=query_metadata,
keep_files=keep_files,
categories=categories,
chunksize=chunksize,
use_threads=use_threads,
s3_additional_kwargs=s3_additional_kwargs,
boto3_session=boto3_session,
pyarrow_additional_kwargs=pyarrow_additional_kwargs,
)
raise exceptions.InvalidArgumentValue("Only PARQUET file format is supported when unload_approach=True.")
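
For context, when the PARQUET branch succeeds the result set is just the Parquet files that UNLOAD wrote under `s3_output`; `_fetch_parquet_result` reads them back (optionally chunked, honoring `keep_files`). A rough public-API equivalent of that read-back step, with a hypothetical output prefix, would be:

>>> import awswrangler as wr
>>> # Read back the Parquet files that the UNLOAD statement wrote to S3
>>> df = wr.s3.read_parquet(path="s3://my-athena-results/unload/", use_threads=True)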


def _resolve_query_without_cache_regular(
sql: str,
database: Optional[str],
@@ -390,6 +438,8 @@ def _resolve_query_without_cache(
database: str,
data_source: Optional[str],
ctas_approach: bool,
unload_approach: bool,
unload_parameters: Optional[Dict[str, Any]],
categories: Optional[List[str]],
chunksize: Union[int, bool, None],
s3_output: Optional[str],
@@ -443,6 +493,29 @@ def _resolve_query_without_cache(
catalog.delete_table_if_exists(
database=ctas_database_name or database, table=name, boto3_session=boto3_session
)
elif unload_approach is True:
if unload_parameters is None:
unload_parameters = {}
return _resolve_query_without_cache_unload(
sql=sql,
file_format=unload_parameters.get("file_format") or "PARQUET",
compression=unload_parameters.get("compression"),
field_delimiter=unload_parameters.get("field_delimiter"),
partitioned_by=unload_parameters.get("partitioned_by"),
database=database,
data_source=data_source,
s3_output=_s3_output,
keep_files=keep_files,
chunksize=chunksize,
categories=categories,
encryption=encryption,
kms_key=kms_key,
wg_config=wg_config,
use_threads=use_threads,
s3_additional_kwargs=s3_additional_kwargs,
boto3_session=boto3_session,
pyarrow_additional_kwargs=pyarrow_additional_kwargs,
)
return _resolve_query_without_cache_regular(
sql=sql,
database=database,
@@ -461,11 +534,81 @@ def _resolve_query_without_cache(
)


def _unload(
sql: str,
path: str,
file_format: str,
compression: Optional[str],
field_delimiter: Optional[str],
partitioned_by: Optional[List[str]],
wg_config: _WorkGroupConfig,
database: Optional[str],
encryption: Optional[str],
kms_key: Optional[str],
boto3_session: boto3.Session,
data_source: Optional[str],
) -> _QueryMetadata:
# Set UNLOAD parameters
unload_parameters = f" format='{file_format}'"
if compression:
unload_parameters += f" , compression='{compression}'"
if field_delimiter:
unload_parameters += f" , field_delimiter='{field_delimiter}'"
if partitioned_by:
unload_parameters += f" , partitioned_by=ARRAY{partitioned_by}"

sql = f"UNLOAD ({sql}) " f"TO '{path}' " f"WITH ({unload_parameters})"
_logger.debug("sql: %s", sql)
try:
query_id: str = _start_query_execution(
sql=sql,
wg_config=wg_config,
database=database,
data_source=data_source,
s3_output=path,
encryption=encryption,
kms_key=kms_key,
boto3_session=boto3_session,
)
except botocore.exceptions.ClientError as ex:
msg: str = str(ex)
error: Dict[str, Any] = ex.response["Error"]
if error["Code"] == "InvalidRequestException":
raise exceptions.InvalidArgumentValue(f"Exception parsing query. Root error message: {msg}")
raise ex
_logger.debug("query_id: %s", query_id)
try:
query_metadata: _QueryMetadata = _get_query_metadata(
query_execution_id=query_id,
boto3_session=boto3_session,
metadata_cache_manager=_cache_manager,
)
except exceptions.QueryFailed as ex:
msg = str(ex)
if "Column name" in msg and "specified more than once" in msg:
raise exceptions.InvalidArgumentValue(
f"Please, define distinct names for your columns. Root error message: {msg}"
)
if "Column name not specified" in msg:
raise exceptions.InvalidArgumentValue(
"Please, define all columns names in your query. (E.g. 'SELECT MAX(col1) AS max_col1, ...')"
)
if "Column type is unknown" in msg:
raise exceptions.InvalidArgumentValue(
"Please, don't leave undefined columns types in your query. You can cast to ensure it. "
"(E.g. 'SELECT CAST(NULL AS INTEGER) AS MY_COL, ...')"
)
raise ex
return query_metadata
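
To make the string assembly in `_unload` concrete, this minimal sketch reproduces the statement it builds for a typical set of arguments (the query, bucket, and partition column are placeholders):

>>> sql = "SELECT * FROM my_table"
>>> path = "s3://my-bucket/unload/"
>>> unload_parameters = " format='PARQUET'"
>>> unload_parameters += " , compression='snappy'"
>>> unload_parameters += " , partitioned_by=ARRAY['year']"
>>> print(f"UNLOAD ({sql}) TO '{path}' WITH ({unload_parameters})")
UNLOAD (SELECT * FROM my_table) TO 's3://my-bucket/unload/' WITH ( format='PARQUET' , compression='snappy' , partitioned_by=ARRAY['year'])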


@apply_configs
def read_sql_query(
sql: str,
database: str,
ctas_approach: bool = True,
unload_approach: bool = False,
unload_parameters: Optional[Dict[str, Any]] = None,
categories: Optional[List[str]] = None,
chunksize: Optional[Union[int, bool]] = None,
s3_output: Optional[str] = None,
@@ -498,7 +641,7 @@ def read_sql_query(
- `Global Configurations <https://aws-data-wrangler.readthedocs.io/en/2.13.0/
tutorials/021%20-%20Global%20Configurations.html>`_

**There are two approaches to be defined through ctas_approach parameter:**
**There are three approaches available through the ctas_approach and unload_approach parameters:**

**1** - ctas_approach=True (Default):

@@ -518,7 +661,25 @@ def read_sql_query(
- A temporary table will be created and then deleted immediately.
- Does not support custom data_source/catalog_id.

**2** - ctas_approach=False:
**2** - unload_approach=True and ctas_approach=False:

Does an UNLOAD query on Athena and parses the Parquet result on S3 (see the usage example after this approach list).

PROS:

- Faster for medium and large result sizes.
- Can handle some level of nested types.
- Does not modify the Glue Data Catalog.

CONS:

- Output S3 path must be empty.
- Does not support timestamp with time zone.
- Does not support columns with repeated names.
- Does not support columns with undefined data types.
- Does not support custom data_source/catalog_id.

**3** - ctas_approach=False:

Does a regular query on Athena and parses the regular CSV result on S3.

@@ -534,7 +695,6 @@ def read_sql_query(
- Slower for big results (but still faster than other libraries that use the regular Athena API)
- Does not handle nested types at all.
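
For reference, a minimal call exercising approach 2 might look as follows (database, table, and S3 prefix are placeholders; `unload_parameters` accepts the keys file_format, compression, field_delimiter, and partitioned_by):

>>> import awswrangler as wr
>>> df = wr.athena.read_sql_query(
...     sql="SELECT * FROM my_table",
...     database="my_database",
...     ctas_approach=False,
...     unload_approach=True,
...     s3_output="s3://my-bucket/unload/",  # must be an empty location
...     unload_parameters={"compression": "snappy"},
... )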


Note
----
The resulting DataFrame (or every DataFrame in the returned Iterator for chunked queries) have a
@@ -574,7 +734,7 @@ def read_sql_query(
`P.S.` `chunksize=True` is faster and uses less memory while `chunksize=INTEGER` is more precise
in number of rows for each Dataframe.

`P.P.S.` If `ctas_approach=False` and `chunksize=True`, you will always receive an interador with a
`P.P.S.` If `ctas_approach=False` and `chunksize=True`, you will always receive an iterator with a
single DataFrame because regular Athena queries only produce a single output file.

Note
@@ -593,6 +753,11 @@ def read_sql_query(
ctas_approach: bool
Wraps the query using a CTAS, and read the resulted parquet data on S3.
If false, read the regular CSV on S3.
unload_approach: bool
Wraps the query using UNLOAD, and read the results from S3.
Only PARQUET format is supported.
unload_parameters : Optional[Dict[str, Any]]
Params of the UNLOAD such as format, compression, field_delimiter, and partitioned_by.
categories: List[str], optional
List of columns names that should be returned as pandas.Categorical.
Recommended for memory restricted environments.
@@ -691,6 +856,10 @@ def read_sql_query(
"(https://github.com/awslabs/aws-data-wrangler/blob/main/"
"tutorials/006%20-%20Amazon%20Athena.ipynb)"
)
if ctas_approach and unload_approach:
raise exceptions.InvalidArgumentCombination("Only one of ctas_approach=True or unload_approach=True is allowed")
if unload_parameters and unload_parameters.get("file_format") not in (None, "PARQUET"):
raise exceptions.InvalidArgumentCombination("Only PARQUET file format is supported if unload_approach=True")
chunksize = sys.maxsize if ctas_approach is False and chunksize is True else chunksize
session: boto3.Session = _utils.ensure_session(session=boto3_session)
if params is None:
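
The two guards above make the approaches mutually exclusive and restrict UNLOAD to Parquet output. Both of the following illustrative calls would therefore raise exceptions.InvalidArgumentCombination:

>>> # ctas_approach and unload_approach enabled at the same time
>>> wr.athena.read_sql_query(sql="SELECT 1", database="my_database",
...                          ctas_approach=True, unload_approach=True)
>>> # Non-Parquet UNLOAD format requested through unload_parameters
>>> wr.athena.read_sql_query(sql="SELECT 1", database="my_database",
...                          ctas_approach=False, unload_approach=True,
...                          unload_parameters={"file_format": "TEXTFILE"})
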
@@ -730,6 +899,8 @@ def read_sql_query(
database=database,
data_source=data_source,
ctas_approach=ctas_approach,
unload_approach=unload_approach,
unload_parameters=unload_parameters,
categories=categories,
chunksize=chunksize,
s3_output=s3_output,
@@ -979,3 +1150,95 @@ def read_sql_table(
s3_additional_kwargs=s3_additional_kwargs,
pyarrow_additional_kwargs=pyarrow_additional_kwargs,
)


@apply_configs
def unload(
sql: str,
path: str,
database: str,
file_format: str = "PARQUET",
compression: Optional[str] = None,
field_delimiter: Optional[str] = None,
partitioned_by: Optional[List[str]] = None,
workgroup: Optional[str] = None,
encryption: Optional[str] = None,
kms_key: Optional[str] = None,
boto3_session: Optional[boto3.Session] = None,
data_source: Optional[str] = None,
params: Optional[Dict[str, Any]] = None,
) -> _QueryMetadata:
"""Write query results from a SELECT statement to the specified data format using UNLOAD.

https://docs.aws.amazon.com/athena/latest/ug/unload.html

Parameters
----------
sql : str
SQL query.
path : str
Amazon S3 path.
database : str
AWS Glue/Athena database name - It is only the origin database from where the query will be launched.
You can still use and mix several databases by writing the full table name within the sql
(e.g. `database.table`).
file_format : str
File format of the output. Possible values are ORC, PARQUET, AVRO, JSON, or TEXTFILE.
compression : Optional[str]
This option is specific to the ORC and Parquet formats. For ORC, possible values are lz4, snappy, zlib, or zstd.
For Parquet, possible values are gzip or snappy. For ORC, the default is zlib, and for Parquet,
the default is gzip.
field_delimiter : str
A single-character field delimiter for files in CSV, TSV, and other text formats.
partitioned_by : Optional[List[str]]
An array list of columns by which the output is partitioned.
workgroup : str, optional
Athena workgroup.
encryption : str, optional
Valid values: [None, 'SSE_S3', 'SSE_KMS']. Notice: 'CSE_KMS' is not supported.
kms_key : str, optional
For SSE-KMS, this is the KMS key ARN or ID.
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 session will be used if boto3_session receives None.
data_source : str, optional
Data Source / Catalog name. If None, 'AwsDataCatalog' will be used by default.
params: Dict[str, any], optional
Dict of parameters that will be used for constructing the SQL query. Only named parameters are supported.
The dict needs to contain the information in the form {'name': 'value'} and the SQL query needs to contain
`:name;`. Note that for varchar columns and similar, you must surround the value in single quotes.

Returns
-------
_QueryMetadata
Query metadata including query execution id, dtypes, manifest & output location.

Examples
--------
>>> import awswrangler as wr
>>> res = wr.athena.unload(
... sql="SELECT * FROM my_table WHERE name=:name; AND city=:city;",
... path="s3://bucket/prefix/",
... database="my_database",
... params={"name": "'filtered_name'", "city": "'filtered_city'"}
... )

"""
session: boto3.Session = _utils.ensure_session(session=boto3_session)
wg_config: _WorkGroupConfig = _get_workgroup_config(session=session, workgroup=workgroup)
# Substitute query parameters
if params is None:
params = {}
for key, value in params.items():
sql = sql.replace(f":{key};", str(value))
return _unload(
sql,
path,
file_format,
compression,
field_delimiter,
partitioned_by,
wg_config,
database,
encryption,
kms_key,
session,
data_source,
)
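
Beyond the parameterized example in the docstring, a sketch of a compressed, partitioned UNLOAD call (bucket, table, database, and column names are placeholders):

>>> import awswrangler as wr
>>> query_metadata = wr.athena.unload(
...     sql="SELECT * FROM my_table",
...     path="s3://my-bucket/unload/",  # should point to an empty location
...     database="my_database",
...     file_format="PARQUET",
...     compression="snappy",
...     partitioned_by=["year"],
... )
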
8 changes: 4 additions & 4 deletions awswrangler/athena/_utils.py
@@ -208,13 +208,13 @@ def _get_query_metadata( # pylint: disable=too-many-statements
for col_name, col_type in cols_types.items():
if col_type == "array":
raise exceptions.UnsupportedType(
"List data type is not support with ctas_approach=False. "
"Please use ctas_approach=True for List columns."
"List data type is not supported with regular (non-CTAS and non-UNLOAD) queries. "
"Please use ctas_approach=True or unload_approach=True for List columns."
)
if col_type == "row":
raise exceptions.UnsupportedType(
"Struct data type is not support with ctas_approach=False. "
"Please use ctas_approach=True for Struct columns."
"Struct data type is not supported with regular (non-CTAS and non-UNLOAD) queries. "
"Please use ctas_approach=True or unload_approach=True for Struct columns."
)
pandas_type: str = _data_types.athena2pandas(dtype=col_type)
if (categories is not None) and (col_name in categories):
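
The reworded messages above steer queries with nested result columns toward the CTAS or UNLOAD paths instead of the regular CSV reader. For example, a result containing an array column could be fetched like this (database and S3 prefix are placeholders):

>>> import awswrangler as wr
>>> df = wr.athena.read_sql_query(
...     sql="SELECT ARRAY[1, 2, 3] AS my_list",
...     database="my_database",
...     ctas_approach=False,
...     unload_approach=True,
...     s3_output="s3://my-bucket/unload/",
... )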