Atomics.wait()
正在阻塞,因此无法在主线程上调用它(如果尝试执行此操作,将抛出错误TypeError
)。
从8.7版开始,V8引擎支持
Atomics.wait()
称为Atomics.waitAsync()的非阻塞选项。可以在主线程上使用此新方法。
今天,我们将向您展示如何使用这些低级API创建一个互斥锁,该互斥锁可以同步(在工作线程中)和异步(在工作线程或主线程中)运行。
Atomics.wait()和Atomics.waitAsync()
方法
Atomics.wait()
并Atomics.waitAsync()
采用以下参数:
buffer
:类型为Int32Array
或的数组,BigInt64Array
基于SharedArrayBuffer
。index
:数组中元素的实际索引。expectedValue
:我们希望在内存中用buffer
和表示的位置表示的值index
。timeout
:超时(以毫秒为单位)(可选,默认为Infinity
)。
Atomics.wait()
返回一个字符串。如果期望值未出现在指定的存储位置中,则它会Atomics.wait()
立即退出,并返回string not-equal
。否则,线程被阻塞。要释放该锁,必须发生以下事件之一。第一个是从方法的另一个线程进行的调用,Atomics.notify()
其中指示该方法在内存中的位置Atomics.wait()
。第二个是超时到期。在第一种情况下,它将Atomics.wait()
返回一个字符串ok
,在第二种情况下将返回一个字符串value timed-out
。
该方法
Atomics.notify()
采用以下参数:
typedArray
:类型为Int32Array
或的数组,BigInt64Array
基于SharedArrayBuffer
。index
:数组中元素的实际索引。count
:等待通知的代理数量(可选参数,默认设置为Infinity
)。
该方法
Atomics.notify()
在指定的地址通知指定数量的代理等待通知,typedArray
并index
以FIFO顺序绕过它们。如果已经打了几个电话,Atomics.wait()
或者Atomics.waitAsync()
正在看内存中的同一位置,那么它们都将排在同一队列中。
与方法不同
Atomics.wait()
,方法Atomics.waitAsync()
在调用它的地方立即返回一个值。它可以是下列值之一:
{ async: false, value: 'not-equal' }
-如果指定的内存位置不包含期望值。{ async: false, value: 'timed-out' }
-仅当超时设置为0时。{ async: true, value: promise }
-在其他情况下。
经过一段时间后,可以通过字符串值成功地解决一个诺言
ok
(如果调用了方法Atomics.notify()
,则将有关所传递的内存位置信息传递到Atomics.waitAsync()
)。可以通过值来解决timed-out
。这个承诺永远不会被拒绝。
以下示例演示了使用的基础知识
Atomics.waitAsync()
:
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ - ()
// | ^
// ^
if (result.value === 'not-equal') {
// SharedArrayBuffer .
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* */ }
else { /* - */ }
});
}
// :
Atomics.notify(i32a, 0);
现在让我们谈谈如何创建可在同步和异步模式下使用的互斥量。应当注意,互斥量的同步版本的实现已在前面进行了讨论。例如-在这种材料。
在此示例中,我们
timeout
在调用Atomics.wait()
和时将不使用参数Atomics.waitAsync()
。此参数可用于实现与超时相关的条件。代表互斥量的
类
AsyncLock
使用缓冲区SharedArrayBuffer
并实现以下方法:
lock()
:阻塞线程,直到我们有机会捕获互斥锁(仅适用于工作线程)。unlock()
:释放互斥锁(与之相反lock()
)。executeLocked(callback)
:尝试获取锁而不阻塞线程。可以在主线程上使用此方法。它计划在我们获取锁时执行回调。
让我们看一下如何实现这些方法。类声明包括常量和带有缓冲区的构造函数
SharedArrayBuffer
。
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
此处的元素
i32a[0]
包含值LOCKED
或UNLOCKED
。他,此外,代表了内存利益的地方Atomics.wait()
和Atomics.waitAsync()
。该类AsyncLock
提供以下基本功能:
i32a[0] == LOCKED
并且线程处于等待状态(被调用Atomics.wait()
或之后Atomics.waitAsync()
),正在观看i32a[0]
,最终将被通知。- 通知线程后,它将尝试获取锁。如果成功,则在释放锁时将调用
Atomics.notify()
。
同步锁捕获和释放
考虑
lock()
只能从工作线程调用的方法的代码。
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.UNLOCKED,
/* >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< ,
}
}
从线程调用方法时
lock()
,它首先尝试获取锁,然后使用该锁将锁Atomics.compareExchange()
的状态从更改UNLOCKED
为LOCKED
。该方法Atomics.compareExchange()
尝试执行更改锁定状态的原子操作,它返回位于指定存储区中的原始值。如果原始值为UNLOCKED
,则我们知道状态更改成功,并且线程已获取锁定。您无需执行其他任何操作。
如果
Atomics.compareExchange()
无法更改锁的状态,则意味着另一个线程正在持有该锁。结果,调用该方法的线程lock()
尝试使用该方法Atomics.wait()
为了等到另一个线程释放锁。如果期望值仍存储在感兴趣的存储区中(在我们的示例中- AsyncLock.LOCKED
),则调用将Atomics.wait()
阻塞线程。Atomics.wait()
仅当另一个线程调用时,才会从返回Atomics.notify()
。
该方法
unlock()
通过将锁设置为状态来释放锁,UNLOCKED
然后调用Atomics.notify()
它以通知正在等待释放锁的代理。假定锁定状态更改操作总是成功。这是因为执行此操作的线程持有锁。因此,此时无需再调用该方法unlock()
。
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.LOCKED,
/* >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
在典型情况下,所有事情都是这样发生的:锁是自由的,线程T1捕获它,并使用更改其状态
Atomics.compareExchange()
。线程T2尝试通过调用来获取锁Atomics.compareExchange()
,但无法更改其状态。然后T2调用Atomics.wait()
,此调用将阻塞线程。一段时间后,线程T1释放锁定并调用Atomics.notify()
。这将导致Atomics.wait()
对T2的调用返回,ok
而线程T2退出锁。然后,T2尝试再次获取锁。这次他成功了。
这里有两种特殊情况。他们的分析旨在证明为什么
Atomics.wait()
,并Atomics.waitAsync()
检查数组元素指定索引处的特定值。这些是以下情况:
- T1 , T2 . T2 ,
Atomics.compareExchange()
, . T1 , T2Atomics.wait()
. T2Atomics.wait()
,not-equal
. T2 . - T1 , T2
Atomics.wait()
. T1 , T2 (Atomics.wait()
)Atomics.compareExchange()
. , T3, . .Atomics.compareExchange()
T2 . T2Atomics.wait()
, T3 .
最后一个特殊情况说明了我们的互斥体不能正常工作的事实。线程T2可能正在等待释放锁,但是T3在释放锁之后立即设法获取了它。更适合实际使用的锁实现可以使用存在的几种锁状态,以区分仅“获取”锁和“获取期间存在冲突”的情况。
异步锁捕获
executeLocked()
与方法不同lock()
,可以从主线程调用
非阻塞方法。它接收作为唯一参数的回调,并在成功获得锁定之后安排回调。
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.UNLOCKED,
/* >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ ,
await result.value;
}
}
tryGetLock();
}
内部函数
tryGetLock()
首先尝试使用获取锁Atomics.compareExchange()
。如果调用此方法导致成功更改锁状态,则该函数可以调用回调,然后释放锁并退出。
如果调用
Atomics.compareExchange()
不允许获取锁,那么在锁可能会释放的时刻,我们需要再次尝试进行操作。但是我们不能阻止线程并等待释放锁。相反,我们Atomics.waitAsync()
正在计划使用该方法及其返回的承诺来获取锁的新尝试。
如果我们成功执行了该方法
Atomics.waitAsync()
,那么当持有锁的线程调用时,此方法返回的promise将被解析。Atomics.notify()
... 之后,想要获取锁的线程将像以前一样尝试再次执行该操作。
在这里,这些特殊的情况是可能的,是典型的同步版本(锁调用之间发布
Atomics.compareExchange()
和Atomics.waitAsync()
;锁被另一个线程捕获,解决的承诺和呼叫的时刻之间这样做Atomics.compareExchange()
)。因此,在适用于实际项目的类似代码中,必须考虑到这一点。
结果
在这篇文章中,我们谈到了低级别的同步原语
Atomics.wait()
,Atomics.waitAsync()
和Atomics.notify()
。我们分析了一个基于它们创建互斥锁的示例,该互斥锁可在主线程和工作线程中使用。
Atomics.wait(),Atomics.waitAsync()和Atomics.notify()在您的项目中会有用吗?