Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions pysyncobj/journal.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ def setRaftCommitIndex(self, raftCommitIndex):
def getRaftCommitIndex(self):
raise NotImplementedError

def setCurrentTerm(self, term, votedForNodeId):
raise NotImplementedError

def getCurrentTerm(self):
raise NotImplementedError

def getVotedForNodeId(self):
raise NotImplementedError

def onOneSecondTimer(self):
pass

Expand All @@ -45,6 +54,8 @@ def __init__(self):
self.__journal = []
self.__bytesSize = 0
self.__lastCommitIndex = 0
self.__currentTerm = 0
self.__votedForNodeId = None

def add(self, command, idx, term):
self.__journal.append((command, idx, term))
Expand Down Expand Up @@ -73,6 +84,16 @@ def setRaftCommitIndex(self, raftCommitIndex):
def getRaftCommitIndex(self):
return 1

def setCurrentTerm(self, term, votedForNodeId):
self.__currentTerm = term
self.__votedForNodeId = votedForNodeId

def getCurrentTerm(self):
return self.__currentTerm

def getVotedForNodeId(self):
return self.__votedForNodeId



class ResizableFile(object):
Expand Down Expand Up @@ -246,6 +267,20 @@ def setRaftCommitIndex(self, raftCommitIndex):
def getRaftCommitIndex(self):
return self.__meta.get('raftCommitIndex', 1)

def setCurrentTerm(self, term, votedForNodeId):
self.__meta['currentTerm'] = term
self.__meta['votedForNodeId'] = votedForNodeId
# Raft requires currentTerm and votedFor to be durable before any message
# reflecting them (vote request/grant) is sent, so store synchronously.
self.__metaStorer.storeMeta(self.__meta)
self.__metaSaved = True

def getCurrentTerm(self):
return self.__meta.get('currentTerm', 0)

def getVotedForNodeId(self):
return self.__meta.get('votedForNodeId', None)

