草庐IT

去除Hadoop-Streaming行末多余的TAB

superpopb2b 2023-03-28 原文
    单位有一组业务一直都是使用Streaming压缩文本日志,大体上就是设置作业输出为BZ2格式,怎么输入就怎么输出,没有任何处理功能在里面。但是每行结尾都多出来一个TAB。终于,有一个业务需要使用TAB前的最后一个字段,不去掉不行了。

    虽然是个小问题,但是网上搜了一圈,也没有很好的解决。很多人都遇到了,但是单位的业务比较特殊,只有map没有reduce。http://stackoverflow.com/questions/20137618/hadoop-streaming-api-how-to-remove-unwanted-delimiters这个上面直接说“As I discussed with friends, there's no easy way to achieve the goal,...”。

    Streaming有个特点,默认是按照TAB去区分Key和Value。如果没有设置Key字段的数目,默认一行里面第一个TAB之前的做Key,后面的是Value。如果没有找到Tab,就全都是Key字段,Value是空。之所以后面会多出个Tab,正是Key和Value之间的那个Tab。

    首先是考察Streaming的Map,在PipeMapper.java。InputWriter处理输出,所以尝试实现自定义输出。在MapReduce作业配置里面,stream.map.input.writer.class负责指定InputWriter是哪一个,默认是TextInputWriter。Streaming在这里比较坑,增加-Dstream.map.input.writer.class=XXX的选项并不能令Streaming使用自定义的实现类,必须实现自己的IdentifierResolver,然后在其中对不同类型的输入设定不同类型的InputWriter,而其中的输入类型,必须由stream.map.input选项传入。是否设置成功以作业运行时候JobTracker的配置参数表为准。

    不巧的是,使用自定义的InputWriter代替TextInputWriter,行尾的Tab是没了,行首又多了个数字。估计是Hadoop给Mapper传入的Key被打印出来了。oooorz....不能瞎猜了,还是看看代码吧。

    好在代码蛮短的还是。

    Streaming会把本身、以及用户-file -cacheFile -cacheArchive 等选项指定的文件,打成一个Jar包提交到集群进行MR作业。把集群的输出,作为用户实现Mapper的输入;读取用户实现Mapper的输出,作为整个Map作业的输出。Input/Output相对于用户自定义作业,Writer/Reader体现为Streaming的行为,因此是InputWriter和OutputReader。简单来讲,

Hadoop给出的(K,V)---streaming---> 用户自定义Mapper ---streaming--->Hadoop的Mapper输出   Streaming由PipeMapRunner启动作业,异步收集用户作业输出,进而向Hadoop汇报作业进度。整个作业的基础设置、作业提交都是由StreamJob类完成。

    作业的执行是PipeMapRed/PipeMapper/PipReducer/PipCombiner这几个类。解决方案也就在这里。在MROutputThread的run方法里面,outCollector.collect(key, value);这句之前,加上下面的代码片段即可。

          if (value instanceof Text) {             if (value.toString().isEmpty())               value = NullWritable.get();           }    是不是很简单。


    为什么这样做是可行的?还是源于org.apache.hadoop.mapred.TextOutputFormat。直接上代码。

