S3 Loader

1. Overview

Snowplow S3 Loader consumes records from an Amazon Kinesis stream and writes them to S3.

Records that can’t be successfully written to S3 are written to a second Kinesis stream with the error message.

Two file formats are supported:

  • LZO
  • Gzip

1.1. LZO

Records are treated as raw byte arrays. Elephant Bird’s BinaryBlockWriter class is used to serialize them as a Protocol Buffers array (so it is clear where one record ends and the next begins) before compressing them.

The compression process generates both compressed .lzo files and small .lzo.index files (splittable LZO). Each index file contains the byte offsets of the LZO blocks in the corresponding compressed file, meaning that the blocks can be processed in parallel.

LZO encoding is generally used for raw data produced by the Snowplow Collector.

1.2. Gzip

The records are treated as byte arrays containing UTF-8 encoded strings (whether CSV, JSON or TSV). Newlines are used to separate the records written to a file. This format can be used with the Snowplow Kinesis Enriched stream, among other streams.

Gzip encoding is generally used for both enriched data and bad data.
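
The choice between these two formats is made with the output.s3.compression setting described in the Configuration section below. As a minimal sketch (the bucket path is a placeholder, and the exact accepted values should be checked against the example configuration file):

"output": {
  "s3": {
    "path": "s3://my-bucket/enriched/"   # placeholder output location
    "compression": "GZIP"                # "LZO" for raw collector data
  }
}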

2. Run

The S3 Loader requires a single configuration file; an example can be found here and the available fields are described below on this page.

2.1. Docker image

docker run \
  -d \
  --name snowplow-s3-loader \
  --restart always \
  --log-driver awslogs \
  --log-opt awslogs-group=snowplow-s3-loader \
  --log-opt awslogs-stream=`ec2metadata --instance-id` \
  --network host \
  -v $(pwd):/snowplow/config \
  -e 'JAVA_OPTS=-Xms512M -Xmx1024M -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN' \
  snowplow/snowplow-s3-loader:2.0.0 \
  --config /snowplow/config/config.hocon

2.2. Jar

java -jar snowplow-s3-loader-2.0.0.jar --config config.hocon

The JAR file can be found attached to the GitHub release.

Running the jar requires the native LZO binaries to be installed. For example, on Debian this can be done with:

sudo apt-get install lzop liblzo2-dev

3. Configuration

The sink is configured using a HOCON file. These are the fields (an illustrative example follows the list):

  • region: The AWS region used to initialize AWS SDK clients
  • purpose: The purpose of this Loader instance. It can be RAW to sink data as is (usually used for Collector raw data); ENRICHED_EVENTS for enriched TSV data (if metrics.statsd is configured, the Loader will attempt to report data latency); or SELF_DESCRIBING, used primarily for bad data or any other self-describing data (the Loader will attempt to partition incoming data based on the entity’s schema)
  • input: Configuration for input Kinesis stream
  • input.streamName: The name of the input Kinesis stream to read from. This should be the stream to which you are writing records with the Scala Stream Collector.
  • input.appName: Unique identifier for the app which ensures that if it is stopped and restarted, it will restart at the correct location.
  • input.position: Where to start reading from the stream the first time the app is run. “TRIM_HORIZON” for as far back as possible, “LATEST” for as recent as possible, “AT_TIMESTAMP” to start after the specified timestamp.
  • input.maxRecords: Maximum number of records to read per GetRecords call
  • input.customEndpoint: Endpoint URL used to override the default AWS Kinesis endpoint. This can be used to specify local endpoints when using Localstack (Optional)
  • output: Configuration for all the outputs the Loader can produce
  • output.s3.path: Full path to output data
  • output.s3.maxTimeout: The maximum amount of time the app attempts to PUT to S3 before shutting itself down
  • output.s3.compression: Output format. We recommend GZIP for enriched and self-describing data and LZO for raw data
  • output.s3.dateFormat: Format for date-based partitioning (Optional)
  • output.s3.filenamePrefix: Common prefix for all produced files (Optional)
  • output.bad.streamName: Name of the Kinesis stream to which all data that couldn’t be sunk to S3 (due to an IO or compression error) is written, as self-describing JSON
  • buffer: Controls how much data will be stored in memory before flushing to S3
  • buffer.byteLimit: Whenever the total size of the buffered records exceeds this number, they will all be sent to S3.
  • buffer.recordLimit: Whenever the total number of buffered records exceeds this number, they will all be sent to S3.
  • buffer.timeLimit: If this length of time passes without the buffer being flushed, the buffer will be flushed
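
Putting the fields above together, here is a minimal sketch of a config.hocon for loading enriched data. All stream names, paths, identifiers and numeric values are placeholders chosen for illustration; refer to the example configuration file linked in section 2 for authoritative values and defaults:

{
  "region": "eu-central-1"            # placeholder AWS region
  "purpose": "ENRICHED_EVENTS"

  "input": {
    "appName": "acme-s3-loader"       # placeholder KCL application name
    "streamName": "enriched-good"     # placeholder input Kinesis stream
    "position": "LATEST"
    "maxRecords": 10
  }

  "output": {
    "s3": {
      "path": "s3://acme-snowplow-output/enriched/"   # placeholder bucket and prefix
      "compression": "GZIP"
      "maxTimeout": 2000              # placeholder timeout value
    }
    "bad": {
      "streamName": "s3-loader-bad"   # placeholder stream for failed writes
    }
  }

  "buffer": {
    "byteLimit": 2048
    "recordLimit": 10
    "timeLimit": 5000                 # placeholder time limit
  }
}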

3.1. Monitoring

It’s possible to include different kinds of monitoring in the application. This is set up through the monitoring section at the bottom of the config file (an illustrative snippet follows the list):

  • monitoring.snowplow.collector: Snowplow collector URI
  • monitoring.snowplow.appId: The app-id used in decorating the events sent
  • monitoring.sentry.dsn: The Sentry DSN to track exceptions happening in the S3 Loader
  • monitoring.metrics.cloudWatch: Enable CloudWatch metrics for KCL
  • monitoring.metrics.statsd: StatsD-powered metrics
  • monitoring.metrics.statsd.hostname: StatsD endpoint host
  • monitoring.metrics.statsd.port: StatsD endpoint port
  • monitoring.metrics.statsd.tags: Tags to attach to the metrics; environment variables can be provided in the values (Optional)
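
Below is an illustrative monitoring section with every option filled in; all values are placeholders:

"monitoring": {
  "snowplow": {
    "collector": "collector.acme.com"    # placeholder collector URI
    "appId": "snowplow-s3-loader"
  }
  "sentry": {
    "dsn": "https://sentry.acme.com/42"  # placeholder Sentry DSN
  }
  "metrics": {
    "cloudWatch": true
    "statsd": {
      "hostname": "localhost"            # placeholder StatsD host
      "port": 8125
      "tags": {
        "app": "s3-loader"               # placeholder tag value
      }
    }
  }
}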

To disable Snowplow monitoring, just remove the entire monitoring section from the config.
