草庐IT

《Go 语言并发之道》读书笔记(七)

dk168 2023-03-28 原文

今天这篇笔记我们来学习Go 限流
限流是分布式系统中经常需要用到的技术,因为我们让请求没有限制,很容易就出现某个用户开很多线程把我们的服务拉跨,进而影响到别的用户。

限流

我们来看下Go语言层面可以怎么做到限流,先看一段不限流的代码,

type APIConnection struct{}

func Open() *APIConnection {
	return &APIConnection{}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
    //假装我们在这里有运行
	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
    //假装我们在这里有运行
	return nil
}


func main() {
	defer log.Printf("Done")

	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC)

	apiConnection := Open()

	var wg sync.WaitGroup
	wg.Add(20)

	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			err := apiConnection.ReadFile(context.Background())
			if err != nil {
				log.Printf("cannot ReadFile : %v", err)
			}
			log.Printf("ReadFile")
		}()
	}

	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			err := apiConnection.ResolveAddress(context.Background())
			if err != nil {
				log.Printf("cannot ResolveAddress : %v", err)
			}
			log.Printf("ResolveAddress")
		}()
	}

	wg.Wait()

}

上面的代码我们定义了两个假想的方法ReadFile 和 ResolveAddress, 假设他们是去访问文件和读取网络,都是比较耗资源的操作。然后开启了20个goroutine去调用这两个方法
这段代码的运行结果如下

02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ResolveAddress
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ReadFile
02:32:52 ResolveAddress
02:32:52 ReadFile
02:32:52 Done

我们可以看到一瞬间就都运行完了,如果我们访问了实际的资源,然后又开了很多的goroutine,那么很容易就耗尽资源。 为了防止这样的事情发生,我们引入限流,限定一段时间内,只能访问一定的资源。 我们今天要讲的是基于令牌桶算法的限速,令牌桶是什么算法呢? 很简单就是有一个基础的令牌数d, 然后有固定的速度r往令牌桶中放令牌, 用户拿到令牌才能进行下一步,拿不到就等待。
我们来看代码

type APIConnection struct {
	rateLimiter *rate.Limiter
}

func Open() *APIConnection {
	return &APIConnection{
		rateLimiter: rate.NewLimiter(rate.Limit(2), 5),
	}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
	if err := a.rateLimiter.Wait(ctx); err != nil {
		return err
	}
	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
	if err := a.rateLimiter.Wait(ctx); err != nil {
		return err
	}
	return nil
}

main func我们没有修改,这里只是在APIConnection 中增加了一个
rateLimiter: rate.NewLimiter(rate.Limit(2), 5)
rate是golang.org/x/time/rate 下面的一个包, rate.NewLimiter是限速器,方法定义如下
func NewLimiter(r Limit, b int) *Limiter
r就是我们前面说的速率,每秒多少个令牌
b 就是令牌桶的高度,开始的时候有几个。

然后在ReadFile 和 ResolveAddress 方法中增加了a.rateLimiter.Wait(ctx), Wait就是等待有令牌出现。
运行的结果如下所示

02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ReadFile
02:48:16 ResolveAddress
02:48:17 ResolveAddress
02:48:17 ResolveAddress
02:48:18 ReadFile
02:48:18 ResolveAddress
02:48:19 ResolveAddress
02:48:19 ResolveAddress
02:48:20 ResolveAddress
02:48:20 ReadFile
02:48:21 ResolveAddress
02:48:21 ReadFile
02:48:22 ReadFile
02:48:22 ReadFile
02:48:23 ResolveAddress
02:48:23 ReadFile
02:48:24 ResolveAddress
02:48:24 Done

通过时间我们可以看到前面很快执行了5次,就是拿到了令牌桶中的5个令牌,后面每秒中执行两次,也就是我们的速率2个/秒。程序运行符合我们预期,达到了限速的效果。

组合限流

书中作者还举了两个例子,运用组合来限速,比如要求一秒中不能超过两个,同时一分钟不能超过10个。 属于Go语言的一点组合功能,示例代码如下


type RateLimiter interface {
	Wait(context.Context) error
	Limit() rate.Limit
}

type multiLimiter struct {
	limiters []RateLimiter
}

func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
	byLimit := func(i, j int) bool {
		return limiters[i].Limit() < limiters[j].Limit()
	}
	sort.Slice(limiters, byLimit)
	return &multiLimiter{limiters: limiters}
}

