草庐IT

java - 在 while 循环中添加项目时处理队列中的项目

coder 2024-04-02 原文

我有一种方法可以在 while 循环中监听 UDP 数据包。我想在数据包到达时使用不同类中的另一种方法解析数据包,并在应用程序的另一部分对每个数据包进行许多不同的解析和分析。我认为让 PacketParser 方法在循环外处理 Queue 会更好。是否可以在数据包进入时将数据包添加到 Queue 中,然后让应用程序的另一部分在项目进入队列时监听项目并执行其他操作,因为原始 while 循环保持监听数据包并将它们添加到队列中?我想让另一个函数监视队列并处理数据包,Java 中是否有一些东西可以监视 QueueStack?有更好的方法吗?

public void read(String multicastIpAddress, int multicastPortNumber) {
        PacketParser parser = new PacketParser(logger);
        InetAddress multicastAddress = null;
        MulticastSocket multicastSocket = null;
        final int PortNumber = multicastPortNumber;
        try {
            multicastAddress = InetAddress.getByName(multicastIpAddress);
            multicastSocket = new MulticastSocket(PortNumber);
            String hostname = InetAddress.getLocalHost().getHostName();
            byte[] buffer = new byte[8192];
            multicastSocket.joinGroup(multicastAddress);
            System.out.println("Listening from " + hostname + " at " + multicastAddress.getHostName());
            int numberOfPackets = 0;
            while (true) {
                numberOfPackets++;
                DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
                multicastSocket.receive(datagramPacket);
                // add to queue for another function to process the packets

            }
        } catch (SocketException socketException) {
            System.out.println("Socket exception " + socketException);
        } catch  (IOException exception) {
            System.out.println("Exception " + exception);
        } finally {
            if (multicastSocket != null) {
                try {
                    multicastSocket.leaveGroup(multicastAddress);
                    multicastSocket.close();
                } catch (IOException exception) {
                    System.out.println(exception.toString());
                }
            }
        }
    }

最佳答案

好的,所以我阅读了一些有关生产者-消费者模式的资料并弄明白了,所以这就是我所做的。

基本上,生产者-消费者模式涉及三件事:生产者、消费者和共享队列。在此上下文中,PacketReader 是接收网络数据包并将其放入共享队列的生产者。 PacketParser 是处理共享队列中数据包的消费者。因此,我创建了一个 LinkedBlockingQueue 实例,并将该共享队列传递给一个消费者实例 (PacketReader) 和一个生产者实例 (PacketParser)。然后将消费者和生产者实例分别传递到 Thread 类的实例中。最后,在每个线程实例上调用 start() 方法。

public class Main {

    public static void main(String[] args) {
        BlockingQueue<Packet> queue = new LinkedBlockingQueue<>();
        ILogger logger = Injector.getLogger();

        Thread reader = new Thread(new PacketReader(logger, queue, "239.1.1.1", 49410));
        Thread parser = new Thread(new PacketParser(logger, queue));

        reader.start();
        parser.start();
    }
}

使用 LinkedBlockingQueue 的原因是 put() 方法会在队列满时阻塞,而 take() 会在队列满时阻塞如果为空则排队。生产者和消费者类需要实现 Runnable 接口(interface)并包含名为 run() 的不带参数的方法。

消费类

public class PacketParser implements Runnable {

    private ILogger logger;
    private BlockingQueue<Packet> queue;
    private boolean running = true;

    public PacketParser(ILogger logger, BlockingQueue<Packet> queue) {
        this.logger = logger;
        this.queue = queue;
    }

    public void stop() {
        running = false;
    }

    public void run() {
        while (running) {
            Packet packet;
            try {
                packet = queue.take();
                parse(packet);
            } catch (InterruptedException exception) {
                logger.Log(exception.getStackTrace().toString());
            }
        }
    }

生产者类

public class PacketReader implements Runnable {

    private ILogger logger;
    private final Queue<Packet> queue;
    private String multicastIpAddress;
    private int multicastPortNumber;
    private boolean running = true;

    public PacketReader(ILogger logger, Queue<Packet> queue, String multicastIpAddress, int multicastPortNumber) {
        this.logger = logger;
        this.queue = queue;
        this.multicastIpAddress = multicastIpAddress;
        this.multicastPortNumber = multicastPortNumber;
    }

    public void stop() {
        running = false;
    }

