草庐IT

kotlin<第十一篇>:Channel-通道

NoBugException 2023-09-22 原文
(1)基本用法

Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信。
生产者/消费者模式 (send - channel - receive)

Channel的基本用法如下:

runBlocking {

    val channel = Channel<Int>()
    // 生产者
    val producer = GlobalScope.launch {
        var i = 0
        while(true) {
            delay(1000)
            channel.send(++i)
            println("send $i")
        }
    }

    // 消费者
    val consumer = GlobalScope.launch {
        while(true) {
            val element = channel.receive()
            println("receive $element")
        }
    }

    joinAll(producer, consumer)

}
(2)Channel的容量

Channel实际上就是一个队列,队列中一定存在缓冲区,那么一旦这个缓冲区满了,
并且也一直没有人调用receive取走数据,send就需要挂起。
故意让接收端的节奏放慢,发现send总是会挂起,直到receive之后才会继续往下执行。

Channel的默认大小为0。

(3)迭代Channel

Channel本身确实像序列,所以我们在读取的时候可以直接获取一个Channel的iterator。

runBlocking {

    val channel = Channel<Int>(Channel.UNLIMITED)
    // 生产者
    val producer = GlobalScope.launch {
        for (x in 1..5) {
            println("send ${x * x}")
            channel.send(x * x)
        }
    }

    // 消费者
    val consumer = GlobalScope.launch {
        val iterator = channel.iterator()
        while(iterator.hasNext()) {
            val element = iterator.next()
            println("receive $element")
            delay(1000)
        }
    }

    joinAll(producer, consumer)

}

消费者代码也可以改成:

    // 消费者
    val consumer = GlobalScope.launch {
        for (element in channel) {
            println("receive $element")
            delay(1000)
        }
    }
(4)produce与actor

构造生产者与消费者的便捷方法
我们可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel,其他协程就可以用这个Channel来接收数据了。
反过来,我们可以用actor启动一个消费者协程。

produce演示:

runBlocking {

    val receiveChannel = GlobalScope.produce<Int> {
        repeat(100) {
            delay(1000)
            send(it)
        }
    }

    // 消费者
    val consumer = GlobalScope.launch {
        for (element in receiveChannel) {
            println("receive $element")
        }
    }
    consumer.join()
}

actor演示:

runBlocking {

    // 消费者
    val sendChannel = GlobalScope.actor<Int> {
        while (true) {
            val element = receive()
            println("receive $element")
        }
    }

    // 生产者
    val producer = GlobalScope.launch {
        repeat(100) {
            sendChannel.send(it)
            delay(1000)
        }
    }
    producer.join()
}
(5)Channel关闭

produce和actor返回的Channel都会随着对应的协程执行完毕而关闭,也正是这样,Channel才被称为 热数据流
对于一个Channel,如果我们调用了它的close方法,它会立即停止接收新元素,也就是说这时它的 isClosedForSend 会立即返回true,而由于Channel缓冲区的存在,这时可能还有一些元素没有处理完,因此要等所有的元素都读取之后 isClosedForReceive 才会返回true。

Channel的生命周期最好由主导方来维护,建议 由主导的一方实现关闭

runBlocking {

    val channel = Channel<Int>(3)
    // 生产者
    val producer = GlobalScope.launch {
        List(3) {
           channel.send(it)
           println("send $it")
        }
        channel.close()
        println("producer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
    }

    // 消费者
    val consumer = GlobalScope.launch {
        for (element in channel) {
            println("receive:$element")
            delay(1000)
        }
        println("consumer -> isClosedForSend:" + channel.isClosedForSend + " -- isClosedForReceive:" + channel.isClosedForReceive)
    }
    joinAll(producer, consumer)
}
(6)BroadcastChannel

正常情况下,一个 发送者 对应着一个 接收者
使用 BroadcastChannel 可以存在多个接收者。

runBlocking {

    val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
    // 生产者
    val producer = GlobalScope.launch {
        List(3) {
            delay(1000)
            broadcastChannel.send(it)
        }
        broadcastChannel.close()
    }

    List(3) { index->
        GlobalScope.launch {
            val receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                delay(1000)
                println("[#${index}] receive:$element")
            }
        }
    }.joinAll()

    producer.join()
}

BroadcastChannel<Int>(Channel.BUFFERED) 可以改成 Channel<Int>().broadcast(Channel.BUFFERED)

(7)await多路复用

什么是多路复用?
数据通信系统或计算机网络系统中,传输媒体的带宽或容量往往会大于传输单一信号的需求,为了有效地利用通信线路,希望 一个信道同时传输多路信号,这就是所谓多路复用技术。

复用多个await?
两个API分别从网络和本地缓存 获取数据,期望哪个先返回就先用哪个做展示。

request->server->response--
                         ----Select -> Response
