草庐IT

python - 如何在 Debug模式下调用PySpark?

我已使用ApacheSpark1.4设置了IntelliJIDEA。我希望能够将调试点添加到我的SparkPython脚本中,以便可以轻松调试它们。我目前正在运行这段Python来初始化Spark过程proc=subprocess.Popen([SPARK_SUBMIT_PATH,scriptFile,inputFile],shell=SHELL_OUTPUT,stdout=subprocess.PIPE)ifVERBOSE:printproc.stdout.read()printproc.stderr.read()当spark-submit最终调用myFirstSparkScript

python - 在 Pyspark SQL 中,您需要在哪里使用 lit()?

我试图弄清楚您需要在哪里使用lit值,该值在文档中定义为literalcolumn。以这个udf为例,它返回一个SQL列数组的索引:deffind_index(column,index):returncolumn[index]如果我将一个整数传递给它,我会得到一个错误。我需要将lit(n)值传递到udf以获得数组的正确索引。有没有什么地方可以让我更好地学习何时使用lit以及可能使用col的硬性规则? 最佳答案 为了简单起见,您需要一个Column(可以是使用lit创建的,但它不是唯一的选择)当JVM对应对象需要一列并且Python包

python - 如何在 pyspark.sql.functions.when() 中使用多个条件?

我有一个包含几列的数据框。现在我想从其他2列派生一个新列:frompyspark.sqlimportfunctionsasFnew_df=df.withColumn("new_col",F.when(df["col-1"]>0.0&df["col-2"]>0.0,1).otherwise(0))这样我只得到一个异常(exception):py4j.Py4JException:Methodand([classjava.lang.Double])doesnotexist它只适用于这样的一个条件:new_df=df.withColumn("new_col",F.when(df["col-1"

python - 使用 pyspark 覆盖 Spark 输出

我正在尝试使用PySpark中的以下选项覆盖Spark数据帧,但我没有成功spark_df.write.format('com.databricks.spark.csv').option("header","true",mode='overwrite').save(self.output_file_path)mode=overwrite命令不成功 最佳答案 试试:spark_df.write.format('com.databricks.spark.csv')\.mode('overwrite').option("header","

python - 调用 map 后的pyspark EOFError

我是spark和pyspark的新手。我正在将一个小的csv文件(约40k)读入数据框。frompyspark.sqlimportfunctionsasFdf=sqlContext.read.format('com.databricks.spark.csv').options(header='true',inferschema='true').load('/tmp/sm.csv')df=df.withColumn('verified',F.when(df['verified']=='Y',1).otherwise(0))df2=df.map(lambdax:Row(label=floa

python - 如何使用 pyspark 在 Spark 2.0 中构建 sparkSession?

我刚刚获得了spark2.0的访问权限;到目前为止,我一直在使用spark1.6.1。有人可以帮我使用pyspark(python)设置sparkSession吗?我知道在线提供的scala示例是相似的(here),但我希望能直接使用python语言进行演练。我的具体情况:我在zeppelinspark笔记本中从S3加载avro文件。然后构建df并从中运行各种pyspark和sql查询。我所有的旧查询都使用sqlContext。我知道这是不好的做法,但我用开始我的笔记本sqlContext=SparkSession.builder.enableHiveSupport().getOrCr

python - 在 PySpark 中爆炸

我想从包含单词列表的DataFrame转换为每个单词在其自己的行中的DataFrame。如何在DataFrame中的列上展开?这是一个示例,其中包含我的一些尝试,您可以取消注释每个代码行并获取以下注释中列出的错误。我在Python2.7和Spark1.6.1中使用PySpark。frompyspark.sql.functionsimportsplit,explodeDF=sqlContext.createDataFrame([('cat\n\nelephantrat\nratcat',)],['word'])print'Dataset:'DF.show()print'\n\nTryin

apache-spark - 如何在 PySpark 中运行脚本

我正在尝试在pyspark环境中运行一个脚本,但到目前为止我还不能。如何在pyspark中运行像pythonscript.py这样的脚本? 最佳答案 你可以这样做:./bin/spark-submitmypythonfile.py从Spark2.0开始不支持通过pyspark运行python应用程序。 关于apache-spark-如何在PySpark中运行脚本,我们在StackOverflow上找到一个类似的问题: https://stackoverflow

python - Pyspark:解析一列 json 字符串

我有一个由一列组成的pyspark数据框,称为json,其中每一行都是json的unicode字符串。我想解析每一行并返回一个新的数据框,其中每一行都是解析后的json。#SampleDataFramejstr1=u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'jstr2=u'{"header":{"id":12346,"foo":"baz"},"

python - 如何添加第三方 Java JAR 文件以在 PySpark 中使用

我有一些Java中的第三方数据库客户端库。我想通过java_gateway.py例如:使客户端类(不是JDBC驱动程序!)通过Java网关对Python客户端可用:java_import(gateway.jvm,"org.mydatabase.MyDBClient")不清楚将第三方库添加到JVM类路径的位置。我试图添加到文件compute-classpath.sh,但这似乎不起作用。我明白了:Py4jError:Tryingtocallapackage此外,与Hive相比:hiveJAR文件未通过文件compute-classpath.sh加载,这让我很怀疑。似乎还有一些其他机制正在设