深入理解Kotlin协程

我正在参加「掘金·启航计划」

Kotlin协程

协程由程序自己创建和调度,不需要操作系统调度,所以协程比线程更加轻量。相比于线程的切换,协程切换开销更小,速度更快

我们知道线程是CPU调度的基本单位,而协程不能独立存在,必须依赖于线程。在Kotlin中,Dispatcher(内部是线程池或者Android主线程)是调度器,可以调度协程运行在一个或多个线程之中。一个线程中可以有多个协程,同一个协程可以运行在一个线程的不同时刻;多个协程可以运行在一个或多个线程的不同时刻。进程、线程、协程三者的关系:

  • 进程:线程 = 1:N
  • 线程:协程 = 1:N

协程优势:

  • 更安全的代码kotlin中提供了许多语言功能,避免Java中最常见的null空指针等异常
  • 语法简洁、富有表现力:相比于Javakotlin可以使用更少的代码实现更多的功能。
  • 可互操作:与Java语言无缝互通。即可以在 kotlin 代码中调用Java代码,同时也可以在Java代码中调用kotlin代码。kotlin代码本质上也是通过kotlin编译器编译后生成VM能识别的字节码。
  • 结构化并发使用看似阻塞式的写法来实现异步功能。即可以同步的方式去写异步代码,相比于回调方式大幅简化了后台任务管理,例如网络请求、数据库访问等任务的管理。

Suspend 非阻塞式挂起函数

协程在常规函数的基础上添加了两项操作,用于处理长时间运行的任务。在 invoke(或call)return之外,协程添加了 suspendresume

  • suspend 用于暂停执行当前协程,并保存所有局部变量。
  • resume 用于让已挂起的协程从挂起处继续执行。

suspend 函数不能在普通函数中调用,否则会报Suspend function 'xxx' should be called only from a coroutine or another suspend function 的提示;如需调用 suspend 函数,只能从其他 suspend 函数进行调用,或通过使用协程构建器(例如 launch)来启动新的协程。

suspend 函数相比于普通函数内部多了一个 Continuation续体实例(Kotlin转成Java代码之后即可看到),suspend函数中可以调用普通函数,但普通函数却不能调用suspend函数

 suspend fun fetchDocs() {                             // Dispatchers.Main
    val result = get("https://developer.android.com") // Dispatchers.IO for `get`
    show(result)                                      // Dispatchers.Main
}

suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }

suspend 修饰的函数为非阻塞式挂起函数,何为非阻塞式挂起呢?不同于Java 中的Thread.sleep() 会阻塞当前线程,suspend 修饰的函数当遇到启动子线程操作时,会在切线程时将协程挂起并记录当前挂起点,接着主线程暂停了当前协程,但可以去执行协程外逻辑而不会被阻塞;此时协程中启动的子线程也可以继续执行(相当于兵分两路,互不影响),当子线程中的逻辑执行完毕后,会自动从协程的挂起点恢复,这样就可以继续在主线程往下执行了。整体流程如下:执行suspend函数 -> 启动子线程 -> 函数挂起并记录挂起点 -> 协程暂停 -> 子线程执行完毕 -> 协程从挂起点恢复

如在上面的示例中,get() 在主线程中调用,内部在它启动网络请求之前挂起协程。虽然协程被挂起,但主线程并没有被阻塞,此时如果主线程中收到其他消息事件依然可以去处理(如点击事件等)。当网络请求完成时,get 会恢复已挂起的协程,继续执行show(result) 方法。这里注意一下,挂起函数并不一定会导致协程挂起,只有在发生异步调用时才会挂起

Kotlin 使用 堆栈帧 管理要运行哪个函数以及所有局部变量。挂起协程时,系统会复制并保存当前的堆栈帧以供稍后使用。恢复时,会将堆栈帧从其保存位置复制回来,然后函数再次开始运行。即使代码可能看起来像普通的顺序阻塞请求,协程也能确保网络请求避免阻塞主线程。

CPS变换 + Continuation续体 + 状态机

CPS变换 (Continuation-Passing-Style Transformation) 是一种编程风格, 用来将内部要执行的逻辑封装到一个闭包里面, 然后再返回给调用者。上一节的例子中,协程就是通过传递 Continuation 来控制异步调用流程的:将函数挂起之后执行的逻辑包装起成一个Continuation, 里面包含了挂起点信息,这样当协程恢复时就可以在挂起点继续执行

Continuation意为续体,只存在于挂起函数中。Kotlin 中当一个普通函数加上suspend 关键字之后,就成为了挂起函数,如:

private suspend fun suspendFuc() {}

