草庐IT

C# 异步编程async/await

寡人正在Coding 2023-03-28 原文

概述

异步这个概念在不同语境下有不同的解释,比如在一个单核CPU里开启两个线程执行两个函数,通常认为这种调用是异步的,但对于CPU来说它是单核不可能同时运行两个函数,不过是由系统调度在不同的时间分片中执行。一般来说,如果两个工作能同时进行,就认为是异步的。在编程中,它通常代表函数的调用可以在不执行完的情况下返回,必要时在完成时回调。

有一个概念常常被混淆,多线程和异步。很多人认为异步就是多线程的,但是多线程只是实现异步的其中一种方式,除此之外还有系统中断,定时器,甚至可以自己写一个状态机实现异步(C# 的异步实现类似状态机)。

不同的编程语言有不同异步编程方法,在C#语言中,常常使用async/await等关键字,和Task等类来实现异步编程。

C#异步编程用法

class Program
{
    static void Main(string[] args)
    {
        var task =  IntTask();
		Console.WriteLine("等待中...");
        Console.WriteLine($"算完了? 让我康康! result = {task.Result}");
    }

    static async Task<int> IntTask()
    {
        Console.WriteLine("等3秒吧");
        await Task.Delay(3000);
        return 1;
    }
}

Main函数异步调用IntTask,打印"等三秒吧",随后返回到Main函数打印“等待中”,在task.Result取值时阻塞,三秒后IntTask返回(此时Task.Result被赋值)打印“result = 1”。看一下用法:

  • async: 异步函数使用async关键字修饰
  • await: 等待异步函数返回
  • Task:异步函数有返回值,且返回值为int类型

上述只是一个极简的用法,忽略了大量的细节,可以建立一个初步的印象。

async/await和Task简介

async

用async修饰一个方法,表明这个方法可以异步执行,其返回值必须是void/Task/Task<T>(T是返回值类型)其中一个,方法内的语句至少包含一个await关键字,否则会被同步的方式执行。

await

await只能修饰(返回值是)Task类型变量,此时会返回Task.Result或void而不是Task本身,在上述示例中,Main没有被async修饰,不能使用await,其返回值就是Task<int>, 而IntTask调用Task.Delay就是直接返回void。await也只能在被async修饰的函数的语句中使用。

Task

源于基于任务的异步模式(Task-based Asynchronous Pattern,TAP),被作为异步函数的返回值。异步函数的返回值有三种:

  • void:"fire and forget"(触发并忘记)不需要知道状态(是否完成),比如抛出异常、打印日志时可以使用
  • Task:需要知道是否完成(或失败)等状态,但是不需要返回值
  • Task<T>:在Task的基础上还想要返回值

其他

  • 异步函数不能使用ref/out修饰参数

实现原理剖析

如果使用反汇编等手段,可以看到上述示例代码的编译:

在返回1之前,好像有什么“奇怪的东西”被调用,编译器又背着开发者偷偷干了什么呢?

实现原理示例

在微软的开发博客里有一个叫谢尔盖·杰普利亚科夫(Sergey Tepliakov)的毛子曾提到这部分,来看一下他的示例:

源码

class StockPrices
{
    private Dictionary<string, decimal> _stockPrices;
    public async Task<decimal> GetStockPriceForAsync(string companyId)
    {
        await InitializeMapIfNeededAsync();
        _stockPrices.TryGetValue(companyId, out var result);
        return result;
    }
 
    private async Task InitializeMapIfNeededAsync()
    {
        if (_stockPrices != null)
            return;
 
        await Task.Delay(42);
        // Getting the stock prices from the external source and cache in memory.
        _stockPrices = new Dictionary<string, decimal> { { "MSFT", 42 } };
    }
}

这是他的源代码,这个类叫做StockPrices(股票价格),其核心业务是根据公司ID查询股票价格GetStockPriceForAsync,这是一个异步调用,首先它先异步调用InitializeMapIfNeededAsync对数据库进行初始化,初始化完成尝试从数据库中获取该公司的股票价格返回。
上述提到编译器偷偷自己生成了代码,如果手动实现大概是怎样的呢?来看谢尔盖给出的解:

手动实现

class GetStockPriceForAsync_StateMachine
{
    enum State { Start, Step1, }
    private readonly StockPrices @this;
    private readonly string _companyId;
    private readonly TaskCompletionSource<decimal> _tcs;
    private Task _initializeMapIfNeededTask;
    private State _state = State.Start;

    public GetStockPriceForAsync_StateMachine(StockPrices @this, string companyId)
    {
        this.@this = @this;
        _companyId = companyId;
    }

    public void Start()
    {
        try
        {
            if (_state == State.Start)
            {
                // The code from the start of the method to the first 'await'.

                if (string.IsNullOrEmpty(_companyId))
                    throw new ArgumentNullException();

                _initializeMapIfNeededTask = @this.InitializeMapIfNeeded();

                // Update state and schedule continuation
                _state = State.Step1;
                _initializeMapIfNeededTask.ContinueWith(_ => Start());
            }
            else if (_state == State.Step1)
            {
                // Need to check the error and the cancel case first
                if (_initializeMapIfNeededTask.Status == TaskStatus.Canceled)
                    _tcs.SetCanceled();
                else if (_initializeMapIfNeededTask.Status == TaskStatus.Faulted)
                    _tcs.SetException(_initializeMapIfNeededTask.Exception.InnerException);
                else
                {
                    // The code between first await and the rest of the method

                    @this._store.TryGetValue(_companyId, out var result);
                    _tcs.SetResult(result);
                }
            }
        }
        catch (Exception e)
        {
            _tcs.SetException(e);
        }
    }

    public Task<decimal> Task => _tcs.Task;
}

public Task<decimal> GetStockPriceForAsync(string companyId)
{
    var stateMachine = new GetStockPriceForAsync_StateMachine(this, companyId);
    stateMachine.Start();
    return stateMachine.Task;
}

从类名GetStockPriceForAsync_StateMachine可以看到,他为这个异步调用生成了一个状态机来实现异步,先来看下成员变量:

  • StockPrices: 原来那个“股票价格”类的引用
  • _companyId: 调用方法时的参数公司ID
  • _tcs:TaskCompletionSource 创建并完成该任务的来源。
  • _initializeMapIfNeededTask:调用初始化数据的异步任务
  • _state:状态枚举
  • Task:直接就是_tcs.Task,即该任务创建并完成的来源

现在看来这段代码的逻辑就比较清楚了,在调用异步查询股票的接口时,创建了一个状态机并调用状态机的Start函数,第一次进入start函数时状态机的状态是Start状态,它给_initializeMapIfNeededTask赋值,把状态机状态流转到Step1,并让_initializeMapIfNeededTask执行结束末尾再次调用Start函数(ContinueWith)。

_initializeMapIfNeededTask任务在等待了42毫秒后(Task.Delay(42)),末尾时再次调用了Start函数,此时状态为Step1。首先检查了Task状态,符合要求调用_tcs.SetResult(其实是给Task的Result赋值),此时异步任务完成。

TaskCompletionSource

看官方文档给的定义:

表示未绑定到委托的 Task<TResult> 的制造者方,并通过 Task 属性提供对使用者方的访问

简单的示例:

static void Main()
{
	TaskCompletionSource<int> tcs1 = new TaskCompletionSource<int>();
	Task<int> t1 = tcs1.Task;

	// Start a background task that will complete tcs1.Task
	Task.Factory.StartNew(() =>
	{
		Thread.Sleep(1000);
		tcs1.SetResult(15);
	});
}

看的出来这个类就是对Task的包装,方便创建分发给使用者的任务。其核心就是包装Task并方便外面设置其属性和状态

Task.ContinueWith

创建一个在目标 Task 完成时异步执行的延续任务

可以传入一个委托,在Task完成的末尾调用。这是一个典型的续体传递风格(continuation-pass style)。

续体传递风格

续体传递风格(continuation-pass style, CPS),来看维基百科的描述:

A function written in continuation-passing style takes an extra argument: an explicit "continuation"; i.e., a function of one argument. When the CPS function has computed its result value, it "returns" it by calling the continuation function with this value as the argument. That means that when invoking a CPS function, the calling function is required to supply a procedure to be invoked with the subroutine's "return" value. Expressing code in this form makes a number of things explicit which are implicit in direct style. These include: procedure returns, which become apparent as calls to a continuation; intermediate values, which are all given names; order of argument evaluation, which is made explicit; and tail calls, which simply call a procedure with the same continuation, unmodified, that was passed to the caller

大概的意思是,这种风格的函数比起普通的有一个额外的函数指针参数,调用结束(即将return)调用或者函数参数(替代直接return到调用者Caller)。还有一些其他细节,就不多说了,感兴趣自行翻译查看。

来看一个极简的例子:
int a = b + c + d;
这是一个链式运算,是有顺序的,在C++中,上述运算其实是:
int a = (b + c) + d; 先计算tmp = b + c(tmp是寄存器上一个临时的值,也称将亡值),然后计算 int a = tmp + c
使用续体传递来模拟这一过程:

class Program
{
    public class Result<T>
    {
        public T V;
    }

    static void Main(string[] args)
    {
        int a = 1;
        int b = 2;
        int c = 3;
        int d = 4;

        Result<int> ar = new() { V = a};
        Calc3(ar, b, c, d, Calc2);

        Console.WriteLine($"a = {ar.V}");
    }

    static void Calc3(Result<int> ar, int b, int c, int d, Action<Result<int>, int, int> continues)
    {
        int tmp = b + c;
        continues(ar, tmp, d);
    }

    static void Calc2(Result<int> ar, int tmp, int d)
    {
        ar.V = tmp + d;
    }
}

上述代码应该很清楚了,稍微看下应该能看明白。

C#编译器的实现

struct _GetStockPriceForAsync_d__1 : IAsyncStateMachine
{
    public StockPrices __this;
    public string companyId;
    public AsyncTaskMethodBuilder<decimal> __builder;
    public int __state;
    private TaskAwaiter __task1Awaiter;

    public void MoveNext()
    {
        decimal result;
        try
        {
            TaskAwaiter awaiter;
            if (__state != 0)
            {
                // State 1 of the generated state machine:
                if (string.IsNullOrEmpty(companyId))
                    throw new ArgumentNullException();

                awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();

                // Hot path optimization: if the task is completed,
                // the state machine automatically moves to the next step
                if (!awaiter.IsCompleted)
                {
                    __state = 0;
                    __task1Awaiter = awaiter;

                    // The following call will eventually cause boxing of the state machine.
                    __builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
                    return;
                }
            }
            else
            {
                awaiter = __task1Awaiter;
                __task1Awaiter = default(TaskAwaiter);
                __state = -1;
            }

            // GetResult returns void, but it'll throw if the awaited task failed.
            // This exception is catched later and changes the resulting task.
            awaiter.GetResult();
            __this._stocks.TryGetValue(companyId, out result);
        }
        catch (Exception exception)
        {
            // Final state: failure
            __state = -2;
            __builder.SetException(exception);
            return;
        }

        // Final state: success
        __state = -2;
        __builder.SetResult(result);
    }

    void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
    {
        __builder.SetStateMachine(stateMachine);
    }
}

[AsyncStateMachine(typeof(_GetStockPriceForAsync_d__1))]
public Task<decimal> GetStockPriceFor(string companyId)
{
    _GetStockPriceForAsync_d__1 _GetStockPriceFor_d__;
    _GetStockPriceFor_d__.__this = this;
    _GetStockPriceFor_d__.companyId = companyId;
    _GetStockPriceFor_d__.__builder = AsyncTaskMethodBuilder<decimal>.Create();
    _GetStockPriceFor_d__.__state = -1;
    var __t__builder = _GetStockPriceFor_d__.__builder;
    __t__builder.Start<_GetStockPriceForAsync_d__1>(ref _GetStockPriceFor_d__);
    return _GetStockPriceFor_d__.__builder.Task;
}

比较一下C#编译器生成的状态机:

  • __this:StockPrices“股票价格”类的引用
  • companyId:公司ID参数
  • __builder:AsyncTaskMethodBuilder类型的表示返回任务的异步方法生成器
  • __state:状态索引
  • __task1Awaiter:TaskAwaiter类型,提供等待异步任务完成的对象

上述成员有一些和之前手撸的状态机不太一样,等下面会介绍,先来这一套的逻辑:
首先创建了一个_GetStockPriceForAsync_d__1状态机_GetStockPriceFor_d__并初始化赋值,随后调用了这个状态机的__builder的Start函数并把该状态机作为引用参数传入。__builder.Start函数会调到该状态机的MoveNext函数(下面会介绍),这和手撸代码状态机Start函数调用类似。

MoveNext与Start函数的处理过程也类似:第一次进来__state == -1,__builder.AwaitUnsafeOnCompleted切换上下文执行InitializeLocalStoreIfNeededAsync异步任务,并指定在完成后切换到当前上下文调用该状态机的MoveNext函数,类似手撸代码的Task.ContinueWith。第二次进入时,执行到__builder.SetResult(result),异步任务基本完成。

上述描述也是忽略了一些细节,下面是调用的时序图,会更清楚些,有些不太清楚的点后面会详细介绍。

TaskAwaiter

来看下官方定义:

public readonly struct TaskAwaiter : System.Runtime.CompilerServices.ICriticalNotifyCompletion 提供等待异步任务完成的对象

结构:

可以看到,这个所谓“等待异步任务完成的对象”,主要是保证实现ICriticalNotifyCompletion的接口OnCompleted等。

AsyncTaskMethodBuilder<TAwaiter,TStateMachine>(TAwaiter, TStateMachine)

官方定义:

个人认为可以视为异步任务的“门面”,它负责启动状态机,传递一些中间状态,并在最终SetResult时表示它和其子例程的异步任务结束。其中有一个方法AwaitUnsafeOnCompleted,值得研究一下。

AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted

这个方法在上述中一笔带过,被描述为类似Task.ContinueWith,确实如此,但执行过程相当复杂,在这里也只是简单介绍下过程。

AwaitUnsafeOnCompleted首先会调用GetCompletionAction,GetCompletionAction创建了一个保存了上下文 context = ExecuteContext.FastCapture()的MoveNextRunner,并返回了指向的MoveNextRunner.Run函数的委托。

接着调用参数awaiter的UnsafeOnCompleted(completionAction)函数,这里completionAction就是上述的那个委托,内部调用了成员Task.SetContinuationForAwait函数来初始化续体,SetContinuationForAwait又调用AddTaskContinuation把延续方法添加到Task中,当上述示例源码中的InitializeMapIfNeededAsync函数执行完调用Runner.Run:

[SecuritySafeCritical]
internal void Run()
{
    if (this.m_context != null)
    {
        try
        {
            // 我们并未给 s_invokeMoveNext 赋值,所以 callback == null
            ContextCallback callback = s_invokeMoveNext;
            if (callback == null)
            {
                // 将回调设置为下方的 InvokeMoveNext 方法
                s_invokeMoveNext = callback = new
                ContextCallback(AsyncMethodBuilderCore.MoveNextRunner.InvokeMoveNext);
            }
            ExecutionContext.Run(this.m_context, callback, this.m_stateMachine, true);
            return;
        }
        finally
        {
            this.m_context.Dispose();
        }
    }
    this.m_stateMachine.MoveNext();
}

[SecurityCritical]
private static void InvokeMoveNext(object stateMachine)
{
    ((IAsyncStateMachine) stateMachine).MoveNext();
}

((IAsyncStateMachine) stateMachine).MoveNext() 重新调用了状态机的MoveNext()

参考链接

有关C# 异步编程async/await的更多相关文章

  1. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  2. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  3. c# - 如何在 ruby​​ 中调用 C# dll? - 2

    如何在ruby​​中调用C#dll? 最佳答案 我能想到几种可能性:为您的DLL编写(或找人编写)一个COM包装器,如果它还没有,则使用Ruby的WIN32OLE库来调用它;看看RubyCLR,其中一位作者是JohnLam,他继续在Microsoft从事IronRuby方面的工作。(估计不会再维护了,可能不支持.Net2.0以上的版本);正如其他地方已经提到的,看看使用IronRuby,如果这是您的技术选择。有一个主题是here.请注意,最后一篇文章实际上来自JohnLam(看起来像是2009年3月),他似乎很自在地断言RubyCL

  4. C# 到 Ruby sha1 base64 编码 - 2

    我正在尝试在Ruby中复制Convert.ToBase64String()行为。这是我的C#代码:varsha1=newSHA1CryptoServiceProvider();varpasswordBytes=Encoding.UTF8.GetBytes("password");varpasswordHash=sha1.ComputeHash(passwordBytes);returnConvert.ToBase64String(passwordHash);//returns"W6ph5Mm5Pz8GgiULbPgzG37mj9g="当我在Ruby中尝试同样的事情时,我得到了相同sha

  5. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  6. 网络编程套接字 - 2

    网络编程套接字网络编程基础知识理解源`IP`地址和目的`IP`地址理解源MAC地址和目的MAC地址认识端口号理解端口号和进程ID理解源端口号和目的端口号认识`TCP`协议认识`UDP`协议网络字节序socket编程接口`sockaddr``UDP`网络程序服务器端代码逻辑:需要用到的接口服务器端代码`udp`客户端代码逻辑`udp`客户端代码`TCP`网络程序服务器代码逻辑多个版本服务器单进程版本多进程版本多线程版本线程池版本服务器端代码客户端代码逻辑客户端代码TCP协议通讯流程TCP协议的客户端/服务器程序流程三次握手(建立连接)数据传输四次挥手(断开连接)TCP和UDP对比网络编程基础知识

  7. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

  8. ruby - 我正在学习编程并选择了 Ruby。我应该升级到 Ruby 1.9 吗? - 2

    我完全不是程序员,正在学习使用Ruby和Rails框架进行编程。我目前正在使用Ruby1.8.7和Rails3.0.3,但我想知道我是否应该升级到Ruby1.9,因为我真的没有任何升级的“遗留”成本。缺点是什么?我是否会遇到与普通gem的兼容性问题,或者甚至其他我不太了解甚至无法预料的问题? 最佳答案 你应该升级。不要坚持从1.8.7开始。如果您发现不支持1.9.2的gem,请避免使用它们(因为它们很可能不被维护)。如果您对gem是否兼容1.9.2有任何疑问,您可以在以下位置查看:http://www.railsplugins.or

  9. ruby - 如何以编程方式删除实例上的 "singleton information"以使其编码(marshal)? - 2

    我创建了一个由于“在运行时执行的单例元类定义”而无法编码的对象(这段代码的描述是否正确?)。这是通过以下代码执行的:#defineclassXthatmyusesingletonclassmetaprogrammingfeatures#throughcallofmethod:break_marshalling!classXdefbreak_marshalling!meta_class=class我该怎么做才能使对象编码正确?是否可以从对象instance_of_x的classX中“移除”单例组件?我真的需要一个建议,因为我们的一些对象需要通过Marshal.dump序列化机制进行缓存。

  10. Ruby 元编程问题 - 2

    我正在查看Ruby日志记录库Logging.logger方法并从sourceatgithub提出问题与这段代码有关:logger=::Logging::Logger.new(name)logger.add_appendersappenderlogger.additive=falseclass我知道类 最佳答案 这实际上删除了方法(当它实际被执行时)。这是确保close不会被调用两次的保障措施。看起来好像有嵌套的“class 关于Ruby元编程问题,我们在StackOverflow上找到一

随机推荐