草庐IT

c++ - Boost::Beast:带有 websocket 流水线的服务器

coder 2024-02-23 原文

我正在编写一个带有 boost beast 1.70 和 mysql 8 C 连接器的 c++ websocket 服务器。服务器将同时连接多个客户端。特殊之处在于每个客户端将向服务器连续执行 100 个 websocket 请求。我的服务器的每个请求都是“CPU 轻型”,但服务器对每个请求执行“时间繁重”的 SQL 请求。

我已经使用 websocket_server_coro.cpp 示例启动了我的服务器。服务器步骤是:

1) websocket 读取

2) 一个sql请求

3) websocket 写入

问题是对于给定的用户,服务器在第 2 步被“锁定”,并且在这一步和第 3 步完成之前无法读取。因此,这 100 个请求是按顺序解决的。这对我的用例来说太慢了。

我已经读到,非阻塞读/写对于 boost beast 是不可能的。但是,我现在要做的是在协程中执行 async_read 和 async_write。

void ServerCoro::accept(websocket::stream<beast::tcp_stream> &ws) {
    beast::error_code ec;

    ws.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));

    ws.set_option(websocket::stream_base::decorator([](websocket::response_type &res) {
                res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-Server-coro");
            }));

    ws.async_accept(yield[ec]);
    if (ec) return fail(ec, "accept");

    while (!_bStop) {
        beast::flat_buffer buffer;
        ws.async_read(buffer, yield[ec]);

        if (ec == websocket::error::closed) {
            std::cout << "=> get closed" << std::endl;
            return;
        }

        if (ec) return fail(ec, "read");

        auto buffer_str = new std::string(boost::beast::buffers_to_string(buffer.cdata()));
        net::post([&, buffer_str] {

            // sql async request such as :
            // while (status == (mysql_real_query_nonblocking(this->con, sqlRequest.c_str(), sqlRequest.size()))) {
            //    ioc.poll_one(ec);
            // }
            // more sql ...

            ws.async_write(net::buffer(worker->getResponse()), yield[ec]); // this line is throwing void boost::coroutines::detail::pull_coroutine_impl<void>::pull(): Assertion `! is_running()' failed.
            if (ec) return fail(ec, "write");

        });
    }
}

问题是带有 async_write 的行抛出一个错误:

