根据业务需求,需要对pyspark内存资源进行限制
本文使用的环境为pyspark 3.1.2,standalone模式
不足之处还请指出
首先我们需要知道对pyspark进行内存限制,是限制哪部分的内存。
先看一下执行pyspark任务需要启动哪些进程
pyspark与原版基于scala的spark启动的进程大体相似但略有不同。
当启动一个pyspark任务时,可以看到产生了2个系列的进程,分别是负责driver和executor
driver:

| 编号 | 说明 | 内存 |
|---|---|---|
| d1 | spark的driver端,spark-submit进程,运行在jvm,启动sparkContext,构建dag等 | spark算子在driver端用到的内存,包括collect等 |
| d2 | spark的driver端启动的pyspark的driver端,执行python部分代码,通过py4j与d1通信 | python代码中所用到的内存,包括创建一些变量等 |
executor:

| 编号 | 说明 | 内存 |
|---|---|---|
| e1 | spark的worker节点 | 不关注 |
| e2 | 设备上其他spark任务的executor backend,与本文无关 | 不关注 |
| e3 | 本任务对应的spark的executor backend,运行jvm中 | spark在executor端使用的内存,包括collect等算子、shuffle等 |
| e4 | 本任务对应的pyspark的executor backend,管理具体执行task的worker | 占用少量内存 |
| e5 | 具体执行pyspark中的python task的任务的worker,由e4 fork得到,执行算子中的自定义Python函数等 | pyspark在executor端使用的内存,通过python执行,包括map中的func等 |
可以看到,pyspark任务中,主要需要对4处进行内存限制
后两个是pyspark比spark多出来的。
关于spark中的内存,可以关注官方配置文档
其中,重点关注3条配置信息
| Property Name | Default | Meaning | Since Version |
|---|---|---|---|
| spark.driver.memory | 1g | Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file. |
1.1.1 |
| spark.executor.memory | 1g | Amount of memory to use per executor process, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). |
0.7.0 |
| spark.executor.pyspark.memory | Not set | The amount of memory to be allocated to PySpark in each executor, in MiB unless otherwise specified. If set, PySpark memory for an executor will be limited to this amount. If not set, Spark will not limit Python's memory use and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory is added to executor resource requests. Note: This feature is dependent on Python's resource module; therefore, the behaviors and limitations are inherited. For instance, Windows does not support resource limiting and actual resource is not limited on MacOS. |
2.4.0 |
spark.driver.memory和spark.executor.memory这两个参数限制就是spark端driver和executor的内存,
对需要在jvm中执行的任务进行了很好的限制,
但如上文所述,还需要对pyspark端的内存进行限制。
spark.executor.pyspark.memory这个参数是对pyspark的executor内存进行了限制
根据pyspark中worker.py
# set up memory limits
memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
if memory_limit_mb > 0 and has_resource_module:
total_memory = resource.RLIMIT_AS
try:
(soft_limit, hard_limit) = resource.getrlimit(total_memory)
msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
print(msg, file=sys.stderr)
# convert to bytes
new_limit = memory_limit_mb * 1024 * 1024
if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit:
msg = "Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit)
print(msg, file=sys.stderr)
resource.setrlimit(total_memory, (new_limit, new_limit))
except (resource.error, OSError, ValueError) as e:
# not all systems support resource limits, so warn instead of failing
print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)
看到,其实这个参数主要是使用了Python的resource模块进行了内存限制
然而,这里面设置的resource.RLIMIT_AS是对虚拟内存进行限制
我们通常想限制的是驻留内存。
例如一个小测试
import resource
resource.setrlimit(resource.RLIMIT_AS, (1*1024**3, -1))
def fun():
tmp = []
for i in range(1024**3):
try:
tmp.append('a'*1024)
except MemoryError:
break
return tmp
x = fun(), fun(), fun(), fun()
通过resource.setrlimit限制了1g内存。resource.RLIMIT_AS为虚拟内存的flag,RLIMIT_RSS为驻留内存,但只在老linux内核中生效,现在无法对内核态操作
运行后资源如下

virt达到了限制的1g,但res只有900m。在其他情况下,通常virt远远大于res,这样virt达到了我们限制好的数值,但是res很小,内存远远没得到充分利用,造成资源浪费。
另注:
在standalone模式下,每个worker(e5)限制的virt内存是在application启动时计算好的。通过spark.executor.pyspark.memory 除以 --executor-cores 得到。
\(workerMemoryMb =memoryMb / execCores\)
减少每个stage的task个数并不能增加每个worker的virt内存限制大小
pyspark的driver负责执行python流程代码,内存包含Python中创建的各种变量等
spark官方似乎没有参数对这部分内存进行限制
可以自行使用resource模块,对virt内存进行限制
spark的driver和executor出现oom后,会产生java.lang.OutOfMemoryError: Java heap space报错信息
pyspark的driver和executor出现oom后,产生MemoryError,附有对应python代码
Control groups,是一种Linux内核特性,对进程进行分级分组管理,不同组用不同资源限制并监控。
可以对pyspark的驻留内存进行管理
以centos为例
yum install -y libcgroup libcgroup-tools
这里先设置了一个组,用作pyspark的总体控制
再在这个组中设置两个组,分别对driver端的进程和executor的进程进行了限制
/sys/fs/cgroup/memory这个路径是cgroup对memory进行控制的配置,在这里建立对应文件夹来建立具体分组
首先是整体分组
mkdir /sys/fs/cgroup/memory/pyspark
再driver和executor分别建组控制
mkdir /sys/fs/cgroup/memory/pyspark/driver
mkdir /sys/fs/cgroup/memory/pyspark/executor
建组后,cgroup会自动生成一些配置文件,如下图

关于每一项的说明可以参考大佬的文档
在这里主要关注memory.limit_in_bytes和cgroup.procs
memory.limit_in_bytes为当前限制的内存额度。超过额度的话会对相应进程进行kill
可以使用echo重定向对这个进行限制
echo 10g > /sys/fs/cgroup/memory/ai_pyspark/driver/memory.limit_in_bytes
echo 50g > /sys/fs/cgroup/memory/ai_pyspark/executor/memory.limit_in_bytes
则将这个分组的内存限制为10g和50g
cgroup.procs中包含这个分组中的pid
可将spark-submit和worker的pid追加进这个文件,cgroup便将这个进行拉进这个分组进行管理

echo 160224 >> /sys/fs/cgroup/memory/ai_pyspark/driver/cgroup.procs
echo 167910 >> /sys/fs/cgroup/memory/ai_pyspark/executor/cgroup.procs
cgroup会将进程中新产生的子进程自动加入cgroup.procs
例如当产生新的pyspark.daemon时,cgroup会将对应的pid添加到executor分组中
linux系统中每一个进程都有自己的分组,我们没配置分组的进程会在/sys/fs/cgroup/memory分组中,如果想将某个分组中的某个pid移除这个分组,只需将他移动到另一个分组,例如
echo 167910 >> /sys/fs/cgroup/memory/cgroup.procs
另注:
如果executor发生oom,当前spark executor backend进程挂掉,spark会启动一个新的executor backend,不要忘记将新的executor pid再加入cgroup.procs
cgroups(7) — Linux manual page
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于
作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h