在构建分布式系统时,确保数据一致性和操作原子性至关重要。分布式锁是实现这一目标的关键工具,它允许多个节点在共享资源上进行互斥访问。本文将深入探讨两种主流的分布式锁实现:基于 Redis 的分布式锁和基于 etcd 的分布式锁,分析它们的原理、核心问题、解决方案,并提供相应的 Golang 代码示例。
Redis 因其高性能、单线程执行命令的原子性以及丰富的数据结构,成为实现分布式锁的热门选择。
Redis 分布式锁的核心是利用 SETNX
(SET if Not eXists) 命令。该命令的特性是:如果指定的 key 不存在,则设置 key 对应的值,并返回 1;如果 key 已经存在,则不执行任何操作,并返回 0。
一个最简单的加锁操作如下:
SETNX lock_key 1
为了防止死锁(即获得锁的客户端崩溃而无法释放锁),我们需要为锁设置一个过期时间。在早期,这需要两步操作:SETNX
和 EXPIRE
。但这两步并非原子操作,如果 SETNX
成功后客户端崩溃,EXPIRE
未能执行,依然会导致死锁。
从 Redis 2.6.12 版本开始,SET
命令支持了扩展参数,使得加锁和设置过期时间可以成为一个原子操作:
SET lock_key unique_value NX PX 30000
unique_value
:一个唯一的客户端标识符。用于安全地释放锁,防止客户端 A 释放了客户端 B 的锁。NX
:表示只在 key 不存在时才设置。PX 30000
:表示锁的过期时间为 30000 毫秒(30 秒)。释放锁则需要一个 "查询-判断-删除" 的原子操作,通常使用 Lua 脚本实现,以确保只有锁的持有者才能删除锁。
DEL lock_key
命令,结果误删了客户端 B 持有的锁。下面是一个使用 go-redis
库实现的、包含“唯一标识”和“Lua脚本释放”的安全 Redis 分布式锁。
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
)
// RedisLock 是一个基于 Redis 的分布式锁
type RedisLock struct {
client *redis.Client
key string
value string // 锁的唯一标识
}
// NewRedisLock 创建一个新的分布式锁实例
func NewRedisLock(client *redis.Client, key string) *RedisLock {
return &RedisLock{
client: client,
key: key,
value: uuid.NewString(), // 使用 UUID 作为唯一值
}
}
// TryLock 尝试获取锁
// ttl: 锁的过期时间
func (rl *RedisLock) TryLock(ctx context.Context, ttl time.Duration) (bool, error) {
// SET key value NX PX ttl
ok, err := rl.client.SetNX(ctx, rl.key, rl.value, ttl).Result()
if err != nil {
return false, fmt.Errorf("failed to acquire lock: %w", err)
}
return ok, nil
}
// Unlock 释放锁
func (rl *RedisLock) Unlock(ctx context.Context) error {
// 使用 Lua 脚本保证原子性:先GET,再比较,最后DEL
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
// 执行 Lua 脚本
res, err := rl.client.Eval(ctx, script, []string{rl.key}, rl.value).Result()
if err != nil {
return fmt.Errorf("failed to release lock: %w", err)
}
// 返回值为 1 表示删除成功,0 表示锁不存在或值不匹配
if n, ok := res.(int64); !ok || n == 0 {
// 注意:这里可能意味着锁已因超时而自动释放,或者被其他客户端持有。
// 在业务上可能需要记录日志或进行相应的处理。
return fmt.Errorf("failed to release lock: key not found or value mismatched")
}
return nil
}
//模拟扣减逻辑
func deductStock(lock *RedisLock) {
ctx := context.Background()
// 尝试获取锁,超时时间为10秒
locked, err := lock.TryLock(ctx, 10*time.Second)
if err != nil {
fmt.Printf("Error acquiring lock: %v\n", err)
return
}
if !locked {
fmt.Println("Could not acquire lock, another operation is in progress.")
return
}
// 获取锁成功后,确保最终会释放锁
defer func() {
if err := lock.Unlock(ctx); err != nil {
fmt.Printf("Error releasing lock: %v\n", err)
} else {
fmt.Println("Lock released successfully.")
}
}()
fmt.Println("Lock acquired, proceeding with stock deduction...")
// 模拟耗时操作,比如超过锁的过期时间
// time.Sleep(12 * time.Second)
time.Sleep(5 * time.Second) // 正常情况
fmt.Println("Stock deducted successfully.")
}
func main() {
// 初始化 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
_, err := rdb.Ping(context.Background()).Result()
if err != nil {
panic(err)
}
lockKey := "stock_lock"
// 模拟并发请求
for i := 0; i < 3; i++ {
go func(id int) {
fmt.Printf("Goroutine %d trying to deduct stock...\n", id)
// 每个 goroutine 使用自己的 lock 实例,确保 value 唯一
lock := NewRedisLock(rdb, lockKey)
deductStock(lock)
}(i)
}
// 等待 goroutines 执行
time.Sleep(20 * time.Second)
}
etcd 是一个高可用的分布式键值存储系统,常用于服务发现、配置共享和分布式协调。其底层基于 Raft 一致性算法,天然就为构建分布式系统组件提供了强大的支持。
etcd 实现分布式锁主要依赖其三大特性:
Lease (租约):
Key-Value Store 与 Revision:
Revision
号。/locks/my_lock/uuid_1
, /locks/my_lock/uuid_2
)。Revision
号最小,来确定谁获得了锁。Watch 机制:
Watch
(监视)比自己 Revision
号小的前一个 key。Revision
最小的 key,如果是,则获得锁。etcd 官方提供了 clientv3/concurrency
包,极大地简化了分布式锁的实现。
package etcdlock
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
// Locker 封装了 etcd 会话与互斥锁
type Locker struct {
cli *clientv3.Client
key string
ttl int
session *concurrency.Session
mutex *concurrency.Mutex
locked bool
}
// NewLocker 使用给定客户端、锁 key 与 TTL(秒) 创建锁器
func NewLocker(cli *clientv3.Client, key string, ttl int) (*Locker, error) {
s, err := concurrency.NewSession(cli, concurrency.WithTTL(ttl))
if err != nil {
return nil, fmt.Errorf("new session: %w", err)
}
return &Locker{
cli: cli,
key: key,
ttl: ttl,
session: s,
mutex: concurrency.NewMutex(s, key),
}, nil
}
// Lock 阻塞直到获取锁
func (l *Locker) Lock(ctx context.Context) error {
if err := l.mutex.Lock(ctx); err != nil {
return err
}
l.locked = true
return nil
}
// LockWithTimeout 在超时时间内尝试加锁
func (l *Locker) LockWithTimeout(ctx context.Context, timeout time.Duration) error {
ctx2, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return l.Lock(ctx2)
}
// Unlock 释放锁
func (l *Locker) Unlock(ctx context.Context) error {
if !l.locked {
return nil
}
if err := l.mutex.Unlock(ctx); err != nil {
return err
}
l.locked = false
return nil
}
// Close 关闭会话(会导致未释放的锁随租约失效自动释放)
func (l *Locker) Close() error {
if l.session != nil {
return l.session.Close()
}
return nil
}
package main
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"your/module/etcdlock"
)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil { panic(err) }
defer cli.Close()
locker, err := etcdlock.NewLocker(cli, "/locks/my-job", 10)
if err != nil { panic(err) }
defer locker.Close()
if err := locker.LockWithTimeout(context.Background(), 5*time.Second); err != nil {
fmt.Println("acquire failed:", err)
return
}
defer locker.Unlock(context.Background())
fmt.Println("locked, do critical work...")
time.Sleep(2 * time.Second)
fmt.Println("done")
}
concurrency
包内部处理了锁的归属权,Unlock 时会验证锁是否仍被当前 session 持有。特性 | Redis 分布式锁 | etcd 分布式锁 |
---|---|---|
性能 | 非常高,基于内存操作。 | 较高,但涉及磁盘 I/O 和 Raft 协议,性能低于 Redis。 |
可靠性 | 相对较低,主从模式有脑裂风险,需 Redlock 增强。 | 非常高,基于 Raft 协议保证了强一致性。 |
实现复杂度 | 较高,需要自己处理超时、续期、原子释放等问题。 | 非常低,官方库提供了开箱即用的封装。 |
特性 | 简单直接。 | 提供租约、Watch、Revision 等高级特性,更适合复杂协调场景。 |
适用场景 | 对性能要求极高,能容忍极低概率的锁失效场景。 | 对数据一致性和可靠性要求极高的场景,如分布式系统协调。 |
选择建议:
如果您喜欢我的文章,请点击下面按钮随意打赏,您的支持是我最大的动力。
最新评论