草庐IT

sockets - 错误的文件描述符 Boost asio

coder 2023-09-19 原文

我尝试用 boost asio 和 boost beast 做一个简单的 tcp/http 服务器。 但是,当我尝试读取套接字消息时,我得到了错误的文件描述符。 我真的不明白哪里出了问题。我使用 std::move 将套接字从服务器类传输到 detect_session 类以获得相同的“套接字”

服务器

tcp_server::tcp_server(boost::asio::io_context& ioc, tcp::endpoint endpoint, 
std::shared_ptr<std::string const> const& doc_root)
: acceptor(ioc, endpoint),
doc_root(doc_root)
{
wait_for_connection();
}

void tcp_server::wait_for_connection()
{
acceptor.async_accept(
[this](boost::system::error_code ec, tcp::socket socket)
    {
      if (!ec)
      {
    std::cout << "accepted" << std::endl;
    std::make_shared<detect_session>(std::move(socket), std::move(buffer), 
  doc_root)->run();
  }
wait_for_connection();
});
}

检测 session .h

#ifndef DETECT_SESSION_H
#define DETECT_SESSION_H
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/make_unique.hpp>

#include "message.h"
#include "http_session.h"
#include "tcp_connexion.h"
#include "logger.h"

using tcp = boost::asio::ip::tcp;
namespace ssl = boost::asio::ssl;
namespace http = boost::beast::http;
namespace websocket = boost::beast::websocket ;

class detect_session: public std::enable_shared_from_this<detect_session>
{
boost::asio::strand<boost::asio::io_context::executor_type> strand;
tcp::socket m_socket;
std::shared_ptr<std::string const> doc_root;

public:
detect_session(tcp::socket socket, boost::beast::flat_buffer buffer,std::shared_ptr<std::string const> const& doc_root);
~detect_session();
void run();
void handshake();
void on_handshake();
void do_read();
void on_read(boost::system::error_code ec);

http::request<http::string_body> req;

protected:
boost::beast::flat_buffer buffer;
    connection_ptr m_tcp_connection;
private:
void on_timer(boost::system::error_code ec);
boost::asio::steady_timer timer;
void do_timeout();
void checkGETVerb(boost::system::error_code ec);
void do_eof();
message message_read;
};

#endif // DETECT_SESSION_H

检测 session .cpp

detect_session::detect_session(tcp::socket socket,  
boost::beast::flat_buffer buffer,std::shared_ptr<std::string const> const& 
doc_root)
    : m_socket(std::move(socket))
    , strand(socket.get_executor())
, timer(socket.get_executor().context(),
        (std::chrono::steady_clock::time_point::max)())
, doc_root(doc_root)
{
}

detect_session::~detect_session()
{
 //dtor
}

void detect_session::run()
{
if(! strand.running_in_this_thread())
        return  
boost::asio::post(boost::asio::bind_executor(strand,std::bind(&detect_session::run, shared_from_this())));

  on_timer({});
  do_read();
  }

  void detect_session::on_timer(boost::system::error_code ec)
  {
    if(ec && ec != boost::asio::error::operation_aborted)
       // return fail(ec, "timer");
    // Verify that the timer really expired since the deadline may have moved.

    if(timer.expiry() <= std::chrono::steady_clock::now())
        return do_timeout();
    // Wait on the timer
    timer.async_wait(
        boost::asio::bind_executor(
            strand,
            std::bind(
                &detect_session::on_timer,
                shared_from_this(),
                std::placeholders::_1)));
      } 

  void detect_session::do_read()
  {
  timer.expires_after(std::chrono::seconds(15));
  //boost::asio::io_context &ioc = m_socket.get_executor().context();
  m_tcp_connection = connection_ptr(new tcp_connection(m_socket.get_executor().context()));
  m_tcp_connection->async_read(message_read, boost::bind(&detect_session::on_read, this, boost::asio::placeholders::error) );
 }

 void detect_session::on_read(boost::system::error_code ec)
 {
 // Happens when the timer closes the socket
    if(ec == boost::asio::error::operation_aborted)
        return;
    // This means they closed the connection
    if(ec == http::error::end_of_stream)
        do_eof();
    if(ec)
        return log.fail(ec, "read");
   std::cout<< <<message_read.m_message<< std::endl;
    if(message_read.tcp == 0)
    {
    // See if it is a HTTP session
        req = {};
        http::async_read(m_socket, buffer, req, boost::asio::bind_executor(strand, std::bind(&detect_session::checkGETVerb, this, std::placeholders::_1) ));
    }
   // else
        /*std::make_shared<connection_ptr>(
std::move(socket.get_executor().context()));*/
    }

   void detect_session::checkGETVerb(boost::system::error_code ec)
   {
   if (req.method() == http::verb::get)
   {
   std::cout<<req << std::endl;
   }
   }

   void detect_session::do_timeout()
   {
   // Closing the socket cancels all outstanding operations. They
   // will complete with boost::asio::error::operation_aborted
   boost::system::error_code ec;
   m_socket.shutdown(tcp::socket::shutdown_both, ec);
   m_socket.close(ec);
   }

   void detect_session::do_eof()
   {
   // Send a TCP shutdown
   boost::system::error_code ec;
   m_socket.shutdown(tcp::socket::shutdown_send, ec);
   std::cout<< "socket closed"<<std::endl;
   // At this point the connection is closed gracefully
   }

