草庐IT

c++ - boost 异步 tcp 客户端

coder 2024-02-12 原文

我刚刚开始使用 boost。 我正在使用异步套接字编写 TCP 客户端-服务器。

任务如下:

  1. 客户端向服务器发送一个数字
  2. 客户端可以在收到服务器的回答之前发送另一个数字。
  3. 服务器收到一个数字,用它做一些计算并将结果发送回客户端。
  4. 多个客户端可以连接到服务器。

现在可以执行以下操作

  • 从客户端发送一个数字到服务器
  • 服务器在当前线程中接收到一个数字并在 OnReceive 处理程序中进行计算(我知道这很糟糕......但是我应该如何启动一个新线程来并行计算)
  • 服务器返回应答但客户端已经断开连接

如何让客户端在键盘输入数字的同时等待服务器的响应?

为什么我的客户端不等待服务器的响应?

客户端代码:

using boost::asio::ip::tcp;

class TCPClient
{
    public:
        TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter);
        void Close();

    private:
        boost::asio::io_service& m_IOService;
        tcp::socket m_Socket;

        string m_SendBuffer;
        static const size_t m_BufLen = 100;
        char m_RecieveBuffer[m_BufLen*2];

        void OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter);
        void OnReceive(const boost::system::error_code& ErrorCode);
        void OnSend(const boost::system::error_code& ErrorCode);
        void DoClose();
};

TCPClient::TCPClient(boost::asio::io_service& IO_Service, tcp::resolver::iterator EndPointIter)
: m_IOService(IO_Service), m_Socket(IO_Service), m_SendBuffer("")
{
    tcp::endpoint EndPoint = *EndPointIter;

    m_Socket.async_connect(EndPoint,
        boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
}

void TCPClient::Close()
{
    m_IOService.post(
        boost::bind(&TCPClient::DoClose, this));
}
void TCPClient::OnConnect(const boost::system::error_code& ErrorCode, tcp::resolver::iterator EndPointIter)
{
    cout << "OnConnect..." << endl;
    if (ErrorCode == 0)
    {
        cin >> m_SendBuffer;
        cout << "Entered: " << m_SendBuffer << endl;
        m_SendBuffer += "\0";

        m_Socket.async_send(boost::asio::buffer(m_SendBuffer.c_str(),m_SendBuffer.length()+1),
            boost::bind(&TCPClient::OnSend, this,
            boost::asio::placeholders::error));
    } 
    else if (EndPointIter != tcp::resolver::iterator())
    {
        m_Socket.close();
        tcp::endpoint EndPoint = *EndPointIter;

        m_Socket.async_connect(EndPoint, 
            boost::bind(&TCPClient::OnConnect, this, boost::asio::placeholders::error, ++EndPointIter));
    }
}

void TCPClient::OnReceive(const boost::system::error_code& ErrorCode)
{
    cout << "receiving..." << endl;
    if (ErrorCode == 0)
    {
        cout << m_RecieveBuffer << endl;

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
    } 
    else 
    {
        cout << "ERROR! OnReceive..." << endl;
        DoClose();
    }
}

void TCPClient::OnSend(const boost::system::error_code& ErrorCode)
{
    cout << "sending..." << endl;
    if (!ErrorCode)
    {
        cout << "\""<< m_SendBuffer <<"\" has been sent" << endl;
        m_SendBuffer = "";

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::OnReceive, this, boost::asio::placeholders::error));
    }
    else
    {
        cout << "OnSend closing" << endl;
        DoClose();
    }

}

void TCPClient::DoClose()
{
    m_Socket.close();
}

