Google Cloud Storage Loader

Overview

Cloud Storage Loader is a Dataflow job that dumps events from an input Pub/Sub subscription into a Cloud Storage bucket.

Technical details

Cloud Storage Loader is built on top of Apache Beam and its Scala wrapper, SCIO.

It groups messages from an input Pub/Sub subscription into configurable windows and writes them out to Google Cloud Storage.

It can also optionally partition the output by date, so that you can easily see when data was written to your Cloud Storage bucket. For example, you could see the following hierarchy:

gs://bucket/
--2018
----10
------31
--------18
--------19
--------20
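
To make the windowing and date partitioning more concrete, here is a minimal SCIO sketch of the same idea, assuming option names that mirror the loader's flags. It is not the loader's actual source code: the object name is made up, and the real job uses a windowed, date-based filename policy (rather than a plain text write) to produce the bucket layout shown above.

// A minimal sketch only, not the loader's real code: read from Pub/Sub,
// group into fixed windows, then write each window out to Cloud Storage.
import com.spotify.scio._
import org.joda.time.Duration

object WindowedLoaderSketch {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.pubsubSubscription[String](args("inputSubscription")) // projects/[PROJECT]/subscriptions/[SUB]
      .withFixedWindows(Duration.standardMinutes(args.int("windowDuration", 5)))
      // The real loader applies a date-based, windowed filename policy to get the
      // YYYY/MM/dd/HH layout above; a plain text write keeps the sketch short.
      .saveAsTextFile(args("outputDirectory"), numShards = args.int("numShards", 1))

    sc.run()
  }
}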

It can also compress the output data. For now, the supported output compressions are:

  • gzip
  • bz2
  • none

Do note, however, that bz2-compressed data cannot be loaded directly into BigQuery.
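
For illustration only, the three values map naturally onto Beam's org.apache.beam.sdk.io.Compression enum; the helper below is a sketch of that mapping, not code taken from the loader.

// Illustrative mapping of the --compression values onto Beam's Compression enum
import org.apache.beam.sdk.io.Compression

def toCompression(name: String): Compression = name.toLowerCase match {
  case "gzip" => Compression.GZIP
  case "bz2"  => Compression.BZIP2
  case _      => Compression.UNCOMPRESSED // "none" and anything unrecognised
}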

For now, it only runs on GCP’s Dataflow.

See also:

Setup guide

Configuration

Cloud Storage Loader-specific options

  • --inputSubscription=String: The Cloud Pub/Sub subscription to read from, formatted as projects/[PROJECT]/subscriptions/[SUB]
  • --outputDirectory=String: The Cloud Storage directory to output files to; must end with /
  • --outputFilenamePrefix=String (default: output): The Cloud Storage prefix to output files to
  • --shardTemplate=String (default: -W-P-SSSSS-of-NNNNN): The shard template which will be part of the filenames
  • --outputFilenameSuffix=String (default: .txt): The suffix of the filenames written out
  • --windowDuration=Int (default: 5): The window duration in minutes
  • --compression=String (default: none): The compression used (gzip, bz2 or none); bz2 can't be loaded into BigQuery
  • --numShards=Int (default: 1): The maximum number of output shards produced when writing

Dataflow options

To run on Dataflow, Cloud Storage Loader relies on a set of additional configuration options:

  • --runner=DataFlowRunner, which specifies that we want to run on Dataflow
  • --project=[PROJECT], the name of the GCP project
  • --streaming=true to notify Dataflow that we’re running a streaming application
  • --zone=europe-west2-a, the zone where the Dataflow nodes (effectively GCP Compute Engine nodes) will be launched
  • --region=europe-west2, the region where the Dataflow job will be launched
  • --gcpTempLocation=gs://[BUCKET]/, the GCS bucket where temporary files necessary to run the job (e.g. JARs) will be stored

The list of all the options can be found at https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.

Running

Cloud Storage Loader comes as a ZIP archive, a Docker image or a Cloud Dataflow template; choose whichever best fits your use case.

Template

You can run Dataflow templates using a variety of means:

  • Using the GCP console
  • Using gcloud
  • Using the REST API

Refer to the documentation on executing templates to learn more.

Here, we provide an example using gcloud:

# outputDirectory partitions the output by date.
# outputFilenamePrefix, shardTemplate, outputFilenameSuffix, windowDuration (minutes),
# compression (gzip, bz2 or none) and numShards are optional.
gcloud dataflow jobs run [JOB-NAME] \
  --gcs-location gs://sp-hosted-assets/4-storage/snowplow-google-cloud-storage-loader/0.3.1/SnowplowGoogleCloudStorageLoaderTemplate-0.3.1 \
  --parameters \
inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION],\
outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/,\
outputFilenamePrefix=output,\
shardTemplate=-W-P-SSSSS-of-NNNNN,\
outputFilenameSuffix=.txt,\
windowDuration=5,\
compression=none,\
numShards=1

ZIP archive

You can find the archive hosted on GitHub.

Once unzipped, the artifact can be run as follows:

# outputDirectory partitions the output by date.
# outputFilenamePrefix, shardTemplate, outputFilenameSuffix, windowDuration (minutes),
# compression (gzip, bz2 or none) and numShards are optional.
./bin/snowplow-google-cloud-storage-loader \
  --runner=DataFlowRunner \
  --project=[PROJECT] \
  --streaming=true \
  --zone=europe-west2-a \
  --inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
  --outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \
  --outputFilenamePrefix=output \
  --shardTemplate=-W-P-SSSSS-of-NNNNN \
  --outputFilenameSuffix=.txt \
  --windowDuration=5 \
  --compression=none \
  --numShards=1

To display the help message:

./bin/snowplow-google-cloud-storage-loader --help

To display documentation about Cloud Storage Loader-specific options:

./bin/snowplow-google-cloud-storage-loader --help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options

Docker image

You can find the image on Docker Hub.

A container can be run as follows:

# The volume mount and GOOGLE_APPLICATION_CREDENTIALS variable are only needed when running outside GCP.
# outputDirectory partitions the output by date.
# outputFilenamePrefix, shardTemplate, outputFilenameSuffix, windowDuration (minutes),
# compression (gzip, bz2 or none) and numShards are optional.
docker run \
  -v $PWD/config:/snowplow/config \
  -e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/config/credentials.json \
  snowplow/snowplow-google-cloud-storage-loader:0.3.1 \
  --runner=DataFlowRunner \
  --jobName=[JOB-NAME] \
  --project=[PROJECT] \
  --streaming=true \
  --zone=[ZONE] \
  --inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
  --outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \
  --outputFilenamePrefix=output \
  --shardTemplate=-W-P-SSSSS-of-NNNNN \
  --outputFilenameSuffix=.txt \
  --windowDuration=5 \
  --compression=none \
  --numShards=1

To display the help message:

docker run snowplow/snowplow-google-cloud-storage-loader:0.3.1 \
  --help

To display documentation about Cloud Storage Loader-specific options:

docker run snowplow/snowplow-google-cloud-storage-loader:0.3.1 \
  --help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options

Additional information

A full list of all the Beam CLI options can be found at: https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.

Tests and debugging

Testing

The tests for this codebase can be run with sbt test.

Debugging

You can run the job locally and experiment with its different parts using the SCIO REPL by running sbt repl/run.
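
As an illustration of what such a session might look like (the snippet below is a sketch, not taken from the project, and assumes the SCIO core API is available in the REPL), you could group a few in-memory, timestamped events into 5-minute windows and print the result:

// Hypothetical REPL experiment: group in-memory events into 5-minute fixed windows
import com.spotify.scio._
import org.apache.beam.sdk.transforms.windowing.IntervalWindow
import org.joda.time.{Duration, Instant}

val (sc, _) = ContextAndArgs(Array())
val events = sc.parallelizeTimestamped(Seq(
  ("event-1", new Instant(0L)),          // 00:00
  ("event-2", new Instant(4 * 60000L)),  // 00:04, same 5-minute window
  ("event-3", new Instant(6 * 60000L))   // 00:06, next window
))
events
  .withFixedWindows(Duration.standardMinutes(5))
  .withWindow[IntervalWindow]  // pair each event with the window it landed in
  .debug()                     // print the (event, window) pairs
sc.run()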