Beam Enrich

Beam Enrich is built on top of Apache Beam and runs on GCP's Dataflow. It can be run from anywhere, as long as the machine running it can communicate with Dataflow and has sufficient permissions to create a Dataflow job: for example, as a Kubernetes job or from a Compute Engine instance.
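
As a sketch, granting those permissions to a dedicated service account could look like the following. The service account name is hypothetical, and roles/dataflow.admin is one role that allows creating and managing Dataflow jobs:

# Hypothetical service account; grant a role that permits creating Dataflow jobs
gcloud projects add-iam-policy-binding project-id \
  --member="serviceAccount:beam-enrich@project-id.iam.gserviceaccount.com" \
  --role="roles/dataflow.admin"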

Run Beam Enrich

Beam Enrich is published on Docker Hub.

docker pull snowplow/beam-enrich:2.0.1

The Docker container can be run with the following command (the GOOGLE_APPLICATION_CREDENTIALS variable is only needed when running outside GCP):

docker run \
  -v $PWD/config:/snowplow/config \
  -e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/config/credentials.json \
  snowplow/beam-enrich:2.0.1 \
  --runner=DataflowRunner \
  --project=project-id \
  --streaming=true \
  --zone=europe-west2-a \
  --gcpTempLocation=gs://location/ \
  --job-name=beam-enrich \
  --raw=projects/project/subscriptions/raw-topic-subscription \
  --enriched=projects/project/topics/enriched-topic \
  --bad=projects/project/topics/bad-topic \
  --pii=projects/project/topics/pii-topic \
  --resolver=/snowplow/config/iglu_resolver.json \
  --enrichments=/snowplow/config/enrichments

Alternatively, you can download the jar file from the GitHub releases page and run it directly:

java -jar beam-enrich-2.0.1.jar \
  --runner=DataflowRunner \
  --project=project-id \
  --streaming=true \
  --zone=europe-west2-a \
  --gcpTempLocation=gs://location/ \
  --job-name=beam-enrich \
  --raw=projects/project/subscriptions/raw-topic-subscription \
  --enriched=projects/project/topics/enriched-topic \
  --bad=projects/project/topics/bad-topic \
  --pii=projects/project/topics/pii-topic \
  --resolver=/snowplow/config/iglu_resolver.json \
  --enrichments=/snowplow/config/enrichments

Configuration

Beam Enrich specific options

Beam Enrich comes with a set of predefined CLI options:

  • --job-name, the name of the job as it will appear in the Dataflow console
  • --raw=projects/{project}/subscriptions/{raw-topic-subscription}, the input PubSub subscription Beam Enrich will consume from
  • --enriched=projects/{project}/topics/{enriched-topic}, the PubSub topic to which successfully enriched events will be written
  • --bad=projects/{project}/topics/{bad-topic}, the PubSub topic where events that have failed enrichment will end up
  • --pii=projects/{project}/topics/{pii-topic}, the PubSub topic where events resulting from the PII enrichment will end up (optional)
  • --resolver=iglu_resolver.json, the Iglu resolver needed to look up the schemas in your data (see the example after this list)
  • --enrichments=enrichments, the optional directory containing the enrichments to perform
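
For reference, a minimal iglu_resolver.json pointing only at Iglu Central might look like the following; the cache size and priority values here are illustrative, not requirements:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}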

Note that every enrichment relying on local files needs those files stored in Google Cloud Storage, e.g. the IP lookups enrichment:

{
  "schema": "iglu:com.snowplowanalytics.snowplow/ip_lookups/jsonschema/2-0-0",
  "data": {
    "name": "ip_lookups",
    "vendor": "com.snowplowanalytics.snowplow",
    "enabled": true,
    "parameters": {
      "geo": {
        "database": "GeoLite2-City.mmdb",
        "uri": "gs://gcs-bucket/maxmind"
      }
    }
  }
}
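
Getting the database into that location could be as simple as the following; the bucket name gs://gcs-bucket is taken from the example above:

# Upload the MaxMind database to the GCS path referenced by the enrichment
gsutil cp GeoLite2-City.mmdb gs://gcs-bucket/maxmind/GeoLite2-City.mmdb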

Dataflow options

To run on Dataflow, Beam Enrich will rely on a set of additional configuration options:

  • --runner=DataflowRunner which specifies that we want to run on Dataflow
  • --project={project}, the name of the GCP project
  • --streaming=true to notify Dataflow that we’re running a streaming application
  • --zone=europe-west2-a, the zone where the Dataflow nodes (effectively GCP Compute Engine nodes) will be launched
  • --region=europe-west2, the region where the Dataflow job will be launched
  • --gcpTempLocation=gs://location/, the GCS bucket where temporary files necessary to run the job (e.g. JARs) will be stored

The full list of options can be found at https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.
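
Any of these standard Dataflow options can simply be appended to the launch command. As an illustrative sketch, the machine type and worker cap below are example values, not recommendations:

# Standard Dataflow options (--workerMachineType, --maxNumWorkers) appended
# to the usual launch command; the values are illustrative only
java -jar beam-enrich-2.0.1.jar \
  --runner=DataflowRunner \
  --project=project-id \
  --streaming=true \
  --zone=europe-west2-a \
  --gcpTempLocation=gs://location/ \
  --job-name=beam-enrich \
  --raw=projects/project/subscriptions/raw-topic-subscription \
  --enriched=projects/project/topics/enriched-topic \
  --bad=projects/project/topics/bad-topic \
  --resolver=/snowplow/config/iglu_resolver.json \
  --workerMachineType=n1-standard-2 \
  --maxNumWorkers=4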