int main()
{
    try 
    {
        cout << "Client is starting..." << endl;
        boost::asio::io_service IO_Service;

        tcp::resolver Resolver(IO_Service);

        string port = "13";
        tcp::resolver::query Query("127.0.0.1", port);

        tcp::resolver::iterator EndPointIterator = Resolver.resolve(Query);

        TCPClient Client(IO_Service, EndPointIterator);

        cout << "Client is started!" << endl;

        cout << "Enter a query string " << endl;

        boost::thread ClientThread(boost::bind(&boost::asio::io_service::run, &IO_Service));

        Client.Close();
        ClientThread.join();
    } 
    catch (exception& e)
    {
        cerr << e.what() << endl;
    }

    cout << "\nClosing";
    getch();
}

这是控制台的输出

Client is starting...
Client is started!
OnConnect...
12
Entered: 12
sending...
"12" has been sent
receiving...
ERROR! OnReceive...

Closing

服务器部分

class Session
{
    public:
        Session(boost::asio::io_service& io_service)
            : socket_(io_service)
        {
            dataRx[0] = '\0';
            dataTx[0] = '\0';
        }

        tcp::socket& socket()
        {
            return socket_;
        }

        void start()
        {
            socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
                boost::bind(&Session::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
        }

        void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
        {
            cout << "reading..." << endl;
            cout << "Data: " << dataRx << endl;

            if (!error)
            {
                if (!isValidData())
                {
                    cout << "Bad data!" << endl;
                    sprintf(dataTx, "Bad data!\0");
                    dataRx[0] = '\0';
                }
                else
                {
                    sprintf(dataTx, getFactorization().c_str());
                    dataRx[0] = '\0';
                }

                boost::asio::async_write(socket_,
                    boost::asio::buffer(dataTx, max_length*2),
                    boost::bind(&Session::handle_write, this,
                    boost::asio::placeholders::error));
            }
            else
            {
                delete this;
            }
        }

        void handle_write(const boost::system::error_code& error)
        {
            cout << "writing..." << endl;
            if (!error)
            {
                cout << "dataTx sent: " << dataTx << endl;
                dataTx[0] = '\0';

                socket_.async_read_some(boost::asio::buffer(dataRx, max_length),
                    boost::bind(&Session::handle_read, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
            }
            else
            {
                delete this;
            }
        }

        string getFactorization() const
        {
            //Do something
        }

        bool isValidData()
        {
            locale loc; 
            for (int i = 0; i < strlen(dataRx); i++)
                if (!isdigit(dataRx[i],loc))
                    return false;

            return true;
        }

    private:
        tcp::socket socket_;
        static const size_t max_length = 100;
        char dataRx[max_length];
        char dataTx[max_length*2];
};

class Server
{
    public:
        Server(boost::asio::io_service& io_service, short port)
            : io_service_(io_service),
            acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
        {
            Session* new_session = new Session(io_service_);
            acceptor_.async_accept(new_session->socket(),
                boost::bind(&Server::handle_accept, this, new_session,
                boost::asio::placeholders::error));
        }

        void handle_accept(Session* new_session, const boost::system::error_code& error)
        {
            if (!error)
            {
                new_session->start();
                new_session = new Session(io_service_);
                acceptor_.async_accept(new_session->socket(),
                    boost::bind(&Server::handle_accept, this, new_session,
                    boost::asio::placeholders::error));
            }
            else
            {
                delete new_session;
            }
        }

    private:
        boost::asio::io_service& io_service_;
        tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
    cout << "Server is runing..." << endl;
    try
    {
        boost::asio::io_service io_service;

        int port = 13;
        Server s(io_service, port);
        cout << "Server is run!" << endl;
        io_service.run();
    }
    catch (boost::system::error_code& e)
    {
        std::cerr << e << "\n";
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}

服务器的输出

Server is runing...
Server is run!
reading...
Data: 12
writing...
dataTx sent: 13    //just send back received ++number
reading...
Data:

非常感谢您的帮助

========

已添加

好的,我明白了。但是检查 ErrorCode == boost::asio::error::eof 不起作用...我做错了什么?

else if (ErrorCode == boost::asio::error::eof)
{
    cout << "boost::asio::error::eof in OnReceive!" << endl;
}
else 
{
    cout << "ERROR! OnReceive..." << ErrorCode << endl;
    DoClose();
}

打印出来的是ERROR! OnReceive...system:10009 好像是我比较的不对

========

已添加

我找到了根本原因。我已经声明使用 async_receive(而不是 async_read_some)并将 main 中的行交换为

ClientThread.join();
Client.Close();

现在可以正常使用了!

现在我正在尝试同时从套接字读取数据和向套接字写入数据(因为客户端应该能够在收到服务器的响应之前发送额外的请求。

OnConnect 函数中我创建了 boost 线程:

boost::thread addMsgThread(boost::bind(&TCPClient::addMsgLoop, this));
boost::thread receivingThread(boost::bind(&TCPClient::startReceiving, this));
boost::thread sendingThread(boost::bind(&TCPClient::startSending, this));

实现

void TCPClient::startReceiving()
{
    cout << "receiving..." << endl;
    m_RecieveBuffer[0] = '\0';
    m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
        boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error)); //runtime error here
    cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;
}

void TCPClient::receivingLoop(const boost::system::error_code& ErrorCode)
{
    cout << "receiving..." << endl;
    if (ErrorCode == 0)
    {
        cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl;

        m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer, m_BufLen),
            boost::bind(&TCPClient::receivingLoop, this, boost::asio::placeholders::error));
    }
    else 
    {
        cout << "ERROR! receivingLoop..." << ErrorCode << endl;
        DoClose();
    }
}

void TCPClient::addMsgLoop()
{
    while (true)
    {
        string tmp;
        cin >> tmp;

        cout << "Entered: " << tmp << endl;
        tmp += "\0";

        try
        {
            msgQueue.push(tmp);
        }
        catch(exception &e)
        {
            cerr << "Canno add msg to send queue... " << e.what() << endl;
        }
    }
}

receivesend 线程的问题相同:运行时错误(在 boost 库中的某处写入访问冲突)。

void TCPClient::startReceiving()
{
     ...
     m_Socket.async_receive(); //runtime error here
}

在后续版本中一切正常(但我不知道如何在回答前实现多次发送)。 谁能告诉我如何解决这个问题或如何通过其他方式实现这个问题?可能合并会有所帮助,但我现在确信这是个好方法。

最佳答案

boost::asio::ip::tcp::socket::async_read_some顾名思义就是不保证读取完整的数据。当客户端完成写入时,它将 error 对象设置为 boost::asio::error::eof

你得到的错误是因为:

服务器部分

