一.前言
我们在编写代码的时候常常需要用到数据结构来存储代码,而在并发编程中,这一部分上的代码就会称为我们常常所说的临界区,如何正确的访问临界资源,成为了我们需要考虑的问题之一,而今天我们将通过实现一个协程安全的map来探索我们在并发编程设计时,如何去设计一个协程安全的数据结构
二.map的设计
1. 实现map的三种方法
在本文中我们主要会使用三种方法来实现这个map,分别是:
- 基于
channel
实现 - 基于
RWMutex
实现 - 基于
切片RWMutex
实现
在最后我们也会探究这几种方法实现的性能对比,准备好上车喽!
2. 基于Channel实现的map
首先我们来看一下结构体代码:
// ChannelMap 基于channel实现的线程安全map
type ChannelMap struct {m map[string]stringmutex chan struct{} // 基于通道实现的锁,保证了在同一时间只有一个goroutine可以访问map
}
func NewChannelMap() *ChannelMap {return &ChannelMap{m: make(map[string]string), mutex: make(chan struct{}, 1)}
}
如上我们实现了一个ChannelMap
的结构体,它里面我们利用Channel实现了一个类似于锁的机制,我们通过控制channel
的大小来控制map在同一时间可访问协程的数量,实现了类似于互斥锁的功能,下面是有关实现的完整代码:
// ChannelMap 基于channel实现的线程安全map
type ChannelMap struct {m map[string]stringmutex chan struct{} // 基于通道实现的锁,保证了在同一时间只有一个goroutine可以访问map
}func NewChannelMap() *ChannelMap {return &ChannelMap{m: make(map[string]string), mutex: make(chan struct{}, 1)}
}func (cm *ChannelMap) Lock() {cm.mutex <- struct{}{}
}func (cm *ChannelMap) Unlock() {<-cm.mutex
}func (cm *ChannelMap) Set(key, value string) {cm.Lock()defer cm.Unlock()cm.m[key] = value
}func (cm *ChannelMap) Get(key string) (string, bool) {cm.Lock()defer cm.Unlock()if cm.m == nil {return "", false}value, ok := cm.m[key]return value, ok
}func (cm *ChannelMap) Delete(key string) {cm.Lock()defer cm.Unlock()delete(cm.m, key)
}func (cm *ChannelMap) Len() int {cm.Lock()defer cm.Unlock()return len(cm.m)
}func (cm *ChannelMap) PrintMap() {cm.Lock()defer cm.Unlock()for k, v := range cm.m {fmt.Println(k, v)}
}
分析:
虽然上面的map
实现了一个安全的map,但是同一时间只有一个协程可以访问map
,这无疑是大大降低了并发性能,我们可以如何提高性能呢?
3.基于RWMutex实现的map
我们知道在读多写少的场景下我们可以控制让单一协程写,多协程读,在这里我们也可以用这样的机制来降低锁的颗粒度,提高并发效率,代码如下:
// RWMap 基于读写锁实现的线程安全map
type RWMap struct {m map[string]stringlock sync.RWMutex
}func NewRWMap() *RWMap {return &RWMap{m: make(map[string]string)}
}func (m *RWMap) Set(key, value string) {m.lock.Lock()defer m.lock.Unlock()m.m[key] = value
}func (m *RWMap) Get(key string) (string, bool) {m.lock.RLock()defer m.lock.RUnlock()val, ok := m.m[key]return val, ok
}func (m *RWMap) Delete(key string) {m.lock.Lock()defer m.lock.Unlock()delete(m.m, key)
}func (m *RWMap) Len() int {m.lock.RLock()defer m.lock.RUnlock()return len(m.m)
}func (m *RWMap) PrintMap() {m.lock.RLock()defer m.lock.RUnlock()for k, v := range m.m {fmt.Println(k, v)}
}
分析:通过使用读写锁,我们提高了并发性能,但是锁还是锁住了整体map,能不能再提高一下性能呢?我们来看最后一种。
4.基于切片RWMutex实现
在讲之前,我们先来看一下定义与构造函数:
// SplitLockMap 基于分片加锁的线程安全map
type SplitLockMap struct {slices []SplitSlice // 切片数组size int // 切片的数量hashFunc func(key string) uint // 哈希函数来确定key所在哪个切片中
}// SplitSlice map里面的单个切片
type SplitSlice struct {m map[string]stringmutex sync.RWMutex
}func NewSplitLockMap(size int) *SplitLockMap {slices := make([]SplitSlice, size)for i := 0; i < size; i++ {slices[i] = SplitSlice{m: make(map[string]string)}}return &SplitLockMap{slices: slices, size: size, hashFunc: fnvHash64}
}func fnvHash64(key string) uint {h := fnv.New64()h.Write([]byte(key))return uint(h.Sum64())
}
这里的SplitLockMap
主要由三部分组成:
- 切片列表
- 切片数量
- 加密的hash函数
我们通过将数据划分为多个分片(或称为桶)来减少锁争用,并提高并发性能。每个分片都有自己的读写锁(sync.RWMutex),允许在不同的分片上同时进行读取操作,或者在一个分片上进行写入操作而不影响其他分片上的读写操作。在每次操作前我们会用哈希函数去把这个随机拆分成对应的分配索引,让这个键值对的操作落在对应的分片上,进而实现各个切片之间的相互独立,大大提高工作性能。实现代码如下:
// SplitLockMap 基于分片加锁的线程安全map
type SplitLockMap struct {slices []SplitSlice // 切片数组size int // 切片的数量hashFunc func(key string) uint // 哈希函数来确定key所在哪个切片中
}// SplitSlice map里面的单个切片
type SplitSlice struct {m map[string]stringmutex sync.RWMutex
}func NewSplitLockMap(size int) *SplitLockMap {slices := make([]SplitSlice, size)for i := 0; i < size; i++ {slices[i] = SplitSlice{m: make(map[string]string)}}return &SplitLockMap{slices: slices, size: size, hashFunc: fnvHash64}
}func fnvHash64(key string) uint {h := fnv.New64()h.Write([]byte(key))return uint(h.Sum64())
}func (s *SplitLockMap) Set(key, value string) {index := s.hashFunc(key) % uint(s.size)s.slices[index].mutex.Lock()defer s.slices[index].mutex.Unlock()s.slices[index].m[key] = value
}func (s *SplitLockMap) Get(key string) (string, bool) {index := s.hashFunc(key) % uint(s.size)s.slices[index].mutex.RLock()defer s.slices[index].mutex.RUnlock()val, ok := s.slices[index].m[key]return val, ok
}func (s *SplitLockMap) Delete(key string) {index := s.hashFunc(key) % uint(s.size)s.slices[index].mutex.Lock()defer s.slices[index].mutex.Unlock()delete(s.slices[index].m, key)
}func (s *SplitLockMap) Len() int {var length intfor _, slice := range s.slices {slice.mutex.RLock()length += len(slice.m)slice.mutex.RUnlock()}return length
}func (s *SplitLockMap) PrintMap() {for _, slice := range s.slices {slice.mutex.RLock()for k, v := range slice.m {fmt.Println(k, v)}slice.mutex.RUnlock()}
}
结语
在上面的三种map中,性能应该是:
c h a n n e l < R W M u t e x < S l i c e R W M u t e x channel<RWMutex<SliceRWMutex channel<RWMutex<SliceRWMutex
主要体现在锁的颗粒度上,虽然上面使用的是go,但是设计思想是不变的,最后如果有收获的话,还请大家三连一下,谢谢!!!