这是我的代码:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SecondarySort extends Configured implements Tool{
public static void main(String[] args) {
try {
ToolRunner.run(new Configuration(), new SecondarySort(), args);
} catch (Exception e) {
e.printStackTrace();
}
}
static class KeyPartitioner implements Partitioner<StockKey, DoubleWritable> {
@Override
public int getPartition(StockKey arg0, DoubleWritable arg1, int arg2) {
int partition = arg0.name.hashCode() % arg2;
return partition;
}
@Override
public void configure(JobConf job) {
}
}
static class StockKey implements WritableComparable<StockKey> {
String name;
Long timestamp;
public StockKey() {
}
StockKey(String name, Long timestamp){
this.name = name;
this.timestamp = timestamp;
}
@Override
public void readFields(DataInput arg0) throws IOException {
name = WritableUtils.readString(arg0);
timestamp = arg0.readLong();
}
@Override
public void write(DataOutput arg0) throws IOException {
WritableUtils.writeString(arg0, name);
arg0.writeLong(timestamp);
}
@Override
public int compareTo(StockKey arg0) {
int result = 0;
result = name.compareToIgnoreCase(arg0.name);
if(result == 0)
result = timestamp.compareTo(arg0.timestamp);
return result;
}
public String toString() {
String outputString = name+","+timestamp;
return outputString;
}
}
static class StockReducer implements Reducer<StockKey, DoubleWritable, Text, Text>{
public void reduce(StockKey key, Iterator<DoubleWritable> value, Outp
OutputCollector<Text, Text> context, Reporter reporter)
throws IOException {
Text k = new Text(key.toString());
while(value.hasNext()) {
Double v = value.next().get();
Text t = new Text(v.toString());
context.collect(k, t);
}
}
@Override
public void configure(JobConf job) {
// TODO Auto-generated method stub
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
}
static class StockMapper implements Mapper<LongWritable, Text, StockKey,
DoubleWritable> {
public void map(LongWritable offset, Text value, OutputCollector<StockKey,
DoubleWritable> context, Reporter reporter)
throws IOException {
String[] values = value.toString().split(",");
StockKey key = new StockKey(values[0].trim(),
Long.parseLong(values[1].trim()));
DoubleWritable val = new
DoubleWritable(Double.parseDouble(values[2].trim()));
context.collect(key, val);
}
@Override
public void configure(JobConf job) {
// TODO Auto-generated method stub
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
}
@SuppressWarnings("unchecked")
@Override
public int run(String[] arg) throws Exception {
JobConf conf = new JobConf(getConf(), SecondarySort.class);
conf.setJobName(SecondarySort.class.getName());
conf.setJarByClass(SecondarySort.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
conf.setMapOutputKeyClass(StockKey.class);
conf.setMapOutputValueClass(Text.class);
conf.setPartitionerClass((Class<? extends Partitioner<StockKey,
DoubleWritable>>) KeyPartitioner.class);
conf.setMapperClass((Class<? extends Mapper<LongWritable, Text, StockKey,
DoubleWritable>>) StockMapper.class);
conf.setReducerClass((Class<? extends Reducer<StockKey, DoubleWritable,
Text, Text>>) StockReducer.class);
FileInputFormat.addInputPath(conf, new Path(arg[0]));
FileOutputFormat.setOutputPath(conf, new Path(arg[1]));
JobClient.runJob(conf);
return 0;
}
}
异常(exception)情况:
java.io.IOException: Type mismatch in value from map: expected
org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.DoubleWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:876)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:499)
at SecondarySort$StockMapper.map(SecondarySort.java:135)
at SecondarySort$StockMapper.map(SecondarySort.java:1)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
12/07/13 03:22:32 INFO mapred.JobClient: Task Id :
attempt_201207130314_0002_m_000001_2, Status : FAILED
java.io.IOException: Type mismatch in value from map: expected
org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.DoubleWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:876)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:499)
at SecondarySort$StockMapper.map(SecondarySort.java:135)
at SecondarySort$StockMapper.map(SecondarySort.java:1)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
最佳答案
这段代码有很多潜在的问题可能会导致它:
StockKey - 你应该覆盖默认的 hashCode() 方法 - 目前两个 StockKey 具有相同的内容将具有不同的 hashCode 值(就好像您不覆盖 JVM 默认值一样,那么它将返回一个数字,该数字在所有范围内都是两个对象在内存中的地址)。我知道在您的分区程序中您只使用 name 字段(它是一个字符串并且将具有 hashCode() 的有效实现,但这是一个很好的做法,以防将来您使用整个 Stock 对象的 hashCode() 并想知道为什么两个相同的 Stock 对象最终出现在不同的 reducer 中
KeyPartitioner - 您需要 Math.abs(..) arg0.name.hashCode() 的结果.目前,这个值可能会返回负数,当你对 reducer 的数量取模时,将返回一个负数。链式效应是 MR 框架将抛出异常,因为它期望一个介于 0(含)和 reducer 数量(不含)之间的数字。这可能是你的问题所在,我将在下一点解释
Mapper.map 方法 - 当您调用 context.collect 时,您正在吞下任何潜在的输出异常。继续我之前关于分区程序的观点——如果它返回一个负数,将抛出一个异常,你需要处理这个异常。在某些情况下捕获和吞噬异常可能没问题(例如输入记录的数据验证),但是输出时发生的任何异常都应该被抛给 MR 框架以标记出现问题并且此映射器的输出是错误的/不完整:
try {
context.collect(key, val);
} catch (IOException e) {
e.printStackTrace();
}
最后,您需要显式声明您的 map 和 reduce 输出类型(这会导致异常,因为您当前将 map 值输出类型声明为 Text,而实际上 mapper 输出的是 DoubleWritable):
job.setMapOutputKeyClass(StockKey.class);job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);
我建议您删除 context.collect 调用周围的 try/catch block 并重新运行您的作业(或者只检查 map task 的日志,看看您是否看到堆栈跟踪)。
关于java - Mapreduce作业运行,出现异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11447736/
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/
exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby中使用两个参数异步运行exe吗?我已经尝试过ruby命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何rubygems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除
我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r
Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我正在学习Rails,并阅读了关于乐观锁的内容。我已将类型为integer的lock_version列添加到我的articles表中。但现在每当我第一次尝试更新记录时,我都会收到StaleObjectError异常。这是我的迁移:classAddLockVersionToArticle当我尝试通过Rails控制台更新文章时:article=Article.first=>#我这样做:article.title="newtitle"article.save我明白了:(0.3ms)begintransaction(0.3ms)UPDATE"articles"SET"title"='dwdwd
在Cooper的书BeginningRuby中,第166页有一个我无法重现的示例。classSongincludeComparableattr_accessor:lengthdef(other)@lengthother.lengthenddefinitialize(song_name,length)@song_name=song_name@length=lengthendenda=Song.new('Rockaroundtheclock',143)b=Song.new('BohemianRhapsody',544)c=Song.new('MinuteWaltz',60)a.betwee
GivenIamadumbprogrammerandIamusingrspecandIamusingsporkandIwanttodebug...mmm...let'ssaaay,aspecforPhone.那么,我应该把“require'ruby-debug'”行放在哪里,以便在phone_spec.rb的特定点停止处理?(我所要求的只是一个大而粗的箭头,即使是一个有挑战性的程序员也能看到:-3)我已经尝试了很多位置,除非我没有正确测试它们,否则会发生一些奇怪的事情:在spec_helper.rb中的以下位置:require'rubygems'require'spork'
我早就知道Ruby中的“常量”(即大写的变量名)不是真正常量。与其他编程语言一样,对对象的引用是唯一存储在变量/常量中的东西。(侧边栏:Ruby确实具有“卡住”引用对象不被修改的功能,据我所知,许多其他语言都没有提供这种功能。)所以这是我的问题:当您将一个值重新分配给常量时,您会收到如下警告:>>FOO='bar'=>"bar">>FOO='baz'(irb):2:warning:alreadyinitializedconstantFOO=>"baz"有没有办法强制Ruby抛出异常而不是打印警告?很难弄清楚为什么有时会发生重新分配。 最佳答案