Uploading data to Cloud Datastore using Dataflow

In one of my previous articles, I uploaded 1.2 million records from a csv file to Cloud Datastore. I was reading the file, collecting lists of 200 parsed lines and posting them sequentially as json to a webserver (here is the full script). Of course that's rather slow, i.e. the process took some 2.5 hours to complete. Only afterwards did I remember that there is a faster way, namely Cloud Dataflow, so just for fun I wrote a pipeline to upload data from a csv file into Datastore.
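
A rough sketch of that earlier approach (not the exact script, just the idea; the upload URL is a placeholder):

import csv
import requests

BATCH_SIZE = 200
UPLOAD_URL = 'https://example.com/upload'  # placeholder endpoint of the webserver

def post_batch(batch):
    # one synchronous POST per batch of 200 parsed records
    requests.post(UPLOAD_URL, json=batch)

with open('products.csv') as f:
    batch = []
    for row in csv.reader(f):
        batch.append(row)
        if len(batch) == BATCH_SIZE:
            post_batch(batch)
            batch = []
    if batch:  # leftover records
        post_batch(batch)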

Dataflow is a serverless service on Google Cloud Platform which runs data processing pipelines written with the Apache Beam framework. It means that you just need to write the data processing pipeline and define a few settings, and that's all. It has (among others) good integration with Google Cloud products like Cloud Storage, BigQuery, Bigtable, Datastore and Pub/Sub. Dataflow creates virtual machines which execute the job, and after the job is completed it shuts everything down. Easy peasy. I created a GitHub repository with the full code (Python) https://github.com/zdenulo/upload-data-datastore-dataflow and I will explain how it's done.

The most important file is upload.py, where everything is defined. Note: Apache Beam for Python supports only Python 2.7 (not 3.x) at the moment.

import csv
import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper

from settings import PROJECT, BUCKET, INPUT_FILENAME


class CSVtoDict(beam.DoFn):
    """Converts line into dictionary"""
    def process(self, element, headers):
        rec = ""
        element = element.encode('utf-8')
        try:
            for line in csv.reader([element]):
                rec = line

            if len(rec) == len(headers):
                data = {header.strip(): val.strip() for header, val in zip(headers, rec)}
                return [data]
            else:
                print "bad: {}".format(rec)
        except Exception:
            pass


class CreateEntities(beam.DoFn):
    """Creates Datastore entity"""
    def process(self, element):
        entity = entity_pb2.Entity()
        sku = int(element.pop('sku'))
        element['regularPrice'] = float(element['regularPrice'])
        element['salePrice'] = float(element['salePrice'])
        element['name'] = unicode(element['name'].decode('utf-8'))
        element['type'] = unicode(element['type'].decode('utf-8'))
        element['url'] = unicode(element['url'].decode('utf-8'))
        element['image'] = unicode(element['image'].decode('utf-8'))
        element['inStoreAvailability'] = unicode(element['inStoreAvailability'])

        datastore_helper.add_key_path(entity.key, 'Productx', sku)
        datastore_helper.add_properties(entity, element)
        return [entity]


def dataflow(run_local):
    if run_local:
        input_file_path = 'sample.csv'
    else:
        input_file_path = 'gs://' + BUCKET + '/' + INPUT_FILENAME

    JOB_NAME = 'datastore-upload-{}'.format(datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S'))

    pipeline_options = {
        'project': PROJECT,
        'staging_location': 'gs://' + BUCKET + '/staging',
        'runner': 'DataflowRunner',
        'job_name': JOB_NAME,
        'disk_size_gb': 100,
        'temp_location': 'gs://' + BUCKET + '/temp',
        'save_main_session': True
    }

    if run_local:
        pipeline_options['runner'] = 'DirectRunner'

    options = PipelineOptions.from_dictionary(pipeline_options)
    with beam.Pipeline(options=options) as p:

        (p | 'Reading input file' >> beam.io.ReadFromText(input_file_path)
         | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(),
                                                       ['sku', 'name', 'regularPrice', 'salePrice', 'type', 'url', 'image',
                                                        'inStoreAvailability'])
         | 'Create entities' >> beam.ParDo(CreateEntities())
         | 'Write entities into Datastore' >> WriteToDatastore(PROJECT)
         )


if __name__ == '__main__':
    run_locally = False
    dataflow(run_locally)
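
upload.py imports PROJECT, BUCKET and INPUT_FILENAME from a settings module whose content isn't shown above; it just needs to define something like this (placeholder values, adjust to your own project and bucket):

# settings.py - placeholder values
PROJECT = 'my-gcp-project'       # GCP project id
BUCKET = 'my-bucket'             # Cloud Storage bucket where the csv file is uploaded
INPUT_FILENAME = 'products.csv'  # name of the csv file in the bucket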

The dataflow function contains the main part: the pipeline options are defined there, as well as the pipeline itself which does the job. It consists of 4 steps:

  1. Read the input file (from a Google Cloud Storage bucket)
  2. Parse csv lines into dictionaries based on the header column names (illustrated in the example after this list)
  3. Create Datastore entities (without saving them)
  4. Save the entities into Datastore
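
To make step 2 a bit more concrete, this is essentially what CSVtoDict does with a single line (the sample values are made up):

import csv

headers = ['sku', 'name', 'regularPrice', 'salePrice', 'type', 'url', 'image',
           'inStoreAvailability']
line = '1234,Some Product,19.99,14.99,HardGood,http://example.com,http://example.com/img.jpg,TRUE'

rec = next(csv.reader([line]))  # csv.reader handles quoted fields, unlike a plain split(',')
data = {header.strip(): val.strip() for header, val in zip(headers, rec)}
# {'sku': '1234', 'name': 'Some Product', 'regularPrice': '19.99', ...}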

There are 2 custom transformations:

  • CSVtoDict - converts a csv line into a dictionary
  • CreateEntities - based on the dictionary, creates an entity - a protocol buffer message which is then sent to the WriteToDatastore transform

With the run_locally variable you can define whether you want to run the pipeline locally (with the DirectRunner) or on Dataflow.
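
Besides flipping run_locally, a single transform can also be checked locally with Beam's testing utilities, without touching any file or Datastore. A minimal sketch for CSVtoDict (assuming upload.py and its dependencies are importable; the sample values are made up):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from upload import CSVtoDict

headers = ['sku', 'name', 'regularPrice', 'salePrice', 'type', 'url', 'image',
           'inStoreAvailability']
line = u'1234,Some Product,19.99,14.99,HardGood,http://example.com,http://example.com/img.jpg,TRUE'
expected = {'sku': '1234', 'name': 'Some Product', 'regularPrice': '19.99',
            'salePrice': '14.99', 'type': 'HardGood', 'url': 'http://example.com',
            'image': 'http://example.com/img.jpg', 'inStoreAvailability': 'TRUE'}

with TestPipeline() as p:
    output = (p
              | beam.Create([line])
              | beam.ParDo(CSVtoDict(), headers))
    # assert_that runs when the pipeline finishes and fails if the output differs
    assert_that(output, equal_to([expected]))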

Before that, the apache beam library needs to be installed (e.g. pip install apache-beam[gcp]) and the job can be executed with the command:

python upload.py
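
Once the job has finished, it's easy to read a few entities back to check that everything landed in Datastore. A small sketch using the google-cloud-datastore client library (Productx is the kind used in CreateEntities above):

from google.cloud import datastore

from settings import PROJECT

client = datastore.Client(project=PROJECT)
query = client.query(kind='Productx')
for product in query.fetch(limit=5):
    print product.key.id, product['name'], product['salePrice']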

Here are some screenshots:

This is how the pipeline looks in the Dataflow UI:

[screenshot: Dataflow pipeline graph]

And some stats.

[screenshot: job summary and worker autoscaling graph]

As can be seen, the whole job took 16.5 minutes (which also includes provisioning of the VMs). The graph displays the number of workers (VMs) processing the data; it increased over time and peaked at 60 workers shortly before the job was done. The image below shows the consumed resources. With a bit of effort it can be calculated that this execution cost ~$0.5, plus Datastore costs.

[screenshot: resource metrics for the job]

So there you have it: with a few lines of code it's possible to write a data pipeline that loads data from a csv file into Datastore, while everything else is managed by Google Cloud. There is actually a ready-made Dataflow template which uploads text from Cloud Storage to Cloud Datastore, but in that case every line has to be encoded as json, and the Datastore entity is created based on it.
