草庐IT

hadoop中MapReduce多种join实现实例分析

zengzhaozheng 2023-03-28 原文
一、概述

    对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于hadoop的分布式设计理念的特殊性,因此对于这种join操作同样也具备了一定的特殊性。本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明。

二、实现原理

1、在Reudce端进行连接。

   在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:

Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。原理非常简单,下面来看一个实例:

(1)自定义一个value返回类型:

package com.mr.reduceSizeJoin; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; public class CombineValues implements WritableComparable<CombineValues>{     //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);     private Text joinKey;//链接关键字     private Text flag;//文件来源标志     private Text secondPart;//除了链接键外的其他部分     public void setJoinKey(Text joinKey) {         this.joinKey = joinKey;     }     public void setFlag(Text flag) {         this.flag = flag;     }     public void setSecondPart(Text secondPart) {         this.secondPart = secondPart;     }     public Text getFlag() {         return flag;     }     public Text getSecondPart() {         return secondPart;     }     public Text getJoinKey() {         return joinKey;     }     public CombineValues() {         this.joinKey =  new Text();         this.flag = new Text();         this.secondPart = new Text();     }                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               @Override     public void write(DataOutput out) throws IOException {         this.joinKey.write(out);         this.flag.write(out);         this.secondPart.write(out);     }     @Override     public void readFields(DataInput in) throws IOException {         this.joinKey.readFields(in);         this.flag.readFields(in);         this.secondPart.readFields(in);     }     @Override     public int compareTo(CombineValues o) {         return this.joinKey.compareTo(o.getJoinKey());     }     @Override     public String toString() {         // TODO Auto-generated method stub         return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";     } }(2)map、reduce主体代码

package com.mr.reduceSizeJoin; import java.io.IOException; import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /**  * @author zengzhaozheng  * 用途说明:  * reudce side join中的left outer join  * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段  * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)  * tb_dim_city.dat文件内容,分隔符为"|":  * id     name  orderid  city_code  is_show  * 0       其他        9999     9999         0  * 1       长春        1        901          1  * 2       吉林        2        902          1  * 3       四平        3        903          1  * 4       松原        4        904          1  * 5       通化        5        905          1  * 6       辽源        6        906          1  * 7       白城        7        907          1  * 8       白山        8        908          1  * 9       延吉        9        909          1  * -------------------------风骚的分割线-------------------------------  * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)  * tb_user_profiles.dat文件内容,分隔符为"|":  * userID   network     flow    cityID  * 1           2G       123      1  * 2           3G       333      2  * 3           3G       555      1  * 4           2G       777      3  * 5           3G       666      4  *  * -------------------------风骚的分割线-------------------------------  *  结果:  *  1   长春  1   901 1   1   2G  123  *  1   长春  1   901 1   3   3G  555  *  2   吉林  2   902 1   2   3G  333  *  3   四平  3   903 1   4   2G  777  *  4   松原  4   904 1   5   3G  666  */ public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{     private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);     public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {         private CombineValues combineValues = new CombineValues();         private Text flag = new Text();         private Text joinKey = new Text();         private Text secondPart = new Text();         @Override         protected void map(Object key, Text value, Context context)                 throws IOException, InterruptedException {             //获得文件输入路径             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();             //数据来自tb_dim_city.dat文件,标志即为"0"             if(pathName.endsWith("tb_dim_city.dat")){                 String[] valueItems = value.toString().split("\\|");                 //过滤格式错误的记录                 if(valueItems.length != 5){                     return;                 }                 flag.set("0");                 joinKey.set(valueItems[0]);                 secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);                 combineValues.setFlag(flag);                 combineValues.setJoinKey(joinKey);                 combineValues.setSecondPart(secondPart);                 context.write(combineValues.getJoinKey(), combineValues);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             }//数据来自于tb_user_profiles.dat,标志即为"1"             else if(pathName.endsWith("tb_user_profiles.dat")){                 String[] valueItems = value.toString().split("\\|");                 //过滤格式错误的记录                 if(valueItems.length != 4){                     return;                 }                 flag.set("1");                 joinKey.set(valueItems[3]);                 secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);                 combineValues.setFlag(flag);                 combineValues.setJoinKey(joinKey);                 combineValues.setSecondPart(secondPart);                 context.write(combineValues.getJoinKey(), combineValues);             }         }     }     public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {         //存储一个分组中的左表信息         private ArrayList<Text> leftTable = new ArrayList<Text>();         //存储一个分组中的右表信息         private ArrayList<Text> rightTable = new ArrayList<Text>();         private Text secondPar = null;         private Text output = new Text();         /**          * 一个分组调用一次reduce函数          */         @Override         protected void reduce(Text key, Iterable<CombineValues> value, Context context)                 throws IOException, InterruptedException {             leftTable.clear();             rightTable.clear();             /**              * 将分组中的元素按照文件分别进行存放              * 这种方法要注意的问题:              * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,              * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最              * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。              */             for(CombineValues cv : value){                 secondPar = new Text(cv.getSecondPart().toString());                 //左表tb_dim_city                 if("0".equals(cv.getFlag().toString().trim())){                     leftTable.add(secondPar);                 }                 //右表tb_user_profiles                 else if("1".equals(cv.getFlag().toString().trim())){                     rightTable.add(secondPar);                 }             }             logger.info("tb_dim_city:"+leftTable.toString());             logger.info("tb_user_profiles:"+rightTable.toString());             for(Text leftPart : leftTable){                 for(Text rightPart : rightTable){                     output.set(leftPart+ "\t" + rightPart);                     context.write(key, output);                 }             }         }     }     @Override     public int run(String[] args) throws Exception {           Configuration conf=getConf(); //获得配置文件对象             Job job=new Job(conf,"LeftOutJoinMR");             job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径             FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              job.setMapperClass(LeftOutJoinMapper.class);             job.setReducerClass(LeftOutJoinReducer.class);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式             job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         //设置map的输出key和value类型             job.setMapOutputKeyClass(Text.class);             job.setMapOutputValueClass(CombineValues.class);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         //设置reduce的输出key和value类型             job.setOutputKeyClass(Text.class);             job.setOutputValueClass(Text.class);             job.waitForCompletion(true);             return job.isSuccessful()?0:1;     }     public static void main(String[] args) throws IOException,             ClassNotFoundException, InterruptedException {         try {             int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);             System.exit(returnCode);         } catch (Exception e) {             // TODO Auto-generated catch block             logger.error(e.getMessage());         }     } }其中具体的分析以及数据的输出输入请看代码中的注释已经写得比较清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

2、在Map端进行连接。

   使用场景:一张表十分小、一张表很大。

   用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。

直接上代码,比较简单:

package com.mr.mapSideJoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /**  * @author zengzhaozheng  *  * 用途说明:  * Map side join中的left outer join  * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段  * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),  * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":  * id     name  orderid  city_code  is_show  * 0       其他        9999     9999         0  * 1       长春        1        901          1  * 2       吉林        2        902          1  * 3       四平        3        903          1  * 4       松原        4        904          1  * 5       通化        5        905          1  * 6       辽源        6        906          1  * 7       白城        7        907          1  * 8       白山        8        908          1  * 9       延吉        9        909          1  * -------------------------风骚的分割线-------------------------------  * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)  * tb_user_profiles.dat文件内容,分隔符为"|":  * userID   network     flow    cityID  * 1           2G       123      1  * 2           3G       333      2  * 3           3G       555      1  * 4           2G       777      3  * 5           3G       666      4  * -------------------------风骚的分割线-------------------------------  *  结果:  *  1   长春  1   901 1   1   2G  123  *  1   长春  1   901 1   3   3G  555  *  2   吉林  2   902 1   2   3G  333  *  3   四平  3   903 1   4   2G  777  *  4   松原  4   904 1   5   3G  666  */ public class MapSideJoinMain extends Configured implements Tool{     private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);     public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            private HashMap<String,String> city_info = new HashMap<String, String>();         private Text outPutKey = new Text();         private Text outPutValue = new Text();         private String mapInputStr = null;         private String mapInputSpit[] = null;         private String city_secondPart = null;         /**          * 此方法在每个task开始之前执行,这里主要用作从DistributedCache          * 中取到tb_dim_city文件,并将里边记录取出放到内存中。          */         @Override         protected void setup(Context context)                 throws IOException, InterruptedException {             BufferedReader br = null;             //获得当前作业的DistributedCache相关文件             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());             String cityInfo = null;             for(Path p : distributePaths){                 if(p.toString().endsWith("tb_dim_city.dat")){                     //读缓存文件,并放到mem中                     br = new BufferedReader(new FileReader(p.toString()));                     while(null!=(cityInfo=br.readLine())){                         String[] cityPart = cityInfo.split("\\|",5);                         if(cityPart.length ==5){                             city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);                         }                     }                 }             }         }                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            /**          * Map端的实现相当简单,直接判断tb_user_profiles.dat中的          * cityID是否存在我的map中就ok了,这样就可以实现Map Join了          */         @Override         protected void map(Object key, Text value, Context context)                 throws IOException, InterruptedException {             //排掉空行             if(value == null || value.toString().equals("")){                 return;             }             mapInputStr = value.toString();             mapInputSpit = mapInputStr.split("\\|",4);             //过滤非法记录             if(mapInputSpit.length != 4){                 return;             }             //判断链接字段是否在map中存在             city_secondPart = city_info.get(mapInputSpit[3]);             if(city_secondPart != null){                 this.outPutKey.set(mapInputSpit[3]);                 this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);                 context.write(outPutKey, outPutValue);             }         }     }     @Override     public int run(String[] args) throws Exception {             Configuration conf=getConf(); //获得配置文件对象             DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件             Job job=new Job(conf,"MapJoinMR");             job.setNumReduceTasks(0);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径             FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        job.setJarByClass(MapSideJoinMain.class);             job.setMapperClass(LeftOutJoinMapper.class);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式             job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    //设置map的输出key和value类型             job.setMapOutputKeyClass(Text.class);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    //设置reduce的输出key和value类型             job.setOutputKeyClass(Text.class);             job.setOutputValueClass(Text.class);             job.waitForCompletion(true);             return job.isSuccessful()?0:1;     }     public static void main(String[] args) throws IOException,             ClassNotFoundException, InterruptedException {         try {             int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);             System.exit(returnCode);         } catch (Exception e) {             // TODO Auto-generated catch block             logger.error(e.getMessage());         }     } }这里说说DistributedCache。DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性,接下来有用再整理研究一下写一篇blog,这里就不详细说了。

   另外还有一种比较变态的Map Join方式,就是结合HBase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。

3、SemiJoin。

SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。看代码:

package com.mr.SemiJoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /**  * @author zengzhaozheng  *  * 用途说明:  * reudce side join中的left outer join  * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段  * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)  * tb_dim_city.dat文件内容,分隔符为"|":  * id     name  orderid  city_code  is_show  * 0       其他        9999     9999         0  * 1       长春        1        901          1  * 2       吉林        2        902          1  * 3       四平        3        903          1  * 4       松原        4        904          1  * 5       通化        5        905          1  * 6       辽源        6        906          1  * 7       白城        7        907          1  * 8       白山        8        908          1  * 9       延吉        9        909          1  * -------------------------风骚的分割线-------------------------------  * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)  * tb_user_profiles.dat文件内容,分隔符为"|":  * userID   network     flow    cityID  * 1           2G       123      1  * 2           3G       333      2  * 3           3G       555      1  * 4           2G       777      3  * 5           3G       666      4  * -------------------------风骚的分割线-------------------------------  * joinKey.dat内容:  * city_code  * 1  * 2  * 3  * 4  * -------------------------风骚的分割线-------------------------------  *  结果:  *  1   长春  1   901 1   1   2G  123  *  1   长春  1   901 1   3   3G  555  *  2   吉林  2   902 1   2   3G  333  *  3   四平  3   903 1   4   2G  777  *  4   松原  4   904 1   5   3G  666  */ public class SemiJoin extends Configured implements Tool{     private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);     public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {         private CombineValues combineValues = new CombineValues();         private HashSet<String> joinKeySet = new HashSet<String>();         private Text flag = new Text();         private Text joinKey = new Text();         private Text secondPart = new Text();         /**          * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b          */         @Override         protected void setup(Context context)                 throws IOException, InterruptedException {             BufferedReader br = null;             //获得当前作业的DistributedCache相关文件             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());             String joinKeyStr = null;             for(Path p : distributePaths){                 if(p.toString().endsWith("joinKey.dat")){                     //读缓存文件,并放到mem中                     br = new BufferedReader(new FileReader(p.toString()));                     while(null!=(joinKeyStr=br.readLine())){                         joinKeySet.add(joinKeyStr);                     }                 }             }         }         @Override         protected void map(Object key, Text value, Context context)                 throws IOException, InterruptedException {             //获得文件输入路径             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();             //数据来自tb_dim_city.dat文件,标志即为"0"             if(pathName.endsWith("tb_dim_city.dat")){                 String[] valueItems = value.toString().split("\\|");                 //过滤格式错误的记录                 if(valueItems.length != 5){                     return;                 }                 //过滤掉不需要参加join的记录                 if(joinKeySet.contains(valueItems[0])){                     flag.set("0");                     joinKey.set(valueItems[0]);                     secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);                     combineValues.setFlag(flag);                     combineValues.setJoinKey(joinKey);                     combineValues.setSecondPart(secondPart);                     context.write(combineValues.getJoinKey(), combineValues);                 }else{                     return ;                 }             }//数据来自于tb_user_profiles.dat,标志即为"1"             else if(pathName.endsWith("tb_user_profiles.dat")){                 String[] valueItems = value.toString().split("\\|");                 //过滤格式错误的记录                 if(valueItems.length != 4){                     return;                 }                 //过滤掉不需要参加join的记录                 if(joinKeySet.contains(valueItems[3])){                     flag.set("1");                     joinKey.set(valueItems[3]);                     secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);                     combineValues.setFlag(flag);                     combineValues.setJoinKey(joinKey);                     combineValues.setSecondPart(secondPart);                     context.write(combineValues.getJoinKey(), combineValues);                 }else{                     return ;                 }             }         }     }     public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {         //存储一个分组中的左表信息         private ArrayList<Text> leftTable = new ArrayList<Text>();         //存储一个分组中的右表信息         private ArrayList<Text> rightTable = new ArrayList<Text>();         private Text secondPar = null;         private Text output = new Text();         /**          * 一个分组调用一次reduce函数          */         @Override         protected void reduce(Text key, Iterable<CombineValues> value, Context context)                 throws IOException, InterruptedException {             leftTable.clear();             rightTable.clear();             /**              * 将分组中的元素按照文件分别进行存放              * 这种方法要注意的问题:              * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,              * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最              * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。              */             for(CombineValues cv : value){                 secondPar = new Text(cv.getSecondPart().toString());                 //左表tb_dim_city                 if("0".equals(cv.getFlag().toString().trim())){                     leftTable.add(secondPar);                 }                 //右表tb_user_profiles                 else if("1".equals(cv.getFlag().toString().trim())){                     rightTable.add(secondPar);                 }             }             logger.info("tb_dim_city:"+leftTable.toString());             logger.info("tb_user_profiles:"+rightTable.toString());             for(Text leftPart : leftTable){                 for(Text rightPart : rightTable){                     output.set(leftPart+ "\t" + rightPart);                     context.write(key, output);                 }             }         }     }     @Override     public int run(String[] args) throws Exception {             Configuration conf=getConf(); //获得配置文件对象             DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);                                                                                                                                                                                                                                                        Job job=new Job(conf,"LeftOutJoinMR");             job.setJarByClass(SemiJoin.class);                                                                                                                                                                                                                                                        FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径             FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径                                                                                                                                                                                                                                                                                                                                                                                             job.setMapperClass(SemiJoinMapper.class);             job.setReducerClass(SemiJoinReducer.class);                                                                                                                                                                                                                                                       job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式             job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式                                                                                                                                                                                                                                                        //设置map的输出key和value类型             job.setMapOutputKeyClass(Text.class);             job.setMapOutputValueClass(CombineValues.class);                                                                                                                                                                                                                                                        //设置reduce的输出key和value类型             job.setOutputKeyClass(Text.class);             job.setOutputValueClass(Text.class);             job.waitForCompletion(true);             return job.isSuccessful()?0:1;     }     public static void main(String[] args) throws IOException,             ClassNotFoundException, InterruptedException {         try {             int returnCode =  ToolRunner.run(new SemiJoin(),args);             System.exit(returnCode);         } catch (Exception e) {             logger.error(e.getMessage());         }     } }这里还说说SemiJoin也是有一定的适用范围的,其抽取出来进行join的key是要放到内存中的,所以不能够太大,容易在Map端造成OOM。


三、总结

blog介绍了三种join方式。这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到最低,使我们的代码倾向性更好。


参考文献:

http://wenku.baidu.com/view/ae7442db7f1922791688e877.html


有关hadoop中MapReduce多种join实现实例分析的更多相关文章

  1. ruby-on-rails - 如何使用 instance_variable_set 正确设置实例变量? - 2

    我正在查看instance_variable_set的文档并看到给出的示例代码是这样做的:obj.instance_variable_set(:@instnc_var,"valuefortheinstancevariable")然后允许您在类的任何实例方法中以@instnc_var的形式访问该变量。我想知道为什么在@instnc_var之前需要一个冒号:。冒号有什么作用? 最佳答案 我的第一直觉是告诉你不要使用instance_variable_set除非你真的知道你用它做什么。它本质上是一种元编程工具或绕过实例变量可见性的黑客攻击

  2. ruby 正则表达式 - 如何替换字符串中匹配项的第 n 个实例 - 2

    在我的应用程序中,我需要能够找到所有数字子字符串,然后扫描每个子字符串,找到第一个匹配范围(例如5到15之间)的子字符串,并将该实例替换为另一个字符串“X”。我的测试字符串s="1foo100bar10gee1"我的初始模式是1个或多个数字的任何字符串,例如,re=Regexp.new(/\d+/)matches=s.scan(re)给出["1","100","10","1"]如果我想用“X”替换第N个匹配项,并且只替换第N个匹配项,我该怎么做?例如,如果我想替换第三个匹配项“10”(匹配项[2]),我不能只说s[matches[2]]="X"因为它做了两次替换“1fooX0barXg

  3. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  4. ruby-on-rails - Rails - 从另一个模型中创建一个模型的实例 - 2

    我有一个正在构建的应用程序,我需要一个模型来创建另一个模型的实例。我希望每辆车都有4个轮胎。汽车模型classCar轮胎模型classTire但是,在make_tires内部有一个错误,如果我为Tire尝试它,则没有用于创建或新建的activerecord方法。当我检查轮胎时,它没有这些方法。我该如何补救?错误是这样的:未定义的方法'create'forActiveRecord::AttributeMethods::Serialization::Tire::Module我测试了两个环境:测试和开发,它们都因相同的错误而失败。 最佳答案

  5. ruby-on-rails - RSpec:避免使用允许接收的任何实例 - 2

    我正在处理旧代码的一部分。beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)endRubocop错误如下:Avoidstubbingusing'allow_any_instance_of'我读到了RuboCop::RSpec:AnyInstance我试着像下面那样改变它。由此beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)end对此:let(:sport_

  6. ruby - nanoc 和多种布局 - 2

    是否可以为特定(或所有)项目使用多个布局?例如,我有几个项目,我想对其应用两种不同的布局。一个是绿色的,一个是蓝色的(但是)。我想将它们编译到我的输出目录中的两个不同文件夹中(例如v1和v2)。我一直在玩弄规则和编译block,但我不知道这是怎么回事。因为,每个项目在编译过程中只编译一次,我不能告诉nanoc第一次用layout1编译,第二次用layout2编译。我试过这样的东西,但它导致输出文件损坏。compile'*'doifitem.binary?#don’tfilterbinaryitemselsefilter:erblayout'layout1'layout'layout2'

  7. ruby-on-rails - 使用 ruby​​ 将多个实例变量转换为散列的更好方法? - 2

    我收到格式为的回复#我需要将其转换为哈希值(针对活跃商家)。目前我正在遍历变量并执行此操作:response.instance_variables.eachdo|r|my_hash.merge!(r.to_s.delete("@").intern=>response.instance_eval(r.to_s.delete("@")))end这有效,它将生成{:first="charlie",:last=>"kelly"},但它似乎有点hacky和不稳定。有更好的方法吗?编辑:我刚刚意识到我可以使用instance_variable_get作为该等式的第二部分,但这仍然是主要问题。

  8. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  9. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  10. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

随机推荐