草庐IT

apache-spark-2.3

全部标签

hadoop - 使用Spark的有状态操作updateStateByKey如何保持实时性

首先是虚构的用例。假设我有一个元组流(user_id,time_stamp,login_ip)。我想以5秒的粒度维护每个用户的最后登录IP。使用Spark流,我可以使用updateStateByKey方法更新这张map。问题是,随着数据流不断涌来,每个时间间隔的RDD越来越大,因为看到了更多的user_ids。一段时间后,map会变得很大,维护时间会变长,无法实现结果的实时传递。请注意,这只是我想出的一个简单示例来说明问题。实际问题可能更复杂,确实需要实时交付。关于如何解决这个问题的任何想法(在Spark以及其他解决方案中都会很好)? 最佳答案

hadoop - Apache Giraph 中具有复杂值的顶点

我正在尝试将一些包含相关顶点信息的文本文件读入Giraph:每一行都是vertex_idattribute_1attribute_2.....attribute_n其中每个属性都是一个字符串。目标是创建一个顶点,其中所有这些属性都是顶点值的一部分。查找各种输入格式我找不到任何现成的东西,所以我假设我必须从VertexValueInputFormat派生我的顶点输入类(我有一个单独的边阅读器)。问题是:如何?我已经创建了一个包含String[]数组的Value类,但我如何将它交给Giraph/Hadoop?这是单行阅读器:https://giraph.apache.org/giraph-

hadoop - Spark Streaming - HBase 批量加载

我目前正在使用Python将CSV数据批量加载到HBase表中,目前我在使用saveAsNewAPIHadoopFile编写适当的HFile时遇到了问题我的代码目前如下所示:defcsv_to_key_value(row):cols=row.split(",")result=((cols[0],[cols[0],"f1","c1",cols[1]]),(cols[0],[cols[0],"f2","c2",cols[2]]),(cols[0],[cols[0],"f3","c3",cols[3]]))returnresultdefbulk_load(rdd):conf={#Ommitt

hadoop - 无法查询 Spark 创建的 Parquet 文件

在Spark中创建了一个parquet文件。这是代码片段parquet_file_name=os.path.join(partition,os.path.basename(fileLocation)+".parquet")dfData=sqlContext.createDataFrame(addedColumns,schema)dfData.save(parquet_file_name,"parquet","append")我可以在Spark中读取文件内容。In[1]:sqlContext=SQLContext(sc)parquetFile=sqlContext.parquetFile

datetime - 从时间戳到现在的 Apache Pig

我正在尝试使用amabari中的apachepig分析一些数据在这种特定情况下,我遇到了问题...我有一列时间戳以这种特定格式编写14333061460001422814565122使用在线转换器,如果我输入1433306146000,输出是08/30/47389@12:06pm(UTC)但事实并非如此,事实上,如果我手动删除最后3个数字(1433306146),结果是06/03/2015@凌晨4:35(UTC)。那么,pig中有一种方法可以删除时间戳列中的最后3个数字吗?还有一个主要问题是:使用函数GetDay(datetime)如果我传递时间戳,它会起作用吗?或者有不同的方法从时间

java - 将 Apache Nutch 2.3 与 Hbase 0.94.14 和 Solr 5.2.1 集成时出错

我正在将Nutch与Hbase和Solr集成。启动Hadoop和Hbase服务后,我在NutchHome中运行如下命令sudo-Ebin/crawlurls/seed.txtTestCrawlhttp://localhost:8983/solr/2我遇到了这些错误:InjectingseedURLs/usr/local/apache-nutch-2.3.1/runtime/local/bin/nutchinjecturls/seed.txt-crawlIdTestCrawlInjectorJob:startingat2016-05-2615:41:14InjectorJob:Injec

hadoop - Apache Flume spoolDirectory 配置失败

我正在使用以下代码将源目录中的文件写入hdfs。#Initializeagent'ssource,channelandsinkagent.sources=testagent.channels=memoryChannelagent.sinks=flumeHDFS#Settingthesourcetospooldirectorywherethefileexistsagent.sources.test.type=spooldiragent.sources.test.spoolDir=/Data#Settingthechanneltomemoryagent.channels.memoryCha

scala - 使用 Spark Streaming 读取 fileStream

我在HDFS上有一个目录,其中每10分钟复制一个文件(现有文件被覆盖)。我想使用Spark流(1.6.0)读取文件的内容,并将其用作引用数据以将其加入其他流。我将“记住窗口”spark.streaming.fileStream.minRememberDuration设置为“600s”并设置newFilesOnly到false,因为当我启动应用程序时,我不想从已经存在的HDFS中获取初始数据。valssc=newStreamingContext(sparkConf,Seconds(2))defdefaultFilter(path:Path):Boolean=!path.getName()

scala - Spark - 按输出 (RDD) 从组中删除 CompactBuffer

问题陈述RDD分组后需要格式化Spark输出(移除CompactBuffer)输入Header1^Header2A^4BA^11AB^7AC^6DFC^7DS期望的输出(A,(4B,11A))(B,(7A))(C,(6DF,7DS))我尝试了什么valrecords=sc.textFIle("/user/chronicles/test.txt").map(x=>{valy=x.split("\\^",-1)(y(0).trim(),y(1).trim())}).groupBy(x=>x._1)records.foreach(println)输出(A,CompactBuffer((4B,

scala - Spark Streaming textFileStream 复制

我正在尝试监视HDFS中的存储库以读取和处理复制到它的文件中的数据(将文件从本地系统复制到HDFS我使用hdfsdfs-put),有时它会产生问题:SparkStreaming:java.io.FileNotFoundException:Filedoesnotexist:.COPYING所以我阅读了论坛中的问题和此处的问题SparkStreaming:java.io.FileNotFoundException:Filedoesnotexist:._COPYING_根据我读到的内容,问题与Spark流式传输在文件完成复制到HDFS和Github之前读取文件有关:https://githu