Setting up Beam Enrich


Beam Enrich specific options

Beam Enrich comes with a set of predefined CLI options:

  • --job-name, the name of the job as it will appear in the Dataflow console
  • --raw=projects/{project}/subscriptions/{raw-topic-subscription}, the PubSub subscription Beam Enrich will consume raw events from
  • --enriched=projects/{project}/topics/{enriched-topic}, the PubSub topic successfully enriched events will be written to
  • --bad=projects/{project}/topics/{bad-topic}, the PubSub topic where events that have failed enrichment will end up
  • --pii=projects/{project}/topics/{pii-topic}, the PubSub topic where events resulting from the PII enrichment will end up, optional
  • --resolver=iglu_resolver.json, the Iglu resolver configuration used to look up the schemas in your data (see the example after this list)
  • --enrichments=enrichments, the optional directory containing the enrichment configurations to apply
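
For reference, a minimal iglu_resolver.json that only resolves schemas against Iglu Central could look like the sketch below; the cache size and repository list are placeholders to adapt to your own setup:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}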

It’s important to note that every enrichment relying on local files will need to have the necessary files stored in Google Cloud Storage, e.g. the IP lookups enrichment:

{ "schema": "iglu:com.snowplowanalytics.snowplow/ip_lookups/jsonschema/2-0-0", "data": { "name": "ip_lookups", "vendor": "com.snowplowanalytics.snowplow", "enabled": true, "parameters": { "geo": { "database": "GeoLite2-City.mmdb", "uri": "gs://gcs-bucket/maxmind" } } } }
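
For example, assuming you have downloaded GeoLite2-City.mmdb locally, you can upload it to the location referenced above with gsutil (gs://gcs-bucket/maxmind is just the placeholder bucket and path from the configuration):

gsutil cp GeoLite2-City.mmdb gs://gcs-bucket/maxmind/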

Dataflow options

To run on Dataflow, Beam Enrich will rely on a set of additional configuration options:

  • --runner=DataflowRunner, which specifies that we want to run on Dataflow
  • --project={project}, the name of the GCP project
  • --streaming=true to notify Dataflow that we’re running a streaming application
  • --zone=europe-west2-a, the zone where the Dataflow nodes (effectively GCP Compute Engine nodes) will be launched
  • --region=europe-west2, the region where the Dataflow job will be launched
  • --gcpTempLocation=gs://location/, the GCS bucket where temporary files necessary to run the job (e.g. JARs) will be stored

The list of all the options can be found at https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.
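
Note that the GCS bucket passed to --gcpTempLocation has to be writable by the job; if it does not exist yet, you can create it with gsutil (bucket name and location below are the placeholders used above):

gsutil mb -l europe-west2 gs://location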


Running Beam Enrich

Beam Enrich comes as a ZIP archive or a Docker image; choose whichever fits your use case best.
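
ZIP archive

If you go the ZIP archive route, a launch command could look like the sketch below; it assumes the archive unpacks to a bin/beam-enrich launcher script (the exact script path may differ depending on the artifact you downloaded) and reuses the placeholder values introduced above:

./bin/beam-enrich \
  --runner=DataflowRunner \
  --project=project-id \
  --streaming=true \
  --zone=europe-west2-a \
  --region=europe-west2 \
  --gcpTempLocation=gs://location/ \
  --job-name=beam-enrich \
  --raw=projects/project/subscriptions/raw-topic-subscription \
  --enriched=projects/project/topics/enriched-topic \
  --bad=projects/project/topics/bad-topic \
  --resolver=iglu_resolver.json \
  --enrichments=enrichments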

Docker image

The Docker image for Beam Enrich is also published on Docker Hub.
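
For example, you can pull the published image as follows (the snowplow/beam-enrich:latest name and tag are the same ones used in the run command below):

docker pull snowplow/beam-enrich:latest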

You can build it yourself with sbt docker:publishLocal.

You can run a container with the following command:

docker run \
  -v $PWD/config:/snowplow/config \
  -e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/config/credentials.json \
  snowplow/beam-enrich:latest \
  --runner=DataflowRunner \
  --project=project-id \
  --streaming=true \
  --zone=europe-west2-a \
  --gcpTempLocation=gs://location/ \
  --job-name=beam-enrich \
  --raw=projects/project/subscriptions/raw-topic-subscription \
  --enriched=projects/project/topics/enriched-topic \
  --bad=projects/project/topics/bad-topic \
  --pii=projects/project/topics/pii-topic \
  --resolver=/snowplow/config/iglu_resolver.json \
  --enrichments=/snowplow/config/enrichments/

Note that the GOOGLE_APPLICATION_CREDENTIALS environment variable is only needed if you are running Beam Enrich outside of GCP, and that --pii is optional.

This assumes that you have a config folder containing your resolver and your enrichments (as well as your GCP credentials if you’re running Beam Enrich outside of GCP) in the current directory.
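
As an illustration, such a config folder could be laid out as follows (the ip_lookups.json file name is just an example enrichment configuration, and credentials.json is only needed outside of GCP):

config/
├── credentials.json
├── iglu_resolver.json
└── enrichments/
    └── ip_lookups.json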

Tests and debugging


The tests for this codebase can be run with sbt test.


You can run the job locally and experiment with its different parts using the Scio REPL by running sbt repl/run.