Skip to content

Commit 7a3ae7b

Browse files
Brannon Imamurakukushking
Brannon Imamura
andauthored
Fix Redshift Locking Behavior (#1305)
* Make locking behavior more predictable Tables should be locked before any edits happen. In the case of overwrite + delete/truncate method, the lock would happen too late, leaving room to query an empty table. Since all locks happen after a _create_table(), we can move lock into that function for simplicity. * Update redshift.py * pylint Co-authored-by: kukushking <[email protected]>
1 parent 7787f0a commit 7a3ae7b

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

awswrangler/redshift.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ def _redshift_types_from_path(
267267
return redshift_types
268268

269269

270-
def _create_table( # pylint: disable=too-many-locals,too-many-arguments
270+
def _create_table( # pylint: disable=too-many-locals,too-many-arguments,too-many-branches,too-many-statements
271271
df: Optional[pd.DataFrame],
272272
path: Optional[Union[str, List[str]]],
273273
con: redshift_connector.Connection,
@@ -292,6 +292,7 @@ def _create_table( # pylint: disable=too-many-locals,too-many-arguments
292292
use_threads: Union[bool, int] = True,
293293
boto3_session: Optional[boto3.Session] = None,
294294
s3_additional_kwargs: Optional[Dict[str, str]] = None,
295+
lock: bool = False,
295296
) -> Tuple[str, Optional[str]]:
296297
if mode == "overwrite":
297298
if overwrite_method == "truncate":
@@ -306,14 +307,21 @@ def _create_table( # pylint: disable=too-many-locals,too-many-arguments
306307
_logger.debug(str(e))
307308
con.rollback()
308309
_begin_transaction(cursor=cursor)
310+
if lock:
311+
_lock(cursor, [table], schema=schema)
309312
elif overwrite_method == "delete":
310313
if _does_table_exist(cursor=cursor, schema=schema, table=table):
314+
if lock:
315+
_lock(cursor, [table], schema=schema)
311316
# Atomic, but slow.
312317
_delete_all(cursor=cursor, schema=schema, table=table)
313318
else:
314319
# Fast, atomic, but either fails if there are any dependent views or, in cascade mode, deletes them.
315320
_drop_table(cursor=cursor, schema=schema, table=table, cascade=bool(overwrite_method == "cascade"))
321+
# No point in locking here, the oid will change.
316322
elif _does_table_exist(cursor=cursor, schema=schema, table=table) is True:
323+
if lock:
324+
_lock(cursor, [table], schema=schema)
317325
if mode == "upsert":
318326
guid: str = uuid.uuid4().hex
319327
temp_table: str = f"temp_redshift_{guid}"
@@ -378,6 +386,8 @@ def _create_table( # pylint: disable=too-many-locals,too-many-arguments
378386
)
379387
_logger.debug("Create table query:\n%s", sql)
380388
cursor.execute(sql)
389+
if lock:
390+
_lock(cursor, [table], schema=schema)
381391
return table, schema
382392

383393

@@ -889,6 +899,7 @@ def to_sql( # pylint: disable=too-many-locals
889899
primary_keys=primary_keys,
890900
varchar_lengths_default=varchar_lengths_default,
891901
varchar_lengths=varchar_lengths,
902+
lock=lock,
892903
)
893904
if index:
894905
df.reset_index(level=df.index.names, inplace=True)
@@ -905,8 +916,6 @@ def to_sql( # pylint: disable=too-many-locals
905916
_logger.debug("sql: %s", sql)
906917
cursor.executemany(sql, (parameters,))
907918
if table != created_table: # upsert
908-
if lock:
909-
_lock(cursor, [table], schema=schema)
910919
_upsert(
911920
cursor=cursor,
912921
schema=schema,
@@ -1385,10 +1394,8 @@ def copy_from_files( # pylint: disable=too-many-locals,too-many-arguments
13851394
use_threads=use_threads,
13861395
boto3_session=boto3_session,
13871396
s3_additional_kwargs=s3_additional_kwargs,
1397+
lock=lock,
13881398
)
1389-
if lock and table == created_table:
1390-
# Lock before copy if copying into target (not temp) table
1391-
_lock(cursor, [table], schema=schema)
13921399
_copy(
13931400
cursor=cursor,
13941401
path=path,
@@ -1404,8 +1411,6 @@ def copy_from_files( # pylint: disable=too-many-locals,too-many-arguments
14041411
manifest=manifest,
14051412
)
14061413
if table != created_table: # upsert
1407-
if lock:
1408-
_lock(cursor, [table], schema=schema)
14091414
_upsert(
14101415
cursor=cursor,
14111416
schema=schema,

0 commit comments

Comments
 (0)