草庐IT

python - 从 Flask 应用访问 Spark

我编写了一个简单的Flask应用程序来将一些数据传递给Spark。该脚本在IPythonNotebook中有效,但当我尝试在它自己的服务器中运行它时却无效。我不认为Spark上下文在脚本中运行。如何让Spark在以下示例中工作?fromflaskimportFlask,requestfrompysparkimportSparkConf,SparkContextapp=Flask(__name__)conf=SparkConf()conf.setMaster("local")conf.setAppName("SparkContext1")conf.set("spark.executor.

python - Python 包中用于 Spark 数据帧的 udf() 的函数

对于通过pyspark的Spark数据帧,我们可以使用pyspark.sql.functions.udf来创建一个用户定义函数(UDF)。我想知道我是否可以在udf()中使用Python包中的任何函数,例如numpy中的np.random.normal? 最佳答案 假设您想将名为new的列添加到通过重复调用numpy.random.normal构造的DataFramedf中,您可以这样做:importnumpyfrompyspark.sql.functionsimportUserDefinedFunctionfrompyspark.

python - Spark toDebugString 在 python 中不好用

这是我在scala中使用toDebugString时得到的结果:scala>vala=sc.parallelize(Array(1,2,3)).distincta:org.apache.spark.rdd.RDD[Int]=MappedRDD[3]atdistinctat:12scala>a.toDebugStringres0:String=(4)MappedRDD[3]atdistinctat:12|ShuffledRDD[2]atdistinctat:12+-(4)MappedRDD[1]atdistinctat:12|ParallelCollectionRDD[0]atparal

python - 如何在spark中按多个键分组?

我有一堆以复合键和值的形式存在的元组。例如,tfile.collect()=[(('id1','pd1','t1'),5.0),(('id2','pd2','t2'),6.0),(('id1','pd1','t2'),7.5),(('id1','pd1','t3'),8.1)]我想对这个集合执行类似于sql的操作,我可以在其中根据id[1..n]或pd[1..n]聚合信息。我想使用vanillapysparkapi来实现,而不是使用SQLContext。在我当前的实现中,我正在读取一堆文件并合并RDD。defreadfile():fr=range(6,23)tfile=sc.union

python - 作业完成后 spark-submit 继续挂起

我正在尝试在AWS中使用hdfs测试spark1.6。我正在使用示例文件夹中可用的wordcountpython示例。我使用spark-submit提交作业,作业成功完成,并且也在控制台上打印结果。Web用户界面还表示已完成。然而,Spark提交永远不会终止。我已经验证上下文在字数统计示例代码中也已停止。有什么问题吗?这是我在控制台上看到的。6-05-2414:58:04,749INFO[Thread-3]handler.ContextHandler(ContextHandler.java:doStop(843))-stoppedo.s.j.s.ServletContextHandle

python - Spark : More Efficient Aggregation to join strings from different rows

我目前正在处理DNA序列数据,但遇到了一些性能障碍。我有两个查找字典/散列(作为RDD),以DNA“单词”(短序列)作为键,索引位置列表作为值。一个用于较短的查询序列,另一个用于数据库序列。即使是非常非常大的序列,创建表的速度也非常快。下一步,我需要将它们配对并找到“命中”(每个常用词的索引位置对)。我首先加入查找词典,速度相当快。但是,我现在需要这些对,所以我必须进行两次平面映射,一次是从查询中扩展索引列表,第二次是从数据库中扩展索引列表。这并不理想,但我看不到另一种方法。至少它表现不错。此时的输出为:(query_index,(word_length,diagonal_offset

python - 如何将 Spark Streaming 数据转换为 Spark DataFrame

到目前为止,Spark还没有创建流式数据的DataFrame,但是我在做异常检测的时候,使用DataFrame进行数据分析更加方便快捷。我已经完成了这部分,但是当我尝试使用流数据进行实时异常检测时,问题出现了。试了好几种方法,仍然无法将DStream转为DataFrame,也无法将DStream内部的RDD转为DataFrame。这是我最新版本的代码的一部分:importsysimportrefrompysparkimportSparkContextfrompyspark.sql.contextimportSQLContextfrompyspark.sqlimportRowfrompy

python - Python 中导入的 Spark 问题

我们正在python脚本上运行一个spark-submit命令,该脚本使用Spark在Python中使用Caffe并行进行对象检测。如果在纯Python脚本中运行,脚本本身运行得非常好,但在与Spark代码一起使用时会返回导入错误。我知道spark代码不是问题,因为它在我的家用机器上运行良好,但在AWS上运行不佳。我不确定这是否与环境变量有关,就好像它没有检测到它们一样。设置了这些环境变量:SPARK_HOME=/opt/spark/spark-2.0.0-bin-hadoop2.7PATH=$SPARK_HOME/bin:$PATHPYTHONPATH=$SPARK_HOME/pyt

python - 为什么我的 Spark 比纯 Python 运行得慢?性能比较

在这里激发新手。我尝试使用Spark对我的数据框执行一些pandas操作,令人惊讶的是它比纯Python慢(即在Python中使用pandas包)。这是我所做的:1)在Spark中:train_df.filter(train_df.gender=='-unknown-').count()返回结果大约需要30秒。但是使用Python大约需要1秒。2)在Spark中:sqlContext.sql("SELECTgender,count(*)FROMtrainGROUPBYgender").show()同样的事情,在Spark中大约需要30秒,在Python中需要1秒。我的Spark比纯Py

python - SparkSession 初始化错误 - 无法使用 spark.read

我尝试创建一个独立的PySpark程序来读取csv并将其存储在配置单元表中。我在配置Sparksession、session和上下文对象时遇到问题。这是我的代码:frompysparkimportSparkConf,SparkContextfrompyspark.sqlimportSQLContext,SparkSessionfrompyspark.sql.typesimport*conf=SparkConf().setAppName("test_import")sc=SparkContext(conf=conf)sqlContext=SQLContext(sc)spark=Spark