草庐IT

networking - boost::asio tcp async_accept 处理程序未被调用但同步连接工作正常

coder 2023-09-18 原文

我有一个 TCP 客户端,它可以正常调用 TCP 套接字上的常规连接。但是,对 async_connect 的调用永远不会触发处理程序。他们都使用几乎相同的代码。区别仅在于调用连接与异步连接。

标题

#ifndef TCPCLIENT_H
#define TCPCLIENT_H

#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>

#include <boost/thread/thread.hpp> 
#include <boost/thread/mutex.hpp>

using boost::asio::ip::tcp;

//How about an interface for outputting errors
class BoostTCPDebugOutputInterface
{
public:
    virtual void outputError(std::string& error)=0;
    virtual void outputDebug(std::string& debug) = 0;
    virtual void outputWarning(std::string& warning) = 0;
    virtual void outputInfo(std::string& info) = 0;

};

class ShutdownCallback
{
public:
    virtual void shutdown() = 0;
};

class BoostTCPConnection 
    : public boost::enable_shared_from_this<BoostTCPConnection>
{
public:
    typedef boost::shared_ptr<BoostTCPConnection> Connection_ptr;

    static Connection_ptr create(boost::asio::io_service& io_service, BoostTCPDebugOutputInterface* output, ShutdownCallback* shutdowner)
    {
        return Connection_ptr(new BoostTCPConnection(io_service, output, shutdowner));
    }

    virtual ~BoostTCPConnection(void);

    void start();
    bool isRunning();
    void setRunning(bool running);

    void enqueueMessageToSend(boost::shared_ptr<std::string>& message);

    boost::asio::ip::tcp::socket& socket()
    {
        return socket_;
    }

    void getQueueSize(unsigned long& tcpToSend, unsigned long& tcpRead);

    void getMessages(std::list< boost::shared_ptr<std::string> >& messages);

protected:
    BoostTCPConnection(boost::asio::io_service& io_service, BoostTCPDebugOutputInterface* output, ShutdownCallback* shutdowner) : socket_(io_service), errorOutput_(output), shutdowner_(shutdowner) {}

    void send();
    void handleWrite(const boost::system::error_code& error,size_t bytes_transferred);
    void handleReceive(const boost::system::error_code& error,size_t bytes_transferred);

    void readMessage();
    void handleReceive();

    std::string hostIP_;
    unsigned int hostPort_;

    boost::asio::io_service io_service_;
    boost::asio::ip::tcp::socket socket_;

    boost::mutex runningMutex_;
    bool running_;

    boost::mutex readMutex_;
    std::list< boost::shared_ptr<std::string> > receivedMsgs_;

    boost::mutex sendMutex_;
    std::list< boost::shared_ptr<std::string> > sendMsgs_;

    BoostTCPDebugOutputInterface* errorOutput_;
    ShutdownCallback* shutdowner_;

    static const size_t HEADERSIZE = 4;
};

class TCPClient
{
public:
    TCPClient();
    virtual ~TCPClient();

    bool start(std::string& hostIP,  unsigned int& hostPort, BoostTCPDebugOutputInterface* output, ShutdownCallback* shutdowner);
    void stop();

    void getQueueSize(unsigned long& tcpToSend, unsigned long& tcpRead);

    void send(boost::shared_ptr<std::string>& message);
    void getMessages(std::list< boost::shared_ptr<std::string> >& messages);

protected:
    void threadAction();

    void handleConnect(const boost::system::error_code& error);


    BoostTCPConnection::Connection_ptr connection_;

    boost::thread_group worker_threads_;

    boost::asio::io_service io_service_;

    BoostTCPDebugOutputInterface* errorOutput_;
    ShutdownCallback* shutdowner_;
};

#endif

CPP文件

#include "TCPClient.h"

#include <iostream>

BoostTCPConnection::~BoostTCPConnection(void)
{
}

void BoostTCPConnection::start()
{
    setRunning(true);
    while (isRunning())
    {

        bool readData(false);
        bool wroteData(false);

        if (!socket_.is_open())
        {
            std::string info("BoostTCPConnection::start() socket is no longer open.  ");
            errorOutput_->outputError(info);
            shutdowner_->shutdown();
            //Stop this NOW!!!
        }

        //Check if there are 4 bytes for packet size 
        //If there are read the size and then do an read to get the packet
        //The handler function should put the packet on a queue.
        boost::asio::socket_base::bytes_readable command(true);

        socket_.io_control(command);
        std::size_t bytes_readable = command.get();

        if ( bytes_readable >= HEADERSIZE )
        {
            readMessage();
            readData=true;
        }

        size_t sendSize(0);
        {
            boost::mutex::scoped_lock(sendMutex_);
            sendSize = sendMsgs_.size();
        }

        if ( sendSize > 0)
        {
            send();
        }


        if ( !readData && !wroteData )
            boost::this_thread::sleep(boost::posix_time::milliseconds(5)); 
    }
}

