Merge branch 'main' into release-3.0.0
jaidisido committed Aug 11, 2022
2 parents 748215c + c2232aa commit 90a0486
Showing 20 changed files with 251 additions and 52 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/bandit.yml
@@ -10,6 +10,9 @@ on:
      - main
      - release-3.0.0

permissions:
  contents: read

jobs:
  build:
    runs-on: ubuntu-latest
3 changes: 3 additions & 0 deletions .github/workflows/cfn-nag.yml
@@ -13,6 +13,9 @@ on:
    branches:
      - main

permissions:
  contents: read

jobs:
  build:
    runs-on: ubuntu-latest
3 changes: 3 additions & 0 deletions .github/workflows/minimal-tests.yml
@@ -9,6 +9,9 @@ on:
      - main
      - release-3.0.0

permissions:
  contents: read

jobs:
  Check:

3 changes: 3 additions & 0 deletions .github/workflows/snyk.yml
@@ -3,6 +3,9 @@ name: Snyk
on:
  workflow_dispatch

permissions:
  contents: read

jobs:
  security:
    runs-on: ubuntu-latest
3 changes: 3 additions & 0 deletions .github/workflows/static-checking.yml
@@ -9,6 +9,9 @@ on:
      - main
      - release-3.0.0

permissions:
  contents: read

jobs:
  Check:

6 changes: 6 additions & 0 deletions .github/workflows/unlabel-assigned-issue.yml
@@ -3,8 +3,14 @@ on:
  issues:
    types:
      - assigned
permissions:
  contents: read

jobs:
  unlabel-issue:
    permissions:
      issues: write # for andymckay/labeler to label issues
      pull-requests: write # for andymckay/labeler to label PRs
    runs-on: ubuntu-latest
    steps:
      - name: unlabel-issues
6 changes: 3 additions & 3 deletions CONTRIBUTING.md
@@ -246,11 +246,11 @@ or

* To run a specific test function:

-``pytest tests/test_db.py::test_sql``
+``pytest tests/test_mysql.py::test_read_sql_query_simple``

-* To run all database test functions (Using 8 parallel processes):
+* To run all database test functions for MySQL (Using 8 parallel processes):

-``pytest -n 8 tests/test_db.py``
+``pytest -n 8 tests/test_mysql.py``

* To run all data lake test functions for all python versions (Only if Amazon QuickSight is activated and Amazon OpenSearch template is deployed):

21 changes: 20 additions & 1 deletion CONTRIBUTING_COMMON_ERRORS.md
@@ -110,4 +110,23 @@ https://stackoverflow.com/questions/54302793/dyld-library-not-loaded-usr-local-o
brew install unixodbc
```

-----
-----

## CloudFormation Deployment

### Error Message

During the deployment of `aws-data-wrangler-databases`, the creation of the resource `CodeBuildTestRoleLFPermissions` fails with

```
Resource does not exist or requester is not authorized to access requested permissions. (Service: AWSLakeFormation; Status Code: 400; Error Code: AccessDeniedException; Request ID: 14a26718-ee4e-49f2-a7ca-d308e49485f8; Proxy: null)
```

### Solution

The IAM role used to deploy the CloudFormation stack does not have permission to grant permissions in AWS Lake Formation. The quickest solution is to find that IAM role and register it as an administrator in Lake Formation.

In order to find the role:
1. Navigate to the CloudFormation console in your account
1. Select the `aws-data-wrangler-databases` stack which failed to deploy
1. Under the "Stack info" tab, find the value for "IAM role". The role ARN should have the following format: `arn:aws:iam::{ACCOUNT_ID}:role/cdk-{UUID}-cfn-exec-role-{ACCOUNT_ID}-{REGION}`
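
Once the role ARN is known, it can be registered as a data lake administrator either from the Lake Formation console or programmatically. Below is a minimal boto3 sketch (not part of this commit); the role ARN is a placeholder, and since `put_data_lake_settings` replaces the account's settings wholesale, the existing admin list is fetched first:

```python
import boto3

lf = boto3.client("lakeformation")

# Fetch the current settings so existing admins are preserved:
# put_data_lake_settings overwrites the whole settings object.
settings = lf.get_data_lake_settings()["DataLakeSettings"]

# Placeholder ARN copied from the failed stack's "IAM role" field.
role_arn = "arn:aws:iam::111122223333:role/cdk-hnb659fds-cfn-exec-role-111122223333-us-east-1"

admins = settings.get("DataLakeAdmins", [])
if {"DataLakePrincipalIdentifier": role_arn} not in admins:
    admins.append({"DataLakePrincipalIdentifier": role_arn})
settings["DataLakeAdmins"] = admins

lf.put_data_lake_settings(DataLakeSettings=settings)
```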
11 changes: 11 additions & 0 deletions awswrangler/_config.py
@@ -47,6 +47,7 @@ class _ConfigArg(NamedTuple):
    "lakeformation_endpoint_url": _ConfigArg(dtype=str, nullable=True, enforced=True),
    "dynamodb_endpoint_url": _ConfigArg(dtype=str, nullable=True, enforced=True),
    "secretsmanager_endpoint_url": _ConfigArg(dtype=str, nullable=True, enforced=True),
    "timestream_endpoint_url": _ConfigArg(dtype=str, nullable=True, enforced=True),
    # Botocore config
    "botocore_config": _ConfigArg(dtype=botocore.config.Config, nullable=True),
    "verify": _ConfigArg(dtype=str, nullable=True),
@@ -78,6 +79,7 @@ def __init__(self) -> None:
        self.lakeformation_endpoint_url = None
        self.dynamodb_endpoint_url = None
        self.secretsmanager_endpoint_url = None
        self.timestream_endpoint_url = None
        self.botocore_config = None
        self.verify = None
        self.distributed = all(importlib.util.find_spec(pkg) for pkg in ("modin", "ray"))
@@ -398,6 +400,15 @@ def secretsmanager_endpoint_url(self) -> Optional[str]:
    def secretsmanager_endpoint_url(self, value: Optional[str]) -> None:
        self._set_config_value(key="secretsmanager_endpoint_url", value=value)

    @property
    def timestream_endpoint_url(self) -> Optional[str]:
        """Property timestream_endpoint_url."""
        return cast(Optional[str], self["timestream_endpoint_url"])

    @timestream_endpoint_url.setter
    def timestream_endpoint_url(self, value: Optional[str]) -> None:
        self._set_config_value(key="timestream_endpoint_url", value=value)

    @property
    def botocore_config(self) -> botocore.config.Config:
        """Property botocore_config."""
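
For context (not part of the diff): once this key exists, the Timestream endpoint can be overridden like any other `wr.config` property. A small usage sketch, where the endpoint URL is a made-up placeholder:

```python
import awswrangler as wr

# Route all Timestream API calls through a custom endpoint,
# e.g. a VPC interface endpoint (placeholder URL below).
wr.config.timestream_endpoint_url = "https://vpce-0123abcd.timestream.us-east-1.vpce.amazonaws.com"
```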
2 changes: 1 addition & 1 deletion awswrangler/_databases.py
@@ -90,7 +90,7 @@ def _get_connection_attributes_from_secrets_manager(
        user=secret_value["username"],
        password=secret_value["password"],
        host=secret_value["host"],
-        port=secret_value["port"],
+        port=int(secret_value["port"]),
        database=_dbname,
        ssl_context=None,
    )
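
Why the `int(...)` cast helps (an illustration, not from the commit): a Secrets Manager secret is a JSON document, and the port is often stored as a quoted string, so it decodes to `str` while database drivers expect an `int`:

```python
import json

# A typical database secret payload; "port" is stored quoted,
# so json.loads() yields a str, not an int.
secret_value = json.loads('{"host": "db.example.com", "port": "3306"}')

assert isinstance(secret_value["port"], str)
port = int(secret_value["port"])  # the cast applied by the fix above
```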
2 changes: 2 additions & 0 deletions awswrangler/_utils.py
@@ -105,6 +105,8 @@ def _get_endpoint_url(service_name: str) -> Optional[str]:
        endpoint_url = _config.config.dynamodb_endpoint_url
    elif service_name == "secretsmanager" and _config.config.secretsmanager_endpoint_url is not None:
        endpoint_url = _config.config.secretsmanager_endpoint_url
    elif service_name == "timestream" and _config.config.timestream_endpoint_url is not None:
        endpoint_url = _config.config.timestream_endpoint_url
    return endpoint_url


3 changes: 2 additions & 1 deletion awswrangler/athena/__init__.py
@@ -1,6 +1,6 @@
"""Amazon Athena Module."""

-from awswrangler.athena._read import read_sql_query, read_sql_table, unload  # noqa
+from awswrangler.athena._read import get_query_results, read_sql_query, read_sql_table, unload  # noqa
from awswrangler.athena._utils import (  # noqa
    create_athena_bucket,
    create_ctas_table,
@@ -23,6 +23,7 @@
    "describe_table",
    "get_query_columns_types",
    "get_query_execution",
+    "get_query_results",
    "get_named_query_statement",
    "get_work_group",
    "repair_table",
90 changes: 90 additions & 0 deletions awswrangler/athena/_read.py
@@ -559,6 +559,96 @@ def _unload(
    return query_metadata


@apply_configs
def get_query_results(
    query_execution_id: str,
    use_threads: Union[bool, int] = True,
    boto3_session: Optional[boto3.Session] = None,
    categories: Optional[List[str]] = None,
    chunksize: Optional[Union[int, bool]] = None,
    s3_additional_kwargs: Optional[Dict[str, Any]] = None,
    pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
    """Get AWS Athena SQL query results as a Pandas DataFrame.

    Parameters
    ----------
    query_execution_id : str
        SQL query's execution_id on AWS Athena.
    use_threads : bool, int
        True to enable concurrent requests, False to disable multiple threads.
        If enabled os.cpu_count() will be used as the max number of threads.
        If integer is provided, specified number is used.
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 session will be used if boto3_session receives None.
    categories: List[str], optional
        List of column names that should be returned as pandas.Categorical.
        Recommended for memory restricted environments.
    chunksize : Union[int, bool], optional
        If passed, the data will be split into an Iterable of DataFrames (memory friendly).
        If `True`, Wrangler will iterate over the data by files in the most efficient way without guarantee of chunksize.
        If an `INTEGER` is passed, Wrangler will iterate over the data by number of rows equal to the received INTEGER.
    s3_additional_kwargs : Optional[Dict[str, Any]]
        Forwarded to botocore requests.
        e.g. s3_additional_kwargs={'RequestPayer': 'requester'}
    pyarrow_additional_kwargs : Optional[Dict[str, Any]]
        Forwarded to the ParquetFile class or when converting an Arrow table to Pandas; currently only the
        "coerce_int96_timestamp_unit" and "timestamp_as_object" arguments will be considered. If reading parquet
        files where you cannot convert a timestamp to pandas Timestamp[ns], consider setting timestamp_as_object=True
        to allow for timestamp units larger than "ns". If reading parquet data that still uses INT96 (like Athena
        outputs), you can use coerce_int96_timestamp_unit to specify what timestamp unit to encode INT96 to (by default
        this is "ns"; if you know the output parquet came from a system that encodes timestamps to a particular unit,
        then set this to that same unit, e.g. coerce_int96_timestamp_unit="ms").

    Returns
    -------
    Union[pd.DataFrame, Iterator[pd.DataFrame]]
        Pandas DataFrame or Generator of Pandas DataFrames if chunksize is passed.

    Examples
    --------
    >>> import awswrangler as wr
    >>> res = wr.athena.get_query_results(
    ...     query_execution_id="cbae5b41-8103-4709-95bb-887f88edd4f2"
    ... )

    """
    query_metadata: _QueryMetadata = _get_query_metadata(
        query_execution_id=query_execution_id,
        boto3_session=boto3_session,
        categories=categories,
        metadata_cache_manager=_cache_manager,
    )
    client_athena: boto3.client = _utils.client(service_name="athena", session=boto3_session)
    query_info: Dict[str, Any] = client_athena.get_query_execution(QueryExecutionId=query_execution_id)[
        "QueryExecution"
    ]
    statement_type: Optional[str] = query_info.get("StatementType")
    if (statement_type == "DDL" and query_info["Query"].startswith("CREATE TABLE")) or (
        statement_type == "DML" and query_info["Query"].startswith("UNLOAD")
    ):
        return _fetch_parquet_result(
            query_metadata=query_metadata,
            keep_files=True,
            categories=categories,
            chunksize=chunksize,
            use_threads=use_threads,
            boto3_session=boto3_session,
            s3_additional_kwargs=s3_additional_kwargs,
            pyarrow_additional_kwargs=pyarrow_additional_kwargs,
        )
    if statement_type == "DML" and not query_info["Query"].startswith("INSERT"):
        return _fetch_csv_result(
            query_metadata=query_metadata,
            keep_files=True,
            chunksize=chunksize,
            use_threads=use_threads,
            boto3_session=boto3_session,
            s3_additional_kwargs=s3_additional_kwargs,
        )
    raise exceptions.UndetectedType(f"""Unable to get results for: {query_info["Query"]}.""")


@apply_configs
def read_sql_query(
    sql: str,
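
A possible usage sketch for the new `get_query_results` (the SQL, database name, and chunk size below are placeholders; `start_query_execution` with `wait=True` is assumed to return the final query-execution response, from which the id is taken):

```python
import awswrangler as wr

# Run a query and block until it finishes (placeholder SQL and database).
response = wr.athena.start_query_execution(
    sql="SELECT * FROM my_table",
    database="my_database",
    wait=True,
)
query_execution_id = response["QueryExecutionId"]

# Fetch the results as a single DataFrame...
df = wr.athena.get_query_results(query_execution_id=query_execution_id)

# ...or iterate in bounded-size chunks for large result sets.
for chunk in wr.athena.get_query_results(query_execution_id=query_execution_id, chunksize=100_000):
    print(len(chunk))
```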
2 changes: 1 addition & 1 deletion awswrangler/catalog/_create.py
@@ -644,7 +644,7 @@ def create_database(
        args["Description"] = description

    try:
-        r = client_glue.get_database(Name=name)
+        r = client_glue.get_database(**_catalog_id(catalog_id=catalog_id, Name=name))
        if not exist_ok:
            raise exceptions.AlreadyExists(f"Database {name} already exists and <exist_ok> is set to False.")
        if description and description != r["Database"].get("Description", ""):
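
The effect of this one-line fix, illustrated (not part of the commit): the `exist_ok` pre-check now queries the same catalog the database is created in, which matters for cross-account catalogs. The account id below is a placeholder:

```python
import awswrangler as wr

# Create a database in another account's Glue catalog, tolerating
# the case where it already exists; before the fix, the existence
# check always ran against the caller's own catalog.
wr.catalog.create_database(
    name="analytics",
    catalog_id="111122223333",
    exist_ok=True,
)
```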