你好!
在本文中,我想告诉您如何制作自己的RWMutex,但是具有使超时或触发上下文跳过锁的功能。也就是说,实现TryLock(context.Context)和RTryLock(context.Context),但要使用自己的Mutex。
图片显示了如何将水倒入非常狭窄的脖子。
首先,应该澄清的是,对于99%的任务,根本不需要这种方法。当阻塞的资源可能长时间不释放时,将需要它们。我想指出的是,如果阻塞的资源长时间处于繁忙状态,则应首先尝试以最小化阻塞时间的方式优化逻辑。
但是,尽管如此,如果我们必须长期保留一个资源流,那么在我看来,TryLock将很难做到。
, , atomic, . , . , , . , , .
Mutex:
// RWTMutex - Read Write and Try Mutex
type RWTMutex struct {
state int32
mx sync.Mutex
ch chan struct{}
}
state — mutex, atomic.AddInt32, atomic.LoadInt32 atomic.CompareAndSwapInt32
ch — , .
mx — , , .
:
// TryLock - try locks mutex with context
func (m *RWTMutex) TryLock(ctx context.Context) bool {
if atomic.CompareAndSwapInt32(&m.state, 0, -1) {
return true
}
// Slow way
return m.lockST(ctx)
}
// RTryLock - try read locks mutex with context
func (m *RWTMutex) RTryLock(ctx context.Context) bool {
k := atomic.LoadInt32(&m.state)
if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {
return true
}
// Slow way
return m.rlockST(ctx)
}
如您所见,如果互斥锁未锁定,则可以将其简单地阻止,但如果没有锁定,那么我们将继续进行更复杂的方案。
一开始,我们获得了通道,并进入了一个无限循环,如果最终被锁定,则成功退出,如果没有成功,则我们开始等待2个事件之一,或者通道被解除阻塞,或者ctx.Done()流将被解除阻塞:
func (m *RWTMutex) chGet() chan struct{} {
m.mx.Lock()
if m.ch == nil {
m.ch = make(chan struct{}, 1)
}
r := m.ch
m.mx.Unlock()
return r
}
func (m *RWTMutex) lockST(ctx context.Context) bool {
ch := m.chGet()
for {
if atomic.CompareAndSwapInt32(&m.state, 0, -1) {
return true
}
if ctx == nil {
return false
}
select {
case <-ch:
ch = m.chGet()
case <-ctx.Done():
return false
}
}
}
func (m *RWTMutex) rlockST(ctx context.Context) bool {
ch := m.chGet()
var k int32
for {
k = atomic.LoadInt32(&m.state)
if k >= 0 && atomic.CompareAndSwapInt32(&m.state, k, k+1) {
return true
}
if ctx == nil {
return false
}
select {
case <-ch:
ch = m.chGet()
case <-ctx.Done():
return false
}
}
}
让我们解锁互斥锁。
我们需要更改状态,并在必要时解除通道阻塞。
如我上面所写,如果通道关闭,那么case <-ch将进一步跳过执行流程。
func (m *RWTMutex) chClose() {
if m.ch == nil {
return
}
var o chan struct{}
m.mx.Lock()
if m.ch != nil {
o = m.ch
m.ch = nil
}
m.mx.Unlock()
if o != nil {
close(o)
}
}
// Unlock - unlocks mutex
func (m *RWTMutex) Unlock() {
if atomic.CompareAndSwapInt32(&m.state, -1, 0) {
m.chClose()
return
}
panic("RWTMutex: Unlock fail")
}
// RUnlock - unlocks mutex
func (m *RWTMutex) RUnlock() {
i := atomic.AddInt32(&m.state, -1)
if i > 0 {
return
} else if i == 0 {
m.chClose()
return
}
panic("RWTMutex: RUnlock fail")
}
互斥体本身已经准备好,您需要为其编写一些测试和标准方法,例如Lock()和RLock()
我的汽车基准测试显示了这些速度
BenchmarkRWTMutexTryLockUnlock-8 92154297 12.8 ns/op 0 B/op 0 allocs/op
BenchmarkRWTMutexTryRLockRUnlock-8 64337136 18.4 ns/op 0 B/op 0 allocs/op
RWMutex
BenchmarkRWMutexLockUnlock-8 44187962 25.8 ns/op 0 B/op 0 allocs/op
BenchmarkRWMutexRLockRUnlock-8 94655520 12.6 ns/op 0 B/op 0 allocs/op
Mutex
BenchmarkMutexLockUnlock-8 94345815 12.7 ns/op 0 B/op 0 allocs/op
也就是说,工作速度与普通的RWMutex和Mutex相当。