diff --git a/pysyncobj/syncobj.py b/pysyncobj/syncobj.py index dddefb0..a68fb38 100644 --- a/pysyncobj/syncobj.py +++ b/pysyncobj/syncobj.py @@ -160,6 +160,9 @@ def __init__(self, selfNode, otherNodes, conf=None, consumers=None, nodeClass = self.__votesCount = 0 self.__raftLeader = None self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout() + # Set while a leadership transfer (Raft 'TimeoutNow') is pending, to stop + # the outgoing leader's heartbeats from postponing the forced election. + self.__transferInProgress = False self.__raftLog = createJournal(self.__conf.journalFile) if len(self.__raftLog) == 0: self.__raftLog.add(_bchr(_COMMAND_TYPE.NO_OP), 1, self.__raftCurrentTerm) @@ -207,6 +210,7 @@ def __init__(self, selfNode, otherNodes, conf=None, consumers=None, nodeClass = self.__transport.setOnUtilityMessageCallback('add', self._addNodeToCluster) self.__transport.setOnUtilityMessageCallback('remove', self._removeNodeFromCluster) self.__transport.setOnUtilityMessageCallback('set_version', self._setCodeVersion) + self.__transport.setOnUtilityMessageCallback('transfer', self._transferLeadership) self._methodToID = {} self._idToMethod = {} @@ -401,6 +405,50 @@ def _removeNodeFromCluster(self, args, callback): else: self.removeNodeFromCluster(node, callback) + def transferLeadership(self, node=None, callback=None): + """Gracefully transfer raft leadership to another node (Raft 'TimeoutNow'). + + Must be called on the current leader. The target node must be fully + caught up (matchIndex == leader's last log index), otherwise the + request is denied. If node is None, the most caught-up connected + follower is selected automatically. Async and best-effort: a + successful callback only means the request was sent; poll + getStatus()['leader'] to confirm the transfer completed. + + :param node: target node object or 'nodeHost:nodePort' (None = auto) + :type node: Node | str | None + :param callback: will be called on success or fail + :type callback: function(`FAIL_REASON <#pysyncobj.FAIL_REASON>`_, None) + """ + if self.__raftState != _RAFT_STATE.LEADER: + self.__callErrCallback(FAIL_REASON.NOT_LEADER, callback) + return + if node is not None and not isinstance(node, Node): + node = self.__nodeClass(node) + if node is None: + candidates = [(self.__raftMatchIndex.get(n, 0), n.id, n) + for n in self.__otherNodes if n in self.__connectedNodes] + if not candidates: + self.__callErrCallback(FAIL_REASON.REQUEST_DENIED, callback) + return + node = max(candidates)[2] + if node == self.__selfNode or node not in self.__otherNodes: + self.__callErrCallback(FAIL_REASON.REQUEST_DENIED, callback) + return + if self.__raftMatchIndex.get(node, 0) != self.__getCurrentLogIndex(): + # target is not fully synced - transferring would fail the election + self.__callErrCallback(FAIL_REASON.REQUEST_DENIED, callback) + return + self.__transport.send(node, { + 'type': 'timeout_now', + 'term': self.__raftCurrentTerm, + }) + if callback is not None: + callback(None, FAIL_REASON.SUCCESS) + + def _transferLeadership(self, args, callback): + self.transferLeadership(args[0] if args and args[0] else None, callback) + def __onSetCodeVersion(self, newVersion): methods = [m for m in dir(self) if callable(getattr(self, m)) and\ getattr(getattr(self, m), 'replicated', False) and \ @@ -579,6 +627,7 @@ def _onTick(self, timeToWait=0.0): if self.__raftState in (_RAFT_STATE.FOLLOWER, _RAFT_STATE.CANDIDATE) and self.__selfNode is not None: if self.__raftElectionDeadline < monotonicTime() and self.__connectedToAnyone(): self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout() + self.__transferInProgress = False self.__raftLeader = None self.__setState(_RAFT_STATE.CANDIDATE) self.__raftCurrentTerm += 1 @@ -853,6 +902,13 @@ def __doApplyCommand(self, command): def __onMessageReceived(self, node, message): + if message['type'] == 'timeout_now' and self.__selfNode is not None: + # Graceful leadership transfer (Raft 'TimeoutNow'): trigger an + # immediate election, but only on request of the current leader. + if message['term'] >= self.__raftCurrentTerm and node == self.__raftLeader: + self.__raftElectionDeadline = 0 + self.__transferInProgress = True + if message['type'] == 'request_vote' and self.__selfNode is not None: if message['term'] > self.__raftCurrentTerm: @@ -882,7 +938,13 @@ def __onMessageReceived(self, node, message): }) if message['type'] == 'append_entries' and message['term'] >= self.__raftCurrentTerm: - self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout() + if message['term'] > self.__raftCurrentTerm: + # A new leader at a higher term supersedes any pending transfer. + self.__transferInProgress = False + if not self.__transferInProgress: + # While a transfer is pending, ignore same-term heartbeats from + # the outgoing leader so they can't postpone the forced election. + self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout() if self.__raftLeader != node: self.__onLeaderChanged() self.__raftLeader = node diff --git a/pysyncobj/syncobj_admin.py b/pysyncobj/syncobj_admin.py index e695bb1..c547df1 100644 --- a/pysyncobj/syncobj_admin.py +++ b/pysyncobj/syncobj_admin.py @@ -23,6 +23,8 @@ def executeAdminCommand(args): parser.add_argument('-add', action='store', dest='add', help='send command \'add\'') parser.add_argument('-remove', action='store', dest='remove', help='send command \'remove\'') parser.add_argument('-set_version', action='store', dest='version', type=int, help='set cluster code version') + parser.add_argument('-transfer', action='store', dest='transfer', nargs='?', const='', default=None, + help='transfer leadership to the given node (or auto-select if omitted); must connect to the current leader') data = parser.parse_args(args) if not checkCorrectAddress(data.connection): @@ -40,6 +42,8 @@ def executeAdminCommand(args): message = ['remove', data.remove] elif data.version is not None: message = ['set_version', data.version] + elif data.transfer is not None: + message = ['transfer', data.transfer] else: return 'invalid command' diff --git a/test_syncobj.py b/test_syncobj.py index 5d1dbf2..8d4fa97 100755 --- a/test_syncobj.py +++ b/test_syncobj.py @@ -2311,3 +2311,83 @@ def test_filterParners(): o1 = TestObj(a[0], [a[1], a[0]]) assert len(o1._SyncObj__otherNodes) == 1 + + +def test_transferLeadership(): + random.seed(7) + + a = [getNextAddr(), getNextAddr(), getNextAddr()] + + o1 = TestObj(a[0], [a[1], a[2]], testBindAddr=True) + o2 = TestObj(a[1], [a[2], a[0]], testBindAddr=True) + o3 = TestObj(a[2], [a[0], a[1]], testBindAddr=True) + objs = [o1, o2, o3] + + doTicks(objs, 15.0, stopFunc=lambda: o1._isReady() and o2._isReady() and o3._isReady()) + + assert o1._isReady() and o2._isReady() and o3._isReady() + assert o1._getLeader() == o2._getLeader() == o3._getLeader() + + leaderAddr = o1._getLeader() + leader = next(o for o in objs if o._SyncObj__selfNode == leaderAddr) + target = next(o for o in objs if o._SyncObj__selfNode != leaderAddr) + targetAddr = target._SyncObj__selfNode + + result = {} + leader.transferLeadership(targetAddr, callback=lambda res, err: result.update(err=err)) + + # All nodes keep ticking, so the outgoing leader keeps sending heartbeats + # while the transfer plays out - the heartbeats must not postpone the + # forced election (otherwise the transfer would silently never complete). + doTicks(objs, 15.0, stopFunc=lambda: o1._getLeader() == targetAddr and + o2._getLeader() == targetAddr and + o3._getLeader() == targetAddr) + + assert result.get('err') == FAIL_REASON.SUCCESS + assert o1._getLeader() == targetAddr + assert o2._getLeader() == targetAddr + assert o3._getLeader() == targetAddr + + # The cluster is still functional after the transfer. + target.addValue(42) + doTicks(objs, 15.0, stopFunc=lambda: o1.getCounter() == 42 and + o2.getCounter() == 42 and o3.getCounter() == 42) + assert o1.getCounter() == 42 + assert o2.getCounter() == 42 + assert o3.getCounter() == 42 + + o1._destroy() + o2._destroy() + o3._destroy() + + +def test_transferLeadershipFromNonLeaderIsDenied(): + random.seed(7) + + a = [getNextAddr(), getNextAddr(), getNextAddr()] + + o1 = TestObj(a[0], [a[1], a[2]], testBindAddr=True) + o2 = TestObj(a[1], [a[2], a[0]], testBindAddr=True) + o3 = TestObj(a[2], [a[0], a[1]], testBindAddr=True) + objs = [o1, o2, o3] + + doTicks(objs, 15.0, stopFunc=lambda: o1._isReady() and o2._isReady() and o3._isReady()) + + assert o1._isReady() and o2._isReady() and o3._isReady() + + leaderAddr = o1._getLeader() + follower = next(o for o in objs if o._SyncObj__selfNode != leaderAddr) + other = next(o for o in objs + if o._SyncObj__selfNode != leaderAddr and o is not follower) + + result = {} + follower.transferLeadership(other._SyncObj__selfNode, + callback=lambda res, err: result.update(err=err)) + + assert result.get('err') == FAIL_REASON.NOT_LEADER + # Leadership is unchanged. + assert o1._getLeader() == leaderAddr + + o1._destroy() + o2._destroy() + o3._destroy()