        if (!error)
        {
            ...
        }
        else
        {
            delete this;
        }

else block 中,您假设这是一个错误情况并关闭连接。这并非总是如此。在 else 之前,您需要检查 error == boost::asio::error::eof

除此之外,在读取处理程序中,您应该继续收集缓冲区中读取的任何内容,直到您遇到 error == boost::asio::error::eof。只有这样,您才应该验证读取的数据并写回客户端。

查看 HTTP 服务器 1 , 2 , 3examples 中实现部分。

更新:更新问题的答案

更新后的代码存在线程同步问题。

  1. msgQueue 被两个或多个线程同时访问,没有任何锁。
  2. 可以同时调用同一个套接字上的读写。

如果我正确理解你的问题,你想:

  1. 获取用户输入并将其发送到服务器。
  2. 保持同时接收服务器的响应。

您可以使用两个 boost::asio::io_service::strands对于这两个任务。使用 Asio 时,链是同步任务的方式。 Asio 确保发布在链中的任务同步执行。

  1. strand1 中发布一个 send 任务,如下所示:read_user_input -> send_to_server -> handle_send -> read_user_input

  2. strand2 中发布一个 read 任务,如下所示:read_some -> handle_read -> read_some

这将确保 msgQueue 不会被两个线程同时访问。使用两个套接字对服务器进行读取和写入,以确保不会在同一个套接字上同时调用读取和写入。

关于c++ - boost 异步 tcp 客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12990840/

有关c++ - boost 异步 tcp 客户端的更多相关文章

