分布式锁实现
一、概述
分布式锁是分布式系统中的一个重要组件,用于协调分布式环境下的资源访问和并发控制。我们将从锁设计、死锁预防、性能优化和容错处理四个维度深入学习。
学习目标
维度 | 重点内容 | 掌握程度 |
---|---|---|
锁设计 | 基于Redis/etcd的锁实现原理 | 必须掌握 |
死锁预防 | 超时机制、重入机制 | 必须掌握 |
性能优化 | 锁粒度控制、读写分离 | 重点掌握 |
容错处理 | 节点故障、网络分区 | 重点掌握 |
二、实现流程图
三、基础锁实现
让我们首先实现一个基于Redis的分布式锁基础版本:
package distlockimport ("context""crypto/rand""encoding/base64""errors""time""github.com/go-redis/redis/v8"
)type DistributedLock struct {client *redis.Clientkey stringvalue stringexpiration time.Duration
}// NewDistributedLock 创建一个新的分布式锁实例
func NewDistributedLock(client *redis.Client, key string, expiration time.Duration) (*DistributedLock, error) {// 生成随机值作为锁的标识b := make([]byte, 16)_, err := rand.Read(b)if err != nil {return nil, err}value := base64.StdEncoding.EncodeToString(b)return &DistributedLock{client: client,key: key,value: value,expiration: expiration,}, nil
}// TryLock 尝试获取锁
func (dl *DistributedLock) TryLock(ctx context.Context) (bool, error) {return dl.client.SetNX(ctx, dl.key, dl.value, dl.expiration).Result()
}// Unlock 释放锁
func (dl *DistributedLock) Unlock(ctx context.Context) error {script := `if redis.call("get", KEYS[1]) == ARGV[1] thenreturn redis.call("del", KEYS[1])elsereturn 0end`result, err := dl.client.Eval(ctx, script, []string{dl.key}, dl.value).Result()if err != nil {return err}if result == 0 {return errors.New("lock not held")}return nil
}// RefreshLock 刷新锁的过期时间
func (dl *DistributedLock) RefreshLock(ctx context.Context) error {script := `if redis.call("get", KEYS[1]) == ARGV[1] thenreturn redis.call("pexpire", KEYS[1], ARGV[2])elsereturn 0end`result, err := dl.client.Eval(ctx,script,[]string{dl.key},dl.value,dl.expiration.Milliseconds(),).Result()if err != nil {return err}if result == 0 {return errors.New("lock not held")}return nil
}// IsLocked 检查锁是否被持有
func (dl *DistributedLock) IsLocked(ctx context.Context) (bool, error) {exists, err := dl.client.Exists(ctx, dl.key).Result()if err != nil {return false, err}return exists == 1, nil
}
四、增强版锁实现(带可重入特性)
下面是一个支持可重入的分布式锁实现:
package distlockimport ("context""encoding/json""errors""sync""time""github.com/go-redis/redis/v8"
)type LockInfo struct {Owner string `json:"owner"`Count int `json:"count"`Timestamp int64 `json:"timestamp"`
}type ReentrantLock struct {client *redis.Clientkey stringowner stringexpiration time.Durationmu sync.Mutex
}// NewReentrantLock 创建可重入锁
func NewReentrantLock(client *redis.Client, key string, owner string, expiration time.Duration) *ReentrantLock {return &ReentrantLock{client: client,key: key,owner: owner,expiration: expiration,}
}// Lock 获取可重入锁
func (rl *ReentrantLock) Lock(ctx context.Context) error {rl.mu.Lock()defer rl.mu.Unlock()script := `local lockInfo = redis.call('get', KEYS[1])if not lockInfo then-- 锁不存在,创建新锁redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])return 1endlocal info = cjson.decode(lockInfo)if info.owner == ARGV[3] then-- 重入锁info.count = info.count + 1info.timestamp = tonumber(ARGV[4])redis.call('set', KEYS[1], cjson.encode(info), 'PX', ARGV[2])return 1endreturn 0`lockInfo := LockInfo{Owner: rl.owner,Count: 1,Timestamp: time.Now().UnixNano(),}lockInfoJSON, err := json.Marshal(lockInfo)if err != nil {return err}result, err := rl.client.Eval(ctx,script,[]string{rl.key},string(lockInfoJSON),rl.expiration.Milliseconds(),rl.owner,time.Now().UnixNano(),).Result()if err != nil {return err}if result.(int64) == 0 {return errors.New("failed to acquire lock")}return nil
}// Unlock 释放可重入锁
func (rl *ReentrantLock) Unlock(ctx context.Context) error {rl.mu.Lock()defer rl.mu.Unlock()script := `local lockInfo = redis.call('get', KEYS[1])if not lockInfo thenreturn 0endlocal info = cjson.decode(lockInfo)if info.owner ~= ARGV[1] thenreturn -1endinfo.count = info.count - 1if info.count <= 0 thenredis.call('del', KEYS[1])return 1elseredis.call('set', KEYS[1], cjson.encode(info), 'PX', ARGV[2])return 1end`result, err := rl.client.Eval(ctx,script,[]string{rl.key},rl.owner,rl.expiration.Milliseconds(),).Result()if err != nil {return err}switch result.(int64) {case -1:return errors.New("lock held by another owner")case 0:return errors.New("lock not held")default:return nil}
}// RefreshLock 刷新锁的过期时间
func (rl *ReentrantLock) RefreshLock(ctx context.Context) error {script := `local lockInfo = redis.call('get', KEYS[1])if not lockInfo thenreturn 0endlocal info = cjson.decode(lockInfo)if info.owner ~= ARGV[1] thenreturn 0endinfo.timestamp = tonumber(ARGV[3])redis.call('set', KEYS[1], cjson.encode(info), 'PX', ARGV[2])return 1`result, err := rl.client.Eval(ctx,script,[]string{rl.key},rl.owner,rl.expiration.Milliseconds(),time.Now().UnixNano(),).Result()if err != nil {return err}if result.(int64) == 0 {return errors.New("lock not held")}return nil
}
五、死锁预防机制
1. 超时机制
- 所有锁操作都设置了过期时间
- 配合看门狗机制（后台协程定期调用 RefreshLock）自动续期，避免业务尚未完成时锁提前过期
- 防止客户端崩溃导致的死锁
2. 死锁检测
检测项 | 处理方式 | 实现难度 |
---|---|---|
循环等待 | 资源有序分配 | 中等 |
持有等待 | 一次性申请所有资源 | 简单 |
不可剥夺 | 超时自动释放 | 简单 |
互斥访问 | 读写分离 | 较难 |
六、性能优化策略
1. 锁粒度优化
- 降低锁粒度,提高并发度
- 使用多粒度锁机制
- 实现分段锁
2. 读写分离
package distlockimport ("context""fmt""time""github.com/go-redis/redis/v8"
)type RWLock struct {client *redis.Clientkey stringowner stringexpiration time.Duration
}func NewRWLock(client *redis.Client, key string, owner string, expiration time.Duration) *RWLock {return &RWLock{client: client,key: key,owner: owner,expiration: expiration,}
}// RLock 获取读锁
func (rwl *RWLock) RLock(ctx context.Context) error {script := `-- 检查是否存在写锁if redis.call('exists', KEYS[1] .. ':write') == 1 thenreturn 0end-- 增加读锁计数local count = redis.call('incr', KEYS[1] .. ':read')redis.call('pexpire', KEYS[1] .. ':read', ARGV[1])-- 记录读锁持有者redis.call('hset', KEYS[1] .. ':readers', ARGV[2], '1')redis.call('pexpire', KEYS[1] .. ':readers', ARGV[1])return 1`result, err := rwl.client.Eval(ctx,script,[]string{rwl.key},rwl.expiration.Milliseconds(),rwl.owner,).Result()if err != nil {return fmt.Errorf("failed to acquire read lock: %v", err)}if result.(int64) == 0 {return fmt.Errorf("write lock exists")}return nil
}// RUnlock 释放读锁
func (rwl *RWLock) RUnlock(ctx context.Context) error {script := `-- 检查读锁是否存在if redis.call('exists', KEYS[1] .. ':read') == 0 thenreturn 0end-- 检查当前客户端是否持有读锁if redis.call('hexists', KEYS[1] .. ':readers', ARGV[1]) == 0 thenreturn -1end-- 移除读锁持有者记录redis.call('hdel', KEYS[1] .. ':readers', ARGV[1])-- 减少读锁计数local count = redis.call('decr', KEYS[1] .. ':read')if count <= 0 thenredis.call('del', KEYS[1] .. ':read')redis.call('del', KEYS[1] .. ':readers')endreturn 1`result, err := rwl.client.Eval(ctx,script,[]string{rwl.key},rwl.owner,).Result()if err != nil {return fmt.Errorf("failed to release read lock: %v", err)}switch result.(int64) {case -1:return fmt.Errorf("read lock not held by this client")case 0:return fmt.Errorf("read lock not exists")default:return nil}
}// Lock 获取写锁
func (rwl *RWLock) Lock(ctx context.Context) error {script := `-- 检查是否存在读锁或写锁if redis.call('exists', KEYS[1] .. ':read') == 1 orredis.call('exists', KEYS[1] .. ':write') == 1 thenreturn 0end-- 设置写锁redis.call('set', KEYS[1] .. ':write', ARGV[1], 'PX', ARGV[2])return 1`result, err := rwl.client.Eval(ctx,script,[]string{rwl.key},rwl.owner,rwl.expiration.Milliseconds(),).Result()if err != nil {return fmt.Errorf("failed to acquire write lock: %v", err)}if result.(int64) == 0 {return fmt.Errorf("lock exists")}return nil
}// Unlock 释放写锁
func (rwl *RWLock) Unlock(ctx context.Context) error {script := `-- 检查写锁是否存在且属于当前客户端local value = redis.call('get', KEYS[1] .. ':write')if not value thenreturn 0endif value ~= ARGV[1] thenreturn -1end-- 删除写锁redis.call('del', KEYS[1] .. ':write')return 1`result, err := rwl.client.Eval(ctx,script,[]string{rwl.key},rwl.owner,).Result()if err != nil {return fmt.Errorf("failed to release write lock: %v", err)}switch result.(int64) {case -1:return fmt.Errorf("write lock not held by this client")case 0:return fmt.Errorf("write lock not exists")default:return nil}
}
七、容错处理
1. 容错机制设计
2. 故障处理实现（注意：降级为本地锁后只能保证同一进程内的互斥，跨进程的互斥会暂时失效，仅适用于能容忍这一点的场景）
package distlockimport ("context""errors""sync""time""github.com/go-redis/redis/v8"
)type FaultTolerantLock struct {master *redis.Clientslaves []*redis.ClientlocalLock sync.Mutexkey stringowner stringexpiration time.Duration
}func NewFaultTolerantLock(master *redis.Client,slaves []*redis.Client,key string,owner string,expiration time.Duration,
) *FaultTolerantLock {return &FaultTolerantLock{master: master,slaves: slaves,key: key,owner: owner,expiration: expiration,}
}// Lock 获取容错锁
func (ftl *FaultTolerantLock) Lock(ctx context.Context) error {// 1. 尝试在主节点获取锁if err := ftl.tryLockOnMaster(ctx); err == nil {return nil}// 2. 主节点失败,尝试在从节点获取锁if err := ftl.tryLockOnSlaves(ctx); err == nil {return nil}// 3. 所有Redis节点都失败,降级使用本地锁ftl.localLock.Lock()// 4. 启动后台协程尝试恢复到Redis锁go ftl.tryRecoverToRedis(context.Background())return nil
}func (ftl *FaultTolerantLock) tryLockOnMaster(ctx context.Context) error {script := `if redis.call('exists', KEYS[1]) == 0 thenredis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])return 1endreturn 0`result, err := ftl.master.Eval(ctx,script,[]string{ftl.key},ftl.owner,ftl.expiration.Milliseconds(),).Result()if err != nil {return err}if result.(int64) == 0 {return errors.New("lock exists")}return nil
}func (ftl *FaultTolerantLock) tryLockOnSlaves(ctx context.Context) error {// 需要在多数从节点上获取锁才算成功successCount := 0majorityCount := (len(ftl.slaves) / 2) + 1for _, slave := range ftl.slaves {if err := ftl.tryLockOnNode(ctx, slave); err == nil {successCount++if successCount >= majorityCount {return nil}}}return errors.New("failed to acquire lock on majority of slaves")
}func (ftl *FaultTolerantLock) tryLockOnNode(ctx context.Context, node *redis.Client) error {script := `if redis.call('exists', KEYS[1]) == 0 thenredis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])return 1endreturn 0`result, err := node.Eval(ctx,script,[]string{ftl.key},ftl.owner,ftl.expiration.Milliseconds(),).Result()if err != nil {return err}if result.(int64) == 0 {return errors.New("lock exists")}return nil
}func (ftl *FaultTolerantLock) tryRecoverToRedis(ctx context.Context) {ticker := time.NewTicker(time.Second)defer ticker.Stop()for {select {case <-ctx.Done():returncase <-ticker.C:// 尝试恢复到Redis主节点if err := ftl.tryLockOnMaster(ctx); err == nil {ftl.localLock.Unlock()return}// 尝试恢复到Redis从节点if err := ftl.tryLockOnSlaves(ctx); err == nil {ftl.localLock.Unlock()return}}}
}// Unlock 释放锁
func (ftl *FaultTolerantLock) Unlock(ctx context.Context) error {// 尝试释放Redis锁if err := ftl.unlockRedis(ctx); err == nil {return nil}// Redis释放失败,释放本地锁ftl.localLock.Unlock()return nil
}func (ftl *FaultTolerantLock) unlockRedis(ctx context.Context) error {script := `if redis.call('get', KEYS[1]) == ARGV[1] thenreturn redis.call('del', KEYS[1])endreturn 0`// 先尝试在主节点释放result, err := ftl.master.Eval(ctx,script,[]string{ftl.key},ftl.owner,).Result()if err == nil && result.(int64) == 1 {return nil}// 主节点释放失败,尝试在从节点释放for _, slave := range ftl.slaves {result, err = slave.Eval(ctx,script,[]string{ftl.key},ftl.owner,).Result()if err == nil && result.(int64) == 1 {return nil}}return errors.New("failed to release lock on all nodes")
}
八、性能测试与监控
1. 性能指标
指标 | 说明 | 目标值 |
---|---|---|
获取锁延迟 | 从发起请求到获取锁的时间 | <50ms |
释放锁延迟 | 从发起释放到完成的时间 | <30ms |
锁冲突率 | 获取锁失败的比例 | <10% |
QPS | 每秒处理的锁请求数 | >1000 |
2. 监控指标
- 系统监控
  - CPU使用率
  - 内存使用
  - 网络延迟
  - 磁盘IO
- 业务监控
  - 锁获取成功率
  - 锁超时次数
  - 死锁检测次数
  - 降级次数
九、最佳实践总结
- 锁设计
  - 使用唯一标识确保锁的归属
  - 合理设置超时时间
  - 实现可重入机制
  - 使用Lua脚本保证原子性
- 死锁预防
  - 实现超时自动释放
  - 避免循环等待
  - 实现锁的重入
  - 定期检测死锁
- 性能优化
  - 使用读写锁分离
  - 控制锁粒度
  - 批量处理
  - 使用本地缓存
- 容错处理
  - 实现主从切换
  - 支持优雅降级
  - 异步恢复机制
  - 多副本数据同步
怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!