草庐IT

Hadoop MapReduce--实现获取最大值和最小值

洛理小菜鸡 2024-01-18 原文

根据txt文档,获取age的最值

前言

一、txt数据准备

1.代码设计

2.代码实现

总结



前言

例如:随着大数据的不断发展,hadoop这门技术也越来越重要,很多人都开启了学习大数据,本文就如何在海量数据中获取最值提供了思路。


提示:以下是本篇文章正文内容,下面案例可供参考

一、txt数据准备

python中有random和faker包(外部)给我们提供假的数据。我们使用python创建一个小型的txt文档,其中包括姓名,年龄,score(1分制)

以下是创建的txt文档(按照\t分行):
 

rose	27	0.6270426084076096
lisa	27	0.7321873119280536
black	22	0.8129257516728405
jack	27	0.8328363834853498
rose	22	0.9685109182738815
black	30	0.7727688951446979
lisa	21	0.24477509026919908
black	25	0.8785838355517068
rose	30	0.8901461639299959
black	26	0.5460709396256507

 

1.代码设计

1.创建对象并对其序列化。对象属性有min,max

2.mapper阶段对数据进行分割处理,context写入(StuBean,Text)的数据.

3.reduce阶段对mapper阶段的数据进行处理:获取最值。context写入(StuBean,Text)的数据

4.Driver实现整个流程

2.代码实现

StuBean代码如下(示例):

package com.atguigu.mapreduce.MaxMinscore;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

//input--排序
public class StuBean implements Writable {

    public int max;
    public int min;

    public int getMax() {
        return max;
    }

    public void setMax(int max) {
        this.max = max;
    }

    public int getMin() {
        return min;
    }

    public void setMin(int min) {
        this.min = min;
    }

    @Override
    public String toString() {
        return  max+"\t"+min;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(max);
        out.writeInt(min);
    }
    //     序列化读取
    @Override
    public void readFields(DataInput in) throws IOException {
        this.max =in.readInt();
        this.min =in.readInt();
    }

}

2.mapper阶段代码

import java.io.IOException;

//根据input类型,输出Text的主键,stuBean的数据
public class MinMaxMapper extends Mapper<Object,Text, Text,StuBean> {

    private StuBean sb=new StuBean();

    @Override
    protected void map(Object key, Text value, Mapper<Object, Text, Text, StuBean>.Context context) throws IOException, InterruptedException {
        //        获取一行
        String strs=value.toString();
        String[] split = strs.split("\t");
        String data=split[0];
        if (data==null){
            return ;
        }
        sb.setMin(Integer.parseInt(split[1]));
        sb.setMax(Integer.parseInt(split[1]));

        context.write(new Text(data),sb);
    }
}

3.reduce实现

package com.atguigu.mapreduce.MaxMinscore;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
//将mapper阶段的数据转化成视图bean和int
public class StuReducer extends Reducer<Text,StuBean,Text,StuBean> {

    private StuBean sb=new StuBean();
    @Override
    protected void reduce(Text key, Iterable<StuBean> values, Reducer<Text, StuBean, Text, StuBean>.Context context) throws IOException, InterruptedException {

        sb.setMax(0);
        sb.setMin(0);
        for (StuBean value:values){
            if (sb.getMin() == 0 ||value.getMin() <sb.getMin() ){
                    sb.setMin(value.getMin());
            }
            if (sb.getMax() == 0 ||value.getMax() >sb.getMax() ){
                 sb.setMax(value.getMax());
            }
        }
        context.write(key,sb);
    }
}

 4.driver实现

package com.atguigu.mapreduce.MaxMinscore;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/*
只排序
 */
public class StuDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1 获取配置信息以及获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 关联本Driver程序的jar
        job.setJarByClass(StuDriver.class);

        // 3 关联Mapper和Reducer的jar
        job.setMapperClass(MinMaxMapper.class);
        job.setReducerClass(StuReducer.class);

        // 4 设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(StuBean.class);

        // 5 设置最终输出kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(StuBean.class);

        // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\desk\\ac.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\desk\\ac2"));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
    }


总结

提示:这里对文章进行总结:
例如:以上就是今天要讲的内容,本文仅仅简单介绍了hadoop mapreduce操作最值的基本使用,而mapre提供了大量能使我们快速便捷地处理数据的函数和方法。

