草庐IT

Kotlin 协程(一)

kolibreath 2023-03-28 原文

Come and Meet Kotlin Coroutine

Tags of Kotlin Coroutine

Kotlin协程可以被理解为一种轻量级的线程,它具有挂起恢复的特点,可以将我们从异步编程的回调陷阱中解放出来

下面我们一一来看给协程贴上的标签如何理解:

  1. 挂起和恢复

    1. 挂起函数(suspend function)

    协程最吸引人的特点就在协程的挂起和恢复特性上,通过这个特性我们能够像编写同步代码一样简化异步回调。这种特性在Kotlin语言层面表现为suspend关键字:

    // suspend function
    suspend fun function1() {
        delay(1000L)
        println("suspend function1")
    }
    
    // normal function
    fun function2() {
    //    delay(2000L) not satisfy structural concurrency
        println("suspend function2")
    }
    
    // type check:
    val funcVal1: suspend () -> Unit = ::function1
    val funcVal2: () -> Unit = ::function2
    

    相比普通的函数,suspend函数可以理解为一种新的函数类型。

    1. 协程构建器(Coroutine Builder)

    launch async runBlocking是三种常见的协程构建器,我们从函数签名上【感性】地认识一下他们的区别:

    // launch
    public fun CoroutineScope.launch(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> Unit
    ): Job
    
    // aysnc
    public fun <T> CoroutineScope.async(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> T
    ): Deferred<T> 
    
    // runBlocking
    public fun <T> runBlocking(
        context: CoroutineContext = EmptyCoroutineContext, 
        block: suspend CoroutineScope.() -> T
    ): T 
    

我们通常可以使用launch启动一个协程,他的返回值Job可以用于控制这个协程的生命周期, async可以看做是一个升级版的launch,他的block的返回值会被放在Deferred中。DeferredJob的子类,可以通过await方法获取返回值:

fun main() = runBlocking {

    val job = launch {
        println("execute of job")
        "execute of job" // launch中的block不考虑返回值,lambda的返回值会被忽略
    }
    val deferred = async {
        println("execute of deferred")
        "result of deferred"
    }

    println(deferred.await())

    Unit
}

/**
    execute of job
    execute of deferred
    result of deferred
**/

launchasync默认都是写了之后立刻启动(这一点非常重要,aysnc并不需要await触发执行),可以通过调整CoroutineStart参数变更启动方式:

fun main() = runBlocking {

    val lazyJob = launch(start = CoroutineStart.LAZY) {
        println("execute now!")
    }

    println("before lazy starts")
    // 通过delay先让父协程挂起,明显去别处launch没有立刻执行
    println("parent sleeps")
    delay(1000L)
    println("parent wakes up")
    lazyJob.start()

    Unit
}

/**
    before lazy starts
    parent sleeps
    parent wakes up
    execute now!
**/
  1. 理解挂起和恢复

下面我分别在两个suspend函数和两个由launch发起的协程中delay两秒,请问main函数执行完成分别需要几秒?

  • suspend函数
fun main() = runBlocking {
    getUserInfo()
    getFriendList()
}

suspend fun getUserInfo() {
    println("getUserInfo: start time: ${System.currentTimeMillis()}")
    delay(2000L)
    println("getUserInfo: end time: ${System.currentTimeMillis()}")
    logX("suspend function1")
}

suspend fun getFriendList() {
    println("getFriendList: start time: ${System.currentTimeMillis()}")
    delay(2000L)
    println("getFriendList end time: ${System.currentTimeMillis()}")
    logX("suspend function2")
}
  • Launch
fun main() = runBlocking {
   launch {
       println("launch1: start time: ${System.currentTimeMillis()}")
       delay(2000L)
       println("launch1: end time: ${System.currentTimeMillis()}")
       logX("launch1")
   }

    launch {
        println("launch2: start time: ${System.currentTimeMillis()}")
        delay(2000L)
        println("launch2: end time: ${System.currentTimeMillis()}")
        logX("launch2")
    }

    Unit
}

答案揭晓时刻:

suspend函数需要4秒,launch需要2秒。我们来看看挂起函数和launch的执行模型:

截屏2022-04-16 下午4.00.59.png

suspend函数和launch这类的协程构建器是有本质上的不同的,suspend函数在Kotlin编译器的作用下会变成一个自动机,而launch这类都不是suspend,他们其实是将任务【分发】到线程池(在JVM平台上)上实现的执行。

suspend和协程构建器的结合之处就在await上:

public suspend fun await(): T

