我需要编写一个Storm喷口来从端口读取数据。想知道这在逻辑上是否可行。考虑到这一点,我设计了一个简单的拓扑结构,它具有一个spout和一个bolt。spout将收集使用wget发送的HTTP请求,而bolt将显示请求-仅此而已。我的spout结构如下:publicclassProxySpoutextendsBaseRichSpout{//TheO/PcollectorSpoutOutputCollectorsc;//ThesocketSocketclientSocket;//TheserversocketServerSocketsc;publicProxySpout(intport)
我有一个目录,另一个进程将文件放入其中。我们当前的Storm实现读取此目录并选择最旧的文件并打开文件读取器。该读取器作为spout中的一个字段保存,因此当调用nextTuple()时,将从文件中输出一行。spout完成读取后,它会关闭读取器并打开一个新读取器来读取新文件。为了提高吞吐量,一个想法是让多个spouts一次读取多个文件,因为这些spouts将争夺同一目录中的相同文件,有没有办法在spouts之间进行通信,以便它们可以协商哪些文件阅读?(或者有一个总经理将文件分配给喷口)。目录和文件从HDFS存储和读取。 最佳答案 我认为
我已经开始使用storm,所以我使用thistutorial创建了简单的拓扑当我使用LocalCluster运行我的拓扑时,一切看起来都很好,我的问题是我没有在元组上收到ACK,这意味着我的spoutack从未被调用。我的代码在下面-你知道为什么ack没有被调用吗?所以我的拓扑结构是这样的publicStormTopologybuild(){TopologyBuilderbuilder=newTopologyBuilder();builder.setSpout(HelloWorldSpout.class.getSimpleName(),helloWorldSpout,spoutPara
我是Storm的初学者。我正在尝试执行下面的示例程序HowtocreateatopologyinstormSampleSpout.javaimportjava.util.ArrayList;importjava.util.List;importjava.util.Map;importbacktype.storm.spout.SpoutOutputCollector;importbacktype.storm.task.TopologyContext;importbacktype.storm.topology.IRichSpout;importbacktype.storm.topology
来自here:Storm从一开始就被设计为兼容多种语言。Nimbus是一种Thrift服务,拓扑被定义为Thrift结构。Thrift的使用允许从任何语言使用Storm。我看到通过将拓扑(spouts、bolts、ComponentCommon)序列化为Thrift数据类型来部署在java中创建的拓扑,然后部署到Nimbus上。在Java中,很容易将对象及其方法和数据序列化。所以另一方面,Nimbus只需要创建对象并调用它们。(我可能在这里遗漏了细节,但我希望我理解正确)但我想知道如何用C++编写拓扑并以相同的方式部署它。thrift是否有助于序列化基于c++的拓扑,而Nimbus是否
我的拓扑结构中有一个spout和一个bolt。此拓扑在本地模式下运行良好。但是当将拓扑jar提交到远程集群时,spout的open方法没有被调用。在open()方法中,我正在创建一个哈希键来将数据存储在redis数据库中,但我无法在数据库中找到更新的数据。示例代码是:publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){//TODOAuto-generatedmethodstubSystem.out.println("iaminopen");Jedisjs=newJedis("127.0
我正在尝试在多个spout之间分担任务。我有一种情况,我一次从外部源获取一个元组/消息,并且我想要一个spout的多个实例,其背后的主要目的是分担负载并提高性能效率。我可以对一个Spout本身执行相同的操作,但我想在多个Spout之间分担负载。我无法获得分散负载的逻辑。由于在特定的spout完成消费该部分之前(即基于缓冲区大小集),消息的偏移量是未知的。任何人都可以对如何解决逻辑/算法提出一些亮点吗?预先感谢您的宝贵时间。更新响应答案:现在在Kafka上使用多分区(即5)以下是使用的代码:builder.setSpout("spout",newKafkaSpout(cfg),5);通过
我有一个创建许多Spout和Bolt的Storm拓扑。它们显然会分布在具有自己的JVM的各种系统/节点上。我知道Storm会自动管理网络通信,这样Spout发出的元组就会到达不同JVM上的Bolt。我不明白的是如何维护一些可以跟踪事物的变量。我想要一个变量来计算Bolt-A的所有实例已处理的元组数。另一个用于计算Bolt-B等的变量。我还需要一个用作标志的变量,以便我知道何时Spout没有更多数据可发送,以便Bolt可以开始写入SQL。我考虑过使用Redis,但想知道这是最好的方法还是有其他方法?任何地方都有可用的代码示例吗?我用Google进行了搜索,但找不到太多有用的信息。
我有一个创建许多Spout和Bolt的Storm拓扑。它们显然会分布在具有自己的JVM的各种系统/节点上。我知道Storm会自动管理网络通信,这样Spout发出的元组就会到达不同JVM上的Bolt。我不明白的是如何维护一些可以跟踪事物的变量。我想要一个变量来计算Bolt-A的所有实例已处理的元组数。另一个用于计算Bolt-B等的变量。我还需要一个用作标志的变量,以便我知道何时Spout没有更多数据可发送,以便Bolt可以开始写入SQL。我考虑过使用Redis,但想知道这是最好的方法还是有其他方法?任何地方都有可用的代码示例吗?我用Google进行了搜索,但找不到太多有用的信息。
我想知道是否有任何spout实现可以将数据从HDFS流式传输到Storm(类似于来自HDFS的SparkStreaming)。我知道有bolt实现将数据写入HDFS(https://github.com/ptgoetz/storm-hdfs和http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.3/bk_user-guide/content/ch_storm-using-hdfs-connector.html),但我找不到其他方法。我感谢任何建议和提示。 最佳答案 一个选项是使用