草庐IT

c++ - boost::asio async_receive_from UDP 端点在线程之间共享?

coder 2023-11-15 原文

Boost asio 专门允许多个线程调用 io_service 上的 run() 方法。这似乎是创建多线程 UDP 服务器的好方法。但是,我遇到了一个问题,我正在努力寻找答案。

查看典型的 async_receive_from 调用:

m_socket->async_receive_from(
        boost::asio::buffer(m_recv_buffer),
        m_remote_endpoint,
        boost::bind(
            &udp_server::handle_receive,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

远程端点和消息缓冲区没有传递给处理程序,而是处于更高的范围级别(在我的示例中为成员变量)。在 UDP 消息到达时处理它的代码如下所示:

void dns_server::handle_receive(const boost::system::error_code &error, std::size_t size)
{
    // process message
    blah(m_recv_buffer, size);

    // send something back
    respond(m_remote_endpoint);
}

如果有多个线程在运行,同步是如何进行的?在线程之间共享一个端点和接收缓冲区意味着 asio 在消息同时到达的情况下等待处理程序在单个线程中完成,然后再在另一个线程中调用处理程序。这似乎否定了允许多个线程首先调用 run 的意义。

如果我想获得请求的并发服务,看起来我需要将工作数据包连同端点的拷贝交给一个单独的线程,允许处理程序方法立即返回,以便 asio 可以获取on 并将另一条消息并行传递给另一个调用 run() 的线程。

这似乎有点令人讨厌。我在这里缺少什么?

最佳答案

Having a single end point and receive buffer shared between the threads implies that asio waits for a handler to complete within a single thread

如果您的意思是“使用单线程运行服务时”,那么这是正确的。

否则,情况并非如此。相反,当您同时调用单个服务对象(即套接字,而不是 io_service)上的操作时,Asio 只是说行为是“未定义的”。

That seems to negate the point of allowing multiple threads to call run in the first place.

除非处理需要相当长的时间,否则不会。

第一段介绍Timer.5 sample 似乎很好地阐述了您的主题。

session

要分离特定于请求的数据(缓冲区和端点),您需要一些 session 概念。 Asio 中一个流行的机制是 bound shared_ptr s 或共享自此 session 类(boost 绑定(bind)支持直接绑定(bind)到 boost::shared_ptr 实例)。

避免并发、不同步地访问 m_socket 的成员您可以添加锁或使用 strand上面链接的 Timer.5 示例中记录的方法。

演示

供您欣赏的是 Daytime.6 异步 UDP 日间服务器,修改为与许多服务 IO 线程一起工作。

请注意,从逻辑上讲,仍然只有一个 IO 线程(strand),因此我们没有违反套接字类的文档化线程安全性。

然而,与官方示例不同,响应可能会乱序排队,这取决于 udp_session::handle_request 中实际处理所花费的时间。 .

注意

  • 一个udp_session用于保存每个请求的缓冲区和远程端点的类
  • 一个线程池,能够在多个内核上扩展实际处理(而非 IO)的负载。
#include <ctime>
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

using namespace boost;
using asio::ip::udp;
using system::error_code;

std::string make_daytime_string()
{
    using namespace std; // For time_t, time and ctime;
    time_t now = time(0);
    return ctime(&now);
}

class udp_server; // forward declaration

struct udp_session : enable_shared_from_this<udp_session> {

    udp_session(udp_server* server) : server_(server) {}

    void handle_request(const error_code& error);

    void handle_sent(const error_code& ec, std::size_t) {
        // here response has been sent
        if (ec) {
            std::cout << "Error sending response to " << remote_endpoint_ << ": " << ec.message() << "\n";
        }
    }

    udp::endpoint remote_endpoint_;
    array<char, 100> recv_buffer_;
    std::string message;
    udp_server* server_;
};

class udp_server
{
    typedef shared_ptr<udp_session> shared_session;
  public:
    udp_server(asio::io_service& io_service)
        : socket_(io_service, udp::endpoint(udp::v4(), 1313)), 
          strand_(io_service)
    {
        receive_session();
    }

  private:
    void receive_session()
    {
        // our session to hold the buffer + endpoint
        auto session = make_shared<udp_session>(this);

        socket_.async_receive_from(
                asio::buffer(session->recv_buffer_), 
                session->remote_endpoint_,
                strand_.wrap(
                    bind(&udp_server::handle_receive, this,
                        session, // keep-alive of buffer/endpoint
                        asio::placeholders::error,
                        asio::placeholders::bytes_transferred)));
    }

    void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) {
        // now, handle the current session on any available pool thread
        socket_.get_io_service().post(bind(&udp_session::handle_request, session, ec));

        // immediately accept new datagrams
        receive_session();
    }

    void enqueue_response(shared_session const& session) {
        socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_,
                strand_.wrap(bind(&udp_session::handle_sent, 
                        session, // keep-alive of buffer/endpoint
                        asio::placeholders::error,
                        asio::placeholders::bytes_transferred)));
    }

    udp::socket  socket_;
    asio::strand strand_;

    friend struct udp_session;
};

void udp_session::handle_request(const error_code& error)
{
    if (!error || error == asio::error::message_size)
    {
        message = make_daytime_string(); // let's assume this might be slow

        // let the server coordinate actual IO
        server_->enqueue_response(shared_from_this());
    }
}

