草庐IT

pyspark-dataframes

全部标签

PySpark任务提交spark-submit参数设置一文详解

目录前言一、PySpark集群运行原理二、spark-submit参数详解1.指定运行目录2.--deploy-mode 3.--master 4.驱动程序和执行器资源 5.--files和--verbose 6.Spark提交配置三.PySpark程序提交配置选项1.构建一套虚拟环境2.模块依赖问题原因参阅前言之前我们已经进行了pyspark环境的搭建以及经过jupyternotebook进行过开发以及实现了一系列的函数功能.PySpark数据分析基础:Spark本地环境部署搭建 但是一般我们跑spark都是在集群上面跑,只有测试一般在本地上测试,而且每个公司配置的spark集群的端口和设置

linux - PySpark 速度 Ubuntu 与 Windows

我有一个PySpark示例作业,它是PageRank算法的一个版本。代码如下:from__future__importprint_functionfromoperatorimportaddimporttimeitfrompyspark.sqlimportSparkSession#Normalizealistofpairs(url,rank)to1defnormalize(ranks):norm=sum([rankforu,rankinranks])ranks=[(u,rank/norm)for(u,rank)inranks]returnsorted(ranks,key=lambdax:

python - 值错误 : arrays must all be same length - print dataframe to CSV

感谢您的光临!我希望得到一些帮助使用pandas数据框创建csv。这是我的代码:a=ldamallet[bow_corpus_new[:21]]b=data_text_newprint(a)print("/n")print(b)d={'PreprocessedDocument':b['PreprocessedDocument'].tolist(),'topic_0':a[0][1],'topic_1':a[1][1],'topic_2':a[2][1],'topic_3':a[3][1],'topic_4':a[4][1],'topic_5':a[5][1],'topic_6':a[6

hadoop - 使用 Apache Spark Streaming 和 Dataframes 交互式搜索 Parquet 存储的数据

我有大量数据作为Parquet文件存储在我的HadoopHDFS上我正在使用Spark流以交互方式接收来自Web服务器的查询,并将接收到的查询转换为SQL,以便使用SparkSQL在我的数据上运行。在此过程中,我需要运行多个SQL查询,然后通过合并或减去各个查询的结果来返回一些聚合结果。有没有什么方法可以优化和提高流程速度,例如,对已收到的数据帧而不是整个数据库运行查询?有没有更好的方式来交互查询Parquet存储的数据并给出结果?谢谢! 最佳答案 如果您在同一个RDD上运行多个查询,您可以通过在查询之前使用.cache()缓存RD

hadoop - pyspark:如何释放资源

我在IPythonNotebook中执行这段代码defsome():importpysparkconf=(pyspark.SparkConf().setMaster("yarn-client").setAppName("MyTest"))sc=pyspark.SparkContext(conf=conf)data=sc.textFile("/tmp/mytest/")printdata.count()some()我希望Spark在函数some()执行结束后释放资源(执行者和驱动程序应该退出)。然而它并没有发生。应用程序仅在我关闭我的笔记本时终止。谁能告诉我如何从我的脚本中终止pyspa

python - 如何在地理上过滤 PySpark 中的条目?

我有一个时间段内用户经纬度格式的位置数据集,我想使用GIS函数过滤条目。例如,查找多边形内的条目(来自GIS世界的ST_Contains)并使用ESRIgeodatabase文件添加一列,即用户入口所在的区。我在网上搜索并找到了Magellan,但Python支持是notworking此时。我还在EsriSpatial中找到了Hive对GIS功能的支持。,但没有找到关于如何在启动PySpark时加载正确的包或如何在PySparkshell中注册所需函数的文档:(ST_Polygon、ST_Contains等...)。我应该考虑其他替代方案吗?我正在使用Azure的HDInsight,所

hadoop - takeOrdered 在 Pyspark 中不工作以进行反向排序

当我尝试获取前3个元素时它工作正常,但按相反的顺序它不起作用sc.textFile("/user/sachinkerala6174/inData/movieStat").takeOrdered(3)上面的语句没问题当尝试下面的命令时出错sc.textFile("/user/sachinkerala6174/inData/movieStat").takeOrdered(3,key=lambdax:-x)输入数据196538812509491865389171774222118788871162441288060692316631886397596298448841828061152288

python - 如何在pyspark sql中保存一个表?

我想将生成的表格保存到csv、文本文件或类似文件中,以便能够使用RStudio执行可视化。我正在使用pyspark.sql在hadoop设置中执行一些查询。我想将我的结果保存在hadoop中,然后将结果复制到我的本地驱动器中。myTable=sqlContext.sql("SOMEQUERIES")myTable.show()#ShowmyresultmyTable.registerTempTable("myTable")#SaveastablemyTable.saveAsTextFile("SEARCHPATH")#Savingresultinmyhadoop这将返回:Attribu

python - 从日期到字符串的 Pyspark 类型转换问题

我正在使用pyspark2.1。以下是我的数据框内容expecteddays,date139,30.JUl.2017134,01.NOV.2018我的输出应该如下所示138,30.JUL.2017,最后一列的填充由我的以下模块dateRangeBetween和get_date负责下面是我的代码fromdatetimeimportdatetimefromdatetimeimporttimedeltaimportpandasaspdfromdatetimeimporttimedeltafrompyspark.sqlimportSparkSessionfrompysparkimportSpa

session - pyspark 按用户标识计算 session 持续时间组

我正在尝试使用pyspark计算事件中每个用户ID的session持续时间,数据示例如下:diff_session.show(8,False):|userid|platform|previousTime|currentTime|timeDifference||1234|13|null|2017-07-2010:49:30.027|null||1234|13|null|2017-07-2010:04:23.1|null||1234|13|2017-07-2010:04:23.1|2017-07-2010:06:23.897|120||1234|13|2017-07-2010:04:23.