草庐IT

c++ - 根据报文序列号分发响应报文

coder 2024-02-22 原文

我有一个第三方服务器,我正在为它写一个dll接口(interface),我的客户使用我的dll与服务器通信。

该协议(protocol)使用长 tcp 连接,所有流量都来自此 tcp 连接。可能同时发送/接收多个数据包,比如同时发送 send_msgheart_beat,所以我必须使用 async_write/async_read以防止阻塞操作。每个数据包都有其序列号。例如,我发送了一个序列号==123的消息,然后我应该等待服务器响应一个序列号==123的数据包。

更新:不保证服务器按顺序响应数据包。如果两个数据包按AB的顺序发送,响应顺序可能是response_Bresponse_A。序列 ID 是识别数据包的唯一方式。

数据包看起来像:

4bytes size   +   2 bytes crc check   +   4 bytes SEQUENCE ID   +   ....

问题是使用我的 dll 的客户更喜欢以阻塞方式使用功能,他们不喜欢回调。例如,他们喜欢

bool DLL_EXPORT send_msg(...) {
   // send msg via long_connection, the seq_id==123
   // recv msg via long_connection, just want the packet with seq_id==123 (How?)
   return if_msg_sent_successfully;
}

我用的是boost asio,不知道有没有boost的实用类,或者适合这种场景的设计模式,下面是我能想到的解决方案:

// use a global std::map<seq_id, packet_content>
std::map<int, std::string> map_received;

每次收到一个数据包,将seq_idpacket_body写入map_received,send_msg函数如下所示

bool DLL_EXPORT send_msg(...) {
   // async_send msg via long_connection, the seq_id==123
   while(not_time_out) {
       if(map_received.find(123) != map_received.end()) {
           // get the packet and erase the 123 pair
       }
       Sleep(10); // prevent cpu cost
   }
   return if_msg_sent_successfully;
}

这个解决方案很丑陋,必须有更好的设计。有什么想法吗?

最佳答案

你可以使用 std::promisestd::future (如果您还没有使用 C++11,则可以使用它们的 boost 对应物)。

想法是存储一个 std::shared_ptr<std::promise<bool>>每当发送请求时,将当前序列 ID 作为映射中的键。 在阻塞发送函数中等待相应的 std::future<bool>要设置。 当收到响应包时,对应的std::promise<bool>从 map 中获取并设置值,发送功能“畅通无阻”。

以下示例大致基于 Boost asio documentation 中的聊天客户端示例并且不完整(例如连接部分丢失,标题和正文阅读未拆分等)。由于它不完整,我没有进行运行时测试,但它应该可以说明这个想法。

#include <thread>
#include <map>
#include <future> 
#include <iostream>

#include <boost/asio.hpp>

class Message
{
public:
  enum { header_length = 10 };
  enum { max_body_length = 512 };

  Message()
    : body_length_(0)
  {
  }

  const char* data() const
  {
    return data_;
  }

  char* data()
  {
    return data_;
  }

  std::size_t length() const
  {
    return header_length + body_length_;
  }

  const char* body() const
  {
    return data_ + header_length;
  }

  char* body()
  {
    return data_ + header_length;
  }

private:
  char data_[header_length + max_body_length];
  std::size_t body_length_;
};


class Client
{    
public:
    Client(boost::asio::io_service& io_service)
        : io_service(io_service),
          socket(io_service),
          current_sequence_id(0)
    {}

    bool blocking_send(const std::string& msg)
    {
        auto future = async_send(msg);
        // blocking wait
        return future.get();
    }

    void start_reading()
    {
        auto handler = [this](boost::system::error_code ec, std::size_t /*length*/)
                       {
                           if(!ec)
                           {
                               // parse response ...
                               int response_id = 0;
                               auto promise = map_received[response_id];
                               promise->set_value(true);
                               map_received.erase(response_id);
                           }
                       };
        boost::asio::async_read(socket,
                                boost::asio::buffer(read_msg_.data(), Message::header_length),
                                handler);
    }

