Skip to content

Commit 1419cf3

Browse files
authored
fix: Pin redis-py to 4.5.5 and enhance logging for Redis reconnection and failover (#1620)
1 parent 833ed54 commit 1419cf3

26 files changed

+310
-218
lines changed

changes/1620.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improve logging when retrying Redis connections during failover and use explicit names for all Redis connection pools.

python.lock

Lines changed: 137 additions & 138 deletions
Large diffs are not rendered by default.

requirements.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ dataclasses-json~=0.5.7
2929
etcetra==0.1.17
3030
faker~=13.12.0
3131
graphene~=2.1.9
32-
hiredis~=2.2.3
3332
humanize>=3.1.0
3433
ifaddr~=0.2
3534
inquirer~=2.9.2
@@ -55,7 +54,8 @@ pyzmq~=24.0.1
5554
PyJWT~=2.0
5655
PyYAML~=6.0
5756
packaging>=21.3
58-
redis[hiredis]~=4.6.0
57+
hiredis>=2.2.3
58+
redis[hiredis]==4.5.5
5959
rich~=12.2
6060
SQLAlchemy[postgresql_asyncpg]~=1.4.40
6161
setproctitle~=1.3.2
@@ -70,7 +70,7 @@ trafaret~=2.1
7070
treelib==1.6.1
7171
typeguard~=2.10
7272
typing_extensions~=4.3
73-
uvloop>=0.17; sys_platform != "Windows"
73+
uvloop~=0.17.0; sys_platform != "Windows" # 0.18 breaks the API and adds Python 3.12 support
7474
yarl~=1.8.2 # FIXME: revert to >=1.7 after aio-libs/yarl#862 is resolved
7575
zipstream-new~=1.1.8
7676

src/ai/backend/agent/agent.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767

6868
from ai.backend.common import msgpack, redis_helper
6969
from ai.backend.common.config import model_definition_iv
70-
from ai.backend.common.defs import REDIS_STREAM_DB
70+
from ai.backend.common.defs import REDIS_STAT_DB, REDIS_STREAM_DB
7171
from ai.backend.common.docker import MAX_KERNELSPEC, MIN_KERNELSPEC, ImageRef
7272
from ai.backend.common.events import (
7373
AbstractEvent,
@@ -631,8 +631,16 @@ async def __ainit__(self) -> None:
631631
node_id=self.local_config["agent"]["id"],
632632
consumer_group=EVENT_DISPATCHER_CONSUMER_GROUP,
633633
)
634-
self.redis_stream_pool = redis_helper.get_redis_object(self.local_config["redis"], db=4)
635-
self.redis_stat_pool = redis_helper.get_redis_object(self.local_config["redis"], db=0)
634+
self.redis_stream_pool = redis_helper.get_redis_object(
635+
self.local_config["redis"],
636+
name="stream",
637+
db=REDIS_STREAM_DB,
638+
)
639+
self.redis_stat_pool = redis_helper.get_redis_object(
640+
self.local_config["redis"],
641+
name="stat",
642+
db=REDIS_STAT_DB,
643+
)
636644

637645
alloc_map_mod.log_alloc_map = self.local_config["debug"]["log-alloc-map"]
638646
computers = await self.load_resources()

src/ai/backend/common/events.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@
99
import socket
1010
import uuid
1111
from collections import defaultdict
12-
from types import TracebackType
1312
from typing import (
1413
Any,
15-
Awaitable,
1614
Callable,
1715
ClassVar,
1816
Coroutine,
@@ -32,8 +30,8 @@
3230
from aiotools.context import aclosing
3331
from aiotools.server import process_index
3432
from aiotools.taskgroup import PersistentTaskGroup
33+
from aiotools.taskgroup.types import AsyncExceptionHandler
3534
from redis.asyncio import ConnectionPool
36-
from typing_extensions import TypeAlias
3735

3836
from . import msgpack, redis_helper
3937
from .logging import BraceStyleAdapter
@@ -60,10 +58,6 @@
6058

6159
log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined]
6260

63-
PTGExceptionHandler: TypeAlias = Callable[
64-
[Type[Exception], Exception, TracebackType], Awaitable[None]
65-
]
66-
6761

6862
class AbstractEvent(metaclass=abc.ABCMeta):
6963
# derivatives should define the fields.
@@ -850,16 +844,18 @@ def __init__(
850844
log_events: bool = False,
851845
*,
852846
consumer_group: str,
853-
service_name: str = None,
847+
service_name: str | None = None,
854848
stream_key: str = "events",
855-
node_id: str = None,
856-
consumer_exception_handler: PTGExceptionHandler = None,
857-
subscriber_exception_handler: PTGExceptionHandler = None,
849+
node_id: str | None = None,
850+
consumer_exception_handler: AsyncExceptionHandler | None = None,
851+
subscriber_exception_handler: AsyncExceptionHandler | None = None,
858852
) -> None:
859853
_redis_config = redis_config.copy()
860854
if service_name:
861855
_redis_config["service_name"] = service_name
862-
self.redis_client = redis_helper.get_redis_object(_redis_config, db=db)
856+
self.redis_client = redis_helper.get_redis_object(
857+
_redis_config, name="event_dispatcher.stream", db=db
858+
)
863859
self._log_events = log_events
864860
self._closed = False
865861
self.consumers = defaultdict(set)
@@ -905,7 +901,7 @@ def consume(
905901
callback: EventCallback[TContext, TEvent],
906902
coalescing_opts: CoalescingOptions = None,
907903
*,
908-
name: str = None,
904+
name: str | None = None,
909905
) -> EventHandler[TContext, TEvent]:
910906
if name is None:
911907
name = f"evh-{secrets.token_urlsafe(16)}"
@@ -928,9 +924,9 @@ def subscribe(
928924
event_cls: Type[TEvent],
929925
context: TContext,
930926
callback: EventCallback[TContext, TEvent],
931-
coalescing_opts: CoalescingOptions = None,
927+
coalescing_opts: CoalescingOptions | None = None,
932928
*,
933-
name: str = None,
929+
name: str | None = None,
934930
) -> EventHandler[TContext, TEvent]:
935931
if name is None:
936932
name = f"evh-{secrets.token_urlsafe(16)}"
@@ -1054,15 +1050,19 @@ def __init__(
10541050
redis_config: EtcdRedisConfig,
10551051
db: int = 0,
10561052
*,
1057-
service_name: str = None,
1053+
service_name: str | None = None,
10581054
stream_key: str = "events",
10591055
log_events: bool = False,
10601056
) -> None:
10611057
_redis_config = redis_config.copy()
10621058
if service_name:
10631059
_redis_config["service_name"] = service_name
10641060
self._closed = False
1065-
self.redis_client = redis_helper.get_redis_object(_redis_config, db=db)
1061+
self.redis_client = redis_helper.get_redis_object(
1062+
_redis_config,
1063+
name="event_producer.stream",
1064+
db=db,
1065+
)
10661066
self._log_events = log_events
10671067
self._stream_key = stream_key
10681068

@@ -1092,7 +1092,7 @@ async def produce_event(
10921092
)
10931093

10941094

1095-
def _generate_consumer_id(node_id: str = None) -> str:
1095+
def _generate_consumer_id(node_id: str | None = None) -> str:
10961096
h = hashlib.sha1()
10971097
h.update(str(node_id or socket.getfqdn()).encode("utf8"))
10981098
hostname_hash = h.hexdigest()

src/ai/backend/common/redis_helper.py

Lines changed: 85 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import inspect
55
import logging
66
import socket
7+
import time
78
from typing import (
89
Any,
910
AsyncIterator,
@@ -21,7 +22,7 @@
2122

2223
import redis.exceptions
2324
import yarl
24-
from redis.asyncio import Redis
25+
from redis.asyncio import ConnectionPool, Redis
2526
from redis.asyncio.client import Pipeline, PubSub
2627
from redis.asyncio.sentinel import MasterNotFoundError, Sentinel, SlaveNotFoundError
2728
from redis.backoff import ExponentialBackoff
@@ -63,6 +64,10 @@
6364
redis.exceptions.TimeoutError,
6465
],
6566
}
67+
_default_conn_pool_opts: Mapping[str, Any] = {
68+
"max_connections": 16,
69+
# "timeout": 20.0, # for redis-py 5.0+
70+
}
6671

