我创建了一个自定义的 xmloutputformat 类,将 reducer 的输出转换为 xml 格式。
这里的问题是代码执行成功但最终输出是普通格式而不是 XML 格式。
谁能帮帮我……?
package dd;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MaxTemperature extends Configured implements Tool {
public static class MapMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus
// signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
public static class Mapreducers extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
public int run(String[] args) throws Exception {
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("MaxTemperature");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
if (fs.exists(new Path(args[1]))) {
fs.delete(new Path(args[1]), true);
}
FileInputFormat.addInputPath(job, new Path(args[0]));
XmlOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MapMapper.class);
job.setCombinerClass(Mapreducers.class);
job.setReducerClass(Mapreducers.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int xx = 1;
xx = ToolRunner.run(new MaxTemperature(), args);
System.exit(xx);
}
}
自定义xml格式代码如下所示:-
package dd;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/** An {@link OutputFormat} that writes plain text files. */
public class XmlOutputFormat<K, V> extends FileOutputFormat {
protected static class XmlRecordWriter<K, V> extends RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
protected DataOutputStream out;
public XmlRecordWriter(DataOutputStream out) throws IOException {
this.out = out;
out.writeBytes("<results>\n");
}
/**
* Write the object to the byte stream, handling Text as a special case.
*
* @param o
* the object to print
* @throws IOException
* if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}
private void writeKey(Object o, boolean closing) throws IOException {
out.writeBytes("<");
if (closing) {
out.writeBytes("/");
}
writeObject(o);
out.writeBytes(">");
if (closing) {
out.writeBytes("\n");
}
}
public synchronized void write(K key, V value) throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
Object keyObj = key;
if (!nullKey) {
keyObj = "value";
}
writeKey(keyObj, false);
if (!nullValue) {
writeObject(value);
}
writeKey(keyObj, true);
}
public synchronized void close(TaskAttemptContext context)
throws IOException {
out.close();
}
}
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
Path file = FileOutputFormat.getOutputPath(job);
Configuration conf = new Configuration();
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileout = fs.create(file);
return new XmlRecordWriter<K, V>(fileout);
}
}
--
提前致谢。
最佳答案
希望对你有帮助。
您可以仔细检查并相应地修改您的代码。
更新
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
context.write(new Text("<MapReduce>"), null);
}
public static class Mapreducers extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
Text out = new Text(constructPropertyXml(key, maxValue));
context.write(out, null);
}
}
public static String constructPropertyXml(Text key, Text maxvalue) {
StringBuilder sb = new StringBuilder();
sb.append("<result><key>").append(key)
.append("</key><value>").append(maxvalue)
.append("</value></result>");
return sb.toString();
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
context.write(new Text("</MapReduce>"), null);
}
关于java - 如何在 hadoop 中将 reducer 输出作为 xml 格式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24135656/
出于纯粹的兴趣,我很好奇如何按顺序创建PI,而不是在过程结果之后生成数字,而是让数字在过程本身生成时显示。如果是这种情况,那么数字可以自行产生,我可以对以前看到的数字实现垃圾收集,从而创建一个无限系列。结果只是在Pi系列之后每秒生成一个数字。这是我通过互联网筛选的结果:这是流行的计算机友好算法,类机器算法:defarccot(x,unity)xpow=unity/xn=1sign=1sum=0loopdoterm=xpow/nbreakifterm==0sum+=sign*(xpow/n)xpow/=x*xn+=2sign=-signendsumenddefcalc_pi(digits
我的目标是转换表单输入,例如“100兆字节”或“1GB”,并将其转换为我可以存储在数据库中的文件大小(以千字节为单位)。目前,我有这个:defquota_convert@regex=/([0-9]+)(.*)s/@sizes=%w{kilobytemegabytegigabyte}m=self.quota.match(@regex)if@sizes.include?m[2]eval("self.quota=#{m[1]}.#{m[2]}")endend这有效,但前提是输入是倍数(“gigabytes”,而不是“gigabyte”)并且由于使用了eval看起来疯狂不安全。所以,功能正常,
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby
我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%
我有一个对象has_many应呈现为xml的子对象。这不是问题。我的问题是我创建了一个Hash包含此数据,就像解析器需要它一样。但是rails自动将整个文件包含在.........我需要摆脱type="array"和我该如何处理?我没有在文档中找到任何内容。 最佳答案 我遇到了同样的问题;这是我的XML:我在用这个:entries.to_xml将散列数据转换为XML,但这会将条目的数据包装到中所以我修改了:entries.to_xml(root:"Contacts")但这仍然将转换后的XML包装在“联系人”中,将我的XML代码修改为
为了将Cucumber用于命令行脚本,我按照提供的说明安装了arubagem。它在我的Gemfile中,我可以验证是否安装了正确的版本并且我已经包含了require'aruba/cucumber'在'features/env.rb'中为了确保它能正常工作,我写了以下场景:@announceScenario:Testingcucumber/arubaGivenablankslateThentheoutputfrom"ls-la"shouldcontain"drw"假设事情应该失败。它确实失败了,但失败的原因是错误的:@announceScenario:Testingcucumber/ar
exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby中使用两个参数异步运行exe吗?我已经尝试过ruby命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何rubygems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除
我正在使用puppet为ruby程序提供一组常量。我需要提供一组主机名,我的程序将对其进行迭代。在我之前使用的bash脚本中,我只是将它作为一个puppet变量hosts=>"host1,host2"我将其提供给bash脚本作为HOSTS=显然这对ruby不太适用——我需要它的格式hosts=["host1","host2"]自从phosts和putsmy_array.inspect提供输出["host1","host2"]我希望使用其中之一。不幸的是,我终其一生都无法弄清楚如何让它发挥作用。我尝试了以下各项:我发现某处他们指出我需要在函数调用前放置“function_”……这
鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende