Spark

The Spark job reads bad rows from an S3 location, writes the recovered payloads to a Kinesis stream, and stores unrecovered and unrecoverable payloads in separate S3 buckets.

Building

To build the fat jar, run:

sbt spark/assembly
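
If you are building from source, the assembled jar is written to the module's target directory. The exact path below is an assumption; it depends on the project's Scala version and sbt-assembly settings:

ls spark/target/scala-2.11/snowplow-event-recovery-spark-*.jar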

Running

Using the JAR directly (which is hosted at s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/):

spark-submit \
  --class com.snowplowanalytics.snowplow.event.recovery.Main \
  --master master-url \
  --deploy-mode deploy-mode \
  snowplow-event-recovery-spark-0.2.0.jar \
  --input s3://bad-rows-location/ \
  --region eu-central-1 \
  --output collector-payloads \
  --failedOutput s3://unrecovered-collector-payloads-location/ \
  --unrecoverableOutput s3://unrecoverable-collector-payloads-location/ \
  --config base64-encoded-configuration
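
The --config argument is a base64-encoded recovery configuration, i.e. a JSON document against the iglu:com.snowplowanalytics.snowplow/recoveries schema (an example is embedded in the Dataflow Runner playbook below). As a minimal sketch, assuming your configuration lives in a local recovery-config.json and GNU coreutils is available:

base64 -w0 recovery-config.json

(On macOS, base64 does not support -w; omit the flag there.)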

Or through an EMR step:

aws emr add-steps --cluster-id j-XXXXXXXX --steps \
Name=snowplow-event-recovery,\
Type=CUSTOM_JAR,\
Jar=s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.2.0.jar,\
MainClass=com.snowplowanalytics.snowplow.event.recovery.Main,\
Args=[--input,s3://bad-rows-location/,--region,eu-central-1,--output,collector-payloads,--failedOutput,s3://unrecovered-collector-payloads-location/,--unrecoverableOutput,s3://unrecoverable-collector-payloads-location/,--config,base64-encoded-configuration],\
ActionOnFailure=CONTINUE
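
Once the step has been submitted, its progress can be followed with the standard EMR CLI, for example:

aws emr list-steps --cluster-id j-XXXXXXXX --step-states PENDING RUNNING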

Or using Dataflow Runner, with emr-config.json:

{ "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0", "data": { "name": "emr-recovery-cluster", "logUri": "s3://logs/", "region": "eu-central-1", "credentials": { "accessKeyId": "{{secret "aws-access-key-id"}}", "secretAccessKey": "{{secret "aws-secret-access-key"}}" }, "roles": { "jobflow": "EMR_EC2_DefaultRole", "service": "{{ prefix }}-event-recovery" }, "ec2": { "amiVersion": "5.17.0", "keyName": "some-key-name", "location": { "vpc": { "subnetId": "subnet-sample" } }, "instances": { "master": { "type": "m1.medium", "ebsConfiguration": { "ebsOptimized": true, "ebsBlockDeviceConfigs": [ { "volumesPerInstance": 12, "volumeSpecification": { "iops": 8, "sizeInGB": 10, "volumeType": "gp2" } } ] } }, "core": { "type": "m1.medium", "count": 1 }, "task": { "type": "m1.medium", "count": 0, "bid": "0.015" } } }, "tags": [ { "key": "client", "value": "com.snplow.eng" }, { "key": "job", "value": "recovery" } ], "applications": [ "Hadoop", "Spark" ] } }

And emr-playbook.json:

{ "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1", "data": { "region": "eu-west-1", "credentials": { "accessKeyId": "{{secret "aws-access-key-id"}}", "secretAccessKey": "{{secret "aws-secret-access-key"}}" }, "steps": [ { "type": "CUSTOM_JAR", "name": "Staging of bad rows", "actionOnFailure": "CANCEL_AND_WAIT", "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar", "arguments": [ "--src", "s3n://${BUCKET_ID}/recovery/enriched/bad/run=2019-01-12-15-04-23/", "--dest", "s3n://${BUCKET_ID}/stage_01_19/" ] }, { "type": "CUSTOM_JAR", "name": "Move to HDFS", "actionOnFailure": "CANCEL_AND_WAIT", "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar", "arguments": [ "--src", "s3n://${BUCKET_ID}/stage_01_19/", "--dest", "hdfs:///local/to-recover/", "--outputCodec", "none" ] }, { "type": "CUSTOM_JAR", "name": "snowplow-event-recovery", "actionOnFailure": "CANCEL_AND_WAIT", "jar": "command-runner.jar", "arguments": [ "spark-submit", "--class", "com.snowplowanalytics.snowplow.event.recovery.Main", "--master", "yarn", "--deploy-mode", "cluster", "s3://bad-rows/snowplow-event-recovery-spark-0.2.0.jar", "--input", "hdfs:///local/to-recover/", "--output", "good-events", "--region", "eu-central-1", "--failedOutput", "s3://bad-rows/left", "--unrecoverableOutput", "s3://bad-rows/left/unrecoverable", "--resolver", "eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5pZ2x1L3Jlc29sdmVyLWNvbmZpZy9qc29uc2NoZW1hLzEtMC0xIiwiZGF0YSI6eyJjYWNoZVNpemUiOjAsInJlcG9zaXRvcmllcyI6W3sibmFtZSI6ICJJZ2x1IENlbnRyYWwiLCJwcmlvcml0eSI6IDAsInZlbmRvclByZWZpeGVzIjogWyAiY29tLnNub3dwbG93YW5hbHl0aWNzIiBdLCJjb25uZWN0aW9uIjogeyJodHRwIjp7InVyaSI6Imh0dHA6Ly9pZ2x1Y2VudHJhbC5jb20ifX19LHsibmFtZSI6IlByaXYiLCJwcmlvcml0eSI6MCwidmVuZG9yUHJlZml4ZXMiOlsiY29tLnNub3dwbG93YW5hbHl0aWNzIl0sImNvbm5lY3Rpb24iOnsiaHR0cCI6eyJ1cmkiOiJodHRwczovL3Jhdy5naXRodWJ1c2VyY29udGVudC5jb20vcGVlbC9zY2hlbWFzL21hc3RlciJ9fX1dfX0=", "--config", "eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9yZWNvdmVyaWVzL2pzb25zY2hlbWEvMi0wLTEiLCJkYXRhIjp7ImlnbHU6Y29tLnNub3dwbG93YW5hbHl0aWNzLnNub3dwbG93LmJhZHJvd3MvZW5yaWNobWVudF9mYWlsdXJlcy9qc29uc2NoZW1hLzEtMC0qIjpbeyJuYW1lIjoibWFpbi1mbG93IiwiY29uZGl0aW9ucyI6W10sInN0ZXBzIjpbXX1dLCJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy5iYWRyb3dzL2FkYXB0ZXJfZmFpbHVyZXMvanNvbnNjaGVtYS8xLTAtKiI6W3sibmFtZSI6Im1haW4tZmxvdyIsImNvbmRpdGlvbnMiOltdLCJzdGVwcyI6W119XSwiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cuYmFkcm93cy9zY2hlbWFfdmlvbGF0aW9ucy9qc29uc2NoZW1hLzEtMC0qIjpbeyJuYW1lIjoibWFpbi1mbG93IiwiY29uZGl0aW9ucyI6W10sInN0ZXBzIjpbXX1dLCJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy5iYWRyb3dzL3RyYWNrZXJfcHJvdG9jb2xfdmlvbGF0aW9ucy9qc29uc2NoZW1hLzEtMC0qIjpbeyJuYW1lIjoibWFpbi1mbG93IiwiY29uZGl0aW9ucyI6W10sInN0ZXBzIjpbXX1dfX0=" ] } { "type": "CUSTOM_JAR", "name": "Back to S3", "actionOnFailure": "CANCEL_AND_WAIT", "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar", "arguments": [ "--src", "hdfs:///local/recovered/", "--dest", "s3n://${BUCKET_ID}/raw/" ] } ], "tags": [ { "key": "client", "value": "com.snowplowanalytics" }, { "key": "job", "value": "recovery" } ] } }
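
The long --resolver and --config values in the spark-submit step are base64-encoded JSON documents: an Iglu resolver configuration and a recovery configuration respectively. To inspect or adjust them, decode, edit, and re-encode. In the sketch below, $CONFIG_B64 is a placeholder for the encoded string, and GNU coreutils and jq are assumed to be available:

echo "$CONFIG_B64" | base64 -d | jq .
jq -c . recovery-config.json | base64 -w0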

Run:

dataflow-runner run-transient --emr-config ./emr-config.json --emr-playbook ./emr-playbook.json
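
run-transient brings up a fresh cluster, runs the playbook steps in order, and terminates the cluster once they finish. Progress can be followed in the EMR console or via the CLI, and step logs end up under the logUri configured in emr-config.json:

aws emr list-clusters --cluster-states STARTING BOOTSTRAPPING RUNNING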
