Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion pysyncobj/syncobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def __init__(self, selfNode, otherNodes, conf=None, consumers=None, nodeClass =
self.__votesCount = 0
self.__raftLeader = None
self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout()
# Set while a leadership transfer (Raft 'TimeoutNow') is pending, to stop
# the outgoing leader's heartbeats from postponing the forced election.
self.__transferInProgress = False
self.__raftLog = createJournal(self.__conf.journalFile)
if len(self.__raftLog) == 0:
self.__raftLog.add(_bchr(_COMMAND_TYPE.NO_OP), 1, self.__raftCurrentTerm)
Expand Down Expand Up @@ -207,6 +210,7 @@ def __init__(self, selfNode, otherNodes, conf=None, consumers=None, nodeClass =
self.__transport.setOnUtilityMessageCallback('add', self._addNodeToCluster)
self.__transport.setOnUtilityMessageCallback('remove', self._removeNodeFromCluster)
self.__transport.setOnUtilityMessageCallback('set_version', self._setCodeVersion)
self.__transport.setOnUtilityMessageCallback('transfer', self._transferLeadership)

self._methodToID = {}
self._idToMethod = {}
Expand Down Expand Up @@ -401,6 +405,50 @@ def _removeNodeFromCluster(self, args, callback):
else:
self.removeNodeFromCluster(node, callback)

def transferLeadership(self, node=None, callback=None):
"""Gracefully transfer raft leadership to another node (Raft 'TimeoutNow').

Must be called on the current leader. The target node must be fully
caught up (matchIndex == leader's last log index), otherwise the
request is denied. If node is None, the most caught-up connected
follower is selected automatically. Async and best-effort: a
successful callback only means the request was sent; poll
getStatus()['leader'] to confirm the transfer completed.

:param node: target node object or 'nodeHost:nodePort' (None = auto)
:type node: Node | str | None
:param callback: will be called on success or fail
:type callback: function(`FAIL_REASON <#pysyncobj.FAIL_REASON>`_, None)
"""
if self.__raftState != _RAFT_STATE.LEADER:
self.__callErrCallback(FAIL_REASON.NOT_LEADER, callback)
return
if node is not None and not isinstance(node, Node):
node = self.__nodeClass(node)
if node is None:
candidates = [(self.__raftMatchIndex.get(n, 0), n.id, n)
for n in self.__otherNodes if n in self.__connectedNodes]
if not candidates:
self.__callErrCallback(FAIL_REASON.REQUEST_DENIED, callback)
return
node = max(candidates)[2]
if node == self.__selfNode or node not in self.__otherNodes:
self.__callErrCallback(FAIL_REASON.REQUEST_DENIED, callback)
return
if self.__raftMatchIndex.get(node, 0) != self.__getCurrentLogIndex():
# target is not fully synced - transferring would fail the election
self.__callErrCallback(FAIL_REASON.REQUEST_DENIED, callback)
return
self.__transport.send(node, {
'type': 'timeout_now',
'term': self.__raftCurrentTerm,
})
if callback is not None:
callback(None, FAIL_REASON.SUCCESS)

def _transferLeadership(self, args, callback):
self.transferLeadership(args[0] if args and args[0] else None, callback)

def __onSetCodeVersion(self, newVersion):
methods = [m for m in dir(self) if callable(getattr(self, m)) and\
getattr(getattr(self, m), 'replicated', False) and \
Expand Down Expand Up @@ -579,6 +627,7 @@ def _onTick(self, timeToWait=0.0):
if self.__raftState in (_RAFT_STATE.FOLLOWER, _RAFT_STATE.CANDIDATE) and self.__selfNode is not None:
if self.__raftElectionDeadline < monotonicTime() and self.__connectedToAnyone():
self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout()
self.__transferInProgress = False
self.__raftLeader = None
self.__setState(_RAFT_STATE.CANDIDATE)
self.__raftCurrentTerm += 1
Expand Down Expand Up @@ -853,6 +902,13 @@ def __doApplyCommand(self, command):

def __onMessageReceived(self, node, message):

if message['type'] == 'timeout_now' and self.__selfNode is not None:
# Graceful leadership transfer (Raft 'TimeoutNow'): trigger an
# immediate election, but only on request of the current leader.
if message['term'] >= self.__raftCurrentTerm and node == self.__raftLeader:
self.__raftElectionDeadline = 0
self.__transferInProgress = True

if message['type'] == 'request_vote' and self.__selfNode is not None:

if message['term'] > self.__raftCurrentTerm:
Expand Down Expand Up @@ -882,7 +938,13 @@ def __onMessageReceived(self, node, message):
})

if message['type'] == 'append_entries' and message['term'] >= self.__raftCurrentTerm:
self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout()
if message['term'] > self.__raftCurrentTerm:
# A new leader at a higher term supersedes any pending transfer.
self.__transferInProgress = False
if not self.__transferInProgress:
# While a transfer is pending, ignore same-term heartbeats from
# the outgoing leader so they can't postpone the forced election.
self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout()
if self.__raftLeader != node:
self.__onLeaderChanged()
self.__raftLeader = node
Expand Down
4 changes: 4 additions & 0 deletions pysyncobj/syncobj_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def executeAdminCommand(args):
parser.add_argument('-add', action='store', dest='add', help='send command \'add\'')
parser.add_argument('-remove', action='store', dest='remove', help='send command \'remove\'')
parser.add_argument('-set_version', action='store', dest='version', type=int, help='set cluster code version')
parser.add_argument('-transfer', action='store', dest='transfer', nargs='?', const='', default=None,
help='transfer leadership to the given node (or auto-select if omitted); must connect to the current leader')

data = parser.parse_args(args)
if not checkCorrectAddress(data.connection):
Expand All @@ -40,6 +42,8 @@ def executeAdminCommand(args):
message = ['remove', data.remove]
elif data.version is not None:
message = ['set_version', data.version]
elif data.transfer is not None:
message = ['transfer', data.transfer]
else:
return 'invalid command'

Expand Down
80 changes: 80 additions & 0 deletions test_syncobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -2311,3 +2311,83 @@ def test_filterParners():

o1 = TestObj(a[0], [a[1], a[0]])
assert len(o1._SyncObj__otherNodes) == 1


def test_transferLeadership():
random.seed(7)

a = [getNextAddr(), getNextAddr(), getNextAddr()]

o1 = TestObj(a[0], [a[1], a[2]], testBindAddr=True)
o2 = TestObj(a[1], [a[2], a[0]], testBindAddr=True)
o3 = TestObj(a[2], [a[0], a[1]], testBindAddr=True)
objs = [o1, o2, o3]

doTicks(objs, 15.0, stopFunc=lambda: o1._isReady() and o2._isReady() and o3._isReady())

assert o1._isReady() and o2._isReady() and o3._isReady()
assert o1._getLeader() == o2._getLeader() == o3._getLeader()

leaderAddr = o1._getLeader()
leader = next(o for o in objs if o._SyncObj__selfNode == leaderAddr)
target = next(o for o in objs if o._SyncObj__selfNode != leaderAddr)
targetAddr = target._SyncObj__selfNode

result = {}
leader.transferLeadership(targetAddr, callback=lambda res, err: result.update(err=err))

# All nodes keep ticking, so the outgoing leader keeps sending heartbeats
# while the transfer plays out - the heartbeats must not postpone the
# forced election (otherwise the transfer would silently never complete).
doTicks(objs, 15.0, stopFunc=lambda: o1._getLeader() == targetAddr and
o2._getLeader() == targetAddr and
o3._getLeader() == targetAddr)

assert result.get('err') == FAIL_REASON.SUCCESS
assert o1._getLeader() == targetAddr
assert o2._getLeader() == targetAddr
assert o3._getLeader() == targetAddr

# The cluster is still functional after the transfer.
target.addValue(42)
doTicks(objs, 15.0, stopFunc=lambda: o1.getCounter() == 42 and
o2.getCounter() == 42 and o3.getCounter() == 42)
assert o1.getCounter() == 42
assert o2.getCounter() == 42
assert o3.getCounter() == 42

o1._destroy()
o2._destroy()
o3._destroy()


def test_transferLeadershipFromNonLeaderIsDenied():
random.seed(7)

a = [getNextAddr(), getNextAddr(), getNextAddr()]

o1 = TestObj(a[0], [a[1], a[2]], testBindAddr=True)
o2 = TestObj(a[1], [a[2], a[0]], testBindAddr=True)
o3 = TestObj(a[2], [a[0], a[1]], testBindAddr=True)
objs = [o1, o2, o3]

doTicks(objs, 15.0, stopFunc=lambda: o1._isReady() and o2._isReady() and o3._isReady())

assert o1._isReady() and o2._isReady() and o3._isReady()

leaderAddr = o1._getLeader()
follower = next(o for o in objs if o._SyncObj__selfNode != leaderAddr)
other = next(o for o in objs
if o._SyncObj__selfNode != leaderAddr and o is not follower)

result = {}
follower.transferLeadership(other._SyncObj__selfNode,
callback=lambda res, err: result.update(err=err))

assert result.get('err') == FAIL_REASON.NOT_LEADER
# Leadership is unchanged.
assert o1._getLeader() == leaderAddr

o1._destroy()
o2._destroy()
o3._destroy()