草庐IT

pyspark-dataframes

全部标签

java - 在 Spark java 中将 JavaRDD 转换为 DataFrame

我正在尝试处理日志文件。首先,我读取日志文件并根据我的要求拆分这些文件,并将每一列保存到单独的JavaRDD中。现在我需要将这些JavaRDD转换为DataFrames以供将来操作。这是我到目前为止尝试过的代码:SparkConfconf=newSparkConf().setAppName("AuctionBid").setMaster("local");JavaSparkContextsc=newJavaSparkContext(conf);JavaRDDdiskfile=sc.textFile("/Users/karuturi/Downloads/log.txt");JavaRDD

hadoop - Pyspark:获取 HDFS 路径上的文件/目录列表

如题。我知道textFile但正如其名称所示,它仅适用于文本文件。我需要访问HDFS或本地路径上路径内的文件/目录。我正在使用pyspark。 最佳答案 使用JVM网关可能不是那么优雅,但在某些情况下,下面的代码可能会有所帮助:URI=sc._gateway.jvm.java.net.URIPath=sc._gateway.jvm.org.apache.hadoop.fs.PathFileSystem=sc._gateway.jvm.org.apache.hadoop.fs.FileSystemConfiguration=sc._g

scala - Spark - 将 CSV 文件加载为 DataFrame?

我想在spark中读取CSV并将其转换为DataFrame并使用df.registerTempTable("table_name")将其存储在HDFS中我试过:scala>valdf=sqlContext.load("hdfs:///csv/file/dir/file.csv")我得到的错误:java.lang.RuntimeException:hdfs:///csv/file/dir/file.csvisnotaParquetfile.expectedmagicnumberattail[80,65,82,49]butfound[49,59,54,10]atparquet.hadoop

Spark中RDD、DataFrame和DataSet的区别与联系

一、RDD、DataFrame和DataSet的定义        在开始SparkRDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Datasets的定义:SparkRDD:RDD代表弹性分布式数据集。它是记录的只读分区集合。RDD是Spark的基本数据结构。它允许程序员以容错方式在大型集群上执行内存计算。SparkDataframe:与RDD不同,数据以列的形式组织起来,类似于关系数据库中的表。它是一个不可变的分布式数据集合。Spark中的DataFrame允许开发人员将数据结构(类型)加到分布式数据集合上,从而实现更高级别的

apache-spark - Redis 和 Pyspark 配置

我们有一个带有sparkmaster和3个sparkworker的EC2测试虚拟机,需要做哪些配置才能让Redis与PySpark一起工作?谢谢。 最佳答案 1)制作Redis模块的zip文件2)使用PySpark的addPyFile如下sc.addPyFile("/path/to/redis.zip")引用:WritedatatoRedisfromPySpark 关于apache-spark-Redis和Pyspark配置,我们在StackOverflow上找到一个类似的问题:

python - 如何从 PySpark DStream 写入 Redis?

我正在使用PySpark2.3.1从Kafka读取值流作为DStream。我想对这些数据做一些转换,比如取移动平均,并将其保存到Redis。我的spark作业代码看起来有点像这样:batch_duration=1#Initializesessionspark_session=SparkSession\.builder\.appName("my-app")\.getOrCreate()spark_context=spark_session.sparkContext#Createstreamingcontext(=connectiontoSpark)streaming_context=St

python - 名称错误 : name 'redis' is not defined - PySpark - Redis

我在pyspark中使用addPyFile方法加载redis.zip文件。我可以使用加载文件sc.addPyFile("/home/path/to/redis.zip")但是在使用./pyspark运行代码时,它显示错误:NameError:name'redis'isnotdefinedzip(redis.zip)包含.py文件(client.py,connection.py、exceptions.py、lock.py、utils.py等)。Python版本是-3.5,spark是2.7 最佳答案 如果您将py文件打包成zip并使用

mongodb - pyspark-mongodb 集合读取命令不会执行

我安装了以下版本:-Spark2.1.0,斯卡拉2.11.6,mongoDB3.2.17我尝试使用以下命令启动pysparkshell./bin/pyspark--packagesorg.mongodb.spark:mongo-spark-connector_2.11:2.2.0在此之后我开始了sparksession如下frompyspark.sqlimportSparkSessionmy_spark=SparkSession.builder.appName("myApp").config("spark.mongodb.input.uri","mongodb://127.0.0.1/

python - 如何将sql查询读取到pandas dataframe/python/django

我在下面的views.py中使用它来获取应用fromdjango.dbimportconnectiondeftest(request):cursor=connection.cursor()sql="""SELECTx,nfromtable1limit10"""cursor.execute(sql)rows=cursor.fetchall()#df1=pd.read_sql_query(sql,cursor)notworking)#df1.columns=cursor.keys()notworking)returnrender(request,'app/test.html',{"row"

mysql - Spark DataFrame InsertIntoJDBC - TableAlreadyExists 异常

使用Spark1.4.0,我尝试使用insertIntoJdbc()将数据从SparkDataFrame插入到MemSQL数据库中(这应该与与MySQL数据库交互完全一样)。但是,我不断收到运行时TableAlreadyExists异常。首先,我像这样创建MemSQL表:CREATETABLEIFNOTEXISTStable1(idINTAUTO_INCREMENTPRIMARYKEY,valINT);然后我在Spark中创建一个简单的数据框并尝试像这样插入到MemSQL中:valdf=sc.parallelize(Array(123,234)).toDF.toDF("val")//d