Wednesday 24 November 2021

Extract error records while inserting into db table using JDBCIO apache beam in java

 I was inserting data into a Postgres database using an Apache Beam pipeline. It works perfectly with the JdbcIO write transform of the Apache Beam library. Now, however, I want to extract the records that could not be inserted into the database. An insert can fail for many reasons, such as a SQLException or a database constraint violation.

I have gone through the Apache Beam programming guide, but I was not able to find any predefined transform that gives me the unsuccessful inserts.

Then I copied the write functionality of the JdbcIO class and modified the executeBatch logic in WriteFn. I also added TupleTags for successful and unsuccessful inserts, which classify the incoming pipeline records, so that the failed records can be retrieved by providing the corresponding TupleTag.


Below is the code:


BeamTest.java


import java.sql.PreparedStatement;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.Collection;

import java.util.List;


import org.apache.beam.runners.flink.FlinkRunner;

import org.apache.beam.sdk.Pipeline;

import org.apache.beam.sdk.PipelineResult.State;

import org.apache.beam.sdk.coders.SerializableCoder;

import org.apache.beam.sdk.options.PipelineOptions;

import org.apache.beam.sdk.options.PipelineOptionsFactory;

import org.apache.beam.sdk.transforms.Create;

import org.apache.beam.sdk.transforms.DoFn;

import org.apache.beam.sdk.transforms.ParDo;

import org.apache.beam.sdk.values.PCollection;

import org.apache.beam.sdk.values.PCollectionTuple;

import org.apache.beam.sdk.values.TupleTag;


/**

 * @author sachin

 * @date 18-Nov-2021

 */


public class BeamTest {

static List<Stock> stocks = new ArrayList<>();


public static void main(String[] args) {

System.setProperty("java.specification.version", "1.8");

process();

// read();


}


public static void process() {

final TupleTag<Stock> VALID = new TupleTag<Stock>() {

};

final TupleTag<Stock> INVALID = new TupleTag<Stock>() {

};


PipelineOptions options = PipelineOptionsFactory.create();


options.setRunner(FlinkRunner.class);

Pipeline p = Pipeline.create(options);


// Preparing dummy data

Collection<Stock> stockList = Arrays.asList(

new Stock("AAP", 2000, "Apple Inc"),

new Stock("MSF", 3000, "Microsoft Corporation"), 

new Stock("NVDA", 4000, "NVIDIA Corporation"),

new Stock("INT", 3200, "Intel Corporation"));


// Reading dummy data and save it into PCollection<Stock>

PCollection<Stock> data = p.apply(Create.of(stockList).withCoder(SerializableCoder.of(Stock.class)));

// insert

PCollectionTuple pCollectionTupleResult = data.apply("write", CustomJdbcIOWrite.<Stock>write()


.withDataSourceConfiguration(CustomJdbcIOWrite.DataSourceConfiguration

.create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/postgres")

.withUsername("postgres").withPassword("sachin"))

.withStatement("insert into stocks values(?, ?, ?)").withValidTag(VALID).withInValidTag(INVALID)

.withPreparedStatementSetter(new CustomJdbcIOWrite.PreparedStatementSetter<Stock>() {

private static final long serialVersionUID = 1L;

public void setParameters(Stock element, PreparedStatement query) throws SQLException {

query.setString(1, element.getSymbol());

query.setLong(2, element.getPrice());

query.setString(3, element.getCompany());

}

}));

// get failed PCollection using INVALID tupletag

PCollection<Stock> failedPcollection = pCollectionTupleResult.get(INVALID)

.setCoder(SerializableCoder.of(Stock.class));

failedPcollection.apply(ParDo.of(new DoFn<Stock, Stock>() {


private static final long serialVersionUID = 1L;


@ProcessElement

public void process(ProcessContext pc) {

System.out.println("Failed pCollection element:" + pc.element().getCompany());

}


}));


//get failed PCollection using INVALID tupletag

PCollection<Stock> insertedPcollection = pCollectionTupleResult.get(VALID)

.setCoder(SerializableCoder.of(Stock.class));

insertedPcollection.apply(ParDo.of(new DoFn<Stock, Stock>() {


private static final long serialVersionUID = 1L;

@ProcessElement

public void process(ProcessContext pc) {

System.out.println("Inserted pCollection element:" + pc.element().getCompany());

}

}));

// run pipeline

State state = p.run().waitUntilFinish();

System.out.println("Data inserted successfully with state : " + state);


}

}

