From 11e5e8895bce8aa12b66d1b96cbba60bc7296a31 Mon Sep 17 00:00:00 2001 From: Alex Lutay <1928266+taurus-forever@users.noreply.github.com> Date: Tue, 30 Jun 2026 23:40:12 +0200 Subject: [PATCH 1/2] Add ability to transfer Raft leader to another node Stopping a Raft leader node causing Raft leader reelection. The election process itself takes a bit of time and with a tight schedule (slow hardware and/or bad luck) may trigger a Primary step-down (due to lack of DCS updates by Patroni). Also Patroni-based operators have to handle temporary Raft leader outage gracefully which make scale-down logic complex. The main idea of this PR is to add ability for operators to transfer the current node Raft leadership to another member of the Raft cluster before shutting down the current server. Example: ``` > syncobj-admin -conn 10.69.235.38:2222 -pass mypass -status commit_idx: 518 enabled_code_version: 0 has_quorum: True last_applied: 518 leader: 10.69.235.38:2222 <<<<<<<<<<<<<<<< 10.69.235.38 is a Leader leader_commit_idx: 518 log_len: 50 match_idx_count: 2 match_idx_server_10.69.235.244:2222: 518 match_idx_server_10.69.235.36:2222: 518 next_node_idx_count: 2 next_node_idx_server_10.69.235.244:2222: 519 next_node_idx_server_10.69.235.36:2222: 519 partner_node_status_server_10.69.235.244:2222: 2 partner_node_status_server_10.69.235.36:2222: 2 partner_nodes_count: 2 raft_term: 9 readonly_nodes_count: 0 revision: deprecated self: 10.69.235.38:2222 self_code_version: 0 state: 2 uptime: 421 version: 0.3.15 > syncobj-admin -conn 10.69.235.38:2222 -pass mypass -transfer 10.69.235.36:2222 SUCCESS TRANSFER 10.69.235.36:2222 > syncobj-admin -conn 10.69.235.38:2222 -pass mypass -status commit_idx: 527 enabled_code_version: 0 has_quorum: True last_applied: 527 leader: 10.69.235.36:2222 <<<<<<<<<< 10.69.235.36 is a new Leader leader_commit_idx: 527 log_len: 59 match_idx_count: 2 match_idx_server_10.69.235.244:2222: 526 match_idx_server_10.69.235.36:2222: 526 next_node_idx_count: 2 next_node_idx_server_10.69.235.244:2222: 527 next_node_idx_server_10.69.235.36:2222: 527 partner_node_status_server_10.69.235.244:2222: 2 partner_node_status_server_10.69.235.36:2222: 2 partner_nodes_count: 2 raft_term: 10 readonly_nodes_count: 0 revision: deprecated self: 10.69.235.38:2222 self_code_version: 0 state: 0 uptime: 438 version: 0.3.15 > ssh 10.69.235.38 -- shutdown -P now # will cause no Raft troubles ``` Assisted-by: Claude:claude-4.8-opus --- pysyncobj/syncobj.py | 51 ++++++++++++++++++++++++++++++++++++++ pysyncobj/syncobj_admin.py | 4 +++ 2 files changed, 55 insertions(+) diff --git a/pysyncobj/syncobj.py b/pysyncobj/syncobj.py index dddefb0..2cf131e 100644 --- a/pysyncobj/syncobj.py +++ b/pysyncobj/syncobj.py @@ -207,6 +207,7 @@ def __init__(self, selfNode, otherNodes, conf=None, consumers=None, nodeClass = self.__transport.setOnUtilityMessageCallback('add', self._addNodeToCluster) self.__transport.setOnUtilityMessageCallback('remove', self._removeNodeFromCluster) self.__transport.setOnUtilityMessageCallback('set_version', self._setCodeVersion) + self.__transport.setOnUtilityMessageCallback('transfer', self._transferLeadership) self._methodToID = {} self._idToMethod = {} @@ -401,6 +402,50 @@ def _removeNodeFromCluster(self, args, callback): else: self.removeNodeFromCluster(node, callback) + def transferLeadership(self, node=None, callback=None): + """Gracefully transfer raft leadership to another node (Raft 'TimeoutNow'). + + Must be called on the current leader. The target node must be fully + caught up (matchIndex == leader's last log index), otherwise the + request is denied. If node is None, the most caught-up connected + follower is selected automatically. Async and best-effort: a + successful callback only means the request was sent; poll + getStatus()['leader'] to confirm the transfer completed. + + :param node: target node object or 'nodeHost:nodePort' (None = auto) + :type node: Node | str | None + :param callback: will be called on success or fail + :type callback: function(`FAIL_REASON <#pysyncobj.FAIL_REASON>`_, None) + """ + if self.__raftState != _RAFT_STATE.LEADER: + self.__callErrCallback(FAIL_REASON.NOT_LEADER, callback) + return + if node is not None and not isinstance(node, Node): + node = self.__nodeClass(node) + if node is None: + candidates = [(self.__raftMatchIndex.get(n, 0), n.id, n) + for n in self.__otherNodes if n in self.__connectedNodes] + if not candidates: + self.__callErrCallback(FAIL_REASON.REQUEST_DENIED, callback) + return + node = max(candidates)[2] + if node == self.__selfNode or node not in self.__otherNodes: + self.__callErrCallback(FAIL_REASON.REQUEST_DENIED, callback) + return + if self.__raftMatchIndex.get(node, 0) != self.__getCurrentLogIndex(): + # target is not fully synced - transferring would fail the election + self.__callErrCallback(FAIL_REASON.REQUEST_DENIED, callback) + return + self.__transport.send(node, { + 'type': 'timeout_now', + 'term': self.__raftCurrentTerm, + }) + if callback is not None: + callback(None, FAIL_REASON.SUCCESS) + + def _transferLeadership(self, args, callback): + self.transferLeadership(args[0] if args and args[0] else None, callback) + def __onSetCodeVersion(self, newVersion): methods = [m for m in dir(self) if callable(getattr(self, m)) and\ getattr(getattr(self, m), 'replicated', False) and \ @@ -853,6 +898,12 @@ def __doApplyCommand(self, command): def __onMessageReceived(self, node, message): + if message['type'] == 'timeout_now' and self.__selfNode is not None: + # Graceful leadership transfer (Raft 'TimeoutNow'): trigger an + # immediate election, but only on request of the current leader. + if message['term'] >= self.__raftCurrentTerm and node == self.__raftLeader: + self.__raftElectionDeadline = 0 + if message['type'] == 'request_vote' and self.__selfNode is not None: if message['term'] > self.__raftCurrentTerm: diff --git a/pysyncobj/syncobj_admin.py b/pysyncobj/syncobj_admin.py index e695bb1..c547df1 100644 --- a/pysyncobj/syncobj_admin.py +++ b/pysyncobj/syncobj_admin.py @@ -23,6 +23,8 @@ def executeAdminCommand(args): parser.add_argument('-add', action='store', dest='add', help='send command \'add\'') parser.add_argument('-remove', action='store', dest='remove', help='send command \'remove\'') parser.add_argument('-set_version', action='store', dest='version', type=int, help='set cluster code version') + parser.add_argument('-transfer', action='store', dest='transfer', nargs='?', const='', default=None, + help='transfer leadership to the given node (or auto-select if omitted); must connect to the current leader') data = parser.parse_args(args) if not checkCorrectAddress(data.connection): @@ -40,6 +42,8 @@ def executeAdminCommand(args): message = ['remove', data.remove] elif data.version is not None: message = ['set_version', data.version] + elif data.transfer is not None: + message = ['transfer', data.transfer] else: return 'invalid command' From 4ac3ed314440108611a4952b2c5f8fa0da1b483e Mon Sep 17 00:00:00 2001 From: Alex Lutay <1928266+taurus-forever@users.noreply.github.com> Date: Tue, 30 Jun 2026 22:15:38 +0000 Subject: [PATCH 2/2] Guard leadership transfer against outgoing leader's heartbeats transferLeadership() triggers an election on the target follower by zeroing its __raftElectionDeadline when a 'timeout_now' message arrives. The election only fires on the follower's next tick, so any same-term append_entries heartbeat from the still-active outgoing leader that was processed in between reset the deadline and silently cancelled the transfer - the admin still got SUCCESS while leadership never moved. Add a one-shot __transferInProgress flag: set it when 'timeout_now' is accepted, skip the append_entries deadline reset while it is set so the outgoing leader's heartbeats can no longer postpone the forced election, and clear it the moment the election fires. A higher-term append_entries (a different leader already won) clears the flag and resumes normal resets, so the node still yields correctly to a genuinely new leader. Add tests covering a successful transfer under active heartbeats and the NOT_LEADER denial when transferLeadership() is called on a follower. Assisted-by: Claude:claude-4.8-opus --- pysyncobj/syncobj.py | 13 ++++++- test_syncobj.py | 80 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/pysyncobj/syncobj.py b/pysyncobj/syncobj.py index 2cf131e..a68fb38 100644 --- a/pysyncobj/syncobj.py +++ b/pysyncobj/syncobj.py @@ -160,6 +160,9 @@ def __init__(self, selfNode, otherNodes, conf=None, consumers=None, nodeClass = self.__votesCount = 0 self.__raftLeader = None self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout() + # Set while a leadership transfer (Raft 'TimeoutNow') is pending, to stop + # the outgoing leader's heartbeats from postponing the forced election. + self.__transferInProgress = False self.__raftLog = createJournal(self.__conf.journalFile) if len(self.__raftLog) == 0: self.__raftLog.add(_bchr(_COMMAND_TYPE.NO_OP), 1, self.__raftCurrentTerm) @@ -624,6 +627,7 @@ def _onTick(self, timeToWait=0.0): if self.__raftState in (_RAFT_STATE.FOLLOWER, _RAFT_STATE.CANDIDATE) and self.__selfNode is not None: if self.__raftElectionDeadline < monotonicTime() and self.__connectedToAnyone(): self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout() + self.__transferInProgress = False self.__raftLeader = None self.__setState(_RAFT_STATE.CANDIDATE) self.__raftCurrentTerm += 1 @@ -903,6 +907,7 @@ def __onMessageReceived(self, node, message): # immediate election, but only on request of the current leader. if message['term'] >= self.__raftCurrentTerm and node == self.__raftLeader: self.__raftElectionDeadline = 0 + self.__transferInProgress = True if message['type'] == 'request_vote' and self.__selfNode is not None: @@ -933,7 +938,13 @@ def __onMessageReceived(self, node, message): }) if message['type'] == 'append_entries' and message['term'] >= self.__raftCurrentTerm: - self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout() + if message['term'] > self.__raftCurrentTerm: + # A new leader at a higher term supersedes any pending transfer. + self.__transferInProgress = False + if not self.__transferInProgress: + # While a transfer is pending, ignore same-term heartbeats from + # the outgoing leader so they can't postpone the forced election. + self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout() if self.__raftLeader != node: self.__onLeaderChanged() self.__raftLeader = node diff --git a/test_syncobj.py b/test_syncobj.py index 5d1dbf2..8d4fa97 100755 --- a/test_syncobj.py +++ b/test_syncobj.py @@ -2311,3 +2311,83 @@ def test_filterParners(): o1 = TestObj(a[0], [a[1], a[0]]) assert len(o1._SyncObj__otherNodes) == 1 + + +def test_transferLeadership(): + random.seed(7) + + a = [getNextAddr(), getNextAddr(), getNextAddr()] + + o1 = TestObj(a[0], [a[1], a[2]], testBindAddr=True) + o2 = TestObj(a[1], [a[2], a[0]], testBindAddr=True) + o3 = TestObj(a[2], [a[0], a[1]], testBindAddr=True) + objs = [o1, o2, o3] + + doTicks(objs, 15.0, stopFunc=lambda: o1._isReady() and o2._isReady() and o3._isReady()) + + assert o1._isReady() and o2._isReady() and o3._isReady() + assert o1._getLeader() == o2._getLeader() == o3._getLeader() + + leaderAddr = o1._getLeader() + leader = next(o for o in objs if o._SyncObj__selfNode == leaderAddr) + target = next(o for o in objs if o._SyncObj__selfNode != leaderAddr) + targetAddr = target._SyncObj__selfNode + + result = {} + leader.transferLeadership(targetAddr, callback=lambda res, err: result.update(err=err)) + + # All nodes keep ticking, so the outgoing leader keeps sending heartbeats + # while the transfer plays out - the heartbeats must not postpone the + # forced election (otherwise the transfer would silently never complete). + doTicks(objs, 15.0, stopFunc=lambda: o1._getLeader() == targetAddr and + o2._getLeader() == targetAddr and + o3._getLeader() == targetAddr) + + assert result.get('err') == FAIL_REASON.SUCCESS + assert o1._getLeader() == targetAddr + assert o2._getLeader() == targetAddr + assert o3._getLeader() == targetAddr + + # The cluster is still functional after the transfer. + target.addValue(42) + doTicks(objs, 15.0, stopFunc=lambda: o1.getCounter() == 42 and + o2.getCounter() == 42 and o3.getCounter() == 42) + assert o1.getCounter() == 42 + assert o2.getCounter() == 42 + assert o3.getCounter() == 42 + + o1._destroy() + o2._destroy() + o3._destroy() + + +def test_transferLeadershipFromNonLeaderIsDenied(): + random.seed(7) + + a = [getNextAddr(), getNextAddr(), getNextAddr()] + + o1 = TestObj(a[0], [a[1], a[2]], testBindAddr=True) + o2 = TestObj(a[1], [a[2], a[0]], testBindAddr=True) + o3 = TestObj(a[2], [a[0], a[1]], testBindAddr=True) + objs = [o1, o2, o3] + + doTicks(objs, 15.0, stopFunc=lambda: o1._isReady() and o2._isReady() and o3._isReady()) + + assert o1._isReady() and o2._isReady() and o3._isReady() + + leaderAddr = o1._getLeader() + follower = next(o for o in objs if o._SyncObj__selfNode != leaderAddr) + other = next(o for o in objs + if o._SyncObj__selfNode != leaderAddr and o is not follower) + + result = {} + follower.transferLeadership(other._SyncObj__selfNode, + callback=lambda res, err: result.update(err=err)) + + assert result.get('err') == FAIL_REASON.NOT_LEADER + # Leadership is unchanged. + assert o1._getLeader() == leaderAddr + + o1._destroy() + o2._destroy() + o3._destroy()