6772
_scripts: Dict[str, str] = {}
6873

@@ -73,11 +78,6 @@ class ConnectionNotAvailable(Exception):
7378
pass
7479

7580

76-
def _parse_stream_msg_id(msg_id: bytes) -> Tuple[int, int]:
77-
timestamp, _, sequence = msg_id.partition(b"-")
78-
return int(timestamp), int(sequence)
79-
80-
8181
async def subscribe(channel: PubSub, *, reconnect_poll_interval: float = 0.3) -> AsyncIterator[Any]:
8282
"""
8383
An async-generator wrapper for pub-sub channel subscription.
@@ -130,7 +130,7 @@ async def blpop(
130130
redis_obj: RedisConnectionInfo,
131131
key: str,
132132
*,
133-
service_name: str = None,
133+
service_name: Optional[str] = None,
134134
) -> AsyncIterator[Any]:
135135
"""
136136
An async-generator wrapper for blpop (blocking left pop).
@@ -175,8 +175,9 @@ async def execute(
175175
redis_obj: RedisConnectionInfo,
176176
func: Callable[[Redis], Awaitable[Any]],
177177
*,
178-
service_name: str = None,
178+
service_name: Optional[str] = None,
179179
encoding: Optional[str] = None,
180+
command_timeout: Optional[float] = None,
180181
) -> Any:
181182
"""
182183
Executes a function that issues Redis commands or returns a pipeline/transaction of commands,
@@ -187,9 +188,25 @@ async def execute(
187188
"""
188189
redis_client = redis_obj.client
189190
service_name = service_name or redis_obj.service_name
190-
reconnect_poll_interval = float(
191-
cast(str, redis_obj.redis_helper_config.get("reconnect_poll_timeout"))
192-
)
191+
reconnect_poll_interval = redis_obj.redis_helper_config.get("reconnect_poll_timeout", 0.0)
192+
193+
first_trial = time.perf_counter()
194+
retry_log_count = 0
195+
last_log_time = first_trial
196+
197+
def show_retry_warning(e: Exception, warn_on_first_attempt: bool = True) -> None:
198+
nonlocal retry_log_count, last_log_time
199+
now = time.perf_counter()
200+
if (warn_on_first_attempt and retry_log_count == 0) or now - last_log_time >= 10.0:
201+
log.warning(
202+
"Retrying due to interruption of Redis connection "
203+
"({}, conn-pool: {}, retrying-for: {:.3f}s)",
204+
repr(e),
205+
redis_obj.name,
206+
now - first_trial,
207+
)
208+
retry_log_count += 1
209+
last_log_time = now
193210

