- 背压很酷
- 背压仅在实现反应性流规范的库中可用
- 这个规范太复杂了,您甚至不应该自己尝试实现它
在本文中,我将尝试表明:
- 背压很简单
- 要实现异步背压,就足以制作一个异步版本的信号量
- 如果存在异步信号量实现,那么将用几十行代码来实现org.reactivestreams.Publisher接口
背压是一种反馈,可调整数据生成器的速度以使其与用户的速度匹配。在没有这种连接的情况下,更快的生产者可能会使使用者的缓冲区溢出,或者,如果该缓冲区是无量纲的,则会耗尽所有RAM。
在多线程编程中,Dijkstroy解决了这个问题,他提出了一种新的同步机制-信号量。信号量可以视为许可计数器。假定生产者在执行资源密集型操作之前请求信号量的许可。如果信号量为空,则生产者线程被阻塞。
异步程序无法阻塞线程,因此它们无法访问空信号量以获取许可(但是它们可以执行所有其他信号量操作)。他们必须以另一种方式阻止执行。另一种方式是,他们只是离开正在运行的工作线程,但是在此之前,他们准备在信号量已满时立即恢复工作。
暂停和恢复异步方案最优雅的方式是组织作为一个数据流的演员与端口:
数据流模型-演员与端口,其端口之间的连接指示和初始令牌。摘自:数据流参与者的结构化描述及其应用
有输入和输出端口。输入端口从其他参与者的输出端口接收令牌(消息和信号)。如果输入端口包含令牌,而输出端口具有放置令牌的位置,则认为该端口处于活动状态。如果角色的所有端口都处于活动状态,则将其发送执行。因此,actor程序在恢复工作时可以安全地从输入端口读取令牌并写入周末。异步编程的所有智慧都在于这种简单的机制。将端口分配为参与者的单独子对象,可以极大地简化异步程序的编码,并可以通过组合不同类型的端口来增加其多样性。
经典的休伊特actor包含2个端口-一个可见,带有用于传入消息的缓冲区,另一个是隐藏的二进制文件,当actor被发送执行时,它将阻止该actor重启,直到首次启动结束为止。所需的异步信号量是这两个端口之间的交叉点。像消息缓冲区一样,它可以存储许多令牌,就像隐藏端口一样,这些令牌是黑色的,也就是说,就像在Petri网中一样,是不可区分的,并且令牌计数器足以存储它们。
在层次结构的第一层,我们有一个
AbstractActor带有三个嵌套类的类-basePort和衍生物AsyncSemaPort和InPort,以及用于在不存在阻塞端口的情况下启动执行器以执行的机制。简而言之,它看起来像这样:
public abstract class AbstractActor {
/** */
private int blocked = 0;
protected synchronized void restart() {
controlPort.unBlock();
}
private synchronized void incBlockCount() {
blocked++;
}
private synchronized void decBlockCount() {
blocked--;
if (blocked == 0) {
controlPort.block();
excecutor.execute(this::run);
}
}
protected abstract void turn() throws Throwable;
/** */
private void run() {
try {
turn();
restart();
} catch (Throwable throwable) {
whenError(throwable);
}
}
}
它包含一组最小的端口类:
Port-所有端口的基类
protected class Port {
private boolean isBlocked = true;
public Port() {
incBlockCount();
}
protected synchronized void block() {
if (isBlocked) {
return;
}
isBlocked = true;
incBlockCount();
}
protected synchronized void unBlock() {
if (!isBlocked) {
return;
}
isBlocked = false;
decBlockCount();
}
}
异步信号量:
public class AsyncSemaPort extends Port {
private long permissions = 0;
public synchronized void release(long n) {
permissions += n;
if (permissions > 0) {
unBlock();
}
}
public synchronized void aquire(long delta) {
permissions -= delta;
if (permissions <= 0) {
//
// ,
//
block();
}
}
}
InPort -一条传入消息的最小缓冲区:
public class InPort<T> extends Port implements OutMessagePort<T> {
private T item;
@Override
public void onNext(T item) {
this.item = item;
unBlock();
}
public synchronized T poll() {
T res = item;
item = null;
return res;
}
}
该课程的完整版本
AbstractActor可以在这里查看。
在层次结构的下一个级别,我们有三个具有特定端口的抽象参与者,但是具有未定义的处理例程:
- 一个类
AbstractProducer是一个具有异步信号量类型的端口的参与者(默认情况下,所有参与者都具有一个内部控制端口)。 - 该类
AbstractTransformer是常规的翰威特演员,它引用链中下一个演员的输入端口,并在该端口中发送转换后的令牌。 - 该类
AbstractConsumer也是普通的actor,但是它没有将转换后的令牌发送到任何地方,尽管它具有到生产者信号量的链接,并在吸收了输入令牌后打开了该信号量。这样可以使进程中的令牌数量保持恒定,并且不会发生缓冲区溢出。
在已经位于测试目录中的最后一级,定义了测试中使用的特定参与者:
- 该类
ProducerActor生成有限的整数流。 - 该类
TransformerActor从流中获取下一个数字,并将其沿链发送。 - 类
ConsumerActor-接受并打印生成的号码
现在,我们可以构建一个异步,并行工作处理程序链,如下所示:生产者-任意数量的转换器-消费者
因此,我们实现了反压,甚至以比无功流规范中更通用的形式实现了-反馈可以跨越任意数量的处理级联,而不是仅在规范中相邻。
要实现该规范,您需要定义一个输出端口,该端口对使用request()方法传递给它的权限数量敏感-这将是
Publisher,并InPort通过对该方法的调用来补充现有端口-这将是Subscriber。也就是说,我们假设接口Publisher和Subscriber描述端口的行为,而不是参与者。但是,从接口列表中也存在Processor,决不能是端口接口的事实来看,该规范的作者将其接口视为参与者接口。好吧,我们可以通过将接口功能的执行委派给相应的端口,使actor实现所有这些接口。
为简单起见,让我们
Publisher没有自己的缓冲区,而是将其直接写入缓冲区Subscriber。为此,您需要有人来Subscriber订阅和完成request(),即我们有2个条件,因此,我们需要2个端口-InPort<Subscriber>和AsyncSemaPort。它们都不适合作为实施的基础Publisher'a,因为它包含不必要的方法,所以我们将这些端口设置为内部变量:
public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
protected AbstractActor.AsyncSemaPort sema;
public ReactiveOutPort(AbstractActor actor) {
subscriber = actor.new InPort<>();
sema = actor.new AsyncSemaPort();
}
}
这次,我们
ReactiveOutPort没有将类定义为嵌套的,因此需要一个构造函数参数(对封闭的actor的引用)来实例化定义为嵌套类的端口。
该方法
subscribe(Subscriber subscriber)归结为保存订户并调用subscriber.onSubscribe():
public synchronized void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) {
throw new NullPointerException();
}
if (this.subscriber.isFull()) {
subscriber.onError(new IllegalStateException());
return;
}
this.subscriber.onNext(subscriber);
subscriber.onSubscribe(this);
}
通常会导致一个调用
Publisher.request(),最终归结为通过调用提高信号量AsyncSemaPort.release():
public synchronized void request(long n) {
if (subscriber.isEmpty()) {
return; // this spec requirement
}
if (n <= 0) {
subscriber.current().onError(new IllegalArgumentException());
return;
}
sema.release(n);
}
现在,我们仍然不能忘记
AsyncSemaPort.aquire()在资源使用时使用调用来降低信号量:
public synchronized void onNext(T item) {
Subscriber<? super T> subscriber = this.subscriber.current();
if (subscriber == null) {
throw new IllegalStateException();
}
sema.aquire();
subscriber.onNext(item);
}
AsyncSemaphore 项目是专为本文设计的。故意将其制作得尽可能紧凑,以免造成阅读器疲劳。结果,它包含很多限制:
-
Publisher'Subscriber' -
Subscriber' 1
另外,
AsyncSemaPort它不是同步信号灯的完整类似物-只有一个客户端可以执行操作aquire()y AsyncSemaPort(意味着封闭的参与者)。但这不是缺点-AsyncSemaPort它可以很好地发挥作用。原则上,你可以采取不同的方式-采取java.util.concurrent.Semaphore与异步订阅接口(见补充它AsyncSemaphore.java从DF4J项目)。这样的信号量可以以任何顺序绑定参与者和执行线程。
通常,每种类型的同步(阻塞)交互都有自己的异步(非阻塞)对应。因此,在同一个DF4J项目中,有一个实现
BlockingQueue,并辅以异步接口。这开辟了将多线程程序逐步转换为异步程序的可能性,并用参与者部分替换了线程。