Pipeline Components and Applications

Both the Shredder and Loader jobs use the same HOCON configuration file. Here’s an example:

```hocon
{
  # Human-readable identifier, can be random
  "name": "Acme Redshift",
  # Machine-readable unique identifier, must be a UUID
  "id": "123e4567-e89b-12d3-a456-426655440000",
  # Data lake (S3) region
  "region": "us-east-1",
  # SQS queue name used by Shredder and Loader to communicate
  "messageQueue": "messages",

  # Shredder-specific configs
  "shredder": {
    # Path to enriched archive (must be populated separately with run=YYYY-MM-DD-hh-mm-ss directories)
    "input": "s3://bucket/input/",
    # Path to shredded output
    "output": "s3://bucket/good/",
    # Path to data that failed to be processed
    "outputBad": "s3://bucket/bad/",
    # Shredder output compression, GZIP or NONE
    "compression": "GZIP"
  },

  # Schema-specific format settings (recommended to leave all three groups empty and use TSV as the default)
  "formats": {
    # Format used by default (TSV or JSON)
    "default": "TSV",
    # Schemas to be shredded as JSONs; corresponding JSONPath files must be present. Automigrations will be disabled
    "json": [],
    # Schemas to be shredded as TSVs; the schema must be present on the Iglu Server. Automigrations enabled
    "tsv": [],
    # Schemas that won't be loaded
    "skip": []
  },

  # Warehouse connection details
  "storage": {
    # Database; redshift is the only acceptable option
    "type": "redshift",
    # Redshift hostname
    "host": "redshift.amazon.com",
    # Database name
    "database": "snowplow",
    # Database port
    "port": 5439,
    # AWS Role ARN allowing Redshift to load data from S3
    "roleArn": "arn:aws:iam::123456789012:role/RedshiftLoadRole",
    # DB schema name
    "schema": "atomic",
    # DB user with permissions to load data
    "username": "storage-loader",
    # DB password
    "password": "secret",
    # Custom JDBC configuration
    "jdbc": {"ssl": true},
    # MAXERROR, number of acceptable loading errors
    "maxError": 10,
    "compRows": 100000
  },

  # Additional steps; analyze, vacuum and transit_load are valid values
  "steps": ["analyze"],

  # Observability and logging options
  "monitoring": {
    # Snowplow tracking (optional)
    "snowplow": null,
    # Sentry (optional)
    "sentry": null
  }
}
```
  • name and id are just identifiers. The first can be an arbitrary human-readable string; the second must be a UUID
  • region is the AWS region of the S3 buckets
  • messageQueue is the FIFO SQS queue that the Shredder uses to communicate with the Loader
  • shredder.input is the S3 path to the enriched data lake. It must be populated with folders of the form run=2021-01-27-16-30-00. You can use S3DistCp (explained below) to populate it
  • shredder.output is the S3 path to the shredded archive; the Shredder writes its output there
  • shredder.outputBad is the S3 path to the shredded bad archive, i.e. data that the Shredder failed to process for some reason
  • compression is the data compression format; GZIP and NONE are the two options
  • formats configures the output format of the shredded data
  • formats.default is the data format produced by default; TSV and JSON are the two supported options. TSV is recommended because it enables table autocreation, but it requires an Iglu Server to be available with the known schemas (including the Snowplow schemas). JSON does not require an Iglu Server, but it requires Redshift JSONPaths to be configured and does not support table autocreation
  • formats.json – an array of schema criteria (iglu:com.acme/some_name/jsonschema/1-*-*). If default is set to TSV, the schemas in this list will still be shredded into JSON
  • formats.tsv – an array of schema criteria (iglu:com.acme/some_name/jsonschema/1-*-*). If default is set to JSON, the schemas in this list will still be shredded into TSV
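For example, if most schemas should load as TSV but one legacy schema still relies on JSONPaths, the per-schema overrides can be combined like this (the com.acme schema URIs are hypothetical placeholders):

```hocon
# Sketch: TSV by default, one legacy schema kept as JSON,
# and one deprecated schema excluded from loading
"formats": {
  "default": "TSV",
  # A JSONPaths file must exist for this schema; automigrations are disabled for it
  "json": ["iglu:com.acme/legacy_event/jsonschema/1-*-*"],
  "tsv": [],
  # Events matching this criterion won't be loaded
  "skip": ["iglu:com.acme/deprecated_event/jsonschema/1-*-*"]
}
```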

Dataflow Runner

You can use any suitable tool to periodically submit the Shredder job to an EMR cluster. We recommend Dataflow Runner; here is an example cluster config:

```json
{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data": {
    "name": "RDB Shredder",
    "logUri": "s3://com-acme/logs/",
    "credentials": {
      "accessKeyId": "env",
      "secretAccessKey": "env"
    },
    "roles": {
      "jobflow": "EMR_EC2_DefaultRole",
      "service": "EMR_DefaultRole"
    },
    "ec2": {
      "amiVersion": "6.1.0",
      "keyName": "key-name",
      "location": {
        "vpc": {
          "subnetId": "subnet-id"
        }
      },
      "instances": {
        "master": {
          "type": "m4.large",
          "ebsConfiguration": {
            "ebsOptimized": true,
            "ebsBlockDeviceConfigs": []
          }
        },
        "core": {
          "type": "r4.xlarge",
          "count": 1
        },
        "task": {
          "type": "m4.large",
          "count": 0,
          "bid": "0.015"
        }
      }
    },
    "tags": [],
    "bootstrapActionConfigs": [],
    "configurations": [
      {
        "classification": "core-site",
        "properties": {
          "io.file.buffer.size": "65536"
        },
        "configurations": []
      },
      {
        "classification": "yarn-site",
        "properties": {
          "yarn.nodemanager.resource.memory-mb": "57344",
          "yarn.scheduler.maximum-allocation-mb": "57344",
          "yarn.nodemanager.vmem-check-enabled": "false"
        },
        "configurations": []
      },
      {
        "classification": "spark",
        "properties": {
          "maximizeResourceAllocation": "false"
        },
        "configurations": []
      },
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.executor.memory": "7G",
          "spark.driver.memory": "7G",
          "spark.driver.cores": "3",
          "spark.yarn.driver.memoryOverhead": "1024",
          "spark.default.parallelism": "24",
          "spark.executor.cores": "1",
          "spark.executor.instances": "6",
          "spark.yarn.executor.memoryOverhead": "1024",
          "spark.dynamicAllocation.enabled": "false"
        },
        "configurations": []
      }
    ],
    "applications": ["Hadoop", "Spark"]
  }
}
```

This is a typical cluster configuration for processing ~1.5GB of ungzipped enriched data.

You need to change the following settings to match your configuration:

  • logUri – your S3 bucket with logs
  • ec2.keyName (optional) – EC2 SSH key name if you’ll need to log in to the EMR cluster
  • ec2.location.vpc.subnetId – your VPC subnet id

Here’s a typical playbook:

```json
{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "eu-central-1",
    "credentials": {
      "accessKeyId": "env",
      "secretAccessKey": "env"
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "S3DistCp enriched data archiving",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src", "s3://com-acme/enriched/sink/",
          "--dest", "s3://com-acme/enriched/archive/run={{nowWithFormat "2006-01-02-15-04-05"}}/",
          "--s3Endpoint", "s3-eu-central-1.amazonaws.com",
          "--srcPattern", ".*",
          "--outputCodec", "gz",
          "--deleteOnSuccess"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "RDB Shredder",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "command-runner.jar",
        "arguments": [
          "spark-submit",
          "--class", "com.snowplowanalytics.snowplow.shredder.Main",
          "--master", "yarn",
          "--deploy-mode", "cluster",
          "s3://snowplow-hosted-assets-eu-central-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.19.0.jar",
          "--iglu-config", "{{base64File "/home/snowplow/configs/snowplow/iglu_resolver.json"}}",
          "--config", "{{base64File "/home/snowplow/configs/snowplow/config.hocon"}}"
        ]
      }
    ],
    "tags": []
  }
}
```
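The --iglu-config argument in the playbook base64-encodes an Iglu resolver configuration. A minimal iglu_resolver.json pointing only at Iglu Central might look like the sketch below; if you shred custom schemas as TSV, add your own Iglu Server as an extra repository entry:

```json
{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": ["com.snowplowanalytics"],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}
```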

Here you’ll need to set:

  • region
  • Paths to your enriched data sink (--src) and enriched data lake (--dest)
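Assuming the cluster config and playbook above are saved as cluster.json and playbook.json (hypothetical filenames), a typical Dataflow Runner invocation spins up a transient EMR cluster, runs both steps, and terminates the cluster when they finish:

```shell
# Run the playbook on a transient EMR cluster that is torn down afterwards.
# AWS credentials are read from the environment, since both configs set
# accessKeyId/secretAccessKey to "env".
dataflow-runner run-transient \
  --emr-config ./cluster.json \
  --emr-playbook ./playbook.json
```

Schedule this command (e.g. via cron) to submit the Shredder job periodically.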