草庐IT

PySpark3

全部标签

python - PySpark 将列中的空值替换为其他列中的值

我想用相邻列中的值替换一列中的空值,例如,如果我有A|B0,12,null3,null4,2我希望它是:A|B0,12,23,34,2尝试过df.na.fill(df.A,"B")但是没有用,它说值应该是一个float、整数、长整型、字符串或字典有什么想法吗? 最佳答案 我们可以使用coalescefrompyspark.sql.functionsimportcoalescedf.withColumn("B",coalesce(df.B,df.A)) 关于python-PySpark将列

python - 带有 HappyBase 连接池的 PySpark dataframe.foreach() 返回 'TypeError: can' t pickle thread.lock 对象'

我有一个PySpark作业可以更新HBase中的一些对象(Sparkv1.6.0;happybasev0.9)。如果我为每一行打开/关闭一个HBase连接,它会有点工作:defprocess_row(row):conn=happybase.Connection(host=[hbase_master])#updateHBaserecordwithdatafromrowconn.close()my_dataframe.foreach(process_row)几千次更新插入后,我们开始看到这样的错误:TTransportException:Couldnotconnectto[hbase_ma

python - PySpark DataFrame 上分组数据的 Pandas 样式转换

如果我们有一个由一列类别和一列值组成的Pandas数据框,我们可以通过执行以下操作删除每个类别中的均值:df["DemeanedValues"]=df.groupby("Category")["Values"].transform(lambdag:g-numpy.mean(g))据我所知,Spark数据帧不直接提供这种分组/转换操作(我在Spark1.5.0上使用PySpark)。那么,实现这种计算的最佳方式是什么?我试过使用group-by/join如下:df2=df.groupBy("Category").mean("Values")df3=df2.join(df)但它非常慢,因为

python - PySpark 将 'map' 类型的列转换为数据框中的多列

输入我有一列Parameters类型为map的形式:frompyspark.sqlimportSQLContextsqlContext=SQLContext(sc)d=[{'Parameters':{'foo':'1','bar':'2','baz':'aaa'}}]df=sqlContext.createDataFrame(d)df.collect()#[Row(Parameters={'foo':'1','bar':'2','baz':'aaa'})]df.printSchema()#root#|--Parameters:map(nullable=true)#||--key:str

python - pyspark:合并(外连接)两个数据框

我有以下两个数据框:DF1:Id|field_A|field_B|field_C|field_D1|cat|12|black|112|dog|128|white|193|dog|35|yellow|204|dog|21|brown|45|bird|10|blue|76|cow|99|brown|34和DF2:Id|field_B|field_C|field_D|field_E3|35|yellow|20|1235|10|blue|7|4546|99|brown|34|398我希望得到new_DF作为Id|field_A|field_B|field_C|field_D|field_E1|

python - 连接后如何在 Pyspark Dataframe 中选择和排序多个列

我想从现有数据框(在连接后创建)中选择多个列,并希望将这些字段排序为我的目标表结构。怎么做到呢?我使用的方法如下。在这里我可以选择需要但不能按顺序制作的必要列。Required(TargetTablestructure):hist_columns=("acct_nbr","account_sk_id","zip_code","primary_state","eff_start_date","eff_end_date","eff_flag")account_sk_df=hist_process_df.join(broadcast(df_sk_lkp),'acct_nbr','inner'

python - pyspark:从现有列创建 MapType 列

我需要基于现有列创建一个新的SparkDFMapType列,其中列名是键,值是值。例如-我有这个DF:rdd=sc.parallelize([('123k',1.3,6.3,7.6),('d23d',1.5,2.0,2.2),('as3d',2.2,4.3,9.0)])schema=StructType([StructField('key',StringType(),True),StructField('metric1',FloatType(),True),StructField('metric2',FloatType(),True),StructField('metric3',Flo

python - 您如何使用 boto3(或其他方式)在 emr 上自动化 pyspark 作业?

我正在创建一个作业来解析大量服务器数据,然后将其上传到Redshift数据库中。我的工作流程如下:从S3抓取日志数据使用sparkdataframes或sparksql解析数据并写回S3将数据从S3上传到Redshift。不过,我对如何自动执行此操作感到困惑,以便我的进程启动一个EMR集群,引导正确的程序进行安装,并运行我的python脚本,该脚本将包含用于解析和编写的代码。是否有人可以与我分享任何示例、教程或经验,以帮助我学习如何执行此操作? 最佳答案 看看boto3EMR创建集群的文档。您基本上必须调用run_job_flow并

python - 如何在 PySpark 的 UDF 中返回 "Tuple type"?

所有datatypesinpyspark.sql.typesare:__all__=["DataType","NullType","StringType","BinaryType","BooleanType","DateType","TimestampType","DecimalType","DoubleType","FloatType","ByteType","IntegerType","LongType","ShortType","ArrayType","MapType","StructField","StructType"]我必须编写一个返回元组数组的UDF(在pyspark中)

python - 将标准 python 键值字典列表转换为 pyspark 数据框

考虑我有一个python字典键值对列表,其中键对应于表的列名,因此对于下面的列表如何将其转换为具有两个colsarg1arg2的pyspark数据帧?[{"arg1":"","arg2":""},{"arg1":"","arg2":""},{"arg1":"","arg2":""}]我怎样才能使用以下结构来做到这一点?df=sc.parallelize([...]).toDF上面代码中arg1arg2放在哪里(...) 最佳答案 旧方法:sc.parallelize([{"arg1":"","arg2":""},{"arg1":""