func (l *multiLimiter) Wait(ctx context.Context) error {
	for _, l := range l.limiters {
		if err := l.Wait(ctx); err != nil {
			return err
		}
	}
	return nil
}

func (l *multiLimiter) Limit() rate.Limit {
	return l.limiters[0].Limit()
}

func Per(eventCount int, duration time.Duration) rate.Limit {
	return rate.Every(duration / time.Duration(eventCount))
}


type APIConnection struct {
	rateLimiter RateLimiter
}

func Open() *APIConnection {

	secondLimit := rate.NewLimiter(Per(2, time.Second), 1)
	minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10)
	return &APIConnection{rateLimiter: MultiLimiter(secondLimit, minuteLimit)}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
	if err := a.rateLimiter.Wait(ctx); err != nil {
		return err
	}
	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
	if err := a.rateLimiter.Wait(ctx); err != nil {
		return err
	}
	return nil
}

定义了multiLimiter来组合这些限速器,然后定义了Wait方法。比较简单,这里不详述
还可以不同的设备分不同的限速器, 这里也是贴出代码不详述

type APIConnection struct {
	networkLimit,
	diskLimit,
	apiLimit RateLimiter
}

func Open() *APIConnection {

	return &APIConnection{
		apiLimit: MultiLimiter(
			rate.NewLimiter(Per(2, time.Second), 1),
			rate.NewLimiter(Per(10, time.Minute), 10),
		),
		diskLimit: MultiLimiter(
			rate.NewLimiter(rate.Limit(1), 1),
		),
		networkLimit: MultiLimiter(
			rate.NewLimiter(Per(3, time.Second), 3),
		),
	}
}

func (a *APIConnection) ReadFile(ctx context.Context) error {
	if err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx); err != nil {
		return err
	}
	return nil
}

func (a *APIConnection) ResolveAddress(ctx context.Context) error {
	if err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx); err != nil {
		return err
	}
	return nil
}

不同用户限流

我们做web请求的时候,会遇到这样的需求,根据不同的用户给不同的限速,这里简单的给个sample, 其实就是用map把用户和限速器关联起来。


var userLimit = make(map[string]*rate.Limiter)

func doWork(user string) {
	if userLimit[user].Allow() {
		log.Printf("%s do work \n", user)
	} else {
		log.Printf("%s not work \n", user)
	}

}

func Per(eventCount int, duration time.Duration) rate.Limit {
	return rate.Every(duration / time.Duration(eventCount))
}

func main() {

	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ltime | log.LUTC)

	userLimit["user1"] = rate.NewLimiter(Per(2, time.Second), 1)
	userLimit["user2"] = rate.NewLimiter(Per(2, time.Minute), 5)

	var wg sync.WaitGroup
	wg.Add(20)

	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			doWork("user1")

		}()

		time.Sleep(500 * time.Millisecond)

	}

	for i := 0; i < 10; i++ {
		go func() {
			defer wg.Done()
			doWork("user2")
		}()
		time.Sleep(500 * time.Millisecond)
	}

	wg.Wait()

}

上面的例子中用户1被限制1秒访问2次,用户2被限制1分钟访问2次

03:19:14 user1 do work 
03:19:15 user1 do work 
03:19:15 user1 do work 
03:19:16 user1 do work 
03:19:16 user1 do work 
03:19:17 user1 do work 
03:19:17 user1 do work 
03:19:18 user1 do work 
03:19:18 user1 do work 
03:19:19 user1 do work 
03:19:20 user2 do work 
03:19:20 user2 do work 
03:19:21 user2 do work 
03:19:21 user2 do work 
03:19:22 user2 do work 
03:19:22 user2 not work 
03:19:23 user2 not work 
03:19:23 user2 not work 
03:19:24 user2 not work 
03:19:24 user2 not work 

这里用户1基本能得到执行,用户2执行了5次后,由于没有拿到令牌,就不能work了。这样达到了不同用户,不同的限速器。

总结

对于限速,可以在服务器层面进行限速,我们这里是在后台程序端进行限速, 也有不少现成的解决方案 https://www.jianshu.com/p/c13843d2e1ec
对于分布式的系统这样的限速自然是不够的,可以结合redis的功能来进行限速,网上有看到些方法: https://blog.csdn.net/jim_007/article/details/110084822
没有实际操作,后面我们再实际操作下再来记录。

