草庐IT

Pandas 与 PySpark 强强联手,功能与速度齐飞!

云朵君 2023-03-28 原文
​使用Python做数据处理的数据科学家或数据从业者,对数据科学包pandas并不陌生,也不乏像云朵君一样的pandas重度使用者,项目开始写的第一行代码,大多是 import pandas as pd。pandas做数据处理可以说是yyds!而他的缺点也是非常明显,pandas 只能单机处理,它不能随数据量线性伸缩。例如,如果 pandas 试图读取的数据集大于一台机器的可用内存,则会因内存不足而失败。

另外 ​pandas 在处理大型​数据方面非常慢,虽然有像Dask 或 Vaex 等其他库来优化提升数据处理速度,但在大数据处理神之框架Spark面前,也是小菜一碟。

幸运的是,在新的 Spark 3.2 版本中,出现了一个新的Pandas API,将pandas大部分功能都集成到PySpark中,使用pandas的接口,就能使用Spark,因为 Spark 上的 Pandas API 在后台使用 Spark,这样就能达到强强联手的效果,可以说是非常强大,非常方便。

这一切都始于 2019 年 Spark + AI 峰会。Koalas 是一个开源项目,可以在 Spark 之上使用 Pandas。一开始,它只覆盖了 Pandas 的一小部分功能,但后来逐渐壮大起来。现在,在新的 Spark 3.2 版本中,Koalas 已合并到 PySpark。

Spark 现在集成了 Pandas API,因此可以在 Spark 上运行 Pandas。只需要更改一行代码:

import pyspark.pandas as ps
由此我们可以获得诸多的优势​:

  • 如果我们熟悉使用Python 和 Pandas,但不熟悉 Spark,可以省略了需复杂的学习过程而立即使用PySpark。
  • 可以为所有内容使用一个代码库:无论是小数据和大数据,还是单机和分布式机器。
  • 可以在Spark分布式框架上,更快地运行 Pandas 代码。​
最后一点尤其值得注意。

一方面,可以将分布式计算应用于在 Pandas 中的代码。且借助 Spark 引擎,代码即使在单台机器上也会更快!下图展示了在一台机器(具有 96 个 vCPU 和 384 GiBs 内存)上运行 Spark 和单独调用 pandas 分析 130GB 的 CSV 数据集的性能对比。

多线程和 Spark SQL Catalyst Optimizer 都有助于优化性能。例如,Join count 操作在整个阶段代码生成时快 4 倍:没有代码生成时为 5.9 秒,代码生成时为 1.6 秒。

Spark 在链式操作(chaining operations)中具有特别显着的优势。Catalyst 查询优化器可以识别过滤器以明智地过滤数据并可以应用基于磁盘的连接(disk-based joins),而 Pandas 倾向于每一步将所有数据加载到内存中。

现在是不是迫不及待的想尝试如何在 Spark 上使用 Pandas API 编写一些代码?我们现在就开始吧!​

在 Pandas ​/ Pandas-on-Spark / Spark 之间切换

需要知道的第一件事是我们到底在使用什么。在使用 Pandas 时,使用类pandas.core.frame.DataFrame​。在 Spark 中使用 pandas API 时,使用pyspark.pandas.frame.DataFrame。虽然​两者相似,但不相同。主要区别在于前者在单机中,而后者是分布式的。

​可以使用 Pandas-on-Spark 创建一个 Dataframe 并将其转换为 Pandas,反之亦然:

# import Pandas-on-Spark
import pyspark.pandas as ps

# 使用 Pandas-on-Spark 创建一个 DataFrame
ps_df = ps.DataFrame(range(10))

# 将 Pandas-on-Spark Dataframe 转换为 Pandas Dataframe
pd_df = ps_df.to_pandas()

# 将 Pandas Dataframe 转换为 Pandas-on-Spark Dataframe
ps_df = ps.from_pandas(pd_df)
注意,如果使用多台机器,则在将 Pandas-on-Spark Dataframe 转换为 Pandas Dataframe 时,数据会从多台机器传输到一台机器,反之亦然(可参阅PySpark 指南[1])。

还可以将 Pandas-on-Spark Dataframe 转换为 Spark DataFrame,反之亦然:

# 使用 Pandas-on-Spark 创建一个 DataFrame
ps_df = ps.DataFrame(range(10))

