草庐IT

c++ - boost::asio 错误?销毁io_service之前的task_io_service

coder 2024-02-12 原文

我在我的代码中发现了这个奇怪的错误。这是我设法完成的自包含测试用例。

#include <memory>
#include <thread>
#include <stack>
#include <system_error>

#include <boost/asio.hpp>

using boost::asio::io_service;
using std::placeholders::_1;

class async_service
{
public:
    async_service();
    async_service(size_t number_threads);
    ~async_service();

    async_service(const async_service&) = delete;
    void operator=(const async_service&) = delete;

    void spawn();
    void shutdown();

    io_service& get_service();
    const io_service& get_service() const;

private:
    io_service service_;
    io_service::work* work_;
    std::vector<std::thread> threads_;
};

async_service::async_service()
  : work_(nullptr)
{
}

async_service::async_service(size_t number_threads)
  : work_(nullptr)
{
    for (size_t i = 0; i < number_threads; ++i)
        spawn();
}

async_service::~async_service()
{
    std::cout << __PRETTY_FUNCTION__ << std::endl;
    service_.stop();
    for (std::thread& t: threads_)
        t.join();
}

void run_service(io_service* service)
{
    service->run();
}

void async_service::spawn()
{
    if (!work_)
        work_ = new io_service::work(service_);
    threads_.push_back(std::thread(run_service, &service_));
}
void async_service::shutdown()
{
    delete work_;
    work_ = nullptr;
}

io_service& async_service::get_service()
{
    return service_;
}
const io_service& async_service::get_service() const
{
    return service_;
}

// --------------------------------------------------------------

template <typename... Args>
class subscriber
  : public std::enable_shared_from_this<subscriber<Args...>>
{
public:
    typedef std::function<void (Args...)> handler_type;
    typedef std::shared_ptr<subscriber<Args...>> ptr;

    subscriber(async_service& service)
      : strand_(service.get_service())
    {
    }

    void subscribe(handler_type handle)
    {
        strand_.dispatch(
            std::bind(&subscriber<Args...>::do_subscribe,
                this->shared_from_this(), handle));
    }

    void relay(Args... params)
    {
        strand_.dispatch(
            std::bind(&subscriber<Args...>::do_relay,
                this->shared_from_this(), std::forward<Args>(params)...));
    }

private:
    typedef std::stack<handler_type> registry_stack;

    void do_subscribe(handler_type handle)
    {
        registry_.push(handle);
    }

    void do_relay(Args... params)
    {
        registry_stack notify_copy = std::move(registry_);
        registry_ = registry_stack();
        while (!notify_copy.empty())
        {
            notify_copy.top()(params...);
            notify_copy.pop();
        }
        assert(notify_copy.empty());
    }

    io_service::strand strand_;
    registry_stack registry_;
};

// --------------------------------------------------------

class lala_channel_proxy
  : public std::enable_shared_from_this<lala_channel_proxy>
{
public:
    typedef std::function<void (const std::error_code&)> receive_inventory_handler;

    lala_channel_proxy(async_service& service)
      : strand_(service.get_service())
    {
        inventory_subscriber_ =
            std::make_shared<inventory_subscriber_type>(service);
    }

    void start()
    {
        read_header();
    }

    void subscribe_inventory(receive_inventory_handler handle_receive)
    {
        inventory_subscriber_->subscribe(handle_receive);
    }

    typedef subscriber<const std::error_code&> inventory_subscriber_type;

    void read_header()
    {
        strand_.post(
            std::bind(&lala_channel_proxy::handle_read_header,
                shared_from_this(), boost::system::error_code(), 0));
    }

    void handle_read_header(const boost::system::error_code& ec,
        size_t bytes_transferred)
    {
        std::cout << "inventory ----------" << std::endl;
        inventory_subscriber_->relay(std::error_code());
        sleep(1.0);
        read_header();
    }

    io_service::strand strand_;
    inventory_subscriber_type::ptr inventory_subscriber_;
};

typedef std::shared_ptr<lala_channel_proxy> lala_channel_proxy_ptr;

class lala_channel
{
public:
    lala_channel(async_service& service)
    {
        lala_channel_proxy_ptr proxy =
            std::make_shared<lala_channel_proxy>(service);
        proxy->start();
        //weak_proxy_ = proxy;
        strong_proxy_ = proxy;
    }
    void subscribe_inventory(
        lala_channel_proxy::receive_inventory_handler handle_receive)
    {
        lala_channel_proxy_ptr proxy = strong_proxy_;
        proxy->subscribe_inventory(handle_receive);
    }
    lala_channel_proxy_ptr strong_proxy_;
    // Normally this has a weak pointer to the channel pimpl to allow
    // it to die, but whether it uses a weak_ptr or shared_ptr makes
    // no difference.
    //std::weak_ptr<channel_proxy> weak_proxy_;
};

