草庐IT

spark-submit

全部标签

Spark算子-Scala版本 头歌答案

Spark算子--Scala版本第1关Spark算子--Scala版本编程要求根据提示,在右侧编辑器begin-end处补充代码,输出每个元素及其长度并去重。测试说明平台会对你编写的代码进行测试:预期输出:(an,2)``(dog,3)``(cat,3)开始你的任务吧,祝你成功!​importorg.apache.spark.rdd.RDDimportorg.apache.spark.{SparkConf,SparkContext}​objectEduCoder1{ defmain(args:Array[String]):Unit={ valconf=newSparkConf().setApp

python - 将 Pandas DataFrame 转换为 Spark DataFrame

我之前问过一个关于如何Convertscipysparsematrixtopyspark.sql.dataframe.DataFrame的问题,并在阅读提供的答案以及thisarticle后取得了一些进展.我最终找到了以下用于将scipy.sparse.csc_matrix转换为pandas数据帧的代码:df=pd.DataFrame(csc_mat.todense()).to_sparse(fill_value=0)df.columns=header然后我尝试使用建议的语法将pandas数据帧转换为spark数据帧:spark_df=sqlContext.createDataFram

python - 在 Spark ML 中创建自定义交叉验证

我不熟悉Spark和PySpark数据框以及机器学习。如何为ML库创建自定义交叉验证。例如,我想改变训练折叠的形成方式,例如分层拆分。这是我当前的代码numFolds=10predictions=[]lr=LogisticRegression()\.setFeaturesCol("features")\.setLabelCol('label')#GridsearchonLRmodellrparamGrid=ParamGridBuilder()\.addGrid(lr.regParam,[0.01,0.1,0.5,1.0,2.0])\.addGrid(lr.elasticNetParam

python - 使用 spark(PySpark) 进行多处理

这个问题在这里已经有了答案:HowtorunindependenttransformationsinparallelusingPySpark?(1个回答)关闭5年前。用例如下:我有一个大数据框,其中有一个“user_id”列(每个user_id可以出现在很多行中)。我有一个用户列表my_users我需要对其进行分析。Groupby、filter和aggregate可能是个好主意,但pyspark中包含的可用聚合函数不符合我的需要。在pysparkver中,userdefinedaggregationfunctions仍然没有得到完全支持,我决定暂时保留它..相反,我只是迭代my_use

python - 重命名 spark 数据框中的嵌套字段

在Spark中有一个数据框df:|--array_field:array(nullable=true)||--element:struct(containsNull=true)|||--a:string(nullable=true)|||--b:long(nullable=true)|||--c:long(nullable=true)如何将字段array_field.a重命名为array_field.a_renamed?[更新]:.withColumnRenamed()不适用于嵌套字段,所以我尝试了这个hacky和不安全的方法:#Firstaltertheschema:schema=d

python - 从 Flask 应用访问 Spark

我编写了一个简单的Flask应用程序来将一些数据传递给Spark。该脚本在IPythonNotebook中有效,但当我尝试在它自己的服务器中运行它时却无效。我不认为Spark上下文在脚本中运行。如何让Spark在以下示例中工作?fromflaskimportFlask,requestfrompysparkimportSparkConf,SparkContextapp=Flask(__name__)conf=SparkConf()conf.setMaster("local")conf.setAppName("SparkContext1")conf.set("spark.executor.

python - Python 包中用于 Spark 数据帧的 udf() 的函数

对于通过pyspark的Spark数据帧,我们可以使用pyspark.sql.functions.udf来创建一个用户定义函数(UDF)。我想知道我是否可以在udf()中使用Python包中的任何函数,例如numpy中的np.random.normal? 最佳答案 假设您想将名为new的列添加到通过重复调用numpy.random.normal构造的DataFramedf中,您可以这样做:importnumpyfrompyspark.sql.functionsimportUserDefinedFunctionfrompyspark.

python - Spark toDebugString 在 python 中不好用

这是我在scala中使用toDebugString时得到的结果:scala>vala=sc.parallelize(Array(1,2,3)).distincta:org.apache.spark.rdd.RDD[Int]=MappedRDD[3]atdistinctat:12scala>a.toDebugStringres0:String=(4)MappedRDD[3]atdistinctat:12|ShuffledRDD[2]atdistinctat:12+-(4)MappedRDD[1]atdistinctat:12|ParallelCollectionRDD[0]atparal

python - 如何在spark中按多个键分组?

我有一堆以复合键和值的形式存在的元组。例如,tfile.collect()=[(('id1','pd1','t1'),5.0),(('id2','pd2','t2'),6.0),(('id1','pd1','t2'),7.5),(('id1','pd1','t3'),8.1)]我想对这个集合执行类似于sql的操作,我可以在其中根据id[1..n]或pd[1..n]聚合信息。我想使用vanillapysparkapi来实现,而不是使用SQLContext。在我当前的实现中,我正在读取一堆文件并合并RDD。defreadfile():fr=range(6,23)tfile=sc.union

python - 作业完成后 spark-submit 继续挂起

我正在尝试在AWS中使用hdfs测试spark1.6。我正在使用示例文件夹中可用的wordcountpython示例。我使用spark-submit提交作业,作业成功完成,并且也在控制台上打印结果。Web用户界面还表示已完成。然而,Spark提交永远不会终止。我已经验证上下文在字数统计示例代码中也已停止。有什么问题吗?这是我在控制台上看到的。6-05-2414:58:04,749INFO[Thread-3]handler.ContextHandler(ContextHandler.java:doStop(843))-stoppedo.s.j.s.ServletContextHandle