# 将 Pandas-on-Spark Dataframe 转换为 Spark Dataframe
spark_df = ps_df.to_spark()

# 将 Spark Dataframe 转换为 Pandas-on-Spark Dataframe
ps_df_new = spark_df.to_pandas_on_spark()

数据类型如何改变​?

在使用 Pandas-on-Spark 和 Pandas 时,数据类型基本相同。将 Pandas-on-Spark DataFrame 转换为 Spark DataFrame 时,数据类型会自动转换为适当的类型(请参阅PySpark 指南[2])

下面的示例显示了在转换时是​如何将数据类型从 PySpark DataFrame 转换为 pandas-on-Spark DataFrame。

>>> sdf = spark.createDataFrame([
... (1, Decimal(1.0), 1., 1., 1, 1, 1, datetime(2020, 10, 27), "1", True, datetime(2020, 10, 27)),
... ], 'tinyint tinyint, decimal decimal, float float, double double, integer integer, long long, short short, timestamp timestamp, string string, boolean boolean, date date')
>>> sdf
DataFrame[tinyint: tinyint, decimal: decimal(10,0),
float: float, double: double, integer: int,
long: bigint, short: smallint, timestamp: timestamp,
string: string, boolean: boolean, date: date]
psdf = sdf.pandas_api()
psdf.dtypes
tinyint int8
decimal object
float float32
double float64
integer int32
long int64
short int16
timestamp datetime64[ns]
string object
boolean bool
date object
dtype: object

Pandas-on-Spark vs Spark 函数

在 Spark 中的 DataFrame 及其在 Pandas-on-Spark 中的最常用函数。注意,Pandas-on-Spark 和 Pandas 在语法上的唯一区别就是 import pyspark.pandas as ps 一行。

当你看完如下内容后,你会发现,即使您不熟悉 Spark,也可以通过 Pandas API 轻松使用。

导入库

# 运行Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Spark") \
.getOrCreate()
# 在Spark上运行Pandas
import pyspark.pandas as ps

读取数据

以 old dog iris 数据集为例。

# SPARK
sdf = spark.read.options(inferSchema='True',
header='True').csv('iris.csv')
# PANDAS-ON-SPARK
pdf = ps.read_csv('iris.csv')

选择

# SPARK
sdf.select("sepal_length","sepal_width").show()
# PANDAS-ON-SPARK
pdf[["sepal_length","sepal_width"]].head()

删除列

# SPARK
sdf.drop('sepal_length').show()# PANDAS-ON-SPARK
pdf.drop('sepal_length').head()

删除重复项

# SPARK
sdf.dropDuplicates(["sepal_length","sepal_width"]).show()
# PANDAS-ON-SPARK
pdf[["sepal_length", "sepal_width"]].drop_duplicates()

筛选

# SPARK
sdf.filter( (sdf.flower_type == "Iris-setosa") & (sdf.petal_length > 1.5) ).show()
# PANDAS-ON-SPARK
pdf.loc[ (pdf.flower_type == "Iris-setosa") & (pdf.petal_length > 1.5) ].head()

计数

# SPARK
sdf.filter(sdf.flower_type == "Iris-virginica").count()
# PANDAS-ON-SPARK
pdf.loc[pdf.flower_type == "Iris-virginica"].count()

唯一值

# SPARK
sdf.select("flower_type").distinct().show()
# PANDAS-ON-SPARK
pdf["flower_type"].unique()

排序

# SPARK
sdf.sort("sepal_length", "sepal_width").show()
# PANDAS-ON-SPARK
pdf.sort_values(["sepal_length", "sepal_width"]).head()

分组

# SPARK
sdf.groupBy("flower_type").count().show()
# PANDAS-ON-SPARK
pdf.groupby("flower_type").count()

替换

# SPARK
sdf.replace("Iris-setosa", "setosa").show()
# PANDAS-ON-SPARK
pdf.replace("Iris-setosa", "setosa").head()

连接

#SPARK
sdf.union(sdf)
# PANDAS-ON-SPARK
pdf.append(pdf)

transform 和 apply 函数应用

有许多 API 允许用户针对 pandas-on-Spark DataFrame 应用函数,例如:

DataFrame.transform()
DataFrame.apply()
DataFrame.pandas_on_spark.transform_batch()
DataFrame.pandas_on_spark.apply_batch()
Series.pandas_on_spark.transform_batch()
每个 API 都有不同的用途,并且在内部工作方式不同。

