我需要编写一个Storm喷口来从端口读取数据。想知道这在逻辑上是否可行。考虑到这一点,我设计了一个简单的拓扑结构,它具有一个spout和一个bolt。spout将收集使用wget发送的HTTP请求,而bolt将显示请求-仅此而已。我的spout结构如下:publicclassProxySpoutextendsBaseRichSpout{//TheO/PcollectorSpoutOutputCollectorsc;//ThesocketSocketclientSocket;//TheserversocketServerSocketsc;publicProxySpout(intport)
我有一个目录,另一个进程将文件放入其中。我们当前的Storm实现读取此目录并选择最旧的文件并打开文件读取器。该读取器作为spout中的一个字段保存,因此当调用nextTuple()时,将从文件中输出一行。spout完成读取后,它会关闭读取器并打开一个新读取器来读取新文件。为了提高吞吐量,一个想法是让多个spouts一次读取多个文件,因为这些spouts将争夺同一目录中的相同文件,有没有办法在spouts之间进行通信,以便它们可以协商哪些文件阅读?(或者有一个总经理将文件分配给喷口)。目录和文件从HDFS存储和读取。 最佳答案 我认为
我已经开始使用storm,所以我使用thistutorial创建了简单的拓扑当我使用LocalCluster运行我的拓扑时,一切看起来都很好,我的问题是我没有在元组上收到ACK,这意味着我的spoutack从未被调用。我的代码在下面-你知道为什么ack没有被调用吗?所以我的拓扑结构是这样的publicStormTopologybuild(){TopologyBuilderbuilder=newTopologyBuilder();builder.setSpout(HelloWorldSpout.class.getSimpleName(),helloWorldSpout,spoutPara
我是Storm的初学者。我正在尝试执行下面的示例程序HowtocreateatopologyinstormSampleSpout.javaimportjava.util.ArrayList;importjava.util.List;importjava.util.Map;importbacktype.storm.spout.SpoutOutputCollector;importbacktype.storm.task.TopologyContext;importbacktype.storm.topology.IRichSpout;importbacktype.storm.topology
来自here:Storm从一开始就被设计为兼容多种语言。Nimbus是一种Thrift服务,拓扑被定义为Thrift结构。Thrift的使用允许从任何语言使用Storm。我看到通过将拓扑(spouts、bolts、ComponentCommon)序列化为Thrift数据类型来部署在java中创建的拓扑,然后部署到Nimbus上。在Java中,很容易将对象及其方法和数据序列化。所以另一方面,Nimbus只需要创建对象并调用它们。(我可能在这里遗漏了细节,但我希望我理解正确)但我想知道如何用C++编写拓扑并以相同的方式部署它。thrift是否有助于序列化基于c++的拓扑,而Nimbus是否
我的拓扑结构中有一个spout和一个bolt。此拓扑在本地模式下运行良好。但是当将拓扑jar提交到远程集群时,spout的open方法没有被调用。在open()方法中,我正在创建一个哈希键来将数据存储在redis数据库中,但我无法在数据库中找到更新的数据。示例代码是:publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){//TODOAuto-generatedmethodstubSystem.out.println("iaminopen");Jedisjs=newJedis("127.0
我正在尝试在多个spout之间分担任务。我有一种情况,我一次从外部源获取一个元组/消息,并且我想要一个spout的多个实例,其背后的主要目的是分担负载并提高性能效率。我可以对一个Spout本身执行相同的操作,但我想在多个Spout之间分担负载。我无法获得分散负载的逻辑。由于在特定的spout完成消费该部分之前(即基于缓冲区大小集),消息的偏移量是未知的。任何人都可以对如何解决逻辑/算法提出一些亮点吗?预先感谢您的宝贵时间。更新响应答案:现在在Kafka上使用多分区(即5)以下是使用的代码:builder.setSpout("spout",newKafkaSpout(cfg),5);通过
我想知道是否有任何spout实现可以将数据从HDFS流式传输到Storm(类似于来自HDFS的SparkStreaming)。我知道有bolt实现将数据写入HDFS(https://github.com/ptgoetz/storm-hdfs和http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.3/bk_user-guide/content/ch_storm-using-hdfs-connector.html),但我找不到其他方法。我感谢任何建议和提示。 最佳答案 一个选项是使用
我已经设置了storm拓扑(1个工作人员),其中spout(在java中)使事件从redis中出列(使用blpop)并传输到bolt。但一个观察结果是,当队列超过200万并且在stormnimbus/supervisor/zookeeper/worker日志中没有发现警告/异常时,一些事件没有收到bolt(在clojure中,6-spout线程,50-bolt线程)。在本地,此场景不会使用虚拟数据进行复制。集群中没有网络延迟/数据包丢失。平均处理延迟为100毫秒。如何找到在生产中修复它的原因。(nsevent-processor(:import[backtype.stormStormS