草庐IT

hadoop - 使用 Hive on spark 将数据从 gzip 格式转换为 sequenceFile 格式

I'mtryingtoreadalargegzipfileintohivethroughsparkruntimetoconvertintoSequenceFileformat而且,我想高效地做到这一点。据我所知,Spark只支持每个gzip文件一个映射器,就像它对文本文件一样。有没有办法改变正在读取的gzip文件的映射器数量?还是我应该选择另一种格式,如Parquet?我现在卡住了。问题是我的日志文件是类似json的数据,保存为txt格式,然后进行gzip压缩,所以为了阅读,我使用了org.apache.spark.sql.json。我看到的示例显示-将数据转换为SequenceFil

hadoop - 无法启动 apache.spark.master

每当我在本地机器上运行start-master.sh命令时,我都会收到以下错误,请有人帮我解决这个问题终端错误终端报错startingorg.apache.spark.deploy.master.Master,loggingto/usr/local/spark-2.0.1-bin-hadoop2.6/logs/spark-andani-org.apache.spark.deploy.master.Master-1-andani.sakha.com.outfailedtolaunchorg.apache.spark.deploy.master.Master:atio.netty.util

python - 使用 pyspark 从 Hadoop 中删除文件(查询)

我正在使用Hadoop来存储我的数据-对于某些数据,我正在使用分区,对于某些数据,我没有。我使用pysparkDataFrame类以parquet格式保存数据,如下所示:df=sql_context.read.parquet('/some_path')df.write.mode("append").parquet(parquet_path)我想用pyspark编写一个删除旧数据的脚本,使用类似的方式(我需要在数据框上过滤查询这个旧数据)。我没有在pyspark文档中找到任何内容...有什么办法可以实现吗? 最佳答案 Pyspark主

hadoop - Spark(2.3) 无法识别通过 Hive Alter Table 命令添加的 Parquet 表中的新列

我有一个使用Spark2.3APIdf.saveAstable创建的HiveParquet表。有一个单独的Hive进程可以更改同一个Parquet表以添加列(根据要求)。但是,下次当我尝试将同一个parquet表读入Spark数据帧时,使用HiveAlterTable命令添加到parquet表的新列不会显示在df.printSchema输出中。根据初步分析,似乎可能存在一些冲突,Spark使用自己的模式而不是读取Hive元存储。因此,我尝试了以下选项:更改Spark设置:spark.sql.hive.convertMetastoreParquet=false并刷新spark目录:spa

python - 无法将收集的 RDD 保存到驱动程序的本地文件系统

我试图在调用collect()之后保存一个RDD。我在Host-1上调用spark-submit(我假设Driver是我从中调用spark-submit脚本的主机,所以在这种情况下Host-1是驱动程序),从HBase获取一些数据,在其上运行一些操作然后在RDD上调用collect()并迭代收集的列表并将其保存到本地文件系统文件。本质上:if__name__=="__main__":sc=SparkContext(appName="HBaseInputFormat")#readthedatafromhbase#...#...output=new_rdd.collect()withope

python - 尝试启动 PySpark 时出现空指针异常

我正在使用以下命令启动pyspark./bin/pyspark--masteryarn--deploy-modeclient--executor-memory5g我得到以下错误15/10/1417:19:15INFOspark.SparkContext:SparkContextalreadystopped.Traceback(mostrecentcalllast):File"/opt/spark-1.5.1/python/pyspark/shell.py",line43,insc=SparkContext(pyFiles=add_files)File"/opt/spark-1.5.1/

hadoop - 在整个集群中使用 spark-submit 运行 Spark 作业

我最近在AmazonEMR上设置了一个Spark集群,其中有1个主节点和2个从节点。我可以运行pyspark,并使用spark-submit提交作业。但是,当我创建一个独立作业时,例如job.py,我创建了一个SparkContext,如下所示:sc=SparkContext("local","AppName")这看起来不对,但我不确定该放什么。当我提交作业时,我确定它没有使用整个集群。如果我想在我的整个集群上运行一个作业,比如每个从属4个进程,我必须做什么a.)作为参数传递给spark-submitb.)在脚本本身中作为参数传递给SparkContext()。

hadoop - S3NativeFileSystem 调用是否会在 AWS EMR 4.6.0 上杀死我的 Pyspark 应用程序

当我的Spark应用程序必须从S3访问大量CSV文件(每个~1000@63MB)并将它们通过管道传输到SparkRDD时,它失败了。拆分CSV的实际过程似乎可行,但对S3NativeFileSystem的额外函数调用似乎导致错误和作业崩溃。首先,以下是我的PySpark应用程序:frompysparkimportSparkContextsc=SparkContext("local","SimpleApp")frompyspark.sqlimportSQLContextsqlContext=SQLContext(sc)importtimestartTime=float(time.time

python - 验证字段值的有效方法 Spark

在将数据保存到hdfs之前,我需要验证数据框中的某些列。我想知道在pyspark1.5.2/python2.7中是否有一种优雅有效的方法来做到这一点例如,假设我有以下数据+-----+---+|a|b|+-----+---+|"foo"|123|+-----+---+我想确保列a的每个值不超过3个字符和列b是.我目前的想法是编写一个执行简单if/else的udf,并返回某个值,然后根据这些结果决定是否使作业失败。但是,对于大量数据,我担心它会很慢或者至少是非常繁重的处理。是否已经有一种完善的方法可以在spark中执行此操作?或者是否有任何流行的策略来做到这一点?我自己找不到关于这个主题

hadoop - Spark集群按顺序向文件中的每一行添加数字

我有一个文件,每行都包含名称,我想按顺序向每一行添加数字。例如,如果一个文件是这样的abcd我要它实现这个a,1b,2c,3d,4我写了这段代码来实现这个vallines=sc.textFile("data.txt")valpair=lines.zipWithIndex().map{case(i,line)=>i.toString+","+line}pair.collect()但是如您所知,Spark将其任务分布在不同的集群中。所以我不确定这是否有效。所以任何人都可以告诉我如何实现这一目标吗?提前致谢。 最佳答案 如果您将运行此代码