I was inserting data into a PostgreSQL database using an Apache Beam pipeline, and it works perfectly with the JdbcIO write transform from the Apache Beam library. Now, however, I want to extract the records that could not be inserted into the database. An unsuccessful insertion can have any cause, such as an SQLException or a database constraint violation.
I have gone through the Apache Beam programming guide, but I was not able to find any predefined transform that gives me the unsuccessful inserts.
So I copied the write functionality of the JdbcIO class and modified the executeBatch logic in WriteFn. I also added TupleTags for successful and unsuccessful inserts. These classify the incoming pipeline records, so the failed records can then be retrieved with the corresponding TupleTag.
Below is the code:
BeamTest.java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
/**
* @author sachin
* @date 18-Nov-2021
*/
public class BeamTest {
static List<Stock> stocks = new ArrayList<>();
public static void main(String[] args) {
System.setProperty("java.specification.version", "1.8");
process();
// read();
}
public static void process() {
final TupleTag<Stock> VALID = new TupleTag<Stock>() {
};
final TupleTag<Stock> INVALID = new TupleTag<Stock>() {
};
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(FlinkRunner.class);
Pipeline p = Pipeline.create(options);
// Preparing dummy data
Collection<Stock> stockList = Arrays.asList(
new Stock("AAP", 2000, "Apple Inc"),
new Stock("MSF", 3000, "Microsoft Corporation"),
new Stock("NVDA", 4000, "NVIDIA Corporation"),
new Stock("INT", 3200, "Intel Corporation"));
// Reading dummy data and save it into PCollection<Stock>
PCollection<Stock> data = p.apply(Create.of(stockList).withCoder(SerializableCoder.of(Stock.class)));
// insert
PCollectionTuple pCollectionTupleResult = data.apply("write", CustomJdbcIOWrite.<Stock>write()
.withDataSourceConfiguration(CustomJdbcIOWrite.DataSourceConfiguration
.create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/postgres")
.withUsername("postgres").withPassword("sachin"))
.withStatement("insert into stocks values(?, ?, ?)").withValidTag(VALID).withInValidTag(INVALID)
.withPreparedStatementSetter(new CustomJdbcIOWrite.PreparedStatementSetter<Stock>() {
private static final long serialVersionUID = 1L;
public void setParameters(Stock element, PreparedStatement query) throws SQLException {
query.setString(1, element.getSymbol());
query.setLong(2, element.getPrice());
query.setString(3, element.getCompany());
}
}));
// get failed PCollection using INVALID tupletag
PCollection<Stock> failedPcollection = pCollectionTupleResult.get(INVALID)
.setCoder(SerializableCoder.of(Stock.class));
failedPcollection.apply(ParDo.of(new DoFn<Stock, Stock>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void process(ProcessContext pc) {
System.out.println("Failed pCollection element:" + pc.element().getCompany());
}
}));
//get failed PCollection using INVALID tupletag
PCollection<Stock> insertedPcollection = pCollectionTupleResult.get(VALID)
.setCoder(SerializableCoder.of(Stock.class));
insertedPcollection.apply(ParDo.of(new DoFn<Stock, Stock>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void process(ProcessContext pc) {
System.out.println("Inserted pCollection element:" + pc.element().getCompany());
}
}));
// run pipeline
State state = p.run().waitUntilFinish();
System.out.println("Data inserted successfully with state : " + state);
}
}
In the output, we can see that the PCollectionTuple gives us both the failed and the successful records through their respective TupleTags.
OUTPUT:
Inserted pCollection element:Microsoft Corporation
Failed pCollection element:NVIDIA Corporation
Inserted pCollection element:Intel Corporation
Inserted pCollection element:Apple Inc
Data inserted successfully with state : DONE
CustomJdbcIOWrite.java
import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.annotation.Nullable;
import javax.sql.DataSource;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.dbcp2.BasicDataSource;
import com.google.auto.value.AutoValue;
/**
 * A trimmed-down copy of the write path of Beam's {@code JdbcIO}.
 *
 * <p>When the database rejects an element, {@link Write} does not fail the bundle. Instead it
 * routes every input element to one of two outputs of the returned {@link PCollectionTuple}:
 *
 * <ul>
 *   <li>the "valid" tag, when the element's statement executed and was committed;
 *   <li>the "invalid" tag, when a {@link SQLException} was raised while setting parameters,
 *       executing or committing.
 * </ul>
 */
