草庐IT

【Linux网络编程】高并发服务器框架 线程池介绍+线程池封装

猿力猪 2023-04-21 原文

目录

前言

一、线程池介绍

💻线程池基本概念

💻线程池组成部分

💻线程池工作原理 

二、线程池代码封装

🌈main.cpp

🌈ThreadPool.h

🌈ThreadPool.cpp

🌈ChildTask.h 

🌈ChildTask.cpp

🌈BaseTask.h

🌈BaseTask.cpp

三、测试效果

四、总结

📌创建线程池的好处


前言

本文主要学习Linux内核编程,结合Visual Studio 2019进行跨平台编程,内容包括线程池介绍以及线程池封装

一、线程池介绍

💻线程池基本概念

  • 线程池是预先创建线程的一种技术 (服务器真正意义上实现高并发就必须用线程池)
  • 🌰举个例子:生活中的水池,是装东西的容器,用来装水的,线程池当然就是拿来装线程的
  • 线程池在任务还没有到来之前,创建一定数量的线程,放入空闲队列中,这些线程都是处于阻塞状态,不消耗CPU,但占用较小的内存空间
  • 当新任务到来时,缓冲池选择一个空闲线程,把任务传入此线程中运行,如果缓冲池已经没有空闲线程,则新建若干个线程,当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源

💻线程池组成部分

  • 线程池类

        维护工作者线程队列(包括空闲与忙碌队列)

        维护一个任务队列

        维护一个线程池调度器指针

  • 线程池调度器(本身也是一个线程)

        负责线程调度

        负责任务分配

  • 工作者线程类(线程池中的线程类的封装)
  • 任务队列
  • 任务接口(实际的业务逻辑都继承自该接口)

💻线程池工作原理 

根据服务器的需要,来设置线程的数量,可能是10条、20条、30条,根据服务器的承载,10条不代表只能做10个任务,总有任务做的快,有的做的慢,可能可以完成20个任务 

🌰举个例子:如上动图所示,当我们服务器收到一个注册业务,是一个服务器要执行的任务,它会进入到任务队列,队列先进先出,顺次执行,任务会唤醒空闲列表当中的一个空闲的线程,接到任务之后,空闲线程会从空闲列表中消失,进入到忙碌列表,去完成对应的任务,完成任务后,从忙碌列表中出去,到空闲列表继续等待新任务

如果,所有的线程都在忙,都在做任务,这时候登录进来,先进入任务队列,会创建一个新的线程来接这个任务,当所有线程都完成任务,回到空闲列表后,新创建的线程销毁,留下原先设置的对应数量线程(类似,保留老员工,把实习生裁员)

  • 队列:先进先出
  • 空闲列表(链表):不定长(有的时候可能需要创建新线程来接任务)
  • 忙碌列表(链表):不定长(有的时候可能需要创建新线程来接任务)

话不多说,咱们上号,封装一下线程池相关函数,来进行测试 

二、线程池代码封装

🌈main.cpp

  • 主函数,设置10条线程,来执行30个任务 
#include <iostream>
#include <stdio.h>
#include "ThreadPool.h"
#include "ChildTask.h"
using namespace std;

int main()
{
	ThreadPool* pool = new ThreadPool(10);//10条线程

	for (int i = 0; i < 30; i++)//设置30个任务
	{
		char buf[40] = { 0 };//初始化
		sprintf(buf, "%s%d", "任务", i);

		BaseTask* task = new ChildTask(buf);
		pool->pushTask(task);
	}

	while (1) {}

	return 0;
}

🌈ThreadPool.h

  • 对线程池进行设计,核心包括最大、最小线程数忙碌列表空闲列表任务队列互斥量条件变量,以及线程执行函数
#pragma once
#include <queue>//队列
#include <list>//链表头文件
#include <pthread.h>//线程头文件
#include <algorithm>//find查找
#include <iostream>
#include "BaseTask.h"

using namespace std;

#define MIN_NUM 10//最小值 默认参数

class ThreadPool
{
public:
	ThreadPool(const int num = MIN_NUM);
	~ThreadPool();

	//判断任务队列是否为空
	bool QueueIsEmpty();
	//线程互斥量加锁解锁
	void Lock();
	void Unlock();
	//线程条件变量等待和唤醒
	void Wait();
	void WakeUp();

	//添加任务到任务队列
	void pushTask(BaseTask* task);
	//从任务队列移除任务
	BaseTask* popTask(BaseTask* task);

	//从忙碌回到空闲 工作结束
	void MoveToIdle(pthread_t id);

	//从空闲到忙碌 工作开始
	void MoveToBusy(pthread_t id);