void BoostTCPConnection::readMessage()
{
    size_t messageSize(0);

    char temp[4]="";
    std::vector<char> header(4);

    boost::system::error_code ec;

    //Read the header which is the size
    size_t read=boost::asio::read(socket_, boost::asio::buffer(header), ec);
    if (ec)
    {       
        std::string info("BoostTCPConnection::readMessage errorcode ");
        info+=ec.message();
        errorOutput_->outputError(info);
        shutdowner_->shutdown();
        //TODO Signal the GUI to stop
        return;
    }
    memcpy((void*)(&messageSize),(void*)header.data(),4);

    std::vector<char> rcvBuffer(messageSize);

    read=boost::asio::read(socket_, boost::asio::buffer(rcvBuffer),ec);
    if (ec)
    {

        std::string info("BoostTCPConnection::readMessage errorcode ");
        info+=ec.message();
        errorOutput_->outputError(info);
        shutdowner_->shutdown();
        //TODO Signal the GUI to stop
        return;
    }

    rcvBuffer.push_back('\0');

    std::string test(rcvBuffer.begin(),rcvBuffer.end());
    boost::shared_ptr<std::string> message(new std::string(rcvBuffer.begin(),rcvBuffer.end()));

    receivedMsgs_.push_back(message);

}

void BoostTCPConnection::getMessages(std::list< boost::shared_ptr<std::string> >& messages)
{

    if (messages.size() > 0)
        messages.clear();

    {
        boost::mutex::scoped_lock lock(readMutex_); 
        receivedMsgs_.swap(messages);
    }
}

void BoostTCPConnection::handleReceive(const boost::system::error_code& error, size_t bytes_transferred)
{

    if (error)
    {
        std::ostringstream oss;
        oss<< "BoostTCPConnection::handleReceive got an error Code of "<<error.value()<<" and message "<<error.message()<<" bytes_transferred = "<<bytes_transferred<<std::endl;
        errorOutput_->outputError(oss.str());
        shutdowner_->shutdown();
        return;
    }

}


bool BoostTCPConnection::isRunning()
{
    boost::mutex::scoped_lock lock(runningMutex_);
    return running_;
}

void BoostTCPConnection::setRunning(bool running)
{
    boost::mutex::scoped_lock lock(runningMutex_);
    running_=running;
}

void BoostTCPConnection::enqueueMessageToSend(boost::shared_ptr<std::string>& message)
{
    boost::mutex::scoped_lock lock(sendMutex_);
    sendMsgs_.push_back(message);
}

void BoostTCPConnection::getQueueSize(unsigned long& tcpToSend, unsigned long& tcpRead)
{
    {
        boost::mutex::scoped_lock lock(sendMutex_);
        tcpToSend=sendMsgs_.size();
    }

    {
        boost::mutex::scoped_lock lock(readMutex_);
        tcpRead=receivedMsgs_.size();
    }

}


void BoostTCPConnection::send()
{
    if (sendMsgs_.empty())
        return;

    boost::shared_ptr<std::string> message;
    {
        message=sendMsgs_.front();
        sendMsgs_.pop_front();
    }

    char temp[4];
    size_t messageSize=message->size();
    memcpy(temp,&messageSize, 4);

    message->insert(0,temp, 4);

    boost::asio::async_write(socket_, boost::asio::buffer(*message),
        boost::bind(&BoostTCPConnection::handleWrite, shared_from_this(),
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred));

}

void BoostTCPConnection::handleWrite(const boost::system::error_code& error,size_t bytes_transferred)
{
    //Success
    if (error.value() == 0) 
        return;

    std::ostringstream oss;
    oss<< "BoostTCPConnection::handleWrite  got an error Code of "<<error.value()<<" and message "<<error.message()<<" with bytes_transferred = "<<bytes_transferred<<std::endl;
    errorOutput_->outputError(oss.str());
    shutdowner_->shutdown();

}

//***************************************************
//              TCPClient
//***************************************************

TCPClient::TCPClient()
{

}

TCPClient::~TCPClient()
{

}

void TCPClient::threadAction()
{
    io_service_.run();
}

