Apache Beam I/O connectors provide read and write transforms for the most popular data storage systems. Apache Beam ships with built-in I/O connectors for services such as Pub/Sub, Google Cloud Storage (GCS), BigQuery, Snowflake, JDBC, MySQL, and more. We can also create custom I/O connectors when we need to connect to a data store that isn't supported by a built-in connector.
The following guide shows how to connect to different data stores using Apache Beam I/O connectors in Java.
SFTP I/O Connector
Beam has no built-in connector for Secure File Transfer Protocol (SFTP), so we build a custom one. Custom connectors are created using PTransforms, and a connector usually consists of a source and a sink.
For bounded (batch) sources, use a Splittable DoFn, or ParDo combined with GroupByKey.
For sinks, use ParDo, or ParDo combined with GroupByKey for more complex sinks.
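As an illustrative sketch (class and method bodies here are hypothetical, not Beam-provided), a simple custom sink can be expressed as a DoFn applied with ParDo:

```java
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical sink DoFn: each element is written out in processElement.
public class SftpWriteFn extends DoFn<String, Void> {
  @ProcessElement
  public void processElement(ProcessContext context) {
    String record = context.element();
    // Open a connection and write `record` to the external store here.
  }
}

// Usage in a pipeline:
// records.apply("WriteToSftp", ParDo.of(new SftpWriteFn()));
```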
We have used JSch, a Java implementation of SSH2, to connect to the SFTP server.
Add the Maven dependency as follows in pom.xml:
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.55</version>
</dependency>
Reading Data from SFTP
Session jschSession = null;
ChannelSftp channelSftp = null;
try {
    JSch jsch = new JSch();
    jschSession = jsch.getSession(this.username, this.host, 22);
    Properties config = new Properties();
    config.put("StrictHostKeyChecking", "no");
    jschSession.setConfig(config);
    jschSession.setPassword(this.password);
    jschSession.connect();
    Channel sftp = jschSession.openChannel("sftp");
    sftp.connect();
    channelSftp = (ChannelSftp) sftp;
    InputStream stream = channelSftp.get(PATH_OF_FILE);
    try (BufferedReader br = new BufferedReader(new InputStreamReader(stream))) {
        String line;
        while ((line = br.readLine()) != null) {
            // Do something with each line
        }
    } catch (IOException e) {
        log.error("Exception occurred while reading file from SFTP server: " + e.getMessage());
    }
} catch (JSchException | SftpException e) {
    log.error("Exception while connecting to the SFTP server", e);
} finally {
    if (channelSftp != null) {
        channelSftp.exit();
    }
    if (jschSession != null) {
        jschSession.disconnect();
    }
}
Writing Data to SFTP
String sftpInput = context.element();
String sftpData = fileHeader + "\n" + sftpInput;
Session jschSession = null;
ChannelSftp channelSftp = null;
try {
    JSch jsch = new JSch();
    jschSession = jsch.getSession(this.username, this.host, 22);
    Properties config = new Properties();
    config.put("StrictHostKeyChecking", "no");
    jschSession.setConfig(config);
    jschSession.setPassword(this.password);
    jschSession.connect();
    Channel sftp = jschSession.openChannel("sftp");
    sftp.connect();
    channelSftp = (ChannelSftp) sftp;
    InputStream inputStream = new ByteArrayInputStream(sftpData.getBytes(StandardCharsets.UTF_8));
    channelSftp.cd(sftpPath);
    channelSftp.put(inputStream, filename + currentUtcDateTime + ".csv");
} catch (JSchException | SftpException e) {
    log.error("Exception while writing file to the SFTP server", e);
} finally {
    if (channelSftp != null) {
        channelSftp.exit();
        log.info("Disconnecting the SFTP channel and session");
    }
    if (jschSession != null) {
        jschSession.disconnect();
    }
}
JMS I/O Connector
Java Message Service (JMS) is a Java API that allows applications to create, send, receive, and read messages using reliable, asynchronous, loosely coupled communication.
Add the maven dependency as follows:
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>1.1</version>
</dependency>
Reading data from a JMS Destination
pipeline.apply(org.apache.beam.sdk.io.jms.JmsIO.read()
    .withConnectionFactory(myConnectionFactory)
    .withQueue("my-queue"));
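The `myConnectionFactory` above is any `javax.jms.ConnectionFactory` supplied by your JMS provider. As an illustrative sketch (assuming ActiveMQ as the broker, with a hypothetical broker URL):

```java
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;

// Hypothetical broker URL; ActiveMQ is just one possible JMS provider.
ConnectionFactory myConnectionFactory =
    new ActiveMQConnectionFactory("tcp://localhost:61616");
```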
Writing to a JMS destination
messages.apply(org.apache.beam.sdk.io.jms.JmsIO.write()
    .withConnectionFactory(myConnectionFactory)
    .withQueue("my-queue"));
Spanner I/O Connector
Spanner, developed by Google, is a distributed, globally scalable relational database service that decouples compute from storage, making it possible to scale processing resources independently of storage.
Add the maven dependency as follows:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
<version>6.35.2</version>
</dependency>
Reading data from a Spanner Database
pipeline
    .apply(org.apache.beam.sdk.io.gcp.spanner.SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId)
        .withQuery("my-query"))
Writing data to a Spanner Database
The Cloud Spanner SpannerIO.Write transform writes to Cloud Spanner by executing a collection of input row Mutations. The mutations are grouped into batches for efficiency.
mutations.apply(
    "Write", org.apache.beam.sdk.io.gcp.spanner.SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId))
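The `mutations` input above is a `PCollection<Mutation>`. As a minimal sketch of how a single `Mutation` is constructed with the Spanner client library (the table and column names here are hypothetical):

```java
import com.google.cloud.spanner.Mutation;

// Build an insert-or-update Mutation for a hypothetical "Singers" table.
Mutation mutation =
    Mutation.newInsertOrUpdateBuilder("Singers")
        .set("SingerId").to(1L)
        .set("FirstName").to("Marc")
        .build();
```

In a pipeline, a `ParDo` or `MapElements` step typically converts each input element into such a `Mutation` before applying the write transform.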
Snowflake I/O Connector
Snowflake is a fully managed SaaS (software as a service) that provides a single platform for data warehousing, data lakes, data engineering, data science, data application development, and secure sharing and consumption of real-time/shared data.
Add the maven dependency as follows:
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-jdbc</artifactId>
<version>3.13.25</version>
</dependency>
Reading data from a Snowflake Table
pipeline.apply(
    org.apache.beam.sdk.io.snowflake.SnowflakeIO.<GenericRecord>read()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .fromQuery(QUERY)
        .withStagingBucketName(BUCKET)
        .withStorageIntegrationName(STORAGE_INTEGRATION_NAME)
        .withCsvMapper(MAPPER)
        .withCoder(CODER));
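The `MAPPER` above is a `SnowflakeIO.CsvMapper`, which converts each staged CSV row (a `String[]` of column values) into an output element. A minimal sketch, assuming a simple `String`-typed read instead of `GenericRecord` (a `GenericRecord` mapper would build an Avro record from the parts instead):

```java
// Minimal CsvMapper sketch: join the CSV columns back into one string.
static SnowflakeIO.CsvMapper<String> getCsvMapper() {
  return (String[] parts) -> String.join(",", parts);
}
```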
Writing data to a Snowflake Table
items.apply(
    org.apache.beam.sdk.io.snowflake.SnowflakeIO.<KV<Integer, String>>write()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .withTable(table)
        .withStagingBucketName(BUCKET)
        .withStorageIntegrationName(STORAGE_INTEGRATION_NAME)
        .withUserDataMapper(MAPPER));
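The `MAPPER` here is a `SnowflakeIO.UserDataMapper`, which turns each element into an `Object[]` of column values for the staged CSV files. A minimal sketch for the `KV<Integer, String>` elements above:

```java
// Map each KV element to a row of column values (key, value).
static SnowflakeIO.UserDataMapper<KV<Integer, String>> getUserDataMapper() {
  return (KV<Integer, String> element) ->
      new Object[] {element.getKey(), element.getValue()};
}
```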
Cassandra I/O Connector
Cassandra is a free and open-source, distributed, wide-column store, NoSQL database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure.
Add the maven dependency as follows:
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>4.1.0</version>
</dependency>
Reading data from a Cassandra Database
A keyspace in Cassandra is a namespace that defines how data is replicated across nodes; a cluster can contain multiple keyspaces. withKeyspace specifies the Cassandra keyspace to read data from.
pipeline.apply(
    org.apache.beam.sdk.io.cassandra.CassandraIO.<Person>read()
        .withHosts(Arrays.asList("host1", "host2"))
        .withPort(9042)
        .withKeyspace("beam")
        .withTable(TABLE)
        .withEntity(Person.class)
        .withCoder(SerializableCoder.of(Person.class)));
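`Person` is a plain entity class annotated with the DataStax object-mapper annotations that CassandraIO uses to map table rows to Java objects. A hedged sketch (the table and column names here are hypothetical):

```java
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import java.io.Serializable;

// Hypothetical entity mapped to a "person" table in the "beam" keyspace.
@Table(keyspace = "beam", name = "person")
public class Person implements Serializable {
  @PartitionKey
  @Column(name = "person_id")
  private int id;

  @Column(name = "person_name")
  private String name;

  public int getId() { return id; }
  public void setId(int id) { this.id = id; }
  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
}
```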
Writing data to a Cassandra Database
pipeline.apply(…) // provides a PCollection<Person> where Person is an entity
    .apply(org.apache.beam.sdk.io.cassandra.CassandraIO.<Person>write()
        .withHosts(Arrays.asList("host1", "host2"))
        .withPort(9042)
        .withKeyspace("beam")
        .withEntity(Person.class));
Conclusion
Apache Beam I/O connectors provide read and write transforms for the most popular data storage systems so that Beam users can benefit from native optimized connectivity. With the available I/Os, Apache Beam pipelines can read and write data from and to an external storage type in a unified and distributed way, simplifying complexity for the user. To better understand your data and leverage Apache Beam for your business needs, contact SpringML at info@springml.com.