Skip to content

Commit ae9086d

Browse files
author
Brannon Imamura
committed
Make locking behavior more predictable
Tables should be locked before any edits happen. In the case of overwrite + delete/truncate method, the lock would happen too late, leaving room to query an empty table. Since all locks happen after a _create_table(), we can move lock into that function for simplicity.
1 parent f55af39 commit ae9086d

File tree

1 file changed

+11
-7
lines changed

1 file changed

+11
-7
lines changed

awswrangler/redshift.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ def _create_table( # pylint: disable=too-many-locals,too-many-arguments
277277
use_threads: Union[bool, int] = True,
278278
boto3_session: Optional[boto3.Session] = None,
279279
s3_additional_kwargs: Optional[Dict[str, str]] = None,
280+
lock: bool = False,
280281
) -> Tuple[str, Optional[str]]:
281282
if mode == "overwrite":
282283
if overwrite_method == "truncate":
@@ -291,14 +292,21 @@ def _create_table( # pylint: disable=too-many-locals,too-many-arguments
291292
_logger.debug(str(e))
292293
con.rollback()
293294
_begin_transaction(cursor=cursor)
295+
if lock:
296+
_lock(cursor, [table], schema=schema)
294297
elif overwrite_method == "delete":
295298
if _does_table_exist(cursor=cursor, schema=schema, table=table):
299+
if lock:
300+
_lock(cursor, [table], schema=schema)
296301
# Atomic, but slow.
297302
_delete_all(cursor=cursor, schema=schema, table=table)
298303
else:
299304
# Fast, atomic, but either fails if there are any dependent views or, in cascade mode, deletes them.
300305
_drop_table(cursor=cursor, schema=schema, table=table, cascade=bool(overwrite_method == "cascade"))
306+
# No point in locking here, the oid will change.
301307
elif _does_table_exist(cursor=cursor, schema=schema, table=table) is True:
308+
if lock:
309+
_lock(cursor, [table], schema=schema)
302310
if mode == "upsert":
303311
guid: str = uuid.uuid4().hex
304312
temp_table: str = f"temp_redshift_{guid}"
@@ -363,6 +371,8 @@ def _create_table( # pylint: disable=too-many-locals,too-many-arguments
363371
)
364372
_logger.debug("Create table query:\n%s", sql)
365373
cursor.execute(sql)
374+
if lock:
375+
_lock(cursor, [table], schema=schema)
366376
return table, schema
367377

368378

@@ -869,6 +879,7 @@ def to_sql( # pylint: disable=too-many-locals
869879
primary_keys=primary_keys,
870880
varchar_lengths_default=varchar_lengths_default,
871881
varchar_lengths=varchar_lengths,
882+
lock=lock,
872883
)
873884
if index:
874885
df.reset_index(level=df.index.names, inplace=True)
@@ -885,8 +896,6 @@ def to_sql( # pylint: disable=too-many-locals
885896
_logger.debug("sql: %s", sql)
886897
cursor.executemany(sql, (parameters,))
887898
if table != created_table: # upsert
888-
if lock:
889-
_lock(cursor, [table], schema=schema)
890899
_upsert(cursor=cursor, schema=schema, table=table, temp_table=created_table, primary_keys=primary_keys)
891900
if commit_transaction:
892901
con.commit()
@@ -1354,9 +1363,6 @@ def copy_from_files( # pylint: disable=too-many-locals,too-many-arguments
13541363
boto3_session=boto3_session,
13551364
s3_additional_kwargs=s3_additional_kwargs,
13561365
)
1357-
if lock and table == created_table:
1358-
# Lock before copy if copying into target (not temp) table
1359-
_lock(cursor, [table], schema=schema)
13601366
_copy(
13611367
cursor=cursor,
13621368
path=path,
@@ -1372,8 +1378,6 @@ def copy_from_files( # pylint: disable=too-many-locals,too-many-arguments
13721378
manifest=manifest,
13731379
)
13741380
if table != created_table: # upsert
1375-
if lock:
1376-
_lock(cursor, [table], schema=schema)
13771381
_upsert(cursor=cursor, schema=schema, table=table, temp_table=created_table, primary_keys=primary_keys)
13781382
if commit_transaction:
13791383
con.commit()

0 commit comments

Comments
 (0)