目录
- 介绍 任务生产者/消费者
- 开始工作。渠道
- 渠道消费。频道阅读器
- 记录到频道。ChannelWriter
- 免分配异步
- IValueTaskSource接口
- 关于CompareExchange的一些知识
- 堆栈潜水问题
- AsyncOperation-实施细节
介绍
生产者/消费者任务经常是在程序员的路上遇到的,而不是在头十年中遇到的。 Edsger Dijkstra亲自解决了这个问题-他提出了在生产者/消费者基础上组织工作时使用信号量来同步线程的想法。尽管其最简单形式的解决方案是众所周知的并且相当琐碎,但在现实世界中,可以以更为复杂的形式找到这种模式(制造商/消费者)。同样,现代编程标准强加了它们的指纹,代码被编写得更加简化并被破坏以便进一步重用。为了降低编写高质量代码的门槛并简化此过程,已做了所有工作。有问题的名称空间-System.Threading.Channels-是朝着这个目标迈出的又一步。
前一段时间我在看System.IO.Pipelines。它需要更周到的工作并对此问题有深入的了解,使用了“跨度”和“内存”,并且为了高效地工作,它不要求调用明显的方法(以避免不必要的内存分配)并不断以字节为单位进行思考。因此,Pipeline编程界面非常简单且不直观。
System.Threading.Channels为用户提供了更简单的api。值得一提的是,尽管api简单,但此工具还是经过高度优化的,很可能在其工作期间不会分配内存。可能是由于这样的事实:在后台使用ValueTask,甚至在真正异步的情况下,也使用IValueTaskSource,可重复使用以进行进一步的操作。这就是实施渠道的全部兴趣所在。
通道是通用的,您可能会猜到,通用的类型是将产生和使用其实例的类型。有趣的是,Channel类的实现适合1行(github source ):
namespace System.Threading.Channels
{
public abstract class Channel<T> : Channel<T, T> { }
}
因此,主要的渠道类别有两种类型的参数化-生产者渠道和消费者渠道分别设置。但是对于已实现的频道,则不使用此功能。
对于熟悉管道的人来说,入门的一般方法似乎很熟悉。即。我们创建1个中心类,从中分别分离生产者(ChannelWriter)和消费者(ChannelReader)。尽管有这些名称,但值得记住的是,这恰恰是生产者/使用者,而不是另一个同名经典多线程任务中的读取器/写入器。ChannelReader更改常规通道的状态(拉出一个值),该状态不再可用。这意味着他宁愿不读书,而是消费。但是稍后我们将熟悉实现。
开始工作。渠道
通道入门从抽象Channel <T>类和创建最合适实现的静态Channel类开始。此外,从该公共Channel,您可以获取一个ChannelWriter来写入该通道,并获得ChannelReader来从该通道使用。通道是ChannelWriter和ChannelReader的常规信息的存储库,因此,所有数据都存储在其中。而且,其记录或使用的逻辑已经分散在ChannelWriter和ChannelReader中,通常,频道可以分为两组-无限和受限。第一个实现比较简单,您可以无限制地写入(只要内存允许)。第二个记录受记录数的某个最大值限制。
这是异步的性质略有不同的地方。在无限制的通道中,写入操作将始终同步完成,没有什么可以停止对通道的写入。有限的渠道情况则有所不同。使用标准行为(可以覆盖),只要通道中有足够的空间容纳新实例,写操作就会同步完成。一旦通道已满,写操作将不会结束,直到有可用空间为止(在使用者消耗了已消耗的使用者之后)。因此,这里的操作实际上与流的变化和相关的变化(或没有变化,将在后面描述)是异步的。
在大多数情况下,阅读器的行为是相同的-如果通道中有内容,则阅读器会简单地阅读并同步结束。如果什么都没有,那么它会等待某人写下一些东西。
Channel静态类包含用于创建上述通道的4种方法:
Channel<T> CreateUnbounded<T>();
Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Channel<T> CreateBounded<T>(int capacity);
Channel<T> CreateBounded<T>(BoundedChannelOptions options);
如果需要,您可以指定更准确的选项来创建渠道,以帮助针对指定需求优化渠道。
UnboundedChannelOptions包含3个属性,默认情况下将其设置为false:
- AllowSynchronousContinuations — , , . -. , . , , , . , , , . , - - , ;
- SingleReader — , . , ;
- SingleWriter — , ;
BoundedChannelOptions包含相同的3个属性,顶部还有2个属性
- AllowSynchronousContinuations-相同;
- SingleReader相同;
- SingleWriter相同;
- 容量-可以放入通道的录音数量。此参数也是构造函数参数。
- FullMode-BoundedChannelFullMode枚举(具有4个选项)确定尝试写入已填充通道时的行为:
- 等待-等待可用空间以完成异步操作
- DropNewest-正在写入的项目会覆盖现有的最新项目,并同步结束
- DropOldest-可记录项同步覆盖现有端中的最旧端
- DropWrite-正在写入的项目未写入,它同步结束
根据传递的参数和调用的方法,将创建以下三种实现之一:SingleConsumerUnboundedChannel,UnboundedChannel,BoundedChannel。但这并不是那么重要,因为我们将通过基类Channel <TWrite,TRead>使用该通道。
它具有2个属性:
- ChannelReader <TRead>阅读器{get; 保护集;}
- ChannelWriter <TWrite>编写器{get; 保护集;}
而且,还有2个隐式类型的运算符强制转换为ChannelReader <TRead>和ChannelWriter <TWrite>。
如何开始使用频道的示例:
Channel<int> channel = Channel.CreateUnbounded<int>();
//
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader;
//
ChannelWriter<int> writer = channel;
ChannelReader<int> reader = channel;
数据存储在队列中。对于3种类型,使用3个不同的队列-ConcurrentQueue <T>,Deque <T>和SingleProducerSingleConsumerQueue <T>。在这一点上,在我看来,我已经过时了,错过了一堆新的最简单的收藏。但是我赶快失望了-它们并不适合所有人。它们被标记为内部,因此无法使用。但是,如果您突然在产品上需要它们-您可以在此处(SingleProducerConsumerQueue)和此处(Deque)找到它们。后者的实现非常简单。我建议您阅读它,您可以非常快地学习它。
因此,让我们开始直接研究ChannelReader和ChannelWriter,以及有趣的实现细节。它们都归结为异步的,没有使用IValueTaskSource进行内存分配。
ChannelReader-消费者
当请求使用者对象时,将返回ChannelReader <T>抽象类的实现之一。同样,与管道不同,API非常简单,方法很少。您只需要了解方法列表即可了解如何在实践中使用它。
方法:
- 虚拟仅获取属性Task Completion {get; }
Task类型的对象,在关闭通道后完成; - 虚拟get-only属性int Count {get; }
这里应当强调的是,返回可读对象的当前数目; - 虚拟仅获取属性bool CanCount {get; }
指示Count属性是否可用; - bool TryRead(out T item)
. bool, , . out ( null, ); - ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
ValueTask true, , . ValueTask false, ( ); - ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
. , . .
, TryRead WaitToReadAsync. ( cancelation tokens), — TryRead. , while(true) WaitToReadAsync. true, , TryRead. TryRead , , . — , WaitToReadAsync, , , .
, , - .
ChannelWriter-生产者
一切都与消费者相似,所以让我们马上看一下方法:
- 虚方法bool TryComplete(Exception?Error = null)
尝试将通道标记为已完成,即 表明没有更多的数据将被写入。作为可选参数,您可以引发导致通道终止的异常。如果成功完成则返回true,否则返回false(如果通道已经完成或不支持终止);否则返回false。 - 抽象方法bool TryWrite(T项)
尝试将值写入通道。如果成功则返回true,否则返回false - 抽象方法ValueTask <bool> WaitToWriteAsync(CancellationToken cancellingToken =默认值)
返回带有真实值的ValueTask,该值将在通道上有足够的写入空间时完成。如果该通道中的条目将不再被允许,则值为false;否则为false。 - 虚拟方法ValueTask WriteAsync(T项目,CancellationToken cancelToken =默认值)
异步写入通道。例如,如果通道已满,则该操作实际上将是异步的,并且只有在释放该记录的空间之后才能完成; - 方法void Complete(Exception?Error = null)
仅使用TryComplete尝试将通道标记为已完成,并且在失败的情况下会引发异常。
上面的一个小例子(轻松地开始自己的实验):
Channel<int> unboundedChannel = Channel.CreateUnbounded<int>();
// ,
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;
//
int objectToWriteInChannel = 555;
await writer.WriteAsync(objectToWriteInChannel);
// , , ,
writer.Complete();
//
int valueFromChannel = await reader.ReadAsync();
现在,让我们继续进行最有趣的部分。
免分配异步
在编写和研究代码的过程中,我意识到实现所有这些操作几乎没有什么有趣的。通常,您可以用这种方式来描述它-使用竞争性集合以及ValueTask的大量使用来避免不必要的锁定,ValueTask是一种节省内存的结构。但是,我要提醒您,快速浏览一下PC上的所有文件并用ValueTask替换所有Task并不值得。仅在大多数情况下操作同步结束的情况下才有意义。毕竟,正如我们记得的那样,通过异步,很有可能更改线程,这意味着堆栈将不再与以前相同。无论如何,真正的性能专家都知道-不要在出现问题之前进行优化。
一件好事是,我不会将自己注册为专业人士,因此是时候弄清楚编写没有内存分配的异步代码的秘密是什么了,乍一看听起来真是太不真实了。但这确实发生了。
IValueTaskSource接口
我们从起点开始-ValueTask结构,它在.net core 2.0中添加并在2.1中得到了补充。在此结构内部,对象_obj的狡猾字段被隐藏。根据不言自明的名称,很容易猜出该字段中可以隐藏三件事之一-空,任务/任务<T>或IValueTaskSource。实际上,它遵循ValueTask的创建方式。
正如制造商所保证的那样,仅应明显使用此结构-带有await关键字。也就是说,您不应多次将await应用于同一ValueTask,使用组合器,添加多个延续等。另外,您不应该多次从ValueTask获得结果。这是由于我们试图理解这一事实-在不分配内存的情况下重用了所有这些东西。
我已经提到过IValueTaskSource接口。是他帮助节省了内存。这是通过对许多任务重复使用IValueTaskSource自身多次来完成的。但是正是由于这种重用,所以无法使用ValueTask。
因此,IValueTaskSource。该接口具有3种方法,通过实现这些方法,您可以成功地节省内存和时间,以分配这些珍贵的字节。
- GetResult-在运行时为异步方法形成的状态机需要结果时调用一次。ValueTask有一个GetResult方法,该方法调用同名的接口方法,正如我们记得的那样,可以将其存储在_obj字段中。
- GetStatus-由状态机调用以确定操作的状态。也通过ValueTask。
- OnCompleted-再次,状态机调用它以向当时尚未完成的任务添加延续。
但是,尽管界面很简单,但是实现仍需要一些技巧。在这里,我们能记得我们开始- 通道。此实现使用AsyncOperation类这是IValueTaskSource的实现。此类隐藏在内部访问修饰符的后面。但这不会干扰对基本机制的理解。这就引出了一个问题,为什么不将IValueTaskSource的实现交给大众?第一个原因(出于乐趣的考虑)是手握锤子,钉子无处不在,当手握IValueTaskSource实现时,到处都有对内存的盲目工作。第二个原因(更合理)是,尽管接口简单而通用,但是当使用某些应用程序细微差别时,真正的实现是最佳的。正是由于这个原因,您可以在功能强大的.net的各个部分中找到实现,例如在通道罩下的AsyncOperation,新套接字API内的AsyncIOOperation等。
但是,公平地说,仍然有一个常见的实现方式-ManualResetValueTaskSourceSourceCore。但这已经离本文的主题太远了。
比较交易
非常流行类的一种流行方法,它避免了经典同步原语的开销。我想大多数人都熟悉它,但是仍然值得用三个词来描述,因为这种构造在AsyncOperation中经常使用。
在文献中,此功能称为比较和交换(CAS)。在.net中,可在Interlocked类中使用。
签名如下:
public static T CompareExchange<T>(ref T location1, T value, T comparand) where T : class;
还有int,long,float,double,IntPtr,object的重载。
该方法本身是原子的,也就是说,它的执行没有中断。比较2个值,如果相等,则将新值分配给变量。当您需要检查变量的值并根据其更改变量时,它们可以解决问题。
假设您要增加一个小于10的变量,则
有2个线程。
流1 | 流2 |
---|---|
检查条件的变量值(即小于10),该条件有效 | -- |
在检查和更改值之间 | 为变量分配不满足条件的值(例如15) |
更改值(尽管不应更改),因为不再满足条件 | -- |
使用此方法时,在获取变量的实际值时,可以完全更改所需的值,也可以不更改。
location1是我们要更改其值的变量。将其与比较项进行比较,如果相等,则将值写入location1中。如果操作成功,则该方法将返回location1变量的过去值。如果不是,那么将返回location1的当前值。
说得更深入一点,有一个cmpxchg汇编语言指令可以执行这些操作。是她在后台使用。
叠潜
在查看所有这些代码时,我多次遇到了对“ Stack Dive”的引用。这是一件非常酷和有趣的事情,实际上是非常不可取的。最重要的是,通过同步执行连续,我们可以耗尽堆栈资源。
假设我们有10,000种风格的任务
//code1
await ...
//code2
假设第一个任务完成了执行,从而释放了第二个任务的继续执行,我们立即开始在该线程中同步执行该任务,即,将一部分堆栈与该继续执行的框架一起使用。反过来,这种延续解锁了第三项任务的延续,我们也开始立即执行它。等等。如果继续中没有等待等待或以某种方式刷新堆栈的东西,那么我们将简单地消耗堆栈空间。什么可能导致StackOverflow和应用程序崩溃。在我的代码审查中,我将提到AsyncOperation如何与之抗争。
AsyncOperation作为IValueTaskSource的实现
源代码。
在AsyncOperation内部,有一个类型为Action <object>的_continuation字段。该字段用于(无论是否相信)延续。但是,就像在太现代的代码中经常遇到的情况一样,字段还承担其他责任(例如垃圾收集器和方法表参考中的最后一位)。 _continuation字段来自同一系列。除了延续本身和null之外,此字段中还可以存储2个特殊值。s_availableSentinel和s_completedSentinel。这些字段分别指示该操作可用和已完成。它仅可用于完全异步操作的重用。
另外AsyncOperation实现IThreadPoolWorkItem一个方法-void Execute()=> SetCompletionAndInvokeContinuation()。 SetCompletionAndInvokeContinuation方法仅用于进行延续。并且可以直接在AsyncOperation代码中或通过提到的Execute调用此方法。毕竟,实现IThreadPoolWorkItem的类型可以像这种ThreadPool.UnsafeQueueUserWorkItem一样以某种方式扔到线程池中(this,preferLocal:false)。
Execute方法将由线程池执行。
延续本身的执行非常简单。
延续被复制到局部变量,并在其位置写入s_completedSentinel-一个人造的木偶物体(或哨兵,我不知道该如何跟我说话),表示任务已完成。然后简单地执行真实延续的本地副本。如果存在ExecutionContext,则将这些操作发布到上下文中。这里没有秘密。该代码可以由类直接调用-只需通过调用封装这些操作的方法即可,也可以通过线程池中的IThreadPoolWorkItem接口调用。现在您可以猜测该函数如何与同步执行连续性一起工作。
IValueTaskSource接口的第一种方法是GetResult(github)。
很简单,他:
- _currentId.
_currentId — , . . ; - _continuation - s_availableSentinel. , , AsyncOperation . , (pooled = true);
- _result.
_result TrySetResult .
TrySetResult方法(github)。
该方法很简单。 -它将接收到的参数存储在_result中并发出信号完成信号,即调用SignalCompleteion方法,这很有趣。
SignalCompletion(github)方法。
此方法使用了我们一开始所讨论的所有内容。
从一开始,如果_continuation == null,我们将编写s_completedSentinel木偶。
此外,该方法可以分为四个块。为了简化理解方案,我必须马上说,第四个块只是连续执行的同步执行。也就是说,通过该方法的连续执行很简单,如我在IThreadPoolWorkItem的段落中所述。
- _schedulingContext == null, .. ( if).
_runContinuationsAsynchronously == true, , — ( if).
IThreadPoolWorkItem . AsyncOperation . .
, if ( , ), , 2 3 , — .. 4 ; - _schedulingContext is SynchronizationContext, ( if).
_runContinuationsAsynchronously = true. . , , . , . 2 , :
sc.Post(s => ((AsyncOperation<TResult>)s).SetCompletionAndInvokeContinuation(), this);
. , , ( , ), 4 — ; - , 2 . .
, _schedulingContext TaskScheduler, . , 2, .. _runContinuationsAsynchronously = true TaskScheduler . , Task.Factory.StartNew . . - — . , .
IValueTaskSource接口的第二种方法是GetStatus(github),
就像一个彼得斯堡甜甜圈。
如果_continuation!= _CompletedSentinel,然后返回ValueTaskSourceStatus.Pending
如果错误== null,则返回ValueTaskSourceStatus.Succeeded
如果_error.SourceException是OperationCanceledException,然后返回ValueTaskSourceStatus.Canceled
好吧,既然来了很多在这里,返回ValueTaskSourceStatus.Faulted
第三次也是最后,但是IValueTaskSource接口最复杂的方法是OnCompleted(github),
该方法添加了一个在完成时执行的延续。
必要时捕获ExecutionContext和SynchronizationContext。
接下来,上述Interlocked.CompareExchange用于将继续存储在字段中,并将其与null进行比较。提醒一下,CompareExchange返回变量的实际值。
如果保存了延续,则将返回更新前变量中的值,即null。这意味着在写入延续时操作尚未完成。而自己完成此操作的人将了解所有内容(如上所示)。对于我们来说,执行任何其他操作是没有意义的。这就是方法的工作结束的地方。
如果保存该值不起作用,即从CompareExchange返回了null以外的值。在这种情况下,有人设法比我们更快地投入价值。也就是说,发生了2种情况之一-任务完成的速度比我们在此处达到的速度快,或者尝试记录多于1个的继续,所以不应该这样做。
因此,我们检查返回的值是否等于s_completedSentinel-在完成的情况下,它将恰好写入该值。
- 如果这不是s_completedSentinel,那么我们没有按计划使用-他们试图添加多个延续。就是说,一个已经被记录下来,而我们正在写出来。这是一种特殊情况;
- s_completedSentinel, , , . , _runContinuationsAsynchronously = false.
, , OnCompleted, awaiter'. . , AsyncOperation — System.Threading.Channels. , . , . , , ( ) . , awaiter' , , . awaiter'.
为了避免这种情况,无论如何都必须异步运行延续。它根据与SignalCompleteion方法中前三个块相同的方案执行-只需在池中,在上下文中或通过工厂和调度程序
这是同步继续的示例:
class Program
{
static async Task Main(string[] args)
{
Channel<int> unboundedChannel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
AllowSynchronousContinuations = true
});
ChannelWriter<int> writer = unboundedChannel;
ChannelReader<int> reader = unboundedChannel;
Console.WriteLine($"Main, before await. Thread id: {Thread.CurrentThread.ManagedThreadId}");
var writerTask = Task.Run(async () =>
{
Thread.Sleep(500);
int objectToWriteInChannel = 555;
Console.WriteLine($"Created thread for writing with delay, before await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
await writer.WriteAsync(objectToWriteInChannel);
Console.WriteLine($"Created thread for writing with delay, after await write. Thread id: {Thread.CurrentThread.ManagedThreadId}");
});
//Blocked here because there are no items in channel
int valueFromChannel = await reader.ReadAsync();
Console.WriteLine($"Main, after await (will be processed by created thread for writing). Thread id: {Thread.CurrentThread.ManagedThreadId}");
await writerTask;
Console.Read();
}
}
输出:
Main,等待之前。线程ID:1已
创建线程,用于在等待写入之前进行延迟写入。线程ID:4
主线程,等待之后(将由创建的线程处理以进行写入)。线程ID:4已
创建线程,用于在等待写入后延迟写入。线程ID:4