System.Threading.Channels-高性能的生产者-消费者和异步,无需分配和堆栈跳水

再次问好。前一段时间,我写了另一篇鲜为人知的高性能爱好者工具System.IO.Pipelines所考虑的System.Threading.Channels(以下称为“通道”)的核心是建立在与Pipelines类似的原理上,解决了相同的问题-Producer-Consumer。但是,它具有更简单的api,可以将其优雅地合并到任何类型的企业代码中。同时,即使在异步情况下,它也使用没有分配且没有堆栈潜水的异步!(并不总是,但是经常)。







目录







介绍



生产者/消费者任务经常是在程序员的路上遇到的,而不是在头十年中遇到的。 Edsger Dijkstra亲自解决了这个问题-他提出了在生产者/消费者基础上组织工作时使用信号量来同步线程的想法。尽管其最简单形式的解决方案是众所周知的并且相当琐碎,但在现实世界中,可以以更为复杂的形式找到这种模式(制造商/消费者)。同样,现代编程标准强加了它们的指纹,代码被编写得更加简化并被破坏以便进一步重用。为了降低编写高质量代码的门槛并简化此过程,已做了所有工作。有问题的名称空间-System.Threading.Channels-是朝着这个目标迈出的又一步。



前一段时间我在看System.IO.Pipelines。它需要更周到的工作并对此问题有深入的了解,使用了“跨度”和“内存”,并且为了高效地工作,它不要求调用明显的方法(以避免不必要的内存分配)并不断以字节为单位进行思考。因此,Pipeline编程界面非常简单且不直观。



System.Threading.Channels为用户提供了更简单的api。值得一提的是,尽管api简单,但此工具还是经过高度优化的,很可能在其工作期间不会分配内存。可能是由于这样的事实:在后台使用ValueTask,甚至在真正异步的情况下,使用IValueTaskSource,可重复使用以进行进一步的操作。这就是实施渠道的全部兴趣所在。



通道是通用的,您可能会猜到,通用的类型是将产生和使用其实例的类型。有趣的是,Channel类的实现适合1行(github source ):



namespace System.Threading.Channels
{
    public abstract class Channel<T> : Channel<T, T> { }
}


因此,主要的渠道类别有两种类型的参数化-生产者渠道和消费者渠道分别设置。但是对于已实现的频道,则不使用此功能。

对于熟悉管道的人来说,入门的一般方法似乎很熟悉。即。我们创建1个中心类,从中分别分离生产者(ChannelWriter)和消费者(ChannelReader)。尽管有这些名称,但值得记住的是,这恰恰是生产者/使用者,而不是另一个同名经典多线程任务中的读取器/写入器。ChannelReader更改常规通道的状态(拉出一个值),该状态不再可用。这意味着他宁愿不读书,而是消费。但是稍后我们将熟悉实现。



开始工作。渠道



通道入门从抽象Channel <T>类和创建最合适实现的静态Channel类开始。此外,从该公共Channel,您可以获取一个ChannelWriter来写入该通道,并获得ChannelReader来从该通道使用。通道是ChannelWriter和ChannelReader的常规信息的存储库,因此,所有数据都存储在其中。而且,其记录或使用的逻辑已经分散在ChannelWriter和ChannelReader中,通常,频道可以分为两组-无限和受限。第一个实现比较简单,您可以无限制地写入(只要内存允许)。第二个记录受记录数的某个最大值限制。



这是异步的性质略有不同的地方。在无限制的通道中,写入操作将始终同步完成,没有什么可以停止对通道的写入。有限的渠道情况则有所不同。使用标准行为(可以覆盖),只要通道中有足够的空间容纳新实例,写操作就会同步完成。一旦通道已满,写操作将不会结束,直到有可用空间为止(在使用者消耗了已消耗的使用者之后)。因此,这里的操作实际上与流的变化和相关的变化(或没有变化,将在后面描述)是异步的。



在大多数情况下,阅读器的行为是相同的-如果通道中有内容,则阅读器会简单地阅读并同步结束。如果什么都没有,那么它会等待某人写下一些东西。



Channel静态类包含用于创建上述通道的4种方法:



