Now that we’ve discussed configuring the recovery, let’s dive in to running it on your pipeline.
First we’ll need to define configuration:
Define configuration
The configuration consists of the resolver-config.json
that your pipeline uses to resolve events against corresponding schema, for example:
{
"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
"data": {
"cacheSize": 0,
"repositories": [
{
"name": "Iglu Central",
"priority": 0,
"vendorPrefixes": [
"com.snowplowanalytics"
],
"connection": {
"http": {
"uri": "http://iglucentral.com"
}
}
}
]
}
}
Code language: JSON / JSON with Comments (json)
Also, thejob-config.json
which describes the recovery job to be done, for example:
{
"schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/4-0-0",
"data": {
"iglu:com.snowplowanalytics.snowplow.badrows/enrichment_failures/jsonschema/1-0-0": [
{
"name": "pass through",
"conditions": [],
"steps": []
}
]
}
}
Code language: JSON / JSON with Comments (json)
Encode configuration
Configuration is supplied to recovery jobs as a Base64-encoded string. You can use your encoding plugin of choice, but here is an example ammonite script you can use to encode your configuration:
import java.util.Base64
import java.nio.charset.StandardCharsets
import ammonite.ops._
import $ivy.`io.circe::circe-parser:0.13.0`, io.circe._, io.circe.syntax._, io.circe.parser._
val load = (path: String) => read! os.Path(path, base = os.pwd)
val encode = (str: String) => Base64.getEncoder.encodeToString(str.getBytes(StandardCharsets.UTF_8))
@main def run(config: String): Unit =
parse(load(config)) match {
case Right(data) => println(encode(data.asJson.noSpaces))
case Left(err) => println(s"Invalid JSON input in: ${err.getMessage}")
}
Code language: JavaScript (javascript)
And then just run (assuming ammonite is on your path and above script is called encode.sc
:
amm ./encode.sc resolver-config.json amm ./encode.sc job-config.json
Once the configuration files are encrypted it’s time to deploy the recovery job on your pipeline.
Deploy the job
There are several runtimes that recovery process can be deployed to: