草庐IT

PySpark3

全部标签

python - 在 PySpark 中爆炸

我想从包含单词列表的DataFrame转换为每个单词在其自己的行中的DataFrame。如何在DataFrame中的列上展开?这是一个示例,其中包含我的一些尝试,您可以取消注释每个代码行并获取以下注释中列出的错误。我在Python2.7和Spark1.6.1中使用PySpark。frompyspark.sql.functionsimportsplit,explodeDF=sqlContext.createDataFrame([('cat\n\nelephantrat\nratcat',)],['word'])print'Dataset:'DF.show()print'\n\nTryin

apache-spark - 如何在 PySpark 中运行脚本

我正在尝试在pyspark环境中运行一个脚本,但到目前为止我还不能。如何在pyspark中运行像pythonscript.py这样的脚本? 最佳答案 你可以这样做:./bin/spark-submitmypythonfile.py从Spark2.0开始不支持通过pyspark运行python应用程序。 关于apache-spark-如何在PySpark中运行脚本,我们在StackOverflow上找到一个类似的问题: https://stackoverflow

python - Pyspark:解析一列 json 字符串

我有一个由一列组成的pyspark数据框,称为json,其中每一行都是json的unicode字符串。我想解析每一行并返回一个新的数据框,其中每一行都是解析后的json。#SampleDataFramejstr1=u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'jstr2=u'{"header":{"id":12346,"foo":"baz"},"

python - 如何添加第三方 Java JAR 文件以在 PySpark 中使用

我有一些Java中的第三方数据库客户端库。我想通过java_gateway.py例如:使客户端类(不是JDBC驱动程序!)通过Java网关对Python客户端可用:java_import(gateway.jvm,"org.mydatabase.MyDBClient")不清楚将第三方库添加到JVM类路径的位置。我试图添加到文件compute-classpath.sh,但这似乎不起作用。我明白了:Py4jError:Tryingtocallapackage此外,与Hive相比:hiveJAR文件未通过文件compute-classpath.sh加载,这让我很怀疑。似乎还有一些其他机制正在设

python - 在 PySpark 中编码和组装多个功能

我有一个Python类,用于在Spark中加载和处理一些数据。在我需要做的各种事情中,我正在生成一个从Spark数据帧中的各个列派生的虚拟变量列表。我的问题是我不确定如何正确定义用户定义函数来完成我所需要的。我确实目前有一个方法,当映射到底层数据帧RDD时,解决了一半的问题(请记住,这是一个更大的data_processor类中的方法):defbuild_feature_arr(self,table):#thisdicthaskeysforallthecolumnsforwhichIneeddummycodingcategories={'gender':['1','2'],..}#th

python - 取消持久化(pyspark)中的所有数据帧

我是一个spark应用程序,有几个点我想保持当前状态。这通常是在一大步之后,或者缓存我想多次使用的状态。看来,当我第二次在我的数据帧上调用缓存时,一个新副本被缓存到内存中。在我的应用程序中,这会在扩展时导致内存问题。即使在我当前的测试中,给定的数据帧最大约为100MB,但中间结果的累积大小会超出执行程序上分配的内存。请参阅下面的一个小示例来显示此行为。cache_test.py:frompysparkimportSparkContext,HiveContextspark_context=SparkContext(appName='cache_test')hive_context=Hiv

python - 在 PySpark 数据框中添加列总和作为新列

我正在使用PySpark,并且我有一个带有一堆数字列的Spark数据框。我想添加一列,它是所有其他列的总和。假设我的数据框有“a”、“b”和“c”列。我知道我可以做到:df.withColumn('total_col',df.a+df.b+df.c)问题是我不想单独输入每一列并添加它们,尤其是当我有很多列时。我希望能够自动执行此操作,或者通过指定要添加的列名列表来执行此操作。还有其他方法吗? 最佳答案 这并不明显。我没有看到sparkDataframesAPI中定义的列的基于行的总和。版本2这可以通过一种相当简单的方式完成:newd

python - 在 pyspark 中找不到 col 函数

在pyspark1.6.2中,我可以通过导入col函数frompyspark.sql.functionsimportcol但是当我尝试在Githubsourcecode中查找时我在functions.py文件中找不到col函数,python如何导入不存在的函数? 最佳答案 它存在。它只是没有明确定义。从pyspark.sql.functions导出的函数是JVM代码的精简包装器,除了少数需要特殊处理的异常(exception)情况外,它们是使用辅助方法自动生成的。如果你仔细检查出处you'llfindcollistedamongot

python - 如何将向量拆分为列 - 使用 PySpark

这个问题在这里已经有了答案:HowtoaccesselementofaVectorUDTcolumninaSparkDataFrame?(5个回答)关闭3个月前。上下文:我有一个DataFrame有2列:单词和向量。其中“vector”的列类型为VectorUDT。一个例子:word|vectorassert|[435,323,324,212...]我想得到这个:word|v1|v2|v3|v4|v5|v6......assert|435|5435|698|356|....问题:如何使用PySpark将包含向量的列拆分为每个维度的多个列?提前致谢 最佳答案

python - 计算 PySpark 中 Spark 数据帧每列中非 NaN 条目的数量

我在Hive中加载了一个非常大的数据集(大约190万行和1450列)。我需要确定每列的“覆盖率”,即每列具有非NaN值的行的比例。这是我的代码:frompysparkimportSparkContextfrompyspark.sqlimportHiveContextimportstringasstringsc=SparkContext(appName="compute_coverages")##CreatethecontextsqlContext=HiveContext(sc)df=sqlContext.sql("select*fromdata_table")nrows_tot=df.