【译】The danger of TaskCompletionSource class

来自 Sergey Tepliakov的另一篇 https://devblogs.microsoft.com/premier-developer/the-danger-of-taskcompletionsourcet-class/#comments

当使用async/await时,如果您想手动控制任务的生存期,TaskCompletionSource<T>类是一个非常有用的工具。下面是TaskCompletionSource的一个示例  ,用于将基于事件的异步代码转换为基于任务的模式。

public static Task PerformOperation(this PictureBox pictureBox)
{
    var tcs = new TaskCompletionSource<object>();
            
    // Naive version that does not unsubscribe from the event
    pictureBox.LoadCompleted += (s, ea) =>
    {
        if (ea.Cancelled) tcs.SetCanceled();
        else if (ea.Error != null) tcs.SetException(ea.Error);
        else tcs.SetResult(null);
    };    pictureBox.LoadAsync();    return tcs.Task;
}

事实上, TaskCompletionSource<T> 代表了一个未来的结果并提供了通过调用 SetCanceled SetException 或 SetResult 方法手动设置任务的最终状态的能力。

这个类非常有用,不仅当你需要使旧代码看起来很现代和花哨时。 TaskCompletionSource<T> 用于手动控制一个任务的生命周期的各种情况,例如在不同的通信协议中。所以,让我们模仿其中之一。

假设我们要创建一个自定义数据库的adapter类。adapter类将具有用于处理请求的专用“工作线程”线程, ExecuteAsync 以及客户端可用于计划后台处理工作的方法。这与实际的 Redis 客户端执行的操作非常相似,其他一些数据库客户端也遵循相同的模式,因此这不是一个牵强的场景。

