Streamline Data Storage with Apache Beam’s Built-in and Custom I/O Connectors

Apache Beam I/O connectors provide read and write transforms for the most popular data storage systems. Beam ships with built-in I/O connectors for services such as Pub/Sub, Cloud Storage, BigQuery, Snowflake, JDBC, and MySQL. When a pipeline needs to connect to a data store that isn't covered by a built-in connector, we can create a custom I/O connector.

The following guide shows how to connect to different data stores using Apache Beam I/O connectors in Java.

SFTP I/O Connector

There is no built-in connector for the Secure File Transfer Protocol (SFTP), so we build a custom I/O connector for it. Custom connectors are created using PTransform, and a connector usually consists of a source and a sink.

For bounded (batch) sources, use Splittable DoFn, or ParDo with GroupByKey.

For sinks, use ParDo, or ParDo with GroupByKey for more complex sinks.

We have used JSch to connect to the SFTP server. JSch is a Java implementation of SSH2 that lets us open SFTP channels to the server.

Add the maven dependency as follows in pom.xml:

<dependency>
    <groupId>com.jcraft</groupId>
    <artifactId>jsch</artifactId>
    <version>0.1.55</version>
</dependency>

Reading Data from SFTP

Session jschSession = null;
ChannelSftp channelSftp = null;
BufferedReader br = null;
try {
    String username = this.username;
    String password = this.password;
    String host = this.host;

    JSch jsch = new JSch();
    jschSession = jsch.getSession(username, host, 22);

    Properties config = new Properties();
    config.put("StrictHostKeyChecking", "no");
    jschSession.setConfig(config);
    jschSession.setPassword(password);
    jschSession.connect();

    Channel sftp = jschSession.openChannel("sftp");
    sftp.connect();

    channelSftp = (ChannelSftp) sftp;
    InputStream stream = channelSftp.get(PATH_OF_FILE);

    br = new BufferedReader(new InputStreamReader(stream));
    String line;
    while ((line = br.readLine()) != null) {
        // Do something with each line
    }
} catch (JSchException | SftpException | IOException e) {
    log.error("Exception occurred while reading file from SFTP server: " + e.getMessage());
} finally {
    try {
        if (br != null) {
            br.close();
        }
    } catch (IOException e) {
        log.error("Exception occurred while closing the reader: " + e.getMessage());
    }
    if (channelSftp != null) {
        channelSftp.exit();
    }
    if (jschSession != null) {
        jschSession.disconnect();
    }
}

Writing Data to SFTP

String sftpInput = context.element();
String sftpData = fileHeader + "\n" + sftpInput;

Session jschSession = null;
ChannelSftp channelSftp = null;
try {
    String username = this.username;
    String password = this.password;
    String host = this.host;

    JSch jsch = new JSch();
    jschSession = jsch.getSession(username, host, 22);

    Properties config = new Properties();
    config.put("StrictHostKeyChecking", "no");
    jschSession.setConfig(config);
    jschSession.setPassword(password);
    jschSession.connect();

    Channel sftp = jschSession.openChannel("sftp");
    sftp.connect();
    channelSftp = (ChannelSftp) sftp;

    InputStream inputStream =
        new ByteArrayInputStream(sftpData.getBytes(StandardCharsets.UTF_8));
    channelSftp.cd(sftpPath);
    channelSftp.put(inputStream, filename + currentUtcDateTime + ".csv");
} catch (JSchException | SftpException e) {
    log.error("Exception occurred while writing file to SFTP server: " + e.getMessage());
} finally {
    if (channelSftp != null) {
        channelSftp.exit();
    }
    if (jschSession != null) {
        jschSession.disconnect();
        log.info("Disconnected the JSch session");
    }
}
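The write side only needs to assemble the payload (header plus element) and a timestamped file name before calling channelSftp.put(...). That assembly is plain Java and can be sketched in isolation; the header, body, base file name, and timestamp pattern below are illustrative assumptions, not values from the connector above.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class SftpPayloadSketch {

    // Prefixes the header line, as done before uploading to SFTP.
    static String buildPayload(String fileHeader, String sftpInput) {
        return fileHeader + "\n" + sftpInput;
    }

    // Builds the target file name, e.g. "export-20240101T120000.csv".
    static String buildFileName(String filename, ZonedDateTime utcNow) {
        String currentUtcDateTime =
                utcNow.format(DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss"));
        return filename + currentUtcDateTime + ".csv";
    }

    public static void main(String[] args) {
        System.out.println(buildPayload("id,name", "1,alice"));

        // Fixed instant so the file name is deterministic.
        ZonedDateTime fixed = ZonedDateTime.of(2024, 1, 1, 12, 0, 0, 0, ZoneOffset.UTC);
        System.out.println(buildFileName("export-", fixed));  // export-20240101T120000.csv
    }
}
```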

JMS I/O Connector

Java Message Service (JMS) is a Java API that allows applications to create, send, receive, and read messages using reliable, asynchronous, loosely coupled communication.

Add the maven dependency as follows:

<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>jms</artifactId>
    <version>1.1</version>
</dependency>

Reading data from a JMS Destination

pipeline.apply(org.apache.beam.sdk.io.jms.JmsIO.read()
    .withConnectionFactory(myConnectionFactory)
    .withQueue("my-queue"))

Writing to a JMS destination

messages.apply(org.apache.beam.sdk.io.jms.JmsIO.write() // messages is a PCollection<String>
    .withConnectionFactory(myConnectionFactory)
    .withQueue("my-queue"))

Spanner I/O Connector

Spanner, developed by Google, is a distributed, globally scalable relational database service that decouples compute from storage, making it possible to scale processing resources separately from storage.

Add the maven dependency as follows:

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-spanner</artifactId>
    <version>6.35.2</version>
</dependency>

Reading data from a Spanner Database

pipeline
    .apply(org.apache.beam.sdk.io.gcp.spanner.SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId)
        .withQuery("my-query"))

Writing data to a Spanner Database

The SpannerIO.Write transform writes to Cloud Spanner by executing a collection of input row Mutations. The mutations are grouped into batches for efficiency.

mutations.apply(
    "Write", org.apache.beam.sdk.io.gcp.spanner.SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId))
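SpannerIO performs the batching described above internally, but the idea is easy to illustrate with a stdlib-only sketch that partitions a list of mutations (represented here as plain strings) into fixed-size batches. The MutationBatchSketch class and the batch size of 3 are illustrative assumptions, not SpannerIO's actual policy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MutationBatchSketch {

    // Splits the input into consecutive batches of at most batchSize elements,
    // mimicking how a writer groups row mutations before issuing commits.
    static <T> List<List<T>> batch(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            batches.add(new ArrayList<>(
                    items.subList(i, Math.min(i + batchSize, items.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> mutations = Arrays.asList("m1", "m2", "m3", "m4", "m5");
        List<List<String>> batches = batch(mutations, 3);
        System.out.println(batches.size());  // 2
        System.out.println(batches.get(1));  // [m4, m5]
    }
}
```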

Snowflake I/O Connector

Snowflake is a fully managed SaaS (software as a service) that provides a single platform for data warehousing, data lakes, data engineering, data science, data application development, and secure sharing and consumption of real-time/shared data.

Add the maven dependency as follows:

<dependency>
    <groupId>net.snowflake</groupId>
    <artifactId>snowflake-jdbc</artifactId>
    <version>3.13.25</version>
</dependency>

Reading data from a Snowflake Table

pipeline.apply(
    org.apache.beam.sdk.io.snowflake.SnowflakeIO.<GenericRecord>read()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .fromQuery(QUERY)
        .withStagingBucketName(BUCKET)
        .withStorageIntegrationName(STORAGE_INTEGRATION_NAME)
        .withCsvMapper(MAPPER)
        .withCoder(CODER));

Writing data to a Snowflake Table

items.apply(
    org.apache.beam.sdk.io.snowflake.SnowflakeIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .withTable(table)
        .withStagingBucketName(BUCKET)
        .withStorageIntegrationName(STORAGE_INTEGRATION_NAME)
        .withUserDataMapper(MAPPER));

Cassandra I/O Connector

Cassandra is a free and open-source, distributed, wide-column store, NoSQL database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure.

Add the maven dependency as follows:

<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-all</artifactId>
    <version>4.1.0</version>
</dependency>


Reading data from a Cassandra Database

A keyspace in Cassandra is a namespace that defines how data is replicated across nodes; a cluster can contain multiple keyspaces. withKeyspace specifies the Cassandra keyspace to read data from.

pipeline.apply(
    org.apache.beam.sdk.io.cassandra.CassandraIO.<Person>read()
        .withHosts(Arrays.asList("host1", "host2"))
        .withPort(9042)
        .withKeyspace("beam")
        .withTable(TABLE)
        .withEntity(Person.class)
        .withCoder(SerializableCoder.of(Person.class)))

Writing data to a Cassandra Database

pipeline.apply(…) // provides a PCollection<Person> where Person is an entity
    .apply(org.apache.beam.sdk.io.cassandra.CassandraIO.<Person>write()
        .withHosts(Arrays.asList("host1", "host2"))
        .withPort(9042)
        .withKeyspace("beam")
        .withEntity(Person.class));

Conclusion

Apache Beam I/O connectors provide read and write transforms for the most popular data storage systems so that Beam users can benefit from native optimized connectivity. With the available I/Os, Apache Beam pipelines can read and write data from and to an external storage type in a unified and distributed way, simplifying complexity for the user. To better understand your data and leverage Apache Beam for your business needs, contact SpringML at info@springml.com.
