CRC check on gzip files in HDFS

I am sure there is a more elegant way to do this, but I wanted it done quickly. A few of the external files underlying a Hive table threw exceptions saying the end of the file had been reached unexpectedly (a CRC failure). A fix for this is coming at some point, but for now this workaround at least identifies those problem children: it reads every file in an HDFS directory through its compression codec, so the corrupt ones fail and can be singled out…

import java.util.*;
import java.util.zip.*;
import java.util.concurrent.*;
import java.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.compress.*;

/**
 * Reads every file directly under an HDFS directory to the end, decompressing it with the codec
 * Hadoop picks from the file name, so that corrupt compressed files (e.g. gzip files that fail
 * with an unexpected end of file / CRC error) are reported by path.
 *
 * <p>Usage: {@code check <hdfs-directory>}. The main thread lists the directory and feeds a
 * bounded queue; a fixed pool of worker tasks drains it. The shared sentinel {@link #POISON_PILL}
 * tells the workers to stop.
 */
public class check implements Runnable {
  /** Number of worker tasks reading files concurrently. */
  static final int WORKER_COUNT = 20;

  /** Size of the scratch buffer used to drain each stream. */
  private static final int BUFFER_SIZE = 64 * 1024;

  /** Hand-off between the directory lister (producer) and the workers (consumers). */
  static ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1000);
  static FileSystem fs;
  static CompressionCodecFactory factory;
  /** Sentinel compared by identity; each worker puts it back so every worker sees it. */
  static final Object POISON_PILL = new Object();

  /**
   * Lists {@code args[0]} (non-recursively), checks every file with {@link #WORKER_COUNT}
   * workers, and closes the file system only after all workers have finished.
   *
   * @param args {@code args[0]} is the HDFS directory to scan
   * @throws Exception if the file system cannot be obtained/closed or the thread is interrupted
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("usage: check <hdfs-directory>");
      return;
    }
    Configuration conf = new Configuration();
    fs = FileSystem.get(conf);
    factory = new CompressionCodecFactory(conf);

    ExecutorService workers = Executors.newFixedThreadPool(WORKER_COUNT);
    for (int j = 0; j < WORKER_COUNT; j++) {
      workers.execute(new check());
    }
    // Accept no further tasks; the submitted workers keep running until they see the pill.
    workers.shutdown();
    try {
      enqueueFiles(new Path(args[0]));
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      // Always release the workers, even if listing failed part-way.
      queue.put(POISON_PILL);
      // Workers read through fs, so it must stay open until every one of them has finished.
      workers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
      fs.close();
    }
  }

  /** Lists the files directly under {@code dir} (non-recursive) and queues each one. */
  private static void enqueueFiles(Path dir) throws IOException, InterruptedException {
    RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, false);
    while (it.hasNext()) {
      LocatedFileStatus item = it.next();
      // Blocks while the queue is full, pacing the lister to the workers.
      queue.put(item);
      System.out.println("put " + item.getPath().toString());
    }
  }

  /** Creates one worker task; it is started by {@link #main}. */
  check() {
  }

  /**
   * Worker loop: takes file statuses off {@link #queue} and fully reads each one until the poison
   * pill arrives. A failure on one file is reported with its path and does not stop the worker.
   */
  @Override
  public void run() {
    try {
      while (true) {
        Object obj = queue.take();
        if (obj == POISON_PILL) {
          // Pass the pill on so the remaining workers also exit.
          queue.put(POISON_PILL);
          return;
        }
        verify(((LocatedFileStatus) obj).getPath());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Reads {@code path} to the end (decompressed when a codec matches its name) and prints a
   * {@code FAILED} line naming the file if anything goes wrong.
   */
  private static void verify(Path path) {
    System.out.println("processing " + path.toString());
    byte[] buffer = new byte[BUFFER_SIZE];
    try (InputStream stream = open(path)) {
      // Drain only: decompression errors (such as the end-of-file / CRC failures this tool
      // hunts for) surface as exceptions from read().
      while (stream.read(buffer) != -1) {
        // discard
      }
    } catch (Exception e) {
      // Broad on purpose: any failure marks this file as bad, and the worker moves on.
      System.out.println("FAILED " + path.toString() + " " + e);
      e.printStackTrace();
    }
  }

  /** Opens {@code path}, wrapped in the codec chosen from its file name if there is one. */
  private static InputStream open(Path path) throws IOException {
    CompressionCodec codec = factory.getCodec(path);
    InputStream raw = fs.open(path);
    if (codec == null) {
      return raw;
    }
    try {
      return codec.createInputStream(raw);
    } catch (IOException | RuntimeException e) {
      // Don't leak the underlying HDFS stream if the codec stream cannot be created.
      raw.close();
      throw e;
    }
  }
}

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.