草庐IT

c++ - MySQL异步?

coder 2023-10-10 原文

我基本上面临着阻塞问题。 我的服务器基于 C++ Boost.ASIO 使用 8 个线程进行编码,因为服务器有 8 个逻辑核心。

我的问题是一个线程在 MySQL 查询上可能会面临 0.2~1.5 秒的阻塞,老实说我不知道​​如何解决这个问题,因为 MySQL C++ 连接器不支持异步查询,而且我不知道如何解决“正确”设计服务器以使用多个线程进行查询。

这是我征求有关在这种情况下该怎么做的意见的地方。 为异步查询sql创建100个线程? 我可以听取专家对此的意见吗?

最佳答案

好的,正确的解决方案是扩展 Asio 并编写一个 mysql_service 实现来集成它。我差点要去 find out how this is done right away ,但我想开始使用“仿真”。

想法是有

  • 您使用 io_service 的业务流程(正如您已经在做的那样)
  • 一个数据库“门面”接口(interface),将异步查询分派(dispatch)到一个不同的队列 (io_service) 并将完成处理程序发送回 business_process io_service

这里需要进行细微的调整,您需要防止业务流程端的 io_service 在其作业队列为空时立即关闭,因为它可能仍在等待数据库层的响应。

因此,将其建模为一个快速演示:

namespace database
{
    // data types
    struct sql_statement { std::string dml; };
    struct sql_response { std::string echo_dml; }; // TODO cover response codes, resultset data etc.

我希望你能原谅我粗略的简化:/

struct service
{
    service(unsigned max_concurrent_requests = 10)
        : work(io_service::work(service_)),
        latency(mt19937(), uniform_int<int>(200, 1500)) // random 0.2 ~ 1.5s
    {
        for (unsigned i = 0; i < max_concurrent_requests; ++i)
            svc_threads.create_thread(boost::bind(&io_service::run, &service_));
    }

    friend struct connection;

private:
    void async_query(io_service& external, sql_statement query, boost::function<void(sql_response response)> completion_handler)
    {
        service_.post(bind(&service::do_async_query, this, ref(external), std::move(query), completion_handler));
    }

    void do_async_query(io_service& external, sql_statement q, boost::function<void(sql_response response)> completion_handler)
    {
        this_thread::sleep_for(chrono::milliseconds(latency())); // simulate the latency of a db-roundtrip

        external.post(bind(completion_handler, sql_response { q.dml }));
    }

    io_service service_;
    thread_group svc_threads; // note the order of declaration
    optional<io_service::work> work;

    // for random delay
    random::variate_generator<mt19937, uniform_int<int> > latency;
};

该服务协调最大数量的并发请求(在“数据库 io_service”端)并将完成的 ping/pong 返回到另一个 io_service(async_query/do_async_query 组合)。这个 stub 实现以明显的方式模拟了 0.2~1.5 秒的延迟:)

现在是客户端“门面”

struct connection
{
    connection(int connection_id, io_service& external, service& svc)
        : connection_id(connection_id),
          external_(external), 
          db_service_(svc)
    { }

    void async_query(sql_statement query, boost::function<void(sql_response response)> completion_handler)
    {
        db_service_.async_query(external_, std::move(query), completion_handler);
    }
  private:
    int connection_id;
    io_service& external_;
    service& db_service_;
};

connection 实际上只是一种便利,因此我们不必显式处理调用站点上的各种队列。

现在,让我们用古老的 Asio 风格实现一个演示业务流程:

namespace domain
{
    struct business_process : id_generator
    {
        business_process(io_service& app_service, database::service& db_service_) 
            : id(generate_id()), phase(0), 
            in_progress(io_service::work(app_service)),
            db(id, app_service, db_service_)
        { 
            app_service.post([=] { start_select(); });
        }

    private:
        int id, phase;
        optional<io_service::work> in_progress;

        database::connection db;

        void start_select() {
            db.async_query({ "select * from tasks where completed = false" }, [=] (database::sql_response r) { handle_db_response(r); });
        }

        void handle_db_response(database::sql_response r) {
            if (phase++ < 4)
            {
                if ((id + phase) % 3 == 0) // vary the behaviour slightly
                {
                    db.async_query({ "insert into tasks (text, completed) values ('hello', false)" }, [=] (database::sql_response r) { handle_db_response(r); });
                } else
                {
                    db.async_query({ "update * tasks set text = 'update' where id = 123" }, [=] (database::sql_response r) { handle_db_response(r); });
                }
            } else
            {
                in_progress.reset();
                lock_guard<mutex> lk(console_mx);
                std::cout << "business_process " << id << " has completed its work\n";
            }
        }
    };

}

