下面是我使用 Apache Spark 的用例
1) 我在 HDFS 上有大约 2500 个 Parquet 文件,文件大小因文件而异。
2) 我需要处理每个 parquet 文件并构建一个新的 DataFrame 并将一个新的 DataFrame 写入 orc 文件格式。
3)我的Spark驱动程序是这样的。 我正在迭代每个文件,处理单个 Parquet 文件,创建一个新的 DataFrame 并将一个新的 DataFrame 编写为 ORC,下面是代码片段。
val fs = FileSystem.get(new Configuration())
val parquetDFMap = fs.listStatus(new Path(inputFilePath)).map(folder => {
(folder.getPath.toString, sqlContext.read.parquet(folder.getPath.toString))})
parquetDFMap.foreach {
dfMap =>
val parquetFileName = dfMap._1
val parqFileDataFrame = dfMap._2
for (column <- parqFileDataFrame.columns)
{
val rows = parqFileDataFrame.select(column)
.mapPartitions(lines => lines.filter(filterRowsWithNullValues(_))
.map(row => buildRowRecords(row, masterStructArr.toArray, valuesArr)))
val newDataFrame: DataFrame = parqFileDataFrame.sqlContext.createDataFrame(rows, StructType(masterStructArr))
newDataFrame.write.mode(SaveMode.Append).format("orc").save(orcOutPutFilePath+tableName)
}
}
这种设计的问题我只能及时处理一个 parquet 文件,只有当我创建一个新的数据帧并且将新的数据帧写入 ORC 格式时才应用并行性。因此,如果创建新的 DataFrame 或将新的 DataFrame 写入 ORC 等任何任务需要很长时间才能完成,其他排队的 parquet 处理就会卡住,直到当前的 parquet 操作完成。
能否请您帮助我为这个用例提供更好的方法或设计。
最佳答案
你能为所有 parquet 文件创建一个数据框而不是为每个文件创建一个数据框吗
val df = sqlContext.read.parquet(inputFilePath)
df.map(row => convertToORc(row))
关于scala - Spark DataFrame 并行性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37838950/
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
我明白了:x,(y,z)=1,*[2,3]x#=>1y#=>2z#=>nil我想知道为什么z的值为nil。 最佳答案 x,(y,z)=1,*[2,3]右侧的splat*是内联扩展的,所以它等同于:x,(y,z)=1,2,3左边带括号的列表被视为嵌套赋值,所以它等价于:x=1y,z=23被丢弃,而z被分配给nil。 关于ruby-带括号和splat运算符的并行赋值,我们在StackOverflow上找到一个类似的问题: https://stackoverflow
假设您在Ruby中执行此操作:ar=[1,2]x,y=ar然后,x==1和y==2。是否有一种方法可以在我自己的类中定义,从而产生相同的效果?例如rb=AllYourCode.newx,y=rb到目前为止,对于这样的赋值,我所能做的就是使x==rb和y=nil。Python有这样一个特性:>>>classFoo:...def__iter__(self):...returniter([1,2])...>>>x,y=Foo()>>>x1>>>y2 最佳答案 是的。定义#to_ary。这将使您的对象被视为要分配的数组。irb>o=Obje
我想测试一个并行赋值的返回值,我写了puts(x,y=1,2),但是不行,打印错误信息:SyntaxError:(irb):74:syntaxerror,unexpected',',expecting')'puts(x,y=1,2)^(irb):74:syntaxerror,unexpected')',expectingend-of-input有什么问题吗? 最佳答案 你有两个问题。puts和(之间的空格防止括号列表被解释为参数列表。一旦你在方法名后放置一个空格,任何argumentlisthastobeoutsidethepare
我在一个ruby脚本中有4个测试,我使用命令运行它们rubytest.rb输出看起来像LoadedsuitetestStarted....Finishedin50.326546seconds.4tests,5assertions,0failures,0errors,0pendings,0omissions,0notifications100%passed我想要实现的是,并行运行所有4个测试,而不是按顺序运行。大约4个线程,每个线程运行一个测试,有效地将执行时间减少到4个测试中最慢的一个+并行执行的时间很短。我遇到了this,但这似乎并行运行多个ruby测试文件-假设我有test
我有以下代码:FTP...do|ftp|files.eachdo|file|...ftp.put(file)sleep1endend我想在单独的线程或某种并行方式中运行每个文件。执行此操作的正确方法是什么?这是对的吗?这是我对parallelgem的尝试FTP...do|ftp|Parallel.map(files)do|file|...ftp.put(file)sleep1endend并行的问题是puts/outputs可以像这样同时发生:as=[1,2,3,4,5,6,7,8]results=Parallel.map(as)do|a|putsaend我怎样才能强制执行看跌期权,就像
在我的应用程序中,我有几个生成器类,它们负责获取从外部API请求接收的数据,并将资源构建/保存到数据库中。我正在处理大量数据,并已实现并行gem以通过使用多个进程来加快处理速度。但是,我发现对使用Parallel的方法的任何测试都会失败并出现相同的错误:ActiveRecord::StatementInvalid:PG::ConnectionBad:PQconsumeInput()serverclosedtheconnectionunexpectedlyThisprobablymeanstheserverterminatedabnormallybeforeorwhileprocessi
我有一个ruby脚本读取一个巨大的表(约2000万行),进行一些处理并将其提供给Solr用于索引目的。这一直是我们流程中的一大瓶颈。我打算在这里加快速度,我想实现某种并行性。我对Ruby的多线程特性感到困惑。我们的服务器有ruby1.8.7(2009-06-12补丁级别174)[x86_64-linux]。来自thisblogpost和thisquestionatStackOverflow可见Ruby没有“真正的”多线程方法。我们的服务器有多个核心,所以使用parallelgem对我来说似乎是另一种方法。我应该采用什么方法?此外,我们将非常感谢您对并行数据库读取馈送系统的任何投入。
设置一个临时变量来交换数组中的两个元素似乎比使用并行赋值更有效。谁能帮忙解释下?require"benchmark"Benchmark.bmdo|b|b.reportdo40000000.times{array[1],array[2]=array[2],array[1]}endendBenchmark.bmdo|b|b.reportdo40000000.timesdot=array[1]array[1]=array[2]array[2]=tendendend结果:usersystemtotalreal4.4700000.0200004.490000(4.510368)usersyste
在Ruby1.9.3中,you'reallowedtorunmultipletestcasesatonce.我不确定这是语言的特性、minitest库还是YARV的特性,所以对于任何不好的术语表示歉意。但是他们是否为此取消了GVL,或者这是否仅仅意味着如果一个线程正在执行IO,另一个线程可以利用CPU? 最佳答案 该实现不使用线程,而是使用通过管道进行通信的独立进程。参见例如thispresentation.所以GVL/GIL没有发挥作用。 关于ruby-Ruby1.9.3中的并行测试有