    void connect()
    {
        // ...
        start_reading();
    }

private:

    std::future<bool> async_send(const std::string& msg)
    {
        auto promise = std::make_shared<std::promise<bool>>();
        auto handler = [=](boost::system::error_code ec, std::size_t /*length*/){std::cout << ec << std::endl;};
        boost::asio::async_write(socket,
                                 boost::asio::buffer(msg),
                                 handler);
        // store promise in map
        map_received[current_sequence_id] = promise;
        current_sequence_id++;
        return promise->get_future();
    }

    boost::asio::io_service& io_service;
    boost::asio::ip::tcp::socket socket;
    std::map<int, std::shared_ptr<std::promise<bool>>> map_received;
    int current_sequence_id;
    Message read_msg_;
};



int main()
{
    boost::asio::io_service io_service;
    Client client(io_service);

    std::thread t([&io_service](){ io_service.run(); });

    client.connect();
    client.blocking_send("dummy1");
    client.blocking_send("dummy2");

    return 0;
}

关于c++ - 根据报文序列号分发响应报文,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30976015/

有关c++ - 根据报文序列号分发响应报文的更多相关文章

  1. ruby-on-rails - 如何优雅地重启 thin + nginx? - 2

    我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server

  2. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  3. ruby-on-rails - 每次我尝试部署时,我都会得到 - (gcloud.preview.app.deploy) 错误响应 : [4] DEADLINE_EXCEEDED - 2

    我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie

  4. ruby - 是否有用于序列化和反序列化各种格式的对象层次结构的模式? - 2

    给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最

  5. ruby - 使用 `+=` 和 `send` 方法 - 2

    如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:

  6. ruby - 如何计算 Liquid 中的变量 +1 - 2

    我对如何计算通过{%assignvar=0%}赋值的变量加一完全感到困惑。这应该是最简单的任务。到目前为止,这是我尝试过的:{%assignamount=0%}{%forvariantinproduct.variants%}{%assignamount=amount+1%}{%endfor%}Amount:{{amount}}结果总是0。也许我忽略了一些明显的东西。也许有更好的方法。我想要存档的只是获取运行的迭代次数。 最佳答案 因为{{incrementamount}}将输出您的变量值并且不会影响{%assign%}定义的变量,我

  7. ruby - 如何使用 Selenium Webdriver 根据 div 的内容执行操作? - 2

    我有一个使用SeleniumWebdriver和Nokogiri的Ruby应用程序。我想选择一个类,然后对于那个类对应的每个div,我想根据div的内容执行一个Action。例如,我正在解析以下页面:https://www.google.com/webhp?sourceid=chrome-instant&ion=1&espv=2&ie=UTF-8#q=puppies这是一个搜索结果页面,我正在寻找描述中包含“Adoption”一词的第一个结果。因此机器人应该寻找带有className:"result"的div,对于每个检查它的.descriptiondiv是否包含单词“adoption

  8. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

  9. ruby - 如何根据长度将路径数组转换为嵌套数组或散列 - 2

    我需要根据字符串路径的长度将字符串路径数组转换为符号、哈希和数组的数组给定以下数组:array=["info","services","about/company","about/history/part1","about/history/part2"]我想生成以下输出,对不同级别进行分组,根据级别的结构混合使用符号和对象。产生以下输出:[:info,:services,about:[:company,history:[:part1,:part2]]]#altsyntax[:info,:services,{:about=>[:company,{:history=>[:part1,:pa

  10. ruby - 在 Ruby 中比较序列 - 2

    假设我必须(小型到中型)阵列:tokens=["aaa","ccc","xxx","bbb","ccc","yyy","zzz"]template=["aaa","bbb","ccc"]如何确定tokens是否以相同的顺序包含template的所有条目?(请注意,在上面的示例中,应忽略第一个“ccc”,从而由于最后一个“ccc”而导致匹配。) 最佳答案 这适用于您的示例数据。tokens=["aaa","ccc","xxx","bbb","ccc","yyy","zzz"]template=["aaa","bbb","ccc"]po

随机推荐