transform 和 apply

DataFrame.transform()和DataFrame.apply()之间的主要区别在于,前者需要返回相同长度的输入,而后者不需要。

# transform
psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pser):
return pser + 1 # 应该总是返回与输入相同的长度。

psdf.transform(pandas_plus)

# apply
psdf = ps.DataFrame({'a': [1,2,3], 'b':[5,6,7]})
def pandas_plus(pser):
return pser[pser % 2 == 1] # 允许任意长度

psdf.apply(pandas_plus)
在这种情况下,每个函数采用一个 pandas Series,Spark 上的 pandas API 以分布式方式计算函数,如下所示。

在“列”轴的情况下,该函数将每一行作为一个熊猫系列。

psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pser):
return sum(pser) # 允许任意长度
psdf.apply(pandas_plus, axis='columns')
上面的示例将每一行的总和计算为pands Series

pandas_on_spark.transform_batch和pandas_on_spark.apply_batch

batch 后缀表示 pandas-on-Spark DataFrame 或 Series 中的每个块。API 对 pandas-on-Spark DataFrame 或 Series 进行切片,然后以 pandas DataFrame 或 Series 作为输入和输出应用给定函数。请参阅以下示例:

psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pdf):
return pdf + 1 # 应该总是返回与输入相同的长度。

psdf.pandas_on_spark.transform_batch(pandas_plus)

psdf = ps.DataFrame({'a': [1,2,3], 'b':[4,5,6]})
def pandas_plus(pdf):
return pdf[pdf.a > 1] # 允许任意长度

psdf.pandas_on_spark.apply_batch(pandas_plus)
两个示例中的函数都将 pandas DataFrame 作为 pandas-on-Spark DataFrame 的一个块,并输出一个 pandas DataFrame。Spark 上的 Pandas API 将 pandas 数据帧组合为 pandas-on-Spark 数据帧。

在 Spark 上使用 pandas API的注意事项

避免shuffle

某些操作,例如sort_values在并行或分布式环境中比在单台机器上的内存中更难完成,因为它需要将数据发送到其他节点,并通过网络在多个节点之间交换数据。

避免在单个分区上计算

另一种常见情况是在单个分区上进行计算。目前, DataFrame.rank 等一些 API 使用 PySpark 的 Window 而不指定分区规范。这会将所有数据移动到单个机器中的单个分区中,并可能导致严重的性能下降。对于非常大的数据集,应避免使用此类 API。

不要使用重复的列名

不允许使用重复的列名,因为 Spark SQL 通常不允许这样做。Spark 上的 Pandas API 继承了这种行为。例如,见下文:

import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]})
psdf.columns = ["a", "a"]
Reference 'a' is ambiguous, could be: a, a.;
此外,强烈建议不要使用区分大小写的列名。Spark 上的 Pandas API 默认不允许它。

import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
Reference 'a' is ambiguous, could be: a, a.;
但可以在 Spark 配置spark.sql.caseSensitive中打开以启用它,但需要自己承担风险。

from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("pandas-on-spark")
builder = builder.config("spark.sql.caseSensitive", "true")
builder.getOrCreate()

import pyspark.pandas as ps
psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
psdf
a A
0 1 3
1 2 4

使用默认索引

pandas-on-Spark 用户面临的一个常见问题是默认索引导致性能下降。当索引未知时,Spark 上的 Pandas API 会附加一个默认索引,例如 Spark DataFrame 直接转换为 pandas-on-Spark DataFrame。

如果计划在生产中处理大数据,请通过将默认索引配置为distributed或distributed-sequence来使其确保为分布式。

有关配置默认索引的更多详细信息,请参阅默认索引类型[3]。

在 Spark 上使用 pandas API

尽管 Spark 上的 pandas API 具有大部分与 pandas 等效的 API,但仍有一些 API 尚未实现或明确不受支持。因此尽可能直接在 Spark 上使用 pandas API。

例如,Spark 上的 pandas API 没有实现__iter__(),阻止用户将所有数据从整个集群收集到客户端(驱动程序)端。不幸的是,许多外部 API,例如 min、max、sum 等 Python 的内置函数,都要求给定参数是可迭代的。对于 pandas,它开箱即用,如下所示:

