Skip to content

support moving ctdb enabled pods to new addresses #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
43 changes: 34 additions & 9 deletions sambacc/commands/ctdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@


def _ctdb_ok():
sambacc_ctdb = os.environ.get('SAMBACC_CTDB')
gate = 'ctdb-is-experimental'
sambacc_ctdb = os.environ.get("SAMBACC_CTDB")
gate = "ctdb-is-experimental"
if sambacc_ctdb == gate:
return
print('Using CTDB with samba-container (sambacc) is experimental.')
print('If you are developing or testing features for sambacc please')
print('set the environment variable SAMBACC_CTDB to the value:')
print(' ', gate)
print('before continuing and try again.')
print("Using CTDB with samba-container (sambacc) is experimental.")
print("If you are developing or testing features for sambacc please")
print("set the environment variable SAMBACC_CTDB to the value:")
print(" ", gate)
print("before continuing and try again.")
print()
raise Fail(gate)

Expand Down Expand Up @@ -97,8 +97,8 @@ class NodeParams:
_ctx: Context
node_number: typing.Optional[int] = None
hostname: typing.Optional[str] = None
persistent_path: str = ''
nodes_json: str = ''
persistent_path: str = ""
nodes_json: str = ""
_ip_addr: typing.Optional[str] = None

def __init__(self, ctx: Context):
Expand Down Expand Up @@ -141,6 +141,18 @@ def node_ip_addr(self) -> str:
raise ValueError("can not determine node ip")
return self._ip_addr

@property
def identity(self):
# this could be extended to use something like /etc/machine-id
# or whatever in the future.
if self.hostname:
return self.hostname
elif self.node_number:
return f"node-{self.node_number}"
else:
# the dashes make this an invalid dns name
return "-unknown-"


@commands.command(name="ctdb-migrate", arg_func=_ctdb_migrate_args)
def ctdb_migrate(ctx: Context) -> None:
Expand All @@ -163,7 +175,20 @@ def ctdb_set_node(ctx: Context) -> None:
_ctdb_ok()
np = NodeParams(ctx)
expected_pnn = np.node_number

try:
ctdb.refresh_node_in_statefile(
np.identity,
np.node_ip_addr,
int(expected_pnn or 0),
path=np.nodes_json,
)
return
except ctdb.NodeNotPresent:
pass

