我有一个 flink 作业,它使用 TextOutputFormat 将数据写入目标。代码是这样的:
String basePath = "/Users/me/out";
// String basePath = "hdfs://10.199.200.204:9000/data";
// ensure we have a format for this.
TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection + "/" + uid));
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
format.configure(GlobalConfiguration.getConfiguration());
format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
// then serialize and write.
String record = serializationFunction.map(value);
log.info("Writing " + record);
format.writeRecord(record);
当使用普通文件系统上的路径作为目标时,这工作得很好。但是,当我将基本路径更改为 hdfs 位置时,它不再按预期工作。发生的情况是,输出文件实际上是在 HDFS 上创建的,但是它的大小为零字节。我在通话期间没有收到任何异常。
我正在使用 Hadoop 2.6.0 和 Flink 0.10.1。使用命令行工具 (hadoop fs -put ...) 将文件复制到 hdfs 是可行的,所以我想我可以排除一些 Hadoop 错误配置。我还启动了 Wireshark 并看到数据正在传输到 Hadoop 服务器,所以我是否需要在实际写入之前以某种方式提交它?
最佳答案
为了将结果刷新到 HDFS,您必须在完成记录写入后调用 TextOutputFormat 的 close 方法。
// do writing
while (some condition) {
format.writeRecord(record);
}
// finished writing
format.close();
关于hadoop - Flink 在 HDFS 上写入产生空文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34328908/
好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信
1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模
我想知道Ruby用来在命令行打印这些东西的输出流:irb(main):001:0>a="test"=>"test"irb(main):002:0>putsatest=>nilirb(main):003:0>a=>"test"$stdout是否用于irb(main):002:0>和irb(main):003:0>?而且,在这两次调用之间,$stdout的值是否有任何变化?另外,有人能告诉我打印/写入这些内容的Ruby源代码吗? 最佳答案 是的。而且很容易向自己测试/证明。在命令行试试这个:ruby-e'puts"foo"'>test.
我正在编写一个ruby程序,它应该执行另一个程序,通过stdin向它传递值,从它的stdout读取响应,然后打印响应。这是我目前所拥有的。#!/usr/bin/envrubyrequire'open3'stdin,stdout,stderr=Open3.popen3('./MyProgram')stdin.puts"helloworld!"output=stdout.readerrors=stderr.readstdin.closestdout.closestderr.closeputs"Output:"puts"-------"putsoutputputs"\nErrors:"p
我正在处理http://prepwork.appacademy.io/mini-curriculum/array/中概述的数组问题我正在尝试创建函数my_transpose,它接受一个矩阵并返回其转置。我对写入二维数组感到很困惑!这是一个代码片段,突出了我的困惑。rows=[[0,1,2],[3,4,5],[6,7,8]]columns=Array.new(3,Array.new(3))putscolumns.to_s#Outputisa3x3arrayfilledwithnilcolumns[0][0]=0putscolumns.to_s#Outputis[[0,nil,nil],[
我在一个ruby文件中有一个函数可以像这样写入一个文件File.open("myfile",'a'){|f|f.puts("#{sometext}")}这个函数在不同的线程中被调用,使得像上面这样的文件写入不是线程安全的。有谁知道如何以最简单的方式使这个文件写入线程安全?更多信息:如果重要的话,我正在使用rspec框架。 最佳答案 您可以通过File#flock给锁File.open("myfile",'a'){|f|f.flock(File::LOCK_EX)f.puts("#{sometext}")}
有人可以花我一些代码,在图像底部添加文本吗?我想使用Rmagick,但我也愿意使用其他工具。 最佳答案 我也发现了这个,它非常适合我。require'RMagick'includeMagick#Dimisionsbasedonanimage3072x2048unlessARGV[0]andFile.exists?(ARGV[0])puts"\n\n\nYouneedtospecifyafilename:watermark.rb\n\n\n"exitendimg=Image.read(ARGV[0]).firstnew_img="wm
我目前正在试验ActionController::Live,但我不知道如何正确地测试它。在我的Controller中,我写了这个response.stream.write("event:#{event}\n")response.stream.write("data:#{post.to_json}\n\n")但是当我在rspec测试中检查对象时,我看到了这个(rdb:1)response.stream.instance_variable_get(:@buf)["event:event\n"]当我将“数据”写入流时,我不明白为什么它没有出现在数组中。当我删除第一个response.stre
重新定义Float#/似乎没有效果:classFloatdef/(other)"magic!"endendputs10.0/2.0#=>5.0但是当另一个中缀运算符Float#*被重新定义时,Float#/突然采用了新的定义:classFloatdef/(other)"magic!"enddef*(other)"spooky"endendputs10.0/2.0#=>"magic!"我很想知道是否有人可以解释这种行为的来源,以及其他人是否得到相同的结果。ruby:ruby2.0.0p353(2013-11-22)[x64-mingw32]要快速确认错误,请运行thisscript.
我想使用rmagick将图像写入文件。下面给出的是我的代码im="base64encodedstring"image=Magick::Image.from_blob(Base64.decode64(im)image[0].format="jpeg"name="something_temp"path="/somepath/"+nameFile.open(path,"wb"){|f|f.write(image[0])}我也尝试过使用f.write(image).但是文件中写入的是#.这是什么原因? 最佳答案 这应该有效:image[0]