草庐IT

c++ - Boost asio,单个 TCP 服务器,多个客户端

coder 2023-09-18 原文

我正在创建一个将使用 boost asio 的 TCP 服务器,它将接受来自许多客户端的连接、接收数据并发送确认。问题是我希望能够接受所有客户,但我一次只想与一个客户合作。我希望所有其他事务都保持在队列中。

例子:

  1. Client1 连接
  2. Client2 连接
  3. Client1发送数据并请求回复
  4. Client2发送数据并请求回复
  5. Client2的请求被放入队列
  6. Client1的数据被读取,服务器回复,事务结束
  7. Client2的请求从队列中取出,服务器读取数据,回复事务结束。

所以这是介于异步服务器和阻塞服务器之间的东西。我只想一次做一件事,但同时我希望能够将所有客户端套接字及其需求存储在队列中。

我能够使用我需要的所有功能创建服务器-客户端通信,但只能在单线程上进行。一旦客户端断开连接,服务器也会终止。我真的不知道如何开始实现我上面提到的内容。每次连接被接受时我应该打开新线程吗?我应该使用 async_accept 还是阻塞接受?

我读过 boost::asio 聊天示例,其中许多客户端连接到单个服务器,但这里没有我需要的排队机制。

我知道这篇文章可能有点令人困惑,但 TCP 服务器对我来说是新的,所以我对术语不够熟悉。也没有要发布的源代码,因为我只是寻求有关该项目概念的帮助。

最佳答案

继续接受。

你没有显示任何代码,但它通常看起来像

void do_accept() {
    acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
        std::cout << "async_accept -> " << ec.message() << "\n";
        if (!ec) {
            std::make_shared<Connection>(std::move(socket_))->start();
            do_accept(); // THIS LINE
        }
    });
}

如果您不包含标记为 //THIS LINE 的行,您确实不会接受超过 1 个连接。

如果这没有帮助,请包含一些我们可以使用的代码。

为了好玩,一个演示

对于非网络部分,这仅使用标准库功能。

网络监听器

网络部分如前所述:

#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <istream>

using namespace std::chrono_literals;
using Clock = std::chrono::high_resolution_clock;

namespace Shared {
    using PostRequest = std::function<void(std::istream& is)>;
}

namespace Network {

    namespace ba = boost::asio;
    using ba::ip::tcp;
    using error_code = boost::system::error_code;

    using Shared::PostRequest;

    struct Connection : std::enable_shared_from_this<Connection> {
        Connection(tcp::socket&& s, PostRequest poster) : _s(std::move(s)), _poster(poster) {}

        void process() {
            auto self = shared_from_this();
            ba::async_read(_s, _request, [this,self](error_code ec, size_t) {
                if (!ec || ec == ba::error::eof) {
                    std::istream reader(&_request);
                    _poster(reader);
                }
            });
        }

      private:
        tcp::socket   _s;
        ba::streambuf _request;
        PostRequest   _poster;
    };

    struct Server {

        Server(unsigned port, PostRequest poster) : _port(port), _poster(poster) {}

        void run_for(Clock::duration d = 30s) {
            _stop.expires_from_now(d);
            _stop.async_wait([this](error_code ec) { if (!ec) _svc.post([this] { _a.close(); }); });

            _a.listen();

            do_accept();

            _svc.run();
        }
      private:
        void do_accept() {
            _a.async_accept(_s, [this](error_code ec) {
                if (!ec) {
                    std::make_shared<Connection>(std::move(_s), _poster)->process();
                    do_accept();
                }
            });
        }

        unsigned short            _port;
        PostRequest               _poster;

        ba::io_service            _svc;
        ba::high_resolution_timer _stop { _svc };
        tcp::acceptor             _a { _svc, tcp::endpoint {{}, _port } };
        tcp::socket               _s { _svc };
    };
}

工作服务部分的唯一“连接”是在构造时传递给服务器的 PostRequest 处理程序:

Network::Server server(6767, handler);

我还选择了异步操作,因此我们可以有一个计时器来停止服务,即使我们不使用任何线程也是如此:

server.run_for(3s); // this blocks

工作部分

这是完全独立的,将使用线程。首先,让我们定义一个Request,和一个线程安全的Queue:

namespace Service {
    struct Request {
        std::vector<char> data; // or whatever you read from the sockets...
    };

    Request parse_request(std::istream& is) {
        Request result;
        result.data.assign(std::istream_iterator<char>(is), {});
        return result;
    }

    struct Queue {
        Queue(size_t max = 50) : _max(max) {}

        void enqueue(Request req) {
            std::unique_lock<std::mutex> lk(mx);
            cv.wait(lk, [this] { return _queue.size() < _max; });
            _queue.push_back(std::move(req));

            cv.notify_one();
        }