	//线程执行函数
	static void* RunTime(void* vo);

private:
	int threadMinNum;//最大线程数量
	int threadMaxNum;//最小线程数量
	queue<BaseTask*>taskQueue;//任务队列
	list<pthread_t>busyList;//线程忙碌列表
	list<pthread_t>idleList;//线程空闲列表
	pthread_mutex_t mutex;//互斥量:做锁
	pthread_cond_t cond;//条件变量:让线程等待或者唤醒
};

🌈ThreadPool.cpp

  • 对函数进行参数的设置,核心在于线程执行函数上的设置,在工作前和工作完设置打印,方便我们进行观察
#include "ThreadPool.h"

ThreadPool::ThreadPool(const int num)
{
	this->threadMinNum = num;

	//条件变量、互斥量初始化
	pthread_mutex_init(&this->mutex, NULL);
	pthread_cond_init(&this->cond, NULL);

	pthread_t id;
	//线程num条创建
	for (int i = 0; i < this->threadMinNum; i++)
	{
		//线程创建
		pthread_create(&id, NULL, RunTime, this);
		this->idleList.push_back(id);//线程存入空闲列表
	}
}

ThreadPool::~ThreadPool()
{
}
//任务队列是否为空
bool ThreadPool::QueueIsEmpty()
{
	return this->taskQueue.empty();
}
//线程加锁
void ThreadPool::Lock()
{
	pthread_mutex_lock(&this->mutex);
}
//线程解锁
void ThreadPool::Unlock()
{
	pthread_mutex_unlock(&this->mutex);
}
//线程等待
void ThreadPool::Wait()
{
	pthread_cond_wait(&this->cond, &this->mutex);
}
//线程唤醒
void ThreadPool::WakeUp()
{
	pthread_cond_signal(&this->cond);
}
//添加任务到任务队列
void ThreadPool::pushTask(BaseTask* task)
{
	Lock();
	taskQueue.push(task);
	Unlock();
	WakeUp();
}
//从任务队列移除任务
BaseTask* ThreadPool::popTask(BaseTask* task)
{
	task = this->taskQueue.front();//从队列头取
	this->taskQueue.pop();//删除队列头

	return task;
}
//从忙碌回到空闲 工作结束
void ThreadPool::MoveToIdle(pthread_t id)
{
	list<pthread_t>::iterator iter;
	iter = find(busyList.begin(), busyList.end(), id);

	if (iter != busyList.end())
	{
		//从忙碌移除
		this->busyList.erase(iter);
		//添加到空闲
		this->idleList.push_back(*iter);//this->idleList.push_back(id)
	}
}
//从空闲到忙碌 工作开始
void ThreadPool::MoveToBusy(pthread_t id)
{
	list<pthread_t>::iterator iter;
	iter = find(idleList.begin(), idleList.end(), id);

	if (iter != idleList.end())
	{
		//从空闲移除
		this->idleList.erase(iter);
		//添加到忙碌
		this->busyList.push_back(*iter);//this->idleList.push_back(id)
	}
}
//线程执行函数
void* ThreadPool::RunTime(void* vo)
{
	//拿到执行线程自己的id 因为后面要处理忙碌和空闲的情况
	pthread_t id = pthread_self();
	//确保主线程与子线程分离,子线程结束后,资源自动回收
	pthread_detach(id);
	//线程参数获取
	ThreadPool* argThis = (ThreadPool*)vo;

	while (true)
	{
		argThis->Lock();
		//如果任务队列为空 线程则一直等待 
		//知道任务队列不为空则会被pushTask函数唤醒线程
		while (argThis->QueueIsEmpty())
		{
			argThis->Wait();
		}
		argThis->MoveToBusy(id);

		cout << "工作前  任务数:" << argThis->taskQueue.size() << endl;
		cout << "工作前  busy:" << argThis->busyList.size() << endl;
		cout << "工作前  idle:" << argThis->idleList.size() << endl;
		cout << "-----------------------------------------------" << endl;
		//取任务
		BaseTask* task;
		task = argThis->popTask(task);

		argThis->Unlock();

		//任务工作
		task->working();

		//工作结束
		argThis->Lock();
		argThis->MoveToIdle(id);
		argThis->Unlock();

		cout << "工作完  任务数:" << argThis->taskQueue.size() << endl;
		cout << "工作完  busy:" << argThis->busyList.size() << endl;
		cout << "工作完  idle:" << argThis->idleList.size() << endl;
		cout << ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>" << endl;
	}

	return nullptr;
}

🌈ChildTask.h 

  • 子类配置 
#pragma once
#include "BaseTask.h"
#include <iostream>
#include <unistd.h>//sleep头文件
using namespace std;