bool TCPClient::start(std::string& hostIP,  unsigned int& hostPort, BoostTCPDebugOutputInterface* output, ShutdownCallback* shutdowner)
{
    bool bResult(false);
    errorOutput_=output;
    connection_ = BoostTCPConnection::create(io_service_, output, shutdowner);
    shutdowner_=shutdowner;

    //Use multiple threads to do my bidding
    for( int x = 0; x < 3; ++x )
    {
        worker_threads_.create_thread( 
            boost::bind( &TCPClient::threadAction, this )
            );
    }

    boost::system::error_code ec;
    try
    {
        boost::asio::ip::tcp::endpoint ep( boost::asio::ip::address_v4::from_string(hostIP), hostPort);
        if (connection_)
        {


            connection_->socket().async_connect(ep,
                                                boost::bind(&TCPClient::handleConnect, 
                                                this, boost::asio::placeholders::error));

                            //Synchronous code that works fine
                            //boost::system::error_code ec;         
            //connection_->socket().connect(ep, ec);
            /*if (!ec)
            {
                worker_threads_.create_thread( 
                    boost::bind( &TCPClient::handleConnect, this, ec ));

                bResult=true;
            }
            else
            {
                std::ostringstream oss;
                oss<< "BoostTCPConnection::start has an error "<<ec.message()<<std::endl;
                errorOutput_->outputError(oss.str());

            }*/

        }
    }
    catch (std::exception& e)
    {
        std::ostringstream oss;
        oss<< "BoostTCPConnection::start received the exception "<<e.what()<<std::endl;
        errorOutput_->outputError(oss.str());
    }

    return bResult;
}

void TCPClient::handleConnect(const boost::system::error_code& error)
{

    if (error )
    {
        std::ostringstream oss;
        oss<< "BoostTCPConnection::handleConnect received the error "<<error.message()<<std::endl;
        errorOutput_->outputError(oss.str());
        shutdowner_->shutdown();
        return;
    }

    if (connection_)
        connection_->start();

}

void TCPClient::stop()
{
    if (connection_)
        connection_->setRunning(false);

    io_service_.stop();
    worker_threads_.join_all();
}

void TCPClient::getQueueSize(unsigned long& tcpToSend, unsigned long& tcpRead)
{
    if (connection_)
        connection_->getQueueSize(tcpToSend, tcpRead);
    else
    {
        tcpToSend=0;
        tcpRead=0;
    }
}

void TCPClient::send(boost::shared_ptr<std::string>& message)
{
    if (connection_)
        connection_->enqueueMessageToSend(message);
}

void TCPClient::getMessages(std::list< boost::shared_ptr<std::string> >& messages)
{ 
    if (connection_)
        connection_->getMessages(messages);
}

最佳答案

问题是你的 io_service 在启动时有一个空队列并立即返回:

void TCPClient::threadAction() {
  io_service_.run();
  assert(0); // triggers right away because there's no async operation queued
}

来自 Stopping the io_service from running out of work :

Some applications may need to prevent an io_service object's run() call from returning when there is no more work to do. For example, the io_service may be being run in a background thread that is launched prior to the application's asynchronous operations. The run() call may be kept running by creating an object of type io_service::work.

关于networking - boost::asio tcp async_accept 处理程序未被调用但同步连接工作正常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13300352/

有关networking - boost::asio tcp async_accept 处理程序未被调用但同步连接工作正常的更多相关文章

  1. ruby - 在 Ruby 程序执行时阻止 Windows 7 PC 进入休眠状态 - 2

    我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0

  2. ruby-on-rails - 由于 "wkhtmltopdf",PDFKIT 显然无法正常工作 - 2

    我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-

  3. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  4. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  5. ruby - 在 Ruby 中编写命令行实用程序 - 2

    我想用ruby​​编写一个小的命令行实用程序并将其作为gem分发。我知道安装后,Guard、Sass和Thor等某些gem可以从命令行自行运行。为了让gem像二进制文件一样可用,我需要在我的gemspec中指定什么。 最佳答案 Gem::Specification.newdo|s|...s.executable='name_of_executable'...endhttp://docs.rubygems.org/read/chapter/20 关于ruby-在Ruby中编写命令行实用程序

  6. ruby-on-rails - Rails 应用程序之间的通信 - 2

    我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此

  7. ruby - 无法运行 Rails 2.x 应用程序 - 2

    我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby​​:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r

  8. ruby-on-rails - Rails 应用程序中的 Rails : How are you using application_controller. rb 是新手吗? - 2

    刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr

  9. ruby - 无法让 RSpec 工作—— 'require' : cannot load such file - 2

    我花了三天的时间用头撞墙,试图弄清楚为什么简单的“rake”不能通过我的规范文件。如果您遇到这种情况:任何文件夹路径中都不要有空格!。严重地。事实上,从现在开始,您命名的任何内容都没有空格。这是我的控制台输出:(在/Users/*****/Desktop/LearningRuby/learn_ruby)$rake/Users/*******/Desktop/LearningRuby/learn_ruby/00_hello/hello_spec.rb:116:in`require':cannotloadsuchfile--hello(LoadError) 最佳

  10. ruby-on-rails - rspec should have_select ('cars' , :options => ['volvo' , 'saab' ] 不工作 - 2

    关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion在首页我有:汽车:VolvoSaabMercedesAudistatic_pages_spec.rb中的测试代码:it"shouldhavetherightselect"dovisithome_pathit{shouldhave_select('cars',:options=>['volvo','saab','mercedes','audi'])}end响应是rspec./spec/request

随机推荐