使用Atomics.wait(),Atomics.notify()和Atomics.waitAsync()

静态方法Atomics.wait()Atomics.notify()是低级同步原语,可用于实现互斥体和其他类似机制。但是,由于该方法Atomics.wait()正在阻塞,因此无法在主线程上调用它(如果尝试执行此操作,将抛出错误TypeError)。



从8.7版开始,V8引擎支持Atomics.wait()称为Atomics.waitAsync()的非阻塞选项可以在主线程上使用此新方法。 今天,我们将向您展示如何使用这些低级API创建一个互斥锁,该互斥锁可以同步(在工作线程中)和异步(在工作线程或主线程中)运行。











Atomics.wait()和Atomics.waitAsync()



方法Atomics.wait()Atomics.waitAsync()采用以下参数:



  • buffer:类型为Int32Array的数组,BigInt64Array基于SharedArrayBuffer
  • index:数组中元素的实际索引。
  • expectedValue:我们希望在内存中用buffer表示的位置表示的值index
  • timeout:超时(以毫秒为单位)(可选,默认为Infinity)。


Atomics.wait()返回一个字符串。如果期望值未出现在指定的存储位置中,则它会Atomics.wait()立即退出,并返回string not-equal否则,线程被阻塞。要释放该锁,必须发生以下事件之一。第一个是从方法的另一个线程进行的调用,Atomics.notify()其中指示该方法在内存中的位置Atomics.wait()第二个是超时到期。在第一种情况下,它将Atomics.wait()返回一个字符串ok,在第二种情况下将返回一个字符串value timed-out



该方法Atomics.notify()采用以下参数:



  • typedArray:类型为Int32Array的数组,BigInt64Array基于SharedArrayBuffer
  • index:数组中元素的实际索引。
  • count:等待通知的代理数量(可选参数,默认设置为Infinity)。


该方法Atomics.notify()在指定的地址通知指定数量的代理等待通知,typedArrayindex以FIFO顺序绕过它们。如果已经打了几个电话,Atomics.wait()或者Atomics.waitAsync()正在看内存中的同一位置,那么它们都将排在同一队列中。



与方法不同Atomics.wait(),方法Atomics.waitAsync()在调用它的地方立即返回一个值。它可以是下列值之一:



  • { async: false, value: 'not-equal' } -如果指定的内存位置不包含期望值。
  • { async: false, value: 'timed-out' } -仅当超时设置为0时。
  • { async: true, value: promise } -在其他情况下。


经过一段时间后,可以通过字符串值成功地解决一个诺言ok(如果调用了方法Atomics.notify()则将有关所传递的内存位置信息传递到Atomics.waitAsync())。可以通过值来解决timed-out。这个承诺永远不会被拒绝。



以下示例演示了使用的基础知识Atomics.waitAsync()



const sab = new SharedArrayBuffer(16);
const i32a = new Int32Array(sab);
const result = Atomics.waitAsync(i32a, 0, 0, 1000);
//                                     |  |  ^ - ()
//                                     |  ^  
//                                     ^ 

if (result.value === 'not-equal') {
  //   SharedArrayBuffer   .
} else {
  result.value instanceof Promise; // true
  result.value.then(
    (value) => {
      if (value == 'ok') { /*   */ }
      else { /*  - */ }
    });
}

//      :
Atomics.notify(i32a, 0);


现在让我们谈谈如何创建可在同步和异步模式下使用的互斥量。应当注意,互斥量的同步版本的实现已在前面进行了讨论。例如-在这种材料。



在此示例中,我们timeout在调用Atomics.wait()将不使用参数Atomics.waitAsync()此参数可用于实现与超时相关的条件。代表互斥量的



AsyncLock使用缓冲区SharedArrayBuffer并实现以下方法:



  • lock():阻塞线程,直到我们有机会捕获互斥锁(仅适用于工作线程)。
  • unlock():释放互斥锁(与之相反lock())。
  • executeLocked(callback):尝试获取锁而不阻塞线程。可以在主线程上使用此方法。它计划在我们获取锁时执行回调。


让我们看一下如何实现这些方法。类声明包括常量和带有缓冲区的构造函数SharedArrayBuffer



class AsyncLock {
  static INDEX = 0;
  static UNLOCKED = 0;
  static LOCKED = 1;

  constructor(sab) {
    this.sab = sab;
    this.i32a = new Int32Array(sab);
  }

  lock() {
    /* … */
  }

  unlock() {
    /* … */
  }

  executeLocked(f) {
    /* … */
  }
}


此处的元素i32a[0]包含值LOCKEDUNLOCKED他,此外,代表了内存利益的地方Atomics.wait()Atomics.waitAsync()该类AsyncLock提供以下基本功能:



  1. i32a[0] == LOCKED并且线程处于等待状态(被调用Atomics.wait()或之后Atomics.waitAsync()),正在观看i32a[0],最终将被通知。
  2. 通知线程后,它将尝试获取锁。如果成功,则在释放锁时将调用Atomics.notify()


同步锁捕获和释放



考虑lock()只能从工作线程调用的方法的代码



lock() {
  while (true) {
    const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                        /*   >>> */  AsyncLock.UNLOCKED,
                        /*   >>> */  AsyncLock.LOCKED);
    if (oldValue == AsyncLock.UNLOCKED) {
      return;
    }
    Atomics.wait(this.i32a, AsyncLock.INDEX,
                 AsyncLock.LOCKED); // <<< ,    
  }
}


