我正在为我的 ubuntu 服务器(为我的多客户端匿名聊天程序)实现一个简单的线程池机制,我需要让我的工作线程休眠直到一个作业(以函数指针和参数的形式)需要被执行。
我当前的系统正在崩溃。我(工作线程正在)询问经理是否有工作可用,以及是否有 5 毫秒没有 sleep 。如果有,将作业添加到工作队列并运行该函数。可悲的周期浪费。
我喜欢做的是制作一个简单的类似事件的系统。我正在考虑拥有一个互斥量 vector (每个工作人员一个),并在创建时将互斥量的句柄作为参数传入。然后在我的经理类(负责并分发作业)中,每当创建线程时,锁定互斥锁。当需要执行作业时解锁下一个互斥量,等待它被锁定和解锁,然后重新锁定它。不过,我想知道是否有更好的方法来达到这一目的。
tldr; 所以我的问题是这样的。让线程等待来自管理类的作业的最有效、最有效和最安全的方法是什么?轮询是一种我什至应该考虑的技术(一次超过 1000 个客户端),互斥锁是否合适?还是有其他技巧?
最佳答案
你需要的是条件变量。
所有工作线程调用 wait() 将暂停它们。
然后,父线程将一个工作项放入队列中,并在条件变量上调用信号。这将唤醒一个正在休眠的线程。它可以从队列中删除作业并执行作业,然后在条件变量上调用 wait 以返回休眠状态。
尝试:
#include <pthread.h>
#include <memory>
#include <list>
// Use RAII to do the lock/unlock
struct MutexLock
{
MutexLock(pthread_mutex_t& m) : mutex(m) { pthread_mutex_lock(&mutex); }
~MutexLock() { pthread_mutex_unlock(&mutex); }
private:
pthread_mutex_t& mutex;
};
// The base class of all work we want to do.
struct Job
{
virtual void doWork() = 0;
};
// pthreads is a C library the call back must be a C function.
extern "C" void* threadPoolThreadStart(void*);
// The very basre minimal part of a thread pool
// It does not create the workers. You need to create the work threads
// then make them call workerStart(). I leave that as an exercise for you.
class ThreadPool
{
public:
ThreadPool(unsigned int threadCount=1);
~ThreadPool();
void addWork(std::auto_ptr<Job> job);
private:
friend void* threadPoolThreadStart(void*);
void workerStart();
std::auto_ptr<Job> getJob();
bool finished; // Threads will re-wait while this is true.
pthread_mutex_t mutex; // A lock so that we can sequence accesses.
pthread_cond_t cond; // The condition variable that is used to hold worker threads.
std::list<Job*> workQueue; // A queue of jobs.
std::vector<pthread_t>threads;
};
// Create the thread pool
ThreadPool::ThreadPool(int unsigned threadCount)
: finished(false)
, threads(threadCount)
{
// If we fail creating either pthread object than throw a fit.
if (pthread_mutex_init(&mutex, NULL) != 0)
{ throw int(1);
}
if (pthread_cond_init(&cond, NULL) != 0)
{
pthread_mutex_destroy(&mutex);
throw int(2);
}
for(unsigned int loop=0; loop < threadCount;++loop)
{
if (pthread_create(threads[loop], NULL, threadPoolThreadStart, this) != 0)
{
// One thread failed: clean up
for(unsigned int kill = loop -1; kill < loop /*unsigned will wrap*/;--kill)
{
pthread_kill(threads[kill], 9);
}
throw int(3);
}
}
}
// Cleanup any left overs.
// Note. This does not deal with worker threads.
// You need to add a method to flush all worker threads
// out of this pobject before you let the destructor destroy it.
ThreadPool::~ThreadPool()
{
finished = true;
for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop)
{
// Send enough signals to free all threads.
pthread_cond_signal(&cond);
}
for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop)
{
// Wait for all threads to exit (they will as finished is true and
// we sent enough signals to make sure
// they are running).
void* result;
pthread_join(*loop, &result);
}
// Destroy the pthread objects.
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
// Delete all re-maining jobs.
// Notice how we took ownership of the jobs.
for(std::list<Job*>::const_iterator loop = workQueue.begin(); loop != workQueue.end();++loop)
{
delete *loop;
}
}
// Add a new job to the queue
// Signal the condition variable. This will flush a waiting worker
// otherwise the job will wait for a worker to finish processing its current job.
void ThreadPool::addWork(std::auto_ptr<Job> job)
{
MutexLock lock(mutex);
workQueue.push_back(job.release());
pthread_cond_signal(&cond);
}
// Start a thread.
// Make sure no exceptions escape as that is bad.
void* threadPoolThreadStart(void* data)
{
ThreadPool* pool = reinterpret_cast<ThreadPool*>(workerStart);
try
{
pool->workerStart();
}
catch(...){}
return NULL;
}
// This is the main worker loop.
void ThreadPool::workerStart()
{
while(!finished)
{
std::auto_ptr<Job> job = getJob();
if (job.get() != NULL)
{
job->doWork();
}
}
}
// The workers come here to get a job.
// If there are non in the queue they are suspended waiting on cond
// until a new job is added above.
std::auto_ptr<Job> ThreadPool::getJob()
{
MutexLock lock(mutex);
while((workQueue.empty()) && (!finished))
{
pthread_cond_wait(&cond, &mutex);
// The wait releases the mutex lock and suspends the thread (until a signal).
// When a thread wakes up it is help until it can acquire the mutex so when we
// get here the mutex is again locked.
//
// Note: You must use while() here. This is because of the situation.
// Two workers: Worker A processing job A.
// Worker B suspended on condition variable.
// Parent adds a new job and calls signal.
// This wakes up thread B. But it is possible for Worker A to finish its
// work and lock the mutex before the Worker B is released from the above call.
//
// If that happens then Worker A will see that the queue is not empty
// and grab the work item in the queue and start processing. Worker B will
// then lock the mutext and proceed here. If the above is not a while then
// it would try and remove an item from an empty queue. With a while it sees
// that the queue is empty and re-suspends on the condition variable above.
}
std::auto_ptr<Job> result;
if (!finished)
{ result.reset(workQueue.front());
workQueue.pop_front();
}
return result;
}
关于c++ - 线程等待父级,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5799924/
我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server
我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("
如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:
我正在尝试使用ruby编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?
我是ruby的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp
我对如何计算通过{%assignvar=0%}赋值的变量加一完全感到困惑。这应该是最简单的任务。到目前为止,这是我尝试过的:{%assignamount=0%}{%forvariantinproduct.variants%}{%assignamount=amount+1%}{%endfor%}Amount:{{amount}}结果总是0。也许我忽略了一些明显的东西。也许有更好的方法。我想要存档的只是获取运行的迭代次数。 最佳答案 因为{{incrementamount}}将输出您的变量值并且不会影响{%assign%}定义的变量,我
在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.
我有一个使用PDFKit呈现网页的pdf版本的Rails应用程序。我使用Thin作为开发服务器。问题是当我处于开发模式时。当我使用“bundleexecrailss”启动我的服务器并尝试呈现任何PDF时,整个过程会陷入僵局,因为当您呈现PDF时,会向服务器请求一些额外的资源,如图像和css,看起来只有一个线程.如何配置Rails开发服务器以运行多个工作线程?非常感谢。 最佳答案 我找到的最简单的解决方案是unicorn.geminstallunicorn创建一个unicorn.conf:worker_processes3然后使用它:
我有一个数组数组,想将元素附加到子数组。+=做我想做的,但我想了解为什么push不做。我期望的行为(并与+=一起工作):b=Array.new(3,[])b[0]+=["apple"]b[1]+=["orange"]b[2]+=["frog"]b=>[["苹果"],["橙子"],["Frog"]]通过推送,我将推送的元素附加到每个子数组(为什么?):a=Array.new(3,[])a[0].push("apple")a[1].push("orange")a[2].push("frog")a=>[[“苹果”、“橙子”、“Frog”]、[“苹果”、“橙子”、“Frog”]、[“苹果”、“
有没有办法让Ruby能够做这样的事情?classPlane@moved=0@x=0defx+=(v)#thisiserror@x+=v@moved+=1enddefto_s"moved#{@moved}times,currentxis#{@x}"endendplane=Plane.newplane.x+=5plane.x+=10putsplane.to_s#moved2times,currentxis15 最佳答案 您不能在Ruby中覆盖复合赋值运算符。任务在内部处理。您应该覆盖+,而不是+=。plane.a+=b与plane.a=