Channel<T> CreateUnbounded<T>();
Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Channel<T> CreateBounded<T>(int capacity);
Channel<T> CreateBounded<T>(BoundedChannelOptions options);


如果需要,您可以指定更准确的选项来创建渠道,以帮助针对指定需求优化渠道。



UnboundedChannelOptions包含3个属性,默认情况下将其设置为false:



  1. AllowSynchronousContinuations — , , . -. , . , , , . , , , . , - - , ;
  2. SingleReader — , . , ;
  3. SingleWriter — , ;


BoundedChannelOptions包含相同的3个属性,顶部还有2个属性



  1. AllowSynchronousContinuations-相同;
  2. SingleReader相同;
  3. SingleWriter相同;
  4. 容量-可以放入通道的录音数量。此参数也是构造函数参数。
  5. FullMode-BoundedChannelFullMode枚举(具有4个选项)确定尝试写入已填充通道时的行为:

    • 等待-等待可用空间以完成异步操作
    • DropNewest-正在写入的项目会覆盖现有的最新项目,并同步结束
    • DropOldest-可记录项同步覆盖现有端中的最旧端
    • DropWrite-正在写入的项目未写入,它同步结束




根据传递的参数和调用的方法,将创建以下三种实现之一:SingleConsumerUnboundedChannelUnboundedChannelBoundedChannel但这并不是那么重要,因为我们将通过基类Channel <TWrite,TRead>使用该通道。



它具有2个属性:



  • ChannelReader <TRead>阅读器{get; 保护集;}
  • ChannelWriter <TWrite>编写器{get; 保护集;}


而且,还有2个隐式类型的运算符强制转换为ChannelReader <TRead>和ChannelWriter <TWrite>。



如何开始使用频道的示例:



Channel<int> channel = Channel.CreateUnbounded<int>();
//  
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader; 
// 
ChannelWriter<int> writer = channel;
ChannelReader<int> reader = channel;


数据存储在队列中。对于3种类型,使用3个不同的队列-ConcurrentQueue <T>,Deque <T>和SingleProducerSingleConsumerQueue <T>。在这一点上,在我看来,我已经过时了,错过了一堆新的最简单的收藏。但是我赶快失望了-它们并不适合所有人。它们被标记为内部,因此无法使用。但是,如果您突然在产品上需要它们-您可以在此处(SingleProducerConsumerQueue)此处(Deque)找到它们后者的实现非常简单。我建议您阅读它,您可以非常快地学习它。



因此,让我们开始直接研究ChannelReader和ChannelWriter,以及有趣的实现细节。它们都归结为异步的,没有使用IValueTaskSource进行内存分配。



ChannelReader-消费者



当请求使用者对象时,将返回ChannelReader <T>抽象类的实现之一。同样,与管道不同,API非常简单,方法很少。您只需要了解方法列表即可了解如何在实践中使用它。



方法:



  1. 虚拟仅获取属性Task Completion {get; }

    Task类型对象,在关闭通道后完成;
  2. 虚拟get-only属性int Count {get; }

    这里应当强调的是,返回可读对象的当前数目;
  3. 虚拟仅获取属性bool CanCount {get; }

    指示Count属性是否可用;
  4. bool TryRead(out T item)

    . bool, , . out ( null, );
  5. ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)

    ValueTask true, , . ValueTask false, ( );
  6. ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)

    . , . .



    , TryRead WaitToReadAsync. ( cancelation tokens), — TryRead. , while(true) WaitToReadAsync. true, , TryRead. TryRead , , . — , WaitToReadAsync, , , .

    , , - .




ChannelWriter-生产者