  1. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  2. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  3. ruby - 使用 `+=` 和 `send` 方法 - 2

    如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:

  4. ruby - 如何计算 Liquid 中的变量 +1 - 2

    我对如何计算通过{%assignvar=0%}赋值的变量加一完全感到困惑。这应该是最简单的任务。到目前为止,这是我尝试过的:{%assignamount=0%}{%forvariantinproduct.variants%}{%assignamount=amount+1%}{%endfor%}Amount:{{amount}}结果总是0。也许我忽略了一些明显的东西。也许有更好的方法。我想要存档的只是获取运行的迭代次数。 最佳答案 因为{{incrementamount}}将输出您的变量值并且不会影响{%assign%}定义的变量,我

  5. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

  6. arrays - Ruby 数组 += vs 推送 - 2

    我有一个数组数组,想将元素附加到子数组。+=做我想做的,但我想了解为什么push不做。我期望的行为(并与+=一起工作):b=Array.new(3,[])b[0]+=["apple"]b[1]+=["orange"]b[2]+=["frog"]b=>[["苹果"],["橙子"],["Frog"]]通过推送,我将推送的元素附加到每个子数组(为什么?):a=Array.new(3,[])a[0].push("apple")a[1].push("orange")a[2].push("frog")a=>[[“苹果”、“橙子”、“Frog”]、[“苹果”、“橙子”、“Frog”]、[“苹果”、“

  7. += 的 Ruby 方法 - 2

    有没有办法让Ruby能够做这样的事情?classPlane@moved=0@x=0defx+=(v)#thisiserror@x+=v@moved+=1enddefto_s"moved#{@moved}times,currentxis#{@x}"endendplane=Plane.newplane.x+=5plane.x+=10putsplane.to_s#moved2times,currentxis15 最佳答案 您不能在Ruby中覆盖复合赋值运算符。任务在内部处理。您应该覆盖+,而不是+=。plane.a+=b与plane.a=

  8. ruby - Sinatra + Heroku + Datamapper 使用 dm-sqlite-adapter 部署问题 - 2

    出于某种原因,heroku尝试要求dm-sqlite-adapter,即使它应该在这里使用Postgres。请注意,这发生在我打开任何URL时-而不是在gitpush本身期间。我构建了一个默认的Facebook应用程序。gem文件:source:gemcuttergem"foreman"gem"sinatra"gem"mogli"gem"json"gem"httparty"gem"thin"gem"data_mapper"gem"heroku"group:productiondogem"pg"gem"dm-postgres-adapter"endgroup:development,:t

  9. ruby - 在 TCPServer (Ruby) 中,我如何从客户端获取 IP/MAC? - 2

    我想在Ruby的TCPServer中获取客户端的IP地址。以及(如果可能的话)MAC地址。例如,Ruby中的时间服务器,请参阅评论。tcpserver=TCPServer.new("",80)iftcpserverputs"Listening"loopdosocket=tcpserver.acceptifsocketThread.newdoputs"Connectedfrom"+#HERE!HowcanigettheIPAddressfromtheclient?socket.write(Time.now.to_s)socket.closeendendendend非常感谢!

  10. ruby - Ruby 中字符串运算符 + 和 << 的区别 - 2

    我是Ruby和这个网站的新手。下面两个函数是不同的,一个在函数外修改变量,一个不修改。defm1(x)x我想确保我理解正确-当调用m1时,对str的引用被复制并传递给将其视为x的函数。运算符当调用m2时,对str的引用被复制并传递给将其视为x的函数。运算符+创建一个新字符串,赋值x=x+"4"只是将x重定向到新字符串,而原始str变量保持不变。对吧?谢谢 最佳答案 String#+::str+other_str→new_strConcatenation—ReturnsanewStringcontainingother_strconc

随机推荐