ctdb.add_node_to_statefile(
np.identity,
np.node_ip_addr,
int(expected_pnn or 0),
path=np.nodes_json,
Expand Down
164 changes: 136 additions & 28 deletions sambacc/ctdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>
#

import enum
import logging
import os
import subprocess
Expand All @@ -37,6 +38,31 @@
CTDB_NODES: str = "/etc/ctdb/nodes"


class NodeState(str, enum.Enum):
NEW = "new"
READY = "ready"
CHANGED = "changed"
REPLACED = "replaced"
GONE = "gone" # reserved


def next_state(state: NodeState) -> NodeState:
if state == NodeState.NEW:
return NodeState.READY
elif state == NodeState.CHANGED:
return NodeState.REPLACED
elif state == NodeState.REPLACED:
return NodeState.READY
return state


class NodeNotPresent(KeyError):
def __init__(self, identity, pnn=None):
super().__init__(identity)
self.identity = identity
self.pnn = pnn


def ensure_smb_conf(
iconfig: config.InstanceConfig, path: str = config.SMB_CONF
) -> None:
Expand Down Expand Up @@ -145,44 +171,94 @@ def ensure_ctdb_node_present(


def add_node_to_statefile(
node: str, pnn: int, path: str, in_nodes: bool = False
identity: str, node: str, pnn: int, path: str, in_nodes: bool = False
) -> None:
"""Add the given node's identity, (node) IP, and PNN to the JSON based
state file, located at `path`. If in_nodes is true, the state file will
reflect that the node is already added to the CTDB nodes file.
"""
with jfile.open(path, jfile.OPEN_RW) as fh:
jfile.flock(fh)
data = jfile.load(fh, {})
_update_statefile(data, identity, node, pnn, in_nodes=in_nodes)
jfile.dump(data, fh)


def refresh_node_in_statefile(
identity: str, node: str, pnn: int, path: str
) -> None:
"""Add the given node (IP) at the line for the given PNN to
the ctdb nodes file located at path.
"""Assuming the node is already in the statefile, update the state in
the case that the node (IP) has changed.
"""
with jfile.open(path, jfile.OPEN_RW) as fh:
jfile.flock(fh)
data = jfile.load(fh, {})
_update_statefile(data, node, pnn, in_nodes=in_nodes)
_refresh_statefile(data, identity, node, pnn)
jfile.dump(data, fh)


def _update_statefile(
data, node: str, pnn: int, in_nodes: bool = False
data, identity: str, node: str, pnn: int, in_nodes: bool = False
) -> None:
data.setdefault("nodes", [])
for entry in data["nodes"]:
if pnn == entry["pnn"]:
raise ValueError("duplicate pnn")
if identity == entry["identity"]:
raise ValueError("duplicate identity")
state = NodeState.NEW
if in_nodes:
state = NodeState.READY
data["nodes"].append(
{
"identity": identity,
"node": node,
"pnn": pnn,
"in_nodes": in_nodes,
"state": state,
}
)


def _refresh_statefile(
data, identity: str, node: str, pnn: int, in_nodes: bool = False
) -> None:
data.setdefault("nodes", [])
node_entry = None
for entry in data["nodes"]:
if pnn == entry["pnn"] and identity == entry["identity"]:
node_entry = entry
break
if pnn == entry["pnn"]:
raise ValueError(
f"matching pnn ({pnn}) identity={entry['identity']}"
)
if not node_entry:
raise NodeNotPresent(identity, pnn)
if node_entry["node"] == node:
# do nothing
return
node_entry["node"] = node
node_entry["state"] = NodeState.CHANGED


def _get_state(entry) -> NodeState:
return NodeState(entry["state"])


def _get_state_ok(entry) -> bool:
return _get_state(entry) == NodeState.READY


def pnn_in_nodes(pnn: int, nodes_json: str, real_path: str) -> bool:
"""Returns true if the specified pnn has an entry in the nodes json
file.
file and that the node is already added to the ctdb nodes file.
"""
with jfile.open(nodes_json, jfile.OPEN_RO) as fh:
jfile.flock(fh)
json_data = jfile.load(fh, {})
current_nodes = json_data.get("nodes", [])
for entry in current_nodes:
if pnn == entry["pnn"] and entry["in_nodes"]:
if pnn == entry["pnn"] and _get_state_ok(entry):
return True
return False

Expand Down Expand Up @@ -225,29 +301,41 @@ def _node_check(pnn: int, nodes_json: str, real_path: str) -> bool:
def _node_update_check(json_data, nodes_json: str, real_path: str):
desired = json_data.get("nodes", [])
ctdb_nodes = read_ctdb_nodes(real_path)
new_nodes = []
update_nodes = []
need_reload = []
_update_states = (NodeState.NEW, NodeState.CHANGED, NodeState.REPLACED)
for entry in desired:
pnn = entry["pnn"]
try:
matched = entry["node"] == ctdb_nodes[pnn]
except IndexError:
matched = False
if matched and entry["in_nodes"]:
matched = _node_line(ctdb_nodes, pnn) == entry["node"]
if matched and _get_state_ok(entry):
# everything's fine. skip this entry
continue
elif not matched:
# node not present in real nodes file
if len(ctdb_nodes) > entry["pnn"]:
msg = f'unexpected pnn {entry["pnn"]} for nodes {ctdb_nodes}'
if entry["state"] in _update_states:
update_nodes.append(entry)
need_reload.append(entry)
elif entry["state"] == NodeState.READY:
msg = f"ready node (pnn {pnn}) missing from {ctdb_nodes}"
raise ValueError(msg)
new_nodes.append(entry["node"])
need_reload.append(entry)
else:
# node present but in_nodes marker indicates
# node present but state indicates
# update is not finalized
need_reload.append(entry)
return ctdb_nodes, new_nodes, need_reload
return ctdb_nodes, update_nodes, need_reload


def _node_line(ctdb_nodes, pnn) -> str:
try:
return ctdb_nodes[pnn]
except IndexError:
return ""


def _entry_to_node(ctdb_nodes, entry) -> str:
pnn = entry["pnn"]
if entry["state"] == NodeState.CHANGED:
return "#{}".format(ctdb_nodes[pnn].strip("#"))
return entry["node"]


def _node_update(nodes_json: str, real_path: str) -> bool:
Expand All @@ -257,10 +345,10 @@ def _node_update(nodes_json: str, real_path: str) -> bool:
with jfile.open(nodes_json, jfile.OPEN_RO) as fh:
jfile.flock(fh)
json_data = jfile.load(fh, {})
_, test_new_nodes, test_need_reload = _node_update_check(
_, test_chg_nodes, test_need_reload = _node_update_check(
json_data, nodes_json, real_path
)
if not test_new_nodes and not test_need_reload:
if not test_chg_nodes and not test_need_reload:
_logger.info("examined nodes state - no changes")
return False
# we probably need to make a change. but we recheck our state again
Expand All @@ -269,22 +357,42 @@ def _node_update(nodes_json: str, real_path: str) -> bool:
with jfile.open(nodes_json, jfile.OPEN_RW) as fh:
jfile.flock(fh)
json_data = jfile.load(fh, {})
ctdb_nodes, new_nodes, need_reload = _node_update_check(
ctdb_nodes, chg_nodes, need_reload = _node_update_check(
json_data, nodes_json, real_path
)
if not new_nodes and not need_reload:
if not chg_nodes and not need_reload:
_logger.info("reexamined nodes state - no changes")
return False
_logger.info("writing updates to ctdb nodes file")
all_nodes = ctdb_nodes + new_nodes
new_ctdb_nodes = list(ctdb_nodes)
for entry in chg_nodes:
pnn = entry["pnn"]
expected_line = _entry_to_node(ctdb_nodes, entry)
if _node_line(new_ctdb_nodes, pnn) == expected_line:
continue
if entry["state"] == NodeState.NEW:
if pnn != len(new_ctdb_nodes):
raise ValueError(
f"unexpected pnn in new entry {entry}:"
" nodes: {new_ctdb_nodes}"
)
new_ctdb_nodes.append(expected_line)
else:
new_ctdb_nodes[pnn] = expected_line
with open(real_path, "w") as nffh:
write_nodes_file(nffh, all_nodes)
write_nodes_file(nffh, new_ctdb_nodes)
nffh.flush()
os.fsync(nffh)
_logger.info("running: ctdb reloadnodes")
subprocess.check_call(list(samba_cmds.ctdb["reloadnodes"]))
for entry in need_reload:
entry["in_nodes"] = True
entry["state"] = next_state(entry["state"])
_logger.debug(
"setting node identity=[{}] pnn={} to {}",
entry["identity"],
entry["pnn"],
entry["state"],
)
jfile.dump(json_data, fh)
fh.flush()
os.fsync(fh)
Expand Down
Loading