我开始使用 Pyspark 进行一些数据处理。我可以做一些像
这样的事情对我来说很有趣rdd.map(lambda x : (x['somekey'], 1)).reduceByKey(lambda x,y: x+y).count()
它会将这些函数中的逻辑发送到可能多台机器上以并行执行。
现在,如果我有 Java 背景,如果我想将包含某些方法的对象发送到另一台机器,那台机器需要知道通过网络流式传输的对象的类定义。最近 java 有了函数式接口(interface)的想法,它将在编译时为我创建该接口(interface)的实现(即 MyInterface impl = ()->System.out.println("Stuff");)
MyInterface 只有一种方法,'doStuff()'
但是,如果我想通过线路发送这样的函数,目标机器需要知道实现(impl 本身)才能调用它的“doStuff()”方法。
我的问题归结为...用 Scala 编写的 Spark 如何实际将功能发送到其他机器?我有几个预感:
谢谢!
编辑:Spark 是用 Scala 编写的,但我很想知道如何在 Java 中实现这一点(其中一个函数只有在一个类中才能存在,因此改变了需要在工作节点上更新的类定义)。
编辑 2: 这是java中遇到混淆的问题:
public class Playground
{
private static interface DoesThings
{
public void doThing();
}
public void func() throws Exception {
Socket s = new Socket("addr", 1234);
ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
oos.writeObject("Hello!"); // Works just fine, you're just sending a string
oos.writeObject((DoesThings)()->System.out.println("Hey, im doing a thing!!")); // Sends the object, but error on other machine
DoesThings dt = (DoesThings)()->System.out.println("Hey, im doing a thing!!");
System.out.println(dt.getClass());
}
}
System.out,println(dt.getClass()) 返回: “类 JohnLibs.Playground$$Lambda$1/23237446”
现在,假设接口(interface)定义不在同一个文件中,而是在两台机器都有的共享文件中。但是这个驱动程序 func() 实质上创建了一种实现 DoesThings 的新型类。 如您所见,目标机器不知道 JohnLibs.Playground$$Lambda$1/23237446 是什么,即使它知道 DoesThings 是什么。这一切都归结为您不能在不绑定(bind)到类的情况下传递函数。在 python 中,您可以只发送一个带有定义的字符串,然后执行该字符串(因为它已被解释)。也许这就是 spark 所做的,因为它使用 scala 而不是 java(如果 scala 可以在类之外具有函数)
最佳答案
Java 字节码,当然是 Java 和 Scala 的编译目标,是专门为平台独立而创建的。因此,如果您有一个类文件,您可以将它移动到任何其他机器,而不管“硅”架构如何,并且只要它具有至少该版本的 JVM,它就会运行。 James Gosling 和他的团队故意这样做是为了让代码从一开始就在机器之间移动,并且在 Java 0.98(我使用的第一个版本)中很容易演示。
当 JVM 试图加载一个类时,它会使用 ClassLoader 的一个实例。类加载器包含两件事,获取字节码文件的二进制文件的能力,以及加载代码的能力(验证其完整性,将其转换为内存中的 java.lang.Class 实例,并使其可用于其他代码在系统中)。在 Java 1 中,如果您想控制 bye 的加载方式,您大多必须编写自己的类加载器,尽管有一个特定于 sun 的 AppletClassLoader,它是为从 http 而不是从文件系统加载类文件而编写的。
稍后,在 Java 1.2 中,“如何获取类文件的字节”部分在 URLClassloader 中被分离出来。这可以使用任何支持的协议(protocol)来加载类。实际上,协议(protocol)支持机制过去和现在都可以通过可插入的协议(protocol)处理程序进行扩展。因此,现在您可以从任何地方加载类,而不会在较难的部分出错,这就是您验证和安装类的方式。
除此之外,Java 的 RMI 机制允许将序列化对象(类名,以及对象的“状态”部分)包装在 MarshaledObject 中。这添加了“可以从哪里加载此类”,表示为 URL。 RMI 自动将内存中的真实对象转换为 MarshaledObjects,并在网络上传送它们。如果 JVM 接收到一个它已经具有类定义的编码对象,它总是使用该类定义(为了安全)。但是,如果没有,那么只要满足一系列标准(安全性,以及简单的正常工作标准),就可以从该远程服务器加载类文件,从而允许 JVM 加载它从未见过定义的类。 (显然,此类系统的代码通常必须针对无处不在的接口(interface)编写——否则,将会有大量的反射发生!)
现在,我不知道(事实上,我发现你的问题试图确定 Spark 是否使用 RMI 基础设施(我知道 hadoop 没有,因为,似乎是因为作者想创建他们自己的系统- - 这当然是有趣和有教育意义的 - 而不是使用灵活的、可配置的、经过广泛测试的,包括经过安全测试的!- 系统。)
但是,要使这项工作大体上必须发生的就是我为 RMI 概述的步骤,这些要求基本上是:
1) 对象可以序列化为所有参与者都能理解的某种字节序列格式
2) 当通过线路发送对象时,接收端必须有某种方法来获取定义它们的类文件。这可以是 a) 预安装,b) RMI 的“在这里找到这个”的方法或 c) 发送系统发送 jar。这些都可以工作
3) 应该维护安全。在 RMI 中,这个要求相当“在你面前”,但我在 Spark 中看不到它,所以他们要么隐藏了配置,要么只是修复了它可以做什么。
无论如何,这不是一个真正的答案,因为我用一个具体的例子描述了原则,而不是对你的问题的实际具体答案。我还是想找到那个!
关于java - Apache Spark 如何将函数发送到引擎盖下的其他机器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42096854/
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚
Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack
在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/
我有一个对象has_many应呈现为xml的子对象。这不是问题。我的问题是我创建了一个Hash包含此数据,就像解析器需要它一样。但是rails自动将整个文件包含在.........我需要摆脱type="array"和我该如何处理?我没有在文档中找到任何内容。 最佳答案 我遇到了同样的问题;这是我的XML:我在用这个:entries.to_xml将散列数据转换为XML,但这会将条目的数据包装到中所以我修改了:entries.to_xml(root:"Contacts")但这仍然将转换后的XML包装在“联系人”中,将我的XML代码修改为