草庐IT

Streaming

全部标签

hadoop - 无法全局访问 Kafka Spark Streaming 中的数据

我正在尝试将数据从Kafka流式传输到SparkJavaPairInputDStreamdirectKafkaStream=KafkaUtils.createDirectStream(ssc,String.class,String.class,StringDecoder.class,StringDecoder.class,kafkaParams,topics);我在这里迭代JavaPairInputDStream来处理RDD。directKafkaStream.foreachRDD(rdd->{rdd.foreachPartition(items->{while(items.hasNe

hadoop - Hadoop 可以做流式处理吗?

有人建议Hadoop做流式处理,并引用了Flume和Kafka作为例子。虽然我知道它们可能具有流功能,但我想知道它们是否可以被视为与Storm/Spark/Flink等流处理技术处于同一级别。Kafka是一个“发布-订阅模型消息系统”,而Flume是一个数据摄取工具。即使它们与hadoop交互/集成,它们在技术上是“hadoop”本身的一部分吗?PS:我知道有一个HadoopStreaming这是完全不同的事情。 最佳答案 Hadoop只是YARN、HDFS和MapReduce。作为一个项目,它不适应(近)实时摄取或处理。Hadoo

java - 中间键值对流是否在 hadoop 中优化

mapreduce作业中的中间键值对在被洗牌到将运行reduce任务的tasktracker节点之前被写入mapred.local.dir。我知道HFDS已优化以写入大数据block,因此与常规文件系统相比,可最大限度地减少硬盘的寻道时间。现在我很好奇hadoop是否也针对将中间kev-value对流式传输到本地文件系统进行了优化?我问这个是因为我的应用程序只有很少的输入数据,但是有大量的中间数据和中等大小的输出数据。hadoop在我的案例中是有益的还是我应该考虑一个不同的框架?(请注意,我的软件与WordCount密切相关,但我发出所有子字符串而不是所有单词)非常感谢您的帮助!EDI

python - Hadoop Streaming "comparator.options"未被尊重

我有一个python映射器和缩减器,我正在使用它和Hadoop流式API。在命令行上,这些脚本可以正常工作并执行预期的工作。我有一个NASA网络访问日志示例,您可以在此处看到它已正确处理和排序。tail-n10NASA_access_log_Jul95|./mapper.py|sort|./reducer.py|sort-r-k1,14163.205.53.141tornado.umd.edu在mapreduce作业中尝试相同的操作时,排序没有得到遵守。hadoopjar/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-st

scala - 使用 Spark Streaming 从 http 创建分析

您好,我的要求是从http://10.3.9.34:9900/messages创建分析,即从http://10.3.9.34:9900/messages并将此数据放在HDFS位置/user/cloudera/flume并使用Tableau或HUEUI从HDFS创建分析报告。我在CDH5.5的spark-shell的scala控制台中尝试使用以下代码,但无法从http链接获取数据importorg.apache.spark.SparkContextvaldataRDD=sc.textFile("http://10.3.9.34:9900/messages")dataRDD.collect

join - Map-side 加入 Hadoop Streaming

我有一个文件,其中每一行都是一条记录。我希望某个字段中具有相同值的所有记录(如果字段A则调用)转到同一个映射器。我听说这被称为Map-SideJoin,而且我还听说如果文件中的记录按我所说的字段A排序很容易。如果更简单的话,数据可以分布在多个文件中,但每个文件都按字段A排序。这样对吗?我如何在流媒体中做到这一点?我正在使用Python。假设它只是我用来启动Hadoop的命令的一部分? 最佳答案 只希望将某些记录发送给某些映射器的真正理由是什么?如果您想要的最终结果是3个输出文件(一个全是A,另一个全是B,最后一个全是C),您可以使用

hadoop - 如果我使用 -mapper cat 而不是 -mapper org.apache.hadoop.mapred.lib.IdentityMapper,Hadoop Streaming 的性能会降低吗?

我在尝试使用org.apache.hadoop.mapred.lib.IdentityMapper作为HadoopStreaming1.0.3中-mapper的参数时遇到了问题。“猫”虽然有效;使用cat会影响性能——尤其是在ElasticMapReduce上吗? 最佳答案 我遇到了类似的问题,其中身份映射器不起作用,我必须使用Cat。我们没有看到性能上的巨大变化,据我所知,identitymapper是一个jar,而cat是unix命令。 关于hadoop-如果我使用-mapperca

hadoop - 动态加载文件时的 Spark Streaming 和 Data Locality

我正在运行一个spark流应用程序,它从Kafka接收HDFS上的文件路径,应该打开这些文件并对它们执行某种计算。问题是我无法享受数据局部性的好处,因为执行程序可能在任何节点上运行,而打开文件的执行程序不一定是持有文件的执行程序。有没有一种方法可以按照我介绍的方式动态打开文件,同时保持数据局部性?谢谢,丹尼尔 最佳答案 我不确定你打开文件的意思,如果你能分享一些代码会很有帮助,但如果你使用的是sc.textFile,那是一个RDD转换。转换被集群管理器安排为任务,因此不一定会从运行DStream转换的执行器节点执行。

hadoop - 使用 flume 读取 IBM MQ 数据

我想从IBMMQ读取数据并将其放入HDF。查看了flume的JMS源,似乎它可以连接到IBMMQ,但我不明白所需属性列表中的“destinationType”和“destinationName”是什么意思。有人可以解释一下吗?此外,我应该如何配置我的水槽代理flumeAgent1(跑在和MQ一样的机器上)读取MQ数据----flumeAgent2(跑在Hadoop集群上)写入Hdfs或者在Hadoop集群上只有一个代理就足够了谁能帮助我理解如何将MQ与flume集成引用https://flume.apache.org/FlumeUserGuide.html谢谢,查雅

hadoop - Flume-NG:如何使用 Flume-ng 自动读取目录中新添加的文件(Flume 代理的来源)

spooldir选项用于流式传输特定目录的所有文件。完成整个目录读取后,作业将暂停/停止。但是,如果我想将新文件添加到同一目录中,会发生什么??我的要求是在任何新文件添加到该特定spooldir文件夹时流式传输该目录。请指教!!!提前致谢。 最佳答案 假脱机目录源将在文件出现在目录中时继续读取文件,它不会在处理flume启动时存在于目录中的文件集后暂停。这就是文档所说的“此源允许您通过将要摄取的文件放入磁盘上的“假脱机”目录来摄取数据。此来源将监视指定目录中的新文件,并在新文件出现时解析事件。”