tcp_connection.h

#include <boost/tuple/tuple.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/array.hpp>

#include <iostream>

class tcp_connection
{
public:
tcp_connection(boost::asio::io_context& io_context) : m_socket(io_context)
{
}

boost::asio::ip::tcp::socket& socket()
{
    return m_socket;
}

template <typename T, typename Handler>
void async_write(const T& t, Handler handler)
{
    // Serialize the data first so we know how large it is.
    std::ostringstream archive_stream;
    boost::archive::text_oarchive archive(archive_stream);
    archive << t;
    m_outbound_data = archive_stream.str();

    // Format the header.
    std::ostringstream header_stream;
    header_stream << std::setw(header_length)
        << std::hex << m_outbound_data.size();
    if (!header_stream || header_stream.str().size() != header_length)
    {
        // Something went wrong, inform the caller.
        boost::system::error_code error(boost::asio::error::invalid_argument);
        m_socket.get_io_service().post(boost::bind(handler, error));
        return;
    }
    m_outbound_header = header_stream.str();

    // Write the serialized data to the socket. We use "gather-write" to send
    // both the header and the data in a single write operation.
    std::vector<boost::asio::const_buffer> buffers;
    buffers.push_back(boost::asio::buffer(m_outbound_header));
    buffers.push_back(boost::asio::buffer(m_outbound_data));
    boost::asio::async_write(m_socket, buffers, handler);
}

/// Asynchronously read a data structure from the socket.
template <typename T, typename Handler>
void async_read(T& t, Handler handler)
{
    // Issue a read operation to read exactly the number of bytes in a header.
    void (tcp_connection::*f)(
        const boost::system::error_code&,
        T&, boost::tuple<Handler>)
        = &tcp_connection::handle_read_header<T, Handler>;
    boost::asio::async_read(m_socket, boost::asio::buffer(m_inbound_header),
        boost::bind(f,
        this, boost::asio::placeholders::error, boost::ref(t),
        boost::make_tuple(handler)));
}

/// Handle a completed read of a message header. The handler is passed using
/// a tuple since boost::bind seems to have trouble binding a function object
/// created using boost::bind as a parameter.
template <typename T, typename Handler>
void handle_read_header(const boost::system::error_code& e,
    T& t, boost::tuple<Handler> handler)
{
    if (e)
    {
        boost::get<0>(handler)(e);
    }
    else
    {
        // Determine the length of the serialized data.
        std::istringstream is(std::string(m_inbound_header, header_length));
        std::size_t m_inbound_datasize = 0;
        if (!(is >> std::hex >> m_inbound_datasize))
        {
            // Header doesn't seem to be valid. Inform the caller.
            boost::system::error_code error(boost::asio::error::invalid_argument);
            boost::get<0>(handler)(error);
            return;
        }

        // Start an asynchronous call to receive the data.
        m_inbound_data.resize(m_inbound_datasize);
        void (tcp_connection::*f)(
            const boost::system::error_code&,
            T&, boost::tuple<Handler>)
            = &tcp_connection::handle_read_data<T, Handler>;
        boost::asio::async_read(m_socket, boost::asio::buffer(m_inbound_data),
            boost::bind(f, this,
            boost::asio::placeholders::error, boost::ref(t), handler));
    }
}

/// Handle a completed read of message data.
template <typename T, typename Handler>
void handle_read_data(const boost::system::error_code& e,
    T& t, boost::tuple<Handler> handler)
{
    if (e)
    {
        boost::get<0>(handler)(e);
    }
    else
    {
        // Extract the data structure from the data just received.
        try
        {
            std::string archive_data(&m_inbound_data[0], m_inbound_data.size());
            std::istringstream archive_stream(archive_data);
            boost::archive::text_iarchive archive(archive_stream);
            archive >> t;
        }
        catch (std::exception& e)
        {
            // Unable to decode data.
            boost::system::error_code error(boost::asio::error::invalid_argument);
            boost::get<0>(handler)(error);
            return;
        }

        // Inform caller that data has been received ok.
        boost::get<0>(handler)(e);
    }
}

private:
/// The underlying socket.
boost::asio::ip::tcp::socket m_socket;

/// The size of a fixed length header.
enum { header_length = 8 };

/// Holds an outbound header.
std::string m_outbound_header;

/// Holds the outbound data.
std::string m_outbound_data;

/// Holds an inbound header.
char m_inbound_header[header_length];

/// Holds the inbound data.
std::vector<char> m_inbound_data;

boost::array<char, 128> m_network_buffer;

};

