Skip to content

feat(BA-1220): Add redis pubsub broadcaster #4253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

HyeockJinKim
Copy link
Collaborator

resolves #4251 (BA-1220)

Checklist: (if applicable)

  • Milestone metadata specifying the target backport version
  • Mention to the original issue
  • Installer updates including:
    • Fixtures for db schema changes
    • New mandatory config options
  • Update of end-to-end CLI integration tests in ai.backend.test
  • API server-client counterparts (e.g., manager API -> client SDK)
  • Test case(s) to:
    • Demonstrate the difference of before/after
    • Demonstrate the flow of abstract/conceptual models with a concrete implementation
  • Documentation
    • Contents in the docs directory
    • docstrings in public interfaces and type annotations

@HyeockJinKim HyeockJinKim self-assigned this Apr 23, 2025
@github-actions github-actions bot added size:L 100~500 LoC comp:manager Related to Manager component comp:agent Related to Agent component comp:common Related to Common component comp:storage-proxy Related to Storage proxy component labels Apr 23, 2025
except asyncio.CancelledError:
raise
except Exception as e:
log.error("Error while reading broadcasted messages: %s", e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.error("Error while reading broadcasted messages: %s", e)
log.error("Error while reading broadcasted messages: {}", e)

@@ -126,7 +113,6 @@ async def close(self) -> None:
self._closed = True
self._auto_claim_loop_task.cancel()
self._read_messages_task.cancel()
self._read_broadcast_messages_task.cancel()

async def _auto_claim_loop(self, autoclaim_start_id: str, autoclaim_idle_timeout: int) -> None:
log.debug("Starting auto claim loop for stream %s", self._stream_key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.debug("Starting auto claim loop for stream %s", self._stream_key)
log.debug("Starting auto claim loop for stream {}", self._stream_key)

@@ -108,7 +95,6 @@ async def close(self) -> None:
await self._conn.close()
self._auto_claim_loop_task.cancel()
self._read_messages_task.cancel()
self._read_broadcast_messages_task.cancel()

async def _auto_claim_loop(self, autoclaim_start_id: str, autoclaim_idle_timeout: int) -> None:
log.debug("Starting auto claim loop for stream %s", self._stream_key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.debug("Starting auto claim loop for stream %s", self._stream_key)
log.debug("Starting auto claim loop for stream {}", self._stream_key)

Comment on lines +1483 to +1484
b = dump_json(raw_event)
await self._broadcaster.broadcast(b)
Copy link
Member

@jopemachine jopemachine Apr 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
b = dump_json(raw_event)
await self._broadcaster.broadcast(b)
await self._broadcaster.broadcast(dump_json(raw_event))

Or let's use proper name for b

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is intended that only one action is performed on a line.
When more than one action is performed, it's hard to see when an issue occurs.

@@ -539,9 +551,16 @@ async def event_dispatcher_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
await root_ctx.event_dispatcher.close()


@dataclass
class EventProcessors:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it correct for the type name to be in plural form?

broadcaster: AbstractBroadcaster
subscriber: AbstractBroadcastSubscriber


def _make_message_queue(
Copy link
Member

@jopemachine jopemachine Apr 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function name also should be updated since it does not return message queue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
comp:agent Related to Agent component comp:common Related to Common component comp:manager Related to Manager component comp:storage-proxy Related to Storage proxy component size:L 100~500 LoC
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement broadcaster for broadcast events using pubsub
3 participants