有关《Go 语言并发之道》读书笔记(七)的更多相关文章

  1. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  2. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  3. Unity 热更新技术 | (三) Lua语言基本介绍及下载安装 - 2

    ?博客主页:https://xiaoy.blog.csdn.net?本文由呆呆敲代码的小Y原创,首发于CSDN??学习专栏推荐:Unity系统学习专栏?游戏制作专栏推荐:游戏制作?Unity实战100例专栏推荐:Unity实战100例教程?欢迎点赞?收藏⭐留言?如有错误敬请指正!?未来很长,值得我们全力奔赴更美好的生活✨------------------❤️分割线❤️-------------------------

  4. 7个大一C语言必学的程序 / C语言经典代码大全 - 2

    嗨~大家好,这里是可莉!今天给大家带来的是7个C语言的经典基础代码~那一起往下看下去把【程序一】打印100到200之间的素数#includeintmain(){ inti; for(i=100;i 【程序二】输出乘法口诀表#includeintmain(){inti;for(i=1;i 【程序三】判断1000年---2000年之间的闰年#includeintmain(){intyear;for(year=1000;year 【程序四】给定两个整形变量的值,将两个值的内容进行交换。这里提供两种方法来进行交换,第一种为创建临时变量来进行交换,第二种是不创建临时变量而直接进行交换。1.创建临时变量来

  5. LC滤波器设计学习笔记(一)滤波电路入门 - 2

    目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称

  6. ruby-on-rails - 获取并发布相同匹配项的请求 - 2

    在我的路线文件中我有:match'graphs/(:id(/:action))'=>'graphs#(:action)'如果是GET请求(工作)或POST请求(不工作),我想匹配它我知道我可以使用以下方法在资源中声明POST请求:post'/'=>:show,:on=>:member但是我怎样才能为比赛做到这一点呢?谢谢。 最佳答案 如果你同时想要POST和GETmatch'graphs/(:id(/:action))'=>'graphs#(:action)',:via=>[:get,:post]编辑默认值可以设置如下match'g

  7. ruby - 如何保持我不常用的编程语言技能 - 2

    关闭。这个问题是off-topic.它目前不接受答案。想改进这个问题吗?Updatethequestion所以它是on-topic用于堆栈溢出。关闭11年前。Improvethisquestion我不经常使用ruby​​-通常它加起来相当于每两个月或更长时间编写一次脚本。我的大部分编程都是使用C++进行的,这与ruby​​有很大不同。由于我与ruby​​之间的差距如此之大,我总是忘记语言的基本方面(比如解析文本文件和其他简单的东西)。我想每天练习一些基本的东西,我想知道是否有一些我可以订阅的网站,并且会向我发送当天的Ruby问题或类似的东西。有人知道这样的站点/Internet服务吗?

  8. ruby-on-rails - 如果特定语言环境中缺少翻译,如何配置 i18n 以使用 en 语言环境? - 2

    如果特定语言环境中缺少翻译,如何配置i18n以使用en语言环境翻译?当前已插入翻译缺失消息。我正在使用RoR3.1。 最佳答案 找到相似的question这里是答案:#application.rb#railswillfallbacktoconfig.i18n.default_localetranslationconfig.i18n.fallbacks=true#railswillfallbacktoen,nomatterwhatissetasconfig.i18n.default_localeconfig.i18n.fallback

  9. ruby-on-rails - 如何通过 URL 更改语言环境? - 2

    在我的双语Rails4应用程序中,我有一个像这样的LocalesController:classLocalesController用户可以通过此表单更改其语言环境:deflocale_switcherform_tagurl_for(:controller=>'locales',:action=>'change_locale'),:method=>'get',:id=>'locale_switcher'doselect_tag'set_locale',options_for_select(LANGUAGES,I18n.locale.to_s)end这有效。但是,目前用户无法通过URL更改

  10. ruby - 一种语言如何被自身解释(如 Rubinius)? - 2

    我使用Ruby编程已经有一段时间了,现在只使用Ruby的标准MRI实现,但我一直对我经常听到的其他实现感到好奇。前几天我在读有关Rubinius的文章,这是一个用Ruby编写的Ruby解释器。我试着在不同的地方查找它,但我很难弄清楚这样的东西到底是如何工作的。我在编译器或语言编写方面从来没有太多经验,但我真的很想弄明白。一门语言究竟如何才能被自己解释?编译中是否有一个我不明白这有意义的基本步骤?有人可以像我是个白痴一样向我解释这个吗(因为无论如何这都不会太离谱) 最佳答案 它比你想象的要简单。Rubinius并非100%用Ruby编

随机推荐