通过Avro 将文件合并

简介:

Avro合并本地文件

package com.zhiyou100.mr;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileUtil;

import com.zhiyou100.schema.SmallFile;

public class AvroMergeSmallFile {

private Schema.Parser parse = new Schema.Parser();
private Schema schema;
private List<String> inputFilePaths = new ArrayList<String>();

private AvroMergeSmallFile() {
    schema = SmallFile.getClassSchema();
}

public void addInputFileDir(String inputDir) throws IOException {
    // 获取出文件夹下的所有文件
    File[] files = FileUtil.listFiles(new File(inputDir));
    // 把文件路径添加到inputFilePaths中
    for (File file : files) {
        inputFilePaths.add(file.getPath());
    }
}

// 把inputFilePaths中所有文件合并到一个avro文件中
public void mergeFile(String outputPath) throws IOException {
    DatumWriter<SmallFile> writer = new SpecificDatumWriter<SmallFile>();
    DataFileWriter<SmallFile> fileWriter = new DataFileWriter<SmallFile>(writer);
    fileWriter.create(SmallFile.getClassSchema(), new File(outputPath));
    // 把inputFilePath的文件一个个读取出来根据模式放入到avro文件中
    for (String filePath : inputFilePaths) {
        File inputFile = new File(filePath);
        byte[] content = FileUtils.readFileToByteArray(inputFile);

        SmallFile oneSmallFile = SmallFile.newBuilder().setFileName(inputFile.getAbsolutePath())
                .setContent(ByteBuffer.wrap(content)).build();
        fileWriter.append(oneSmallFile);
        System.out.println("写入" + inputFile.getAbsolutePath() + "成功"+DigestUtils.md5Hex(content));

    }
    fileWriter.flush();
    fileWriter.close();
}
//读取大avro中的文件
public void readMergedFile (String avrofile) throws IOException{
    DatumReader<SmallFile> reader =new SpecificDatumReader<SmallFile>();
    DataFileReader<SmallFile>fileReader =new DataFileReader<SmallFile>(new File(avrofile), reader);
    SmallFile smallFile =null;
    while(fileReader.hasNext()){
        smallFile=fileReader.next();
        System.out.println("文件名:"+smallFile.getFileName());
        System.out.println("文件内容:"+new String(smallFile.getContent().array().toString()));
        System.out.println("文件MD5"+DigestUtils.md5Hex(smallFile.getContent().array()));
    }
}

public static void main(String[] args) throws IOException {
    AvroMergeSmallFile avroMergeSmallFile =new AvroMergeSmallFile();
    avroMergeSmallFile.addInputFileDir("C:\\Users\\Administrator\\Desktop\\reversetext");
    avroMergeSmallFile.mergeFile("C:\\Users\\Administrator\\Desktop\\AvroMErgeSmallFile");
    avroMergeSmallFile.readMergedFile("C:\\Users\\Administrator\\Desktop\\AvroMErgeSmallFile");
}

}

相关文章
|
2月前
|
存储
Hudi Log日志文件格式分析(一)
Hudi Log日志文件格式分析(一)
27 1
|
存储 SQL JSON
hive文件与压缩
hive文件与压缩
hive文件与压缩
|
SQL 分布式计算 Hadoop
MapReduce - 读取 ORC, RcFile 文件
一.引言 MR 任务处理相关 hive 表数据时格式为 orc 和 rcFile,下面记录两种处理方法。 二.偷懒版读取 ORC,RcFile 文件 最初不太熟悉 mr,只会 textFormat 一种输入模式,于是遇到 orc 和 rcFile 形式的 hive 数据需要在 mr 读取时,都是先通过 INSERTOVERWRITEDIRECTORY 将 hive 表重新输出一份 hdfs 的 text 数据,随后用 mr 读取该 text 文件,该方法适合偷懒且原始 hive 数据不大,..
262 0
MapReduce - 读取 ORC, RcFile 文件
|
存储 分布式计算 Hadoop
Hadoop支持的文件格式之SequenceFile(下)
Hadoop支持的文件格式之SequenceFile(下)
110 0
Hadoop支持的文件格式之SequenceFile(下)
|
分布式计算 Hadoop
Hadoop支持的文件格式之SequenceFile(上)
Hadoop支持的文件格式之SequenceFile(上)
147 0
|
JSON 编解码 Java
|
分布式计算 Hadoop 分布式数据库
通过Datax将CSV文件导入Hbase,导入之前的CSV文件大小和导入之后的Hadoop分布式文件大小对比引入的思考
由于项目需要做系统之间的离线数据同步,因为实时性要求不高,因此考虑采用了阿里的datax来进行同步。在同步之前,将数据导出未csv文件,因为需要估算将来的hbase运行的hadoop的分布式文件系统需要占用多少磁盘空间,因此想到了需要做几组测试。
2180 0
|
SQL 存储 HIVE
hive orc文件读取
支持增删改查建表: create table orc_table(id int, name string) clustered by (id) into 4 buckets stored as orc TBLPROP...
5531 0