
How to Read and Write RCFile with MapReduce


This article explains how to read and write RCFile files from MapReduce. The walkthrough includes complete code listings, so it should serve as a practical reference.

RCFile is a row-columnar storage format developed by Facebook that offers a high compression ratio and efficient reads. In Hive, a Text table can usually be converted to RCFile directly with an insert-select, but sometimes it is more convenient to read and write RCFile from a MapReduce job.
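
For reference, the Hive-only conversion mentioned above is a single insert-select. The sketch below is illustrative only; the table names src_text and dst_rcfile and their columns are hypothetical placeholders:

-- Hypothetical tables: src_text is an existing TEXTFILE table,
-- dst_rcfile is the RCFile copy we want to produce.
CREATE TABLE dst_rcfile (id STRING, name STRING) STORED AS RCFILE;
INSERT OVERWRITE TABLE dst_rcfile SELECT id, name FROM src_text;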

Maven dependencies:

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.5.0-cdh5.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-serde</artifactId>
            <version>0.13.1-cdh5.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive.hcatalog</groupId>
            <artifactId>hive-hcatalog-core</artifactId>
            <version>0.13.1-cdh5.2.1</version>
        </dependency>

Reading an RCFile and generating a text file with MapReduce

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceInputFormat;

import java.io.IOException;

public class RcFileReaderJob {
    static class RcFileMapper extends Mapper<Object, BytesRefArrayWritable, Text, NullWritable> {
        @Override
        protected void map(Object key, BytesRefArrayWritable value,
                           Context context)
                throws IOException, InterruptedException {
            Text txt = new Text();
            StringBuffer sb = new StringBuffer();
            // Join all columns of the row into one tab-separated line.
            for (int i = 0; i < value.size(); i++) {
                BytesRefWritable v = value.get(i);
                txt.set(v.getData(), v.getStart(), v.getLength());
                if (i == value.size() - 1) {
                    sb.append(txt.toString());
                } else {
                    sb.append(txt.toString()).append("\t");
                }
            }
            context.write(new Text(sb.toString()), NullWritable.get());
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);

        }
    }

    static class RcFileReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                              Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static boolean runLoadMapReduce(Configuration conf, Path input, Path output) throws IOException,
            ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(conf);
        job.setJarByClass(RcFileReaderJob.class);
        job.setJobName("RcFileReaderJob");
        job.setNumReduceTasks(1);
        job.setMapperClass(RcFileMapper.class);
        job.setReducerClass(RcFileReduce.class);
        job.setInputFormatClass(RCFileMapReduceInputFormat.class);
//        MultipleInputs.addInputPath(job, input, RCFileInputFormat.class);
        RCFileMapReduceInputFormat.addInputPath(job, input);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 2) {
            System.err.println("Usage: rcfile <in> <out>");
            System.exit(2);
        }
        RcFileReaderJob.runLoadMapReduce(conf, new Path(args[0]), new Path(args[1]));
    }
}  

Reading a text file and generating an RCFile with MapReduce

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceOutputFormat;

import java.io.IOException;

public class RcFileWriterJob extends Configured implements Tool{
   public static class Map extends Mapper<Object, Text, NullWritable, BytesRefArrayWritable> {
     private byte[] fieldData;
     private int numCols;
     private BytesRefArrayWritable bytes;
     
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
       numCols = context.getConfiguration().getInt("hive.io.rcfile.column.number.conf", 0);
       bytes = new BytesRefArrayWritable(numCols);
     }
     
     public void map(Object key, Text line, Context context
                ) throws IOException, InterruptedException {
       bytes.clear();
       // Split the input line into columns and wrap each column as a BytesRefWritable.
       String[] cols = line.toString().split("\t", -1);
       System.out.println("SIZE : " + cols.length);
       for (int i = 0; i < numCols; i++) {
           fieldData = cols[i].getBytes("UTF-8");
           BytesRefWritable cu = new BytesRefWritable(fieldData, 0, fieldData.length);
           bytes.set(i, cu);
       }
       context.write(NullWritable.get(), bytes);
     }
   }
   
   public int run(String[] args) throws Exception {
     Configuration conf = new Configuration();
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     if (otherArgs.length < 2) {
         System.out.println("Usage: " +
              "hadoop jar RCFileLoader.jar <main class> " +
              "-tableName <tableName> -numCols <numberOfColumns> -input <input path> " +
              "-output <output path> -rowGroupSize <rowGroupSize> -ioBufferSize <ioBufferSize>");
         System.out.println("For test:");
         System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
              "-tableName test1 -numCols 10 -input RCFileLoaderTest/test1 " +
              "-output RCFileLoaderTest/RCFile_test1");
         System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
              "-tableName test2 -numCols 5 -input RCFileLoaderTest/test2 " +
              "-output RCFileLoaderTest/RCFile_test2");
         return 2;
       }

     String tableName = "";
     int numCols = 0;
     String inputPath = "";
     String outputPath = "";
     int rowGroupSize = 16 *1024*1024;
     int ioBufferSize = 128*1024;
       // Parse the command-line options.
       for (int i = 0; i < otherArgs.length - 1; i++) {
         if ("-tableName".equals(otherArgs[i])) {
           tableName = otherArgs[i + 1];
         } else if ("-numCols".equals(otherArgs[i])) {
           numCols = Integer.parseInt(otherArgs[i + 1]);
         } else if ("-input".equals(otherArgs[i])) {
           inputPath = otherArgs[i + 1];
         } else if ("-output".equals(otherArgs[i])) {
           outputPath = otherArgs[i + 1];
         } else if ("-rowGroupSize".equals(otherArgs[i])) {
           rowGroupSize = Integer.parseInt(otherArgs[i + 1]);
         } else if ("-ioBufferSize".equals(otherArgs[i])) {
           ioBufferSize = Integer.parseInt(otherArgs[i + 1]);
         }
         
       }
       
       conf.setInt("hive.io.rcfile.record.buffer.size", rowGroupSize);
       conf.setInt("io.file.buffer.size", ioBufferSize);

     Job job = Job.getInstance(conf);
     job.setJobName("RcFileWriterJob");
       job.setJarByClass(RcFileWriterJob.class);
       job.setMapperClass(Map.class);
       job.setMapOutputKeyClass(NullWritable.class);
       job.setMapOutputValueClass(BytesRefArrayWritable.class);
//       job.setNumReduceTasks(0);
       
       FileInputFormat.addInputPath(job, new Path(inputPath));
       
       job.setOutputFormatClass(RCFileMapReduceOutputFormat.class);
       RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), numCols);
       RCFileMapReduceOutputFormat.setOutputPath(job, new Path(outputPath));
       RCFileMapReduceOutputFormat.setCompressOutput(job, false);

       System.out.println("Loading table " + tableName + " from " + inputPath + " to RCFile located at " + outputPath);
       System.out.println("number of columns: " + job.getConfiguration().get("hive.io.rcfile.column.number.conf"));
       System.out.println("RCFile row group size: " + job.getConfiguration().get("hive.io.rcfile.record.buffer.size"));
       System.out.println("io buffer size: " + job.getConfiguration().get("io.file.buffer.size"));
       
       return (job.waitForCompletion(true) ? 0 : 1);
   }
   
   public static void main(String[] args) throws Exception {
       int res = ToolRunner.run(new Configuration(), new RcFileWriterJob(), args);
       System.exit(res);
   }

}
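
Once the writer job has produced RCFile output, a common way to consume it is to point a Hive external table at the output directory. The snippet below is only a sketch: the table name, column list, and LOCATION path are hypothetical placeholders and must match what the job actually wrote.

-- Hypothetical external table over the MapReduce output directory.
CREATE EXTERNAL TABLE rcfile_test1 (col1 STRING, col2 STRING)
STORED AS RCFILE
LOCATION '/user/demo/RCFileLoaderTest/RCFile_test1';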

That is all for "How to Read and Write RCFile with MapReduce". Thanks for reading, and I hope the examples are helpful.