await是一个挂起函数,后续的流程会像上图以上被挂起,我们来看这个例子:

fun main() = runBlocking {
    val def = async {
        println("async starts")
        delay(2000L)
        println("async end")
        "hello world"
    }

    println("message from main")
    println(def.await())
    println("end of story")
}

/**
    message from main
    async starts
    async end
    hello world // end of story的输出被挂起到await执行完成再恢复
    end of story
**/

suspend函数到自动机的转换在最后一节会说明。Kotlin Coroutine狭义的协程指的是通过构建器启动的协程,后文不再说明。

  1. 轻量级的线程

    1. 如何理解【轻量级】

    在下面的代码中我们开启了很多个协程,但是等量的线程会OOM

    fun main() = runBlocking {
        repeat(1000_000_000) {
            launch { //常见的协程
                delay(1000000)
            }
        }
    
        delay(10000L)
    }
    
    1. Kotlin Coroutine VS Thread

协程本身是运行在线程池上的:

fun main() = runBlocking {

    logX("main ")
    val job = launch(Dispatchers.IO) {
        logX("launch 1")
    }
}

/**
================================
main 
Thread:main @coroutine#1
================================
================================
launch 1
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
**/ 

Dispatchers就可以指定运行的线程池。


d89e8744663d45635a5125829a9037a9.gif

Structured Concurrency

结构化并发的思想贯穿于Kotlin coroutine的始终,我通过一句话概述:控制协程执行的范围。这个范围使用CoroutineScope实现。因为上面的代码都运行在runBlocking中,传入参数的时候直接将block设置为CoroutineScope的扩展lambda,所以不需要再指定scope:

// runBlocking
public fun <T> runBlocking(
    context: CoroutineContext = EmptyCoroutineContext, 
    block: suspend CoroutineScope.() -> T
): T 

包括suspend函数也需要运行在scope中,否则就会在编译期报错。

Suspend Function : A CPS Transformation

Kotlin编译器会对挂起函数进行转换,如图所示:


784ce5776def5255e6d300cd5890a6yy.gif

这种转换在Kotlin中被称为CPS(continuation-passing-style)转换,Continuation可以理解为是存储了中间过程的Callback。下面我们具体看一个例子:

要注意什么?

  1. 编译后新增加的匿名内部类:TestContinuation

  2. 看【挂起】和【恢复】的逻辑:invokeSuspend

下面代码将编译前的挂起函数和编译后的挂起函数进行了一个比较,在编译后的testCoroutine中增加了一个新的匿名内部类,TestContinuation,其中记录了获取的结果的信息,同时注意看invokeSuspend方法,这个方法有点像递归,最后还会调用到自身,但是会走不同的状态机的分支逻辑:

// 编译前的代码
suspend fun testCoroutine() {
    log("start")
    val user = getUserInfo()
    log(user)
    val friendList = getFriendList(user)
    log(friendList)
    val feedList = getFeedList(friendList)
    log(feedList)
}
// 编译后的代码
fun testCoroutine(completion: Continuation<Any?>): Any? {
    // TestContinuation本质上是匿名内部类
    class TestContinuation(completion: Continuation<Any?>?) : ContinuationImpl(completion) {
        // 表示协程状态机当前的状态
        var label: Int = 0

        // 三个变量,对应原函数的三个变量
        lateinit var user: String
        lateinit var friendList: String
        lateinit var feedList: String

        // result 接收协程的运行结果
        var result = continuation.result

        // suspendReturn 接收挂起函数的返回值
        var suspendReturn: Any? = null

        // CoroutineSingletons 是个枚举类
        // COROUTINE_SUSPENDED 代表当前函数被挂起了
        val sFlag = CoroutineSingletons.COROUTINE_SUSPENDED

        // invokeSuspend 是协程的关键
        // 它最终会调用 testCoroutine(this) 开启协程状态机
        // 状态机相关代码就是后面的 when 语句
        // 协程的本质,可以说就是 CPS + 状态机
        override fun invokeSuspend(_result: Result<Any?>): Any? {
            result = _result
            label = label or Int.Companion.MIN_VALUE
            return testCoroutine(this)
        }
    }

    // ...
    val continuation = if (completion is TestContinuation) {
        completion
    } else {
        //                作为参数
        //                   ↓
        TestContinuation(completion)
    }
}

testCoroutine运行的逻辑如下:

协程状态机的核心逻辑反编译后的伪代码如下:

