`
hududumo
  • 浏览: 239222 次
文章分类
社区版块
存档分类
最新评论

实现mapreduce多文件自定义输出

 
阅读更多

普通maprduce中通常是有map和reduce两个阶段,在不做设置的情况下,计算结果会以part-000*输出成多个文件,并且输出的文件数量和reduce数量一样,文件内容格式也不能随心所欲。这样不利于后续结果处理。

在hadoop中,reduce支持多个输出,输出的文件名也是可控的,就是继承MultipleTextOutputFormat类,重写generateFileNameForKey方法。如果只是想做到输出结果的文件名可控,实现自己的LogNameMultipleTextOutputFormat类,设置jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);就可以了,但是这种方式只限于使用旧版本的hadoop api.如果想采用新版本的api接口或者自定义输出内容的格式等等更多的需求,那么就要自己动手重写一些hadoop api了。

首先需要构造一个自己的MultipleOutputFormat类实现FileOutputFormat类(注意是org.apache.hadoop.mapreduce.lib.output包的FileOutputFormat

  

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;


/**
 * This abstract class extends the FileOutputFormat, allowing to write the
 * output data to different output files. There are three basic use cases for
 * this class. 
 * Created on 2012-07-08
 * @author zhoulongliu
 * @param <K>
 * @param <V>
 */
public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable> extends
        FileOutputFormat<K, V> {


   //接口类,需要在调用程序中实现generateFileNameForKeyValue来获取文件名
    private MultiRecordWriter writer = null;


    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }


    /**
     * get task output path
     * @param conf
     * @return
     * @throws IOException
     */
    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }


    /**
     * 通过key, value, conf来确定输出文件名(含扩展名) Generate the file output file name based
     * on the given key and the leaf file name. The default behavior is that the
     * file name does not depend on the key.
     * 
     * @param key the key of the output data
     * @param name the leaf file name
     * @param conf the configure object
     * @return generated file name
     */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);


   /**
    * 实现记录写入器RecordWriter类
    * (内部类)
    * @author zhoulongliu
    *
    */
    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** RecordWriter的缓存 */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** 输出目录 */
        private Path workPath = null;


        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }


        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }


        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // 得到输出文件名
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
           //如果recordWriters里没有文件名,那么就建立。否则就直接写值。
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }


        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException,
                InterruptedException {
            Configuration conf = job.getConfiguration();
           //查看是否使用解码器  
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                //这里我使用的自定义的OutputFormat 
                recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
                        keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                //这里我使用的自定义的OutputFormat 
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }


}
接着你还需要自定义一个LineRecordWriter实现记录写入器RecordWriter类,自定义输出格式。

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * 
 * 重新构造实现记录写入器RecordWriter类
 * Created on 2012-07-08
 * @author zhoulongliu
 * @param <K>
 * @param <V>
 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {

    private static final String utf8 = "UTF-8";//定义字符编码格式
    private static final byte[] newline;
    static {
        try {
            newline = "\n".getBytes(utf8);//定义换行符
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }
    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

     //实现构造方法,出入输出流对象和分隔符
    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        try {
            this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find " + utf8 + " encoding");
        }
    }

    public LineRecordWriter(DataOutputStream out) {
        this(out, "\t");
    }

    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text to = (Text) o;
            out.write(to.getBytes(), 0, to.getLength());
        } else {
            out.write(o.toString().getBytes(utf8));
        }
    }
   
    /**
     * 将mapreduce的key,value以自定义格式写入到输出流中
     */
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!(nullKey || nullValue)) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(newline);
    }

    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }

}

接着,你实现刚刚重写MultipleOutputFormat类中的generateFileNameForKeyValue方法自定义返回需要输出文件的名称,我这里是以key值中以逗号分割取第一个字段的值作为输出文件名,这样第一个字段值相同的会输出到一个文件中并以其值作为文件名。

 public static class VVLogNameMultipleTextOutputFormat extends MultipleOutputFormat<Text, NullWritable> {
        
        @Override
        protected String generateFileNameForKeyValue(Text key, NullWritable value, Configuration conf) { 
            String sp[] = key.toString().split(",");
            String filename = sp[1];
            try {
                Long.parseLong(sp[1]);
            } catch (NumberFormatException e) {
                filename = "000000000000";
            }
            return filename;
        }


    }



最后就是在job调用时设置了

Configuration conf = getConf();
Job job = new Job(conf);
job.setNumReduceTasks(12);
......
job.setMapperClass(VVEtlMapper.class);
job.setReducerClass(EtlReducer.class);
job.setOutputFormatClass(VVLogNameMultipleTextOutputFormat.class);//设置自定义的多文件输出类
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
FileOutputFormat.setCompressOutput(job, true);//设置输出结果采用压缩
FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class); //设置输出结果采用lzo压缩

ok,这样你就完成了支持新的hadoop api自定义的多文件输出mapreduce编写。

分享到:
评论

相关推荐

    KNN mapreduce实现

    包含knn mapreduce环境下的实现全部代码和自定义输入输出文件格式。以及实验数据集。

    Hadoop中MapReduce基本案例及代码(五)

    下面详细介绍MapReduce中Map任务Reduce任务以及MapReduce的执行流程。 Map任务: 读取输入文件内容,解析成key,value对。对输入文件的每一行,解析成key,value对。每一个键值对调用一次map函数。 写自己的逻辑,对...

    MapReduce:超大机群上的简单数据处理

    计算利用一个输入key/value对集,来产生一个输出key/value对集.MapReduce库的用户用两个函数表达这个计算:map和reduce. 用户自定义的map函数,接受一个输入对,然后产生一个中间key/value对集.MapReduce库把所有具有...

    MapReduce-Collections:自定义IO格式、文件格式、日志处理、ipLookup、二级排序、自定义patitioner的集合

    平均字数程序使用map输出记录计数器来实现平均字数。MAP_OUTPUT_RECORDS给出从mapper发出的记录总数,reducer使用它来求平均值。 hadoop jar target/collections-1.0-SNAPSHOT-jar-with-dependencies.jar ...

    mapreduce高级特性及shuffle

    1.shuffle机制详细讲解 2.MR案例多文件输出 3.MR案例partition使用 4.MR案例内容去重 5.MR案例敏感词汇过滤 6.MR案例自定义combiner的使用 7.MR案例倒排序索引 8.MR案例简单排序

    分布式计算(MapReduce).docx

    从计算的角度上看,Map/Reduce框架接受各种格式的键值对文件作为输入,读取计算后,最终生成自定义格式的输出文件。而从分布式的角度上看,分布式计算的输入文件往往规模巨大,且分布在多个机器上,单机计算完全不可...

    Hadoop大作业排序.zip

    在 Mapreduce 中,如果需要自定义类的排序规则,需要让类实现 Writable 的子接口 WritableComparable,重写里面的 write, readFields 6 和 compareTo 方法,所以可以自定义一个类作为 key,类中包含 2 个 需要进行...

    Hadoop权威指南 第二版(中文版)

     多个输出  延迟输出  数据库输出 第8章 MapReduce的特性  计数器  内置计数器  用户定义的Java计数器  用户定义的Streaming计数器  排序  准备  部分排序  总排序  二次排序  联接  map端联接  ...

    Hadoop权威指南(中文版)2015上传.rar

    多个输出 延迟输出 数据库输出 第8章 MapReduce的特性 计数器 内置计数器 用户定义的Java计数器 用户定义的Streaming计数器 排序 准备 部分排序 总排序 二次排序 联接 map端联接 reduce端联接 边数据分布 利用...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    3.4 自定义文件格式 3.4.1 输入输出格式 技术点18 输入和输出格式为CSV 的文件 3.4.2 output committing 的重要性 3.5 本章小结 第3 部分 大数据模式 4 处理大数据的MapReduce 模式 4.1 Join ...

    Hadoop实战(第2版)

    技术点16 使用Thrift3.3.5 Avro技术点17 MapReduce 的下一代数据序列化技术3.4 自定义文件格式3.4.1 输入输出格式技术点18 输入和输出格式为CSV 的文件3.4.2 output committing 的重要性 3.5 本章小...

    kafka-hadoop-loader-my:kafka0.8.2使用简单的消费者负载消息使用自定义mapreduce进入hdfs

    此外,它不使用高级使用者,而是直接与zookeeper通信以管理消耗的偏移量,消耗的偏移量在每个地图任务结束时提交,也就是说,当输出文件已从hdfs_temp移至其最终目的地时。 实际使用者及其内部提取程序线程都包装...

    2017最新大数据架构师精英课程

    本资源为大数据基础到中高级教学资源,适合稍微有点大数据或者java基础的人群学习,资源过大,上传乃是下载链接,不多说,上目录: 1_java基础2 l3 a2 a$ t7 J2 b+ `- p 2_java引入ide-eclipse 3_java基础知识-循环...

Global site tag (gtag.js) - Google Analytics