Pipeline Components and Applications


EmrEtlRunner

Setting up EmrEtlRunner

Snowplow EmrEtlRunner is an application that parses the log files generated by your Snowplow collector and:

  1. Cleans up the data into a format that is easier to parse / analyse
  2. Enriches the data (e.g. infers the visitor's location from their IP address and infers search engine keywords from the query string)
  3. Stores that cleaned, enriched data in S3

This guide covers how to set up EmrEtlRunner (including scheduling it) so that your event data is automatically fetched from the collector logs, processed and updated in your cleaned data store on S3. It is divided into six sections:

  1. Installation. You need to install EmrEtlRunner on your own server. It will interact with Amazon Elastic MapReduce and S3 via the Amazon API
  2. Usage. How to use EmrEtlRunner at the command line, to instruct it to process data from your collector
  3. Scheduling. How to schedule the tool so that you always have an up to date set of cleaned, enriched data available for analysis
  4. Self-hosting Spark Enrich. (Optional step)
  5. Configuring enrichments. How to configure enrichments such as referer parsing and IP lookups
  6. Configuring shredding. How to configure Snowplow to shred custom self-describing events (also called unstructured events) and contexts ready for loading into dedicated tables in Redshift


Installing EmrEtlRunner

Assumptions

This guide assumes that you have administrator access to a Unix-based server (e.g. Ubuntu, OS X, Fedora) on which you can install EmrEtlRunner and schedule a regular cronjob.

You might wish to try the steps at the end of this guide, which show how an EC2 instance can be set up via the AWS CLI.

In theory EmrEtlRunner can be deployed onto a Windows-based server, using the Windows Task Scheduler instead of cron, but this has not been tested or documented.

Dependencies

Hardware

You will need to set up EmrEtlRunner on your own server. A number of people choose to do so on an EC2 instance (thereby keeping all of Snowplow in the Amazon cloud). If you do so, please note that you must not use a t1.micro instance; use at least an m1.small instance.

Software

The EmrEtlRunner jar is available for download. For more information, see the Hosted assets page.


EC2 key

You will also need an EC2 key pair set up in your Amazon EMR account.

For details on how to do this, please see Create a Key Pair. Make sure that you set up the EC2 key pair in the region in which you will be running your ETL jobs.

S3 locations

EmrEtlRunner processes data through three distinct states:

  1. :raw – raw Snowplow event logs are the input to the EmrEtlRunner process
  2. :enriched – EmrEtlRunner validates and enriches the raw event logs into enriched events
  3. :shredded – EmrEtlRunner shreds JSONs found in enriched events ready for loading into dedicated Redshift tables

For :raw:in, specify the Amazon S3 path you configured for your Snowplow collector.

For all other S3 locations, you can specify paths within a single S3 bucket that you set up now. This bucket must be in the same AWS region as your :raw:in bucket.
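The bucket layout above maps onto the aws.s3 section of config.yml. A minimal sketch follows; the bucket names are placeholders, and you should consult the sample configuration file for the full set of keys:

```yaml
aws:
  s3:
    region: us-east-1                # must match the region of your :raw:in bucket
    buckets:
      raw:
        in:
          - s3://my-collector-logs   # your collector's output location
      enriched:
        good: s3://my-snowplow-data/enriched/good
        bad: s3://my-snowplow-data/enriched/bad
      shredded:
        good: s3://my-snowplow-data/shredded/good
        bad: s3://my-snowplow-data/shredded/bad
```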

Done? Right, now we can install EmrEtlRunner.

Installation

We host EmrEtlRunner on the distribution platform JFrog Bintray. You can get a copy of it as shown below.

Note: choose your version of EmrEtlRunner from the Bintray listing. The distribution name follows the pattern snowplow_emr_{{RELEASE_VERSION}}.zip.

$ wget http://dl.bintray.com/snowplow/snowplow-generic/snowplow_emr_{{RELEASE_VERSION}}.zip

The archive contains both EmrEtlRunner and StorageLoader. Unzip the archive:

$ unzip snowplow_emr_{{RELEASE_VERSION}}.zip

The archive should contain a snowplow-emr-etl-runner file.

Configuration

EmrEtlRunner requires a configuration file in YAML format to run. There is a configuration file template available in the Snowplow GitHub repository at https://github.com/snowplow/emr-etl-runner/blob/master/config/config.yml.sample. See Common configuration for more information on how to write this file.

Storage targets

Storage targets for your data can be configured using storage target JSONs. Configuration file templates are available in the Snowplow GitHub repository in the /4-storage/config/targets directory.

Iglu

You will also need an Iglu resolver configuration file. This is where we list the schema repositories to use to retrieve JSON Schemas for validation. For more information on this, see the wiki page for Configuring shredding.


Using EmrEtlRunner

EmrEtlRunner works in rolling mode, processing whatever raw Snowplow event logs it finds in the in-bucket.

Commands

Run command

The most useful command is the run command which allows you to actually run your EMR job:

$ ./snowplow-emr-etl-runner run

The available options are as follows:

Usage: run [options]
    -c, --config CONFIG            configuration file
    -n, --enrichments ENRICHMENTS  enrichments directory
    -r, --resolver RESOLVER        Iglu resolver file
    -t, --targets TARGETS          targets directory
    -d, --debug                    enable EMR Job Flow debugging
    -f, --resume-from {enrich,shred,elasticsearch,archive_raw,rdb_load,analyze,archive_enriched,archive_shredded,staging_stream_enrich}
                                   resume from the specified step
    -x, --skip {staging,enrich,shred,elasticsearch,archive_raw,rdb_load,consistency_check,analyze,load_manifest_check,archive_enriched,archive_shredded,staging_stream_enrich}
                                   skip the specified step(s)
    -i, --include {vacuum}         include additional step(s)
    -l, --lock PATH                where to store the lock
    --ignore-lock-on-start         ignore the lock if it is set when starting
    --consul ADDRESS               address to the Consul server

Note that the config and resolver options are mandatory.

Note that in Stream Enrich mode you can neither skip nor resume from the staging, enrich and archive_raw steps. Instead of staging and enrich, a single special staging_stream_enrich step is used in Stream Enrich mode.

Lint commands

Other useful commands include the lint commands, which allow you to check the validity of your resolver or enrichments with respect to their respective schemas.

If you want to lint your resolver:

$ ./snowplow-emr-etl-runner lint resolver

The mandatory options are:

Usage: lint resolver [options]
    -r, --resolver RESOLVER    Iglu resolver file

If you want to lint your enrichments:

$ ./snowplow-emr-etl-runner lint enrichments

The mandatory options are:

Usage: lint enrichments [options]
    -r, --resolver RESOLVER          Iglu resolver file
    -n, --enrichments ENRICHMENTS    enrichments directory

Checking the results

Once you have run EmrEtlRunner, you should be able to inspect the S3 folder specified by the :out: parameter in your config.yml file and see the newly generated files. These contain the cleaned data, ready either for loading into a storage target (e.g. Redshift) or for analysing directly with Hive, Spark or another query tool on EMR.

Note: most Snowplow users run the ‘spark’ version of the ETL process, in which case the generated data is saved into subfolders with names of the form part-000.... If, however, you are running the legacy ‘hive’ ETL (because, for example, you want to use Hive rather than Redshift as your storage target; Redshift is the only storage target the ‘spark’ ETL currently supports), the subfolder names will be of the format dt=....


Scheduling EmrEtlRunner

Once you have the ETL process working smoothly, you can schedule a daily (or more frequent) task to automate the daily ETL process.

We run our daily ETL jobs at 3 AM UTC so that we are sure that we have processed all of the events from the day before (CloudFront logs can take some time to arrive).

To consider your different scheduling options in turn:

cron

Warning

Running EmrEtlRunner as a Ruby app (rather than a JRuby app) is no longer actively supported. The latest version of EmrEtlRunner is available from our Bintray here.

The recommended way of scheduling the ETL process is as a daily cronjob.

0 4 * * * root cronic /path/to/eer/snowplow-emr-etl-runner run -c config.yml

This will run the ETL job daily at 4 AM, emailing any failures to you via cronic.
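If an ETL run can occasionally overrun its schedule, you may want to guard against overlapping invocations. Below is a minimal sketch of a cron wrapper built on flock; the run_eer name and the paths are our own placeholders, and EmrEtlRunner's --lock option is an alternative way to achieve the same thing:

```shell
# Hypothetical wrapper for the cron entry above: flock -n makes an
# overlapping invocation fail fast instead of double-processing data.
run_eer() {
  runner="${RUNNER:-/path/to/eer/snowplow-emr-etl-runner}"
  config="${CONFIG:-/path/to/eer/config.yml}"
  lockfile="${LOCKFILE:-/tmp/snowplow-eer.lock}"
  # Exits non-zero if a previous run still holds the lock.
  flock -n "$lockfile" "$runner" run -c "$config"
}
```

The crontab line would then call this wrapper instead of invoking the runner directly.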

Jenkins

Some developers use the Jenkins continuous integration server (or Hudson, which is very similar) to schedule their Hadoop and Hive jobs.

Describing how to do this is out of scope for this guide, but the blog post Lowtech Monitoring with Jenkins is a great tutorial on using Jenkins for non-CI-related tasks, and could be easily adapted to schedule EmrEtlRunner.

Windows Task Scheduler

For Windows servers, in theory it should be possible to use a Windows PowerShell script plus Windows Task Scheduler instead of bash and cron. However, this has not been tested or documented.

If you get this working, please let us know!


Self-hosting Spark Enrich

EmrEtlRunner runs Spark Enrich resources which are publicly hosted on Amazon S3 by Snowplow – please see the Hosted assets page for details. For most users, this will be fine. However, there are some cases where you will need to self-host the Spark Enrich process in your own Amazon S3 bucket.

Two examples are:

  1. You are using a custom fork of the Spark Enrich process
  2. You are using a commercial version of the MaxMind GeoCity database

For self-hosting instructions read on.

Bucket and directory setup

First create a new S3 bucket, for example:

s3://[mycompanyname]-snowplow-hosted-assets

You do not need to give any public permissions on this bucket.

Now create the following two directory structures:

s3://[mycompanyname]-snowplow-hosted-assets/3-enrich/spark-enrich
s3://[mycompanyname]-snowplow-hosted-assets/third-party/maxmind

Spark Enrich process

If you are using the standard version of Spark Enrich, then download it from Hosted assets and upload to:

s3://[mycompanyname]-snowplow-hosted-assets/3-enrich/spark-enrich/snowplow-spark-enrich-[version].jar

If you are using a custom fork of the Spark Enrich process, then upload your assembled fatjar to:

s3://[mycompanyname]-snowplow-hosted-assets/3-enrich/spark-enrich/snowplow-spark-enrich-[version]-[fork].jar

Configuring EmrEtlRunner for self-hosting

In your config.yml file, set s3.buckets.assets to your own bucket name:

buckets:
  assets: s3://[mycompanyname]-snowplow-hosted-assets

If you are using a custom fork of the Spark Enrich process, make sure to update enrich:versions:spark_enrich to your own Spark Enrich version:

enrich:
  ...
  versions:
    ...
    spark_enrich: [version]-[fork]

Configuring enrichments

Snowplow offers the option to configure enrichments for your pipeline. This is done using configuration JSONs.

When running EmrEtlRunner, the --enrichments argument should be populated with the filepath of a directory containing your configuration JSONs. Each enrichment JSON file should be named {{name of enrichment}}.json (for example, referer_parser.json).
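For example, a directory holding two enrichment configurations could be laid out as follows; ip_lookups.json is a hypothetical second enrichment (only referer_parser.json is named above):

```shell
# Create the enrichments directory with one JSON file per enrichment;
# the directory path is what gets passed via --enrichments.
mkdir -p enrichments
touch enrichments/referer_parser.json enrichments/ip_lookups.json
ls enrichments
```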


Configuring shredding

Snowplow has a shredding process for Redshift, which comprises the following three phases:

  1. Extracting unstructured event JSONs and context JSONs from enriched event files into their own files
  2. Removing endogenous duplicate records, which are sometimes introduced within the Snowplow pipeline (feature added in r76)
  3. Loading those files into corresponding tables in Redshift

The first two phases are instrumented by EmrEtlRunner; on this page we explain how to configure the shredding process to operate smoothly with EmrEtlRunner.

Note: even though the first phase is required only if you want to shred your own unstructured event JSONs and context JSONs, the second phase is beneficial for data modeling and analysis regardless. If neither applies and you are only shredding Snowplow-authored JSONs like link clicks and ad impressions, you can skip this page and go straight to Loading shredded types.

Pre-requisites

This guide assumes that

  • all JSONs you are sending as unstructured events and contexts are self-describing JSONs
  • you have defined self-describing JSON Schemas for each of your JSONs
  • you have set up your own Iglu schema registry to host your schemas

Configuring EmrEtlRunner for shredding

The relevant section of the EmrEtlRunner’s config.yml is:

shredded:
  good: s3://my-out-bucket/shredded/good        # S3 path for successfully shredded types
  bad: s3://my-out-bucket/shredded/bad          # S3 path for events that failed shredding
  errors: s3://my-out-bucket/shredded/errors    # Leave blank unless :continue_on_unexpected_error: set to true below
  archive: s3://my-out-bucket/shredded/archive  # Not required for Postgres currently

The configuration file is passed to EmrEtlRunner with the --config option.

Please make sure that these shredded buckets are set correctly.

Next, we let EmrEtlRunner know about your Iglu schema registry, so that schemas can be retrieved from there as well as from Iglu Central. Add your own registry to the repositories array in iglu_resolver.json file:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
      # custom section starts here -->
      , { ... }
      # <-- custom section ends here
    ]
  }
}

You must add an extra entry (or entries) to the repositories array pointing to your own Iglu schema registry. If you are not submitting custom events and contexts and are not interested in shredding, there is no need to add the custom section, but the iglu_resolver.json file is still required and is referenced with the --resolver option to EmrEtlRunner.

For more information on how to customize the iglu_resolver.json file, please review the Iglu client configuration wiki page.
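As an illustration, here is a resolver file with one hypothetical custom registry added; the name, priority, vendor prefix and URI are placeholders for your own registry's details:

```json
{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": { "http": { "uri": "http://iglucentral.com" } }
      },
      {
        "name": "My Company's Iglu Registry",
        "priority": 5,
        "vendorPrefixes": [ "com.mycompany" ],
        "connection": { "http": { "uri": "http://iglu.mycompany.com" } }
      }
    ]
  }
}
```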


Technical explanation

Raw collector logs that need to be processed are identified in the in-bucket. (This is the bucket that the collector log files are generated in; its location is specified in the EmrEtlRunner config file.)

EmrEtlRunner then triggers the enrichment process: it spins up an EMR cluster (the size of which is determined by the config file), uploads the JAR containing the Spark enrichment process, and instructs EMR to:

  1. Use S3DistCopy to aggregate the collector log files and write them to HDFS
  2. Run the Enrichment process on those aggregated files in HDFS
  3. Write the output of that enrichment to the out-bucket in S3 (as specified in the config file)
  4. When the job has completed, EmrEtlRunner moves the processed collector log files from the in-bucket to the archive bucket (again, as specified in the config file)

By setting up a scheduling job to run EmrEtlRunner regularly, Snowplow users can ensure that the event data regularly flows through the Snowplow data pipeline from the collector to storage.

Note: many references are made to the ‘Hadoop ETL’ and ‘Hive ETL’ in the documentation and the config file. ‘Hadoop ETL’ refers to the current Spark-based enrichment process. ‘Hive ETL’ refers to the legacy Hive-based ETL process. EmrEtlRunner can be set up to instrument either. However, we recommend all Snowplow users use the Spark-based ‘Hadoop ETL’, as it is much more robust as well as cheaper to run.

Stream Enrich mode

In R102 Afontova Gora we added a new “Stream Enrich mode” to EmrEtlRunner. In this mode, EmrEtlRunner stages enriched data from Kinesis.

To use this mode you need to have:

  • A configured Snowplow S3 Loader
  • A new aws.s3.buckets.enriched.stream property in config.yml pointing to the S3 Loader’s output folder

The latest config.yml sample for Stream Enrich mode is available in the snowplow repo.
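A sketch of the new property in config.yml (the bucket path is a placeholder):

```yaml
aws:
  s3:
    buckets:
      enriched:
        stream: s3://my-snowplow-data/enriched/stream   # the S3 Loader's output folder
```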


Setting up end-to-end encryption

It is possible to set up end-to-end encryption for EmrEtlRunner. For reference, you can check out the dedicated EMR guide: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-data-encryption-options.html.

Pre-requisites

Encrypting S3 buckets

For at-rest encryption on S3, the buckets with which EmrEtlRunner will interact need to have SSE-S3 encryption enabled.

For more information, check out the dedicated AWS guide: https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html.

Keep in mind that turning this setting on is not retroactive: if you want only encrypted data in your bucket, you will need to go through the existing data and copy it in place so that it is re-written encrypted.

Also, if you’re using the Clojure Collector, SSE-S3 encryption needs to be set up at the bucket level.

Setting up an EMR security configuration

Through an EMR security configuration, you can specify at the EMR level which parts of your job should have encryption enforced. The possibilities are:

  • At rest on S3
  • At rest on local disks
  • In-transit

At rest encryption in S3

Once set up, S3 encrypts data as it writes it to disk.

By default, even without encryption setup, data is encrypted while in transit from EMR to S3 (e.g. for s3-dist-cp steps).

At rest encryption on local disks

When running the Snowplow pipeline in EMR, an HDFS is set up on the different nodes of your cluster. Enabling encryption for the disks local to those nodes has the following effects:

  • HDFS RPC, e.g. between name node and data node, uses SASL
  • HDFS block transfers (e.g. replication) are encrypted using AES 256
  • Attached EBS volumes are encrypted using LUKS

When enabling this option, keep in mind that it requires some additional setup.

To set up this type of encryption you will need to create an appropriate KMS key; refer to the AWS guide for more information: https://docs.aws.amazon.com/kms/latest/developerguide/create-keys.html.

It is important to note that the role used in aws:emr:jobflow_role in the EmrEtlRunner configuration needs to have the kms:GenerateDataKey policy.

In-transit encryption (Spark and MapReduce)

When running the Spark jobs of the Snowplow pipeline (enrich and shred) or consolidating s3-dist-cp steps (e.g. using --groupBy or --targetSize), data is shuffled around the different nodes in your EMR cluster. Enabling encryption for those data movements will have the following effects:

  • MapReduce shuffles use TLS
  • RPC and data transfers in Spark are encrypted using AES 256 if EMR >= 5.9.0; otherwise RPC is encrypted using SASL
  • SSL is enabled for all things HTTP in Spark (e.g. history server and UI)

Be aware that this type of encryption also has a performance impact as data needs to be encrypted when sent over the network (e.g. when running deduplication in the Shred job).

To set up this type of encryption, you will need to create certificates according to the guidelines specified at https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-encryption-enable.html#emr-encryption-pem-certificate.

Note that, for this type of encryption to work, you will need to be in a VPC and the domain name specified in the certificates needs to be *.ec2.internal if in us-east-1 or *.region.compute.internal otherwise.

For more information, on all those types of encryption, you can refer to the dedicated guide: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-create-security-configuration.html.

Configuring EmrEtlRunner for end-to-end encryption

To leverage the security configuration you created, you will need to specify it in the EmrEtlRunner configuration at: aws:emr:security_configuration.

Additionally, you will need to tell EmrEtlRunner that it will have to interact with encrypted buckets through: aws:s3:buckets:encrypted: true.
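Putting both settings together, the relevant fragment of config.yml might look like this; my-security-conf is a placeholder for the name of the EMR security configuration you created:

```yaml
aws:
  emr:
    security_configuration: my-security-conf   # the EMR security configuration to apply
  s3:
    buckets:
      encrypted: true                          # the buckets have SSE-S3 enabled
```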


Input formats

Supported input formats for the EmrEtlRunner are as follows:

cloudfront

Use this when you are running the CloudFront Collector.

clj-tomcat

Use this when you are running the Clojure Collector on Elastic Beanstalk.

thrift

Use this when you are using the Scala Stream Collector plus Kinesis LZO S3 Sink.

tsv/com.amazon.aws.cloudfront/wd_access_log

Use this when you are analyzing Amazon CloudFront access logs (web distribution format only).

If you use CloudFront as your CDN for web content, you can use Snowplow to process your CloudFront access logs. Snowplow will enrich these logs with the user-agent, page URI fragments and geo-location as standard.

To process CloudFront access logs, first create a new EmrEtlRunner config.yml:

  1. Set your :raw:in: bucket to where your logs are written
  2. Set your :etl:collector_format: to tsv/com.amazon.aws.cloudfront/wd_access_log
  3. Provide new bucket paths and a new job name, to prevent this job from clashing with your existing Snowplow job(s)
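A sketch of those three changes in config.yml (the bucket path and job name are placeholders):

```yaml
aws:
  s3:
    buckets:
      raw:
        in:
          - s3://my-cloudfront-access-logs   # where CloudFront writes its access logs
etl:
  job_name: CloudFront access logs ETL       # distinct from your main Snowplow job
  collector_format: tsv/com.amazon.aws.cloudfront/wd_access_log
```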

If you are running the Snowplow batch flow with Amazon Redshift, then you should deploy the relevant event table into your Amazon Redshift database. You can find the table definition in the Snowplow GitHub repository.

You can either load these events into your existing atomic.events table or, if you prefer, into an all-new database or schema. If you load into your existing atomic.events table, make sure to schedule these loads so that they don’t clash with your existing loads.

ndjson/urbanairship.connect/v1

Use this when you are working with Airship. For more details see Airship webhook setup.


Setting up EC2 instance for EmrEtlRunner and StorageLoader

This tutorial assumes this is your first installation and that you probably just want to try out the platform, so many steps describe a low-performance, unsecured setup. In a real-world scenario you will want to address this.

Prepare your system

Before getting started you need to have:

  • An account on Amazon Web Services
  • The AWS CLI installed
  • An IAM user (the first one needs to be created in the AWS Console)
  • The AdministratorAccess policy attached to that IAM user
  • Credentials configured on your local machine (you can use aws configure for this)
  • Optionally, jq installed; it is handy for some steps

Everything else can be done from CLI.
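Before starting, you can sanity-check that the required tools are on your PATH. This is a sketch of a pre-flight check; the check_tools name is our own:

```shell
# Print any of the required command-line tools that are not installed.
check_tools() {
  missing=""
  for tool in "$@"; do
    command -v "$tool" >/dev/null 2>&1 || missing="$missing $tool"
  done
  if [ -z "$missing" ]; then echo "all tools present"; else echo "missing:$missing"; fi
}

# For this walkthrough you would run: check_tools aws jq ssh
```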

Setting up EC2 instance for EmrEtlRunner/StorageLoader

At the end of this step, you’ll have an AWS EC2 instance, SSH access to it, and the key stored on your local machine.

Find your Default VPC ID

We will refer to it as {{ VPC_ID }}.

$ aws ec2 describe-vpcs | jq -r ".Vpcs[0].VpcId"

NOTE: This step assumes the default VPC will be first in the list. If your environment has multiple VPCs, run the describe command without piping it to jq to get the ID of the default VPC. Mixing VPC IDs will result in the creation of an unreachable EC2 instance.

Create Security Group for SSH access

On output you’ll get GroupId. We will refer to it as {{ SSH_SG }}.

$ aws ec2 create-security-group \
    --group-name "EC2 SSH full access" \
    --description "Unsafe. Use for demonstration only" \
    --vpc-id {{ VPC_ID }} \
  | jq -r '.GroupId'

Add rule allowing SSH access from anywhere

$ aws ec2 authorize-security-group-ingress \
    --group-id {{ SSH_SG }} \
    --protocol tcp \
    --port 22 \
    --cidr 0.0.0.0/0

Create an SSH key pair on the local machine

We named it “snowplow-ec2” here.

$ aws ec2 create-key-pair --key-name snowplow-ec2 \
  | jq -r ".KeyMaterial" > ~/.ssh/snowplow-ec2.pem
$ chmod go-rwx ~/.ssh/snowplow-ec2.pem

Run a t2.small instance of the Amazon Linux AMI with the previously created SSH key

On output you will get your instance id. We will refer to it as {{ INSTANCE_ID }}.

$ aws ec2 run-instances \
    --image-id ami-60b6c60a \
    --count 1 \
    --instance-type t2.small \
    --key-name snowplow-ec2 \
  | jq -r '.Instances[0].InstanceId'

NOTE: you can find an available image ID by following this AWS guide.

Attach security group to Instance

$ aws ec2 modify-instance-attribute \
    --instance-id {{ INSTANCE_ID }} \
    --groups {{ SSH_SG }}

Check the public address of the newly created instance

We will refer to it as {{ PUBLIC_IP }}. Note that the command below returns the public DNS name, which works for SSH just as well as the IP address.

$ aws ec2 describe-instances \
    --instance-ids {{ INSTANCE_ID }} \
  | jq -r '.Reservations[0].Instances[0].PublicDnsName'

Log-in

Fill in {{ PUBLIC_IP }} from the previous step.

$ ssh -i ~/.ssh/snowplow-ec2.pem ec2-user@{{ PUBLIC_IP }}