@@ -166,10 +166,18 @@ async def start(self):
166166 prefix = self .settings .logging_name ,
167167 )
168168
169- # Notify listener the metagraph is ready
170- await self ._notify_if_needed (state == "ready" )
171-
172- continue
169+ # Notify listener the metagraph is ready with retry logic
170+ notification_success = await self ._notify_with_retry (state == "ready" )
171+
172+ # If notification failed after retries, force a resync
173+ if not notification_success and state != "ready" :
174+ btul .logging .warning (
175+ "🔄 Forcing resync due to persistent data consistency failure" ,
176+ prefix = self .settings .logging_name ,
177+ )
178+ # Don't continue - let it fall through to resync logic
179+ else :
180+ continue
173181
174182 if has_axons_changed :
175183 reason = "hotkey/IP changes detected"
@@ -195,8 +203,8 @@ async def start(self):
195203 # Store the sync block
196204 last_synced_block = block
197205
198- # Notify listener the metagraph is ready
199- await self ._notify_if_needed (state == "ready" )
206+ # Notify listener the metagraph is ready with retry logic
207+ await self ._notify_with_retry (state == "ready" )
200208
201209 # Store the new axons
202210 axons = new_axons
@@ -463,6 +471,7 @@ async def _resync(self, last_update: bool) -> dict[str, str]:
463471 f"📅 Last updated block recorded: #{ block } " ,
464472 prefix = self .settings .logging_name ,
465473 )
474+
466475 else :
467476 btul .logging .info (
468477 "✅ Metagraph is in sync with Redis — no changes detected." ,
@@ -473,7 +482,17 @@ async def _resync(self, last_update: bool) -> dict[str, str]:
473482
474483 async def _notify_if_needed (self , ready ):
475484 if ready :
476- return ready
485+ return True # Already ready, no need to notify
486+
487+ # Verify data consistency before marking as ready
488+ if not self .settings .dry_run :
489+ consistency_verified = await self ._verify_data_consistency ()
490+ if not consistency_verified :
491+ btul .logging .warning (
492+ "❌ Data consistency check failed - will not mark as ready" ,
493+ prefix = self .settings .logging_name ,
494+ )
495+ return False
477496
478497 btul .logging .debug (
479498 "🔔 Metagraph marked ready" , prefix = self .settings .logging_name
@@ -486,6 +505,84 @@ async def _notify_if_needed(self, ready):
486505
487506 return True
488507
async def _notify_with_retry(self, ready) -> bool:
    """
    Run the ready notification, retrying with adaptive delays on failure.

    Each attempt awaits ``self._notify_if_needed(ready)`` and is timed. After a
    failed attempt (other than the last one) the coroutine sleeps for a
    multiple of that attempt's duration before trying again.

    Args:
        ready: Truthy when the caller is already in the ready state; in that
            case no attempt is made.

    Returns:
        True if ``ready`` was truthy or any attempt succeeded, False once all
        attempts have failed.
    """
    # If already ready, skip all checks
    if ready:
        return True

    max_retries = 3
    base_delay = 0.01  # Lower bound for the wait between attempts (10ms)

    # Inside a coroutine the running loop is the one whose clock we want;
    # get_running_loop() is the documented accessor for that.
    loop = asyncio.get_running_loop()

    for attempt in range(max_retries):
        # Measure how long the notification attempt takes
        start_time = loop.time()
        success = await self._notify_if_needed(ready)
        check_duration = loop.time() - start_time

        if success:
            if attempt > 0:  # Only log if we actually retried
                btul.logging.info(
                    f"✅ Data consistency verified on retry {attempt + 1} (took {check_duration * 1000:.1f} ms)",
                    prefix=self.settings.logging_name,
                )
            return True

        if attempt == max_retries - 1:
            # Last attempt failed: report it and give up without sleeping
            btul.logging.error(
                f"❌ Failed to verify data consistency after {max_retries} attempts "
                f"(last check took {check_duration * 1000:.1f} ms)",
                prefix=self.settings.logging_name,
            )
            break

        # Adaptive delay: the multiplier grows by 1.5 per failed attempt
        # (2x after the first failure, 3.5x after the second), and the wait
        # never drops below base_delay.
        backoff_factor = 2 + attempt * 1.5
        adaptive_delay = max(base_delay, check_duration * backoff_factor)

        btul.logging.debug(
            f"🔄 Retry {attempt + 1} /{max_retries} : check took {check_duration * 1000:.1f} ms, "
            f"waiting {adaptive_delay * 1000:.1f} ms...",
            prefix=self.settings.logging_name,
        )
        await asyncio.sleep(adaptive_delay)

    return False
553+
async def _verify_data_consistency(self) -> bool:
    """
    Check that Redis holds as many neurons as the in-memory metagraph.

    The stored neurons are fetched through ``self.database.get_neurons()`` and
    their count is compared with ``len(self.metagraph.neurons)``.

    Returns:
        True when both counts are equal. False when they differ, or when any
        exception is raised while performing the check (the error is logged
        and treated as an inconsistency).
    """
    try:
        # Count held in memory by the metagraph
        in_memory_total = len(self.metagraph.neurons)

        # Count currently stored in Redis
        stored_total = len(await self.database.get_neurons())

        counts_match = stored_total == in_memory_total
        if not counts_match:
            # Stay quiet on success; only a mismatch is worth a log line
            btul.logging.warning(
                f"⚠️ Data inconsistency: expected {in_memory_total} neurons, found {stored_total} in Redis",
                prefix=self.settings.logging_name,
            )
        return counts_match

    except Exception as exc:
        btul.logging.error(
            f"❌ Error during consistency check: {exc} ",
            prefix=self.settings.logging_name,
        )
        # A failed check is reported as inconsistent rather than raised
        return False
585+
489586 async def _has_new_neuron_registered (self , registration_count ) -> tuple [bool , int ]:
490587 new_count = await scbs .get_number_of_registration (
491588 subtensor = self .subtensor , netuid = self .settings .netuid
0 commit comments