Skip to content

feat(BA-684): Create Raftify Client #3743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from

Conversation

racheliee
Copy link
Contributor

@racheliee racheliee commented Feb 18, 2025

resolves #3634 (BA-684)

about

This PR introduces the raftify client that supports key-value store.

setup

The raft-cluster-config.toml should contain the information of the raft clusters as such:

[[peers.other]]
host = "127.0.0.1"
port = 60061
node-id = 1
role = "leader"

[[peers.myself]]
host = "127.0.0.1"
port = 60062
node-id = 2
role = "voter"

[[peers.other]]
host = "127.0.0.1"
port = 60063
node-id = 3
role = "voter"

testing

The key-value store mechanism can be tested as such:

curl -XGET http://localhost:6025{node-id}/put/1/test
curl -XGET http://localhost:6025{node-id}/get/1
curl -XGET http://localhost:6025{node-id}/leader

Checklist: (if applicable)

  • [ x ] Mention to the original issue
  • Installer updates including:
    • Fixtures for db schema changes
    • New mandatory config options
  • API server-client counterparts (e.g., manager API -> client SDK)
  • Test case(s) to:
    • Demonstrate the difference of before/after
    • Demonstrate the flow of abstract/conceptual models with a concrete implementation
  • Documentation
    • Contents in the docs directory
    • docstrings in public interfaces and type annotations

@racheliee racheliee changed the title feat(BA-686): Implement Raftify KVS feat(BA-684): Create Raftify Client Feb 18, 2025
@@ -83,3 +86,10 @@ certfile = ""

[debug]
enabled = false

[raft-kvs]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[raft-kvs]
[raft]

Comment on lines 185 to 203
async def __aenter__(self) -> Union[EtcdCommunicator, RaftKVSCommunicator]:
if isinstance(self.etcd.etcd, EtcdClient):
self._etcd_client = self.etcd.etcd.with_lock(
EtcdLockOption(
lock_name=self.lock_name.encode("utf-8"),
timeout=self._timeout,
ttl=int(self._lifetime) if self._lifetime is not None else None,
),
)
elif isinstance(self.etcd.etcd, RaftKVSClient):
self._etcd_client = await self.etcd.etcd.with_lock(
RaftKVSLockOptions(
lock_name=self.lock_name.encode("utf-8"),
timeout=self._timeout,
ttl=int(self._lifetime) if self._lifetime is not None else None,
),
)
assert self._etcd_client is not None
etcd_communicator = await self._etcd_client.__aenter__()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class EtcdLock(AbstractDistributedLock):
    _etcd_client: Optional[Union[EtcdClient, RaftKVSClient]]
    _etcd: AbstractKVStore
    _etcd_locked: ETCDLocked

    _debug: bool

    lock_name: str
    etcd: AbstractKVStore
    timeout: float

    default_timeout: float = 9600  # not allow infinite timeout for safety

    def __init__(
        self,
        lock_name: str,
        etcd: AbstractKVStore,
        *,
        timeout: Optional[float] = None,
        lifetime: Optional[float] = None,
        debug: bool = False,
    ) -> None:
        super().__init__(lifetime=lifetime)
        self.lock_name = lock_name
        self.etcd = etcd
        self._timeout = timeout if timeout is not None else self.default_timeout
        self._debug = debug
        self._etcd_client = None

    async def __aenter__(self) -> EtcdCommunicator:
        if self._etcd_locked is not None:
            raise RuntimeError("Already locked")

        locked = self.etcd.with_lock(...)
        self._etcd_locked = locked
        etcd_communicator = await locked.__aenter__()
        if self._debug:
            log.debug("etcd lock acquired")

        return etcd_communicator

    async def __aexit__(self, *exc_info) -> Optional[bool]:
        if self._etcd_locked is None:
            raise RuntimeError("Not locked")
        await self._etcd_client.__aexit__(*exc_info)

        if self._debug:
            log.debug("etcd lock released")

        self._etcd_locked = None
        return None

Comment on lines 113 to 114
assert local_config["etcd"]["user"] is not None
assert local_config["etcd"]["password"] is not None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use raise


async def with_lock(
self, lock_options: RaftKVSLockOptions, connect_options: Optional["ConnectOptions"] = None
) -> "RaftKVSClient":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Self in typing library.

Comment on lines 84 to 108
def pickle_deserialize(data: bytes) -> str | None:
if data == b"":
return None

if pickle.PROTO in data:
r = pickle.loads(data[data.index(pickle.PROTO) :])
return r

# Not pickle data
return None


def register_raft_custom_deserializer() -> None:
"""
Initialize the custom deserializers.
"""

set_confchange_context_deserializer(pickle_deserialize)
set_confchangev2_context_deserializer(pickle_deserialize)
set_entry_context_deserializer(pickle_deserialize)
set_entry_data_deserializer(pickle_deserialize)
set_message_context_deserializer(pickle_deserialize)
set_snapshot_data_deserializer(pickle_deserialize)
set_log_entry_deserializer(pickle_deserialize)
set_fsm_deserializer(pickle_deserialize)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use pickle data?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@jopemachine jopemachine Feb 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can inject a deserializer used internally in raft-rs to deserialize byte slices used for logging purposes.
Besides pickle, other deserializers can also be injected.