int main()
{
    try {
        asio::io_service io_service;
        udp_server server(io_service);

        thread_group group;
        for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
            group.create_thread(bind(&asio::io_service::run, ref(io_service)));

        group.join_all();
    }
    catch (std::exception& e) {
        std::cerr << e.what() << std::endl;
    }
}

结束语

有趣的是,在大多数情况下,您会看到单线程版本的性能一样好,没有理由使设计复杂化。

或者,您可以使用单线程 io_service专用于 IO,如果这确实是 CPU 密集型部分,则使用老式的工作池来对请求进行后台处理。首先,这简化了设计,其次,这可能会 boost IO 任务的吞吐量,因为不再需要协调发布在链上的任务。

关于c++ - boost::asio async_receive_from UDP 端点在线程之间共享?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26703583/

有关c++ - boost::asio async_receive_from UDP 端点在线程之间共享?的更多相关文章

  1. ruby-on-rails - Rails 应用程序之间的通信 - 2

    我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此

  2. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  3. ruby - 通过 ruby​​ 进程共享变量 - 2

    我正在编写一个gem,我必须在其中fork两个启动两个webrick服务器的进程。我想通过基类的类方法启动这个服务器,因为应该只有这两个服务器在运行,而不是多个。在运行时,我想调用这两个服务器上的一些方法来更改变量。我的问题是,我无法通过基类的类方法访问fork的实例变量。此外,我不能在我的基类中使用线程,因为在幕后我正在使用另一个不是线程安全的库。所以我必须将每个服务器派生到它自己的进程。我用类变量试过了,比如@@server。但是当我试图通过基类访问这个变量时,它是nil。我读到在Ruby中不可能在分支之间共享类变量,对吗?那么,还有其他解决办法吗?我考虑过使用单例,但我不确定这是

  4. ruby - #之间? Cooper 的 *Beginning Ruby* 中的错误或异常 - 2

    在Cooper的书BeginningRuby中,第166页有一个我无法重现的示例。classSongincludeComparableattr_accessor:lengthdef(other)@lengthother.lengthenddefinitialize(song_name,length)@song_name=song_name@length=lengthendenda=Song.new('Rockaroundtheclock',143)b=Song.new('BohemianRhapsody',544)c=Song.new('MinuteWaltz',60)a.betwee

  5. ruby-on-rails - `a ||= b` 和 `a = b if a.nil 之间的区别? - 2

    我正在检查一个Rails项目。在ERubyHTML模板页面上,我看到了这样几行:我不明白为什么不这样写:在这种情况下,||=和ifnil?有什么区别? 最佳答案 在这种特殊情况下没有区别,但可能是出于习惯。每当我看到nil?被使用时,它几乎总是使用不当。在Ruby中,很少有东西在逻辑上是假的,只有文字false和nil是。这意味着像if(!x.nil?)这样的代码几乎总是更好地表示为if(x)除非期望x可能是文字false。我会将其切换为||=false,因为它具有相同的结果,但这在很大程度上取决于偏好。唯一的缺点是赋值会在每次运行

  6. ruby - 使用 `+=` 和 `send` 方法 - 2

    如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:

  7. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  8. [工业相机] 分辨率、精度和公差之间的关系 - 2

    📢博客主页:https://blog.csdn.net/weixin_43197380📢欢迎点赞👍收藏⭐留言📝如有错误敬请指正!📢本文由Loewen丶原创,首发于CSDN,转载注明出处🙉📢现在的付出,都会是一种沉淀,只为让你成为更好的人✨文章预览:一.分辨率(Resolution)1、工业相机的分辨率是如何定义的?2、工业相机的分辨率是如何选择的?二.精度(Accuracy)1、像素精度(PixelAccuracy)2、定位精度和重复定位精度(RepeatPrecision)三.公差(Tolerance)四.课后作业(Post-ClassExercises)视觉行业的初学者,甚至是做了1~2年

  9. ruby - 如何计算 Liquid 中的变量 +1 - 2

    我对如何计算通过{%assignvar=0%}赋值的变量加一完全感到困惑。这应该是最简单的任务。到目前为止,这是我尝试过的:{%assignamount=0%}{%forvariantinproduct.variants%}{%assignamount=amount+1%}{%endfor%}Amount:{{amount}}结果总是0。也许我忽略了一些明显的东西。也许有更好的方法。我想要存档的只是获取运行的迭代次数。 最佳答案 因为{{incrementamount}}将输出您的变量值并且不会影响{%assign%}定义的变量,我

  10. ruby - 无法理解 `puts{}.class` 和 `puts({}.class)` 之间的区别 - 2

    由于匿名block和散列block看起来大致相同。我正在玩它。我做了一些严肃的观察,如下所示:{}.class#=>Hash好的,这很酷。空block被视为Hash。print{}.class#=>NilClassputs{}.class#=>NilClass为什么上面的代码和NilClass一样,下面的代码又显示了Hash?puts({}.class)#Hash#=>nilprint({}.class)#Hash=>nil谁能帮我理解上面发生了什么?我完全不同意@Lindydancer的观点你如何解释下面几行:print{}.class#NilClassprint[].class#A

随机推荐