@Experimental
public class CustomJdbcIOWrite {
  /**
   * Write data to a JDBC datasource.
   *
   * @param <T> Type of the data to be written.
   */
  public static <T> Write<T> write() {
    // AutoValue names the generated class after the full nesting chain of the annotated class,
    // so CustomJdbcIOWrite.Write becomes AutoValue_CustomJdbcIOWrite_Write.
    return new AutoValue_CustomJdbcIOWrite_Write.Builder<T>().build();
  }

  private CustomJdbcIOWrite() {}

  /**
   * Converts each row of a {@link ResultSet} into an element of a {@link PCollection}. This is
   * kept from {@code JdbcIO}; this class itself only provides a write transform.
   */
  public interface RowMapper<T> extends Serializable {
    T mapRow(ResultSet resultSet) throws Exception;
  }

  /**
   * A POJO describing a {@link DataSource}. It either provides a {@link DataSource} directly or
   * holds all the properties needed to create one.
   */
  @AutoValue
  public abstract static class DataSourceConfiguration implements Serializable {
    @Nullable abstract String getDriverClassName();

    @Nullable abstract String getUrl();

    @Nullable abstract String getUsername();

    @Nullable abstract String getPassword();

    @Nullable abstract String getConnectionProperties();

    @Nullable abstract DataSource getDataSource();

    abstract Builder builder();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setDriverClassName(String driverClassName);

      abstract Builder setUrl(String url);

      abstract Builder setUsername(String username);

      abstract Builder setPassword(String password);

      abstract Builder setConnectionProperties(String connectionProperties);

      abstract Builder setDataSource(DataSource dataSource);

      abstract DataSourceConfiguration build();
    }

    /**
     * Wraps an existing, {@link Serializable} {@link DataSource}.
     *
     * @throws IllegalArgumentException if {@code dataSource} is null or not Serializable
     */
    public static DataSourceConfiguration create(DataSource dataSource) {
      checkArgument(dataSource != null, "DataSourceConfiguration.create(dataSource) called with "
          + "null data source");
      checkArgument(dataSource instanceof Serializable,
          "DataSourceConfiguration.create(dataSource) called with a dataSource not Serializable");
      return new AutoValue_CustomJdbcIOWrite_DataSourceConfiguration.Builder()
          .setDataSource(dataSource)
          .build();
    }

    /**
     * Describes a pooled data source that will be built from a JDBC driver class name and URL.
     *
     * @throws IllegalArgumentException if either argument is null
     */
    public static DataSourceConfiguration create(String driverClassName, String url) {
      checkArgument(driverClassName != null,
          "DataSourceConfiguration.create(driverClassName, url) called with null driverClassName");
      checkArgument(url != null,
          "DataSourceConfiguration.create(driverClassName, url) called with null url");
      return new AutoValue_CustomJdbcIOWrite_DataSourceConfiguration.Builder()
          .setDriverClassName(driverClassName)
          .setUrl(url)
          .build();
    }

    /** Returns a copy of this configuration with the given user name. */
    public DataSourceConfiguration withUsername(String username) {
      return builder().setUsername(username).build();
    }

    /** Returns a copy of this configuration with the given password. */
    public DataSourceConfiguration withPassword(String password) {
      return builder().setPassword(password).build();
    }