我们对此函数进行反编译,查看对应的Java代码为:

private final Object suspendFuc(Continuation $completion) {
      return Unit.INSTANCE;
}

可以看到Java代码中系统帮我们多生成了一个Continuation 类型的参数。Continuation 是一个接口类型,表示在返回T类型值的挂起点之后的延续,其中类型T代表的是原来函数的返回值类型。

@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

resumeWith 恢复执行相应协程,并传递一个结果作为最后一个挂起点的返回值。

  • 状态机

上述函数中加入一个delay 函数,delay 函数本身也是一个挂起函数:

    private suspend fun suspendFuc(): String {
        delay(1000)
        return "value"
    }

经过Tools -> Kotlin -> Show Kotlin Bytecode 反编译查看:

   private final Object suspendFuc(Continuation var1) {
      Object $continuation;
      label20: {
         if (var1 instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)var1;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

          //将要执行的Continuation逻辑传入ContinuationImpl中
         $continuation = new ContinuationImpl(var1) {
            // $FF: synthetic field
            Object result;
            int label;

             //invokeSuspend()会在恢复协程挂起点时调用
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return CoroutineBaseFragment.this.suspendFuc(this);
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      //返回此状态意味着函数要被挂起
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();

      // 状态机逻辑,通过label进行分块执行
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         ((<undefinedtype>)$continuation).label = 1;
         if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
            return var4;
         }
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }
      return "value";
   }

suspend挂起函数经过Kotlin编译器编译之后会进行CPS变换,并且函数里的逻辑会进行分块执行:分块逻辑数量 = 挂起函数数量 + 1,上述函数逻辑中,通过switch(label){} 状态机将逻辑分块执行,首先label = 0,此时会先将label置为1,接着调用了delay挂起函数,返回Intrinsics.COROUTINE_SUSPENDED意味着函数要被挂起;

当函数从协程挂起点恢复执行时,会调用$continuation#invokeSuspend(Object $result) 方法恢复执行,$result是上一次函数执行的结果,以参数形式带入之后,就可以从挂起点继续执行了。

此时label变为1ResultKt.throwOnFailure($result)中通过if (value is Result.Failure) throw value.exception判断上面的分块结果是否有异常,如果有直接抛异常;否则直接break,执行到最后的return "value" 逻辑,这样整个方法的流程就运行完毕了。

总结:suspend挂起函数的执行流程就是通过CPS变换 + Continuation + 状态机来运转的

CoroutineContext

我们知道 Android 中的 Context(子类有Application、Activity、Service)可以获取应用资源,可以启动ActivityService等。CoroutineContext意为协程上下文,其作用可以类比 Context,通过它可以控制协程在哪个线程中执行、捕获协程异常、设置协程名称等。

public interface CoroutineContext {
    //从该上下文中返回具有给定[key]的元素 或 null 
    public operator fun <E : Element> get(key: Key<E>): E?

    //从initial开始累加上下文中的条目,并从左到右对当前累加器值和上下文中的每个元素应用operation
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    //返回一个上下文,其中包含来自此上下文的元素和来自其他上下文context的元素。具有相同键的元素会被覆盖。
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    //删除带有指定[key]的元素。
    public fun minusKey(key: Key<*>): CoroutineContext

    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     */
    public interface Key<E : Element>

    //CoroutineContext的一个元素。协程上下文的一个元素本身就是一个单例上下文。
    public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

协程总是会运行在以 CoroutineContext 类型的上下文中,其中plus操作符被重写,多个CoroutineContext相加时,会生成一个CombinedContext( 内部可以理解成一个Element都不一样的链表);同时 CombinedContext拥有 map索引能力,集合中的每个元素都有一个唯一的 Key

继承关系

Element 定义在CoroutineContext内部,是它的内部接口。CoroutineContext 使用以下元素集定义协程的行为:

  • Job:控制协程的生命周期。
  • CoroutineDispatcher:将工作分派到适当的线程。
  • CoroutineName:协程的名称,可用于调试。
  • CoroutineExceptionHandler:处理未捕获的异常。

下面就来详细看看几种元素的使用。

CoroutineContext几种具体实现

1、Job & SupervisorJob

Job是协程的句柄。使用launchasync创建的协程都会返回一个Job实例,该实例是对应协程的唯一标识并可以管理协程的生命周期。如:

val job1 = GlobalScope.launch { }
val job2 = MainScope().launch { }

override fun onDestroy() {
    super.onDestroy()
    job1.cancel()
    job2.cancel()
}

注:上面的协程启动方式并不推荐在项目中直接使用,因为生命周期比较长,如果没有主动关闭可能就会产生内存泄漏。推荐在ViewModel中使用viewModelScopeLifecycleOwner中使用lifecycleScope,可以在各自的生命周期中自动关闭协程。

