草庐IT

c++ - boost::asio::async_read 在换行符上返回文件结尾错误

coder 2023-09-18 原文

我正在尝试使用 async_readasync_write 向服务器发出简单的 tcp 请求并设置超时。

问题是 async_read 在尝试读取直到传输结束时给出错误,在第一个 '\n' 上它返回错误(文件结束)。

逐行读取字符串时(当eots->at(last_request) = '\n')时,它成功读取了整个响应。

if(eots->at(last_request)=="") // read until end
        {
             boost::asio::async_read(
                socket_
                , input_buffer_
                , boost::asio::transfer_at_least(1) // read untill end or error
                , boost::bind(&tcp_client::do_Requests_read_handle, this, boost::asio::placeholders::error)
                );
        }else
        {
            boost::asio::async_read_until(
                socket_
                , input_buffer_
                , eots->at(last_request) // read until current request end of transmission sign/string or error
                , boost::bind(&tcp_client::do_Requests_read_handle, this, _1)
                );
        }

这是预期的行为吗?我做得对吗?

为了测试,我尝试进行 whois 查询 (args whois.iana.org 43 com)。

完整代码:

/*
 * MK async TCP
 * contains basic definitions for extractions and string filtering
 *
 */
#ifndef MK_ASYNC_TCP_HPP
#define MK_ASYNC_TCP_HPP
//
// async_tcp_client.cpp
// ~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/placeholders.hpp>
//#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <iostream>

using boost::asio::deadline_timer;
using boost::asio::ip::tcp;

//
// This class manages socket timeouts by applying the concept of a deadline.
// Some asynchronous operations are given deadlines by which they must complete.
// Deadlines are enforced by an "actor" that persists for the lifetime of the
// tcp_client object:
//
//  +----------------+
//  |                |
//  | check_deadline |<---+
//  |                |    |
//  +----------------+    | async_wait()
//              |         |
//              +---------+
//
// If the deadline actor determines that the deadline has expired, the socket
// is closed and any outstanding operations are consequently cancelled.
//
// Connection establishment involves trying each endpoint in turn until a
// connection is successful, or the available endpoints are exhausted. If the
// deadline actor closes the socket, the connect actor is woken up and moves to
// the next endpoint.
//
//  +---------------+
//  |               |
//  | start_connect |<---+
//  |               |    |
//  +---------------+    |
//           |           |
//  async_-  |    +----------------+
// connect() |    |                |
//           +--->| handle_connect |
//                |                |
//                +----------------+
//                          :
// Once a connection is     :
// made, the connect        :
// actor forks in two -     :
//                          :
// an actor for reading     :       and an actor for
// inbound messages:        :       sending heartbeats:
//                          :
//  +------------+          :          +-------------+
//  |            |<- - - - -+- - - - ->|             |
//  | start_read |                     | start_write |<---+
//  |            |<---+                |             |    |
//  +------------+    |                +-------------+    | async_wait()
//          |         |                        |          |
//  async_- |    +-------------+       async_- |    +--------------+
//   read_- |    |             |       write() |    |              |
//  until() +--->| handle_read |               +--->| handle_write |
//               |             |                    |              |
//               +-------------+                    +--------------+
//
// The input actor reads messages from the socket, where messages are delimited
// by the newline character. The deadline for a complete message is 30 seconds.
//
// The heartbeat actor sends a heartbeat (a message that consists of a single
// newline character) every 10 seconds. In this example, no deadline is applied
// message sending.
//
class tcp_client
{
    public:

      tcp_client(boost::asio::io_service& io_service, std::vector<std::string> * requests , std::vector<std::string>  * responses , std::vector<std::string>  * eots, unsigned int request_timeout = 30, unsigned int connect_timeout = 10)
        : stopped_(false),
          socket_(io_service),
          deadline_(io_service),
          heartbeat_timer_(io_service),
          requests(requests),
          responses(responses),
          eots(eots),
          request_timeout(request_timeout),
          connect_timeout(connect_timeout)
      {
        if(eots->size()==0)
        {
            for(unsigned long i=0 ; i<(requests->size()-1); i++)
            {
                eots->push_back("\n");
            }
            eots->push_back("");
        }
        if(responses->size()==0)
        {
            responses->resize(requests->size());
        }
        if( (eots->size() != requests->size()) || (requests->size() != responses->size()) )
        {
            std::cerr<<std::endl<<"wrong nr of parameters"<<std::endl;
            return;
        }
      }

      // Called by the user of the tcp_client class to initiate the connection process.
      // The endpoint iterator will have been obtained using a tcp::resolver.
      void start(tcp::resolver::iterator endpoint_iter)
      {
        // Start the connect actor.
        start_connect(endpoint_iter);

        // Start the deadline actor. You will note that we're not setting any
        // particular deadline here. Instead, the connect and input actors will
        // update the deadline prior to each asynchronous operation.
        deadline_.async_wait(boost::bind(&tcp_client::check_deadline, this));
      }