public class DatabaseFacade : IDisposable
{private readonly BlockingCollection<(string item, TaskCompletionSource<string> result)> _queue =new BlockingCollection<(string item, TaskCompletionSource<string> result)>();private readonly Task _processItemsTask;public DatabaseFacade() => _processItemsTask = Task.Run(ProcessItems);public void Dispose() => _queue.CompleteAdding();public Task SaveAsync(string command){var tcs = new TaskCompletionSource<string>();_queue.Add((item: command, result: tcs));return tcs.Task;}private async Task ProcessItems(){foreach (var item in _queue.GetConsumingEnumerable()){Console.WriteLine($"DatabaseFacade: executing '{item.item}'...");// Waiting a bit to emulate some IO-bound operationawait Task.Delay(100);item.result.SetResult("OK");Console.WriteLine("DatabaseFacade: done.");}}
}

该代码并不能用于生产环境,但它是基于 BlockingCollection 的生产者-消费者模式的一个很好的示例。

假设我们有另一个组件,比如说一个logger。logger通常也使用生产者-消费者模式实现。出于性能原因,我们不希望在每次方法调用时刷新消息,而是可以使用阻塞集合和专用线程将数据保存到外部源。其中一个外部来源可能是数据库。

public class Logger : IDisposable
{private readonly DatabaseFacade _facade;private readonly BlockingCollection<string> _queue =new BlockingCollection<string>();private readonly Task _saveMessageTask;public Logger(DatabaseFacade facade) =>(_facade, _saveMessageTask) = (facade, Task.Run(SaveMessage));public void Dispose() => _queue.CompleteAdding();public void WriteLine(string message) => _queue.Add(message);private async Task SaveMessage(){foreach (var message in _queue.GetConsumingEnumerable()){// "Saving" message to the fileConsole.WriteLine($"Logger: {message}");// And to our database through the facadeawait _facade.SaveAsync(message);}}
}

这个logger的实现非常粗浅,无论如何,您不应该自己另写一个logger。我在这里的目标是展示两个生产者-消费者队列如何相互影响,logger是一个相当广泛、易于理解的概念。

译者注:即logger作为生产者, adapter作为消费者。

问题是:你能在这里看到代码有问题吗?一个非常严重的问题!

让我们尝试运行以下代码:

using (var facade = new DatabaseFacade())
using (var logger = new Logger(facade))
{logger.WriteLine("My message");await Task.Delay(100);await facade.SaveAsync("Another string");Console.WriteLine("The string is saved");
}

输出为:

Logger: My message
DatabaseFacade: executing 'My message'...

我们永远无法将"Another string"保存到数据库中。为什么?因为DatabaseFacade的线程被Logger的线程阻塞。

TaskCompletionSource 类型有一个非常奇特的行为:默认情况下,当调用方法时 SetResult后 ,后续的Task的“异步”延续都会以同步的方式进行。这就是我们的例子中发生的事情( SetCancelled 和 SetException 以及它们的 TrySetXXX 对应物也是如此):

这意味着两个“队列”隐式链接在一起,logger的队列会阻止adapter的队列。

译者注: 这里的错误比较抽象,即item.result.SetResult("OK");语句执行后,该线程跳去执行await _facade.SaveAsync(message);的下一句(也就是去foreach了),因为“当调用方法时 SetResult后 ,后续的Task的“异步”延续都会以同步的方式进行”。

不幸的是,这样的情况相对普遍,我在我的项目中遇到过几次。当任务的“延续”(注1) 以 TaskCompletionSource<T> 的方式实现时,可能会出现此问题,从而阻止调用 SetResult 的线程。

注1: 正如我们稍后将看到的,不同类型的Task延续表现不同。

这些问题的主要挑战是很难理解根本原因。一旦从生产机器抓取出问题的dump,您可能根本看不到任何明显的问题。你可能有一堆线程在等待内核对象,而在任何堆栈跟踪中没有任何相关用户的代码。

现在让我们看看为什么会发生这种情况,以及我们如何缓解这个问题。

每个Task都有一个名为 m_stateFlags 的字段,该字段表示任务的当前状态(如 RanToCompletion 、 Cancelled Failed 等)。但这并不是该字段的唯一作用:它还包含一组在任务创建期间通过指定 TaskCreationOptions 的标志。这些标志控制不同的方面,例如是否在专用线程 ( TaskCreationOptions.LongRunning ) 中运行任务,是否将工作项调度到全局队列而不是线程本地队列 ( TaskCreationOptions.PreferFairness ),或者是否强制任务继续始终异步运行 ( TaskCreationOptions.RunContinuationsAsynchronously)。

显然,我们对后一个方面感兴趣,我们将立即看到如何指定此标志。但要完全理解这个问题,我们还需要看另一个方面:我们需要了解任务的延续性。

static async Task WithAsync()
{var task = Task.Run(() => { Sleep(100); });await task;Console.WriteLine("After task await");
}static Task WithContinueWith()
{var task = Task.Run(() => { Sleep(100); });return task.ContinueWith(t => { Console.WriteLine("Inside ContinueWith"); },TaskContinuationOptions.OnlyOnRanToCompletion);
}

您可能认为这两种实现是等效的,因为编译器只是将 await 之后代码块“移动”到通过调度的实现的ContinueWith 中。但这只是一部分事实,实际的逻辑更复杂一些。

在我的另一篇文章“剖析 C# 中的异步方法”中更详细地描述了 C# 编译器对异步方法所做的实际转换,在这里我们将重点介绍一个特定方面:延续调度。

当一个正在awaitTask未完成时,生成的状态机将调用 TaskAwaiter.UnsafeOnCompleted 并传递一个回调,该回调在等待任务完成时调用,以向前移动状态机的状态。此方法调用 Task.SetContinuationForAwait 将给定操作添加为任务的延续:

// Now register the continuation, and if we couldn't register it because the task is already completing,
// process the continuation directly (in which case make sure we schedule the continuation
// rather than inlining it, the latter of which could result in a rare but possible stack overflow).
if (tc != null)
{if (!AddTaskContinuation(tc, addBeforeOthers: false))tc.Run(this, bCanInlineContinuationTask: false);
}
else
{Contract.Assert(!flowExecutionContext, "We already determined we're not required to flow context.");if (!AddTaskContinuation(continuationAction, addBeforeOthers: false))AwaitTaskContinuation.UnsafeScheduleAction(continuationAction, this);
}

局部变量 tc 在有同步上下文时不为 null,否则将调用该 else 块。首先, AddTaskContinuation 方法将被调用,该方法在当前任务未完成时返回 true (以防止栈溢出),同时一个指定的action委托成功添加为当前任务的延续。否则 UnsafeScheduleAction 方法将被调用创建 AwaitTaskContinuation 实例。

译者注:这段代码主要应该走if内的逻辑,也就是在当前任务完成时直接执行后续任务,否则就先添加作为延续。

在一般情况下(稍后会详细介绍), System.Action 实例被添加为任务延续,并且延续存储在 Task.m_continuationObject 中。

现在,让我们看看任务完成后会发生什么(代码片段来自 Task.FinishContinuations ):

internal void FinishContinuations()
{// Atomically store the fact that this task is completing.  From this point on, the adding of continuations will// result in the continuations being run/launched directly rather than being added to the continuation list.// 取出延续的任务,并将当前延续任务置为空object continuationObject = Interlocked.Exchange(ref m_continuationObject, s_taskCompletionSentinel);// If continuationObject == null, then we don't have any continuations to processif (continuationObject != null){// Skip synchronous execution of continuations if this task's thread was abortedbool bCanInlineContinuations = // 取非即为 任务未被中止且TaskCreationOptions未明确置为异步延续!(// 前两个为任务被中止((m_stateFlags & TASK_STATE_THREAD_WAS_ABORTED) != 0) ||(Thread.CurrentThread.ThreadState == ThreadState.AbortRequested) ||// 最后一个为TaskCreationOptions为异步延续((m_stateFlags & (int)TaskCreationOptions.RunContinuationsAsynchronously) != 0));// Handle the single-Action case// 把延续任务转为委托Action singleAction = continuationObject as Action;if (singleAction != null){AwaitTaskContinuation.RunOrScheduleAction(singleAction, bCanInlineContinuations, ref t_currentTask);return;}// The rest of the body}
}

FinishContinuations 函数检查任务创建标志,如果 RunContinuationsAsynchronously 未指定,则同步运行单个操作继续!这种行为对于案例中的 async / await 和task.ContinueWith 是不同的。使用同步上下文或非默认TaskScheduler时,除非任务已完成 (注2),异步方法的延续Task都会同步调用。这意味着当等待的任务未完成时,“异步”延续几乎一直同步运行!

注2:这是一种罕见的竞争条件,如果等待的任务在“await site”完成(如在await finishedTask  ) ,则异步方法将继续同步执行。仅当任务在调用生成的状态机的过程中 MoveNext 完成时,才有可能出现这种情况。(译者注:这里没太懂,大概就是一种特殊情况也会同步执行)

但是,安排 Task.ContinueWith 的延续的逻辑是不同的:在这种情况下,将创建一个 StandardTaskContinuation 实例并添加为任务的延续。除非 TaskContinuationOptions.ExecuteSynchronously 指定了标志,否则此延续将异步运行,而不考虑任务创建选项。

我们实际上可以检查我们一开始遇到的问题是否与 TaskCompletionSource 本身无关,并且实际上表现为在没有 TaskCreationOptions.RunContinuationsAsynchronously 以下情况下创建的任何任务:

static async Task WithAsync()
{Print("WithAsync");var task = Task.Run(() => { Sleep(100); Print("In Task.Run"); });await Task.Yield();await task;await Task.Yield();Print("After task await");
}static Task WithContinueWith()
{Print("WithContinueWith");var task = Task.Run(() => { Sleep(100); Print("In Task.Run"); });var result = task.ContinueWith(t => { Print("Inside ContinueWith"); });return result;
}await WithContinueWith();
await WithAsync();

输出:

WithAsync: 1
In Task.Run: 3
After task await: 3WithContinueWith: 3
In Task.Run: 4
Inside ContinueWith: 5

正如我们所看到的,在不同await语句之间的代码块和剩余的方法运行在 Task.Run同一线程中。但是task.ContinueWith 继续在不同的线程中异步运行。我们可以通过使用 Task.Factory.StartNew 和提供 TaskCreationOptions.RunContinuationsAsynchronously 以下方式来改变行为:

static async Task WithAsync()
{Print("WithAsync");var task = Task.Factory.StartNew(() => { Sleep(100); Print("In Task.Factory.StartNew"); },TaskCreationOptions.RunContinuationsAsynchronously);await task;Print("After task await");
}
WithAsync: 1
In Task.Factory.StartNew: 3
After task await: 4

如何解决这个问题?

正如我们已经讨论过的,这里涉及两个部分:1)任务创建时的参数(即TaskCreationOptions.RunContinuationsAsynchronously) 2)延续的类型(即刚刚的两种情况)。

您完全无法控制 async / await 执行时的行为。如果您无法控制任务的创建,但想要Task运行异步延续,则可以在await 之后显式调用 Task.Yield() 或者 完全切换到自定义的Task(这两种方式几乎不可行)。

译者注:即第一个例子的问题这么改也能解决,大家可以自行尝试。但这样改需要对Task有足够的认识。

private async Task SaveMessage()
{foreach (var message in _queue.GetConsumingEnumerable()){Console.WriteLine($"Logger: {message}");await _facade.SaveAsync(message);// 显式调用 Task.Yield()异步延续await Task.Yield();}
}

但是,如果可以的话,您应该在每次使用 TaskCompletionSource<T> 时都提供TaskCreationOptions

static async Task WithAsync(TaskCreationOptions options)
{Print($"WithAsync. Options: {options}");var tcs = new TaskCompletionSource<object>(options);var setTask = Task.Run(() => {Sleep(100);Print("Setting task's result");tcs.SetResult(null);Print("Set task's result");});//await Task.Yield();await tcs.Task;Print("After task await");await setTask;
}await WithAsync(TaskCreationOptions.None);
await WithAsync(TaskCreationOptions.RunContinuationsAsynchronously);

输出为:

WithAsync. Options: None: 1
Setting task's result: 3
After task await: 3
Set task's result: 3WithAsync. Options: RunContinuationsAsynchronously: 3
Setting task's result: 4
Set task's result: 4
After task await: 3

从 .NET 4.6.1 TaskCompletionSource 开始接受 TaskCreationFlags .如果指定了该标志 TaskCreationOptions.RunContinuationsAsynchronously ,则所有延续(包括“异步”延续)都将异步执行。这将消除当许多异步方法链接在一起并且该链中的一个任务基于 TaskCompletionSource时可能发生的隐式耦合 。

Conclusion 结论

TaskCompletionSource class 是在 .NET 4.0 之前 async 的时代引入的,用于手动控制任务的生存期。

默认情况下,除非指定了TaskCreationOptions.RunContinuationsAsynchronously 选项,否则 所有Task的延续都是同步执行的。

所有“异步”延续(await 语句前后的代码块)始终在await任务的线程中运行。

TaskCompletionSource 使用默认构造函数创建的实例可能会在设置任务结果的线程中运行所有“异步”延续,从而导致死锁和其他线程问题。

如果使用 .NET 4.6.1+,则应始终在创建 TaskCompletionSource 实例时提供 TaskCreationOptions.RunContinuationsAsynchronously 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/743936.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows版PostgreSQL数据库下载及安装教程(关系型数据库管理系统)

前言 PostgreSQL是一个功能非常强大的、源代码开放的客户/服务器关系型数据库管理系统(RDBMS)。PostgreSQL最初设想于1986年,当时被叫做Berkley Postgres Project。该项目一直到1994年都处于演进和修改中,直到开发人员Andrew Yu和Jolly Chen在Postgres中添加了一个SQL(Str…

异构数据源数据同步 → 从源码分析 DataX 敏感信息的加解密

开心一刻 出门扔垃圾,看到一大爷摔地上了 过去问大爷:我账户余额 0.8,能扶你起来不 大爷往旁边挪了挪 跟我说到:孩子,快,你也躺下,这个来钱快! 我没理大爷,径直去扔了垃圾 然后飞速的躺在了大爷旁边,说道:感觉大爷带飞!书接上回 通过 异构数据源同步之数据同步 → …

开源工作流设计器(流程设计器)哪个好

本文重点介绍,基于activiti、flowable、camunda开源工作流引擎,如何选择一个开源免费的流程设计器,进行集成和扩展开发,快速交付项目使用。大家在开发OA办公自动化、ERP、CRM、BPM、低代码平台等项目的时候,经常用到流程引擎,目前主流的开源流程引擎有activiti、flowable…

如何设计应用系统的数据权限管理

在开发应用系统时,都离不开权限的设计,权限设计 = 功能权限 + 数据权限。而功能权限,在业界常常是基于RBAC(Role-Based Access Control)的一套方案。而数据权限,则根据不同的业务场景,则权限设计不尽相同,可以有不同的技术解决方案。按照应用系统权限类型划分,可进一步…

【进阶篇】一文搞清楚网页发起 HTTP 请求调用的完整过程

最近笔者在实际项目开发中会频繁涉及到服务之间的远程调用、域名的配置和请求的转发等与计算机网络相关的知识。这些其实在读本科和考研的时候都有学习过理论,但为了更透彻地掌握便于在工作中使用,我还是决定写一篇文章来分享实际开发中是怎么应用的。目录前言一、HTTP协议1.…

C++11标准库 原子变量 atomic 梳理

目录<atomic>原子操作的概念CAS实现原理CAS操作的伪代码:使用CAS完成变量的原子操作:CAS 操作的保证lock和锁的概念atomic模板类构造函数公共成员函数:atomic与互斥锁的效率比对 <atomic> C++11提供了一个原子类型std::atomic,通过这个原子类型管理的内部变量就可…

使用ML.NET训练一个属于自己的图像分类模型,对图像进行分类就这么简单!

前言 今天大姚给大家分享一个.NET开源、免费、跨平台(支持Windows、Linux、macOS多个操作系统)的机器学习框架:ML.NET。并且本文将会带你快速使用ML.NET训练一个属于自己的图像分类模型,对图像进行分类。ML.NET框架介绍 ML.NET 允许开发人员在其 .NET 应用程序中轻松构建、…

网页交互

单击 选择元素 (html)<!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>Document</title&g…

读人工智能全传13人工智能导致的问题2

读人工智能全传13人工智能导致的问题21. 机器人sha手 1.1. 自主57的话题总是带有强烈的煽动性,许多人会本能地厌恶它,认为这样的系统是不道德的,永远不该被建立 1.2. 自主57的讨论大多源于战争中使用得越来越频繁的无人机 1.3. 无人机 1.3.1. 人驾驶的飞机,在菌用领域,它可…

万字:清结算体系,全局方案深度解析

本文分享了头部支付机构是如何做清结算的,在做和带领大家打通支付的底层处理原理,内核中的内核,分享给大家。支付机构帮助交易平台代收代付交易款,那么就需要先从消费者发卡行把钱拿过来,然后再结算给交易平台;对于交易平台也是一样的道理,要帮店家卖东西,需要帮忙通过…

【THM】tomghost练习

先努力成为脚本小子【THM】tomghost练习 与本文相关的TryHackMe实验房间链接:TryHackMe | Room details 简介:识别最近的漏洞,以尝试利用系统或读取你没有权限访问的文件。 **你能完成这个挑战吗? **机器可能需要长达5分钟的启动和配置。 管理员记录:这个房间的用户名包含不…

电动自行车 LED 大灯不亮故障分析和维修教程 All In One

电动自行车 LED 大灯不亮故障分析和维修教程 All In One 电动自行车内置的 LED 前大灯,在骑行途中突然不亮了 ❌电动自行车 LED 大灯不亮故障分析和维修教程 All In One自己动手,丰衣足食问题表象 电动自行车内置的 LED 前大灯,在骑行途中突然不亮了 ❌故障排查转向灯正常 行…