从线程调用方法时lock(),它首先尝试获取锁,然后使用该锁将锁Atomics.compareExchange()的状态从更改UNLOCKEDLOCKED。该方法Atomics.compareExchange()尝试执行更改锁定状态的原子操作,它返回位于指定存储区中的原始值。如果原始值为UNLOCKED,则我们知道状态更改成功,并且线程已获取锁定。您无需执行其他任何操作。



如果Atomics.compareExchange()无法更改锁的状态,则意味着另一个线程正在持有该锁。结果,调用该方法的线程lock()尝试使用该方法Atomics.wait()为了等到另一个线程释放锁。如果期望值仍存储在感兴趣的存储区中(在我们的示例中- AsyncLock.LOCKED),则调用将Atomics.wait()阻塞线程。Atomics.wait()仅当另一个线程调用时才会从返回Atomics.notify()



该方法unlock()通过将锁设置为状态来释放锁,UNLOCKED然后调用Atomics.notify()它以通知正在等待释放锁的代理。假定锁定状态更改操作总是成功。这是因为执行此操作的线程持有锁。因此,此时无需再调用该方法unlock()



unlock() {
  const oldValue = Atomics.compareExchange(this.i32a, AsyncLock.INDEX,
                      /*   >>> */  AsyncLock.LOCKED,
                      /*   >>> */  AsyncLock.UNLOCKED);
  if (oldValue != AsyncLock.LOCKED) {
    throw new Error('Tried to unlock while not holding the mutex');
  }
  Atomics.notify(this.i32a, AsyncLock.INDEX, 1);
}


在典型情况下,所有事情都是这样发生的:锁是自由的,线程T1捕获它,并使用更改其状态Atomics.compareExchange()。线程T2尝试通过调用来获取锁Atomics.compareExchange(),但无法更改其状态。然后T2调用Atomics.wait(),此调用将阻塞线程。一段时间后,线程T1释放锁定并调用Atomics.notify()。这将导致Atomics.wait()对T2的调用返回,ok而线程T2退出锁。然后,T2尝试再次获取锁。这次他成功了。



这里有两种特殊情况。他们的分析旨在证明为什么Atomics.wait(),并Atomics.waitAsync()检查数组元素指定索引处的特定值。这些是以下情况:



  • T1 , T2 . T2 , Atomics.compareExchange(), . T1 , T2 Atomics.wait(). T2 Atomics.wait(), not-equal. T2 .
  • T1 , T2 Atomics.wait() . T1 , T2 ( Atomics.wait()) Atomics.compareExchange() . , T3, . . Atomics.compareExchange() T2 . T2 Atomics.wait() , T3 .


最后一个特殊情况说明了我们的互斥体不能正常工作的事实。线程T2可能正在等待释放锁,但是T3在释放锁之后立即设法获取了它。更适合实际使用的锁实现可以使用存在的几种锁状态,以区分仅“获取”锁和“获取期间存在冲突”的情况。



异步锁捕获



executeLocked()与方法不同lock(),可以从主线程调用 非阻塞方法它接收作为唯一参数的回调,并在成功获得锁定之后安排回调。



executeLocked(f) {
  const self = this;

  async function tryGetLock() {
    while (true) {
      const oldValue = Atomics.compareExchange(self.i32a, AsyncLock.INDEX,
                          /*   >>> */  AsyncLock.UNLOCKED,
                          /*   >>> */  AsyncLock.LOCKED);
      if (oldValue == AsyncLock.UNLOCKED) {
        f();
        self.unlock();
        return;
      }
      const result = Atomics.waitAsync(self.i32a, AsyncLock.INDEX,
                                       AsyncLock.LOCKED);
                                   //  ^ ,    
      await result.value;
    }
  }

  tryGetLock();
}


内部函数tryGetLock()首先尝试使用获取锁Atomics.compareExchange()。如果调用此方法导致成功更改锁状态,则该函数可以调用回调,然后释放锁并退出。



如果调用Atomics.compareExchange()不允许获取锁,那么在锁可能会释放的时刻,我们需要再次尝试进行操作。但是我们不能阻止线程并等待释放锁。相反,我们Atomics.waitAsync()正在计划使用该方法及其返回的承诺来获取新尝试



如果我们成功执行了该方法Atomics.waitAsync(),那么当持有锁的线程调用时,此方法返回的promise将被解析。Atomics.notify()... 之后,想要获取锁的线程将像以前一样尝试再次执行该操作。



在这里,这些特殊的情况是可能的,是典型的同步版本(锁调用之间发布Atomics.compareExchange()Atomics.waitAsync();锁被另一个线程捕获,解决的承诺和呼叫的时刻之间这样做Atomics.compareExchange())。因此,在适用于实际项目的类似代码中,必须考虑到这一点。



结果



在这篇文章中,我们谈到了低级别的同步原语Atomics.wait()Atomics.waitAsync()Atomics.notify()我们分析了一个基于它们创建互斥锁的示例,该互斥锁可在主线程和工作线程中使用。



Atomics.wait(),Atomics.waitAsync()和Atomics.notify()在您的项目中会有用吗?



All Articles