有关Hadoop MapReduce--实现获取最大值和最小值的更多相关文章

  1. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  2. ruby - 简单获取法拉第超时 - 2

    有没有办法在这个简单的get方法中添加超时选项?我正在使用法拉第3.3。Faraday.get(url)四处寻找,我只能先发起连接后应用超时选项,然后应用超时选项。或者有什么简单的方法?这就是我现在正在做的:conn=Faraday.newresponse=conn.getdo|req|req.urlurlreq.options.timeout=2#2secondsend 最佳答案 试试这个:conn=Faraday.newdo|conn|conn.options.timeout=20endresponse=conn.get(url

  3. ruby - 从 Ruby 中的主机名获取 IP 地址 - 2

    我有一个存储主机名的Ruby数组server_names。如果我打印出来,它看起来像这样:["hostname.abc.com","hostname2.abc.com","hostname3.abc.com"]相当标准。我想要做的是获取这些服务器的IP(可能将它们存储在另一个变量中)。看起来IPSocket类可以做到这一点,但我不确定如何使用IPSocket类遍历它。如果它只是尝试像这样打印出IP:server_names.eachdo|name|IPSocket::getaddress(name)pnameend它提示我没有提供服务器名称。这是语法问题还是我没有正确使用类?输出:ge

  4. ruby - 获取模块中定义的所有常量的值 - 2

    我想获取模块中定义的所有常量的值:moduleLettersA='apple'.freezeB='boy'.freezeendconstants给了我常量的名字:Letters.constants(false)#=>[:A,:B]如何获取它们的值的数组,即["apple","boy"]? 最佳答案 为了做到这一点,请使用mapLetters.constants(false).map&Letters.method(:const_get)这将返回["a","b"]第二种方式:Letters.constants(false).map{|c

  5. ruby-on-rails - 获取 inf-ruby 以使用 ruby​​ 版本管理器 (rvm) - 2

    我安装了ruby​​版本管理器,并将RVM安装的ruby​​实现设置为默认值,这样'哪个ruby'显示'~/.rvm/ruby-1.8.6-p383/bin/ruby'但是当我在emacs中打开inf-ruby缓冲区时,它使用安装在/usr/bin中的ruby​​。有没有办法让emacs像shell一样尊重ruby​​的路径?谢谢! 最佳答案 我创建了一个emacs扩展来将rvm集成到emacs中。如果您有兴趣,可以在这里获取:http://github.com/senny/rvm.el

  6. Ruby 从大范围中获取第 n 个项目 - 2

    假设我有这个范围:("aaaaa".."zzzzz")如何在不事先/每次生成整个项目的情况下从范围中获取第N个项目? 最佳答案 一种快速简便的方法:("aaaaa".."zzzzz").first(42).last#==>"aaabp"如果出于某种原因你不得不一遍又一遍地这样做,或者如果你需要避免为前N个元素构建中间数组,你可以这样写:moduleEnumerabledefskip(n)returnto_enum:skip,nunlessblock_given?each_with_indexdo|item,index|yieldit

  7. ruby - Net::HTTP 获取源代码和状态 - 2

    我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur

  8. ruby - 没有类方法获取 Ruby 类名 - 2

    如何在Ruby中获取BasicObject实例的类名?例如,假设我有这个:classMyObjectSystem我怎样才能使这段代码成功?编辑:我发现Object的实例方法class被定义为returnrb_class_real(CLASS_OF(obj));。有什么方法可以从Ruby中使用它? 最佳答案 我花了一些时间研究irb并想出了这个:classBasicObjectdefclassklass=class这将为任何从BasicObject继承的对象提供一个#class您可以调用的方法。编辑评论中要求的进一步解释:假设你有对象

  9. ruby-on-rails - 如何在 Gem 中获取 Rails 应用程序的根目录 - 2

    是否可以在应用程序中包含的gem代码中知道应用程序的Rails文件系统根目录?这是gem来源的示例:moduleMyGemdefself.included(base)putsRails.root#returnnilendendActionController::Base.send:include,MyGem谢谢,抱歉我的英语不好 最佳答案 我发现解决类似问题的解决方案是使用railtie初始化程序包含我的模块。所以,在你的/lib/mygem/railtie.rbmoduleMyGemclassRailtie使用此代码,您的模块将在

  10. ruby - 如何使用 CarrierWave 从 S3 获取真实文件 - 2

    我有一个应用程序可以读取文件的内容并为其编制索引。我将它们存储在磁盘本身中,但现在我使用的是AmazonS3,因此以下方法不再适用。事情是这样的:defperform(docId)@document=Document.find(docId)if@document.file?#Youshould'tcreateanewversion@document.versionlessdo|doc|@document.file_content=Cloudoc::Extractor.new.extract(@document.file.file)@document.saveendendend@docu

随机推荐