草庐IT

hadoop - 如何从 pyspark rdd 或分区确定原始 s3 输入文件名

我正在使用pyspark流式处理来自S3的ETL输入文件。我需要能够建立所有原始输入文件的审计线索在s3://上,我的Parquet输出在hdfs://上结束。给定一个dstream、rdd,甚至是一个特定的rdd分区,是否有可能确定s3中输入数据的原始文件名?目前我知道的唯一方法是采取rdd.toDebugString()并尝试解析它。然而,这感觉真的很hacky并且不在某些情况下工作。例如,解析调试输出对我的批处理模式导入不起作用我也在做(使用sc.TextFile("s3://...foo/*")样式的glob)。有没有人有确定原始文件名的明智方法?似乎其他一些spark用户过去

python - 安排 pyspark 笔记本

我有一个ipython笔记本,其中包含集群上的一些PySpark代码。目前我们正在使用oozie通过HUE在Hadoop上运行这些笔记本。该设置感觉不太理想,我们想知道是否有其他选择。我们先将.ipynb文件转换成.py文件,并移动到hdfs中。除了这个文件,我们还创建了一个调用python文件的.sh文件。内容类似于:#!/bin/shset-e[-r/usr/local/virtualenv/pyspark/bin/activate]&&source/usr/local/virtualenv/pyspark/bin/activatespark-submit--masteryarn-

python - 提交 PySpark 应用以在集群模式下在 YARN 上产生 Spark

我正在尝试测试为我工作的团队构建的大数据平台。它在YARN上运行spark。是否可以创建PySpark应用程序并在YARN集群上提交它们?我能够成功提交示例SparkPijar文件,它在YARNstdout日志中返回输出。这是我要测试的PySpark代码;frompysparkimportSparkConffrompysparkimportSparkContextHDFS_MASTER='hadoop-master'conf=SparkConf()conf.setMaster('yarn')conf.setAppName('spark-test')sc=SparkContext(con

amazon-web-services - Jupyter notebook、pyspark、hadoop-aws 问题

我正在尝试结合使用Jupyter、PySpark和S3文件(通过s3a协议(protocol))。我需要org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider,因为我们需要使用s3sessiontoken。这是添加到hadoop-aws2.8.3+。我正在尝试以下代码:importosfrompyspark.sqlimportSparkSessionos.environ['PYSPARK_SUBMIT_ARGS']='--packagesorg.apache.hadoop:hadoop-aws:3.0.0pyspark-shel

docker - Hadoop Spark docker swarm 其中 pyspark 给出 BlockMissingException 但文件很好

基于https://github.com/gotthardsen/docker-hadoop-spark-workbench/tree/master/swarm我有一个带有hadoop、spark、hue和jupyternotebook设置的dockerswarm设置。我使用Hue将文件上传到hdfs,从hue或名称节点上的hdfs下载或查看文件没有问题。没有丢失block,文件检查表明一切正常。但是当我尝试在jupyter中使用pyspark访问它时,我得到:org.apache.hadoop.hdfs.BlockMissingException:Couldnotobtainbloc

apache-spark - PySpark:使用具有 1000 个字段但具有可变列数的行的模式创建 RDD->DF->Parquet

我正在尝试读取一个ElasticSearch索引,它有数百万个文档,每个文档都有可变数量的字段。我有一个模式,其中有1000个字段,每个字段都有自己的名称和类型。现在,当我通过ES-Hadoop连接器创建一个RDD并稍后通过指定模式转换为一个DataFrame时,它没有说-Inputrowdoesn'thaveexpectednumberofvaluesrequiredbytheschema我有几个问题。1.是否有可能有一个RDD/DF的行包含可变数量的字段?如果不是,除了为每列中缺失的字段添加空值外,还有什么替代方法?我看到默认情况下Spark将所有内容转换为StringType,因

python - PySpark(Python 2.7): How to flatten values after reduce

我正在使用带有自定义分隔符的SparkContext.newAPIHadoopFile读取多行记录文件。反正我已经准备好了,减少了我的数据。但是现在我想再次将key添加到每一行(条目),然后将其写入ApacheParquet文件,然后将其存储到HDFS中。这个图应该可以解释我的问题。我正在寻找的是红色箭头,例如写入文件前的最后一次转换。任何的想法?我尝试了flatMap,但时间戳和浮点值导致了不同的记录。Python脚本可以是downloadedhere和样本textfilehere.我在JupyterNotebook中使用Python代码。 最佳答案

python - PySpark (Python) : loading multiline records via SparkContext. newAPIHadoopFile

我正在加载一个文本文件,该文件采用TSV(表格分隔值)表示法,但每行中都没有键。因此,一行表示一个特定变量,随后的所有行都是该变量的值,直到出现新变量。因此我使用自定义分隔符加载文件(在JupyterNotebookPython2.7-Pyspark中):sheet=sc.newAPIHadoopFile('sample.txt','org.apache.hadoop.mapreduce.lib.input.TextInputFormat','org.apache.hadoop.io.LongWritable','org.apache.hadoop.io.Text',conf={'te

python - RDD 只有第一列值 : Hbase, PySpark

我们正在使用以下命令使用Pyspark读取Hbase表。frompyspark.sql.typesimport*host=port=keyConv="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"valueConv="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"cmdata_conf={"hbase.zookeeper.property.clientPort":port

hadoop - 如何将 PySpark worker 中的 numpy 数组保存到 HDFS 或共享文件系统?

我想在PySpark中高效地将numpy数组从工作机器(函数)保存到HDFS或从工作机器(函数)读取numpy数组。我有两台机器A和B。A有master和worker。B有一名worker。例如我想实现如下目标:if__name__=="__main__":conf=SparkConf().setMaster("local").setAppName("Test")sc=SparkContext(conf=conf)sc.parallelize([0,1,2,3],2).foreachPartition(func)deffunc(iterator):P=>forxiniterator:P