In the output we can see that, from the PCollectionTuple, we get the failed and the successful records by providing the corresponding TupleTag.

OUTPUT:

Inserted pCollection element:Microsoft Corporation

Failed pCollection element:NVIDIA Corporation

Inserted pCollection element:Intel Corporation

Inserted pCollection element:Apple Inc

Data inserted successfully with state : DONE

 

CustomJdbcIOWrite.java

import static com.google.common.base.Preconditions.checkArgument;


import java.io.Serializable;

import java.sql.Connection;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;


import javax.annotation.Nullable;

import javax.sql.DataSource;


import org.apache.beam.sdk.annotations.Experimental;

import org.apache.beam.sdk.options.PipelineOptions;

import org.apache.beam.sdk.transforms.DoFn;

import org.apache.beam.sdk.transforms.PTransform;

import org.apache.beam.sdk.transforms.ParDo;

import org.apache.beam.sdk.transforms.display.DisplayData;

import org.apache.beam.sdk.values.PCollection;

import org.apache.beam.sdk.values.PCollectionTuple;

import org.apache.beam.sdk.values.TupleTag;

import org.apache.beam.sdk.values.TupleTagList;

import org.apache.commons.dbcp2.BasicDataSource;


import com.google.auto.value.AutoValue;


@Experimental

public class CustomJdbcIOWrite {


  /**

   * Write data to a JDBC datasource.

   *

   * @param <T> Type of the data to be written.

   */

  public static <T> Write<T> write() {

    return new AutoValue_JdbcIO_Write.Builder<T>().build();

  }


  // Private constructor: the class is only used through its static factory and nested types.
  private CustomJdbcIOWrite() {}


  /**
   * Callback that converts a row of a {@link ResultSet} into an element of type {@code T}.
   *
   * <p>NOTE(review): nothing in this file calls this interface; it appears to be left over from
   * the read side of the class this code was copied from - confirm before relying on it.
   *
   * @param <T> type of the element produced for each row
   */

  public interface RowMapper<T> extends Serializable {

    T mapRow(ResultSet resultSet) throws Exception;

  }


  /**

   * A POJO describing a {@link DataSource}, either providing directly a {@link DataSource} or all

   * properties allowing to create a {@link DataSource}.

   */

  @AutoValue

  public abstract static class DataSourceConfiguration implements Serializable {

    @Nullable abstract String getDriverClassName();

    @Nullable abstract String getUrl();

    @Nullable abstract String getUsername();

    @Nullable abstract String getPassword();

    @Nullable abstract String getConnectionProperties();

    @Nullable abstract DataSource getDataSource();


    abstract Builder builder();


    @AutoValue.Builder

    abstract static class Builder {

      abstract Builder setDriverClassName(String driverClassName);

      abstract Builder setUrl(String url);

      abstract Builder setUsername(String username);

      abstract Builder setPassword(String password);

      abstract Builder setConnectionProperties(String connectionProperties);

      abstract Builder setDataSource(DataSource dataSource);

      abstract DataSourceConfiguration build();

    }


    public static DataSourceConfiguration create(DataSource dataSource) {

      checkArgument(dataSource != null, "DataSourceConfiguration.create(dataSource) called with "

          + "null data source");

      checkArgument(dataSource instanceof Serializable,

          "DataSourceConfiguration.create(dataSource) called with a dataSource not Serializable");

      return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()

          .setDataSource(dataSource)

          .build();

    }


    public static DataSourceConfiguration create(String driverClassName, String url) {

      checkArgument(driverClassName != null,

          "DataSourceConfiguration.create(driverClassName, url) called with null driverClassName");

      checkArgument(url != null,

          "DataSourceConfiguration.create(driverClassName, url) called with null url");

      return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()

          .setDriverClassName(driverClassName)

          .setUrl(url)

          .build();

    }


    public DataSourceConfiguration withUsername(String username) {

      return builder().setUsername(username).build();

    }


    public DataSourceConfiguration withPassword(String password) {

      return builder().setPassword(password).build();

    }