>>> import pandas as pd
>>> max(pd.Series([1, 2, 3]))
3
>>> min(pd.Series([1, 2, 3]))
1
>>> sum(pd.Series([1, 2, 3]))
6
Pandas 数据集存在于单台机器中,自然可以在同一台机器内进行本地迭代。但是,pandas-on-Spark 数据集存在于多台机器上,并且它们是以分布式方式计算的。很难在本地迭代,很可能用户在不知情的情况下将整个数据收集到客户端。因此,最好坚持使用 pandas-on-Spark API。上面的例子可以转换如下:

>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]).max()
3
>>> ps.Series([1, 2, 3]).min()
1
>>> ps.Series([1, 2, 3]).sum()
6
pandas 用户的另一个常见模式可能是依赖列表推导式或生成器表达式。但是,它还假设数据集在引擎盖下是本地可迭代的。因此,它可以在 pandas 中无缝运行,如下所示:

import pandas as pd
data = []
countries = ['London', 'New York', 'Helsinki']
pser = pd.Series([20., 21., 12.], index=countries)
for temperature in pser:
assert temperature > 0
if temperature > 1000:
temperature = None
data.append(temperature ** 2)

pd.Series(data, index=countries)
London 400.0
New York 441.0
Helsinki 144.0
dtype: float64
但是,对于 Spark 上的 pandas API,它的工作原理与上述相同。上面的示例也可以更改为直接使用 pandas-on-Spark API,如下所示:

import pyspark.pandas as ps
import numpy as np
countries = ['London', 'New York', 'Helsinki']
psser = ps.Series([20., 21., 12.], index=countries)
def square(temperature) -> np.float64:
assert temperature > 0
if temperature > 1000:
temperature = None
return temperature ** 2

psser.apply(square)
London 400.0
New York 441.0
Helsinki 144.0

减少对不同 DataFrame 的操作

Spark 上的 Pandas API 默认不允许对不同 DataFrame(或 Series)进行操作,以防止昂贵的操作。只要有可能,就应该避免这种操作。

写在最后

到目前为止,我们将能够在 Spark 上使用 Pandas。这将会导致Pandas 速度的大大提高,迁移到 Spark 时学习曲线的减少,以及单机计算和分布式计算在同一代码库中的合并。

参考资料

[1]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/pandas_pyspark.html

[2]PySpark 指南: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html

[3]默认索引类型: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type