      // This function terminates all the actors to shut down the connection. It
      // may be called by the user of the tcp_client class, or by the class itself in
      // response to graceful termination or an unrecoverable error.
      void stop()
      {
        stopped_ = true;
        boost::system::error_code ignored_ec;
        socket_.close(ignored_ec);
        deadline_.cancel();
        heartbeat_timer_.cancel();
      }

    private:
      void start_connect(tcp::resolver::iterator endpoint_iter)
      {
        if (endpoint_iter != tcp::resolver::iterator())
        {
          std::cout << "Trying " << endpoint_iter->endpoint() << "...\n";

          // Set a deadline for the connect operation.
          deadline_.expires_from_now(boost::posix_time::seconds(60));

          // Start the asynchronous connect operation.
          socket_.async_connect(endpoint_iter->endpoint(),
              boost::bind(&tcp_client::handle_connect,
                this, _1, endpoint_iter));
        }
        else
        {
          // There are no more endpoints to try. Shut down the client.
          stop();
        }
      }
      void handle_connect(const boost::system::error_code& ec, tcp::resolver::iterator endpoint_iter)
      {
        if (stopped_)
          return;

        // The async_connect() function automatically opens the socket at the start
        // of the asynchronous operation. If the socket is closed at this time then
        // the timeout handler must have run first.
        if (!socket_.is_open())
        {
          std::cout << "Connect timed out\n";

          // Try the next available endpoint.
          start_connect(++endpoint_iter);
        }

        // Check if the connect operation failed before the deadline expired.
        else if (ec)
        {
          std::cout << "Connect error: " << ec.message() << "\n";

          // We need to close the socket used in the previous connection attempt
          // before starting a new one.
          socket_.close();

          // Try the next available endpoint.
          start_connect(++endpoint_iter);
        }

        // Otherwise we have successfully established a connection.
        else
        {
          std::cout << "Connected to " << endpoint_iter->endpoint() << "\n";
          boost::asio::socket_base::keep_alive option(true);
          socket_.set_option(option);

          //~ // Start the input actor.
          //~ start_read();

          //~ // Start the heartbeat actor.
          //~ start_write();
          deadline_.expires_from_now(boost::posix_time::seconds(this->request_timeout));
          do_Requests_write();


        }
      }


      void handle_Requests_finish()
      {
        if(last_request<requests->size())
        {
            last_request++;
            do_Requests_write();
        }else
        {
            stop();
        }
      }

      void do_Requests_write()
      {
        if (stopped_)
          return;

        // Start an asynchronous operation to send a heartbeat message.
        boost::asio::async_write(
            socket_
            , boost::asio::buffer(requests->at(last_request)+"\n")
            , boost::bind(&tcp_client::do_Requests_write_handle, this, _1)
            );
      }

      void do_Requests_write_handle(const boost::system::error_code& ec)
      {
        if (stopped_)
          return;

        if (!ec)
        {
          do_Requests_read();
        }
        else
        {
          std::cout << "Error do_Requests_write_handle: " << ec.message() << "\n";

          stop();
        }
      }

      void do_Requests_read()
      {
        // Set a deadline for the read operation.
        deadline_.expires_from_now(boost::posix_time::seconds(this->request_timeout));

        // Start an asynchronous operation to read a newline-delimited message.
        if(eots->at(last_request)=="") // read untill end
        {
             boost::asio::async_read(
                socket_
                , input_buffer_
                , boost::asio::transfer_at_least(1) // read untill end or error
                , boost::bind(&tcp_client::do_Requests_read_handle, this, boost::asio::placeholders::error)
                );
        }else
        {
            boost::asio::async_read_until(
                socket_
                , input_buffer_
                , eots->at(last_request) // read untill current request end of transmission sign/string or error
                , boost::bind(&tcp_client::do_Requests_read_handle, this, _1)
                );
        }
      }

      void do_Requests_read_handle(const boost::system::error_code& ec)
      {
        if (stopped_)
          return;

        if (!ec)
        {
          // Extract the newline-delimited message from the buffer.
          //~ std::string line;
          //~ std::istream is(&input_buffer_);
          //~ std::getline(is, line);
          std::istream response_istream(&input_buffer_);
          std::string response;
          response_istream >> response;

          // Empty messages are heartbeats and so ignored.
          std::cout << "Received: " << response << "\n";
          responses->at(last_request)+=response+"\n";
          //~ if (!line.empty())
          //~ {
            //~ std::cout << "Received: " << line << "\n";
          //~ }

          do_Requests_read();
        }
        else
        {
            std::cout<<(std::string)"Error on receive: " + ec.message() + "\n";
            responses->at(last_request)+= (std::string)"Error on receive: " + ec.message() + "\n";
            handle_Requests_finish();
        }
      }