    /**

     * Sets the connection properties passed to driver.connect(...).

     * Format of the string must be [propertyName=property;]*

     *

     * <p>NOTE - The "user" and "password" properties can be add via {@link #withUsername(String)},

     * {@link #withPassword(String)}, so they do not need to be included here.

     */

    public DataSourceConfiguration withConnectionProperties(String connectionProperties) {

      checkArgument(connectionProperties != null, "DataSourceConfiguration.create(driver, url)"

          + ".withConnectionProperties(connectionProperties) "

          + "called with null connectionProperties");

      return builder().setConnectionProperties(connectionProperties).build();

    }


    private void populateDisplayData(DisplayData.Builder builder) {

      if (getDataSource() != null) {

        builder.addIfNotNull(DisplayData.item("dataSource", getDataSource().getClass().getName()));

      } else {

        builder.addIfNotNull(DisplayData.item("jdbcDriverClassName", getDriverClassName()));

        builder.addIfNotNull(DisplayData.item("jdbcUrl", getUrl()));

        builder.addIfNotNull(DisplayData.item("username", getUsername()));

      }

    }


    DataSource buildDatasource() throws Exception{

      if (getDataSource() != null) {

        return getDataSource();

      } else {

        BasicDataSource basicDataSource = new BasicDataSource();

        basicDataSource.setDriverClassName(getDriverClassName());

        basicDataSource.setUrl(getUrl());

        basicDataSource.setUsername(getUsername());

        basicDataSource.setPassword(getPassword());

        if (getConnectionProperties() != null) {

          basicDataSource.setConnectionProperties(getConnectionProperties());

        }

        return basicDataSource;

      }

    }


  }


  /**
   * Callback that sets the parameters of a {@link PreparedStatement} without any per-element
   * input.
   *
   * <p>NOTE(review): nothing in this file references this interface; the write path uses
   * {@code PreparedStatementSetter} instead - confirm whether it is still needed.
   */

  public interface StatementPreparator extends Serializable {

    void setParameters(PreparedStatement preparedStatement) throws Exception;

  }


  /**
   * Callback used by the write transform to bind one input element to the parameters of the
   * {@link PreparedStatement} before it is executed.
   *
   * @param <T> type of the element being written
   */
  public interface PreparedStatementSetter<T> extends Serializable {

    void setParameters(T element, PreparedStatement preparedStatement) throws Exception;

  }


  /** A {@link PTransform} to write to a JDBC datasource. */

  @AutoValue

  public abstract static class Write<T> extends PTransform<PCollection<T>, PCollectionTuple> {

  final TupleTag<Stock> tupletagTest = new TupleTag<Stock>() {} ;

    @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();

    @Nullable abstract String getStatement();

    @Nullable abstract PreparedStatementSetter<T> getPreparedStatementSetter();

    @Nullable

abstract TupleTag<T> getValidTupleTag();


@Nullable

abstract TupleTag<T> getInvalidTupleTag();


    abstract Builder<T> toBuilder();


    @AutoValue.Builder

    abstract static class Builder<T> {

      abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);

      abstract Builder<T> setStatement(String statement);

      abstract Builder<T> setPreparedStatementSetter(PreparedStatementSetter<T> setter);

      abstract Builder<T> setValidTupleTag(TupleTag<T> validtag);

      abstract Builder<T> setInvalidTupleTag(TupleTag<T> inValidtag);


      abstract Write<T> build();

    }


    public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {

      return toBuilder().setDataSourceConfiguration(config).build();

    }

    public Write<T> withStatement(String statement) {

      return toBuilder().setStatement(statement).build();

    }

    public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {

      return toBuilder().setPreparedStatementSetter(setter).build();

    }

    public Write<T> withValidTag(TupleTag<T> validtag) {

      return toBuilder().setValidTupleTag(validtag).build();

    }

    public Write<T> withInValidTag(TupleTag<T> inValidtag) {

        return toBuilder().setInvalidTupleTag(inValidtag).build();

    }


    @Override

    public PCollectionTuple expand(PCollection<T> input) {

    return input.apply(ParDo.of(new WriteFn<T>(this)).

    withOutputTags(this.getValidTupleTag(), TupleTagList.of(this.getInvalidTupleTag()).and(tupletagTest)));

    }


    @Override