typedef std::shared_ptr<lala_channel> lala_channel_ptr;
//typedef lala_channel_proxy_ptr lala_channel_ptr;

class session
  : public std::enable_shared_from_this<session>
{
public:
    typedef std::function<void (const std::error_code&)> completion_handler;

    session(async_service& service, async_service& mempool_service,
            async_service& disk_service)
      : strand_(service.get_service()),
        txpool_strand_(mempool_service.get_service()),
        chain_strand_(disk_service.get_service()), service_(service)
    {
    }

    void start()
    {
        auto this_ptr = shared_from_this();
        lala_channel_ptr node =
            std::make_shared<lala_channel>(service_);
        node->subscribe_inventory(
            std::bind(&session::inventory, this_ptr, _1, node));
        for (size_t i = 0; i < 500; ++i)
        {
            chain_strand_.post(
                []()
                {
                    std::cout << "HERE!" << std::endl;
                    sleep(2);
                });
        }
    }

private:
    void inventory(const std::error_code& ec, lala_channel_ptr node)
    {
        if (ec)
        {
            std::cerr << ec.message() << std::endl;
            return;
        }
        auto this_ptr = shared_from_this();
        txpool_strand_.post([]() {});
        node->subscribe_inventory(
            std::bind(&session::inventory, this_ptr, _1, node));
    }

    async_service& service_;
    io_service::strand txpool_strand_, strand_, chain_strand_;
};

int main()
{
    // First level
    {
        // Bug only happens for this ordering of async_service's
        // That means it is only triggered when they are destroyed in
        // this reverse order.
        async_service network_service(1), disk_service(1), mempool_service(1);
        //async_service network_service(1), mempool_service(1), disk_service(1);
        //async_service disk_service(1), mempool_service(1), network_service(1);
        //async_service disk_service(1), network_service(1), mempool_service(1);
        //async_service mempool_service(1), disk_service(1), network_service(1);
        //async_service mempool_service(1), network_service(1), disk_service(1);

        // Second level
        {
            // Should be kept alive by io_service
            auto s = std::make_shared<session>(network_service, mempool_service, disk_service);
            s->start();
        }
        //network_service.shutdown();
        //disk_service.shutdown();
        //mempool_service.shutdown();
        sleep(3);
    // Never gets past here
    }
    std::cout << "Exiting..." << std::endl;
    return 0;
}

当我运行它时,我得到这个:

$ g++ -std=c++0x /tmp/ideone_y6OlI.cpp -lboost_system -pthread -ggdb
$ gdb a.out 
GNU gdb (Ubuntu/Linaro 7.4-2012.04-0ubuntu2) 7.4-2012.04
Copyright (C) 2012 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-linux-gnu".
For bug reporting instructions, please see:
<http://bugs.launchpad.net/gdb-linaro/>...
Reading symbols from /home/genjix/src/brokenlibbtc/a.out...done.
(gdb) r
Starting program: /home/genjix/src/brokenlibbtc/a.out 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7ffff6deb700 (LWP 28098)]
[New Thread 0x7ffff65ea700 (LWP 28099)]
[New Thread 0x7ffff5de9700 (LWP 28100)]
inventory ----------
HERE!
inventory ----------
HERE!
inventory ----------
async_service::~async_service()
async_service::~async_service()
[Thread 0x7ffff5de9700 (LWP 28100) exited]

Program received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff6deb700 (LWP 28098)]
0x0000000000405873 in boost::asio::detail::task_io_service::wake_one_idle_thread_and_unlock (this=0x6255e0, lock=...) at /usr/include/boost/asio/detail/impl/task_io_service.ipp:461
461     first_idle_thread_ = idle_thread->next;

boost 1.48 和 1.49 也是一样。

我想知道为什么会这样。它只发生在这种高度特殊的配置中。如果我更改任何内容,则不会出现错误。

async_service 是 io_service 的便捷包装器。奇怪的是,如果我将 io_service 更改为 *io_service 并且不删除 io_service 那么错误就不会发生......但这应该无关紧要吧?

如果您查看 main() 中的源代码,会创建 3 个 async_service 对象。它们中的每一个都管理单个 io_service 的生命周期。

        // Bug only happens for this ordering of async_service's
        // That means it is only triggered when they are destroyed in
        // this reverse order.
        async_service network_service(1), disk_service(1), mempool_service(1);
        //async_service network_service(1), mempool_service(1), disk_service(1);
        //async_service disk_service(1), mempool_service(1), network_service(1);
        //async_service disk_service(1), network_service(1), mempool_service(1);
        //async_service mempool_service(1), disk_service(1), network_service(1);
        //async_service mempool_service(1), network_service(1), disk_service(1);