request->server->response--
data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
    delay(1000)
    User(name)
}

suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    delay(100)
    User(name)
}

fun main() {
    runBlocking {

        GlobalScope.launch {
            val localRequest = getUserForLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")
            // select 选择执行
            val userResponse = select<Response<User>> {
                localRequest.onAwait { Response(it, true) }
                remoteRequest.onAwait { Response(it, false) }
            }
            println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
        }.join()

    }
}

select:谁先返回,就选择谁。

(8)复用多个Channel
fun main() {
    runBlocking {

        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }
        GlobalScope.launch {
            delay(50)
            channels[0].send(100)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println(result)

    }
}
(9)SelectClause

我们怎么知道哪些事件可以被select呢? 其实所有能够被select的事件都是selectClauseN 类型,包括:

  • selectClause0:对应事件没有返回值,例如join没有返回值,那么onJoin就是SelectClauseN类型。使用时,onJoin的参数是一个无参函数。
  • selectClause1:对应事件有返回值,例如:onAwait和onReceive都是此类情况。
  • selectClause2:对应事件有返回值,此外还需要一个额外的参数,例如Channel.onSend有两个参数,第一个是具体的数据,第二个参数是发送成功的回调参数。

-> 如果我们想要确认挂起函数是否支持select,只需要查看其是否存在对应的SelectClauseN类型可回调即可。

selectClause0举例:

fun main() {
    runBlocking {
        val job1 = GlobalScope.launch {
            delay(100)
            println("job 1")
        }
        val job2 = GlobalScope.launch {
            delay(10)
            println("job 2")
        }
        select<Unit> {
            job1.onJoin { println("job1 onJoin") }
            job2.onJoin { println("job2 onJoin") }
        }
    }
}

selectClause1举例:

data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async(Dispatchers.IO) {
    delay(1000)
    User(name)
}

suspend fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    delay(100)
    User(name)
}

fun main() {
    runBlocking {

        GlobalScope.launch {
            val localRequest = getUserForLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")
            // select 选择执行
            val userResponse = select<Response<User>> {
                localRequest.onAwait { Response(it, true) }
                remoteRequest.onAwait { Response(it, false) }
            }
            println("name:" + userResponse.value.name + "-- isLocal:" + userResponse.isLocal)
        }.join()

    }
}
fun main() {
    runBlocking {

        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }
        GlobalScope.launch {
            delay(50)
            channels[0].send(100)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println(result)

    }
}

selectClause2举例:

fun main() {
    runBlocking {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        println(channels)
        launch(Dispatchers.IO) {
            select<Unit> {
                launch {
                    delay(10)
                    channels[1].onSend(200) { sentChannel->
                        println("sent on $sentChannel")
                    }
                }
                launch {
                    delay(100)
                    channels[0].onSend(100) { sentChannel->
                        println("sent on $sentChannel")
                    }
                }
            }
        }
        GlobalScope.launch {
            println(channels[0].receive())
        }
        GlobalScope.launch {
            println(channels[1].receive())
        }
    }
}

[完...]