    public void validate(PipelineOptions options) {

      checkArgument(getDataSourceConfiguration() != null,

          "JdbcIO.write() requires a configuration to be set via "

              + ".withDataSourceConfiguration(configuration)");

      checkArgument(getStatement() != null,

          "JdbcIO.write() requires a statement to be set via .withStatement(statement)");

      checkArgument(getPreparedStatementSetter() != null,

          "JdbcIO.write() requires a preparedStatementSetter to be set via "

              + ".withPreparedStatementSetter(preparedStatementSetter)");

    }


    private static class WriteFn<T> extends DoFn<T, T> {

      private static final int DEFAULT_BATCH_SIZE = 1;


      private final Write<T> spec;


      private DataSource dataSource;

      private Connection connection;

      private PreparedStatement preparedStatement;

      private TupleTag<T> validTupleTag;

      private TupleTag<T> inValidTupleTag;

      private int batchCount;


      public WriteFn(Write<T> spec) {

        this.spec = spec;

      }


      @Setup

      public void setup() throws Exception {

        dataSource = spec.getDataSourceConfiguration().buildDatasource();

        connection = dataSource.getConnection();

        connection.setAutoCommit(false);

        preparedStatement = connection.prepareStatement(spec.getStatement());

        validTupleTag = spec.getValidTupleTag();

        inValidTupleTag = spec.getInvalidTupleTag();

      }


      @StartBundle

      public void startBundle() {

        batchCount = 0;

      }

      

      @ProcessElement

      public void processElement(@Element T record, MultiOutputReceiver out) throws Exception {

        preparedStatement.clearParameters();

        spec.getPreparedStatementSetter().setParameters(record, preparedStatement);

        preparedStatement.addBatch();


        batchCount++;


        if (batchCount >= DEFAULT_BATCH_SIZE) {

        if (batchCount > 0) {

                try {

      preparedStatement.executeBatch();

      connection.commit();

      out.get(validTupleTag).output(record);

      } catch (SQLException e1) {

      //TODO add logger

      out.get(inValidTupleTag).output(record);

      }

                batchCount = 0;

              }

        }

      }


      @FinishBundle

      public void finishBundle() throws Exception {

        executeBatch();

      }


      private void executeBatch() {

        

      }


      @Teardown

      public void teardown() throws Exception {

        try {

          if (preparedStatement != null) {

            preparedStatement.close();

          }

        } finally {

          if (connection != null) {

            connection.close();

          }

          if (dataSource instanceof AutoCloseable) {

            ((AutoCloseable) dataSource).close();

          }

        }

      }

    }

  }

}


 GIT:  https://github.com/ranesaci/apachebeam


Wednesday 30 June 2021

Empty string fields to null using java reflection for given object recursively

 While working on a project, we got a requirement not to include empty XML tags when converting a Java POJO to XML. If a String field of the POJO holds an empty string, an empty tag is emitted for that field. To avoid this, I wrote the program below, which takes an object as a parameter and recursively converts all of its empty String fields to null using Java reflection.


import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Reflection helper that walks an object graph through its getters and replaces every empty
 * {@code String} property ({@code ""}) with {@code null}.
 *
 * @author Sachin Rane on 7/1/21
 */
public class ReflectionUtils {

    /** Number of characters in the "get" prefix that precedes the property name. */
    private static final int GETTER_PREFIX_LENGTH = 3;

    public static void main(String[] args) {
        Employee employee = new Employee();
        employee.setName("Sachin");
        employee.setCity(""); // this will be set to null
        replaceAllEmptyStringFieldsWithNull(employee);
        if (null == employee.getCity()) {
            System.out.println("replaceAllEmptyStringFieldsWithNull converted blank string to null");
        }
    }

