草庐IT

spark-ml

全部标签

python - Spark 程序在独立集群上运行时给出奇怪的结果

我有这个spark程序,我会尽量将它限制在相关部分#Splitbydelimiter,#Ifthefileisinunicode,weneedtoconverteachvaluetoafloatinordertobeableto#treatitasanumberpoints=sc.textFile(filename).map(lambdaline:[float(x)forxinline.split(",")]).persist()#startwithKrandomlyselectedpointsfromthedataset#Acentroidcannotbeanactualdatapo

python - 数据源用完时如何停止 Spark 流

我有一个SparkStreaming作业,每5秒从Kafka读取一次,对传入数据进行一些转换,然后写入文件系统。这真的不需要成为流式传输作业,实际上,我只想每天运行一次以将消息排入文件系统。不过,我不确定如何停止这项工作。如果我将超时传递给streamingContext.awaitTermination,它不会停止进程,它所做的只是导致进程在需要迭代流时产生错误(参见下面的错误)完成我想做的事情的最好方法是什么这是针对Python上的Spark1.6编辑:感谢@marios,解决方案是这样的:ssc.start()ssc.awaitTermination(10)ssc.stop()在

python - 当有更多机器可用时,Spark 只使用一台工作机器

我正在尝试通过Spark并行化机器学习预测任务。我之前在其他任务中成功使用过Spark多次,并且之前没有遇到过并行化问题。在这个特定任务中,我的集群有4个worker。我在具有4个分区的RDD上调用mapPartitions。map函数从磁盘加载一个模型(一个引导脚本分发执行此操作所需的一切;我已经验证它存在于每台从机上)并对RDD分区中的数据点执行预测。代码运行,但只使用一个执行器。其他执行者的日志显示“调用了关机Hook”。在不同的代码运行中,它使用不同的机器,但一次只使用一台。如何让Spark同时使用多台机器?我通过Zeppelinnotebook在AmazonEMR上使用PyS

python - 保留索引-字符串对应关系 spark 字符串索引器

Spark的StringIndexer非常有用,但通常需要检索生成的索引值与原始字符串之间的对应关系,并且似乎应该有一种内置的方法来完成此操作。我将使用Sparkdocumentation中的这个简单示例进行说明:frompyspark.ml.featureimportStringIndexerdf=sqlContext.createDataFrame([(0,"a"),(1,"b"),(2,"c"),(3,"a"),(4,"a"),(5,"c")],["id","category"])indexer=StringIndexer(inputCol="category",outputCo

python - 如何计算分组的 Spark 数据框中的 bool 值

我想计算分组的Spark数据框中的一列中有多少条记录为真,但我不知道如何在python中执行此操作。例如,我有一个包含region、salary和IsUnemployed列的数据,其中IsUnemployed作为bool值。我想看看每个地区有多少失业人员。我知道我们可以做一个filter然后groupby但我想同时生成两个聚合,如下所示frompyspark.sqlimportfunctionsasFdata.groupby("Region").agg(F.avg("Salary"),F.count("IsUnemployed")) 最佳答案

Spark中的Shuffle

  一、Spark Shuffle概述   大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。     在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。   在Spark1.2以前,默认的shuffle计

python - 高斯混合模型 : Difference between Spark MLlib and scikit-learn

我正在尝试对数据集样本使用高斯混合模型。我同时使用了MLlib(与pyspark)和scikit-learn,得到了截然不同的结果,scikit-learn一个看起来更逼真。frompyspark.mllib.clusteringimportGaussianMixtureasSparkGaussianMixturefromsklearn.mixtureimportGaussianMixturefrompyspark.mllib.linalgimportVectorsScikit-learn:local=pd.DataFrame([x.asDict()forxindf.sample(0.

python - 在非 Spark 环境中加载 pyspark ML 模型

我对在python中部署机器学习模型很感兴趣,因此可以通过向服务器发出请求来进行预测。我将创建一个Cloudera集群并利用pyspark库利用Spark开发模型。我想知道如何保存模型以便在服务器上使用它。我已经看到不同的算法具有.save函数(就像在这篇文章HowtosaveandloadMLLibmodelinApacheSpark中回答的那样),但是由于服务器将在没有Spark的不同机器上而不是在Cloudera集群中,我不知道不知道是否可以使用他们的.load和.predict函数。是否可以通过使用pyspark库函数进行预测而不使用Spark?或者我是否必须进行任何转换才能保

python - 从 Azure ML 实验中访问 Azure blob 存储

AzureML实验提供了通过Reader和Writer模块将CSV文件读取和写入Azureblob存储的方法。但是,我需要将一个JSON文件写入blob存储。由于没有这样做的模块,我试图从ExecutePythonScript模块中这样做。#Importthenecessaryitemsfromazure.storage.blobimportBlobServicedefazureml_main(dataframe1=None,dataframe2=None):account_name='mystorageaccount'account_key='mykeyhere=='json_str

python - 从 Azure ML 实验中访问 Azure blob 存储

AzureML实验提供了通过Reader和Writer模块将CSV文件读取和写入Azureblob存储的方法。但是,我需要将一个JSON文件写入blob存储。由于没有这样做的模块,我试图从ExecutePythonScript模块中这样做。#Importthenecessaryitemsfromazure.storage.blobimportBlobServicedefazureml_main(dataframe1=None,dataframe2=None):account_name='mystorageaccount'account_key='mykeyhere=='json_str