This is example code showing how to use HashiCorp's Raft implementation with gRPC.
It uses HashiCorp's Raft transport to communicate between nodes over TCP.
Each node uses two independent ports: one for the application's gRPC service and one for Raft's transport.
gRPC server ports: 40051, 40052, 40053, 40054 ... Raft server ports: 50051, 50052, 50053, 50054 ...
$ mkdir /tmp/test
$ mkdir /tmp/test/node{A,B,C}
$ ./raft-example --raft_bootstrap --raft_id=nodeA --grpc_addr=localhost:40051 --raft_addr=localhost:50051 --raft_data_dir /tmp/test
$ ./raft-example --raft_id=nodeB --grpc_addr=localhost:40052 --raft_addr=localhost:50052 --raft_data_dir /tmp/test
$ ./raft-example --raft_id=nodeC --grpc_addr=localhost:40053 --raft_addr=localhost:50053 --raft_data_dir /tmp/test

You start up three nodes and bootstrap one of them. Then you tell the bootstrapped node where to find peers. Those peers sync up to the state of the bootstrapped node and become members of the cluster. Once your cluster is running, you never need to pass --raft_bootstrap again.
raft-manager is used to communicate with the cluster and add the other nodes.
To add nodes to your cluster, first call `applied_index` to obtain the cluster's current applied index, then pass it to `add_voter`.
$ go install github.com/xkeyideal/raft-manager/cmd/manager@latest
$ ./manager localhost:40051 applied_index
$ ./manager localhost:40051 add_voter nodeB localhost:50052 132
$ ./manager localhost:40051 applied_index
$ ./manager localhost:40051 add_voter nodeC localhost:50053 133

Run cmd/cmd.go to test Raft writes and reads.
$ go run cmd/cmd.go

Raft uses logs to synchronize changes. Every change submitted to a Raft cluster is a log entry, which gets stored and replicated to the followers in the cluster. In this example, we use raft-pebbledb to store these logs. Once in a while, Raft decides the logs have grown too large and makes a snapshot: your code is asked to write out its state, and that state captures all previous logs. Raft can then delete the old logs and use just the snapshot. These snapshots are stored using the FileSnapshotStore, which means they'll just be files on your disk.
You can see all this happening in NewRaft() in engine/raft.go.
See fsm/fsm.go. You'll need to implement a raft.FSM, and you probably want a gRPC RPC interface.
The Apply method of HashiCorp Raft must be called on the leader:
```go
// Apply is used to apply a command to the FSM in a highly consistent
// manner. This returns a future that can be used to wait on the application.
// An optional timeout can be provided to limit the amount of time we wait
// for the command to be started. This must be run on the leader or it
// will fail.
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
	return r.ApplyLog(Log{Data: cmd}, timeout)
}
```

So we need to forward Apply requests to the leader over RPC, as Consul's KV store does: https://github.com/hashicorp/consul/blob/main/agent/consul/kvs_endpoint.go#L100
To forward a request to the leader, we must know the leader's RPC address, not its Raft address.
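A minimal sketch of that resolution step, assuming we already obtained the leader's Raft transport address (e.g. from raft.LeaderWithID()). resolveLeaderGRPC and the lookup map are hypothetical names for illustration, not this project's API:

```go
package main

import (
	"fmt"
)

// resolveLeaderGRPC is a hypothetical helper: given the leader's Raft
// transport address, return the gRPC address we should forward the
// Apply request to, using a Raft-addr -> gRPC-addr lookup table.
func resolveLeaderGRPC(leaderRaftAddr string, lookup map[string]string) (string, error) {
	grpcAddr, ok := lookup[leaderRaftAddr]
	if !ok {
		return "", fmt.Errorf("no gRPC address known for leader %s", leaderRaftAddr)
	}
	return grpcAddr, nil
}

func main() {
	lookup := map[string]string{
		"127.0.0.1:50051": "127.0.0.1:40051",
		"127.0.0.1:50052": "127.0.0.1:40052",
	}
	addr, err := resolveLeaderGRPC("127.0.0.1:50051", lookup)
	if err != nil {
		panic(err)
	}
	fmt.Println(addr) // 127.0.0.1:40051
}
```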
We can use a gossip protocol to propagate each node's RPC address, Raft address, and other info (see Best Practices of Gossip by Memberlist).
This Raft example uses a constant map, which you can modify in https://github.com/xkeyideal/raft-example/blob/master/app.go:
```go
var (
	// Default static lookup for backward compatibility
	defaultStaticLookup = map[string]string{
		"127.0.0.1:50051": "127.0.0.1:40051",
		"127.0.0.1:50052": "127.0.0.1:40052",
		"127.0.0.1:50053": "127.0.0.1:40053",
	}
)
```

In production environments, hardcoded address mappings are impractical. This project supports dynamic service discovery using HashiCorp Memberlist (gossip protocol).
```
┌──────────────────────────────────────────────────────────────────┐
│                        Address Resolution                        │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌─────────────────┐      ┌──────────────────────────────────┐  │
│   │ AddressResolver │◄─────│ resolver/resolver.go             │  │
│   │   (interface)   │      │  - GetGRPCAddr(nodeID) string    │  │
│   └────────┬────────┘      │  - SetGRPCAddr(nodeID, addr)     │  │
│            │               │  - RemoveAddr(nodeID)            │  │
│     ┌──────┴───────┐       └──────────────────────────────────┘  │
│     │              │                                             │
│     ▼              ▼                                             │
│ ┌───────────┐  ┌────────────┐                                    │
│ │  Static   │  │   Gossip   │                                    │
│ │ Resolver  │  │  Resolver  │                                    │
│ │(hardcoded)│  │(memberlist)│                                    │
│ └───────────┘  └─────┬──────┘                                    │
│                      │                                           │
│                      ▼                                           │
│               ┌──────────────┐                                   │
│               │    Gossip    │  gossip/gossip.go                 │
│               │ (memberlist) │   - Node join/leave events        │
│               │              │   - Propagate NodeMeta            │
│               └──────────────┘                                   │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘
```
- NodeMeta Propagation: Each node broadcasts its metadata (NodeID, RaftAddr, GRPCAddr) via gossip
- Automatic Discovery: When a node joins/leaves, all other nodes are notified
- Leader Forwarding: When forwarding requests to the leader, the gRPC address is resolved dynamically
# Node A (bootstrap node)
./raft-example --raft_bootstrap --raft_id=nodeA \
--grpc_addr=127.0.0.1:40051 --raft_addr=127.0.0.1:50051 \
--gossip_addr=0.0.0.0:7946 \
--raft_data_dir /data/raft
# Node B (joins via gossip seeds)
./raft-example --raft_id=nodeB \
--grpc_addr=127.0.0.1:40052 --raft_addr=127.0.0.1:50052 \
--gossip_addr=0.0.0.0:7947 --gossip_seeds=127.0.0.1:7946 \
--raft_data_dir /data/raft
# Node C
./raft-example --raft_id=nodeC \
--grpc_addr=127.0.0.1:40053 --raft_addr=127.0.0.1:50053 \
--gossip_addr=0.0.0.0:7948 --gossip_seeds=127.0.0.1:7946,127.0.0.1:7947 \
  --raft_data_dir /data/raft

| Flag | Description | Example |
|---|---|---|
| `--gossip_addr` | Gossip bind address. Empty to disable gossip. | `0.0.0.0:7946` |
| `--gossip_seeds` | Comma-separated list of seed nodes to join | `127.0.0.1:7946,127.0.0.1:7947` |
If `--gossip_addr` is not provided, the system falls back to the static `defaultStaticLookup` map (development mode).
```go
// NodeMeta is propagated to all cluster members via gossip
type NodeMeta struct {
	NodeID   string `json:"node_id"`   // Raft server ID
	RaftAddr string `json:"raft_addr"` // Raft transport address
	GRPCAddr string `json:"grpc_addr"` // gRPC service address
}

// Key functions
func New(cfg Config) (*Gossip, error)                      // Create gossip instance
func (g *Gossip) GetGRPCAddr(nodeID string) (string, bool) // Resolve address
func (g *Gossip) OnJoin(fn func(NodeMeta))                 // Register join callback
func (g *Gossip) OnLeave(fn func(NodeMeta))                // Register leave callback
func (g *Gossip) Leave(timeout time.Duration)              // Gracefully leave cluster

// AddressResolver abstracts address resolution for leader forwarding
type AddressResolver interface {
	GetGRPCAddr(nodeID string) (grpcAddr string, ok bool)
	SetGRPCAddr(nodeID, grpcAddr string)
	RemoveAddr(nodeID string)
}
```
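As an illustration, a resolver satisfying this interface can be as simple as a mutex-guarded map. The sketch below is a stand-alone example under assumed semantics, not the project's actual resolver/resolver.go:

```go
package main

import (
	"fmt"
	"sync"
)

// AddressResolver matches the interface described above.
type AddressResolver interface {
	GetGRPCAddr(nodeID string) (grpcAddr string, ok bool)
	SetGRPCAddr(nodeID, grpcAddr string)
	RemoveAddr(nodeID string)
}

// StaticResolver is a sketch of the hardcoded-map implementation:
// suitable for development and tests.
type StaticResolver struct {
	mu    sync.RWMutex
	addrs map[string]string
}

func NewStaticResolver(seed map[string]string) *StaticResolver {
	s := &StaticResolver{addrs: make(map[string]string, len(seed))}
	for id, addr := range seed {
		s.addrs[id] = addr
	}
	return s
}

func (s *StaticResolver) GetGRPCAddr(nodeID string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	addr, ok := s.addrs[nodeID]
	return addr, ok
}

func (s *StaticResolver) SetGRPCAddr(nodeID, grpcAddr string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.addrs[nodeID] = grpcAddr
}

func (s *StaticResolver) RemoveAddr(nodeID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.addrs, nodeID)
}

func main() {
	var r AddressResolver = NewStaticResolver(map[string]string{"nodeA": "127.0.0.1:40051"})
	r.SetGRPCAddr("nodeB", "127.0.0.1:40052")
	addr, ok := r.GetGRPCAddr("nodeB")
	fmt.Println(addr, ok) // 127.0.0.1:40052 true
}
```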
Two implementations are provided:
- StaticResolver: uses a hardcoded map (development)
- GossipResolver: uses gossip for dynamic discovery (production)

This project implements efficient linearizable reads without writing to the Raft log.
Instead of the naive approach of applying read commands through the Raft log, we use:
- `VerifyLeader()`: contact a quorum to confirm we're still the leader
- Barrier check: ensure we've applied all committed entries (via the `readyForConsistentReads` flag)
- Local read: safe to read locally after the above checks pass
```
Client Request (Linearizable=true)
        │
        ▼
┌─────────────────────────────────┐
│ service/service.go: Get()       │
│  - If not leader: forward       │
│  - If leader: call Query()      │
└───────────────┬─────────────────┘
                │
                ▼
┌─────────────────────────────────┐
│ engine/kv.go: Query()           │
│  - ConsistentRead() check       │
│  - ReadLocal() from Pebble      │
└───────────────┬─────────────────┘
                │
                ▼
┌─────────────────────────────────┐
│ engine/linearizable.go:         │
│   ConsistentRead()              │
│  - VerifyLeader() → quorum      │
│  - isReadyForConsistentReads()  │
└─────────────────────────────────┘
```
| Aspect | Old (Apply-based) | New (ReadIndex-style) |
|---|---|---|
| Write to log | Yes | No |
| Disk I/O | High | Low |
| Latency | Higher | Lower |
| Consistency | Linearizable | Linearizable |
The engine supports graceful shutdown to prevent request loss during restarts.
- gRPC GracefulStop: Waits for in-flight requests to complete (with 10s timeout)
- Gossip Leave: Notifies cluster members before shutting down
- Ordered Cleanup: gRPC → Gossip → Raft → FSM
```go
// engine/engine.go: Close()
func (e *Engine) Close() {
	e.shutdownOnce.Do(func() {
		close(e.shutdownCh)

		// 1. Gracefully stop gRPC (wait for in-flight requests)
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		done := make(chan struct{})
		go func() {
			e.grpcServer.GracefulStop()
			close(done)
		}()
		select {
		case <-done:
			log.Println("[INFO] gRPC server stopped gracefully")
		case <-ctx.Done():
			e.grpcServer.Stop() // Force stop after timeout
		}

		// 2. Leave gossip cluster
		if e.gossip != nil {
			e.gossip.Leave(5 * time.Second)
			e.gossip.Shutdown()
		}

		// 3. Close Raft and FSM
		e.raft.close()
		e.fsm.Close()
	})
}
```

The FSM implements the abandon mechanism from Consul, which is essential for implementing blocking/watch-style queries that need to be notified when a snapshot restore happens.
When a snapshot restore occurs:
- The entire FSM state is replaced
- Indices may have gone backwards
- Data may have changed significantly
- Blocking queries waiting on old data become stale
The abandon mechanism solves this by providing a channel that watchers can monitor.
```
┌─────────────────────────────────────────────────────────────────┐
│                     Abandon Mechanism Flow                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Watcher A ─────┐                                              │
│                  │      ┌─────────────────┐                     │
│   Watcher B ─────┼─────►│   AbandonCh()   │◄──── Snapshot       │
│                  │      └────────┬────────┘      Restore        │
│   Watcher C ─────┘               │                              │
│                                  │ close(oldCh)                 │
│                                  ▼                              │
│              All watchers wake up immediately!                  │
│               → Re-query with index=0                           │
│               → Refresh cached data                             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
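The core of the mechanism is small: a channel that a restore closes and replaces. The sketch below uses hypothetical names (abandonableFSM, Restore) and omits the actual state swap, showing only the wake-all-watchers pattern:

```go
package main

import (
	"fmt"
	"sync"
)

// abandonableFSM sketches Consul's abandon mechanism: watchers grab the
// current channel via AbandonCh() and select on it; when a snapshot
// restore replaces the state, Restore closes that channel, waking every
// watcher at once, and installs a fresh channel for future watchers.
type abandonableFSM struct {
	mu        sync.Mutex
	abandonCh chan struct{}
}

func newAbandonableFSM() *abandonableFSM {
	return &abandonableFSM{abandonCh: make(chan struct{})}
}

// AbandonCh returns the channel watchers should select on.
func (f *abandonableFSM) AbandonCh() <-chan struct{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.abandonCh
}

// Restore simulates a snapshot restore: wake watchers, reset channel.
func (f *abandonableFSM) Restore() {
	f.mu.Lock()
	defer f.mu.Unlock()
	close(f.abandonCh)
	f.abandonCh = make(chan struct{})
}

func main() {
	fsm := newAbandonableFSM()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		ch := fsm.AbandonCh()
		go func(id int) {
			defer wg.Done()
			<-ch // blocks until a restore happens
			fmt.Printf("watcher %d: state abandoned, re-query with index=0\n", id)
		}(i)
	}

	fsm.Restore() // all three watchers wake up immediately
	wg.Wait()
}
```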
The abandon mechanism is exposed via a gRPC streaming API:
```protobuf
// proto/service.proto
service Example {
  // Watch implements blocking query with abandon mechanism
  rpc Watch(WatchRequest) returns (stream WatchResponse) {}
}

message WatchRequest {
  string key = 1;            // Key to watch
  uint64 min_index = 2;      // Wait for index >= min_index
  int64 timeout_seconds = 3; // Max wait time (0 = default 5min)
}

message WatchResponse {
  string value = 1;          // Current value
  uint64 current_index = 2;  // Current index
  bool abandoned = 3;        // True if state was restored
}
```

# Start watching a key (cmd/cmd.go)
go run cmd/cmd.go -watch -key=my-key

cmd/cmd.go demonstrates complete Watch usage:
```go
// 1. Start watching
stream, err := client.Watch(ctx, &pb.WatchRequest{
	Key:            "my-key",
	MinIndex:       lastIndex + 1,
	TimeoutSeconds: 30,
})
if err != nil {
	return err
}

// 2. Receive updates
for {
	resp, err := stream.Recv()
	if err != nil {
		return err
	}
	if resp.Abandoned {
		// State was restored! Reset and reconnect
		log.Println("State abandoned, resetting watch...")
		lastIndex = 0
		break // Reconnect with minIndex=0
	}
	// Normal update
	log.Printf("Value changed at index %d: %s",
		resp.CurrentIndex, resp.Value)
	lastIndex = resp.CurrentIndex
}
```

The server-side implementation uses BlockingQuery from service/blocking_query.go:
```go
// service/service.go
func (r *GRPCService) Watch(req *pb.WatchRequest, stream pb.Example_WatchServer) error {
	// Uses BlockingQuery, which monitors the FSM's AbandonCh()
	result, err := BlockingQuery(ctx, r.fsm, minIndex, timeout, queryFn)
	if err != nil {
		return err
	}
	// When abandoned, result.Abandoned = true;
	// the client should reset minIndex and reconnect
	return stream.Send(&pb.WatchResponse{
		Value:        string(result.Value),
		CurrentIndex: result.CurrentIndex,
		Abandoned:    result.Abandoned,
	})
}
```

| Feature | Description |
|---|---|
| Streaming | Server pushes updates as they happen |
| Abandon Detection | Immediate notification on snapshot restore |
| Timeout Support | Returns current value if no changes within timeout |
| Multi-node | Works on any node (leader or follower) |
- SnapshotInterval & SnapshotThreshold: SnapshotInterval controls how often we check if we should perform a snapshot. SnapshotThreshold controls how many outstanding logs there must be before we perform a snapshot.

```go
// runSnapshots is a long running goroutine used to manage taking
// new snapshots of the FSM. It runs in parallel to the FSM and
// main goroutines, so that snapshots do not block normal operation.
func (r *Raft) runSnapshots() {
	for {
		select {
		case <-randomTimeout(r.config().SnapshotInterval):
			// Check if we should snapshot
			if !r.shouldSnapshot() {
				continue
			}

			// Trigger a snapshot
			if _, err := r.takeSnapshot(); err != nil {
				r.logger.Error("failed to take snapshot", "error", err)
			}

		case future := <-r.userSnapshotCh:
			// User-triggered, run immediately
			id, err := r.takeSnapshot()
			if err != nil {
				r.logger.Error("failed to take snapshot", "error", err)
			} else {
				future.opener = func() (*SnapshotMeta, io.ReadCloser, error) {
					return r.snapshots.Open(id)
				}
			}
			future.respond(err)

		case <-r.shutdownCh:
			return
		}
	}
}
```
- HeartbeatTimeout, the time in follower state without contact from a leader before we attempt an election.
- ElectionTimeout, the time in candidate state without contact from a leader before we attempt an election.
- TrailingLogs, controls how many logs we leave after a snapshot. This is used so that we can quickly replay logs on a follower instead of being forced to send an entire snapshot.
So the most critical Raft parameters for follower log sync are SnapshotThreshold and TrailingLogs, and the value of SnapshotThreshold should be less than the value of TrailingLogs.
- NoSnapshotRestoreOnStart defaults to `false`, which means the latest snapshot is restored when `NewRaft` starts.
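For reference, these knobs live on hashicorp/raft's Config. The values below are illustrative (8192 and 10240 happen to be the library's defaults for SnapshotThreshold and TrailingLogs); the point is the SnapshotThreshold < TrailingLogs relationship:

```go
// Fragment: assumes "time" and "github.com/hashicorp/raft" are imported.
conf := raft.DefaultConfig()
conf.HeartbeatTimeout = 1 * time.Second   // follower: no leader contact before election
conf.ElectionTimeout = 1 * time.Second    // candidate: no leader contact before re-election
conf.SnapshotInterval = 120 * time.Second // how often to check whether to snapshot
conf.SnapshotThreshold = 8192             // outstanding logs before taking a snapshot
conf.TrailingLogs = 10240                 // logs kept after a snapshot; keep > SnapshotThreshold
```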
rqlite is an easy-to-use, lightweight, distributed relational database, which uses SQLite as its storage engine.
If you are using this repository inside VS Code with GitHub Copilot skills enabled, the following skill can help you use this project as a real-world Raft reference (leader forwarding, linearizable reads, snapshots, membership changes, etc.):
raft-example is under the BSD 2-Clause License. See the LICENSE file for details.