有关Pandas 与 PySpark 强强联手,功能与速度齐飞!的更多相关文章

  1. ruby-on-rails - Cucumber 是否只是 rspec 的包装器以帮助将测试组织成功能? - 2

    只是想确保我理解了事情。据我目前收集到的信息,Cucumber只是一个“包装器”,或者是一种通过将事物分类为功能和步骤来组织测试的好方法,其中实际的单元测试处于步骤阶段。它允许您根据事物的工作方式组织您的测试。对吗? 最佳答案 有点。它是一种组织测试的方式,但不仅如此。它的行为就像最初的Rails集成测试一样,但更易于使用。这里最大的好处是您的session在整个Scenario中保持透明。关于Cucumber的另一件事是您(应该)从使用您的代码的浏览器或客户端的角度进行测试。如果您愿意,您可以使用步骤来构建对象和设置状态,但通常您

  2. ruby-on-rails - rails 功能测试 - 2

    在Rails自动生成的功能测试(test/functional/products_controller_test.rb)中,我看到以下代码:classProductsControllerTest我的问题是:方法调用products()在哪里/如何定义?products(:one)到底是什么意思?看代码,大概意思是“创建一个产品”,但是它是如何工作的呢?注意我是Ruby/Rails的新手,如果这些是微不足道的问题,我深表歉意。 最佳答案 如果您查看test/fixtures文件夹,您会看到一个products.yml文件。这是在您创建

  3. ruby-on-rails - 功能测试 Authlogic? - 2

    在我的一些Controller中,我有一个before_filter检查用户是否登录?用于CRUD操作。application.rbdeflogged_in?unlesscurrent_userredirect_toroot_pathendendprivatedefcurrent_user_sessionreturn@current_user_sessionifdefined?(@current_user_session)@current_user_session=UserSession.findenddefcurrent_userreturn@current_userifdefine

  4. ruby - Ruby 中允许 "p *1..10"打印出数字 1-10 的功能是什么? - 2

    require'pp'p*1..10这会打印出1-10。为什么这么简洁?您还可以用它做什么? 最佳答案 它是“splat”运算符。它可用于分解数组和范围并在赋值期间收集值。这里收集赋值中的值:a,*b=1,2,3,4=>a=1b=[2,3,4]在此示例中,内部数组([3,4])中的值被分解并收集到包含数组中:a=[1,2,*[3,4]]=>a=[1,2,3,4]您可以定义将参数收集到数组中的函数:deffoo(*args)pargsendfoo(1,2,"three",4)=>[1,2,"three",4]

  5. java - Ruby 和 Java 的速度 - 2

    按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭10年前。在我在网上找到的每个基准测试中,Ruby似乎都很慢,比Java慢得多。Ruby的人只是说这无关紧要。您能举个例子说明RubyonRails(以及Ruby本身)的速度真的无关紧要吗?

  6. ruby - 现代计算机的功能是否不足以处理字符串而无需使用符号(在 Ruby 中) - 2

    我读过的关于Ruby符号的每一篇文章都在谈论符号相对于字符串的效率。但是,这不是1970年代。我的电脑可以处理一些额外的垃圾收集。我错了吗?我拥有最新最好的奔腾双核处理器和4GBRAM。我认为这应该足以处理一些字符串。 最佳答案 您的计算机可能能够处理“一点点额外的垃圾收集”,但是当“一点点”发生在运行数百万次的内部循环中时呢?如果它在内存有限的嵌入式系统上运行呢?有很多地方你可以随意使用字符串,但在某些地方你不能。这完全取决于上下文。 关于ruby-现代计算机的功能是否不足以处理字符串

  7. ruby - 如何在 Cucumber 的功能名称中使用空格 - 2

    我正在使用Windows并尝试运行一个现有的功能包,该功能包最初是在MacOS上构建的,这允许他们通过使用带空格的"\"来解决问题。我正在使用Ruby2.2.3和Cucumber。功能名称包含空格,我无法更改它。我尝试使用""和''来绕过空白,但每次都有同样的问题。这是问题的一个例子。如果我运行:cucumberfeatures/'Namecontainingwhitespaces.feature'它工作正常。但是当我运行时:cucumber-pmy_profile和cucumber.yml包含:my_profile:features/'Namecontainingwhitespace

  8. ruby - 如何测试在运行之间提供功能的 Ruby 脚本? - 2

    单元测试的好方法是测试脚本在执行之间保持正确数据的能力——在使用Ctrl-C终止脚本然后重新运行之后?是否有针对执行类似操作的现有模块或脚本的任何测试可以针对最佳实践进行审查? 最佳答案 像http://avdi.org/devblog/2010/07/19/greenletters-painless-automation-and-testing-for-command-line-applications/一样使用库或者期望、运行、终止并重新运行您的程序,并检查它是否运行正确。好的做法是将程序设计为独立的模块,每个模块都经过良好测试

  9. vue 实现内容超出两行显示展开更多功能,可依据需求自定义任意行数! - 2

    平时开发中我们经常会遇到这样的需求,在一个不限高度的盒子中会有很多内容,如果全部显示用户体验会非常不好,所以可以先折叠起来,当内容达到一定高度时,显示展开更多按钮,点击即可显示全部内容,先来看看效果图: 这样做用户体验瞬间得到提升,接下来看看具体细节。0">主要操作在内容这里{{item.username}},……展开更多样式大家可依据自己项目需求进行设计,这里就不贴了,主要说几个关键的。1、在data中定义三个属性isShowMore:false, //控制展开更多的显示与隐藏textHeight:null, //框中内容的高度status:false, //内容状态是否打开2.计算内容是否

  10. ruby-on-rails - Ruby 的 range step 方法导致执行速度很慢? - 2

    我有这段代码:date_counter=Time.mktime(2011,01,01,00,00,00,"+05:00")@weeks=Array.new(date_counter..Time.now).step(1.week)do|week|logger.debug"WEEK:"+week.inspect@weeks从技术上讲,代码有效,输出:SatJan0100:00:00-05002011SatJan0800:00:00-05002011SatJan1500:00:00-05002011etc.但是执行时间完全是垃圾!每周计算大约需要四秒钟。我在这段代码中是否遗漏了一些奇怪的低效

随机推荐