Records that can’t be successfully written to S3 are written to a second Kinesis stream with the error message.
Two file formats are supported:
Records are treated as raw byte arrays. Elephant Bird’s
BinaryBlockWriter class is used to serialize them as a Protocol Buffers array (so it is clear where one record ends and the next begins) before compressing them.
The compression process generates both compressed .lzo files and small .lzo.index files (splittable LZO). Each index file contains the byte offsets of the LZO blocks in the corresponding compressed file, meaning that the blocks can be processed in parallel.
LZO encoding is generally used for raw data produced by the Snowplow Collector.
The records are treated as byte arrays containing UTF-8 encoded strings (whether CSV, JSON or TSV). Newlines are used to separate records written to a file. This format can be used with the Snowplow Kinesis Enriched stream, among other streams.
Gzip encoding is generally used for both enriched data and bad data.
The S3 Loader requires a single configuration file; an example can be found here, and its fields are detailed below on this page.
2.1. Docker image
docker run \
  -d \
  --name snowplow-s3-loader \
  --restart always \
  --log-driver awslogs \
  --log-opt awslogs-group=snowplow-s3-loader \
  --log-opt awslogs-stream=`ec2metadata --instance-id` \
  --network host \
  -v $(pwd):/snowplow/config \
  -e 'JAVA_OPTS=-Xms512M -Xmx1024M -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN' \
  snowplow/snowplow-s3-loader:2.0.0 \
  --config /snowplow/config/config.hocon
java -jar snowplow-s3-loader-2.0.0.jar --config config.hocon
The JAR can be found attached to the GitHub release.
Running the jar requires the native LZO binaries to be installed. For example, on Debian this can be done with:
sudo apt-get install lzop liblzo2-dev
The sink is configured using a HOCON file. These are the fields:
region: The AWS region used to initialize AWS SDK clients
purpose: The purpose of this Loader instance. It can be:
RAW, to sink data as is, usually used for Collector raw data.
ENRICHED_EVENTS, used for enriched TSV. If metrics.statsd is configured, the Loader will attempt to report data latency.
SELF_DESCRIBING, used primarily for bad data or any other self-describing data. The Loader will attempt to partition incoming data based on the entity’s schema.
input: Configuration for input Kinesis stream
input.streamName: The name of the input stream of the tool which you choose as a source. This should be the stream to which you are writing records with the Scala Stream Collector.
input.appName: Unique identifier for the app which ensures that if it is stopped and restarted, it will restart at the correct location.
input.position: Where to start reading from the stream the first time the app is run. “TRIM_HORIZON” for as far back as possible, “LATEST” for as recent as possible, “AT_TIMESTAMP” for after the specified timestamp.
input.maxRecords: Maximum number of records to read per GetRecords call
input.customEndpoint: Endpoint URL configuration to override the AWS Kinesis endpoint. This can be used to specify a local endpoint when using localstack (Optional)
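The input fields above combine with the top-level region and purpose settings into a HOCON block along these lines (the stream and app names are placeholders, not defaults):

```hocon
# Top-level settings plus the input section (placeholder values)
region = "eu-west-1"
purpose = "ENRICHED_EVENTS"

input {
  streamName = "enriched-good"    # Kinesis stream to read from
  appName = "snowplow-s3-loader"  # identifier used to checkpoint the read position
  position = "TRIM_HORIZON"       # or "LATEST" / "AT_TIMESTAMP"
  maxRecords = 10000              # max records per GetRecords call
  # customEndpoint = "http://localhost:4566"  # optional, e.g. for localstack
}
```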
output: All output data the Loader can produce
output.s3.path: Full path to output data
output.s3.maxTimeout: The maximum amount of time the app attempts to PUT to S3 before it terminates itself
output.s3.compression: Output format. We recommend GZIP for enriched and self-describing data and LZO for raw data
output.s3.dateFormat: Format for date-based partitioning (Optional)
output.s3.filenamePrefix: Common prefix for all produced files (Optional)
output.bad.streamName: Name of the Kinesis stream that receives all data that couldn’t be sunk to S3 (due to an IO or compression error); these records are produced as self-describing JSON
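Following the field descriptions above, the output section might be sketched as below; the bucket path, stream name and literal values are illustrative only:

```hocon
output {
  s3 {
    path = "s3://my-bucket/enriched/"  # placeholder bucket and prefix
    maxTimeout = 2000                  # give up PUTting to S3 after this long
    compression = "GZIP"               # GZIP for enriched/self-describing, LZO for raw
    # dateFormat and filenamePrefix are optional and omitted here
  }
  bad {
    streamName = "s3-loader-bad"       # placeholder stream for failed writes
  }
}
```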
buffer: Controls how much data is stored in memory before flushing to S3
buffer.byteLimit: Whenever the total size of the buffered records exceeds this number, they will all be sent to S3.
buffer.recordLimit: Whenever the total number of buffered records exceeds this number, they will all be sent to S3.
buffer.timeLimit: If this length of time passes without the buffer being flushed, the buffer will be flushed
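A buffer block following the three limits above could look like this (the numbers are illustrative; a flush happens as soon as any one limit is exceeded):

```hocon
buffer {
  byteLimit = 1048576   # flush once ~1 MB of records is buffered
  recordLimit = 100     # ... or once 100 records are buffered
  timeLimit = 60000     # ... or once 60 seconds have passed, whichever comes first
}
```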
It’s possible to include different kinds of monitoring in the application. This is set up through the
monitoring section at the bottom of the config file:
monitoring.snowplow.collector: Snowplow collector URI
monitoring.snowplow.appId: The app-id used in decorating the events sent
monitoring.sentry.dsn: The Sentry DSN used to track exceptions in the S3 Loader
monitoring.metrics.cloudWatch: Enable CloudWatch metrics for KCL
monitoring.metrics.statsd: StatsD-powered metrics
monitoring.metrics.statsd.hostname: StatsD endpoint host
monitoring.metrics.statsd.port: StatsD endpoint port
monitoring.metrics.statsd.tags: Additional tags attached to the metrics; environment variables can be referenced (Optional)
To disable Snowplow monitoring, just remove the entire
monitoring section from the config.
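Putting the monitoring fields together, a sketch of the section might look like the following; all endpoints and IDs are placeholders, and HOCON’s ${?VAR} syntax substitutes an environment variable:

```hocon
monitoring {
  snowplow {
    collector = "collector.example.com"  # placeholder collector URI
    appId = "s3-loader"                  # app-id attached to emitted events
  }
  sentry {
    dsn = "https://key@sentry.example.com/1"  # placeholder Sentry DSN
  }
  metrics {
    cloudWatch = true  # enable KCL CloudWatch metrics
    statsd {
      hostname = "localhost"
      port = 8125
      tags = { "env": ${?ENV_NAME} }  # optional tags via env var substitution
    }
  }
}
```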