If no deserializer is injected, the raw byte slice will be displayed as-is.

Comment on lines 345 to 411
register_raft_custom_deserializer()

raft_configs = root_ctx.local_config.get("raft-kvs")
assert raft_configs is not None, "Raft configuration missing in the manager.toml"

raft_cluster_configs = root_ctx.raft_cluster_config
assert raft_cluster_configs is not None

other_peers = [{**peer, "myself": False} for peer in raft_cluster_configs["peers"]["other"]]
my_peers = [{**peer, "myself": True} for peer in raft_cluster_configs["peers"]["myself"]]
all_peers = sorted([*other_peers, *my_peers], key=lambda x: x["node-id"])

initial_peers = Peers({
int(peer_config["node-id"]): Peer(
addr=f"{peer_config['host']}:{peer_config['port']}",
role=InitialRole.from_str(peer_config["role"]),
)
for peer_config in all_peers
})

raft_core_config = RaftConfig(
heartbeat_tick=raft_configs["heartbeat-tick"],
election_tick=raft_configs["election-tick"],
min_election_tick=raft_configs["min-election-tick"],
max_election_tick=raft_configs["max-election-tick"],
max_committed_size_per_ready=raft_configs["max-committed-size-per-ready"],
max_size_per_msg=raft_configs["max-size-per-msg"],
max_inflight_msgs=raft_configs["max-inflight-msgs"],
check_quorum=raft_configs["check-quorum"],
batch_append=raft_configs["batch-append"],
max_uncommitted_size=raft_configs["max-uncommitted-size"],
skip_bcast_commit=raft_configs["skip-bcast-commit"],
pre_vote=raft_configs["pre-vote"],
priority=raft_configs["priority"],
)

raft_cfg = Config(
log_dir=raft_configs["log-dir"],
save_compacted_logs=True,
compacted_log_dir=raft_configs["log-dir"],
restore_wal_from=raft_cluster_configs["restore-wal-from"],
restore_wal_snapshot_from=raft_cluster_configs["restore-wal-snapshot-from"],
initial_peers=initial_peers,
raft_config=raft_core_config,
)

node_id_offset = next((idx for idx, item in enumerate(all_peers) if item["myself"]), None)
assert node_id_offset is not None, '"peers.myself" not found in initial_peers!'
node_id = node_id_offset + aiotools.process_index.get() + 1

raft_addr = initial_peers.get(node_id).get_addr()

store = RaftHashStore()

raft_logger = RaftLogger(
logging.getLogger(f"{__spec__.name}.raft.node-{node_id}"), # type: ignore
)

root_ctx.kvstore_ctx.raft = Raft.bootstrap(
node_id,
raft_addr,
store, # type: ignore
raft_cfg,
raft_logger, # type: ignore
)
raft_cluster = root_ctx.kvstore_ctx.raft
raft_cluster.run() # type: ignore
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, raft setup should be separated to another setup context function.
example:

@actxmgr
async def raft_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
    ...

@actxmgr
async def etcd_shared_config_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
    ...

@actxmgr
async def raft_shared_config_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
    ...

@racheliee racheliee marked this pull request as ready for review February 21, 2025 08:56
"""
Abstract interface for Key-Value Store (KVS) operations
Defines the basic operations that a KVS should support
"""

etcd: T
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, it seems better to change etcd to private field of EtcdClient and provide interface methods that are needed for use.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to use Generic then.

Comment on lines +761 to +765
self.scope_prefix_map = t.Dict({
t.Key(ConfigScopes.GLOBAL): t.String(allow_blank=True),
t.Key(ConfigScopes.SGROUP, optional=True): t.String,
t.Key(ConfigScopes.NODE, optional=True): t.String,
}).check(scope_prefix_map)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that there's no problem even with only private fields, not public fields.

@@ -15,6 +15,7 @@ python_sources(
visibility_private_component(
allowed_dependents=[
"//src/ai/backend/testutils/**",
"//src/ai/backend/common/**",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

???

@@ -29,6 +33,38 @@ class BaseContext:
pass


class KVStoreKind(CIStrEnum):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just receive lowercase values for configuration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could but i felt that using the enum would make it safer

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant was that StrEnum seems to be sufficient, rather than CIStrEnum.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh i see gotcha


class ConnectOptions:
def __init__(self) -> None: ...
def with_user(self, user: str, password: str) -> "ConnectOptions":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

User Self in typing package

self,
raft_node: RaftNode,
endpoints: list[str],
connect_options: Optional["ConnectOptions"] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use "ConnectOptions"?

Comment on lines +45 to +47
self.raft_node = raft_node
self.endpoints = endpoints
self.connect_options = connect_options
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer it if internal fields were not accessed from outside as much as possible, but why are this fields used as public?

Comment on lines +1192 to +1198
@click.option(
"--raft-cluster-config-path",
"--raft-cluster-config",
type=Path,
default=None,
help="The raft cluster config file path. (default: ./raft-cluster-config.toml and /etc/backend.ai/raft-cluster-config.toml)",
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we only inject it from a configuration file without inputting it in CLI?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is injected from the configuration file via the default path if no configuration file is specified. this is just an option to specify the raft configuration file path if you'd like to use a file from a different place

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn’t it be better to receive the raft config path from the manager’s config file instead of a CLI option?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Create Raftify Client
3 participants