Atomics.wait()正在阻塞,因此无法在主线程上调用它(如果尝试执行此操作,将抛出错误TypeError)。
从8.7版开始,V8引擎支持
Atomics.wait()称为Atomics.waitAsync()的非阻塞选项。可以在主线程上使用此新方法。
今天,我们将向您展示如何使用这些低级API创建一个互斥锁,该互斥锁可以同步(在工作线程中)和异步(在工作线程或主线程中)运行。
Atomics.wait()和Atomics.waitAsync()
方法
Atomics.wait()并Atomics.waitAsync()采用以下参数:
buffer:类型为Int32Array或的数组,BigInt64Array基于SharedArrayBuffer。index:数组中元素的实际索引。expectedValue:我们希望在内存中用buffer和表示的位置表示的值index。timeout:超时(以毫秒为单位)(可选,默认为Infinity)。
Atomics.wait()返回一个字符串。如果期望值未出现在指定的存储位置中,则它会Atomics.wait()立即退出,并返回string not-equal。否则,线程被阻塞。要释放该锁,必须发生以下事件之一。第一个是从方法的另一个线程进行的调用,Atomics.notify()其中指示该方法在内存中的位置Atomics.wait()。第二个是超时到期。在第一种情况下,它将Atomics.wait()返回一个字符串ok,在第二种情况下将返回一个字符串value timed-out。
该方法
Atomics.notify()采用以下参数:
typedArray:类型为Int32Array或的数组,BigInt64Array基于SharedArrayBuffer。index:数组中元素的实际索引。count:等待通知的代理数量(可选参数,默认设置为Infinity)。
该方法
Atomics.notify()在指定的地址通知指定数量的代理等待通知,typedArray并index以FIFO顺序绕过它们。如果已经打了几个电话,Atomics.wait()或者Atomics.waitAsync()正在看内存中的同一位置,那么它们都将排在同一队列中。
与方法不同
Atomics.wait(),方法Atomics.waitAsync()在调用它的地方立即返回一个值。它可以是下列值之一:
{ async: false, value: 'not-equal' }-如果指定的内存位置不包含期望值。{ async: false, value: 'timed-out' }-仅当超时设置为0时。{ async: true, value: promise }-在其他情况下。
经过一段时间后,可以通过字符串值成功地解决一个诺言
ok(如果调用了方法Atomics.notify(),则将有关所传递的内存位置信息传递到Atomics.waitAsync())。可以通过值来解决timed-out。这个承诺永远不会被拒绝。
以下示例演示了使用的基础知识
Atomics.waitAsync():
const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
// | | ^ - ()
// | ^
// ^
if (result.value === 'not-equal') {
// SharedArrayBuffer .
} else {
result.value instanceof Promise; // true
result.value.then(
(value) => {
if (value == 'ok') { /* */ }
else { /* - */ }
});
}
// :
Atomics.notify(i32a, 0);
现在让我们谈谈如何创建可在同步和异步模式下使用的互斥量。应当注意,互斥量的同步版本的实现已在前面进行了讨论。例如-在这种材料。
在此示例中,我们
timeout在调用Atomics.wait()和时将不使用参数Atomics.waitAsync()。此参数可用于实现与超时相关的条件。代表互斥量的
类
AsyncLock使用缓冲区SharedArrayBuffer并实现以下方法:
lock():阻塞线程,直到我们有机会捕获互斥锁(仅适用于工作线程)。unlock():释放互斥锁(与之相反lock())。executeLocked(callback):尝试获取锁而不阻塞线程。可以在主线程上使用此方法。它计划在我们获取锁时执行回调。
让我们看一下如何实现这些方法。类声明包括常量和带有缓冲区的构造函数
SharedArrayBuffer。
class AsyncLock {
static INDEX = 0;
static UNLOCKED = 0;
static LOCKED = 1;
constructor(sab) {
this.sab = sab;
this.i32a = new Int32Array(sab);
}
lock() {
/* … */
}
unlock() {
/* … */
}
executeLocked(f) {
/* … */
}
}
此处的元素
i32a[0]包含值LOCKED或UNLOCKED。他,此外,代表了内存利益的地方Atomics.wait()和Atomics.waitAsync()。该类AsyncLock提供以下基本功能:
i32a[0] == LOCKED并且线程处于等待状态(被调用Atomics.wait()或之后Atomics.waitAsync()),正在观看i32a[0],最终将被通知。- 通知线程后,它将尝试获取锁。如果成功,则在释放锁时将调用
Atomics.notify()。
同步锁捕获和释放
考虑
lock()只能从工作线程调用的方法的代码。
lock() {
while (true) {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.UNLOCKED,
/* >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
return;
}
Atomics.wait(this.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED); // <<< ,
}
}
从线程调用方法时
lock(),它首先尝试获取锁,然后使用该锁将锁Atomics.compareExchange()的状态从更改UNLOCKED为LOCKED。该方法Atomics.compareExchange()尝试执行更改锁定状态的原子操作,它返回位于指定存储区中的原始值。如果原始值为UNLOCKED,则我们知道状态更改成功,并且线程已获取锁定。您无需执行其他任何操作。
如果
Atomics.compareExchange()无法更改锁的状态,则意味着另一个线程正在持有该锁。结果,调用该方法的线程lock()尝试使用该方法Atomics.wait()为了等到另一个线程释放锁。如果期望值仍存储在感兴趣的存储区中(在我们的示例中- AsyncLock.LOCKED),则调用将Atomics.wait()阻塞线程。Atomics.wait()仅当另一个线程调用时,才会从返回Atomics.notify()。
该方法
unlock()通过将锁设置为状态来释放锁,UNLOCKED然后调用Atomics.notify()它以通知正在等待释放锁的代理。假定锁定状态更改操作总是成功。这是因为执行此操作的线程持有锁。因此,此时无需再调用该方法unlock()。
unlock() {
const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.LOCKED,
/* >>> */ AsyncLock.UNLOCKED);
if (oldValue != AsyncLock.LOCKED) {
throw new Error('Tried to unlock while not holding the mutex');
}
Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}
在典型情况下,所有事情都是这样发生的:锁是自由的,线程T1捕获它,并使用更改其状态
Atomics.compareExchange()。线程T2尝试通过调用来获取锁Atomics.compareExchange(),但无法更改其状态。然后T2调用Atomics.wait(),此调用将阻塞线程。一段时间后,线程T1释放锁定并调用Atomics.notify()。这将导致Atomics.wait()对T2的调用返回,ok而线程T2退出锁。然后,T2尝试再次获取锁。这次他成功了。
这里有两种特殊情况。他们的分析旨在证明为什么
Atomics.wait(),并Atomics.waitAsync()检查数组元素指定索引处的特定值。这些是以下情况:
- T1 , T2 . T2 ,
Atomics.compareExchange(), . T1 , T2Atomics.wait(). T2Atomics.wait(),not-equal. T2 . - T1 , T2
Atomics.wait(). T1 , T2 (Atomics.wait())Atomics.compareExchange(). , T3, . .Atomics.compareExchange()T2 . T2Atomics.wait(), T3 .
最后一个特殊情况说明了我们的互斥体不能正常工作的事实。线程T2可能正在等待释放锁,但是T3在释放锁之后立即设法获取了它。更适合实际使用的锁实现可以使用存在的几种锁状态,以区分仅“获取”锁和“获取期间存在冲突”的情况。
异步锁捕获
executeLocked()与方法不同lock(),可以从主线程调用
非阻塞方法。它接收作为唯一参数的回调,并在成功获得锁定之后安排回调。
executeLocked(f) {
const self = this;
async function tryGetLock() {
while (true) {
const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
/* >>> */ AsyncLock.UNLOCKED,
/* >>> */ AsyncLock.LOCKED);
if (oldValue == AsyncLock.UNLOCKED) {
f();
self.unlock();
return;
}
const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
AsyncLock.LOCKED);
// ^ ,
await result.value;
}
}
tryGetLock();
}
内部函数
tryGetLock()首先尝试使用获取锁Atomics.compareExchange()。如果调用此方法导致成功更改锁状态,则该函数可以调用回调,然后释放锁并退出。
如果调用
Atomics.compareExchange()不允许获取锁,那么在锁可能会释放的时刻,我们需要再次尝试进行操作。但是我们不能阻止线程并等待释放锁。相反,我们Atomics.waitAsync()正在计划使用该方法及其返回的承诺来获取锁的新尝试。
如果我们成功执行了该方法
Atomics.waitAsync(),那么当持有锁的线程调用时,此方法返回的promise将被解析。Atomics.notify()... 之后,想要获取锁的线程将像以前一样尝试再次执行该操作。
在这里,这些特殊的情况是可能的,是典型的同步版本(锁调用之间发布
Atomics.compareExchange()和Atomics.waitAsync();锁被另一个线程捕获,解决的承诺和呼叫的时刻之间这样做Atomics.compareExchange())。因此,在适用于实际项目的类似代码中,必须考虑到这一点。
结果
在这篇文章中,我们谈到了低级别的同步原语
Atomics.wait(),Atomics.waitAsync()和Atomics.notify()。我们分析了一个基于它们创建互斥锁的示例,该互斥锁可在主线程和工作线程中使用。
Atomics.wait(),Atomics.waitAsync()和Atomics.notify()在您的项目中会有用吗?