when (continuation.label) {
    0 -> {
        // 检测异常
        throwOnFailure(result)

        log("start")
        // 将 label 置为 1,准备进入下一次状态
        continuation.label = 1

        // 执行 getUserInfo
        suspendReturn = getUserInfo(continuation)

        // 判断是否挂起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }

    1 -> {
        throwOnFailure(result)

        // 获取 user 值
        user = result as String
        log(user)
    
        // 准备进入下一个状态
        continuation.label = 2

        // 执行 getFriendList
        suspendReturn = getFriendList(user, continuation)

        // 判断是否挂起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }

    2 -> {
        throwOnFailure(result)

        user = continuation.mUser as String

        // 获取 friendList 的值
        friendList = result as String
        log(friendList)

        // 准备进入下一个状态
        continuation.label = 3

        // 执行 getFeedList
        suspendReturn = getFeedList(user, friendList, continuation)

        // 判断是否挂起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }

    3 -> {
        throwOnFailure(result)

        user = continuation.mUser as String
        friendList = continuation.mFriendList as String
        feedList = continuation.result as String
        log(feedList)
        loop = false
    }
}

我们来捋一下其中的顺序,最开始先构建一个TestContinuation的实例,注意,Continuation的这个实例是三个挂起函数的公共参数。

  1. getUserInfo

开始时label = 0, 此时进入逻辑,先进行异常的检查,设置下一次的入口label=1,执行getUserInfo:

when (continuation.label) {
    0 -> {
        // ...
        continuation.label = 1
        // 执行 getUserInfo
        suspendReturn = getUserInfo(continuation)
        // 判断是否挂起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }
    // ...
}

在Kotlin编译器CPS转换之后的getUserInfo方法中,因为传入了continuation参数,需要再执行一次Continuation#invokeSuspend,这个方法同时也将结果记录在了result

 override fun invokeSuspend(_result: Result<Any?>): Any? {
     result = _result
     label = label or Int.Companion.MIN_VALUE
     return testCoroutine(this)
 }

相当于【递归】地执行一次这样的逻辑(个人认为这个逻辑和传递事件的分发有点相似)。此时getUserInfo执行完成返回的结果是CoroutineSingletons.COROUTINE_SUSPEND,所以继续执行下个when的case。

后面的结果其他的挂起函数的执行过程都差不多。具体过程如图所示:

截屏2022-04-15 上午9.20.55.png

通过这个状态机的分析能够让我们更加深刻的理解挂起函数中【挂起】和【恢复】的本质:其实就是基于状态机的回调函数,但是这种回调函数的执行逻辑是Kotlin编译器自动生成的,大大减少了我们的脑力消耗。

需要注意的是,以上的挂起函数都是【真正的】挂起函数,suspend function中都带有挂起的操作,但是Kotlin编译器在进行CPS转换的时候只认supsend关键字,对于伪suspend函数,走else分支,节省开销:

 if (suspendReturn == sFlag) {
      return suspendReturn
 } else {
      result = suspendReturn
      //go to next state
 }

有关Kotlin 协程(一)的更多相关文章

  1. [面试直通版]操作系统核心之进程、线程与协程(下) - 2

    点击->操作系统复习的文章集目录操作系统线程线程是什么进程与线程的关系用户态/内核态操作系统资源管理内核态用户态内核态/用户态切换程序运行类型分析计算密集型IO密集型结合进程,线程来理解程序运行类型分析协程基础上下文切换协程协程为什么叫协作式线程?协程的优缺点操作系统线程典型问题:简述进程和线程的区别以下内容带您一步步了解线程是什么比进程更小的独立运行的基本单位-线程(Threads)线程的提出主要是为了提高系统内程序并发执行的程度,从而进一步提升系统的吞吐量,充分发挥多核CPU的优越性而设计的引入进程是为了操作系统更加方便地管理程序,使得多个程序能并发管理和执行而线程则是为了减少程序在并发执

  2. Kotlin:通过并消耗2个参数函数? - 2

    我正在尝试在Kotlin学习功能编程,并且难以使此代码起作用:importjava.util.*funcaseName(br:String,c:Int):String{if(c==0){returnbr.toLowerCase()}else{returnbr.toUpperCase()}}funmapIt(ns:ArrayList,f:(String,Int)->String):List{valcoll:List=ns.map{it->f(it,_)}returncoll}funmain(args:Array){valnames=arrayListOf("Joe","Bill","Murrar

  3. performance - 协程性能 - 2

    我已经开始学习围棋了,它既有趣又简单。但是使用goroutines我没有看到性能上的好处。如果我尝试在2个函数中两次连续添加100万个数字:packagemainimport("fmt""time")varsumAintvarsumBintfuncfSumA(){fori:=0;i需要5毫秒。MacBook-Pro-de-Pedro:hellopedro$./bin/helloElapsedtime5.724406msSumatotal999999000000MacBook-Pro-de-Pedro:hellopedro$./bin/helloElapsedtime5.358165ms

  4. go - 如何等待 panic 的协程? - 2

    等待goroutine的常见方法是使用*sync.WaitGroup:funcmain(){wg:=&sync.WaitGroup{}wg.Add(1)gofunc(){deferwg.Done()//Longrunningtask}()wg.Wait()}这里没有问题。但是,这个怎么样:funcmain(){wg:=&sync.WaitGroup{}wg.Add(1)gofunc(){deferwg.Done()//Longrunningtaskpanic("Somethingunexpectedhappened.")}()wg.Wait()}在这种情况下,当wg.Done()被调用

  5. go - 如何正确限制协程的数量 - 2

    我正在进入URL的“stdin”行,例如:$echo-e'https://golang.org\nhttps://godoc.org\nhttps://golang.org'|去运行1.go。任务是从每个网页中获取单词“Go”的编号。但是我不允许启动超过5个goroutines并且只能使用标准库这是我的代码:packagemainimport("fmt""net/http""bufio""os""regexp""io/ioutil""time")funcworker(idint,jobs在我将超过5个URL(其中一个不正确)传递给标准输入之前,我一直认为它有效。输出是:goroutin

  6. Golang 中的协程大小不是线性增加的 - 2

    对于下面的代码:constLOOPNUMint=200000funcmain(){z:=make(chanint16)fori:=0;i我用LOOPNUM=200k和400k运行代码,内存使用情况如下:有谁知道我将goroutines加倍后内存突然增加的原因(以及减少内存使用的任何解决方案)?谢谢! 最佳答案 您不是在等待goroutines完成,因此它会在更改以执行您告诉它的所有操作之前退出。将其更改为:constLOOPNUMint=200000varwgsync.WaitGroupfuncmain(){wg=sync.Wait

  7. 带有 http.HandleFunc 的协程 - 2

    我想知道代码1是否管理内部goutines并且可以在请求增加(几十个)时使用一个CPU的所有内核,或者如果每个处理程序我必须放置关键字go来指示函数处理程序将由一个gorotine管理,如代码2中所示,因此可以使用服务器的所有核心。代码1packagemainimport("fmt""net/http")funcHandlerOne(whttp.ResponseWriter,req*http.Request){fmt.Println("messageone")}funcHandlerTwo(whttp.ResponseWriter,req*http.Request){fmt.Print

  8. go - CoInitializeEx(COINIT_MULTITHREADED) 和使用 WMI 的协程 - 2

    我们有一个用Go编写的监控代理,它使用许多goroutines从WMI收集系统指标。我们最近发现当go二进制文件在Server2016或Windows10(也可能在其他使用WMF5.1的操作系统上)上运行时,程序会泄漏内存。在为reproducetheissue创建最小测试用例之后似乎仅当您对ole.CoInitializeEx方法进行大量调用时才会发生泄漏(WMF5.1中可能发生了一些变化,但我们无法在同一系统上使用pythoncomtypes包重现该问题)。我们在应用程序中将COINIT_MULTITHREADED用于多线程单元(MTA),我的问题是:因为我们从各种goroutin

  9. Golang 同步协程 - 2

    我正在开发用于并发配置网络设备的SSH客户端应用程序,但在实现并发性时遇到了问题。我的程序接受一部分主机和一部分配置命令以发送到每个主机。我正在使用sync.WaitGroup来等待所有主机完成配置。这适用于小批量主机,但很快我的配置goroutines中的功能开始随机失败。如果我在失败的主机上重新运行程序,一些会成功,一些会再次失败。我必须重复这个过程,直到只剩下有实际错误的主机。它总是在身份验证失败时说authenticationfailed:authmethodstried[nonepassword]...或者sysDescr的值没有添加到某些设备字段。就好像当有许多主机和gor

  10. 协程死锁? - 2

    我是golang的新手,我对这个死锁感到困惑(runhere)packagemainimport("fmt""runtime""time")funcmain(){c:=make(chanstring)work:=make(chanint,1)clvl:=runtime.NumCPU()count:=0fori:=0;i 最佳答案 您永远不会关闭c,因此您的forrange循环将永远等待。像这样关闭它:varwgsync.WaitGroupfori:=0;i编辑:修复了panic,感谢Crast

随机推荐