      void check_deadline()
      {
        if (stopped_)
          return;

        // Check whether the deadline has passed. We compare the deadline against
        // the current time since a new asynchronous operation may have moved the
        // deadline before this actor had a chance to run.
        if (deadline_.expires_at() <= deadline_timer::traits_type::now())
        {
          // The deadline has passed. The socket is closed so that any outstanding
          // asynchronous operations are cancelled.
          socket_.close();

          // There is no longer an active deadline. The expiry is set to positive
          // infinity so that the actor takes no action until a new deadline is set.
          deadline_.expires_at(boost::posix_time::pos_infin);
        }

        // Put the actor back to sleep.
        deadline_.async_wait(boost::bind(&tcp_client::check_deadline, this));
      }

    private:
        bool stopped_;
        tcp::socket socket_;
        boost::asio::streambuf input_buffer_;
        deadline_timer deadline_;
        deadline_timer heartbeat_timer_;
        std::vector<std::string> *requests, *responses, *eots;
        unsigned int last_request=0;
        unsigned int request_timeout = 30;
        unsigned int connect_timeout = 10;
};

int main(int argc, char* argv[])
{
    std::vector<std::string> requests, responses, eots;
  try
  {
    if (argc < 4)
    {
      std::cerr << "Usage: tcp_client <host> <port> <query1> <query2> <query3> [..]\n";
      return 1;
    }
    for(int i = 3; i<argc ; i++ )
    {
        requests.push_back(argv[i]);
        eots.push_back("");
        responses.push_back("");
    }

    boost::asio::io_service io_service;
    tcp::resolver r(io_service);
    tcp_client c(io_service,&requests,&responses,&eots);

    c.start(r.resolve(tcp::resolver::query(argv[1], argv[2])));

    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}
#endif // MK_ASYNC_TCP_HPP

最佳答案

async_read()操作以 boost::asio::error::eof 错误代码完成,因为已到达文件末尾,而不是因为到达第一个 \n 时从 input_buffer_ 逐字读取。

whois.iana.org:43 上对 com 的响应是 1830 字节。

$ nc whois.iana.org 43 | wc --bytesenter
comenter
1830

boost::asio::streambuf提供给读取操作,它将尝试分配未指定大小的缓冲区,以便可以读取数据。 current implementation将尝试分配一个大小为 512 的缓冲区。因此,如果接收到 1830 个字节,并且每次读取操作读取缓冲区最大值为 512 字节,那么所有接收到的字节将在第 4 次读取操作中被读取。因此,第 5 次读取操作将导致文件结束。

async_read_until() 的完成条件导致行为略有不同。当 streambuf 包含指定的定界符或发生错误时,此操作被视为完成。当 async_read_until() 完成时,streambuf 可能包含分隔符之外的其他数据。如果 streambuf 的附加数据包含分隔符,则随后对 async_read_until() 的调用将满足其完成条件,而无需调用 AsyncReadStreamasync_read_some( ) 函数。

关于c++ - boost::asio::async_read 在换行符上返回文件结尾错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22663063/

有关c++ - boost::asio::async_read 在换行符上返回文件结尾错误的更多相关文章

  1. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  2. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  3. ruby-on-rails - 在 Rails 中将文件大小字符串转换为等效千字节 - 2

    我的目标是转换表单输入,例如“100兆字节”或“1GB”,并将其转换为我可以存储在数据库中的文件大小(以千字节为单位)。目前,我有这个:defquota_convert@regex=/([0-9]+)(.*)s/@sizes=%w{kilobytemegabytegigabyte}m=self.quota.match(@regex)if@sizes.include?m[2]eval("self.quota=#{m[1]}.#{m[2]}")endend这有效,但前提是输入是倍数(“gigabytes”,而不是“gigabyte”)并且由于使用了eval看起来疯狂不安全。所以,功能正常,

  4. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  5. ruby - 将差异补丁应用于字符串/文件 - 2

    对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl

  6. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  7. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  8. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  9. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  10. ruby - 如何使用 Ruby aws/s3 Gem 生成安全 URL 以从 s3 下载文件 - 2

    我正在编写一个小脚本来定位aws存储桶中的特定文件,并创建一个临时验证的url以发送给同事。(理想情况下,这将创建类似于在控制台上右键单击存储桶中的文件并复制链接地址的结果)。我研究过回形针,它似乎不符合这个标准,但我可能只是不知道它的全部功能。我尝试了以下方法:defauthenticated_url(file_name,bucket)AWS::S3::S3Object.url_for(file_name,bucket,:secure=>true,:expires=>20*60)end产生这种类型的结果:...-1.amazonaws.com/file_path/file.zip.A

随机推荐