Overview
Cloud Storage Loader is a Dataflow job which dumps events from an input PubSub subscription into a Cloud Storage bucket.
Technical details
Cloud Storage Loader is built on top of Apache Beam and its Scala wrapper SCIO.
It groups messages from an input PubSub subscription into configurable windows and writes them out to Google Cloud Storage.
It can also optionally partition the output by date so that you can easily see when data was written to your Cloud Storage bucket. For example, you could see the following hierarchy:
gs://bucket/
--2018
----10
------31
--------18
--------19
--------20
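This hierarchy corresponds to the date template used in the --outputDirectory examples below (YYYY/MM/dd/HH). As a hedged illustration, assuming gsutil is installed and authenticated and [BUCKET] stands in for your own bucket, the files written during the 18:00 window on 2018-10-31 could be listed with:
# Hypothetical example; with --outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ the
# loader writes that hour's output under gs://[BUCKET]/2018/10/31/18/.
gsutil ls -r gs://[BUCKET]/2018/10/31/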
It can also compress the output data. For now, the supported output compressions are:
- gzip
- bz2
- none
Do note, however, that bz2-compressed data cannot be loaded directly into BigQuery.
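As a hedged workaround, assuming gsutil is installed and authenticated and the paths below are placeholders, you can decompress bz2 output before handing it to BigQuery:
# Hypothetical sketch; bucket, date path and filename are placeholders.
gsutil cp gs://[BUCKET]/2018/10/31/18/output-shard.txt.bz2 .
bunzip2 output-shard.txt.bz2   # produces output-shard.txt
gsutil cp output-shard.txt gs://[BUCKET]/decompressed/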
For now, it only runs on GCP’s Dataflow.
See also: Setup guide
Configuration
Cloud Storage Loader-specific options
--inputSubscription=String
The Cloud Pub/Sub subscription to read from, formatted as projects/[PROJECT]/subscriptions/[SUB]

--outputDirectory=String
The Cloud Storage directory to output files to, ends with /

--outputFilenamePrefix=String
The Cloud Storage prefix of the output filenames (default: output)

--shardTemplate=String
The shard template which will be part of the filenames (default: -W-P-SSSSS-of-NNNNN)

--outputFilenameSuffix=String
The suffix of the filenames written out (default: .txt)

--windowDuration=Int
The window duration in minutes (default: 5)

--compression=String
The compression used: gzip, bz2 or none; bz2 can't be loaded into BigQuery (default: none)

--numShards=int
The maximum number of output shards produced when writing (default: 1)
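To illustrate how the filename options combine: each file is named outputFilenamePrefix + shardTemplate + outputFilenameSuffix, where Beam's shard template replaces W with the window, P with the pane, SSSSS with the shard number and NNNNN with the total number of shards. As a hedged example with the defaults above and a date-partitioned bucket (placeholders throughout):
# Hypothetical example; [BUCKET] and the date path are placeholders.
gsutil ls "gs://[BUCKET]/2018/10/31/18/output-*-00000-of-00001.txt"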
Dataflow options
To run on Dataflow, Cloud Storage Loader relies on a set of additional configuration options:
--runner=DataFlowRunner, which specifies that we want to run on Dataflow
--project=[PROJECT], the name of the GCP project
--streaming=true, to notify Dataflow that we're running a streaming application
--zone=europe-west2-a, the zone where the Dataflow nodes (effectively GCP Compute Engine nodes) will be launched
--region=europe-west2, the region where the Dataflow job will be launched
--gcpTempLocation=gs://[BUCKET]/, the GCS bucket where temporary files necessary to run the job (e.g. JARs) will be stored
The list of all the options can be found at https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.
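Putting the Dataflow options together with the loader-specific options, a minimal invocation of the ZIP archive described below could look like the following sketch (project, zone, region, bucket and subscription are placeholders to adjust):
./bin/snowplow-google-cloud-storage-loader \
--runner=DataFlowRunner \
--project=[PROJECT] \
--streaming=true \
--region=europe-west2 \
--zone=europe-west2-a \
--gcpTempLocation=gs://[BUCKET]/ \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/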
Running
Cloud Storage Loader comes as a ZIP archive, a Docker image or a Cloud Dataflow template; feel free to choose whichever best fits your use case.
Template
You can run Dataflow templates using a variety of means:
- Using the GCP console
- Using gcloud
- Using the REST API
Refer to the documentation on executing templates to learn more.
Here, we provide an example using gcloud:
gcloud dataflow jobs run [JOB-NAME] \
--gcs-location gs://sp-hosted-assets/4-storage/snowplow-google-cloud-storage-loader/0.1.0/SnowplowGoogleCloudStorageLoaderTemplate-0.1.0 \
--parameters \
inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION],\
outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/,\ # partitions by date
outputFilenamePrefix=output,\ # optional
shardTemplate=-W-P-SSSSS-of-NNNNN,\ # optional
outputFilenameSuffix=.txt,\ # optional
windowDuration=5,\ # optional, in minutes
compression=none,\ # optional, gzip, bz2 or none
numShards=1 # optional
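The same template can also be launched through the Dataflow REST API (the templates.launch method). Below is a hedged sketch using curl, with [PROJECT], [REGION], [JOB-NAME], [BUCKET] and [SUBSCRIPTION] as placeholders and only the required parameters set:
# Hypothetical example; requires gcloud for the access token.
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://dataflow.googleapis.com/v1b3/projects/[PROJECT]/locations/[REGION]/templates:launch?gcsPath=gs://sp-hosted-assets/4-storage/snowplow-google-cloud-storage-loader/0.1.0/SnowplowGoogleCloudStorageLoaderTemplate-0.1.0" \
-d '{
  "jobName": "[JOB-NAME]",
  "parameters": {
    "inputSubscription": "projects/[PROJECT]/subscriptions/[SUBSCRIPTION]",
    "outputDirectory": "gs://[BUCKET]/YYYY/MM/dd/HH/"
  }
}'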
ZIP archive
You can find the archive hosted on our Bintray.
Once unzipped, the artifact can be run as follows:
./bin/snowplow-google-cloud-storage-loader \
--runner=DataFlowRunner \
--project=[PROJECT] \
--streaming=true \
--zone=europe-west2-a \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \ # partitions by date
--outputFilenamePrefix=output \ # optional
--shardTemplate=-W-P-SSSSS-of-NNNNN \ # optional
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional
To display the help message:
./bin/snowplow-google-cloud-storage-loader --help
To display documentation about Cloud Storage Loader-specific options:
./bin/snowplow-google-cloud-storage-loader --help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options
Docker image
You can also find the image on our Bintray.
A container can be run as follows:
docker run \
-v $PWD/config:/snowplow/config \ # if running outside GCP
-e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/config/credentials.json \ # if running outside GCP
snowplow-docker-registry.bintray.io/snowplow/snowplow-google-cloud-storage-loader:0.1.0 \
--runner=DataFlowRunner \
--jobName=[JOB-NAME] \
--project=[PROJECT] \
--streaming=true \
--zone=[ZONE] \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \ # partitions by date
--outputFilenamePrefix=output \ # optional
--shardTemplate=-W-P-SSSSS-of-NNNNN \ # optional
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional
To display the help message:
docker run snowplow-docker-registry.bintray.io/snowplow/snowplow-google-cloud-storage-loader:0.1.0 \
--help
To display documentation about Cloud Storage Loader-specific options:
docker run snowplow-docker-registry.bintray.io/snowplow/snowplow-google-cloud-storage-loader:0.1.0 \
--help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options
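When running outside GCP, the container needs the credentials.json mounted in the example above. Here is a hypothetical sketch of creating one with gcloud; the service account name and the role are assumptions, so grant whatever roles your deployment actually requires:
# Hypothetical; "storage-loader" and roles/dataflow.admin are assumptions.
gcloud iam service-accounts create storage-loader
gcloud projects add-iam-policy-binding [PROJECT] \
--member="serviceAccount:storage-loader@[PROJECT].iam.gserviceaccount.com" \
--role="roles/dataflow.admin"
gcloud iam service-accounts keys create config/credentials.json \
--iam-account="storage-loader@[PROJECT].iam.gserviceaccount.com"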
Additional information
A full list of all the Beam CLI options can be found at: https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.
Tests and debugging
Testing
The tests for this codebase can be run with sbt test.
Debugging
You can run the job locally and experiment with its different parts using the SCIO REPL by running sbt repl/run.