草庐IT

java - 以多种方式发送数据,具体取决于您希望如何发送

coder 2024-03-10 原文

我有一堆键和值,我想通过将它们打包到一个字节数组中来发送到我们的消息队列。我会将所有键和值组成一个字节数组,这些键和值应始终小于 50K,然后发送到我们的消息队列。

数据包类:

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    final byte keyLength = (byte) key.length;
    final byte dataLength = (byte) data.length;

    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout
    itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
        .put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

下面是我发送数据的方式。截至目前,我的设计只允许通过调用上述 sendData() 方法中的 sendToQueueAsync 方法异步发送数据。

  private void validateAndSend(final RecordPartition partition) {
    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

    final Packet packet = new Packet(partition);

    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null) {
      packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
          dataHolder.getProcessBytes());
    }
    packet.close();
  }

现在我需要扩展我的设计,以便我可以用三种不同的方式发送数据。由用户决定他想要发送数据的方式,“同步”或“异步”。

  • 我需要通过调用 sender.sendToQueueAsync 方法异步发送数据。
  • 或者我需要通过调用 sender.sendToQueueSync 方法同步发送数据。
  • 或者我需要通过调用 sender.sendToQueueSync 方法在特定套接字上同步发送数据。在这种情况下,我需要以某种方式传递 socket 变量,以便 sendData 知道这个变量。

发送记录类:

public class SendRecord {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
      .concurrencyLevel(100).build();

  private static class Holder {
    private static final SendRecord INSTANCE = new SendRecord();
  }

  public static SendRecord getInstance() {
    return Holder.INSTANCE;
  }

  private SendRecord() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetry();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  private void handleRetry() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage message : messages) {
      if (message.hasExpired()) {
        if (message.shouldRetry()) {
          message.markResent();
          doSendAsync(message);
        } else {
          cache.invalidate(message.getAddress());
        }
      }
    }
  }

  // called by multiple threads concurrently
  public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  // called by above method and also by handleRetry method
  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(liveSocket.get().getSocket());
    } finally {
      msg.destroy();
    }
  }

  // called by send method below
  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  // called by multiple threads to send data synchronously without passing socket
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  // called by a threads to send data synchronously but with socket as the parameter
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  public void handleAckReceived(final long address) {
    PendingMessage record = cache.getIfPresent(address);
    if (record != null) {
      record.ackReceived();
      cache.invalidate(address);
    }
  }
}

调用者只会调用以下三种方法之一:

  • sendToQueueAsync 通过传递两个参数
  • 通过传递两个参数sendToQueueSync
  • 通过传递三个参数sendToQueueSync

我应该如何设计我的 PacketSendRecord 类,以便我可以告诉 Packet 类这个数据需要在任何一个中发送以上三种方式到我的消息队列。由用户决定他想以哪种方式将数据发送到消息队列。截至目前,我的 Packet 类的结构方式,它只能以一种方式发送数据。

最佳答案

我认为您最好的选择是策略模式 (https://en.wikipedia.org/wiki/Strategy_pattern)。

使用此模式,您可以封装每种类型的“发送”的行为,例如,AsynchronousSend 类、SynchronousSend 类和AsynchronousSocketSend 类。 (您可能会想出更好的名字)。然后,Packet 类可以根据某些逻辑决定使用哪个类将数据发送到队列。

关于java - 以多种方式发送数据,具体取决于您希望如何发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48201935/

有关java - 以多种方式发送数据,具体取决于您希望如何发送的更多相关文章

  1. ruby - 如何以所有可能的方式将字符串拆分为长度最多为 3 的连续子字符串? - 2

    我试图获取一个长度在1到10之间的字符串,并输出将字符串分解为大小为1、2或3的连续子字符串的所有可能方式。例如:输入:123456将整数分割成单个字符,然后继续查找组合。该代码将返回以下所有数组。[1,2,3,4,5,6][12,3,4,5,6][1,23,4,5,6][1,2,34,5,6][1,2,3,45,6][1,2,3,4,56][12,34,5,6][12,3,45,6][12,3,4,56][1,23,45,6][1,2,34,56][1,23,4,56][12,34,56][123,4,5,6][1,234,5,6][1,2,345,6][1,2,3,456][123

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  4. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  5. ruby-on-rails - 正确的 Rails 2.1 做事方式 - 2

    question的一些答案关于redirect_to让我想到了其他一些问题。基本上,我正在使用Rails2.1编写博客应用程序。我一直在尝试自己完成大部分工作(因为我对Rails有所了解),但在需要时会引用Internet上的教程和引用资料。我设法让一个简单的博客正常运行,然后我尝试添加评论。靠我自己,我设法让它进入了可以从script/console添加评论的阶段,但我无法让表单正常工作。我遵循的其中一个教程建议在帖子Controller中创建一个“评论”操作,以添加评论。我的问题是:这是“标准”方式吗?我的另一个问题的答案之一似乎暗示应该有一个CommentsController参

  6. jquery - 我的 jquery AJAX POST 请求无需发送 Authenticity Token (Rails) - 2

    rails中是否有任何规定允许站点的所有AJAXPOST请求在没有authenticity_token的情况下通过?我有一个调用Controller方法的JqueryPOSTajax调用,但我没有在其中放置任何真实性代码,但调用成功。我的ApplicationController确实有'request_forgery_protection'并且我已经改变了config.action_controller.consider_all_requests_local在我的environments/development.rb中为false我还搜索了我的代码以确保我没有重载ajaxSend来发送

  7. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  8. ruby - nanoc 和多种布局 - 2

    是否可以为特定(或所有)项目使用多个布局?例如,我有几个项目,我想对其应用两种不同的布局。一个是绿色的,一个是蓝色的(但是)。我想将它们编译到我的输出目录中的两个不同文件夹中(例如v1和v2)。我一直在玩弄规则和编译block,但我不知道这是怎么回事。因为,每个项目在编译过程中只编译一次,我不能告诉nanoc第一次用layout1编译,第二次用layout2编译。我试过这样的东西,但它导致输出文件损坏。compile'*'doifitem.binary?#don’tfilterbinaryitemselsefilter:erblayout'layout1'layout'layout2'

  9. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  10. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

随机推荐