        Request dequeue(Clock::time_point deadline) {
            Request req;

            {
                std::unique_lock<std::mutex> lk(mx);
                _peak = std::max(_peak, _queue.size());
                if (cv.wait_until(lk, deadline, [this] { return _queue.size() > 0; })) {
                    req = std::move(_queue.front());
                    _queue.pop_front();
                    cv.notify_one();
                } else {
                    throw std::range_error("dequeue deadline");
                }
            }

            return  req;
        }

        size_t peak_depth() const {
            std::lock_guard<std::mutex> lk(mx);
            return _peak;
        }

      private:
        mutable std::mutex mx;
        mutable std::condition_variable cv;

        size_t _max = 50;
        size_t _peak = 0;
        std::deque<Request> _queue;
    };

这没什么特别的,而且实际上还没有使用线程。让我们创建一个接受队列引用的辅助函数(如果需要,可以启动 1 个以上的辅助函数):

    void worker(std::string name, Queue& queue, Clock::duration d = 30s) {
        auto const deadline = Clock::now() + d;

        while(true) try {
            auto r = queue.dequeue(deadline);
            (std::cout << "Worker " << name << " handling request '").write(r.data.data(), r.data.size()) << "'\n";
        }
        catch(std::exception const& e) {
            std::cout << "Worker " << name << " got " << e.what() << "\n";
            break;
        }
    }
}

主要驱动程序

这里是队列被实例化的地方,网络服务器和一些工作线程都被启动了:

int main() {
    Service::Queue queue;

    auto handler = [&](std::istream& is) {
            queue.enqueue(Service::parse_request(is));
        };

    Network::Server server(6767, handler);

    std::vector<std::thread> pool;
    pool.emplace_back([&queue] { Service::worker("one", queue, 6s); });
    pool.emplace_back([&queue] { Service::worker("two", queue, 6s); });

    server.run_for(3s); // this blocks

    for (auto& thread : pool)
        if (thread.joinable())
            thread.join();

    std::cout << "Maximum queue depth was " << queue.peak_depth() << "\n";
}

现场演示

See It Live On Coliru

测试负载如下所示:

for a in "hello world" "the quick" "brown fox" "jumped over" "the pangram" "bye world"
do
     netcat 127.0.0.1 6767 <<< "$a" || echo "not sent: '$a'"&
done
wait

它打印出类似的东西:

Worker one handling request 'brownfox'
Worker one handling request 'thepangram'
Worker one handling request 'jumpedover'
Worker two handling request 'Worker helloworldone handling request 'byeworld'
Worker one handling request 'thequick'
'
Worker one got dequeue deadline
Worker two got dequeue deadline
Maximum queue depth was 6

关于c++ - Boost asio,单个 TCP 服务器,多个客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46935025/

有关c++ - Boost asio,单个 TCP 服务器,多个客户端的更多相关文章

  1. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  2. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  3. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  4. ruby-on-rails - 在 Ruby 中循环遍历多个数组 - 2

    我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代

  5. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  6. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  7. ruby - 多个属性的 update_column 方法 - 2

    我有一个具有一些属性的模型:attr1、attr2和attr3。我需要在不执行回调和验证的情况下更新此属性。我找到了update_column方法,但我想同时更新三个属性。我需要这样的东西:update_columns({attr1:val1,attr2:val2,attr3:val3})代替update_column(attr1,val1)update_column(attr2,val2)update_column(attr3,val3) 最佳答案 您可以使用update_columns(attr1:val1,attr2:val2

  8. ruby-on-rails - 启动 Rails 服务器时 ImageMagick 的警告 - 2

    最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru

  9. ruby-on-rails - 在 ruby​​ .gemspec 文件中,如何指定依赖项的多个版本? - 2

    我正在尝试修改当前依赖于定义为activeresource的gem:s.add_dependency"activeresource","~>3.0"为了让gem与Rails4一起工作,我需要扩展依赖关系以与activeresource的版本3或4一起工作。我不想简单地添加以下内容,因为它可能会在以后引起问题:s.add_dependency"activeresource",">=3.0"有没有办法指定可接受版本的列表?~>3.0还是~>4.0? 最佳答案 根据thedocumentation,如果你想要3到4之间的所有版本,你可以这

  10. ruby-on-rails - s3_direct_upload 在生产服务器中不工作 - 2

    在Rails4.0.2中,我使用s3_direct_upload和aws-sdkgems直接为s3存储桶上传文件。在开发环境中它工作正常,但在生产环境中它会抛出如下错误,ActionView::Template::Error(noimplicitconversionofnilintoString)在View中,create_cv_url,:id=>"s3_uploader",:key=>"cv_uploads/{unique_id}/${filename}",:key_starts_with=>"cv_uploads/",:callback_param=>"cv[direct_uplo

随机推荐