def onOneSecondTimer(self):
if not self.__metaSaved:
self.__metaStorer.storeMeta(self.__meta)
Expand Down
33 changes: 26 additions & 7 deletions pysyncobj/syncobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ def __init__(self, selfNode, otherNodes, conf=None, consumers=None, nodeClass =
self.__raftLog = createJournal(self.__conf.journalFile)
if len(self.__raftLog) == 0:
self.__raftLog.add(_bchr(_COMMAND_TYPE.NO_OP), 1, self.__raftCurrentTerm)
# Restore raft persistent state (see raft paper §5 - currentTerm and votedFor
# must survive restarts, otherwise already-committed entries may be overwritten
# by a stale member after a simultaneous restart of the cluster). The max() with
# the last log term covers journals written by versions that did not store the
# term: currentTerm is always >= the term of the last log entry.
self.__raftCurrentTerm = max(self.__raftLog.getCurrentTerm(), self.__getCurrentLogTerm())
self.__votedForNodeId = self.__raftLog.getVotedForNodeId()
self.__raftCommitIndex = self.__raftLog.getRaftCommitIndex()
self.__raftLastApplied = 1
self.__raftNextIndex = {}
Expand Down Expand Up @@ -581,8 +588,7 @@ def _onTick(self, timeToWait=0.0):
self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout()
self.__raftLeader = None
self.__setState(_RAFT_STATE.CANDIDATE)
self.__raftCurrentTerm += 1
self.__votedForNodeId = self.__selfNode.id
self.__setCurrentTerm(self.__raftCurrentTerm + 1, self.__selfNode.id)
self.__votesCount = 1
for node in self.__otherNodes:
self.__transport.send(node, {
Expand Down Expand Up @@ -856,8 +862,7 @@ def __onMessageReceived(self, node, message):
if message['type'] == 'request_vote' and self.__selfNode is not None:

if message['term'] > self.__raftCurrentTerm:
self.__raftCurrentTerm = message['term']
self.__votedForNodeId = None
self.__setCurrentTerm(message['term'], None)
self.__setState(_RAFT_STATE.FOLLOWER)
self.__raftLeader = None

Expand All @@ -873,7 +878,7 @@ def __onMessageReceived(self, node, message):
if self.__votedForNodeId is not None:
return

self.__votedForNodeId = node.id
self.__setCurrentTerm(self.__raftCurrentTerm, node.id)

self.__raftElectionDeadline = monotonicTime() + self.__generateRaftTimeout()
self.__transport.send(node, {
Expand All @@ -887,8 +892,7 @@ def __onMessageReceived(self, node, message):
self.__onLeaderChanged()
self.__raftLeader = node
if message['term'] > self.__raftCurrentTerm:
self.__raftCurrentTerm = message['term']
self.__votedForNodeId = None
self.__setCurrentTerm(message['term'], None)
self.__setState(_RAFT_STATE.FOLLOWER)
newEntries = message.get('entries', [])
serialized = message.get('serialized', None)
Expand Down Expand Up @@ -930,6 +934,11 @@ def __onMessageReceived(self, node, message):
if clusterChangeRequest is not None:
self.__doChangeCluster(clusterChangeRequest, reverse=True)

if prevLogIdx + 1 <= self.__raftLastApplied:
logger.critical(
'truncating already-applied log entries from index %d '
'(last applied: %d) - state machine may have diverged from '
'other cluster members', prevLogIdx + 1, self.__raftLastApplied)
self.__deleteEntriesFrom(prevLogIdx + 1)
for entry in newEntries:
self.__raftLog.add(*entry)
Expand All @@ -953,6 +962,11 @@ def __onMessageReceived(self, node, message):
self.__loadDumpFile(clearJournal=True)
self.__sendNextNodeIdx(node, success=True)

# If the log was truncated below the commit index, don't persist a commit
# index covering entries that are no longer the committed ones.
if self.__raftCommitIndex > self.__getCurrentLogIndex():
self.__raftCommitIndex = self.__getCurrentLogIndex()

if leaderCommitIndex > self.__raftCommitIndex:
self.__raftCommitIndex = min(leaderCommitIndex, self.__getCurrentLogIndex())

Expand Down Expand Up @@ -1155,6 +1169,11 @@ def __onBecomeLeader(self):

self.__sendAppendEntries()

def __setCurrentTerm(self, term, votedForNodeId):
self.__raftCurrentTerm = term
self.__votedForNodeId = votedForNodeId
self.__raftLog.setCurrentTerm(term, votedForNodeId)

def __setState(self, newState):
oldState = self.__raftState
self.__raftState = newState
Expand Down
105 changes: 105 additions & 0 deletions test_syncobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,84 @@ def test_randomTest1():
removeFiles([e + '.meta' for e in journalFiles])


def test_simultaneousRestartDataDivergence():
# If raft persistent state (currentTerm, votedFor) is lost on restart, then after a
# simultaneous restart of all members the terms restart from 1 while journals still
# contain entries with higher terms. A member that starts slightly later can then
# out-vote the new leader with its stale (but high-term) log and truncate entries
# that were already committed and applied by the other members, permanently
# diverging their state machines while the raft log stays in sync.
random.seed(42)

a = [getNextAddr(), getNextAddr(), getNextAddr()]
journalFiles = [getNextJournalFile(), getNextJournalFile(), getNextJournalFile()]
removeFiles(journalFiles)
removeFiles([e + '.meta' for e in journalFiles])

def createObj(i):
return TestObj(a[i], [a[j] for j in range(3) if j != i],
TEST_TYPE.JOURNAL_1, journalFile=journalFiles[i])

# Phase 1: build some history with lastLogTerm >= 2, so that the terms of a fresh
# post-restart election are lower than the terms already stored in the journals.
objs = [createObj(0), createObj(1), createObj(2)]
doTicks(objs, 10.0, stopFunc=lambda: all(o._isReady() for o in objs))
assert all(o._isReady() for o in objs)

# force a re-election to bump the term above 1
oldTerm = max(o._getTerm() for o in objs)
follower = [o for o in objs if not o._isLeader()][0]
follower._SyncObj__raftElectionDeadline = 0
doTicks(objs, 10.0, stopFunc=lambda: max(o._getTerm() for o in objs) > oldTerm and
any(o._isLeader() for o in objs))
assert max(o._getTerm() for o in objs) >= 2

objs[0].addKeyValue('k1', 'pre')
doTicks(objs, 10.0, stopFunc=lambda: all(o.getValue('k1') == 'pre' for o in objs))
assert all(o.getValue('k1') == 'pre' for o in objs)

# let the journal meta be stored (it is saved on a one-second timer), then stop all
doTicks(objs, 2.5)
for o in objs:
o._destroy()
time.sleep(0.1)

# Phase 2: members 0 and 2 restart quickly, member 1 is slow to start.
o0 = createObj(0)
o2 = createObj(2)
doTicks([o0, o2], 10.0, stopFunc=lambda: o0._isLeader() or o2._isLeader())
assert o0._isLeader() or o2._isLeader()

# a write is committed and applied by the two started members
(o0 if o0._isLeader() else o2).addKeyValue('w1', 'post')
doTicks([o0, o2], 10.0, stopFunc=lambda: o0.getValue('w1') == 'post' and
o2.getValue('w1') == 'post')
assert o0.getValue('w1') == 'post'
assert o2.getValue('w1') == 'post'

# Phase 3: member 1 finally starts and tries to become leader. Member 0 (the
# current leader) is frozen (not ticked) so member 1 only needs member 2's vote,
# which makes the race deterministic. With a stale log whose last term is higher
# than the new entries' term it must NOT win; member 2 (with the up-to-date log)
# must win instead and replicate the data to member 1.
o1 = createObj(1)
doTicks([o1, o2], 15.0, stopFunc=lambda: o1._isLeader() or o1.getValue('w1') == 'post')

# Everybody ticks again; the cluster must converge with no data loss.
objs = [o0, o1, o2]
doTicks(objs, 20.0, stopFunc=lambda: all(o.getValue('w1') == 'post' for o in objs))

for o in objs:
assert o.getValue('k1') == 'pre'
assert o.getValue('w1') == 'post'

for o in objs:
o._destroy()
time.sleep(0.1)
removeFiles(journalFiles)
removeFiles([e + '.meta' for e in journalFiles])


# Ensure that raftLog after serialization is the same as in serialized data
def test_logCompactionRegressionTest1():
random.seed(42)
Expand Down Expand Up @@ -1171,6 +1249,33 @@ def test_journalTest2():
removeFiles([e + '.meta' for e in journalFiles])


def test_journalKeepsCurrentTermAndVote():
journalFiles = [getNextJournalFile()]
removeFiles(journalFiles)
removeFiles([e + '.meta' for e in journalFiles])

journal = createJournal(journalFiles[0])
# defaults for a fresh (or legacy, pre-term-persistence) journal
assert journal.getCurrentTerm() == 0
assert journal.getVotedForNodeId() is None
journal.setCurrentTerm(5, 'localhost:1234')
journal._destroy()

journal = createJournal(journalFiles[0])
assert journal.getCurrentTerm() == 5
assert journal.getVotedForNodeId() == 'localhost:1234'
journal.setCurrentTerm(6, None)
journal._destroy()

journal = createJournal(journalFiles[0])
assert journal.getCurrentTerm() == 6
assert journal.getVotedForNodeId() is None
journal._destroy()

removeFiles(journalFiles)
removeFiles([e + '.meta' for e in journalFiles])


def test_applyJournalAfterRestart():
dumpFiles = [getNextDumpFile(), getNextDumpFile()]
journalFiles = [getNextJournalFile(), getNextJournalFile()]
Expand Down