一切都与消费者相似,所以让我们马上看一下方法:



  1. 虚方法bool TryComplete(Exception?Error = null)

    尝试将通道标记为已完成,即 表明没有更多的数据将被写入。作为可选参数,您可以引发导致通道终止的异常。如果成功完成则返回true,否则返回false(如果通道已经完成或不支持终止);否则返回false。
  2. 抽象方法bool TryWrite(T项)

    尝试将值写入通道。如果成功则返回true,否则返回false
  3. 抽象方法ValueTask <bool> WaitToWriteAsync(CancellationToken cancellingToken =默认值)

    返回带有真实值的ValueTask,该值将在通道上有足够的写入空间时完成。如果该通道中的条目将不再被允许,则值为false;否则为false。
  4. 虚拟方法ValueTask WriteAsync(T项目,CancellationToken cancelToken =默认值)

    异步写入通道。例如,如果通道已满,则该操作实际上将是异步的,并且只有在释放该记录的空间之后才能完成;
  5. 方法void Complete(Exception?Error = null)

    仅使用TryComplete尝试将通道标记为已完成,并且在失败的情况下会引发异常。


上面的一个小例子(轻松地开始自己的实验):



Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();

//      ,        
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;

//     
int objectToWriteInChannel = 555;
await writer.WriteAsync(objectToWriteInChannel);
//  ,     ,   ,  
writer.Complete();

//         
int valueFromChannel = await reader.ReadAsync();


现在,让我们继续进行最有趣的部分。



免分配异步



在编写和研究代码的过程中,我意识到实现所有这些操作几乎没有什么有趣的。通常,您可以用这种方式来描述它-使用竞争性集合以及ValueTask的大量使用来避免不必要的锁定,ValueTask是一种节省内存的结构。但是,我要提醒您,快速浏览一下PC上的所有文件并用ValueTask替换所有Task并不值得。仅在大多数情况下操作同步结束的情况下才有意义。毕竟,正如我们记得的那样,通过异步,很有可能更改线程,这意味着堆栈将不再与以前相同。无论如何,真正的性能专家都知道-不要在出现问题之前进行优化。



一件好事是,我不会将自己注册为专业人士,因此是时候弄清楚编写没有内存分配的异步代码的秘密是什么了,乍一看听起来真是太不真实了。但这确实发生了。



IValueTaskSource接口



我们从起点开始-ValueTask结构,它在.net core 2.0中添加并在2.1中得到了补充。在此结构内部,对象_obj的狡猾字段被隐藏。根据不言自明的名称,很容易猜出该字段中可以隐藏三件事之一-空,任务/任务<T>或IValueTaskSource。实际上,它遵循ValueTask的创建方式。



正如制造商所保证的那样,仅应明显使用此结构-带有await关键字。也就是说,您不应多次将await应用于同一ValueTask,使用组合器,添加多个延续等。另外,您不应该多次从ValueTask获得结果。这是由于我们试图理解这一事实-在不分配内存的情况下重用了所有这些东西。



我已经提到过IValueTaskSource接口是他帮助节省了内存。这是通过对许多任务重复使用IValueTaskSource自身多次来完成的。但是正是由于这种重用,所以无法使用ValueTask。



因此,IValueTaskSource。该接口具有3种方法,通过实现这些方法,您可以成功地节省内存和时间,以分配这些珍贵的字节。



  1. GetResult-在运行时为异步方法形成的状态机需要结果时调用一次。ValueTask有一个GetResult方法,该方法调用同名的接口方法,正如我们记得的那样,可以将其存储在_obj字段中。
  2. GetStatus-由状态机调用以确定操作的状态。也通过ValueTask。
  3. OnCompleted-再次,状态机调用它以向当时尚未完成的任务添加延续。


但是,尽管界面很简单,但是实现仍需要一些技巧。在这里,我们能记得我们开始- 通道此实现使用AsyncOperation这是IValueTaskSource的实现。此类隐藏在内部访问修饰符的后面。但这不会干扰对基本机制的理解。这就引出了一个问题,为什么不将IValueTaskSource的实现交给大众?第一个原因(出于乐趣的考虑)是手握锤子,钉子无处不在,当手握IValueTaskSource实现时,到处都有对内存的盲目工作。第二个原因(更合理)是,尽管接口简单而通用,但是当使用某些应用程序细微差别时,真正的实现是最佳的。正是由于这个原因,您可以在功能强大的.net的各个部分中找到实现,例如在通道罩下的AsyncOperation,新套接字API内的AsyncIOOperation等。