void boost::coroutines::detail::pull_coroutine_impl::pull(): 断言`! is_running()' 失败。

如果用 sync_write 替换此行,它可以工作,但服务器对给定用户保持顺序。 我试图在单线程服务器上执行此代码。我还尝试对 async_read 和 async_write 使用相同的链。仍然有断言错误。

对于 websocket 的 boost beast 来说这样的服务器是不可能的吗? 谢谢。

最佳答案

根据 Vinnie Falco 的建议,我以“websocket 聊天”和“异步服务器”为例重写了代码。这是代码的最终工作结果:

void Session::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
    boost::ignore_unused(bytes_transferred);

    if(ec == websocket::error::closed) return;  // This indicates that the Session was closed
    if(ec) return fail(ec, "read");

    net::post([&, that = shared_from_this(), ss = std::make_shared<std::string const>(std::move(boost::beast::buffers_to_string(_buffer.cdata())))] {
        /* Sql things that call ioc.poll_one(ec) HERE, for me the sql response go inside worker.getResponse() used below */

        net::dispatch(_wsStrand, [&, that = shared_from_this(), sss = std::make_shared < std::string const>(worker.getResponse())] {
            async_write(sss);
        });
    });
    _buffer.consume(_buffer.size()); // we remove from the buffer what we just read
    do_read(); // go for another read
}

void Session::async_write(const std::shared_ptr<std::string const> &message) {
    _writeMessages.push_back(message);

    if (_writeMessages.size() > 1) {
        BOOST_LOG_TRIVIAL(warning) << "WRITE IS LOCKED";
    } else {
        _ws.text(_ws.got_text());
            _ws.async_write(net::buffer(*_writeMessages.front()), boost::asio::bind_executor(_wsStrand, beast::bind_front_handler(
                    &Session::on_write, this)));
    }
}

void Session::on_write(beast::error_code ec, std::size_t)
{
    // Handle the error, if any
    if(ec) return fail(ec, "write");

    // Remove the string from the queue
    _writeMessages.erase(_writeMessages.begin());

    // Send the next message if any
    if(!_writeMessages.empty())
        _ws.async_write(net::buffer(*_writeMessages.front()), boost::asio::bind_executor(_wsStrand, beast::bind_front_handler(
                        &Session::on_write, this)));
}

谢谢。

关于c++ - Boost::Beast:带有 websocket 流水线的服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56222201/

有关c++ - Boost::Beast:带有 websocket 流水线的服务器的更多相关文章

  1. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  2. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  3. ruby-on-rails - 启动 Rails 服务器时 ImageMagick 的警告 - 2

    最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru

  4. ruby-on-rails - s3_direct_upload 在生产服务器中不工作 - 2

    在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo

  5. ruby-on-rails - 在 Rails 中调试生产服务器 - 2

    您如何在Rails中的实时服务器上进行有效调试,无论是在测试版/生产服务器上?我试过直接在服务器上修改文件,然后重启应用,但是修改好像没有生效,或者需要很长时间(缓存?)我也试过在本地做“脚本/服务器生产”,但是那很慢另一种选择是编码和部署,但效率很低。有人对他们如何有效地做到这一点有任何见解吗? 最佳答案 我会回答你的问题,即使我不同意这种热修补服务器代码的方式:)首先,你真的确定你已经重启了服务器吗?您可以通过跟踪日志文件来检查它。您更改的代码显示的View可能会被缓存。缓存页面位于tmp/cache文件夹下。您可以尝试手动删除

  6. ruby - 使用 `+=` 和 `send` 方法 - 2

    如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:

  7. ruby-on-rails - 带有 Zeus 的 RSpec 3.1,我应该在 spec_helper 中要求 'rspec/rails' 吗? - 2

    使用rspec-rails3.0+,测试设置分为spec_helper和rails_helper我注意到生成的spec_helper不需要'rspec/rails'。这会导致zeus崩溃:spec_helper.rb:5:in`':undefinedmethod`configure'forRSpec:Module(NoMethodError)对thisissue最常见的回应是需要'rspec/rails'。但这是否会破坏仅使用spec_helper拆分rails规范和PORO规范的全部目的?或者这无关紧要,因为Zeus无论如何都会预加载Rails?我应该在我的spec_helper中做

  8. ruby - 如何计算 Liquid 中的变量 +1 - 2

    我对如何计算通过{%assignvar=0%}赋值的变量加一完全感到困惑。这应该是最简单的任务。到目前为止,这是我尝试过的:{%assignamount=0%}{%forvariantinproduct.variants%}{%assignamount=amount+1%}{%endfor%}Amount:{{amount}}结果总是0。也许我忽略了一些明显的东西。也许有更好的方法。我想要存档的只是获取运行的迭代次数。 最佳答案 因为{{incrementamount}}将输出您的变量值并且不会影响{%assign%}定义的变量,我

  9. ruby - 我的 Ruby IRC 机器人没有连接到 IRC 服务器。我究竟做错了什么? - 2

    require"socket"server="irc.rizon.net"port="6667"nick="RubyIRCBot"channel="#0x40"s=TCPSocket.open(server,port)s.print("USERTesting",0)s.print("NICK#{nick}",0)s.print("JOIN#{channel}",0)这个IRC机器人没有连接到IRC服务器,我做错了什么? 最佳答案 失败并显示此消息::irc.shakeababy.net461*USER:Notenoughparame

  10. Ruby:如何使用带有散列的 'send' 方法调用方法? - 2

    假设我有一个类A,里面有一些方法。假设stringmethodName是这些方法之一,我已经知道我想给它什么参数。它们在散列中{'param1'=>value1,'param2'=>value2}所以我有:params={'param1'=>value1,'param2'=>value2}a=A.new()a.send(methodName,value1,value2)#callmethodnamewithbothparams我希望能够通过传递我的哈希以某种方式调用该方法。这可能吗? 最佳答案 确保methodName是一个符号,而

随机推荐