194211
while True:
195212
try:
@@ -228,21 +245,33 @@ async def execute(
228245
MasterNotFoundError,
229246
SlaveNotFoundError,
230247
redis.exceptions.ReadOnlyError,
248+
redis.exceptions.ConnectionError,
231249
ConnectionResetError,
232-
):
250+
) as e:
251+
warn_on_first_attempt = True
252+
if (
253+
isinstance(e, redis.exceptions.ConnectionError)
254+
and "Too many connections" in e.args[0]
255+
): # connection pool is full
256+
warn_on_first_attempt = False
257+
show_retry_warning(e, warn_on_first_attempt)
233258
await asyncio.sleep(reconnect_poll_interval)
234259
continue
235-
except redis.exceptions.ConnectionError as e:
236-
log.error(f"execute(): Connecting to redis failed: {e}")
237-
await asyncio.sleep(reconnect_poll_interval)
260+
except (
261+
redis.exceptions.TimeoutError,
262+
asyncio.TimeoutError,
263+
) as e:
264+
if command_timeout is not None:
265+
now = time.perf_counter()
266+
if now - first_trial >= command_timeout + 1.0:
267+
show_retry_warning(e)
238268
continue
239269
except redis.exceptions.ResponseError as e:
240270
if "NOREPLICAS" in e.args[0]:
271+
show_retry_warning(e)
241272
await asyncio.sleep(reconnect_poll_interval)
242273
continue
243274
raise
244-
except (redis.exceptions.TimeoutError, asyncio.TimeoutError):
245-
continue
246275
except asyncio.CancelledError:
247276
raise
248277
finally:
@@ -317,6 +346,7 @@ async def read_stream(
317346
{stream_key: last_id},
318347
block=block_timeout,
319348
),
349+
command_timeout=block_timeout / 1000,
320350
)
321351
if not reply:
322352
continue
@@ -367,6 +397,7 @@ async def read_stream_by_group(
367397
str(autoclaim_idle_timeout),
368398
autoclaim_start_id,
369399
),
400+
command_timeout=autoclaim_idle_timeout / 1000,
370401
)
371402
for msg_id, msg_data in reply[1]:
372403
messages.append((msg_id, msg_data))
@@ -381,6 +412,7 @@ async def read_stream_by_group(
381412
{stream_key: b">"}, # fetch messages not seen by other consumers
382413
block=block_timeout,
383414
),
415+
command_timeout=block_timeout / 1000,
384416
)
385417
if len(reply) == 0:
386418
continue
@@ -422,13 +454,32 @@ async def read_stream_by_group(
422454

423455
def get_redis_object(
424456
redis_config: EtcdRedisConfig,
457+
*,
458+
name: str,
425459
db: int = 0,
426460
**kwargs,
427461
) -> RedisConnectionInfo:
428462
redis_helper_config: RedisHelperConfig = cast(
429463
RedisHelperConfig, redis_config.get("redis_helper_config")
430464
)
431-
465+
conn_opts = {
466+
**_default_conn_opts,
467+
**kwargs,
468+
# "lib_name": None, # disable implicit "CLIENT SETINFO" (for redis-py 5.0+)
469+
# "lib_version": None, # disable implicit "CLIENT SETINFO" (for redis-py 5.0+)
470+
}
471+
conn_pool_opts = {
472+
**_default_conn_pool_opts,
473+
}
474+
if socket_timeout := redis_helper_config.get("socket_timeout"):
475+
conn_opts["socket_timeout"] = float(socket_timeout)
476+
if socket_connect_timeout := redis_helper_config.get("socket_connect_timeout"):
477+
conn_opts["socket_connect_timeout"] = float(socket_connect_timeout)
478+
if max_connections := redis_helper_config.get("max_connections"):
479+
conn_pool_opts["max_connections"] = int(max_connections)
480+
# for redis-py 5.0+
481+
# if connection_ready_timeout := redis_helper_config.get("connection_ready_timeout"):
482+
# conn_pool_opts["timeout"] = float(connection_ready_timeout)
432483
if _sentinel_addresses := redis_config.get("sentinel"):
433484
sentinel_addresses: Any = None
434485
if isinstance(_sentinel_addresses, str):
@@ -453,19 +504,14 @@ def get_redis_object(
453504
**kwargs,
454505
},
455506
)
456-
457-
conn_opts = {
458-
**_default_conn_opts,
459-
**kwargs,
460-
"socket_timeout": float(cast(str, redis_helper_config.get("socket_timeout"))),
461-
"socket_connect_timeout": float(
462-
cast(str, redis_helper_config.get("socket_connect_timeout"))
463-
),
464-
}
465-
466507
return RedisConnectionInfo(
467-
client=sentinel.master_for(service_name=service_name, password=password, **conn_opts),
508+
client=sentinel.master_for(
509+
service_name=service_name,
510+
password=password,
511+
**conn_opts,
512+
),
468513
sentinel=sentinel,
514+
name=name,
469515
service_name=service_name,
470516
redis_helper_config=redis_helper_config,
471517
)
@@ -475,10 +521,18 @@ def get_redis_object(
475521
url = yarl.URL("redis://host").with_host(str(redis_url[0])).with_port(
476522
redis_url[1]
477523
).with_password(redis_config.get("password")) / str(db)
478-
479524
return RedisConnectionInfo(
480-
client=Redis.from_url(str(url), **kwargs),
525+
# In redis-py 5.0.1+, we should migrate to `Redis.from_pool()` API
526+
client=Redis(
527+
connection_pool=ConnectionPool.from_url(
528+
str(url),
529+
**conn_pool_opts,
530+
),
531+
**conn_opts,
532+
auto_close_connection_pool=True,
533+
),
481534
sentinel=None,
535+
name=name,
482536
service_name=None,
483537
redis_helper_config=redis_helper_config,
484538
)

0 commit comments

Comments (0)