S3 Loader

Overview

The Snowplow S3 Loader consumes records from an Amazon Kinesis stream or NSQ topic, and writes them to S3.

Two file formats are supported:

  • LZO
  • GZip

LZO

The records are treated as raw byte arrays. Elephant Bird’s BinaryBlockWriter class is used to serialize them as a Protocol Buffers array (so it is clear where one record ends and the next begins) before compressing them.

The compression process generates both compressed .lzo files and small .lzo.index files (splittable LZO). Each index file contains the byte offsets of the LZO blocks in the corresponding compressed file, meaning that the blocks can be processed in parallel.

GZip

The records are treated as byte arrays containing UTF-8 encoded strings (whether CSV, JSON or TSV). New lines are used to separate records written to a file. This format can be used with the Snowplow Kinesis Enriched stream, among other streams.
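Because each GZipped file is simply newline-delimited records, you can inspect one directly once it lands in S3. For example, as a sketch (the bucket and key below are placeholders):

$ aws s3 cp s3://my-archive-bucket/path/to/file.gz - | gzip -dc | head -n 3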

Setup

Overview

Snowplow S3 Loader reads records from an Amazon Kinesis or NSQ stream, compresses them using splittable LZO or GZip, and writes them to S3.

It was created to store the Thrift records generated by the Scala Stream Collector in S3.

If it fails to process a record, it will write that record to a second Kinesis or NSQ stream along with an error message.

Prerequisites

To run the Snowplow S3 Loader, you must have the native LZO binaries installed. To do this on Ubuntu, run:

$ sudo apt-get install lzop liblzo2-dev

Installation

Option 1: use Docker image

The S3 Loader image can be found on Docker Hub.
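
As a sketch, you might pull the image and run it with a mounted configuration file; the image tag and mount paths below are assumptions, so check Docker Hub for the current coordinates:

$ docker pull snowplow/snowplow-s3-loader:1.0.0
$ docker run -v $(pwd)/config:/snowplow/config snowplow/snowplow-s3-loader:1.0.0 --config /snowplow/config/my.conf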

Option 2: compile from source

Alternatively, you can build it from the source files. To do so, you will need Scala and sbt installed.

First, clone the snowplow-s3-loader repo:

$ git clone https://github.com/snowplow/snowplow-s3-loader.git

Navigate into the snowplow-s3-loader folder:

$ cd snowplow-s3-loader

Use sbt to resolve dependencies, compile the source, and build an assembled fat JAR file with all dependencies:

$ sbt assembly

The jar file will be saved as snowplow-s3-loader-1.0.0.jar in the target/scala-2.13 subdirectory. It is now ready to be deployed.

Configuration

The loader is configured using a HOCON file. These are the fields (a minimal example config is sketched after the list):

  • source: Choose kinesis or nsq as a source stream
  • sink: Choose between kinesis or nsq as a sink stream for failed events
  • aws.accessKey and aws.secretKey: Change these to your AWS credentials. You can alternatively leave them as “default”, in which case the DefaultAWSCredentialsProviderChain will be used.
  • kinesis.initialPosition: Where to start reading from the stream the first time the app is run. “TRIM_HORIZON” for as far back as possible, “LATEST” for as recent as possible, “AT_TIMESTAMP” to start after the specified timestamp.
  • kinesis.initialTimestamp: Timestamp for “AT_TIMESTAMP” initial position
  • kinesis.maxRecords: Maximum number of records to read per GetRecords call
  • kinesis.region: The Kinesis region name to use.
  • kinesis.appName: Unique identifier for the app which ensures that if it is stopped and restarted, it will restart at the correct location.
  • kinesis.customEndpoint: Optional endpoint URL configuration to override the AWS Kinesis endpoints. This can be used to specify local endpoints when using Localstack.
  • kinesis.disableCloudWatch: Optional override to disable CloudWatch metrics for KCL
  • nsq.channelName: Channel name for the NSQ source stream. If more than one application is reading from the same NSQ topic at the same time, each must have a unique channel name in order to receive all the data from that topic.
  • nsq.host: Hostname for NSQ tools
  • nsq.port: HTTP port number for nsqd
  • nsq.lookupPort: HTTP port number for nsqlookupd
  • streams.inStreamName: The name of the input stream of the tool which you choose as a source. This should be the stream to which you are writing records with the Scala Stream Collector.
  • streams.outStreamName: The name of the output stream of the tool which you choose as sink. This is the stream where records are sent if the compression process fails.
  • streams.buffer.byteLimit: Whenever the total size of the buffered records exceeds this number, they will all be sent to S3.
  • streams.buffer.recordLimit: Whenever the total number of buffered records exceeds this number, they will all be sent to S3.
  • streams.buffer.timeLimit: If this length of time passes without the buffer being flushed, the buffer will be flushed. Note: with NSQ streams, only the record limit is taken into account; the other two options are ignored.
  • s3.region: The AWS region for the S3 bucket
  • s3.bucket: The name of the S3 bucket in which files are to be stored
  • s3.format: The format the app should write to S3 in (lzo or gzip)
  • s3.maxTimeout: The maximum amount of time the app will attempt to PUT to S3 before giving up and shutting down
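
As referenced above, here is a minimal sketch of a Kinesis-to-S3 configuration built only from the fields listed; all values below are placeholders, and the exact structure should be checked against the example config in the repo:

# Minimal sketch only; stream names, regions and limits are placeholders
source = "kinesis"
sink = "kinesis"

aws {
  accessKey = "default"
  secretKey = "default"
}

kinesis {
  initialPosition = "TRIM_HORIZON"
  maxRecords = 500
  region = "eu-west-1"
  appName = "snowplow-s3-loader"
}

streams {
  inStreamName = "enriched-good"
  outStreamName = "s3-loader-failed"
  buffer {
    byteLimit = 1048576   # flush once ~1 MB is buffered
    recordLimit = 500     # or once 500 records are buffered
    timeLimit = 60000     # or after 60 seconds (milliseconds)
  }
}

s3 {
  region = "eu-west-1"
  bucket = "my-archive-bucket"
  format = "gzip"
  maxTimeout = 300000     # milliseconds
}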

Monitoring

You can also include Snowplow Monitoring in the application. This is set up through a section at the bottom of the config. You will need to amend:

  • monitoring.snowplow.collectorUri: insert your Snowplow collector URI here.
  • monitoring.snowplow.appId: the app ID used when decorating the events sent.
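
For illustration, a monitoring section built from these two fields might look like the following (the values are placeholders):

monitoring {
  snowplow {
    collectorUri = "collector.acme.com"
    appId = "snowplow-s3-loader"
  }
}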

If you do not wish to include Snowplow Monitoring, remove the entire monitoring section from the config.

An example is available in the repo.

Execution

The Snowplow S3 Loader is an executable JAR file. Simply provide the configuration file as a parameter:

$ java -jar snowplow-s3-loader-1.0.0.jar --config my.conf

This will start the process of reading events from the selected source, compressing them, and writing them to S3.