草庐IT

python-2.7 - Dataproc Pyspark 作业仅在一个节点上运行

coder 2024-01-07 原文

我的问题是我的 pyspark 作业没有并行运行。

代码和数据格式:
我的 PySpark 看起来像这样(显然是经过简化的):

class TheThing:
    def __init__(self, dInputData, lDataInstance):
        # ...
    def does_the_thing(self):
        """About 0.01 seconds calculation time per row"""
        # ...
        return lProcessedData

#contains input data pre-processed from other RDDs
#done like this because one RDD cannot work with others inside its transformation
#is about 20-40MB in size
#everything in here loads and processes from BigQuery in about 7 minutes
dInputData = {'dPreloadedData': dPreloadedData}

#rddData contains about 3M rows
#is about 200MB large in csv format
#rddCalculated is about the same size as rddData
rddCalculated = (
    rddData
        .map(
            lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
        )
)

llCalculated = rddCalculated.collect()
#save as csv, export to storage

在 Dataproc 集群上运行:
集群是通过 Dataproc UI 创建的.
作业是这样执行的:
gcloud --project <project> dataproc jobs submit pyspark --cluster <cluster_name> <script.py>

我通过 UI 观察了工作状态,started like this .浏览它时,我注意到我的工作节点中只有一个(看似随机的)在做任何事情。所有其他人都完全闲置。

PySpark 的重点是并行运行这个东西,显然不是这样。我已经在各种集群配置中运行了这些数据,最后一个是巨大的,这是我注意到它是单节点使用的时候。因此,为什么我的工作需要很长时间才能完成,而且时间似乎与集群大小无关。

所有使用较小数据集的测试在我的本地机器和集群上都毫无问题地通过了。我真的只需要高档。

编辑
我变了
llCalculated = rddCalculated.collect()
#... save to csv and export

rddCalculated.saveAsTextFile("gs://storage-bucket/results")

并且只有一个节点仍在执行工作。

最佳答案

根据您是从 GCS 还是 HDFS 加载 rddData,默认分割大小可能是 64MB 或 128MB,这意味着您的 200MB 数据集只有 2-4 个分区。 Spark 之所以这样做,是因为典型的基本数据并行任务处理数据的速度足够快,64MB-128MB 意味着可能需要数十秒的处理时间,因此拆分成较小的并行 block 没有任何好处,因为启动开销将占主导地位。

在您的情况下,听起来每 MB 的处理时间要长得多,因为您加入了另一个数据集,并且可能对每条记录执行了相当重量级的计算。所以你需要更多的分区,否则无论你有多少节点,Spark 都不知道要拆分成超过 2-4 个工作单元(如果每台机器都可能打包到一台机器上)有多个核心)。

所以你只需要调用repartition:

rddCalculated = (
    rddData
        .repartition(200)
        .map(
            lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
        )
)

或者将重新分区添加到较早的行:

rddData = rddData.repartition(200)

或者如果你在读取时重新分区,你可能会有更好的效率:

rddData = sc.textFile("gs://storage-bucket/your-input-data", minPartitions=200)

关于python-2.7 - Dataproc Pyspark 作业仅在一个节点上运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37589472/

有关python-2.7 - Dataproc Pyspark 作业仅在一个节点上运行的更多相关文章

  1. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  2. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  3. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  4. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  5. ruby-on-rails - 渲染另一个 Controller 的 View - 2

    我想要做的是有2个不同的Controller,client和test_client。客户端Controller已经构建,我想创建一个test_clientController,我可以使用它来玩弄客户端的UI并根据需要进行调整。我主要是想绕过我在客户端中内置的验证及其对加载数据的管理Controller的依赖。所以我希望test_clientController加载示例数据集,然后呈现客户端Controller的索引View,以便我可以调整客户端UI。就是这样。我在test_clients索引方法中试过这个:classTestClientdefindexrender:template=>

  6. ruby - 如何每月在 Heroku 运行一次 Scheduler 插件? - 2

    在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/

  7. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  8. ruby - 无法运行 Rails 2.x 应用程序 - 2

    我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby​​:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r

  9. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

    如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

  10. ruby - Sinatra:运行 rspec 测试时记录噪音 - 2

    Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/

随机推荐