此业务流程首先将其自身发布到应用服务上。然后它会连续执行一些数据库查询,并最终退出(通过执行 in_progress.reset(),应用服务会意识到这一点)。

一个演示 main,在一个线程上启动 10 个业务流程:

int main()
{
    io_service app;
    database::service db;

    ptr_vector<domain::business_process> bps;
    for (int i = 0; i < 10; ++i)
    {
        bps.push_back(new domain::business_process(app, db));
    }

    app.run();
}

在我的示例中,business_processes 不执行任何 CPU 密集型工作,因此跨 CPU 调度它们没有用处,但如果您愿意,可以通过替换 app.run() 符合:

thread_group g;
for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
    g.create_thread(boost::bind(&io_service::run, &app));
g.join_all();

查看正在运行的演示 Live On Coliru

关于c++ - MySQL异步?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23310511/

有关c++ - MySQL异步?的更多相关文章

  1. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  2. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  3. ruby - 使用 `+=` 和 `send` 方法 - 2

    如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:

  4. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  5. ruby - 如何计算 Liquid 中的变量 +1 - 2

    我对如何计算通过{%assignvar=0%}赋值的变量加一完全感到困惑。这应该是最简单的任务。到目前为止,这是我尝试过的:{%assignamount=0%}{%forvariantinproduct.variants%}{%assignamount=amount+1%}{%endfor%}Amount:{{amount}}结果总是0。也许我忽略了一些明显的东西。也许有更好的方法。我想要存档的只是获取运行的迭代次数。 最佳答案 因为{{incrementamount}}将输出您的变量值并且不会影响{%assign%}定义的变量,我

  6. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

  7. ruby-on-rails - 无法安装 mysql2 0.3.14 gem - 2

    我看到其他人也遇到过类似的问题,但没有一个解决方案对我有用。0.3.14gem与其他gem文件一起存在。我已经完全按照此处指示完成了所有操作:https://github.com/brianmario/mysql2.我仍然得到以下信息。我不知道为什么安装程序指示它找不到include目录,因为我已经检查过它存在。thread.h文件存在,但不在ruby​​目录中。相反,它在这里:C:\RailsInstaller\DevKit\lib\perl5\5.8\msys\CORE\我正在运行Windows7并尝试在Aptana3中构建我的Rails项目。我的Ruby是1.9.3。$gemin

  8. arrays - Ruby 数组 += vs 推送 - 2

    我有一个数组数组,想将元素附加到子数组。+=做我想做的,但我想了解为什么push不做。我期望的行为(并与+=一起工作):b=Array.new(3,[])b[0]+=["apple"]b[1]+=["orange"]b[2]+=["frog"]b=>[["苹果"],["橙子"],["Frog"]]通过推送,我将推送的元素附加到每个子数组(为什么?):a=Array.new(3,[])a[0].push("apple")a[1].push("orange")a[2].push("frog")a=>[[“苹果”、“橙子”、“Frog”]、[“苹果”、“橙子”、“Frog”]、[“苹果”、“

  9. ruby - 如何使用 ruby​​ mysql2 执行事务 - 2

    我已经开始使用mysql2gem。我试图弄清楚一些基本的事情——其中之一是如何明确地执行事务(对于批处理操作,比如多个INSERT/UPDATE查询)。在旧的ruby-mysql中,这是我的方法:client=Mysql.real_connect(...)inserts=["INSERTINTO...","UPDATE..WHEREid=..",#etc]client.autocommit(false)inserts.eachdo|ins|beginclient.query(ins)rescue#handleerrorsorabortentirelyendendclient.commi

  10. += 的 Ruby 方法 - 2

    有没有办法让Ruby能够做这样的事情?classPlane@moved=0@x=0defx+=(v)#thisiserror@x+=v@moved+=1enddefto_s"moved#{@moved}times,currentxis#{@x}"endendplane=Plane.newplane.x+=5plane.x+=10putsplane.to_s#moved2times,currentxis15 最佳答案 您不能在Ruby中覆盖复合赋值运算符。任务在内部处理。您应该覆盖+,而不是+=。plane.a+=b与plane.a=

随机推荐