但是,公平地说,仍然有一个常见的实现方式-ManualResetValueTaskSourceSourceCore但这已经离本文的主题太远了。



比较交易



非常流行类的一种流行方法,它避免了经典同步原语的开销。我想大多数人都熟悉它,但是仍然值得用三个词来描述,因为这种构造在AsyncOperation中经常使用。

在文献中,此功能称为比较和交换(CAS)。在.net中,可在Interlocked类中使用



签名如下:



public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;


还有int,long,float,double,IntPtr,object的重载。



该方法本身是原子的,也就是说,它的执行没有中断。比较2个值,如果相等,则将新值分配给变量。当您需要检查变量的值并根据其更改变量时,它们可以解决问题。



假设您要增加一个小于10的变量,则



有2个线程。



流1 流2
检查条件的变量值(即小于10),该条件有效 --
在检查和更改值之间 为变量分配不满足条件的值(例如15)
更改值(尽管不应更改),因为不再满足条件 --




使用此方法时,在获取变量的实际值时,可以完全更改所需的值,也可以不更改。



location1是我们要更改其值的变量。将其与比较项进行比较,如果相等,则将值写入location1中。如果操作成功,则该方法将返回location1变量的过去值。如果不是,那么将返回location1的当前值。

说得更深入一点,有一个cmpxchg汇编语言指令可以执行这些操作。是她在后台使用。



叠潜



在查看所有这些代码时,我多次遇到了对“ Stack Dive”的引用。这是一件非常酷和有趣的事情,实际上是非常不可取的。最重要的是,通过同步执行连续,我们可以耗尽堆栈资源。



假设我们有10,000种风格的任务



//code1
await ...
//code2


假设第一个任务完成了执行,从而释放了第二个任务的继续执行,我们立即开始在该线程中同步执行该任务,即,将一部分堆栈与该继续执行的框架一起使用。反过来,这种延续解锁了第三项任务的延续,我们也开始立即执行它。等等。如果继续中没有等待等待或以某种方式刷新堆栈的东西,那么我们将简单地消耗堆栈空间。什么可能导致StackOverflow和应用程序崩溃。在我的代码审查中,我将提到AsyncOperation如何与之抗争。



AsyncOperation作为IValueTaskSource的实现



源代码



在AsyncOperation内部,有一个类型为Action <object>的_continuation字段。该字段用于(无论是否相信)延续。但是,就像在太现代的代码中经常遇到的情况一样,字段还承担其他责任(例如垃圾收集器和方法表参考中的最后一位)。 _continuation字段来自同一系列。除了延续本身和null之外,此字段中还可以存储2个特殊值。s_availableSentinels_completedSentinel。这些字段分别指示该操作可用和已完成。它仅可用于完全异步操作的重用。



另外AsyncOperation实现IThreadPoolWorkItem一个方法-void Execute()=> SetCompletionAndInvokeContinuation()。 SetCompletionAndInvokeContinuation方法仅用于进行延续。并且可以直接在AsyncOperation代码中或通过提到的Execute调用此方法。毕竟,实现IThreadPoolWorkItem的类型可以像这种ThreadPool.UnsafeQueueUserWorkItem一样以某种方式扔到线程池中(this,preferLocal:false)。



Execute方法将由线程池执行。



延续本身的执行非常简单。



延续被复制到局部变量,并在其位置写入s_completedSentinel-一个人造的木偶物体(或哨兵,我不知道该如何跟我说话),表示任务已完成。然后简单地执行真实延续的本地副本。如果存在ExecutionContext,则将这些操作发布到上下文中。这里没有秘密。该代码可以由类直接调用-只需通过调用封装这些操作的方法即可,也可以通过线程池中的IThreadPoolWorkItem接口调用。现在您可以猜测该函数如何与同步执行连续性一起工作。



IValueTaskSource接口的第一种方法是GetResultgithub)。



很简单,他:



  1. _currentId.

    _currentId — , . . ;
  2. _continuation - s_availableSentinel. , , AsyncOperation . , (pooled = true);
  3. _result.

    _result TrySetResult .


TrySetResult方法github)。



