JDBC batch re-processing

I have always wanted to duplicate how GoldenGate handles batch failures. In GoldenGate, you can set the parameter “batchsql BATCHTRANSOPS 2000”, for example, to send 2000 statements at a time to the database for execution. In general, this is far faster than sending them one at a time. If an error is encountered during batch execution, such as a duplicate key violation, the statements are rolled back and reprocessed serially. This is really useful if you don’t expect many errors. If an error does occur, the statements that can be successfully processed are executed, and the exceptions can be logged to a file.

I wanted to do the same with JDBC, and came up with what is below. Roughly, this reads lines from a file and stores them in an ArrayList. Once the list has reached a configurable size, its rows are sent to the database as a single batch. If an exception is thrown, the batch is rolled back and the ArrayList is resubmitted to a method that processes each statement serially. Any exceptions in this phase are logged, and the remaining statements that can be processed are executed.

This class has some additional elements such as key processing, but for this post, the emphasis is on the batch processing algorithm.

import java.io.*;
import java.text.*;
import java.util.*;
import java.sql.*;

/**
 * Command-line loader that decrypts a PGP-encrypted, pipe-delimited file and inserts its rows
 * into a database table using JDBC batches.
 *
 * <p>Usage: {@code java loadData <input-file> <table>}
 *
 * <p>Rows are buffered and sent to the database in batches of {@link #BATCH_SIZE}. If a batch
 * fails for any reason, it is rolled back and the same rows are replayed one at a time, each in
 * its own transaction, so that every row that can be inserted is inserted and only the failing
 * rows are reported. The first line of the input file is treated as a header and skipped.
 *
 * <p>The class keeps its working state in static fields and is not safe for concurrent use.
 */
public class loadData {

  /** Number of buffered rows that triggers a batch flush. */
  private static final int BATCH_SIZE = 10000;

  static Connection conn = null;
  static PreparedStatement pst = null;
  /** Rows of the current, not-yet-committed batch; one String[] of raw field values per row. */
  static ArrayList<String[]> rows = new ArrayList<String[]>();
  static ResultSet rst = null;
  static ResultSetMetaData rsmd = null;
  static DateFormat df = new SimpleDateFormat("MM/dd/yyyy H:m:s");

  /** Running total of rows actually committed (batch and row-by-row paths combined). */
  private static long inserted = 0;

  /**
   * Decrypts the input file, then loads it into the target table.
   *
   * @param args {@code args[0]} is the (possibly .gpg) input file, {@code args[1]} the table name
   * @throws IllegalArgumentException if fewer than two arguments are supplied
   * @throws Exception on decryption, file or unrecoverable database errors
   */
  public static void main (String args[]) throws Exception {
    if (args.length < 2) {
      throw new IllegalArgumentException("usage: loadData <input file> <table>");
    }

    PGPFileProcessor pgpfp = new PGPFileProcessor();
    pgpfp.setInputFileName(args[0]);
    pgpfp.setSecretKeyFileName("secring.gpg");
    String output;
    if (args[0].indexOf(".gpg") > 0)
      output = args[0].replace(".gpg","");
    else
      output = args[0] + ".txt";

    pgpfp.setOutputFileName(output);
    // NOTE(review): passphrase and DB credentials are hard-coded; move them to configuration.
    pgpfp.setPassphrase("****");
    pgpfp.decrypt();

    // NOTE(review): the table name is concatenated into SQL because identifiers cannot be bound
    // as parameters; it must come from a trusted operator, never from untrusted input.
    String table = args[1];
    System.out.println(table);

    // try-with-resources closes everything in reverse order, even when the load fails.
    try (BufferedReader bfr = new BufferedReader(new FileReader(output));
         Connection c = DriverManager.getConnection("jdbc:oracle:thin:user/pwd@dbhost:1521/dbservice")) {
      conn = c;
      conn.setAutoCommit(false);
      // "where 1 = 0" yields the column metadata without fetching any table rows. The result
      // set stays open for the whole load because processRow() keeps consulting its metadata.
      try (Statement metaStmt = conn.createStatement();
           ResultSet metaRs = metaStmt.executeQuery("select * from " + table + " where 1 = 0")) {
        rst = metaRs;
        rsmd = metaRs.getMetaData();
        int columnCount = rsmd.getColumnCount();
        try (PreparedStatement insertStmt = conn.prepareStatement(buildInsert(table, columnCount))) {
          pst = insertStmt;
          loadRows(bfr, columnCount);
        }
      }
    }
  }