  val job = GlobalScope.launch(context = Job() + Dispatchers.Main,
      start = CoroutineStart.LAZY) {
      // 逻辑部分
   }

  job常用的API:
  -  job.start()  // 对应 start = CoroutineStart.LAZY
  -  job.cancelAndJoin()  //cancel() + join()
  -  job.cancel() // 取消
  -  job.isActive // 协程是否存活
  -  job.isCancelled // 协程是否被取消
  -  job.isCompleted //协程是否已经执行完毕

Job 有以下状态:

State isActive isCompleted isCancelled
New (optional initial state) false false false
Active (default initial state) true false false
Completing (transient state) true false false
Cancelling (transient state) false false true
Cancelled (final state) false true true
Completed (final state) false true false

状态流转:

                                          wait children
    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
    | New | -----> | Active | ---------> | Completing  | -------> | Completed |
    +-----+        +--------+            +-------------+          +-----------+
                     |  cancel / fail       |
                     |     +----------------+
                     |     |
                     V     V
                 +------------+                           finish  +-----------+
                 | Cancelling | --------------------------------> | Cancelled |
                 +------------+                                   +-----------+

协程通过状态机方式实现自动回调。

SupervisorJob的使用

val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }

GlobalScope.launch(exceptionHandler) {
   //子Job1
    launch(SupervisorJob()) {
        delay(200)
        throw NullPointerException()
     }
   //子Job2
    launch {
        delay(300)
        log("child2 execute successfully")
    }
    //子Job3
    launch {
        delay(400)
        log("child3 execute successfully")
    }
    log("parent execute successfully")
}

注意子Job1传入的是 SupervisorJob,当发生异常时,兄弟协程(job2/job3)不会被取消;如果是默认配置,那么兄弟协程也会被取消,上述代码执行结果:

2022-08-30 23:31:30.142  E/TTT: parent execute successfully
2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException
2022-08-30 23:31:30.446  E/TTT: child2 execute successfully
2022-08-30 23:31:30.545  E/TTT: child3 execute successfully

如果将job1中的更换为默认时,执行结果为:

2022-08-30 23:31:30.142  E/TTT: parent execute successfully
2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException

可以看到job1中使用默认设置时,当job1发生了异常,兄弟协程也会被取消。注意这里job2/job3可以被取消是因为delay()中有对取消状态的判断,通知job2/job3取消可以类比Thread.interrupt(),interrupt只是通知线程要中断并设置一个中断状态,最终要不要中断还是线程自己说了算。所以如果改成以下代码,即使job1使用默认配置,job3也不会被取消:

//子Job3
 launch(Dispatchers.IO) {
     Thread.sleep(400)
     log("child3 execute successfully")
 }

job3中没有了对协程状态的判断,所以即使job3被通知要取消协程了,依然会继续执行直到结束。

SuperviorJob源码浅析

public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)

public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)

private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
    override fun childCancelled(cause: Throwable): Boolean = false
}

当子协程 job1/job2/job3 启动时,会和协程本身的 Job 形成父子关系。而当子协程抛异常时,父JobchildCancelled() 方法会被执行,且默认返回的是 true,表示取消 父Job 及其所有 子Job;如果 子Job中使用的是 SuperviorJobchildCancelled() 返回的是 false,表示 父Job及其未发生异常的 子Job都不会受影响。

