Wednesday 24 November 2021

Extract error records while inserting into a DB table using JdbcIO in Apache Beam (Java)

I was inserting data into a Postgres DB using an Apache Beam pipeline. This works perfectly with the JdbcIO write transform from the Apache Beam library. But now I want to extract the records that could not be inserted into the DB, whether the failure was an SQLException, a DB constraint violation, or anything else.
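For reference, the plain JdbcIO write that the pipeline started from looks roughly like this (a minimal sketch using the same connection settings as the code below; note that JdbcIO's write returns PDone, so there is no way to observe per-record failures):

// "data" is the PCollection<Stock> produced earlier in the pipeline.
data.apply(JdbcIO.<Stock>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
        .create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/postgres")
        .withUsername("postgres").withPassword("sachin"))
    .withStatement("insert into stocks values(?, ?, ?)")
    // JdbcIO.PreparedStatementSetter is a functional interface, so a lambda works here.
    .withPreparedStatementSetter((element, query) -> {
        query.setString(1, element.getSymbol());
        query.setLong(2, element.getPrice());
        query.setString(3, element.getCompany());
    }));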

I went through the Apache Beam programming guide, but could not find any predefined transform that reports unsuccessful inserts.

So I copied the write functionality of the JdbcIO class and modified the batch-execution logic inside WriteFn. I also added TupleTags for successful and unsuccessful inserts, which classify the incoming records; the failed records can then be retrieved from the result using the corresponding TupleTag.
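The underlying Beam mechanism is a multi-output ParDo: the DoFn declares a main output tag plus additional output tags and routes each element to exactly one of them. A stripped-down sketch of the pattern (insertSucceeded is a hypothetical stand-in for the JDBC batch execution; "input" is any PCollection<Stock>):

final TupleTag<Stock> VALID = new TupleTag<Stock>() {};
final TupleTag<Stock> INVALID = new TupleTag<Stock>() {};

PCollectionTuple result = input.apply(ParDo.of(new DoFn<Stock, Stock>() {
    @ProcessElement
    public void process(@Element Stock s, MultiOutputReceiver out) {
        // Route each element to exactly one of the two outputs.
        if (insertSucceeded(s)) {   // hypothetical stand-in for executeBatch()
            out.get(VALID).output(s);
        } else {
            out.get(INVALID).output(s);
        }
    }
}).withOutputTags(VALID, TupleTagList.of(INVALID)));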


Below is the code:


BeamTest.java


import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;

import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

/**
 * @author sachin
 * @date 18-Nov-2021
 */
public class BeamTest {

    public static void main(String[] args) {
        System.setProperty("java.specification.version", "1.8");
        process();
    }

    public static void process() {
        // Tags used to classify each record as successfully inserted or failed.
        final TupleTag<Stock> VALID = new TupleTag<Stock>() {};
        final TupleTag<Stock> INVALID = new TupleTag<Stock>() {};

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(FlinkRunner.class);
        Pipeline p = Pipeline.create(options);

        // Prepare dummy data.
        Collection<Stock> stockList = Arrays.asList(
                new Stock("AAP", 2000, "Apple Inc"),
                new Stock("MSF", 3000, "Microsoft Corporation"),
                new Stock("NVDA", 4000, "NVIDIA Corporation"),
                new Stock("INT", 3200, "Intel Corporation"));

        // Read the dummy data into a PCollection<Stock>.
        PCollection<Stock> data =
                p.apply(Create.of(stockList).withCoder(SerializableCoder.of(Stock.class)));

        // Insert: unlike JdbcIO, the custom transform returns a PCollectionTuple.
        PCollectionTuple pCollectionTupleResult = data.apply("write", CustomJdbcIOWrite.<Stock>write()
                .withDataSourceConfiguration(CustomJdbcIOWrite.DataSourceConfiguration
                        .create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/postgres")
                        .withUsername("postgres").withPassword("sachin"))
                .withStatement("insert into stocks values(?, ?, ?)")
                .withValidTag(VALID).withInValidTag(INVALID)
                .withPreparedStatementSetter(new CustomJdbcIOWrite.PreparedStatementSetter<Stock>() {
                    private static final long serialVersionUID = 1L;

                    public void setParameters(Stock element, PreparedStatement query) throws SQLException {
                        query.setString(1, element.getSymbol());
                        query.setLong(2, element.getPrice());
                        query.setString(3, element.getCompany());
                    }
                }));

        // Get the failed records using the INVALID TupleTag.
        PCollection<Stock> failedPcollection = pCollectionTupleResult.get(INVALID)
                .setCoder(SerializableCoder.of(Stock.class));
        failedPcollection.apply(ParDo.of(new DoFn<Stock, Stock>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void process(ProcessContext pc) {
                System.out.println("Failed pCollection element:" + pc.element().getCompany());
            }
        }));

        // Get the successfully inserted records using the VALID TupleTag.
        PCollection<Stock> insertedPcollection = pCollectionTupleResult.get(VALID)
                .setCoder(SerializableCoder.of(Stock.class));
        insertedPcollection.apply(ParDo.of(new DoFn<Stock, Stock>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void process(ProcessContext pc) {
                System.out.println("Inserted pCollection element:" + pc.element().getCompany());
            }
        }));

        // Run the pipeline.
        State state = p.run().waitUntilFinish();
        System.out.println("Data inserted successfully with state : " + state);
    }
}
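The Stock POJO itself is not shown in this post; a minimal Serializable version consistent with the constructor and getters used above would be:

import java.io.Serializable;

// Must be Serializable because the pipeline uses SerializableCoder.of(Stock.class).
public class Stock implements Serializable {
    private static final long serialVersionUID = 1L;
    private String symbol;
    private long price;
    private String company;

    public Stock(String symbol, long price, String company) {
        this.symbol = symbol;
        this.price = price;
        this.company = company;
    }

    public String getSymbol() { return symbol; }
    public long getPrice() { return price; }
    public String getCompany() { return company; }
}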

In the output we can see that the PCollectionTuple yields both the failed and the successful records, selected by the corresponding TupleTag.

OUTPUT:

Inserted pCollection element:Microsoft Corporation
Failed pCollection element:NVIDIA Corporation
Inserted pCollection element:Intel Corporation
Inserted pCollection element:Apple Inc
Data inserted successfully with state : DONE
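Once the failed records are available as a PCollection, they can be routed anywhere a normal PCollection can go. For example, a sketch that writes them out as a dead-letter CSV file (the CSV formatting and the "failed-stocks" path are illustrative assumptions):

// Requires: org.apache.beam.sdk.io.TextIO, org.apache.beam.sdk.transforms.MapElements,
//           org.apache.beam.sdk.values.TypeDescriptors
failedPcollection
    .apply("FormatFailures", MapElements.into(TypeDescriptors.strings())
        .via((Stock s) -> s.getSymbol() + "," + s.getPrice() + "," + s.getCompany()))
    .apply("WriteFailures", TextIO.write().to("failed-stocks").withSuffix(".csv"));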

 

CustomJdbcIOWrite.java

import static com.google.common.base.Preconditions.checkArgument;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.annotation.Nullable;
import javax.sql.DataSource;

import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.dbcp2.BasicDataSource;

import com.google.auto.value.AutoValue;

@Experimental
public class CustomJdbcIOWrite {

  /**
   * Write data to a JDBC datasource.
   *
   * @param <T> Type of the data to be written.
   */
  public static <T> Write<T> write() {
    // AutoValue generates the class name from the enclosing class, so it is
    // AutoValue_CustomJdbcIOWrite_Write here, not AutoValue_JdbcIO_Write.
    return new AutoValue_CustomJdbcIOWrite_Write.Builder<T>().build();
  }

  private CustomJdbcIOWrite() {}

  /**
   * An interface for converting each row of a {@link ResultSet} into an element of the
   * resulting {@link PCollection} (retained from JdbcIO's read support).
   */
  public interface RowMapper<T> extends Serializable {
    T mapRow(ResultSet resultSet) throws Exception;
  }

  /**
   * A POJO describing a {@link DataSource}, either providing a {@link DataSource} directly
   * or all the properties needed to create one.
   */
  @AutoValue
  public abstract static class DataSourceConfiguration implements Serializable {
    @Nullable abstract String getDriverClassName();
    @Nullable abstract String getUrl();
    @Nullable abstract String getUsername();
    @Nullable abstract String getPassword();
    @Nullable abstract String getConnectionProperties();
    @Nullable abstract DataSource getDataSource();

    abstract Builder builder();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setDriverClassName(String driverClassName);
      abstract Builder setUrl(String url);
      abstract Builder setUsername(String username);
      abstract Builder setPassword(String password);
      abstract Builder setConnectionProperties(String connectionProperties);
      abstract Builder setDataSource(DataSource dataSource);
      abstract DataSourceConfiguration build();
    }

    public static DataSourceConfiguration create(DataSource dataSource) {
      checkArgument(dataSource != null, "DataSourceConfiguration.create(dataSource) called with "
          + "null data source");
      checkArgument(dataSource instanceof Serializable,
          "DataSourceConfiguration.create(dataSource) called with a dataSource not Serializable");
      return new AutoValue_CustomJdbcIOWrite_DataSourceConfiguration.Builder()
          .setDataSource(dataSource)
          .build();
    }

    public static DataSourceConfiguration create(String driverClassName, String url) {
      checkArgument(driverClassName != null,
          "DataSourceConfiguration.create(driverClassName, url) called with null driverClassName");
      checkArgument(url != null,
          "DataSourceConfiguration.create(driverClassName, url) called with null url");
      return new AutoValue_CustomJdbcIOWrite_DataSourceConfiguration.Builder()
          .setDriverClassName(driverClassName)
          .setUrl(url)
          .build();
    }

    public DataSourceConfiguration withUsername(String username) {
      return builder().setUsername(username).build();
    }

    public DataSourceConfiguration withPassword(String password) {
      return builder().setPassword(password).build();
    }

    /**
     * Sets the connection properties passed to driver.connect(...).
     * Format of the string must be [propertyName=property;]*
     *
     * <p>NOTE - The "user" and "password" properties can be added via {@link #withUsername(String)}
     * and {@link #withPassword(String)}, so they do not need to be included here.
     */
    public DataSourceConfiguration withConnectionProperties(String connectionProperties) {
      checkArgument(connectionProperties != null, "DataSourceConfiguration.create(driver, url)"
          + ".withConnectionProperties(connectionProperties) "
          + "called with null connectionProperties");
      return builder().setConnectionProperties(connectionProperties).build();
    }

    private void populateDisplayData(DisplayData.Builder builder) {
      if (getDataSource() != null) {
        builder.addIfNotNull(DisplayData.item("dataSource", getDataSource().getClass().getName()));
      } else {
        builder.addIfNotNull(DisplayData.item("jdbcDriverClassName", getDriverClassName()));
        builder.addIfNotNull(DisplayData.item("jdbcUrl", getUrl()));
        builder.addIfNotNull(DisplayData.item("username", getUsername()));
      }
    }

    DataSource buildDatasource() throws Exception {
      if (getDataSource() != null) {
        return getDataSource();
      } else {
        BasicDataSource basicDataSource = new BasicDataSource();
        basicDataSource.setDriverClassName(getDriverClassName());
        basicDataSource.setUrl(getUrl());
        basicDataSource.setUsername(getUsername());
        basicDataSource.setPassword(getPassword());
        if (getConnectionProperties() != null) {
          basicDataSource.setConnectionProperties(getConnectionProperties());
        }
        return basicDataSource;
      }
    }
  }

  /**
   * An interface used by the Write transform to set the parameters of the
   * {@link PreparedStatement} before executing it against the database.
   */
  public interface StatementPreparator extends Serializable {
    void setParameters(PreparedStatement preparedStatement) throws Exception;
  }

  public interface PreparedStatementSetter<T> extends Serializable {
    void setParameters(T element, PreparedStatement preparedStatement) throws Exception;
  }

  /** A {@link PTransform} to write to a JDBC datasource, producing valid and invalid outputs. */
  @AutoValue
  public abstract static class Write<T> extends PTransform<PCollection<T>, PCollectionTuple> {

    @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();
    @Nullable abstract String getStatement();
    @Nullable abstract PreparedStatementSetter<T> getPreparedStatementSetter();
    @Nullable abstract TupleTag<T> getValidTupleTag();
    @Nullable abstract TupleTag<T> getInvalidTupleTag();

    abstract Builder<T> toBuilder();

    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);
      abstract Builder<T> setStatement(String statement);
      abstract Builder<T> setPreparedStatementSetter(PreparedStatementSetter<T> setter);
      abstract Builder<T> setValidTupleTag(TupleTag<T> validTag);
      abstract Builder<T> setInvalidTupleTag(TupleTag<T> invalidTag);

      abstract Write<T> build();
    }

    public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
      return toBuilder().setDataSourceConfiguration(config).build();
    }

    public Write<T> withStatement(String statement) {
      return toBuilder().setStatement(statement).build();
    }

    public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
      return toBuilder().setPreparedStatementSetter(setter).build();
    }

    public Write<T> withValidTag(TupleTag<T> validTag) {
      return toBuilder().setValidTupleTag(validTag).build();
    }

    public Write<T> withInValidTag(TupleTag<T> invalidTag) {
      return toBuilder().setInvalidTupleTag(invalidTag).build();
    }

    @Override
    public PCollectionTuple expand(PCollection<T> input) {
      // The valid tag is the main output; the invalid tag is an additional output.
      return input.apply(ParDo.of(new WriteFn<T>(this))
          .withOutputTags(this.getValidTupleTag(), TupleTagList.of(this.getInvalidTupleTag())));
    }

    @Override
    public void validate(PipelineOptions options) {
      checkArgument(getDataSourceConfiguration() != null,
          "CustomJdbcIOWrite.write() requires a configuration to be set via "
              + ".withDataSourceConfiguration(configuration)");
      checkArgument(getStatement() != null,
          "CustomJdbcIOWrite.write() requires a statement to be set via .withStatement(statement)");
      checkArgument(getPreparedStatementSetter() != null,
          "CustomJdbcIOWrite.write() requires a preparedStatementSetter to be set via "
              + ".withPreparedStatementSetter(preparedStatementSetter)");
    }

    private static class WriteFn<T> extends DoFn<T, T> {
      // Each record is executed and committed individually so that a failure
      // can be attributed to exactly one element.
      private static final int DEFAULT_BATCH_SIZE = 1;

      private final Write<T> spec;

      private DataSource dataSource;
      private Connection connection;
      private PreparedStatement preparedStatement;
      private TupleTag<T> validTupleTag;
      private TupleTag<T> invalidTupleTag;
      private int batchCount;

      public WriteFn(Write<T> spec) {
        this.spec = spec;
      }

      @Setup
      public void setup() throws Exception {
        dataSource = spec.getDataSourceConfiguration().buildDatasource();
        connection = dataSource.getConnection();
        connection.setAutoCommit(false);
        preparedStatement = connection.prepareStatement(spec.getStatement());
        validTupleTag = spec.getValidTupleTag();
        invalidTupleTag = spec.getInvalidTupleTag();
      }

      @StartBundle
      public void startBundle() {
        batchCount = 0;
      }

      @ProcessElement
      public void processElement(@Element T record, MultiOutputReceiver out) throws Exception {
        preparedStatement.clearParameters();
        spec.getPreparedStatementSetter().setParameters(record, preparedStatement);
        preparedStatement.addBatch();

        batchCount++;

        if (batchCount >= DEFAULT_BATCH_SIZE) {
          try {
            preparedStatement.executeBatch();
            connection.commit();
            out.get(validTupleTag).output(record);
          } catch (SQLException e) {
            // The insert failed (e.g. a constraint violation); discard the
            // pending work and route the record to the invalid output.
            connection.rollback();
            out.get(invalidTupleTag).output(record);
          }
          batchCount = 0;
        }
      }

      @FinishBundle
      public void finishBundle() throws Exception {
        // No-op: with DEFAULT_BATCH_SIZE = 1 every record is already flushed
        // in processElement, so there is never a partial batch left over.
      }

      @Teardown
      public void teardown() throws Exception {
        try {
          if (preparedStatement != null) {
            preparedStatement.close();
          }
        } finally {
          if (connection != null) {
            connection.close();
          }
          if (dataSource instanceof AutoCloseable) {
            ((AutoCloseable) dataSource).close();
          }
        }
      }
    }
  }
}
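One possible extension (not implemented in the repository): carry the failure reason along with the failed record by declaring the invalid tag as TupleTag<KV<T, String>> and adjusting the catch block in WriteFn, roughly like this:

// Hypothetical variation of WriteFn's catch block; assumes invalidTupleTag is
// declared as TupleTag<KV<T, String>> and org.apache.beam.sdk.values.KV is imported.
try {
    preparedStatement.executeBatch();
    connection.commit();
    out.get(validTupleTag).output(record);
} catch (SQLException e) {
    connection.rollback();
    // Emit the element together with the driver's error message.
    out.get(invalidTupleTag).output(KV.of(record, e.getMessage()));
}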


 GIT:  https://github.com/ranesaci/apachebeam