  /** Builds "insert into TABLE values(?,?,...,?)" with one placeholder per column. */
  private static String buildInsert(String table, int columnCount) {
    StringBuilder sql = new StringBuilder("insert into ").append(table).append(" values(");
    for (int i = 1; i <= columnCount; i++) {
      sql.append(i == columnCount ? "?)" : "?,");
    }
    return sql.toString();
  }

  /** Reads the data lines (skipping the header), buffering and flushing them in batches. */
  private static void loadRows(BufferedReader reader, int columnCount) throws Exception {
    String line;
    boolean header = true;
    while ((line = reader.readLine()) != null) {
      if (header) {
        header = false;
        continue;
      }
      // limit -1 keeps trailing empty fields so the column count check is accurate
      String[] fields = line.split("\\|",-1);
      if (fields.length == columnCount) {
        rows.add(fields);
      }
      else {
        System.out.println(new java.util.Date() + "\tFAILURE: Column number mismatch " + line);
      }
      // Flush on the number of rows actually buffered, not on the number of lines read.
      if (rows.size() >= BATCH_SIZE) {
        flushRows();
      }
    }
    flushRows();
  }

  /**
   * Sends the buffered rows as one batch; on any failure rolls back and replays them row by
   * row. The buffer is always emptied afterwards.
   */
  private static void flushRows() throws Exception {
    if (rows.isEmpty()) {
      return;
    }
    try {
      // Binding is inside the try so that an unparsable value also triggers the fallback.
      processBatch();
      pst.executeBatch();
      conn.commit();
      inserted += rows.size();
      System.out.println("batch successful");
      System.out.println(new java.util.Date() + "\tinserted " + inserted + " rows.");
    }
    catch (Exception e2) {
      System.out.println(e2.getMessage());
      conn.rollback();
      // Discard whatever is still queued so it cannot leak into the row-by-row inserts.
      pst.clearBatch();
      processRowByRow();
    }
    finally {
      rows.clear();
    }
  }

  /**
   * Inserts every buffered row individually, committing after each one. A failing row is
   * reported (first field plus error message) and skipped; the remaining rows still load.
   *
   * @throws Exception if rolling back after a failed row itself fails
   */
  static public void processRowByRow() throws Exception {
    System.out.println("row by row processing");
    for (int m = 0; m < rows.size(); m++) {
      String[] row = rows.get(m);
      try {
        processRow(row);
        pst.execute();
        conn.commit();
        inserted++;
      }
      catch (Exception e) {
        System.out.println(row[0]);
        System.out.println(e.getMessage());
        // Leave the transaction clean for the next row.
        conn.rollback();
      }
    }
  }

  /**
   * Binds every buffered row and queues it on the prepared statement's batch.
   *
   * @throws Exception if a value cannot be parsed or bound
   */
  static public void processBatch() throws Exception {
    System.out.println("batch processing");
    for (String[] row : rows) {
      processRow(row);
      pst.addBatch();
    }
  }

  /**
   * Binds one row's fields to the prepared statement, converting each according to the
   * column's type name. Double quotes are stripped from every field; empty DATE and NUMBER
   * fields are bound as SQL NULL.
   *
   * @param s raw field values, one per table column, in column order
   * @throws Exception if a date or number cannot be parsed, or binding fails
   */
  static public void processRow(String[] s) throws Exception {
    for (int k = 0; k < s.length; k++) {
      int j = k + 1; // JDBC parameter and column indexes are 1-based
      String tmp = s[k].replace("\"","");
      String typeName = rsmd.getColumnTypeName(j);
      if ("DATE".equals(typeName)) {
        if (tmp.isEmpty())
          pst.setNull(j, java.sql.Types.DATE);
        else
          // The input carries a time of day; java.sql.Date is a date-only type, so bind a
          // Timestamp to keep the hours, minutes and seconds.
          pst.setTimestamp(j, new java.sql.Timestamp(df.parse(tmp).getTime()));
      }
      else if ("NUMBER".equals(typeName)) {
        if (tmp.isEmpty())
          pst.setNull(j, java.sql.Types.NUMERIC);
        else
          // BigDecimal keeps all digits; a double silently rounds beyond ~15-17 digits.
          pst.setBigDecimal(j, new java.math.BigDecimal(tmp.trim()));
      }
      else if ("VARCHAR2".equals(typeName)) {
        pst.setString(j, tmp);
      }
      else {
        // Any other column type: always bind something, otherwise the parameter would keep
        // the previous row's value. NOTE(review): relies on the database converting the
        // string implicitly - confirm for the column types actually in use.
        if (tmp.isEmpty())
          pst.setNull(j, rsmd.getColumnType(j));
        else
          pst.setString(j, tmp);
      }
    }
  }
}

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.