草庐IT

PySpark3

全部标签

python - Geoip 2's python library doesn' t 在 pySpark 的 map 函数中工作

我正在使用geoip2的python库和pySpark来获取某些IP的地理地址。我的代码是这样的:geoDBpath='somePath/geoDB/GeoLite2-City.mmdb'geoPath=os.path.join(geoDBpath)sc.addFile(geoPath)reader=geoip2.database.Reader(SparkFiles.get(geoPath))defip2city(ip):try:city=reader.city(ip).city.nameexcept:city='notfound'returncity我试过了printip2city(

python - 用同一列的平均值填充 Pyspark 数据框列空值

有了这样的数据框,rdd_2=sc.parallelize([(0,10,223,"201601"),(0,10,83,"2016032"),(1,20,None,"201602"),(1,20,3003,"201601"),(1,20,None,"201603"),(2,40,2321,"201601"),(2,30,10,"201602"),(2,61,None,"201601")])df_data=sqlContext.createDataFrame(rdd_2,["id","type","cost","date"])df_data.show()+---+----+----+--

python - pyspark错误does not exist in jvm error when initializing SparkContext

我在emr上使用spark并编写pyspark脚本,尝试时出现错误frompysparkimportSparkContextsc=SparkContext()这是错误File"pyex.py",line5,insc=SparkContext()File"/usr/local/lib/python3.4/site-packages/pyspark/context.py",line118,in__init__conf,jsc,profiler_cls)File"/usr/local/lib/python3.4/site-packages/pyspark/context.py",line19

python - PySpark:TypeError: 'Column' 对象不可调用

我正在从HDFS加载数据,我想按特定变量过滤这些数据。但不知何故Column.isin命令不起作用。它抛出这个错误:TypeError:'Column'objectisnotcallablefrompyspark.sql.functionsimportudf,colvariables=('852-PI-769','812-HC-037','852-PC-571-OUT')df=sqlContext.read.option("mergeSchema","true").parquet("parameters.parquet")same_var=col("Variable").isin(va

python - 如何在 PySpark shell session 中更新 pyfile?

在交互式pysparksession中,您可以通过sc.addPyFile('file_location')导入python文件。如果您需要对该文件进行更改并保存它们,是否有任何方法可以“重新广播”更新后的文件而无需关闭您的sparksession并启动一个新的session?简单地再次添加文件是行不通的。我不确定重命名文件是否有效,但无论如何我都不想这样做。据我从spark文档中得知,只有一种添加pyfile的方法,而不是更新pyfile的方法。我希望我错过了什么!谢谢 最佳答案 我认为在交互式session期间不可行。您将必须重

python - 如何将 pyspark.sql.dataframe.DataFrame 转换回 databricks notebook 中的 sql 表

我通过执行以下行创建了pyspark.sql.dataframe.DataFrame类型的数据框:dataframe=sqlContext.sql("select*frommy_data_table")如何将其转换回可以运行sql查询的sparksql表? 最佳答案 您可以使用createReplaceTempView创建表格.在你的情况下它会是这样的:dataframe.createOrReplaceTempView("mytable")在此之后,您可以使用SQL查询您的mytable。如果你的spark版本是≤1.6.2你可以使

python - PySpark:StructField(..., ..., False) 总是返回 `nullable=true` 而不是 `nullable=false`

我是PySpark的新手,正面临一个奇怪的问题。我试图在加载CSV数据集时将某些列设置为不可空。我可以使用非常小的数据集(test.csv)重现我的案例:col1,col2,col311,12,1321,22,2331,32,3341,42,4351,,53第5行第2列有一个空值,我不想在我的DF中获取该行。我将所有字段设置为不可为空(nullable=false),但我得到了一个架构,其中所有三列都具有nullable=true。即使我将所有三列都设置为不可为空,也会发生这种情况!我正在运行最新可用的Spark版本2.0.1。代码如下:frompyspark.sqlimportSpa

python - pyspark Window.partitionBy 与 groupBy

假设我有一个包含大约21亿条记录的数据集。这是一个包含客户信息的数据集,我想知道他们做了多少次。所以我应该对ID进行分组并对一列求和(它有0和1值,其中1表示一个Action)。现在,我可以使用一个简单的groupBy和agg(sum)它,但据我所知,这并不是很有效。groupBy将在分区之间移动大量数据。或者,我也可以使用带有partitionBy子句的Window函数,然后对数据求和。缺点之一是我必须应用额外的过滤器,因为它会保留所有数据。我想要每个ID一条记录。但是我看不到这个窗口是如何处理数据的。是不是比这个groupBy和sum.还是一样? 最佳答

使用PySpark计算AUC,KS与PSI

当特征数量或者模型数量很多的时候,使用PySpark去计算相关风控指标会节省很多的时间。网上关于使用PySpark计算相关风控指标的资料较少,尤其是PSI计算不管是国内还是国外相关的代码都没有正确的,这里抛砖引玉,写了三个风控常用的指标AUC,KS和PSI相关的计算方法,供参考。AUCAUC的相关概念网上已经有很多的很好的文章,这里不在赘述,AUC使用的到的计算公式如下:\[AUC=\frac{\sum_{i\inpositiveClass}rank_i-{\displaystyle\frac{M(1+M)}2}}{M\timesN}\]其中M为负类样本的数目,N为正类样本的数目使用PySpa

python - 意外类型 : <class 'pyspark.sql.types.DataTypeSingleton' > when casting to Int on a ApacheSpark Dataframe

尝试将StringType转换为pyspark数据帧上的IntType时出现错误:joint=aggregates.join(df_data_3,aggregates.year==df_data_3.year)joint2=joint.filter(joint.CountyCode==999).filter(joint.CropName=='WOOL')\.select(aggregates.year,'Production')\.withColumn("ProductionTmp",df_data_3.Production.cast(IntegerType))\.drop("Prod