该方法很简单。 -它将接收到的参数存储在_result中并发出信号完成信号,即调用SignalCompleteion方法,这很有趣。



SignalCompletiongithub方法



此方法使用了我们一开始所讨论的所有内容。



从一开始,如果_continuation == null,我们将编写s_completedSentinel木偶。



此外,该方法可以分为四个块。为了简化理解方案,我必须马上说,第四个块只是连续执行同步执行。也就是说,通过该方法的连续执行很简单,如我在IThreadPoolWorkItem的段落中所述。



  1. _schedulingContext == null, .. ( if).

    _runContinuationsAsynchronously == true, , — ( if).

    IThreadPoolWorkItem . AsyncOperation . .

    , if ( , ), , 2 3 , — .. 4 ;
  2. _schedulingContext is SynchronizationContext, ( if).

    _runContinuationsAsynchronously = true. . , , . , . 2 , :

    sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
    


    . , , ( , ), 4 — ;
  3. , 2 . .

    , _schedulingContext TaskScheduler, . , 2, .. _runContinuationsAsynchronously = true TaskScheduler . , Task.Factory.StartNew . .
  4. — . , .


IValueTaskSource接口的第二种方法是GetStatusgithub),

就像一个彼得斯堡甜甜圈。



如果_continuation!= _CompletedSentinel,然后返回ValueTaskSourceStatus.Pending

如果错误== null,则返回ValueTaskSourceStatus.Succeeded

如果_error.SourceException是OperationCanceledException,然后返回ValueTaskSourceStatus.Canceled

好吧,既然来了很多在这里,返回ValueTaskSourceStatus.Faulted



第三次也是最后,但是IValueTaskSource接口最复杂的方法是OnCompletedgithub),



该方法添加了一个在完成时执行的延续。



必要时捕获ExecutionContext和SynchronizationContext。



接下来,上述Interlocked.CompareExchange用于将继续存储在字段中,并将其与null进行比较。提醒一下,CompareExchange返回变量的实际值。



如果保存了延续,则将返回更新前变量中的值,即null。这意味着在写入延续时操作尚未完成。而自己完成此操作的人将了解所有内容(如上所示)。对于我们来说,执行任何其他操作是没有意义的。这就是方法的工作结束的地方。



如果保存该值不起作用,即从CompareExchange返回了null以外的值。在这种情况下,有人设法比我们更快地投入价值。也就是说,发生了2种情况之一-任务完成的速度比我们在此处达到的速度快,或者尝试记录多于1个的继续,所以不应该这样做。



因此,我们检查返回的值是否等于s_completedSentinel-在完成的情况下,它将恰好写入该值。



  • 如果这不是s_completedSentinel,那么我们没有按计划使用-他们试图添加多个延续。就是说,一个已经被记录下来,而我们正在写出来。这是一种特殊情况;
  • s_completedSentinel, , , . , _runContinuationsAsynchronously = false.

    , , OnCompleted, awaiter'. . , AsyncOperation — System.Threading.Channels. , . , . , , ( ) . , awaiter' , , . awaiter'.

    为了避免这种情况,无论如何都必须异步运行延续。它根据与SignalCompleteion方法中前三个块相同的方案执行-只需在池中,在上下文中或通过工厂和调度程序


这是同步继续的示例:



class Program
    {
        static async Task Main(string[] args)
        {
            Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
            {
                AllowSynchronousContinuations = true
            });

            ChannelWriter<int> writer = unboundedChannel;
            ChannelReader<int> reader = unboundedChannel;

            Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");

            var writerTask = Task.Run(async () =>
            {
                Thread.Sleep(500);
                int objectToWriteInChannel = 555;
                Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
                await writer.WriteAsync(objectToWriteInChannel);
                Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
            });

            //Blocked here because there are no items in channel
            int valueFromChannel = await reader.ReadAsync();
            Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");

            await writerTask;

            Console.Read();
        }
    }


输出:



Main,等待之前。线程ID:1已

创建线程,用于在等待写入之前进行延迟写入。线程ID:4

线程,等待之后(将由创建的线程处理以进行写入)。线程ID:4已

创建线程,用于在等待写入后延迟写入。线程ID:4



All Articles