package org.apache.hadoop.mapred; import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.*; /** An {@link OutputFormat} that writes plain text files.   */ @InterfaceAudience.Public @InterfaceStability.Stable public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {   protected static class LineRecordWriter<K, V>     implements RecordWriter<K, V> {     private static final String utf8 = "UTF-8";     private static final byte[] newline;     static {       try {         newline = "\n".getBytes(utf8);       } catch (UnsupportedEncodingException uee) {         throw new IllegalArgumentException("can't find " + utf8 + " encoding");       }     }     protected DataOutputStream out;     private final byte[] keyValueSeparator;     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {       this.out = out;       try {         this.keyValueSeparator = keyValueSeparator.getBytes(utf8);       } catch (UnsupportedEncodingException uee) {         throw new IllegalArgumentException("can't find " + utf8 + " encoding");       }     }     public LineRecordWriter(DataOutputStream out) {       this(out, "\t");     }     /**      * Write the object to the byte stream, handling Text as a special      * case.      * @param o the object to print      * @throws IOException if the write throws, we pass it on      */     private void writeObject(Object o) throws IOException {       if (o instanceof Text) {         Text to = (Text) o;         out.write(to.getBytes(), 0, to.getLength());       } else {         out.write(o.toString().getBytes(utf8));       }     }     public synchronized void write(K key, V value)       throws IOException {             boolean nullKey = key == null || key instanceof NullWritable;       boolean nullValue = value == null || value instanceof NullWritable;       if (nullKey && nullValue) {         return;        }        if (!nullKey) {         writeObject(key);       }       if (!(nullKey || nullValue)) {         out.write(keyValueSeparator);       }       if (!nullValue) {         writeObject(value);       }       out.write(newline);     }               public synchronized void close(Reporter reporter) throws IOException {       out.close();     }   }        public RecordWriter<K, V> getRecordWriter(FileSystem ignored,                                                   JobConf job,                                                   String name,                                                   Progressable progress)     throws IOException {     boolean isCompressed = getCompressOutput(job);     String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator",                                        "\t");     if (!isCompressed) {       Path file = FileOutputFormat.getTaskOutputPath(job, name);       FileSystem fs = file.getFileSystem(job);       FSDataOutputStream fileOut = fs.create(file, progress);       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);     } else {        Class<? extends CompressionCodec> codecClass =         getOutputCompressorClass(job, GzipCodec.class);       // create the named codec       CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);       // build the filename including the extension       Path file =          FileOutputFormat.getTaskOutputPath(job,         Path file =         FileOutputFormat.getTaskOutputPath(job,                                            name + codec.getDefaultExtension());       FileSystem fs = file.getFileSystem(job);       FSDataOutputStream fileOut = fs.create(file, progress);       return new LineRecordWriter<K, V>(new DataOutputStream                                         (codec.createOutputStream(fileOut)),                                         keyValueSeparator);     }   } }    注意到LineRecordWriter.write了么?


后记:

    A. 网上很多是想办法修改分隔符,把TAB换成空字符。这是一个非常粗暴的做法,基本上就是埋坑!为什么呢?

    日志文本内容可以是很丰富的,这次出问题是因为每行没有TAB。如果换做含有TAB的文本,把分隔符变为空串,就把日志中原有的TAB去掉了。

    B. 之所以这么搞,也是受到了stackoverflow的这个Q&A的启发。http://stackoverflow.com/questions/18133290/hadoop-streaming-remove-trailing-tab-from-reducer-output。类似的,Q&A也是采用修改分隔符的办法,是不可取的。但是仔细发现,是可以在自己重写的TextOutputFormat<K,V>里,修改LineRecordWriter.write方法的。

    重写TextOutputFormat是十分优雅的解决,看似修改了Hadoop本身的东西,但是在Streaming最新版没有加入这个fix之前,防止对每个版本的Streaming都要变更、重新编译打包。另外,Streaming不是独立的项目,编译它需要同时编译Hadoop!

    加上下面这段

    if (!nullValue) {       if (value instanceof Text) {         if (value.toString().isEmpty())           nullValue = true;       }        }    C. 虽然是修改了Streaming代码,但是不需要考虑会影响同一机器所有用户的问题,也不用修改$HADOOP_HOME下的Streaming包。streaming提供了这个参数stream.shipped.hadoopstreaming。

    D. 有些设置似乎是指对Reducer生效,对于这种只有Mapper的作业不起作用。比如

mapred.textoutputformat.ignoreseparator mapred.textoutputformat.separator    设置了,没看到什么效果。

    再有就是,命令行选项里面如果写-DXXX= \ 这样的语句,似乎也没有把这个参数设置为空串的效果,写-DXXX= ""也是一样。

有关去除Hadoop-Streaming行末多余的TAB的更多相关文章

  1. ruby CSV : How can I read a tab-delimited file? - 2

    CSV.open(name,"r").eachdo|row|putsrowend我得到以下错误:CSV::MalformedCSVErrorUnquotedfieldsdonotallow\ror\n文件名是一个.txt制表符分隔文件。我是专门做的。我有一个.csv文件,我转到excel,并将文件保存为.txt制表符分隔的文件。所以它是制表符分隔的。CSV.open不应该能够读取制表符分隔的文件吗? 最佳答案 尝试像这样指定字段分隔符:CSV.open("name","r",{:col_sep=>"\t"}).eachdo|row|

  2. hadoop安装之保姆级教程(二)之YARN的配置 - 2

    1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模

  3. ruby-on-rails - 如何让 ActionController::Live streaming 与 Thin 一起工作? - 2

    问题你能用thin吗?与ActionController::Live实现服务器端事件(SSE)和长轮询?如果是,怎么办?上下文虽然标题是HowtogetRails4ActionController::LivestreamingworkingwithThinandRuby2?AndhowdoThinandPumascalewithlivestreaming?的重复,OP通过问两个问题混淆了水域,这个问题从未得到回答。许多其他帖子建议您可以使用thin对于服务器端事件(sse),如果您通过execthinstart--threaded启动它:DoesHerokusupportActionC

  4. 大数据之Hadoop数据仓库Hive - 2

    目录:一、简介二、HQL的执行流程三、索引四、索引案例五、Hive常用DDL操作六、Hive常用DML操作七、查询结果插入到表八、更新和删除操作九、查询结果写出到文件系统十、HiveCLI和Beeline命令行的基本使用十一、Hive配置一、简介Hive是一个构建在Hadoop之上的数据仓库,它可以将结构化的数据文件映射成表,并提供类SQL查询功能,用于查询的SQL语句会被转化为MapReduce作业,然后提交到Hadoop上运行。特点:简单、容易上手(提供了类似sql的查询语言hql),使得精通sql但是不了解Java编程的人也能很好地进行大数据分析;灵活性高,可以自定义用户函数(UDF)和

  5. ruby-on-rails - 在 Rails 中验证之前如何从值中去除美元符号? - 2

    这是我在我的模型中使用的:before_validation:strip_dollar_signvalidates:amount_due,:format=>{:with=>/^\d+??(?:\.\d{0,2})?$/},:numericality=>{:greater_than=>0}privatedefstrip_dollar_signself.amount_due=self.amount_due.to_s.tr!('$,','').to_fend如果我在Rails控制台中手动运行来自strip_dollar_sign函数的行,我得到的正是我想要的(即400美元最终为400.0),

  6. ruby - 要去除\r and\n 或\r\n 的正则表达式 - 2

    我使用以下正则表达式制作换行符标签:str.gsub("\r\n",'')这在桌面上运行良好。在iphone上文本只有\n,没有\r。我怎样才能使正则表达式支持?\r\n还是只是\n?谢谢 最佳答案 我觉得str.gsub(/\r?\n/,'')应该做的工作 关于ruby-要去除\rand\n或\r\n的正则表达式,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/9107658/

  7. ruby - Jekyll YAML 嵌套列表抛出错误 : found a tab character that violate intendation - 2

    这是我正在使用的列表。-name:Game1platforms:{win32,win64,linux64}distribution:-name:hereurl:null-name:desuraurl:http://www.desura.com/games/Game1source:https://github.com/name/Game1description:cg/games/Game1/description.htmlrelease:2013-06-23这是它抛出的错误:jekyll2.2.0|Error:(C:/Users/User/jekyll-site/_data/games.

  8. ruby-on-rails - 从 rails 中的整数或小数中去除逗号 - 2

    整数或小数是否有等效的gsub?gsub应该使用整数吗?基本上,我只是想将小数输入到ruby​​表单中,以及用户能够使用逗号的内容。例如,我希望用户能够输入1,000.99。我试过用before_save:strip_commasdefstrip_commasself.number=self.number.gsub(",","")end但出现以下错误“undefinedmethod`gsub'for8:Fixnum”,其中“8”被替换为用户输入的任何数字。 最佳答案 如果您的字段是Fixnum,它永远不会有逗号,因为Rails必须将

  9. Ubuntu下Hadoop的单机安装 - 2

            云计算实验中要求我们在Linux系统安装Hadoop,故来做一个简单的记录。· 注:我的操作系统环境是Ubuntu-20.04.3,安装的JDK版本为jdk1.8.0_301,安装的Hadoop版本为hadoop2.7.1。(不确定其他版本是否会出现版本兼容问题)Hadoop安装步骤如下:        一、更新apt和安装vim编辑器        二、配置本机无密码登录SSH        三、安装JAVA环境        四、下载安装Hadoop        五、伪分布式搭建一、更新apt和安装vim编辑器1、更新aptsudoapt-getupdate2、安装vim

  10. ruby-on-rails - 服务器发送的事件和 Rails Streaming - 2

    我正在试验Rails4ActionController::Live和ServerSentEvents。我正在使用MRI2.0.0和Puma。就我所见,每个连接的客户端都与服务器保持事件连接。我想知道是否可以在不保持所有响应流运行的情况下利用SSE。Puma使用线程管理多个连接,我想当前连接数是有限制的。如果我想支持成千上万的客户注册到我的Rails应用程序以参加SSE事件的真实场景怎么办?有没有例子?此外,我通常在nginx反向代理后面运行Rails应用程序服务器。它需要任何特定的设置吗? 最佳答案 SSE的构建方式是客户端打开到服

随机推荐