    public void run() {
        InetAddress multicastAddress = null;
        MulticastSocket multicastSocket = null;
        try {
            multicastAddress = InetAddress.getByName(multicastIpAddress);
            multicastSocket = new MulticastSocket(multicastPortNumber);
            String hostname = InetAddress.getLocalHost().getHostName();
            byte[] buffer = new byte[8192];
            multicastSocket.joinGroup(multicastAddress);
            System.out.println("Listening from " + hostname + " at " + multicastAddress.getHostName());
            int numberOfPackets = 0;

            while (running) {
                numberOfPackets++;
                DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
                multicastSocket.receive(datagramPacket);
                Packet packet = new Packet(numberOfPackets, datagramPacket);
                queue.add(packet);
            }
        } catch (SocketException socketException) {
            System.out.println("Socket exception " + socketException);
        } catch  (IOException exception) {
            System.out.println("Exception " + exception);
        } finally {
            if (multicastSocket != null) {
                try {
                    multicastSocket.leaveGroup(multicastAddress);
                    multicastSocket.close();
                } catch (IOException exception) {
                    System.out.println(exception.toString());
                }
            }
        }
    }
}

关于java - 在 while 循环中添加项目时处理队列中的项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34795139/

有关java - 在 while 循环中添加项目时处理队列中的项目的更多相关文章

  1. ruby - 我需要将 Bundler 本身添加到 Gemfile 中吗? - 2

    当我使用Bundler时,是否需要在我的Gemfile中将其列为依赖项?毕竟,我的代码中有些地方需要它。例如,当我进行Bundler设置时:require"bundler/setup" 最佳答案 没有。您可以尝试,但首先您必须用鞋带将自己抬离地面。 关于ruby-我需要将Bundler本身添加到Gemfile中吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/4758609/

  2. ruby - 如何在 buildr 项目中使用 Ruby 代码? - 2

    如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby​​

  3. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  4. ruby - 将 Bootstrap Less 添加到 Sinatra - 2

    我有一个ModularSinatra应用程序,我正在尝试将Bootstrap添加到应用程序中。get'/bootstrap/application.css'doless:"bootstrap/bootstrap"end我在views/bootstrap中有所有less文件,包括bootstrap.less。我收到这个错误:Less::ParseErrorat/bootstrap/application.css'reset.less'wasn'tfound.Bootstrap.less的第一行是://CSSReset@import"reset.less";我尝试了所有不同的路径格式,但它

  5. ruby - 续集在添加关联时访问many_to_many连接表 - 2

    我正在使用Sequel构建一个愿望list系统。我有一个wishlists和itemstable和一个items_wishlists连接表(该名称是续集选择的名称)。items_wishlists表还有一个用于facebookid的额外列(因此我可以存储opengraph操作),这是一个NOTNULL列。我还有Wishlist和Item具有续集many_to_many关联的模型已建立。Wishlist类也有:selectmany_to_many关联的选项设置为select:[:items.*,:items_wishlists__facebook_action_id].有没有一种方法可以

  6. ruby-on-rails - 项目升级后 Pow 不会更改 ruby​​ 版本 - 2

    我在我的Rails项目中使用Pow和powifygem。现在我尝试升级我的ruby​​版本(从1.9.3到2.0.0,我使用RVM)当我切换ruby​​版本、安装所有gem依赖项时,我通过运行railss并访问localhost:3000确保该应用程序正常运行以前,我通过使用pow访问http://my_app.dev来浏览我的应用程序。升级后,由于错误Bundler::RubyVersionMismatch:YourRubyversionis1.9.3,butyourGemfilespecified2.0.0,此url不起作用我尝试过的:重新创建pow应用程序重启pow服务器更新战俘

  7. ruby-on-rails - 新 Rails 项目 : 'bundle install' can't install rails in gemfile - 2

    我已经像这样安装了一个新的Rails项目:$railsnewsite它执行并到达:bundleinstall但是当它似乎尝试安装依赖项时我得到了这个错误Gem::Ext::BuildError:ERROR:Failedtobuildgemnativeextension./System/Library/Frameworks/Ruby.framework/Versions/2.0/usr/bin/rubyextconf.rbcheckingforlibkern/OSAtomic.h...yescreatingMakefilemake"DESTDIR="cleanmake"DESTDIR="

  8. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  9. ruby - 检查字符串是否包含散列中的任何键并返回它包含的键的值 - 2

    我有一个包含多个键的散列和一个字符串,该字符串不包含散列中的任何键或包含一个键。h={"k1"=>"v1","k2"=>"v2","k3"=>"v3"}s="thisisanexamplestringthatmightoccurwithakeysomewhereinthestringk1(withspecialcharacterslike(^&*$#@!^&&*))"检查s是否包含h中的任何键的最佳方法是什么,如果包含,则返回它包含的键的值?例如,对于上面的h和s的例子,输出应该是v1。编辑:只有字符串是用户定义的。哈希将始终相同。 最佳答案

  10. Ruby 从大范围中获取第 n 个项目 - 2

    假设我有这个范围:("aaaaa".."zzzzz")如何在不事先/每次生成整个项目的情况下从范围中获取第N个项目? 最佳答案 一种快速简便的方法:("aaaaa".."zzzzz").first(42).last#==>"aaabp"如果出于某种原因你不得不一遍又一遍地这样做,或者如果你需要避免为前N个元素构建中间数组,你可以这样写:moduleEnumerabledefskip(n)returnto_enum:skip,nunlessblock_given?each_with_indexdo|item,index|yieldit

随机推荐