    /**
     * Sets the connection properties passed to driver.connect(...).
     * The string must have the format [propertyName=property;]*
     *
     * <p>NOTE - The "user" and "password" properties can be added via
     * {@link #withUsername(String)} and {@link #withPassword(String)}, so they do not need to be
     * included here.
     */
    public DataSourceConfiguration withConnectionProperties(String connectionProperties) {
      checkArgument(connectionProperties != null, "DataSourceConfiguration.create(driver, url)"
          + ".withConnectionProperties(connectionProperties) "
          + "called with null connectionProperties");
      return builder().setConnectionProperties(connectionProperties).build();
    }

    private void populateDisplayData(DisplayData.Builder builder) {
      if (getDataSource() != null) {
        builder.addIfNotNull(DisplayData.item("dataSource", getDataSource().getClass().getName()));
      } else {
        builder.addIfNotNull(DisplayData.item("jdbcDriverClassName", getDriverClassName()));
        builder.addIfNotNull(DisplayData.item("jdbcUrl", getUrl()));
        builder.addIfNotNull(DisplayData.item("username", getUsername()));
      }
    }

    /**
     * Returns the configured {@link DataSource} if there is one. Otherwise it builds a new
     * {@link BasicDataSource} from the driver, URL, credentials and connection properties.
     */
    DataSource buildDatasource() throws Exception {
      if (getDataSource() != null) {
        return getDataSource();
      }
      BasicDataSource basicDataSource = new BasicDataSource();
      basicDataSource.setDriverClassName(getDriverClassName());
      basicDataSource.setUrl(getUrl());
      basicDataSource.setUsername(getUsername());
      basicDataSource.setPassword(getPassword());
      if (getConnectionProperties() != null) {
        basicDataSource.setConnectionProperties(getConnectionProperties());
      }
      return basicDataSource;
    }
  }

  /**
   * Sets the parameters of a {@link PreparedStatement} independently of any element. This is kept
   * from {@code JdbcIO}; {@link Write} does not use it.
   */
  public interface StatementPreparator extends Serializable {
    void setParameters(PreparedStatement preparedStatement) throws Exception;
  }

  /** Binds the fields of one element to the parameters of the write {@link PreparedStatement}. */
  public interface PreparedStatementSetter<T> extends Serializable {
    void setParameters(T element, PreparedStatement preparedStatement) throws Exception;
  }

  /**
   * A {@link PTransform} that writes to a JDBC datasource. Each input element appears exactly once
   * in the result: under the valid tag if it was written, under the invalid tag if it was not.
   */
  @AutoValue
  public abstract static class Write<T> extends PTransform<PCollection<T>, PCollectionTuple> {
    @Nullable abstract DataSourceConfiguration getDataSourceConfiguration();

    @Nullable abstract String getStatement();

    @Nullable abstract PreparedStatementSetter<T> getPreparedStatementSetter();

    @Nullable
    abstract TupleTag<T> getValidTupleTag();

    @Nullable
    abstract TupleTag<T> getInvalidTupleTag();

    abstract Builder<T> toBuilder();

    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setDataSourceConfiguration(DataSourceConfiguration config);

      abstract Builder<T> setStatement(String statement);

      abstract Builder<T> setPreparedStatementSetter(PreparedStatementSetter<T> setter);

      abstract Builder<T> setValidTupleTag(TupleTag<T> validtag);

      abstract Builder<T> setInvalidTupleTag(TupleTag<T> inValidtag);

      abstract Write<T> build();
    }

    public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
      return toBuilder().setDataSourceConfiguration(config).build();
    }

    public Write<T> withStatement(String statement) {
      return toBuilder().setStatement(statement).build();
    }

    public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
      return toBuilder().setPreparedStatementSetter(setter).build();
    }

    /** Sets the output tag for elements that were written and committed successfully. */
    public Write<T> withValidTag(TupleTag<T> validtag) {
      return toBuilder().setValidTupleTag(validtag).build();
    }

    /** Sets the output tag for elements whose write raised a {@link SQLException}. */
    public Write<T> withInValidTag(TupleTag<T> inValidtag) {
      return toBuilder().setInvalidTupleTag(inValidtag).build();
    }

    @Override
    public PCollectionTuple expand(PCollection<T> input) {
      TupleTag<T> validTag = getValidTupleTag();
      TupleTag<T> invalidTag = getInvalidTupleTag();
      PCollectionTuple result = input.apply(ParDo.of(new WriteFn<T>(this))
          .withOutputTags(validTag, TupleTagList.of(invalidTag)));
      // Both outputs carry the unchanged input elements, so they can reuse the input's coder.
      result.get(validTag).setCoder(input.getCoder());
      result.get(invalidTag).setCoder(input.getCoder());
      return result;
    }

    @Override
    public void validate(PipelineOptions options) {
      checkArgument(getDataSourceConfiguration() != null,
          "JdbcIO.write() requires a configuration to be set via "
              + ".withDataSourceConfiguration(configuration)");
      checkArgument(getStatement() != null,
          "JdbcIO.write() requires a statement to be set via .withStatement(statement)");
      checkArgument(getPreparedStatementSetter() != null,
          "JdbcIO.write() requires a preparedStatementSetter to be set via "
              + ".withPreparedStatementSetter(preparedStatementSetter)");
      checkArgument(getValidTupleTag() != null,
          "JdbcIO.write() requires a valid-record tag to be set via .withValidTag(validTag)");
      checkArgument(getInvalidTupleTag() != null,
          "JdbcIO.write() requires an invalid-record tag to be set via "
              + ".withInValidTag(inValidTag)");
    }

    /**
     * Executes the configured statement once per element, in its own transaction, and routes the
     * element to the valid or invalid output depending on whether the database accepted it.
     */
    private static class WriteFn<T> extends DoFn<T, T> {
      private final Write<T> spec;
      private DataSource dataSource;
      private Connection connection;
      private PreparedStatement preparedStatement;
      private TupleTag<T> validTupleTag;
      private TupleTag<T> inValidTupleTag;

      public WriteFn(Write<T> spec) {
        this.spec = spec;
      }

      @Setup
      public void setup() throws Exception {
        dataSource = spec.getDataSourceConfiguration().buildDatasource();
        connection = dataSource.getConnection();
        // Manual commits give each element its own transaction, so one rejected element does not
        // undo the others.
        connection.setAutoCommit(false);
        preparedStatement = connection.prepareStatement(spec.getStatement());
        validTupleTag = spec.getValidTupleTag();
        inValidTupleTag = spec.getInvalidTupleTag();
      }

      @ProcessElement
      public void processElement(@Element T record, MultiOutputReceiver out) throws Exception {
        try {
          preparedStatement.clearParameters();
          spec.getPreparedStatementSetter().setParameters(record, preparedStatement);
          preparedStatement.executeUpdate();
          connection.commit();
        } catch (SQLException e) {
          // TODO add logger: the cause `e` is currently only reflected by the routing below.
          // With auto-commit off, the failed transaction is still open (and, on PostgreSQL,
          // aborted). Roll it back, or every later element on this connection would fail too.
          try {
            connection.rollback();
          } catch (SQLException rollbackFailure) {
            rollbackFailure.addSuppressed(e);
            throw rollbackFailure;
          }
          out.get(inValidTupleTag).output(record);
          return;
        }
        out.get(validTupleTag).output(record);
      }

      @Teardown
      public void teardown() throws Exception {
        try {
          if (preparedStatement != null) {
            preparedStatement.close();
          }
        } finally {
          try {
            if (connection != null) {
              connection.close();
            }
          } finally {
            // Also runs when closing the connection failed, so the pool is not leaked.
            if (dataSource instanceof AutoCloseable) {
              ((AutoCloseable) dataSource).close();
            }
          }
        }
      }
    }
  }
}
GitHub: https://github.com/ranesaci/apachebeam