2、CoroutineDispatcher
public actual object Dispatchers {
   @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

将工作分派到适当的线程。

  • Dispatchers.Main:运行在UI线程中。Dispatchers.Main.immediate: 如果在UI线程加载,不会做特殊处理;如果是在子线程,会通过Handler转发到主线程
  • Dispatchers.IO: 执行IO密集型任务,最多提交max(64, CPU核心数)个任务执行。
  • Dispatchers.DEFAULT :执行CPU密集型任务, CoroutineScheduler最多有corePoolSize个线程被创建,corePoolSize的取值为max(2, CPU核心数),即它会尽量的等于CPU核心数
  • Dispatchers.Unconfined:不给协程指定运行的线程,由启动协程的线程决定;但当被挂起后, 会由恢复协程的线程继续执行。内部通过ThreadLocal保存执行协程时对应的线程,用于恢复协程时在取出对应线程并在其继续执行协程。

withContext

withContext() 可以在不引入回调的情况下控制任何代码行的线程池,因此可以将其应用于非常小的函数,例如从数据库中读取数据或执行网络请求。一种不错的做法是使用 withContext() 来确保每个函数都是主线程安全的,这意味着,您可以从主线程调用每个函数。这样,调用方就从不需要考虑应该使用哪个线程来执行函数了。

3、CoroutineName

协程的名称,可用于调试。

4、CoroutineExceptionHandler
public interface CoroutineExceptionHandler : CoroutineContext.Element {

    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

    //当有未处理的异常发生时,该方法就会执行
    public fun handleException(context: CoroutineContext, exception: Throwable)
}

CoroutineExceptionHandler当有未捕获的异常时就会触发执行。如果不在顶级协程中设置,那么当有异常发生时会导致程序的crash,可以通过自定义CoroutineExceptionHandler来拦截异常,如下:

val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }
lifecycleScope.launch(exceptionHandler) { // do something }              

自定义CoroutineExceptionHandler调用的是如下方法:

public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
    object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
        override fun handleException(context: CoroutineContext, exception: Throwable) =
            handler.invoke(context, exception)
    }

所以当发生异常时,会在handleException()中执行我们传入的handler(CoroutineContext, Throwable)方法。

CoroutineExceptionHandler使用示例

 val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("parent throwable:$throwable") }
 lifecycleScope.launch(exceptionHandler) {
     log("parent execute start")
     val childExHandler =
                    CoroutineExceptionHandler { context, throwable -> log("child throwable:$throwable") }
     //子协程中如果使用SupervisorJob()、Job(),则异常不会往上传播;否则异常会在顶层协程中处理
     val childJob = launch(childExHandler) {
         delay(1000)
         log("child execute")
         throw  IllegalArgumentException("error occur")
     }
     childJob.join()
     log("parent execute end")
 }

执行结果:

2022-09-04 00:38:51.792 15415-15415/ E/TTT: parent execute start
2022-09-04 00:38:52.796 15415-15415/ E/TTT: child execute
2022-09-04 00:38:52.805 15415-15415/ E/TTT: parent throwable:java.lang.IllegalArgumentException: error occur

可见虽然在子协程中也自定义了CoroutineExceptionHandler,但是最终子协程中的异常还是在顶层的父协程种处理的,如果就想在子协程中处理异常呢?可以在子协程中加上SupervisorJob()、Job(),则异常会在子协程中处理而不会往上传播:

 //其他内容不变
 val childJob = launch(childExHandler + SupervisorJob()) {
     //其他内容不变
 }

执行结果:

2022-09-04 00:42:48.465 15755-15755/ E/TTT: parent execute start
2022-09-04 00:42:49.468 15755-15755/ E/TTT: child execute
2022-09-04 00:42:49.473 15755-15755/ E/TTT: child throwable:java.lang.IllegalArgumentException: error occur
2022-09-04 00:42:49.474 15755-15755/ E/TTT: parent execute end

可见异常最终是在子协程中处理的,且虽然子协程中发生了异常,父协程依然能执行完毕。

:如果启动协程的是async,则CoroutineExceptionHandler中的handler方法并不会马上执行,必须调用deffered.await()时才会执行。

CoroutineScope 协程作用域

CoroutineScope 会跟踪它使用 launchasync 创建的所有协程。CoroutineScope 本身并不运行协程,但是通过CoroutineScope可以保证对协程进行统一管理,避免发生内存泄漏等,

常见的CoroutineScope

  • GlobalScope: 全局协程作用域,如果不主动通过job.cancel()关闭,其生命周期与Application一致。
  • MainScope: MainScope的内部:public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main),可以看到MainScope其实是通过SupervisorJob() + Dispatchers.Main自定义的协程作用域,其内部运行在UI线程且内部异常不会往上传播。
  • runBlocking: 顶层函数,和 GlobalScope 不一样,它会阻塞当前线程直到其内部所有子协程执行结束;
  • ktx扩展库中的自定义GlobalScope:典型应用就是在LifecycleOwner(androidX中的FragmentActivity、Fragment都实现了该接口)使用的lifecycleScope,以及ViewModel中使用的ViewModelScope,其内部可以在合适的时机自动关闭协程,从而避免内存泄露的发生。

参考

【1】官网:利用 Kotlin 协程提升应用性能 </br>
【2】官网:Android 上的 Kotlin 协程</br>
【3】Kotlin挂起函数原理</br>
【4】揭秘Kotlin协程中的CoroutineContext

作者:_小马快跑_ 原文地址:https://juejin.cn/post/7145800423641710599

%s 个评论

要回复文章请先登录注册