Distributed Lock with Redis (Redlock Algorithm)
Implements a Redis-backed distributed mutex that acquires, renews, and releases locks safely across processes.
main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
package main
import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// RedisLock represents a distributed lock using Redlock algorithm
type RedisLock struct {
client *redis.Client
lockKey string
lockValue string // Unique value to prevent releasing other's locks
expireTime time.Duration
heartbeatTicker *time.Ticker
stopHeartbeat chan struct{}
mu sync.Mutex
isLocked bool
}
// NewRedisLock creates a new distributed lock instance
func NewRedisLock(client *redis.Client, lockKey string, expireTime time.Duration) *RedisLock {
// Generate unique value (UUID recommended in production; using random string here for simplicity)
rand.Seed(time.Now().UnixNano())
lockValue := fmt.Sprintf("lock_%d_%d", rand.Int63(), time.Now().UnixNano())
return &RedisLock{
client: client,
lockKey: lockKey,
lockValue: lockValue,
expireTime: expireTime,
stopHeartbeat: make(chan struct{}),
}
}
// Lock acquires the distributed lock with retry mechanism
func (rl *RedisLock) Lock(ctx context.Context, maxRetry int, retryInterval time.Duration) (bool, error) {
rl.mu.Lock()
defer rl.mu.Unlock()
if rl.isLocked {
return true, nil
}
// Retry until lock is acquired or max retries reached
for i := 0; i < maxRetry; i++ {
// Use SET NX (only set if key does not exist) + EX (expire)
success, err := rl.client.SetNX(ctx, rl.lockKey, rl.lockValue, rl.expireTime).Result()
if err != nil {
return false, fmt.Errorf("redis setnx failed: %v", err)
}
if success {
// Start heartbeat to renew lock
rl.startHeartbeat = time.NewTicker(rl.expireTime / 3)
go rl.heartbeatLoop(ctx)
rl.isLocked = true
log.Printf("Acquired lock: %s", rl.lockKey)
return true, nil
}
// Wait before retrying
time.Sleep(retryInterval)
}
return false, fmt.Errorf("failed to acquire lock after %d retries", maxRetry)
}
// heartbeatLoop renews the lock periodically to prevent expiration during long tasks
func (rl *RedisLock) heartbeatLoop(ctx context.Context) {
for {
select {
case <-rl.stopHeartbeat:
return
case <-rl.startHeartbeat.C:
rl.mu.Lock()
if !rl.isLocked {
rl.mu.Unlock()
return
}
// Only renew if lock value matches (own lock)
script := redis.NewScript("if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('EXPIRE', KEYS[1], ARGV[2]) else return 0 end")
_, err := script.Run(ctx, rl.client, []string{rl.lockKey}, rl.lockValue, int(rl.expireTime.Seconds())).Result()
if err != nil {
log.Printf("failed to renew lock: %v", err)
}
rl.mu.Unlock()
}
}
}
// Unlock releases the lock safely (only if owned by current instance)
func (rl *RedisLock) Unlock(ctx context.Context) error {
rl.mu.Lock()
defer rl.mu.Unlock()
if !rl.isLocked {
return nil
}
// Stop heartbeat
close(rl.stopHeartbeat)
rl.startHeartbeat.Stop()
// Lua script ensures atomicity: check value before delete
script := redis.NewScript("if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end")
_, err := script.Run(ctx, rl.client, []string{rl.lockKey}, rl.lockValue).Result()
if err != nil {
return fmt.Errorf("redis delete failed: %v", err)
}
rl.isLocked = false
log.Printf("Released lock: %s", rl.lockKey)
return nil
}
// Example Usage
func main() {
// Initialize Redis client
ctx := context.Background()
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // Set password if needed
DB: 0,
})
// Test connection
_, err := client.Ping(ctx).Result()
if err != nil {
log.Fatalf("failed to connect to redis: %v", err)
}
// Create lock: key "job:export:123", expire 30s
lock := NewRedisLock(client, "job:export:123", 30*time.Second)
defer lock.Unlock(ctx)
// Acquire lock: max 5 retries, 2s interval
locked, err := lock.Lock(ctx, 5, 2*time.Second)
if err != nil {
log.Fatalf("lock acquisition failed: %v", err)
}
if !locked {
log.Fatal("could not acquire lock")
}
// Simulate long-running task
log.Println("Executing critical task...")
time.Sleep(60 * time.Second) // Task longer than lock expire time (heartbeat renews it)
log.Println("Critical task completed")
}How It Works
Implements a Redis-backed distributed mutex that acquires, renews, and releases locks safely across processes.
Stores a unique value per holder, tries to set a lock key with NX and expiration, starts a heartbeat ticker to extend the TTL, and deletes the key on release; goroutines simulate concurrent workers contending for the lock.
Key Concepts
- 1Unique lock values prevent one client from releasing another client's lock.
- 2Automatic heartbeat renews the lock before it expires.
- 3Acquire uses SET NX with expiration to avoid deadlocks on crashes.
When to Use This Pattern
- Coordinating scheduled jobs across multiple instances.
- Protecting critical sections like migrations or billing runs.
- Serializing access to shared external resources.
Best Practices
- Use short expirations with heartbeats to recover from crashed workers.
- Ensure release checks the lock value to avoid unlocking peers.
- Run Redis with persistence or replication for higher Redlock safety.
Go Version1.16
Difficultyadvanced
Production ReadyYes
Lines of Code155