
1.1.0 Configuration

Both the shredder and the loader use the same configuration HOCON. An example can be found here; a minimal sketch covering only the fields described below follows the list.

  • name and id are just identifiers. The first can be an arbitrary human-readable string; the second must be a UUID
  • region is the AWS region of the S3 buckets
  • messageQueue is a FIFO SQS message queue that Shredder uses to communicate with Loader
  • shredder.input is the S3 path of the enriched data lake. The enriched data lake must be populated with folders of the form run=2021-01-27-16-30-00; you can use S3DistCp (explained below) to populate it
  • shredder.output is the S3 path of the shredded archive; the shredder writes its data there
  • compression is the data compression format; GZIP and NONE are the two supported options
  • formats configures the output format of shredded data
  • formats.default is the data format produced by default; TSV and JSON are the two supported options. TSV is recommended because it enables table autocreation, but it requires an Iglu Server to be available with known schemas (including the Snowplow schemas). JSON does not require an Iglu Server, but it requires Redshift JSONPaths to be configured and does not support table autocreation
  • formats.json – an array of schema criteria (iglu:com.acme/some_name/jsonschema/1-*-*). If default is set to TSV, this list of schemas will still be shredded into JSON
  • formats.tsv – an array of schema criteria (iglu:com.acme/some_name/jsonschema/1-*-*). If default is set to JSON, this list of schemas will still be shredded into TSV
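
Putting these fields together, here is a minimal sketch of the configuration HOCON. The bucket names, UUID and queue name are placeholders, the placement of compression follows the bullet order above, and the full sample linked above remains the authoritative layout; a real file also contains further sections (for example the Redshift connection settings used by the loader) that are omitted here.

{
  # Arbitrary human-readable identifier
  "name": "Acme Redshift",
  # Must be a UUID
  "id": "123e4567-e89b-12d3-a456-426655440000",
  # AWS region of the S3 buckets
  "region": "eu-central-1",
  # FIFO SQS queue used by Shredder to communicate with Loader
  "messageQueue": "acme-rdb-loader.fifo",

  "shredder": {
    # Enriched data lake, populated with run=YYYY-MM-dd-HH-mm-ss folders
    "input": "s3://com-acme/enriched/archive/",
    # Shredded archive that the shredder writes to
    "output": "s3://com-acme/shredded/archive/"
  },

  # GZIP or NONE
  "compression": "GZIP",

  "formats": {
    # TSV (recommended, enables table autocreation) or JSON
    "default": "TSV",
    # Schema criteria still shredded into JSON when default is TSV
    "json": [ "iglu:com.acme/some_name/jsonschema/1-*-*" ],
    # Schema criteria still shredded into TSV when default is JSON
    "tsv": [ ]
  }
}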

Dataflow Runner

You can use any suitable tool to periodically submit the Shredder job to the EMR cluster. We recommend using Dataflow Runner; here is an example cluster config:

{ "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0", "data": { "name": "RDB Shredder", "logUri": "s3://com-acme/logs/", "credentials": { "accessKeyId": "env", "secretAccessKey": "env" }, "roles": { "jobflow": "EMR_EC2_DefaultRole", "service": "EMR_DefaultRole" }, "ec2": { "amiVersion": "6.2.0", "keyName": "ec2-key-name", "location": { "vpc": { "subnetId": "subnet-id" } }, "instances": { "master": { "type": "m4.large", "ebsConfiguration": { "ebsOptimized": true, "ebsBlockDeviceConfigs": [ ] } }, "core": { "type": "r4.xlarge", "count": 1 }, "task": { "type": "m4.large", "count": 0, "bid": "0.015" } } }, "tags": [ ], "bootstrapActionConfigs": [ ], "configurations": [ { "classification":"core-site", "properties":{ "Io.file.buffer.size":"65536" }, "configurations":[ ] }, { "classification":"yarn-site", "properties":{ "yarn.nodemanager.resource.memory-mb":"57344", "yarn.scheduler.maximum-allocation-mb":"57344", "yarn.nodemanager.vmem-check-enabled":"false" }, "configurations":[ ] }, { "classification":"spark", "properties":{ "maximizeResourceAllocation":"false" }, "configurations":[ ] }, { "classification":"spark-defaults", "properties":{ "spark.executor.memory":"7G", "spark.driver.memory":"7G", "spark.driver.cores":"3", "spark.yarn.driver.memoryOverhead":"1024", "spark.default.parallelism":"24", "spark.executor.cores":"1", "spark.executor.instances":"6", "spark.yarn.executor.memoryOverhead":"1024", "spark.dynamicAllocation.enabled":"false" }, "configurations":[ ] } ], "applications": [ "Hadoop", "Spark" ] } }

This is a typical cluster configuration for processing ~1.5GB of ungzipped enriched data.

You need to change the following settings to match your setup:

  • logUri – your S3 bucket with logs
  • ec2.keyName (optional) – EC2 SSH key name if you need to log in to the EMR cluster
  • ec2.location.vpc.subnetId – your VPC subnet id

Here’s a typical playbook:

{ "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1", "data": { "region": "eu-central-1", "credentials": { "accessKeyId": "env", "secretAccessKey": "env" }, "steps": [ { "type": "CUSTOM_JAR", "name": "S3DistCp enriched data archiving", "actionOnFailure": "CANCEL_AND_WAIT", "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar", "arguments": [ "--src", "s3://com-acme/enriched/sink/", "--dest", "s3://com-acme/enriched/archive/run={{nowWithFormat "2006-01-02-15-04-05"}}/", "--s3Endpoint", "s3-eu-central-1.amazonaws.com", "--srcPattern", ".*", "--outputCodec", "gz", "--deleteOnSuccess" ] }, { "type": "CUSTOM_JAR", "name": "RDB Shredder", "actionOnFailure": "CANCEL_AND_WAIT", "jar": "command-runner.jar", "arguments": [ "spark-submit", "--class", "com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main", "--master", "yarn", "--deploy-mode", "cluster", "s3://snowplow-hosted-assets-eu-central-1/4-storage/rdb-shredder/snowplow-rdb-shredder-1.1.0.jar", "--iglu-config", "{{base64File "/home/snowplow/configs/snowplow/iglu_resolver.json"}}", "--config", "{{base64File "/home/snowplow/configs/snowplow/config.hocon"}}" ] } ], "tags": [ ] } }

Here you’ll need to set:

  • region
  • Paths to your enriched data sink (--src) and enriched data lake (--dest)
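
Once the cluster config and playbook are saved locally (as, say, cluster.json and playbook.json; the file names here are only examples), the job can be submitted as a transient EMR run with Dataflow Runner, for instance from a cron job:

dataflow-runner run-transient \
  --emr-config ./cluster.json \
  --emr-playbook ./playbook.json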