订阅者类表示订阅...调用特定事件的事物。 session 和 channel 是从一个更大的程序改编而来的,所以它们可能看起来很困惑/令人困惑。

最佳答案

一个问题是 session::inventory,当从构造函数的第一个参数下的线程执行时(network_service 在失败的情况下),试图访问一个使用第二个参数 (mempool_service) 初始化的链。

void inventory(const std::error_code& ec, lala_channel_ptr node)
{
    if (ec)
    {
        std::cerr << ec.message() << std::endl;
        return;
    }
    auto this_ptr = shared_from_this();
    txpool_strand_.post([]() {}); // <-- one problem is here.
    node->subscribe_inventory(
        std::bind(&session::inventory, this_ptr, _1, node));
}

按照销毁的顺序,mempool_service已经被销毁了,在post执行的过程中,那里的访问会在某处失败。

关于c++ - boost::asio 错误?销毁io_service之前的task_io_service,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10902761/

有关c++ - boost::asio 错误?销毁io_service之前的task_io_service的更多相关文章

  1. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  2. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  3. ruby-on-rails - 迷你测试错误 : "NameError: uninitialized constant" - 2

    我遵循MichaelHartl的“RubyonRails教程:学习Web开发”,并创建了检查用户名和电子邮件长度有效性的测试(名称最多50个字符,电子邮件最多255个字符)。test/helpers/application_helper_test.rb的内容是:require'test_helper'classApplicationHelperTest在运行bundleexecraketest时,所有测试都通过了,但我看到以下消息在最后被标记为错误:ERROR["test_full_title_helper",ApplicationHelperTest,1.820016791]test

  4. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  5. 使用 ACL 调用 upload_file 时出现 Ruby S3 "Access Denied"错误 - 2

    我正在尝试编写一个将文件上传到AWS并公开该文件的Ruby脚本。我做了以下事情:s3=Aws::S3::Resource.new(credentials:Aws::Credentials.new(KEY,SECRET),region:'us-west-2')obj=s3.bucket('stg-db').object('key')obj.upload_file(filename)这似乎工作正常,除了该文件不是公开可用的,而且我无法获得它的公共(public)URL。但是当我登录到S3时,我可以正常查看我的文件。为了使其公开可用,我将最后一行更改为obj.upload_file(file

  6. ruby-on-rails - 错误 : Error installing pg: ERROR: Failed to build gem native extension - 2

    我克隆了一个rails仓库,我现在正尝试捆绑安装背景:OSXElCapitanruby2.2.3p173(2015-08-18修订版51636)[x86_64-darwin15]rails-v在您的Gemfile中列出的或native可用的任何gem源中找不到gem'pg(>=0)ruby​​'。运行bundleinstall以安装缺少的gem。bundleinstallFetchinggemmetadatafromhttps://rubygems.org/............Fetchingversionmetadatafromhttps://rubygems.org/...Fe

  7. ruby - #之间? Cooper 的 *Beginning Ruby* 中的错误或异常 - 2

    在Cooper的书BeginningRuby中,第166页有一个我无法重现的示例。classSongincludeComparableattr_accessor:lengthdef(other)@lengthother.lengthenddefinitialize(song_name,length)@song_name=song_name@length=lengthendenda=Song.new('Rockaroundtheclock',143)b=Song.new('BohemianRhapsody',544)c=Song.new('MinuteWaltz',60)a.betwee

  8. ruby - 如何验证 IO.copy_stream 是否成功 - 2

    这里有一个很好的答案解释了如何在Ruby中下载文件而不将其加载到内存中:https://stackoverflow.com/a/29743394/4852737require'open-uri'download=open('http://example.com/image.png')IO.copy_stream(download,'~/image.png')我如何验证下载文件的IO.copy_stream调用是否真的成功——这意味着下载的文件与我打算下载的文件完全相同,而不是下载一半的损坏文件?documentation说IO.copy_stream返回它复制的字节数,但是当我还没有下

  9. ruby-on-rails - 每次我尝试部署时,我都会得到 - (gcloud.preview.app.deploy) 错误响应 : [4] DEADLINE_EXCEEDED - 2

    我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie

  10. ruby-on-rails - Rails 5 Active Record 记录无效错误 - 2

    我有两个Rails模型,即Invoice和Invoice_details。一个Invoice_details属于Invoice,一个Invoice有多个Invoice_details。我无法使用accepts_nested_attributes_forinInvoice通过Invoice模型保存Invoice_details。我收到以下错误:(0.2ms)BEGIN(0.2ms)ROLLBACKCompleted422UnprocessableEntityin25ms(ActiveRecord:4.0ms)ActiveRecord::RecordInvalid(Validationfa

随机推荐