class ChildTask :
	public BaseTask
{
public:
	ChildTask(char* data);
	~ChildTask();
	void working();
};

🌈ChildTask.cpp

  • 子类设置延时模拟做任务的时间比较长 
#include "ChildTask.h"

ChildTask::ChildTask(char* data) :BaseTask(data)//参数传给父类
{
}

ChildTask::~ChildTask()
{
}

void ChildTask::working()
{
	cout << this->data << "正在执行......" << endl;
	sleep(3);//延时3秒 (模拟做业务的时间比较长)
}

🌈BaseTask.h

  • 基类设置结构体来装业务 
#pragma once
#include <string.h>
class BaseTask
{
public:
	BaseTask(char* data);
	~BaseTask();
	char data[1024]; //装业务
	virtual void working() = 0;//虚函数
};

🌈BaseTask.cpp

  • 基类配置 
#include "BaseTask.h"

BaseTask::BaseTask(char* data)
{
	bzero(this->data, sizeof(this->data));//清空
	memcpy(this->data, data, sizeof(data));
}

BaseTask::~BaseTask()
{
	delete this->data;//清除释放
}

三、测试效果

  • 通过Linux连接VS进行跨平台编程,我们可以清晰的看到有几个线程是在做任务,几个线程是空闲的,整个过程就很清晰直观的展现出来了,如下动图所示: 

  • 10条线程做30个任务的全部记录,如下如所示:

四、总结

📌创建线程池的好处

  • 线程池的使用,能让我们搭建的高并发服务器真正意义上做到高并发
  • 降低资源消耗

        通过重复利用自己创建的线程降低线程创建和销毁造成的消耗

  • 提高响应速度

        当任务到达时,任务可以不需要等待线程创建和销毁就能立即执行

  • 提高线程的可管理性

        线程式稀缺资源,如果无限的创建线程,不仅会消耗资源,还会降低系统的稳定性

        使用线程池可以进行统一分配,调优和监控

以上就是本文的全部内容啦!如果对您有帮助,麻烦点赞啦!收藏啦!欢迎各位评论区留言!!!

有关【Linux网络编程】高并发服务器框架 线程池介绍+线程池封装的更多相关文章

  1. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  2. ruby-on-rails - 启动 Rails 服务器时 ImageMagick 的警告 - 2

    最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru

  3. ruby-on-rails - s3_direct_upload 在生产服务器中不工作 - 2

    在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo

  4. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  5. ruby - 寻找通过阅读代码确定编程语言的ruby gem? - 2

    几个月前,我读了一篇关于ruby​​gem的博客文章,它可以通过阅读代码本身来确定编程语言。对于我的生活,我不记得博客或gem的名称。谷歌搜索“ruby编程语言猜测”及其变体也无济于事。有人碰巧知道相关gem的名称吗? 最佳答案 是这个吗:http://github.com/chrislo/sourceclassifier/tree/master 关于ruby-寻找通过阅读代码确定编程语言的rubygem?,我们在StackOverflow上找到一个类似的问题:

  6. ruby - 用 Ruby 编写一个简单的网络服务器 - 2

    我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b

  7. ruby-on-rails - 在 Rails 中调试生产服务器 - 2

    您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除

  8. Unity 热更新技术 | (三) Lua语言基本介绍及下载安装 - 2

    ?博客主页:https://xiaoy.blog.csdn.net?本文由呆呆敲代码的小Y原创,首发于CSDN??学习专栏推荐:Unity系统学习专栏?游戏制作专栏推荐:游戏制作?Unity实战100例专栏推荐:Unity实战100例教程?欢迎点赞?收藏⭐留言?如有错误敬请指正!?未来很长,值得我们全力奔赴更美好的生活✨------------------❤️分割线❤️-------------------------

  9. 网络编程套接字 - 2

    网络编程套接字网络编程基础知识理解源`IP`地址和目的`IP`地址理解源MAC地址和目的MAC地址认识端口号理解端口号和进程ID理解源端口号和目的端口号认识`TCP`协议认识`UDP`协议网络字节序socket编程接口`sockaddr``UDP`网络程序服务器端代码逻辑:需要用到的接口服务器端代码`udp`客户端代码逻辑`udp`客户端代码`TCP`网络程序服务器代码逻辑多个版本服务器单进程版本多进程版本多线程版本线程池版本服务器端代码客户端代码逻辑客户端代码TCP协议通讯流程TCP协议的客户端/服务器程序流程三次握手(建立连接)数据传输四次挥手(断开连接)TCP和UDP对比网络编程基础知识

  10. ruby - 如何让Ruby捕获线程中的语法错误 - 2

    我正在尝试使用ruby​​编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?

随机推荐