草庐IT

PySpark3

全部标签

python - Pyspark:洗牌 RDD

我试图随机化RDD中元素的顺序。我目前的方法是使用由打乱后的整数组成的RDD压缩元素,然后通过这些整数进行连接。但是,pyspark仅使用100000000个整数就失败了。我正在使用下面的代码。我的问题是:是否有更好的方法来压缩随机索引或以其他方式随机播放?我试过按随机键排序,虽然有效,但速度很慢。defrandom_indices(n):"""returnaniterableofrandomindicesinrange(0,n)"""indices=range(n)random.shuffle(indices)returnindicespyspark中发生以下情况:UsingPyth

hadoop - 使用 pyspark,在 h​​adoop 文件系统上读/写 2D 图像

我希望能够在hdfs文件系统上读取/写入图像并利用hdfs局部性。我有一个图像集合,其中每个图像由uint16的二维数组存储为xml文件的基本附加信息。我想在hdfs文件系统上创建一个存档,并使用spark来分析存档。现在我正在努力寻找通过hdfs文件系统存储数据的最佳方式,以便能够充分利用spark+hdfs结构。据我所知,最好的方法是创建一个sequenceFile包装器。我有两个问题:创建sequenceFile包装器是最好的方法吗?有没有人提供我可以用来开始的例子?我一定不是第一个需要通过spark读取与hdfs上的文本文件不同的内容的人! 最佳答案

hadoop - pyspark : how to check if a file exists in hdfs

我想在通过SparkContext加载它们之前检查hdfs中是否存在多个文件。我使用pyspark。我试过os.system("hadoopfs-test-e%s"%path)但是由于我有很多路径要检查,所以作业崩溃了。我还尝试了sc.wholeTextFiles(parent_path)然后按键过滤。但它也崩溃了,因为parent_path包含很多子路径和文件。你可以帮帮我吗? 最佳答案 正确的说法TristanReid:...(Spark)Itcanreadmanyformats,anditsupportsHadoopglobe

hadoop - Pyspark:获取 HDFS 路径上的文件/目录列表

如题。我知道textFile但正如其名称所示,它仅适用于文本文件。我需要访问HDFS或本地路径上路径内的文件/目录。我正在使用pyspark。 最佳答案 使用JVM网关可能不是那么优雅,但在某些情况下,下面的代码可能会有所帮助:URI=sc._gateway.jvm.java.net.URIPath=sc._gateway.jvm.org.apache.hadoop.fs.PathFileSystem=sc._gateway.jvm.org.apache.hadoop.fs.FileSystemConfiguration=sc._g

apache-spark - Redis 和 Pyspark 配置

我们有一个带有sparkmaster和3个sparkworker的EC2测试虚拟机,需要做哪些配置才能让Redis与PySpark一起工作?谢谢。 最佳答案 1)制作Redis模块的zip文件2)使用PySpark的addPyFile如下sc.addPyFile("/path/to/redis.zip")引用:WritedatatoRedisfromPySpark 关于apache-spark-Redis和Pyspark配置,我们在StackOverflow上找到一个类似的问题:

python - 如何从 PySpark DStream 写入 Redis?

我正在使用PySpark2.3.1从Kafka读取值流作为DStream。我想对这些数据做一些转换,比如取移动平均,并将其保存到Redis。我的spark作业代码看起来有点像这样:batch_duration=1#Initializesessionspark_session=SparkSession\.builder\.appName("my-app")\.getOrCreate()spark_context=spark_session.sparkContext#Createstreamingcontext(=connectiontoSpark)streaming_context=St

python - 名称错误 : name 'redis' is not defined - PySpark - Redis

我在pyspark中使用addPyFile方法加载redis.zip文件。我可以使用加载文件sc.addPyFile("/home/path/to/redis.zip")但是在使用./pyspark运行代码时,它显示错误:NameError:name'redis'isnotdefinedzip(redis.zip)包含.py文件(client.py,connection.py、exceptions.py、lock.py、utils.py等)。Python版本是-3.5,spark是2.7 最佳答案 如果您将py文件打包成zip并使用

mongodb - pyspark-mongodb 集合读取命令不会执行

我安装了以下版本:-Spark2.1.0,斯卡拉2.11.6,mongoDB3.2.17我尝试使用以下命令启动pysparkshell./bin/pyspark--packagesorg.mongodb.spark:mongo-spark-connector_2.11:2.2.0在此之后我开始了sparksession如下frompyspark.sqlimportSparkSessionmy_spark=SparkSession.builder.appName("myApp").config("spark.mongodb.input.uri","mongodb://127.0.0.1/

python - 如何将其他参数传递给pyspark中用户定义的方法以进行过滤方法?

我在python中使用spark并且我有一个过滤器约束如下:my_rdd.filter(my_func)其中my_func是我编写的一种方法,用于根据我自己的逻辑过滤rdd项。我定义了my_func如下:defmy_func(my_item):{...}现在,除了进入它的项目之外,我想将另一个单独的参数传递给my_func。我怎样才能做到这一点?我知道my_item将引用来自my_rdd的一项,我如何将自己的参数(比如说my_param)作为附加参数传递给my_func? 最佳答案 使用以下lambda语法并使用额外参数修改您的my

python - 作为 PySpark 的 reduceByKey 键的列表

我正在尝试对格式为(([a,b,c],1),([a,b,c],1),([a,d,b,e],1),...似乎pyspark不会接受数组作为普通键中的键,通过简单地应用.reduceByKey(add)来减少值。我已经尝试通过.map((x,y):(str(x),y))首先将数组转换为字符串,但这不起作用,因为后处理将字符串返回数组太慢了。有没有办法让pyspark使用数组作为键或使用另一个函数快速将字符串转换回数组?这是相关的错误代码File"/home/jan/Documents/spark-1.4.0/python/lib/pyspark.zip/pyspark/shuffle.py