有关kotlin<第十一篇>:Channel-通道的更多相关文章

  1. ruby-on-rails - 如何在 Rails Controller 中调用 channel 方法? - 2

    我有一个订阅用户的ActionCable方法。如果开始新的session,我也想为用户订阅新channel。我想不出在Controller中调用channel方法的正确语法。更新:问题是消息在发送时附加到聊天框,但是当发送第一条消息时,websocket连接尚未建立,因此在用户看来好像消息没有发送(因为它没有被附加)。channel/msgs_channel.rbclassMsgsChannel在我的convosController中,create方法,我尝试了几种方法:convos_controller.rbdefcreate@convo=Convo.create!({sender_

  2. ruby-on-rails - 在 rails 插件之外无法访问 ActionCable channel - 2

    我正在尝试创建一个公开ActionCablechannel的gem,但我无法让它工作。这是我的宝贝#lib/my_channel.rbclassMyChannel然后我将gem添加到我的主要应用程序Gemfile,运行bundleinstall,启动控制台并运行MyChannel。没有屈服和错误,这意味着channel已正确包含。然后我将其添加到我的主应用程序//application.jsvarsocket="ws://localhost:3000/cable";varcable=ActionCable.createConsumer(socket);cable.subscriptio

  3. 工程(十一)——NUC11+D435i+VINS-FUSION+ESDF建图(github代码) - 2

    博主的合并代码git@github.com:huashu996/VINS-FUSION-ESDFmap.git一、D435i深度相机配置1.1SDK+ROS参考我之前的博客,步骤和所遇见的问题已经写的很详细了https://blog.csdn.net/HUASHUDEYANJING/article/details/129323834?spm=1001.2014.3001.55011.2相机标定参数1、相机内参通过rostopic的camera/info获取header:标准消息头seq:序列ID,连续递增的ID号stamp:两个时间戳frame_id:与此数据相关联的帧IDheight:图像尺

  4. ElasticSearch(十一)【集群搭建】 - 2

    十一、ES集群的相关概念上一篇文章《ElasticSearch-聚合查询》集群(cluster)一个集群就是由一个或多个节点组织在一起,它们共同持有你整个的数据,并一起提供索引和搜整合应用索功能。一个集群由一个唯一的名字标识,这个名字默认就是elasticsearch。这个名字是重要的,因为一个节点只能通过指定某个集群的名字,来加入这个集群节点(node)一个节点是集群中的一个服务器,作为集群的一部分,它存储你的数据,参与集群的索引节点和搜索功能。和集群类似,一个节点也是由一个名字来标识的,默认情况下,这个名字是一个随机的漫威漫画角色的名字,这个名字会在启动的时候赋予节点索引(Index)一组

  5. Kotlin:通过并消耗2个参数函数? - 2

    我正在尝试在Kotlin学习功能编程,并且难以使此代码起作用:importjava.util.*funcaseName(br:String,c:Int):String{if(c==0){returnbr.toLowerCase()}else{returnbr.toUpperCase()}}funmapIt(ns:ArrayList,f:(String,Int)->String):List{valcoll:List=ns.map{it->f(it,_)}returncoll}funmain(args:Array){valnames=arrayListOf("Joe","Bill","Murrar

  6. 计算机网络(十一)——导论与路由器工作原理 - 2

    文章目录1.网络层的概述2.路由器的工作原理2.1路由器的组成2.2转发2.2.1查找2.2.2交换2.2.3输出端口处理核心内容路由器的工作原理1.网络层的概述(1)功能结构网络层用于提供主机与主机之间的逻辑通信,源主机网络层接收来自运输层的报文段,将其封装为一个数据报并向相邻的路由器发送数据报。接收方的网络层接收来自相邻路由器的数据报,提取出报文段交付给运输层。为此需要提供转发和路由选择两个功能。网络层能够被划分为两个相互作用的部分,数据部分和路由部分:数据部分的主要作用:转发,即分组从输入链路接口转发到合适的输出链路接口。控制部分的主要作用:路由选择,即决定从源主机到目的主机,分组所采用

  7. javascript - 如何将两个输入 channel 连接到 ScriptProcessorNode? (网络音频 API,JavaScript) - 2

    我正在尝试实现一个具有两个输入channel和一个输出channel的ScriptProcessorNode。varsource=newArray(2);source[0]=context.createBufferSource();source[0].buffer=buffer[0];source[1]=context.createBufferSource();source[1].buffer=buffer[1];vartest=context.createScriptProcessor(4096,2,1);source[0].connect(test,0,0);source[1].c

  8. javascript - Node 子进程, channel 在 process.send 上关闭 - 2

    在我的工作文件中,我监听数据回调。someLib是Node串口。process.on('message',function(msg){someLib.on('data',function(data){console.log('somedata');process.send(data);});});这打印somedataError:channelclosed但是process.on('message',function(msg){process.send('foobar');});工作正常。这很奇怪,但有时第一个代码示例有效,所以channel关闭错误随机出现。来自http://node

  9. STM32_基础入门(十一)第二篇_通用定时器使用详解 - 2

    持续关注阿杰在线更新保姆式笔记~~坚持日更目录一、通用定时器基本介绍二、基本定时功能1、定时器时钟来源分析2、常用库函数3、代码区三、定时器输出PWM3.1基本介绍3.2 PWM工作过程​3.3 常用库函数 ​PWM输出配置步骤: 3.4 代码区四、输入捕获功能1.基本介绍2.工作过程3.常用库函数 输入捕获的一般配置步骤 代码区一、通用定时器基本介绍通用定时器包括TIM2、TIM3、TIM4和TIM5STM32通用定时器是一个通过可编程预分频器驱动的16位自动装载计数器构成。每个定时器都是完全独立的,没有互相共享任何资源。它们可以一起同步操作。定时器可以进行定时器基本定时,输出4路PWM,输

  10. javascript - 如何使用 JS 和 Chrome 控制台向 channel 发送 Discord 消息? - 2

    如何在不使用DiscordAPI的情况下使用JS和Chrome控制台将Discord消息发送到Discordchannel?好像是不可能的…… 最佳答案 打开discord控制台:ctrl+shift+i(不起作用?请参阅下面的编辑)然后进入网络选项卡。现在我们需要嗅探一条消息,所以在discord中输入一条消息并发送。然后在控制台网络选项卡中右键单击名为“messages”的请求,然后选择“Copyasfetch”。然后转到“控制台”选项卡。粘贴请求。编辑此请求以删除“noonce”字段。还有,用您的消息编辑“内容”字段。当您按下

随机推荐