
ClusterKit

A lightweight, production-ready distributed cluster coordination library for Go

ClusterKit handles cluster coordination so you can focus on building your application.


🤔 Why ClusterKit Exists

The Problem

Building a distributed system is hard. You need to solve:

  • "Where does this data go?" - Partition assignment across nodes
  • "Who's in charge?" - Leader election and consensus
  • "Is everyone alive?" - Health checking and failure detection
  • "What happens when nodes join/leave?" - Rebalancing and data migration
  • "How do I know when to move data?" - Event notifications

Most developers end up either:

  1. ❌ Reinventing the wheel - Writing complex coordination logic from scratch
  2. ❌ Over-engineering - Using heavy frameworks that dictate your entire architecture
  3. ❌ Coupling tightly - Mixing coordination logic with business logic

The Solution

ClusterKit provides just the coordination layer - nothing more, nothing less.

┌──────────────────────────────────────────────────────────┐
│  Your Application (Storage, Replication, Business Logic) │
├──────────────────────────────────────────────────────────┤
│  ClusterKit (Coordination, Partitioning, Consensus)      │
└──────────────────────────────────────────────────────────┘

You get:

  • ✅ Production-ready coordination (Raft consensus, health checking)
  • ✅ Simple API (7 methods + hooks)
  • ✅ Complete flexibility (bring your own storage/replication)
  • ✅ Zero lock-in (just a library, not a framework)

🎯 What is ClusterKit?

ClusterKit is a coordination library that manages the distributed aspects of your cluster while giving you complete control over data storage and replication. It handles:

  • ✅ Partition Management - Consistent hashing to determine which partition owns a key
  • ✅ Node Discovery - Automatic cluster membership and health monitoring
  • ✅ Service Discovery - Register and discover application services across nodes
  • ✅ Leader Election - Raft-based consensus for cluster decisions
  • ✅ Rebalancing - Automatic partition redistribution when nodes join/leave
  • ✅ Event Hooks - Rich notifications for partition changes, node lifecycle events
  • ✅ Failure Detection - Automatic health checking and node removal
  • ✅ Rejoin Handling - Smart detection and data sync for returning nodes

You control:

  • 🔧 Data storage (PostgreSQL, Redis, files, memory, etc.)
  • 🔧 Replication protocol (HTTP, gRPC, TCP, etc.)
  • 🔧 Consistency model (strong, eventual, causal, etc.)
  • 🔧 Business logic

✨ Key Features

Core Capabilities

  • Simple API - 7 core methods + rich event hooks
  • Minimal Configuration - Only 2 required fields (NodeID, HTTPAddr)
  • Service Discovery - Register multiple services per node (HTTP, gRPC, WebSocket, etc.)
  • Production-Ready - WAL, snapshots, crash recovery, metrics
  • Health Checking - Automatic failure detection and node removal
  • Smart Rejoin - Detects returning nodes and triggers data sync

Event System

  • Rich Context - Events include timestamps, reasons, offline duration, partition ownership
  • 7 Lifecycle Hooks - OnPartitionChange, OnNodeJoin, OnNodeRejoin, OnNodeLeave, OnRebalanceStart, OnRebalanceComplete, OnClusterHealthChange
  • Async Execution - Hooks run in background goroutines (max 50 concurrent)
  • Panic Recovery - Hooks are isolated and won't crash your application

Distributed Coordination

  • Raft Consensus - Built on HashiCorp Raft for strong consistency
  • Consistent Hashing - MD5-based partition assignment
  • Configurable Replication - Set replication factor (default: 3)
  • HTTP API - RESTful endpoints for cluster information

πŸ—οΈ Architecture

ClusterKit uses a layered architecture combining Raft consensus with consistent hashing:

┌──────────────────────────────────────────────────────────────┐
│                    Your Application Layer                    │
│         (KV Store, Cache, Queue, Custom Logic)               │
└────────────────────┬─────────────────────────────────────────┘
                     │ API Calls
┌────────────────────▼─────────────────────────────────────────┐
│                   ClusterKit Public API                      │
│  GetPartition() • IsPrimary() • GetReplicas() • Hooks        │
└────────────────────┬─────────────────────────────────────────┘
                     │
┌────────────────────▼─────────────────────────────────────────┐
│                  Coordination Layer                          │
│  ┌──────────────┬──────────────┬────────────────────────┐    │
│  │ Partition    │ Health       │ Hook                   │    │
│  │ Manager      │ Checker      │ Manager                │    │
│  │              │              │                        │    │
│  │ • Consistent │ • Heartbeats │ • Event dispatch       │    │
│  │   Hashing    │ • Failure    │ • Async execution      │    │
│  │ • Rebalance  │   detection  │ • 7 lifecycle hooks    │    │
│  └──────┬───────┴──────┬───────┴───────────┬────────────┘    │
│         │              │                   │                 │
│  ┌──────▼──────────────▼───────────────────▼────────────┐    │
│  │           Raft Consensus Layer                       │    │
│  │  • Leader election                                   │    │
│  │  • Log replication                                   │    │
│  │  • State machine (cluster state)                     │    │
│  └──────────────────┬───────────────────────────────────┘    │
└─────────────────────┼────────────────────────────────────────┘
                      │
┌─────────────────────▼────────────────────────────────────────┐
│                  Persistence Layer                           │
│  • WAL (Write-Ahead Log)                                     │
│  • Snapshots (cluster state)                                 │
│  • JSON state files                                          │
└──────────────────────────────────────────────────────────────┘

How It Works

  1. Partition Assignment - MD5 hash of key → partition ID (0-63)
  2. Node Selection - Consistent hashing assigns partitions to nodes
  3. Consensus - Raft ensures all nodes agree on cluster state
  4. Health Monitoring - Periodic checks detect failures
  5. Rebalancing - Automatic when topology changes
  6. Event Notification - Hooks fire for lifecycle events
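Step 2 (node selection) can be illustrated with a classic consistent-hash ring. This sketches the general technique under stated assumptions and is not necessarily ClusterKit's exact algorithm: node IDs and partition IDs are hashed onto the same 32-bit ring (MD5, as in step 1), and each partition walks clockwise to collect `replicationFactor` distinct nodes. The first of these is treated as the primary. Virtual nodes, which production rings usually add for smoother balance, are left out for brevity.

```go
package main

import (
	"crypto/md5"
	"encoding/binary"
	"fmt"
	"sort"
)

// ringHash places a string on a 32-bit hash ring (assumed MD5-based).
func ringHash(s string) uint32 {
	sum := md5.Sum([]byte(s))
	return binary.BigEndian.Uint32(sum[:4])
}

// assign returns the owners of a partition: element 0 is the primary and the
// rest are replicas, visited clockwise from the partition's ring position.
func assign(partition string, nodes []string, replicationFactor int) []string {
	type point struct {
		hash uint32
		node string
	}
	ring := make([]point, 0, len(nodes))
	for _, n := range nodes {
		ring = append(ring, point{ringHash(n), n})
	}
	sort.Slice(ring, func(i, j int) bool { return ring[i].hash < ring[j].hash })

	if replicationFactor > len(ring) {
		replicationFactor = len(ring) // cannot place more copies than nodes
	}
	h := ringHash(partition)
	// First node at or after the partition's hash; wraps around to index 0.
	start := sort.Search(len(ring), func(i int) bool { return ring[i].hash >= h })
	owners := make([]string, 0, replicationFactor)
	for i := 0; i < replicationFactor; i++ {
		owners = append(owners, ring[(start+i)%len(ring)].node)
	}
	return owners
}

func main() {
	nodes := []string{"node-1", "node-2", "node-3"}
	for p := 0; p < 4; p++ {
		id := fmt.Sprintf("partition-%d", p)
		fmt.Println(id, assign(id, nodes, 3))
	}
}
```

A node's ring position depends only on its ID. As a result, adding or removing one node changes the owner lists of only the partitions that hash near it on the ring, which keeps rebalancing incremental.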

See docs/architecture.md for the detailed design.


🎬 Quick Demo

See ClusterKit in action with a 3-node cluster:

# Clone the repository
git clone https://github.com/skshohagmiah/clusterkit
cd clusterkit/example/sync

# Start 3-node cluster
./run.sh

# Output shows:
# ✅ Node formation
# ✅ Leader election
# ✅ Partition distribution
# ✅ Data replication
# ✅ Automatic rebalancing

Example Output:

🚀 Starting node-1 (bootstrap) on ports 8080/9080
   [RAFT] Becoming leader
   [CLUSTER] Leader elected: node-1

🔗 Starting node-2 (joining) on ports 8081/9081
   [JOIN] node-2 joining via node-1
   [RAFT] Adding voter: node-2
   [REBALANCE] Starting rebalance (trigger: node_join)
   [PARTITION] partition-0: node-1 → node-2
   [PARTITION] partition-15: node-1 → node-2
   [REBALANCE] Complete (moved 21 partitions in 2.3s)

🔗 Starting node-3 (joining) on ports 8082/9082
   [JOIN] node-3 joining via node-1
   [REBALANCE] Starting rebalance (trigger: node_join)
   [PARTITION] partition-5: node-1 → node-3
   [REBALANCE] Complete (moved 14 partitions in 1.8s)

✅ Cluster ready: 3 nodes, 64 partitions, RF=3

📦 Installation

go get github.com/skshohagmiah/clusterkit

🚀 Quick Start

Bootstrap Node (First Node)

package main

import (
    "log"
    "time"
    "github.com/skshohagmiah/clusterkit"
)

func main() {
    // Create first node - only 2 fields required!
    ck, err := clusterkit.New(clusterkit.Options{
        NodeID:   "node-1",
        HTTPAddr: ":8080",
        // Optional: Register application services
        Services: map[string]string{
            "kv":   ":9080",     // Your KV store API
            "api":  ":3000",     // Your REST API
            "grpc": ":50051",    // Your gRPC service
        },
        // Optional: Enable health checking
        HealthCheck: clusterkit.HealthCheckConfig{
            Enabled:          true,
            Interval:         5 * time.Second,
            Timeout:          2 * time.Second,
            FailureThreshold: 3,
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    
    if err := ck.Start(); err != nil {
        log.Fatal(err)
    }
    defer ck.Stop()
    
    log.Println("✅ Bootstrap node started on :8080")
    
    select {} // Keep running
}

Additional Nodes (Join Cluster)

ck, err := clusterkit.New(clusterkit.Options{
    NodeID:   "node-2",
    HTTPAddr: ":8081",
    JoinAddr: "localhost:8080", // Bootstrap node address
    HealthCheck: clusterkit.HealthCheckConfig{
        Enabled:          true,
        Interval:         5 * time.Second,
        Timeout:          2 * time.Second,
        FailureThreshold: 3,
    },
})

📚 API Reference

Core Methods

// 1. Get the partition for a key
partition, err := ck.GetPartition(key) // returns (*Partition, error)

// 2. Get the primary node for a partition
primary := ck.GetPrimary(partition) // returns *Node

// 3. Get the replica nodes for a partition
replicas := ck.GetReplicas(partition) // returns []Node

// 4. Get all nodes for a partition (primary + replicas)
nodes := ck.GetNodes(partition) // returns []Node

// 5. Check whether this node is the primary
isPrimary := ck.IsPrimary(partition) // returns bool

// 6. Check whether this node is a replica
isReplica := ck.IsReplica(partition) // returns bool

// 7. Get this node's ID
myID := ck.GetMyNodeID() // returns string
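A typical request path combines three of these calls. It looks up the key's partition, serves the request locally if this node is the primary, and otherwise forwards it to `GetPrimary`. To stay runnable without the library, the sketch declares a hypothetical `coordinator` interface that holds just those three methods (with signatures taken from the list above), plus a toy `fakeCluster`. The `Partition` and `Node` structs are minimal stand-ins, not ClusterKit's real types. Whether `*clusterkit.ClusterKit` satisfies this interface exactly is an assumption.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Hypothetical stand-ins for clusterkit.Partition and clusterkit.Node.
type Partition struct{ ID string }
type Node struct{ ID, IP string }

// coordinator is the subset of the core API this sketch relies on.
type coordinator interface {
	GetPartition(key string) (*Partition, error)
	GetPrimary(p *Partition) *Node
	IsPrimary(p *Partition) bool
}

// route decides whether to serve a key locally or which node to forward it to.
func route(c coordinator, key string) (local bool, target *Node, err error) {
	p, err := c.GetPartition(key)
	if err != nil {
		return false, nil, err
	}
	if c.IsPrimary(p) {
		return true, nil, nil
	}
	primary := c.GetPrimary(p)
	if primary == nil {
		return false, nil, fmt.Errorf("no primary for %s", p.ID)
	}
	return false, primary, nil
}

// fakeCluster is a toy coordinator: partition-0 on node-1, partition-1 on node-2.
type fakeCluster struct{ self string }

func (f fakeCluster) GetPartition(key string) (*Partition, error) {
	h := fnv.New32a()
	h.Write([]byte(key))
	return &Partition{ID: fmt.Sprintf("partition-%d", h.Sum32()%2)}, nil
}

func (f fakeCluster) GetPrimary(p *Partition) *Node {
	if p.ID == "partition-0" {
		return &Node{ID: "node-1", IP: "localhost:9080"}
	}
	return &Node{ID: "node-2", IP: "localhost:9081"}
}

func (f fakeCluster) IsPrimary(p *Partition) bool { return f.GetPrimary(p).ID == f.self }

func main() {
	c := fakeCluster{self: "node-1"}
	for _, key := range []string{"user:1", "user:2", "user:3"} {
		local, target, err := route(c, key)
		switch {
		case err != nil:
			fmt.Println(key, "error:", err)
		case local:
			fmt.Println(key, "-> serve locally")
		default:
			fmt.Println(key, "-> forward to", target.ID, target.IP)
		}
	}
}
```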

Cluster Operations

// Get cluster information
cluster := ck.GetCluster() // returns *Cluster

// Trigger manual rebalancing
err := ck.RebalancePartitions() // returns error

// Get metrics
metrics := ck.GetMetrics() // returns *Metrics

// Health check
health := ck.HealthCheck() // returns *HealthStatus

πŸ” Service Discovery

ClusterKit includes built-in service discovery to help smart clients find your application services across the cluster.

Server Registration

// Register multiple services per node
ck, err := clusterkit.New(clusterkit.Options{
    NodeID:   "node-1",
    HTTPAddr: ":8080",  // ClusterKit coordination API
    Services: map[string]string{
        "kv":        ":9080",     // Key-Value store
        "api":       ":3000",     // REST API
        "grpc":      ":50051",    // gRPC service
        "websocket": ":8081",     // WebSocket server
        "metrics":   ":9090",     // Prometheus metrics
    },
})

Smart Client Discovery

// Get cluster topology with services
resp, err := http.Get("http://localhost:8080/cluster")
if err != nil {
    log.Fatal(err)
}
defer resp.Body.Close()

// ClusterResponse is your own struct mirroring the JSON format below
var cluster ClusterResponse
if err := json.NewDecoder(resp.Body).Decode(&cluster); err != nil {
    log.Fatal(err)
}

// Route requests to appropriate services
for _, node := range cluster.Cluster.Nodes {
    kvAddr := node.Services["kv"]      // ":9080"
    apiAddr := node.Services["api"]    // ":3000"
    grpcAddr := node.Services["grpc"]  // ":50051"
    
    // The addresses here are bare ports, so this example assumes every node runs on localhost;
    // routeKVRequest, routeAPIRequest and routeGRPCRequest are application-defined
    routeKVRequest(node.ID, "localhost"+kvAddr)
    routeAPIRequest(node.ID, "localhost"+apiAddr)
    routeGRPCRequest(node.ID, "localhost"+grpcAddr)
}

API Response Format

The /cluster endpoint returns service information for each node:

{
  "cluster": {
    "nodes": [
      {
        "id": "node-1",
        "ip": ":8080",
        "name": "Server-1",
        "status": "active",
        "services": {
          "kv": ":9080",
          "api": ":3000",
          "grpc": ":50051"
        }
      }
    ]
  }
}

Benefits

  • 🎯 No hardcoded ports - Services are explicitly registered and discoverable
  • 🔧 Multi-service nodes - Support HTTP, gRPC, WebSocket, etc. on the same node
  • 📡 Dynamic discovery - Clients automatically find services as nodes join/leave
  • ⚖️ Load balancing - Route different request types to different services
  • 🚀 Zero configuration - The Services field is optional and backward compatible

🎣 Event Hooks System

ClusterKit provides a comprehensive event system with rich context for all cluster lifecycle events.

1. OnPartitionChange - Partition Assignment Changes

Triggered when: Partitions are reassigned due to rebalancing

ck.OnPartitionChange(func(event *clusterkit.PartitionChangeEvent) {
    // Only act if I'm the destination node
    if event.CopyToNode.ID != myNodeID {
        return
    }
    
    log.Printf("📦 Partition %s moving (reason: %s)",
        event.PartitionID, event.ChangeReason)
    log.Printf("   From: %d nodes", len(event.CopyFromNodes))
    log.Printf("   Primary changed: %s → %s", event.OldPrimary, event.NewPrimary)
    
    // Fetch and merge data from all source nodes
    for _, source := range event.CopyFromNodes {
        data := fetchPartitionData(source, event.PartitionID)
        mergeData(data)
    }
})

Event Structure:

type PartitionChangeEvent struct {
    PartitionID   string    // e.g., "partition-5"
    CopyFromNodes []*Node   // Nodes that have the data
    CopyToNode    *Node     // Node that needs the data
    ChangeReason  string    // "node_join", "node_leave", "rebalance"
    OldPrimary    string    // Previous primary node ID
    NewPrimary    string    // New primary node ID
    Timestamp     time.Time // When the change occurred
}

Use Cases:

  • Migrate data when partitions move
  • Update local indexes
  • Trigger background sync jobs

2. OnNodeJoin - New Node Joins Cluster

Triggered when: A brand new node joins the cluster

ck.OnNodeJoin(func(event *clusterkit.NodeJoinEvent) {
    log.Printf("🎉 Node %s joined (cluster size: %d)",
        event.Node.ID, event.ClusterSize)
    
    if event.IsBootstrap {
        log.Println("   This is the bootstrap node - initializing cluster")
        initializeSchema()
    }
    
    // Update monitoring dashboards
    updateNodeCount(event.ClusterSize)
})

Event Structure:

type NodeJoinEvent struct {
    Node        *Node     // The joining node
    ClusterSize int       // Total nodes after join
    IsBootstrap bool      // Is this the first node?
    Timestamp   time.Time // When the node joined
}

Use Cases:

  • Initialize cluster-wide resources on bootstrap
  • Update monitoring/alerting systems
  • Trigger capacity planning checks

3. OnNodeRejoin - Node Returns After Being Offline

Triggered when: A node that was previously in the cluster rejoins

ck.OnNodeRejoin(func(event *clusterkit.NodeRejoinEvent) {
    if event.Node.ID == myNodeID {
        log.Printf("🔄 I'm rejoining after %v offline", event.OfflineDuration)
        log.Printf("   Last seen: %v", event.LastSeenAt)
        log.Printf("   Had %d partitions before leaving", 
            len(event.PartitionsBeforeLeave))
        
        // Clear stale data
        clearAllLocalData()
        
        // Wait for OnPartitionChange to sync fresh data
        log.Println("   Ready for partition reassignment")
    } else {
        log.Printf("📡 Node %s rejoined after %v",
            event.Node.ID, event.OfflineDuration)
    }
})

Event Structure:

type NodeRejoinEvent struct {
    Node                  *Node         // The rejoining node
    OfflineDuration       time.Duration // How long it was offline
    LastSeenAt            time.Time     // When it was last seen
    PartitionsBeforeLeave []string      // Partitions it had before
    Timestamp             time.Time     // When it rejoined
}

Use Cases:

  • Clear stale local data before rebalancing
  • Decide sync strategy based on offline duration
  • Log rejoin events for debugging
  • Alert if offline duration was too long

Important: This hook fires BEFORE rebalancing. Use it to prepare (clear data), then let OnPartitionChange handle the actual data sync with correct partition assignments.


4. OnNodeLeave - Node Leaves or Fails

Triggered when: A node is removed from the cluster (failure or graceful shutdown)

ck.OnNodeLeave(func(event *clusterkit.NodeLeaveEvent) {
    log.Printf("❌ Node %s left (reason: %s)", 
        event.Node.ID, event.Reason)
    log.Printf("   Owned %d partitions (primary)", len(event.PartitionsOwned))
    log.Printf("   Replicated %d partitions", len(event.PartitionsReplica))
    
    // Clean up connections
    closeConnectionTo(event.Node.IP)
    
    // Alert if critical
    if event.Reason == "health_check_failure" && len(event.PartitionsOwned) > 10 {
        alertOps("High partition loss - node failed!")
    }
})

Event Structure:

type NodeLeaveEvent struct {
    Node              *Node     // Full node info
    Reason            string    // "health_check_failure", "graceful_shutdown", "removed_by_admin"
    PartitionsOwned   []string  // Partitions this node was primary for
    PartitionsReplica []string  // Partitions this node was replica for
    Timestamp         time.Time // When it left
}

Use Cases:

  • Clean up network connections
  • Alert operations team
  • Update capacity planning
  • Log failure events

5. OnRebalanceStart - Rebalancing Begins

Triggered when: Partition rebalancing operation starts

ck.OnRebalanceStart(func(event *clusterkit.RebalanceEvent) {
    log.Printf("⚖️  Rebalance starting (trigger: %s)", event.Trigger)
    log.Printf("   Triggered by: %s", event.TriggerNodeID)
    log.Printf("   Partitions to move: %d", event.PartitionsToMove)
    log.Printf("   Nodes affected: %v", event.NodesAffected)
    
    // Pause background jobs during rebalance
    pauseBackgroundJobs()
    
    // Increase operation timeouts
    increaseTimeouts()
})

Event Structure:

type RebalanceEvent struct {
    Trigger          string    // "node_join", "node_leave", "manual"
    TriggerNodeID    string    // Which node caused it
    PartitionsToMove int       // How many partitions will move
    NodesAffected    []string  // Which nodes are affected
    Timestamp        time.Time // When rebalance started
}

6. OnRebalanceComplete - Rebalancing Finishes

Triggered when: Partition rebalancing operation completes

ck.OnRebalanceComplete(func(event *clusterkit.RebalanceEvent, duration time.Duration) {
    log.Printf("✅ Rebalance completed in %v", duration)
    log.Printf("   Moved %d partitions", event.PartitionsToMove)
    
    // Resume background jobs
    resumeBackgroundJobs()
    
    // Reset timeouts
    resetTimeouts()
    
    // Update metrics
    recordRebalanceDuration(duration)
})

7. OnClusterHealthChange - Cluster Health Status Changes

Triggered when: Overall cluster health status changes

ck.OnClusterHealthChange(func(event *clusterkit.ClusterHealthEvent) {
    log.Printf("🏥 Cluster health: %s", event.Status)
    log.Printf("   Healthy: %d/%d nodes", event.HealthyNodes, event.TotalNodes)
    
    if event.Status == "critical" {
        log.Printf("   Unhealthy nodes: %v", event.UnhealthyNodeIDs)
        alertOps("Cluster in critical state!")
        enableReadOnlyMode()
    } else if event.Status == "healthy" {
        log.Println("   All systems operational")
        disableReadOnlyMode()
    }
})

Event Structure:

type ClusterHealthEvent struct {
    HealthyNodes     int       // Number of healthy nodes
    UnhealthyNodes   int       // Number of unhealthy nodes
    TotalNodes       int       // Total nodes in cluster
    Status           string    // "healthy", "degraded", "critical"
    UnhealthyNodeIDs []string  // IDs of unhealthy nodes
    Timestamp        time.Time // When health changed
}
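The rule that maps node counts to `Status` is not spelled out here. One plausible rule, stated purely as an assumption, is as follows. The cluster is `healthy` when every node passes its checks. It is `critical` once healthy nodes no longer form a strict majority, which is the point at which a Raft cluster can no longer commit changes. It is `degraded` in between.

```go
package main

import "fmt"

// healthStatus applies an assumed rule (not taken from ClusterKit) that maps
// node counts to the "healthy" / "degraded" / "critical" values.
func healthStatus(healthy, total int) string {
	switch {
	case total == 0 || healthy*2 <= total:
		return "critical" // healthy nodes are not a strict majority
	case healthy < total:
		return "degraded" // majority healthy, but some nodes are failing
	default:
		return "healthy"
	}
}

func main() {
	for _, c := range [][2]int{{5, 5}, {4, 5}, {2, 5}} {
		fmt.Printf("%d/%d healthy -> %s\n", c[0], c[1], healthStatus(c[0], c[1]))
	}
}
```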

🔄 Understanding Cluster Lifecycle

Scenario 1: Node Join

1. New node starts and sends join request
   ↓
2. OnNodeJoin fires
   - Event includes: node info, cluster size, bootstrap flag
   ↓
3. OnRebalanceStart fires
   - Event includes: trigger reason, partitions to move
   ↓
4. Partitions are reassigned
   ↓
5. OnPartitionChange fires (multiple times, once per partition)
   - Event includes: partition ID, source nodes, destination node, reason
   ↓
6. OnRebalanceComplete fires
   - Event includes: duration, partitions moved

Your Application:

  • In OnNodeJoin: Log event, update monitoring
  • In OnPartitionChange: Migrate data for assigned partitions
  • In OnRebalanceComplete: Resume normal operations

Scenario 2: Node Failure & Removal

1. Health checker detects node failure (3 consecutive failures)
   ↓
2. OnNodeLeave fires
   - Event includes: node info, reason="health_check_failure", partitions owned
   ↓
3. OnRebalanceStart fires
   ↓
4. Partitions are reassigned to remaining nodes
   ↓
5. OnPartitionChange fires (for each reassigned partition)
   ↓
6. OnRebalanceComplete fires

Your Application:

  • In OnNodeLeave: Clean up connections, alert ops team
  • In OnPartitionChange: Take ownership of reassigned partitions
  • Data already exists on replicas, so migration is fast!
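Step 1 of this scenario removes a node after `FailureThreshold` consecutive failed checks. That comes down to a per-node streak counter that resets on any success. This sketch covers only that bookkeeping, using a hypothetical `failureTracker` type. The real health checker also issues the probes every `Interval` with the configured `Timeout`.

```go
package main

import "fmt"

// failureTracker is a hypothetical helper that counts consecutive failed
// health checks per node.
type failureTracker struct {
	threshold int            // corresponds to HealthCheckConfig.FailureThreshold
	failures  map[string]int // current failure streak per node ID
}

func newFailureTracker(threshold int) *failureTracker {
	return &failureTracker{threshold: threshold, failures: map[string]int{}}
}

// record stores one check result and returns true exactly once, on the check
// that completes the failure streak (the point where removal would start).
func (t *failureTracker) record(nodeID string, ok bool) bool {
	if ok {
		delete(t.failures, nodeID) // any success resets the streak
		return false
	}
	t.failures[nodeID]++
	return t.failures[nodeID] == t.threshold
}

func main() {
	t := newFailureTracker(3)
	results := []bool{false, false, true, false, false, false}
	for i, ok := range results {
		if t.record("node-2", ok) {
			fmt.Printf("check %d: node-2 failed %d checks in a row; removing it\n", i+1, t.threshold)
		}
	}
}
```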

Scenario 3: Node Rejoin (After Failure)

1. Failed node restarts and rejoins
   ↓
2. OnNodeRejoin fires
   - Event includes: node info, offline duration, partitions before leave
   ↓
3. Your app clears stale local data
   ↓
4. OnRebalanceStart fires
   ↓
5. Partitions are reassigned (may be different than before!)
   ↓
6. OnPartitionChange fires (for each assigned partition)
   - Your app fetches fresh data from replicas
   ↓
7. OnRebalanceComplete fires

Your Application:

  • In OnNodeRejoin: Clear ALL stale data (important!)
  • In OnPartitionChange: Fetch fresh data for NEW partition assignments
  • Don't assume you'll get the same partitions you had before!

Why clear data?

  • Node was offline - data is stale
  • Partition assignments may have changed
  • Other nodes have the latest data
  • Clean slate ensures consistency

πŸ’‘ Complete Example: Building a Distributed KV Store

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
    
    "github.com/skshohagmiah/clusterkit"
)

type KVStore struct {
    ck          *clusterkit.ClusterKit
    data        map[string]string
    mu          sync.RWMutex
    nodeID      string
    isRejoining bool
    rejoinMu    sync.Mutex
}

func NewKVStore(ck *clusterkit.ClusterKit, nodeID string) *KVStore {
    kv := &KVStore{
        ck:     ck,
        data:   make(map[string]string),
        nodeID: nodeID,
    }
    
    // Register all hooks
    ck.OnPartitionChange(func(event *clusterkit.PartitionChangeEvent) {
        kv.handlePartitionChange(event)
    })
    
    ck.OnNodeRejoin(func(event *clusterkit.NodeRejoinEvent) {
        if event.Node.ID == kv.nodeID {
            kv.handleRejoin(event)
        }
    })
    
    ck.OnNodeLeave(func(event *clusterkit.NodeLeaveEvent) {
        log.Printf("[KV] Node %s left (reason: %s, partitions: %d)", 
            event.Node.ID, event.Reason, 
            len(event.PartitionsOwned)+len(event.PartitionsReplica))
    })
    
    return kv
}

func (kv *KVStore) handleRejoin(event *clusterkit.NodeRejoinEvent) {
    kv.rejoinMu.Lock()
    defer kv.rejoinMu.Unlock()
    
    if kv.isRejoining {
        return // Already rejoining
    }
    
    kv.isRejoining = true
    
    log.Printf("[KV] 🔄 Rejoining after %v offline", event.OfflineDuration)
    log.Printf("[KV] 🗑️  Clearing stale data")
    
    // Clear ALL stale data
    kv.mu.Lock()
    kv.data = make(map[string]string)
    kv.mu.Unlock()
    
    log.Printf("[KV] ✅ Ready for partition reassignment")
}

func (kv *KVStore) handlePartitionChange(event *clusterkit.PartitionChangeEvent) {
    if event.CopyToNode.ID != kv.nodeID {
        return
    }
    
    // Check if rejoining
    kv.rejoinMu.Lock()
    isRejoining := kv.isRejoining
    kv.rejoinMu.Unlock()
    
    if len(event.CopyFromNodes) == 0 {
        log.Printf("[KV] New partition %s assigned", event.PartitionID)
        return
    }
    
    log.Printf("[KV] 🔄 Migrating partition %s (reason: %s)",
        event.PartitionID, event.ChangeReason)
    
    // Fetch data from source nodes
    for _, source := range event.CopyFromNodes {
        data := kv.fetchPartitionData(source, event.PartitionID)
        
        kv.mu.Lock()
        for key, value := range data {
            kv.data[key] = value
        }
        kv.mu.Unlock()
        
        log.Printf("[KV] βœ… Migrated %d keys from %s", len(data), source.ID)
        break // Successfully migrated
    }
    
    // Clear rejoin flag after first partition
    if isRejoining {
        kv.rejoinMu.Lock()
        kv.isRejoining = false
        kv.rejoinMu.Unlock()
        log.Printf("[KV] ✅ Rejoin complete")
    }
}

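// fetchPartitionData asks a source node for one partition's keys. The /migrate
// endpoint is served by this application (not by ClusterKit), and node.IP is
// assumed to reach it. Returns nil on any failure.
func (kv *KVStore) fetchPartitionData(node *clusterkit.Node, partitionID string) map[string]string {
    url := fmt.Sprintf("http://%s/migrate?partition=%s", node.IP, partitionID)
    resp, err := http.Get(url)
    if err != nil {
        log.Printf("[KV] fetch from %s failed: %v", node.ID, err)
        return nil
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        log.Printf("[KV] fetch from %s returned %s", node.ID, resp.Status)
        return nil
    }
    
    var result map[string]string
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        log.Printf("[KV] decoding data from %s failed: %v", node.ID, err)
        return nil
    }
    if result == nil {
        result = map[string]string{} // a "null" body means an empty partition
    }
    return result
}

// kvRequest calls a hypothetical /kv endpoint that this application must serve
// on every node; the key (and value, for PUT) travel as query parameters.
func kvRequest(method string, node *clusterkit.Node, key, value string) (*http.Response, error) {
    req, err := http.NewRequest(method, "http://"+node.IP+"/kv", nil)
    if err != nil {
        return nil, err
    }
    q := req.URL.Query()
    q.Set("key", key)
    if method == http.MethodPut {
        q.Set("value", value)
    }
    req.URL.RawQuery = q.Encode()
    return http.DefaultClient.Do(req)
}

func (kv *KVStore) forwardToPrimary(primary *clusterkit.Node, key, value string) error {
    if primary == nil {
        return fmt.Errorf("no primary available for key %q", key)
    }
    resp, err := kvRequest(http.MethodPut, primary, key, value)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("primary %s returned %s", primary.ID, resp.Status)
    }
    return nil
}

func (kv *KVStore) readFromPrimary(primary *clusterkit.Node, key string) (string, error) {
    if primary == nil {
        return "", fmt.Errorf("no primary available for key %q", key)
    }
    resp, err := kvRequest(http.MethodGet, primary, key, "")
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return "", fmt.Errorf("primary %s returned %s", primary.ID, resp.Status)
    }
    var value string // assumes the endpoint replies with a JSON string
    if err := json.NewDecoder(resp.Body).Decode(&value); err != nil {
        return "", err
    }
    return value, nil
}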

func (kv *KVStore) Set(key, value string) error {
    partition, err := kv.ck.GetPartition(key)
    if err != nil {
        return err
    }
    
    // Only the primary applies writes locally; replicating to
    // kv.ck.GetReplicas(partition) is left to your replication protocol
    if kv.ck.IsPrimary(partition) {
        kv.mu.Lock()
        kv.data[key] = value
        kv.mu.Unlock()
        return nil
    }
    
    // Forward to primary
    primary := kv.ck.GetPrimary(partition)
    return kv.forwardToPrimary(primary, key, value)
}

func (kv *KVStore) Get(key string) (string, error) {
    partition, err := kv.ck.GetPartition(key)
    if err != nil {
        return "", err
    }
    
    if kv.ck.IsPrimary(partition) || kv.ck.IsReplica(partition) {
        kv.mu.RLock()
        defer kv.mu.RUnlock()
        
        value, exists := kv.data[key]
        if !exists {
            return "", fmt.Errorf("key not found")
        }
        return value, nil
    }
    
    // Forward to primary
    primary := kv.ck.GetPrimary(partition)
    return kv.readFromPrimary(primary, key)
}

func main() {
    ck, err := clusterkit.New(clusterkit.Options{
        NodeID:   "node-1",
        HTTPAddr: ":8080",
        HealthCheck: clusterkit.HealthCheckConfig{
            Enabled:          true,
            Interval:         5 * time.Second,
            Timeout:          2 * time.Second,
            FailureThreshold: 3,
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    if err := ck.Start(); err != nil {
        log.Fatal(err)
    }
    defer ck.Stop()
    
    kv := NewKVStore(ck, "node-1")
    
    // Use the KV store
    if err := kv.Set("user:123", "John Doe"); err != nil {
        log.Printf("set failed: %v", err)
    }
    value, err := kv.Get("user:123")
    if err != nil {
        log.Printf("get failed: %v", err)
    }
    fmt.Println("Value:", value)
    
    select {}
}

πŸ—οΈ Configuration Options

Minimal (2 required fields)

ck, _ := clusterkit.New(clusterkit.Options{
    NodeID:   "node-1",  // Required
    HTTPAddr: ":8080",   // Required
})

Production Configuration

ck, _ := clusterkit.New(clusterkit.Options{
    // Required
    NodeID:   "node-1",
    HTTPAddr: ":8080",
    
    // Cluster Formation
    JoinAddr:  "node-1:8080", // Address of an existing node; omit on the bootstrap node
    Bootstrap: false,          // Auto-detected
    
    // Partitioning
    PartitionCount:    64,  // More partitions = better distribution
    ReplicationFactor: 3,   // Survive 2 node failures
    
    // Storage
    DataDir: "/var/lib/clusterkit",
    
    // Health Checking
    HealthCheck: clusterkit.HealthCheckConfig{
        Enabled:          true,
        Interval:         5 * time.Second,  // Check every 5s
        Timeout:          2 * time.Second,  // Request timeout
        FailureThreshold: 3,                // Remove after 3 failures
    },
})

📊 HTTP API

ClusterKit exposes RESTful endpoints:

# Get cluster state (includes service discovery)
curl http://localhost:8080/cluster

# Get metrics
curl http://localhost:8080/metrics

# Get detailed health
curl http://localhost:8080/health/detailed

# Check if ready
curl http://localhost:8080/ready

The /cluster endpoint returns comprehensive cluster information including:

  • Node membership and status
  • Service discovery - All registered services per node
  • Partition assignments and replica locations
  • Cluster configuration and hash settings

🧪 Running Examples

ClusterKit includes 3 complete examples:

# SYNC - Strong consistency (quorum-based)
cd example/sync && ./run.sh

# ASYNC - Maximum throughput (eventual consistency)
cd example/async && ./run.sh

# Server-Side - Simple HTTP clients
cd example/server-side && ./run.sh

Each example demonstrates:

  • ✅ Cluster formation (10 nodes)
  • ✅ Data distribution (1000 keys)
  • ✅ Automatic rebalancing
  • ✅ Health checking and failure recovery
  • ✅ Node rejoin handling

🐳 Docker Deployment

docker-compose.yml

version: '3.8'

services:
  node-1:
    image: your-registry/clusterkit:latest
    environment:
      - NODE_ID=node-1
      - HTTP_PORT=8080
      - DATA_DIR=/data
    ports:
      - "8080:8080"
    volumes:
      - node1-data:/data

  node-2:
    image: your-registry/clusterkit:latest
    environment:
      - NODE_ID=node-2
      - HTTP_PORT=8080
      - JOIN_ADDR=node-1:8080
      - DATA_DIR=/data
    ports:
      - "8081:8080"
    volumes:
      - node2-data:/data
    depends_on:
      - node-1

volumes:
  node1-data:
  node2-data:

📚 Documentation

Comprehensive guides in the docs/ directory:

Core Concepts

  • Architecture - Detailed system design, Raft + consistent hashing
  • Partitioning - How data is distributed across nodes
  • Replication - Replication strategies and consistency models
  • Rebalancing - How partitions move when topology changes



🤝 Contributing

Contributions welcome! Please read CONTRIBUTING.md first.


📄 License

MIT License - see LICENSE for details.


🌟 Why ClusterKit?

Simple - 7 methods + hooks, minimal config
Flexible - Bring your own storage and replication
Production-Ready - Raft consensus, health checking, metrics
Well-Documented - Comprehensive guides and examples
Battle-Tested - Used in production distributed systems

Start building your distributed system today! 🚀

About

Open-source distributed-systems clustering library for Go that manages partitions, replication, consistent hashing, cluster metadata, and more.
