草庐IT

Tars-Cpp 协程实现分析

Ye Feng 2023-08-30 原文

一、前言

Tars 是 Linux 基金会的开源项目

(​​https://github.com/TarsCloud​​),它是基于名字服务使用 Tars 协议的高性能 RPC 开发框架,配套一体化的运营管理平台,并通过伸缩调度,实现运维半托管服务。Tars 集可扩展协议编解码、高性能 RPC 通信框架、名字路由与发现、发布监控、日志统计、配置管理等于一体,通过它可以快速用微服务的方式构建自己的稳定可靠的分布式应用,并实现完整有效的服务治理。

Tars 目前支持 C++,Java,PHP,Nodejs,Go 语言,其中 TarsCpp 3.x 全面启用对协程的支持,服务框架全面融合协程。本文基于TarsCpp-v3.0.0版本,讨论了协程在TarsCpp服务框架的实现。

二、协程的介绍

2.1 什么是协程

协程的概念最早出现在Melvin Conway在1963年的论文("Design of a separable transition-diagram compiler"),协程认为是“可以暂停和恢复执行”的函数。

协程可以看成一种特殊的函数,相比于函数,协程最大的特点就是支持挂起(yield)和恢复(resume)的能力。如上图所示:函数不能主动中断执行流;而协程支持主动挂起,中断执行流,并在一定时机恢复执行。

协程的作用:

  1. 降低并发编码的复杂度,尤其是异步编程(callback hell)。
  2. 协程在用户态中实现调度,避免了陷入内核,上下文切换开销小。

2.2 进程、线程和协程

我们可以简单的认为协程是用户态的线程。协程和线程主要异同:

  1. 相同点:都可以实现上下文切换(保存和恢复执行流)
  2. 不同点:线程的上下文切换在内核实现,切换的时机由内核调度器控制。协程的上下文切换在用户态实现,切换的时机由调用方自身控制。

进程、线程和协程的比较:

2.3 协程的分类

按控制传递(Control-transfer)机制分为:对称(Symmetric)协程和非对称(Asymmetric)协程。

  • 对称协程:协程之间相互独立,调度权(CPU)可以在任意协程之间转移。协程只有一种控制传递操作(yield)。对称协程一般需要调度器支持,通过调度算法选择下一个目标协程。
  • 非对称协程:协程之间存在调用关系,协程让出的调度权只能返回给调用者。协程有两种控制操作:恢复(resume)和挂起(yield)。

下图演示了对称协程的调度权转移流程,协程只有一个操作yield,表示让出CPU,返回给调度器。

对称协程示意图

下图演示了非对称协程的调度权转移流程。协程可以有两个操作,即resume和yield。resume表示转移CPU给被调用者,yield表示被调用者返回CPU给调用者。

非对称协程示意图

根据协程是否有独立的栈空间,协程分为有栈协程(stackful)和无栈协程(stackless)两种。

  • 有栈协程:每个协程有独立的栈空间,保存独立的上下文(执行栈、寄存器等),协程的唤醒和挂起就是拷贝和切换上下文。优点:协程调度可以嵌套,在内存中的任意位置、任意时刻进行。局限:协程数目增大,内存开销增大。
  • 无栈协程:单个线程内所有协程都共享同一个栈空间(共享栈),协程的切换就是简单的函数调用和返回,无栈协程通常是基于状态机或闭包来实现。优点:减小内存开销。局限:协程调度产生的局部变量都在共享栈上, 一旦新的协程运行后共享栈中的数据就会被覆盖, 先前协程的局部变量也就不再有效, 进而无法实现参数传递、嵌套调用等高级协程交互。

Golang 中的 goroutine、Lua 中的协程都是有栈协程;ES6的 await/async、Python 的 Generator、C++20 中的 cooroutine 都是无栈协程。

三、Tars 协程实现

实现协程的核心有两点:

  • 实现用户态的上下文切换。
  • 实现协程的调度。

Tars 协程的由下面几个类实现:

  • TC_CoroutineInfo 协程信息类:实现协程的上下文切换。每个协程对应一个 TC_CoroutineInfo 对象,上下文切换基于boost.context实现。
  •  TC_CoroutineScheduler 协程调度器类:实现了协程的管理和调度。
  • TC_Coroutine 协程类:继承于线程类(TC_Thread),方便业务快速使用协程。

Tars 协程有几个特点:

  • 有栈协程。每个协程都分配了独立的栈空间。
  • 对称协程。协程之间相互独立,由调度器负责调度。
  • 基于 epoll 实现协程调度,和网络IO无缝结合。

3.1 用户态上下文切换的实现方式

协程可以看成一种特殊的函数,和普通函数不同,协程函数有挂起(yield)和恢复(resume)的能力,即可以中断自己的执行流,并且在合适的时候恢复执行流,这也称为上下文切换的能力。

协程执行的过程,依赖两个关键要素:协程栈和寄存器,协程的上下文环境其实就是寄存器和栈的状态。实现上下文切换的核心就是实现保存并恢复当前执行环境的寄存器状态的能力。

实现用户态上下文切换一般有以下方式:

3.2  基于boost.context实现上下文切换

Tars 协程是基于 boost.context 实现,boost.context 提供了两个接口(make_fcontext, jump_fcontext)实现协程的上下文切换。

代码1:

/**
* @biref 执行环境上下文
*/
typedef void* fcontext_t;
/**
* @biref 事件参数包装
*/
struct transfer_t {
fcontext_t fctx; // 来源的执行上下文。来源的上下文指的是从什么位置跳转过来的
void* data; // 接口传入的自定义的指针
};
/**
* @biref 初始化执行环境上下文
* @param sp 栈空间地址
* @param size 栈空间的大小
* @param fn 入口函数
* @return 返回初始化完成后的执行环境上下文
*/
extern "C" fcontext_t make_fcontext(void * stack, std::size_t stack_size, void (* fn)( transfer_t));
/**
* @biref 跳转到目标上下文
* @param to 目标上下文
* @param vp 目标上下文的附加参数,会设置为transfer_t里的data成员
* @return 跳转来源
*/
extern "C" transfer_t jump_fcontext(fcontext_t const to, void * vp);

(1)make_fcontext 创建协程

  • 接受三个参数,stack 是为协程分配的栈底,stack_size 是栈的大小,fn 是协程的入口函数
  • 返回初始化完成后的执行环境上下文

(2)jump_fcontext 切换协程

  • 接受两个参数,目标上下文地址和参数指针
  • 返回一个上下文,指向当前上下文从哪个上下文跳转过来

make_fcontext 和 jump_fcontext 通过汇编代码实现,具体的汇编代码可以参考:

fcontext的结构

boost context 是通过 fcontext_t结构体来保存协程状态。相对于其它汇编实现的协程库,boost的context和stack是一起的,栈底指针就是context,切换context就是切换stack。

3.3  Tars协程信息类

TC_CoroutineInfo  协程信息类,包装了 boost.context 提供的接口,表示一个 TARS 协程。

其中,TC_CoroutineInfo::registerFunc 定义了协程的创建。

代码2:

void TC_CoroutineInfo::registerFunc(const std::function<void ()>& callback)
{
_callback = callback;
_init_func.coroFunc = TC_CoroutineInfo::corotineProc;
_init_func.args = this;
fcontext_t ctx = make_fcontext(_stack_ctx.sp, _stack_ctx.size,
TC_CoroutineInfo::corotineEntry); // 创建协程
transfer_t tf = jump_fcontext(ctx, this); // context 切换
//实际的ctx
this->setCtx(tf.fctx);
}
void TC_CoroutineInfo::corotineEntry(transfer_t tf)
{
TC_CoroutineInfo * coro = static_cast< TC_CoroutineInfo * >(tf.data); // this
auto func = coro->_init_func.coroFunc;
void* args = coro->_init_func.args;
transfer_t t = jump_fcontext(tf.fctx, NULL);
//拿到自己的协程堆栈, 当前协程结束以后, 好跳转到main
coro->_scheduler->setMainCtx(t.fctx);
//再跳转到具体函数
func(args, t);
}

TC_CoroutineInfo::switchCoro 定义了协程切换。

代码3

void TC_CoroutineScheduler::switchCoro(TC_CoroutineInfo *to)
{
//跳转到to协程
_currentCoro = to;


transfer_t t = jump_fcontext(to->getCtx(), NULL);


//并保存协程堆栈
to->setCtx(t.fctx);
}

四、 Tars 协程调度器

基于 boost.context 的 TC_CoroutineInfo 类实现了协程的上下文切换,协程的管理和调度,则是由 TC_CoroutineScheduler 协程调度器类来负责,分管理和调度两个方面来说明 TC_CoroutineScheduler 调度类。

  • 协程管理:目的是需要合理的数据结构来组织协程(TC_CoroutineInfo),方便调度的实现。
  • 协程调度:目的是控制协程的启动、休眠和唤醒,实现了 yield, sleep 等功能,本质就是实现协程的状态机,完成协程的状态切换。Tars 协程分为 5 个状态:FREE, ACTIVE, AVAIL, INACTIVE, TIMEOUT

代码4: 

/**
* 协程的状态信息
*/
enum CORO_STATUS
{
CORO_FREE = 0,
CORO_ACTIVE = 1,
CORO_AVAIL = 2,
CORO_INACTIVE = 3,
CORO_TIMEOUT = 4
};

4.1 Tars 协程的管理

TC_CoroutineScheduler 主要通过以下方法管理协程:

  1. TC_CoroutineScheduler::create() 
    创建 TC_CoroutineScheduler 对象
  2. TC_CoroutineScheduler::init() 初始化,分配协程栈内存
  3. TC_CoroutineScheduler::run() 启动调度
  4. TC_CoroutineScheduler::terminate() 停止调度
  5. TC_CoroutineScheduler::destroy() 资源销毁,释放协程栈内存

我们可以通过 TC_CoroutineScheduler::init() 看到数据结构的初始化过程。

代码5:

void TC_CoroutineScheduler::init()
{
... ....
createCoroutineInfo(_poolSize); // _all_coro = new TC_CoroutineInfo*[_poolSize+1];
TC_CoroutineInfo::CoroutineHeadInit(&_active);
TC_CoroutineInfo::CoroutineHeadInit(&_avail);
TC_CoroutineInfo::CoroutineHeadInit(&_inactive);
TC_CoroutineInfo::CoroutineHeadInit(&_timeout);
TC_CoroutineInfo::CoroutineHeadInit(&_free);
int iSucc = 0;
for(size_t i = 0; i < _currentSize; ++i)
{
//iId=0不使用, 给mainCoro使用!!!!
uint32_t iId = generateId();
stack_context s_ctx = stack_traits::allocate(_stackSize); // 分配协程栈内存
TC_CoroutineInfo *coro = new TC_CoroutineInfo(this, iId, s_ctx);
_all_coro[iId] = coro;
TC_CoroutineInfo::CoroutineAddTail(coro, &_free);
++iSucc;
}
_currentSize = iSucc;
_mainCoro.setUid(0);
_mainCoro.setStatus(TC_CoroutineInfo::CORO_FREE);
_currentCoro = &_mainCoro;
}

通过下面的 TC_CoroutineScheduler 调度类数据结构图,可以更清楚的看到协程的组织方式:

Tars调度类数据结构

  • 使用协程之前,需要在协程数组(_all_coro),创建指定数量的协程对象,并为每个协程分配协程栈内存。
  • 通过链表的方式管理协程,每个状态都有一个链表。协程状态切换,对应协程在不同状态链表的转移。

4.2 Tars 协程的调度

Tars 调度是基于epoll实现,在 epoll 循环里检查是否有需要执行的协程, 有则执行之, 没有则等待在epoll对象上, 直到有唤醒或者超时。使用 epoll 实现的好处是可以和网络IO无缝粘合, 当有数据发送/接收时, 唤醒epoll对象, 从而完成协程的切换。

Tars 协程调度的核心逻辑是

 TC_CoroutineScheduler::run()

代码6:

void TC_CoroutineScheduler::run()
{
... ...
while(!_epoller->isTerminate())
{
if(_activeCoroQueue.empty() && TC_CoroutineInfo::CoroutineHeadEmpty(&_avail) && TC_CoroutineInfo::CoroutineHeadEmpty(&_active))
{
_epoller->done(1000); // epoll_wait(..., 1000ms) 先处理epoll的网络事件
}

//唤醒需要激活的协程
wakeup();
//唤醒sleep的协程
wakeupbytimeout();
//唤醒yield的协程
wakeupbyself();
int iLoop = 100;
//执行active协程, 每次执行100个, 避免占满cpu
while(iLoop > 0 && !TC_CoroutineInfo::CoroutineHeadEmpty(&_active))
{
TC_CoroutineInfo *coro = _active._next;
switchCoro(coro);
--iLoop;
}
//执行available协程, 每次执行1个
if(!TC_CoroutineInfo::CoroutineHeadEmpty(&_avail))
{
TC_CoroutineInfo *coro = _avail._next;
switchCoro(coro);
}
}
... ...
}

下图可以更清楚得看到协程调度和状态转移的过程。

Tars协程调度状态转移图

TC_CoroutineScheduler 提供了下面四种方法实现协程的调度: 

(1) TC_CoroutineScheduler::go(): 启动协程。

(2)TC_CoroutineScheduler::yield(): 当前协程放弃继续执行。并提供了两种方式,支持不同的唤醒策略。

  • yield(true): 会自动唤醒(等到下次协程调度, 都会再激活当前线程)
  • yield(false): 不再自动唤醒, 除非自己调度该协程(比如put到调度器中) 

(3)TC_CoroutineScheduler::sleep(): 当前协程休眠iSleepTime时间(单位:毫秒),然后会被唤醒继续执行。

(4)TC_CoroutineScheduler::put(): 放入需要唤醒的协程, 将协程放入到调度器中, 马上会被调度器调度。

五、总结

本文介绍了协程的概念,并讨论了 Tars Cpp 协程的实现原理和源码分析。

TarsCpp 3.x全面启用对协程的支持,本文的源码分析是基于TarsCpp-v3.0.0版本

​https://github.com/TarsCloud/TarsCpp/tree/release/3.0​

有关Tars-Cpp 协程实现分析的更多相关文章

  1. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  2. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  3. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  4. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  5. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  6. ruby - Arrays Sets 和 SortedSets 在 Ruby 中是如何实现的 - 2

    通常,数组被实现为内存块,集合被实现为HashMap,有序集合被实现为跳跃列表。在Ruby中也是如此吗?我正在尝试从性能和内存占用方面评估Ruby中不同容器的使用情况 最佳答案 数组是Ruby核心库的一部分。每个Ruby实现都有自己的数组实现。Ruby语言规范只规定了Ruby数组的行为,并没有规定任何特定的实现策略。它甚至没有指定任何会强制或至少建议特定实现策略的性能约束。然而,大多数Rubyist对数组的性能特征有一些期望,这会迫使不符合它们的实现变得默默无闻,因为实际上没有人会使用它:插入、前置或追加以及删除元素的最坏情况步骤复

  7. ruby - "public/protected/private"方法是如何实现的,我该如何模拟它? - 2

    在ruby中,你可以这样做:classThingpublicdeff1puts"f1"endprivatedeff2puts"f2"endpublicdeff3puts"f3"endprivatedeff4puts"f4"endend现在f1和f3是公共(public)的,f2和f4是私有(private)的。内部发生了什么,允许您调用一个类方法,然后更改方法定义?我怎样才能实现相同的功能(表面上是创建我自己的java之类的注释)例如...classThingfundeff1puts"hey"endnotfundeff2puts"hey"endendfun和notfun将更改以下函数定

  8. ruby - 实现k最近邻需要哪些数据? - 2

    我目前有一个reddit克隆类型的网站。我正在尝试根据我的用户之前喜欢的帖子推荐帖子。看起来K最近邻或k均值是执行此操作的最佳方法。我似乎无法理解如何实际实现它。我看过一些数学公式(例如k表示维基百科页面),但它们对我来说并没有真正意义。有人可以推荐一些伪代码,或者可以查看的地方,以便我更好地了解如何执行此操作吗? 最佳答案 K最近邻(又名KNN)是一种分类算法。基本上,您采用包含N个项目的训练组并对它们进行分类。如何对它们进行分类完全取决于您的数据,以及您认为该数据的重要分类特征是什么。在您的示例中,这可能是帖子类别、谁发布了该项

  9. ruby-on-rails - 使用 Ruby 正确处理 Stripe 错误和异常以实现一次性收费 - 2

    我查看了Stripedocumentationonerrors,但我仍然无法正确处理/重定向这些错误。基本上无论发生什么,我都希望他们返回到edit操作(通过edit_profile_path)并向他们显示一条消息(无论成功与否)。我在edit操作上有一个表单,它可以POST到update操作。使用有效的信用卡可以正常工作(费用在Stripe仪表板中)。我正在使用Stripe.js。classExtrasController5000,#amountincents:currency=>"usd",:card=>token,:description=>current_user.email)

  10. ruby - Ruby 1.8 的 Shellwords.shellescape 实现 - 2

    虽然1.8.7的构建我似乎有一个向后移植的Shellwords::shellescape版本,但我知道该方法是1.9的一个特性,在1.8的早期版本中绝对不支持.有谁知道我在哪里可以找到(以Gem形式或仅作为片段)针对Ruby转义的Bourne-shell命令的强大独立实现? 最佳答案 您也可以从shellwords.rb中复制您想要的内容。在Ruby的颠覆存储库的主干中(即GPLv2'd):defshellescape(str)#Anemptyargumentwillbeskipped,soreturnemptyquotes.ret

随机推荐