Google Cloud Spanner – Batch Uploading and Event Publisher

In this blog post, we demonstrate how to batch upload data from an on-premises transactional database to Cloud Spanner, and how to set up an event publisher that publishes every update in the database to a Google Cloud Pub/Sub topic.

This was part of a solution we provided to one of our major retail customers.

1. Batch Upload from On-Prem MySQL Database to Cloud Spanner

To migrate the data into Spanner, we use Google Cloud Dataflow, a managed GCP service that runs pipelines written with the Apache Beam framework.

Architecture:

[Figure: architecture of the batch upload from the on-prem MySQL database to Cloud Spanner]

Step 1:

First, we get the schema of the MySQL table. For example, say we have the following table in MySQL:

[Figure: Step 1, the example MySQL table]

Keep the schema details handy; they are needed in step 4.

Step 2:

Create a Spanner instance in GCP. We can do this with the GCP console, the gcloud command-line tool, or the client libraries.

[Figure: Step 2, creating a Spanner instance in GCP]

Step 3:

Create a database in the Spanner instance.

[Figure: Step 3, creating a database in the Spanner instance]

Step 4:

Create a table using the schema details noted in step 1.

[Figure: Step 4, creating a table with the schema details]
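
As a rough illustration of carrying the schema from step 1 over to Spanner, the Java sketch below maps a few common MySQL column types to Spanner types and assembles a CREATE TABLE statement. The class SpannerDdlSketch, its methods, and the columns id and name are hypothetical and not part of the original job. The type mapping covers only a handful of types and is an assumption to review before use; for example, MySQL DATETIME carries no time zone, while a Spanner TIMESTAMP is an absolute point in time.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not from the original job): builds Spanner DDL
// from the MySQL column definitions noted in step 1.
class SpannerDdlSketch {

    // Maps a MySQL column type to a Spanner type. Only a few common
    // types are covered; anything else is rejected rather than guessed.
    static String toSpannerType(String mysqlType) {
        String t = mysqlType.trim().toUpperCase(Locale.ROOT);
        if (t.startsWith("VARCHAR") || t.startsWith("CHAR")) {
            int open = t.indexOf('(');
            int close = t.indexOf(')');
            // Keep the declared length when present, else use STRING(MAX).
            String len = (open >= 0 && close > open)
                    ? t.substring(open + 1, close).trim()
                    : "MAX";
            return "STRING(" + len + ")";
        }
        if (t.equals("TINYINT(1)") || t.equals("BOOLEAN") || t.equals("BOOL")) {
            return "BOOL";
        }
        if (t.startsWith("INT") || t.startsWith("BIGINT") || t.startsWith("SMALLINT")
                || t.startsWith("MEDIUMINT") || t.startsWith("TINYINT")) {
            return "INT64";
        }
        if (t.startsWith("FLOAT") || t.startsWith("DOUBLE")) {
            return "FLOAT64";
        }
        if (t.equals("DATE")) {
            return "DATE";
        }
        if (t.startsWith("DATETIME") || t.startsWith("TIMESTAMP")) {
            return "TIMESTAMP";
        }
        throw new IllegalArgumentException("No mapping for MySQL type: " + mysqlType);
    }

    // Assembles a CREATE TABLE statement; column order follows the map's iteration order.
    static String createTableDdl(String table, Map<String, String> mysqlColumns, String primaryKey) {
        StringJoiner ddl = new StringJoiner(
                ",\n  ",
                "CREATE TABLE " + table + " (\n  ",
                "\n) PRIMARY KEY (" + primaryKey + ")");
        for (Map.Entry<String, String> column : mysqlColumns.entrySet()) {
            ddl.add(column.getKey() + " " + toSpannerType(column.getValue()));
        }
        return ddl.toString();
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "int");            // hypothetical column
        columns.put("name", "varchar(50)");  // hypothetical column
        System.out.println(createTableDdl("demo_table", columns, "id"));
    }
}
```

The generated statement can be pasted into the console's DDL editor or passed to whichever tool was used to create the database in step 3. Rejecting unknown types keeps the sketch from silently picking a wrong Spanner type.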

Step 5:

Now that Spanner is set up, we need a Dataflow pipeline that reads from the MySQL database and writes the data to the Spanner table. Dataflow pipelines can be written in Java or Python; we wrote this job in Java. It looks roughly like this:

[Figure: Step 5, the Dataflow pipeline code]

Reading from DB:

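// Imports needed by this snippet:
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;

// Read every row of demo_table as a list of strings, one entry per column.
PCollection<List<String>> rows = pipeline.apply("Read From Db",
    JdbcIO.<List<String>>read()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://8.tcp.ngrok.io:12887/demo_database")
                .withUsername("root")
                .withPassword("password")) // demo credentials only
        .withQuery("select * from demo_table")
        .withCoder(ListCoder.of(StringUtf8Coder.of()))
        .withRowMapper(new JdbcIO.RowMapper<List<String>>() {
            @Override
            public List<String> mapRow(ResultSet resultSet) throws Exception {
                List<String> row = new ArrayList<>();
                // JDBC columns are 1-indexed; a SQL NULL becomes the string "null".
                for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                    row.add(String.valueOf(resultSet.getObject(i)));
                }
                return row;
            }
        }));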

Writing to Spanner:

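// SpannerIO (org.apache.beam.sdk.io.gcp.spanner) writes a PCollection<Mutation>;
// `mutations` is assumed to be built from `rows`, one Mutation per row.
mutations.apply(
    "Insert to Spanner",
    SpannerIO.write()
        .withInstanceId(options.getSpannerInstanceName())
        .withDatabaseId(options.getSpannerDatabaseName()));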

When we run the pipeline code, it launches a Dataflow job in GCP.

[Figure: the Dataflow job running in GCP]

This job batch uploads the data from the MySQL database to Spanner.

To learn more, see our video resource: Pooling data from MySQL to Google Cloud Spanner Using the Dataflow Pipeline

2. Event Publisher:

Let us say our analytics team needs the same data as the transactional database to run analytics and build dashboards, and keeps it in a BigQuery table. Whenever the database changes, the change has to be reflected in BigQuery.
To do this, we create a poller that checks the database for changes. Whenever it finds one, it publishes the change to a Pub/Sub topic, and a subscriber to that topic can then insert it into BigQuery.

We have implemented such a poller.
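
A poller of this kind can be sketched in plain Java as shown here. This is a minimal sketch under stated assumptions, not the implementation we deployed: ChangeEvent, ChangeSource, EventPublisher, and ChangePoller are hypothetical names, the source is assumed to expose a last-update timestamp per row, and the database and the Pub/Sub topic are replaced by in-memory stand-ins so the polling logic can be read on its own.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical change record: row key, serialized row, and time of the change.
final class ChangeEvent {
    final String key;
    final String payload;
    final Instant updatedAt;

    ChangeEvent(String key, String payload, Instant updatedAt) {
        this.key = key;
        this.payload = payload;
        this.updatedAt = updatedAt;
    }
}

// Hypothetical source of changes. A real one would query the database for
// rows whose last-update timestamp is later than `since`.
interface ChangeSource {
    List<ChangeEvent> changesSince(Instant since);
}

// Hypothetical sink. A real one would wrap a Pub/Sub publisher client.
interface EventPublisher {
    void publish(ChangeEvent event);
}

final class ChangePoller {
    private final ChangeSource source;
    private final EventPublisher publisher;
    private Instant watermark; // latest change already published

    ChangePoller(ChangeSource source, EventPublisher publisher, Instant start) {
        this.source = source;
        this.publisher = publisher;
        this.watermark = start;
    }

    // Runs one polling cycle and returns the number of events published.
    int pollOnce() {
        int published = 0;
        for (ChangeEvent event : source.changesSince(watermark)) {
            publisher.publish(event);
            // Advance the watermark only after the event has been handed off.
            if (event.updatedAt.isAfter(watermark)) {
                watermark = event.updatedAt;
            }
            published++;
        }
        return published;
    }

    Instant watermark() {
        return watermark;
    }

    public static void main(String[] args) {
        // In-memory stand-ins for the database and the topic.
        List<ChangeEvent> table = new ArrayList<>();
        table.add(new ChangeEvent("1", "{\"name\":\"a\"}", Instant.ofEpochSecond(10)));
        ChangeSource source = since -> {
            List<ChangeEvent> changed = new ArrayList<>();
            for (ChangeEvent e : table) {
                if (e.updatedAt.isAfter(since)) {
                    changed.add(e);
                }
            }
            return changed;
        };
        ChangePoller poller = new ChangePoller(
                source, e -> System.out.println("publish " + e.key), Instant.EPOCH);
        System.out.println(poller.pollOnce() + " event(s) published");
    }
}
```

In a long-running service, pollOnce would typically be called on a fixed schedule, for example from a ScheduledExecutorService. Because the comparison is strictly "later than the watermark", rows that share the watermark's exact timestamp but become visible afterwards could be missed, so a production poller needs a more careful change-tracking scheme.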

To learn more, see our video resource: Updating Data in Google Cloud Spanner Using Pollers

[Figure: architecture of the batch upload and event listener]

This solution suits businesses that need a fully managed, globally available, cloud-based transactional database and want to run data analytics on the current state of their data.
