I am sure there is a more elegant way to do this, but I wanted it done quickly. A few of the external files underlying one of our Hive tables threw exceptions about the end of the file being reached unexpectedly (a CRC failure). A fix for this is coming at some point, but for now this is a workaround that at least identifies those problem children: it reads every file in a directory from start to finish, so any file that cannot be read all the way through shows up with an exception…
import java.util.*;
import java.util.zip.*;
import java.util.concurrent.*;
import java.io.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.compress.*;
/**
 * Reads every file directly under a directory from start to end in order to
 * find files that cannot be read completely (for example truncated or
 * CRC-damaged compressed files).
 *
 * <p>{@link #main} lists the directory and puts each file on a bounded queue;
 * a fixed number of worker threads take files off the queue and read them to
 * EOF. A file whose name matches a registered compression codec is read
 * through that codec, any other file is read as-is. Every file that throws
 * while being read is reported on standard output as
 * {@code FAILED <path> <exception>} and the worker carries on with the next
 * file.
 *
 * <p>Shutdown uses a single sentinel object: the worker that takes it puts it
 * back for the next worker and then exits.
 */
public class check implements Runnable {
    /** Number of reader threads started by {@link #main}. */
    private static final int WORKER_COUNT = 20;

    /** Size of the buffer used to drain a file. */
    private static final int BUFFER_SIZE = 64 * 1024;

    /** Work queue; holds {@code LocatedFileStatus} items and, at the end, {@link #POISON_PILL}. */
    static final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1000);

    /** File system shared by all workers; set by {@link #main} before any worker starts. */
    static FileSystem fs;

    /** Codec lookup shared by all workers; set by {@link #main} before any worker starts. */
    static CompressionCodecFactory factory;

    /** Sentinel telling workers that no more files will arrive (compared by identity). */
    static final Object POISON_PILL = new Object();

    /** Thread running this worker; kept so that {@link #main} can wait for it. */
    private final Thread worker;

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the directory whose files are checked
     *             (only the directory itself, not its sub-directories)
     * @throws Exception if the file system cannot be obtained or closed, or if
     *                   the main thread is interrupted while shutting down
     */
    public static void main(String args[]) throws Exception {
        if (args.length < 1) {
            System.err.println("usage: check <directory>");
            return;
        }

        Configuration conf = new Configuration();
        fs = FileSystem.get(conf);
        factory = new CompressionCodecFactory(conf);

        List<check> workers = new ArrayList<check>();
        try {
            Path dir = new Path(args[0]);
            RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, false);

            for (int j = 0; j < WORKER_COUNT; j++) {
                workers.add(new check());
            }

            while (it.hasNext()) {
                LocatedFileStatus item;
                try {
                    item = it.next();
                }
                catch (IOException e) {
                    // Best effort: report the listing problem and try the next entry.
                    System.out.println("listing error: " + e);
                    continue;
                }
                queue.put(item);
                System.out.println("put " + item.getPath().toString());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            try {
                // Always release the workers, even when listing failed half-way,
                // otherwise their non-daemon threads keep the JVM alive forever.
                if (!workers.isEmpty()) {
                    queue.put(POISON_PILL);
                }
                // The workers read through the shared FileSystem, so it must
                // stay open until every one of them has finished.
                for (check c : workers) {
                    c.worker.join();
                }
            }
            finally {
                fs.close();
            }
        }
    }

    /** Creates a worker and immediately starts it on its own thread. */
    check() {
        worker = new Thread(this);
        worker.start();
    }

    /**
     * Worker loop: takes files off {@link #queue} and reads each one to EOF
     * until the poison pill arrives or the thread is interrupted. A failure on
     * one file is reported and does not stop the worker.
     */
    public void run() {
        while (true) {
            Object obj;
            try {
                obj = queue.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            if (obj == POISON_PILL) {
                // Hand the pill on so that the remaining workers stop as well.
                try {
                    queue.put(POISON_PILL);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return;
            }

            Path path = ((LocatedFileStatus) obj).getPath();
            System.out.println("processing " + path.toString());
            try {
                drain(path);
            }
            catch (Exception e) {
                // This is the whole point of the tool: name the unreadable file.
                System.out.println("FAILED " + path + " " + e);
                e.printStackTrace();
            }
        }
    }

    /**
     * Reads {@code path} to EOF and discards the data, decompressing it when a
     * codec is registered for the file name.
     *
     * @param path file to read
     * @throws IOException if the file cannot be opened or read completely
     */
    private static void drain(Path path) throws IOException {
        CompressionCodec codec = factory.getCodec(path);
        byte[] buffer = new byte[BUFFER_SIZE];
        // The raw stream is a resource of its own so that it is closed even
        // when creating the decompressing stream fails.
        try (InputStream raw = fs.open(path);
             InputStream in = (codec != null) ? codec.createInputStream(raw) : raw) {
            while (in.read(buffer) != -1) {
                // Content is irrelevant; only reaching EOF without an exception matters.
            }
        }
    }
}