typedef boost::shared_ptr<tcp_connection> connection_ptr;

#endif // TCP_CONNECTION_H

消息.h

#ifndef MESSAGE_H
#define MESSAGE_H

#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/map.hpp>
#include <boost/serialization/list.hpp>

class message
{
public:

void reset()
{
    m_list_string.clear();
    m_message.clear();
    m_login.clear();
}

int m_type;

// Generic datas
std::list<std::string>              m_list_string;
std::string                         m_message;
std::string                         m_login;
bool tcp;

template<class Archive>
void serialize(Archive& ar, const unsigned int version){
    ar & m_type & m_list_string & m_message & m_login;
}

enum {
    NEW_MSG = 0,
    PERSON_LEAVED = 1,
    PERSON_CONNECTED = 2,
};
};

#endif // MESSAGE_H

客户端:

tcp_client::tcp_client(boost::asio::io_context& io_context): m_io_context(io_context), socket(io_context)
{
}

tcp_client::~tcp_client()
{
}

void tcp_client::run()
{
boost::asio::io_context io_context;
tcp::resolver resolver(io_context);
const tcp::resolver::results_type endpoint = resolver.resolve("192.168.9.129", "4000");

m_tcp_connection = connection_ptr(new tcp_connection(m_io_context));
tcp::socket& sock = m_tcp_connection->socket();
boost::asio::async_connect(sock, endpoint,
        [this](boost::system::error_code ec, tcp::endpoint)
        {
          if (!ec)
          {
              write(QString("Welcome !"));
          }
        });
  }

 // Close the connection
 void tcp_client::close()
{
m_io_context.post(boost::bind(&tcp_client::do_close, this));
}

 void tcp_client::handle_read(const boost::system::error_code& error)
{
std::cout<<"12"<<std::endl;
if (!error)
{
    notify(m_message_read);
    m_tcp_connection->async_read(m_message_read,
        boost::bind(&tcp_client::handle_read, this,
        boost::asio::placeholders::error)
        );
}
else
{
    do_close();
}
}

void tcp_client::write(QString msg)
{
message e;
e.m_type = message::NEW_MSG;
e.m_login = m_login; 
e.m_message = msg.toStdString();
e.tcp = 1;
write(e);
}

 void tcp_client::write(message& e)
 {
    m_tcp_connection->async_write(e,
        boost::bind(&tcp_client::handle_write, this,
        boost::asio::placeholders::error)
        );
 }

tcp_connection.h和message.h在客户端和服务器端是一样的

最佳答案

这是设计使然。如果您关闭 套接字,句柄(描述符)将不再有效,因此之后开始的任何操作都会提示句柄无效。

(worse, it could be reused for a new file/connection and you might end up talking to the wrong party, causing undefined behaviour or data corruption. I've seen a bug like that live. Not fun to debug)

您可能希望/仅/执行shutdown,这将导致所有未决/新操作失败,但句柄仍然有效。然后当 socket 实例被析构时,它将自动安全地执行 close 调用,没有竞争条件。

关于sockets - 错误的文件描述符 Boost asio,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52321458/

有关sockets - 错误的文件描述符 Boost asio的更多相关文章

  1. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  2. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  3. ruby-on-rails - 在 Rails 中将文件大小字符串转换为等效千字节 - 2

    我的目标是转换表单输入,例如“100兆字节”或“1GB”,并将其转换为我可以存储在数据库中的文件大小(以千字节为单位)。目前,我有这个:defquota_convert@regex=/([0-9]+)(.*)s/@sizes=%w{kilobytemegabytegigabyte}m=self.quota.match(@regex)if@sizes.include?m[2]eval("self.quota=#{m[1]}.#{m[2]}")endend这有效,但前提是输入是倍数(“gigabytes”,而不是“gigabyte”)并且由于使用了eval看起来疯狂不安全。所以,功能正常,

  4. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  5. ruby-openid:执行发现时未设置@socket - 2

    我在使用omniauth/openid时遇到了一些麻烦。在尝试进行身份验证时,我在日志中发现了这一点:OpenID::FetchingError:Errorfetchinghttps://www.google.com/accounts/o8/.well-known/host-meta?hd=profiles.google.com%2Fmy_username:undefinedmethod`io'fornil:NilClass重要的是undefinedmethodio'fornil:NilClass来自openid/fetchers.rb,在下面的代码片段中:moduleNetclass

  6. ruby - 将差异补丁应用于字符串/文件 - 2

    对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl

  7. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  8. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  9. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  10. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

随机推荐