    /**
     * Sets every empty-string property of {@code object}, and of the objects reachable through its
     * getters, to {@code null}.
     *
     * <p>Only zero-argument {@code getXxx} methods are followed. Primitives, arrays, enums,
     * collections, maps and JDK types are left untouched, and each object is visited at most once,
     * so cyclic graphs are safe. A getter that fails is reported on standard output and skipped;
     * the remaining getters are still processed.
     *
     * @param object root of the graph to clean; {@code null} is ignored
     */
    public static void replaceAllEmptyStringFieldsWithNull(Object object) {
        Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());
        replaceEmptyStrings(object, visited);
    }

    /** Recursive worker; {@code visited} holds (by identity) the objects already processed. */
    private static void replaceEmptyStrings(Object object, Set<Object> visited) {
        if (object == null || !isTraversable(object.getClass()) || !visited.add(object)) {
            return;
        }
        // Walk up the hierarchy so inherited getters are covered; stops at the first JDK class.
        for (Class<?> type = object.getClass();
                type != null && isTraversable(type);
                type = type.getSuperclass()) {
            for (Method method : type.getDeclaredMethods()) {
                if (isGetType(method) && !skipObjectsCheck(method)) {
                    processGetter(object, method, visited);
                }
            }
        }
    }

    /** Handles one getter: nulls the backing field of an empty string, or descends into the value. */
    private static void processGetter(Object object, Method method, Set<Object> visited) {
        try {
            method.setAccessible(true);
            Object value = method.invoke(object);
            if (method.getReturnType() == String.class) {
                if ("".equals(value)) {
                    // Strip only the leading "get"; replace("get", "") would also mangle names
                    // such as getTarget.
                    Field field =
                            getFieldByFieldName(object, method.getName().substring(GETTER_PREFIX_LENGTH));
                    if (null != field) {
                        field.setAccessible(true);
                        field.set(object, null);
                    }
                }
            } else {
                replaceEmptyStrings(value, visited);
            }
        } catch (IllegalAccessException ex) {
            System.out.println("IllegalAccessException while invoking method for class : " + object.getClass().getName()
                    + " and method : " + method.getName());
        } catch (InvocationTargetException ex) {
            System.out.println("InvocationTargetException while invoking method for class : " + object.getClass().getName()
                    + " and method : " + method.getName());
        }
    }

    /**
     * Tells whether instances of {@code type} should be inspected. Primitives, arrays, enums and
     * classes from the JDK are excluded: they are not beans, and reflecting into them can fail.
     */
    private static boolean isTraversable(Class<?> type) {
        if (type.isPrimitive() || type.isArray() || type.isEnum()) {
            return false;
        }
        String name = type.getName();
        return !(name.startsWith("java.") || name.startsWith("javax.")
                || name.startsWith("jdk.") || name.startsWith("sun."));
    }

    /**
     * Returns {@code true} when the getter's return type must not be followed: primitives
     * (including {@code void}), arrays, enums, collections and maps.
     */
    static boolean skipObjectsCheck(Method method) {
        Class<?> returnType = method.getReturnType();
        return returnType.isPrimitive()
                || returnType.isArray()
                || returnType.isEnum()
                || Collection.class.isAssignableFrom(returnType)
                || Map.class.isAssignableFrom(returnType);
    }

    /**
     * Finds a field by case-insensitive name in the object's class or any of its superclasses.
     *
     * @return the matching field, or {@code null} when there is none or {@code obj} is null
     */
    static Field getFieldByFieldName(Object obj, String fieldName) {
        if (obj == null) {
            return null;
        }
        for (Class<?> type = obj.getClass(); type != null; type = type.getSuperclass()) {
            for (Field candidate : type.getDeclaredFields()) {
                if (candidate.getName().equalsIgnoreCase(fieldName)) {
                    return candidate;
                }
            }
        }
        return null;
    }

    /**
     * Returns {@code true} for a non-synthetic method named {@code getXxx} that takes no
     * arguments, i.e. one that can be invoked without parameters.
     */
    static boolean isGetType(Method method) {
        String name = method.getName();
        return name.startsWith("get")
                && name.length() > GETTER_PREFIX_LENGTH
                && method.getParameterTypes().length == 0
                && !method.isSynthetic();
    }

    /** Sample bean used by {@link #main(String[])}. */
    private static class Employee {
        String name;
        String city;
        List<String> list;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getCity() {
            return city;
        }

        public void setCity(String city) {
            this.city = city;
        }

        public List<String> getList() {
            return list;
        }

        public void setList(List<String> list) {
            this.list = list;
        }
    }
}

Extract error records while inserting into db table using JDBCIO apache beam in java

 I was inserting data into postgres db using apache beam pipeline. it works perfectly with JdbcIO write of apache beam library. But, now, i ...