Now that you have set up the collector, you are in a position to set up a new data source (e.g. a Tracker or Webhook). The collector will receive the data and publish it to a Kinesis stream (the raw stream), from where it can be processed and loaded into different destinations.
As part of your collector setup, we recommend setting up the snowplow-s3-loader to sink this raw data to S3. If something goes wrong with your pipeline, this data remains available to be reprocessed at a later date.
To sink the raw data from Kinesis to S3, follow the instructions below:
To run the Snowplow S3 Loader, you must have the native LZO binaries installed. To install them on Ubuntu, run:
$ sudo apt-get install lzop liblzo2-dev
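Before running the loader, you can check that these prerequisites are present. This is a minimal sketch assuming a Debian-based system such as Ubuntu, where `dpkg -s` reports whether a package is installed; the `check_lzo` function name is hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: report whether the LZO packages installed above are present.
# Assumes a Debian/Ubuntu system; where dpkg is unavailable, the package
# check simply reports "missing".
check_lzo() {
  # lzop is the command-line tool from the lzop package
  if command -v lzop >/dev/null 2>&1; then
    echo "lzop: found"
  else
    echo "lzop: missing"
  fi
  # liblzo2-dev provides the native LZO library and headers
  if dpkg -s liblzo2-dev >/dev/null 2>&1; then
    echo "liblzo2-dev: found"
  else
    echo "liblzo2-dev: missing"
  fi
}

check_lzo
```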
Option 1: download the executable jar file
See Hosted assets for the zipfile to download.
Option 2: compile from source
To do so, clone the snowplow-s3-loader repo:
$ git clone https://github.com/snowplow/snowplow-s3-loader.git
Navigate into the snowplow-s3-loader folder:
$ cd snowplow-s3-loader
Use sbt to resolve dependencies, compile the source, and build an assembled fat JAR file with all dependencies.
$ sbt assembly
The JAR file will be saved as snowplow-s3-loader-0.6.0.jar in the target/scala-2.11 subdirectory. It is now ready to be deployed.
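Because the JAR name embeds the version, a small helper can locate whatever `sbt assembly` produced instead of hard-coding 0.6.0. The `find_loader_jar` function is hypothetical; it assumes it is given (or run from) the root of the snowplow-s3-loader checkout and that the build still targets Scala 2.11.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the path of the assembled loader JAR.
# Argument: repo root (defaults to the current directory).
# Returns 1 with a message on stderr if no JAR is found; if several
# versions are present, the first glob match is printed.
find_loader_jar() {
  local dir="${1:-.}/target/scala-2.11"
  local jar
  for jar in "$dir"/snowplow-s3-loader-*.jar; do
    # If the glob matched nothing, $jar is the literal pattern and -f fails
    if [ -f "$jar" ]; then
      printf '%s\n' "$jar"
      return 0
    fi
  done
  echo "No assembled JAR found in $dir; did 'sbt assembly' succeed?" >&2
  return 1
}
```

For example, from the repo root, `java -jar "$(find_loader_jar)" --config my.conf` starts whichever version was built.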
The loader is configured using a HOCON file. These are the fields:
source: Choose kinesis or nsq as a source stream
sink: Choose kinesis or nsq as the sink stream for failed events
aws.accessKey and aws.secretKey: Change these to your AWS credentials. You can alternatively leave them as “default”, in which case the DefaultAWSCredentialsProviderChain will be used.
kinesis.initialPosition: Where to start reading from the stream the first time the app is run. “TRIM_HORIZON” for as far back as possible, “LATEST” for as recent as possible, “AT_TIMESTAMP” for after the specified timestamp.
kinesis.initialTimestamp: Timestamp for “AT_TIMESTAMP” initial position
kinesis.maxRecords: Maximum number of records to read per GetRecords call
kinesis.region: The Kinesis region name to use.
kinesis.appName: Unique identifier for the app which ensures that if it is stopped and restarted, it will restart at the correct location.
nsq.channelName: Channel name for the NSQ source stream. If more than one application reads from the same NSQ topic at the same time, each must have a unique channel name in order to receive all the data from that topic.
nsq.host: Hostname for NSQ tools
nsq.port: HTTP port number for nsqd
nsq.lookupPort: HTTP port number for nsqlookupd
streams.inStreamName: The name of the input stream of the tool you chose as the source. This should be the stream to which you are writing records with the Scala Stream Collector.
streams.outStreamName: The name of the output stream of the tool you chose as the sink. This is the stream to which records are sent if the compression process fails.
streams.buffer.byteLimit: Whenever the total size of the buffered records exceeds this number, they will all be sent to S3.
streams.buffer.recordLimit: Whenever the total number of buffered records exceeds this number, they will all be sent to S3.
streams.buffer.timeLimit: If this length of time passes without the buffer being flushed, the buffer will be flushed. Note: with NSQ streams, only the record limit is taken into account; the other two options are ignored.
s3.region: The AWS region for the S3 bucket
s3.bucket: The name of the S3 bucket in which files are to be stored
s3.format: The format the app should write to S3 in (
s3.maxTimeout: The maximum amount of time the app keeps attempting to PUT to S3 before it terminates itself
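The fields above can be pulled together into a config file. The function below is a sketch that writes one with a heredoc for a Kinesis source and sink; the `write_loader_conf` name, every stream name, bucket, region and limit, the `lzo` format value, and the units noted in the comments are placeholder assumptions. The example config in the repo remains the reference for your version.

```bash
#!/usr/bin/env bash
# Sketch: write an illustrative loader config for a Kinesis -> S3 setup.
# All values are placeholder assumptions; replace them with your own.
write_loader_conf() {
  local conf_file="$1"
  # Current UTC time, only used if initialPosition is set to "AT_TIMESTAMP".
  # The "yyyy-MM-ddTHH:mm:ssZ" format is an assumption; check the repo's sample config.
  local start_ts
  start_ts="$(date -u +%Y-%m-%dT%H:%M:%SZ)"

  cat > "$conf_file" <<EOF
source = "kinesis"
sink = "kinesis"

aws {
  # "default" uses the DefaultAWSCredentialsProviderChain
  accessKey = "default"
  secretKey = "default"
}

# Not used when source and sink are kinesis; placeholder values.
nsq {
  channelName = "s3-loader"
  host = "localhost"
  port = 4151
  lookupPort = 4161
}

kinesis {
  initialPosition = "TRIM_HORIZON"
  initialTimestamp = "${start_ts}"
  maxRecords = 10000
  region = "eu-west-1"
  appName = "snowplow-s3-loader-raw"
}

streams {
  inStreamName = "raw-good"
  outStreamName = "raw-s3-failed"
  # The buffer is flushed to S3 when any limit is exceeded (only recordLimit with NSQ).
  # Units (bytes for byteLimit, milliseconds for timeLimit) are assumptions.
  buffer {
    byteLimit = 4500000
    recordLimit = 500
    timeLimit = 60000
  }
}

s3 {
  region = "eu-west-1"
  bucket = "my-raw-archive-bucket"
  # Assumed value, given the LZO prerequisite; check the accepted formats.
  format = "lzo"
  # Assumed to be milliseconds.
  maxTimeout = 300000
}
EOF
}
```

For example, `write_loader_conf my.conf` produces a file you can then edit and pass to the loader with `--config`.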
You can also include Snowplow Monitoring in the application. This is set up through a new section at the bottom of the config. You will need to amend:
monitoring.snowplow.collectorUri: insert your Snowplow collector URI here.
monitoring.snowplow.appId: the app-id used in decorating the events sent.
If you do not wish to include Snowplow Monitoring, remove the entire monitoring section from the config.
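If you keep monitoring enabled, the section can be appended to an existing config from a script. The `append_monitoring` function is hypothetical, and it writes only the two fields named above; the repo's example config may contain further monitoring settings that your loader version expects.

```bash
#!/usr/bin/env bash
# Hypothetical helper: append the optional Snowplow Monitoring section
# to an existing loader config file.
# Arguments: config path, collector URI, app-id.
# Only collectorUri and appId are written; add any other settings
# your version's example config lists.
append_monitoring() {
  local conf="$1" collector_uri="$2" app_id="$3"
  cat >> "$conf" <<EOF

monitoring {
  snowplow {
    collectorUri = "${collector_uri}"
    appId = "${app_id}"
  }
}
EOF
}
```

For example, `append_monitoring my.conf <your-collector-uri> s3-loader` adds the section to the end of my.conf.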
An example is available in the repo.
The Snowplow S3 Loader is a JAR file. Simply provide the configuration file as a parameter:
$ java -jar snowplow-s3-loader-0.6.0.jar --config my.conf
This will start the process of reading events from the selected source, compressing them, and writing them to S3.
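To catch a missing JAR or config file before the JVM starts, the command can be wrapped in a small launcher. The `run_loader` function, the `LOADER_JAR` override and the `DRY_RUN` switch are hypothetical conventions of this sketch, not options of the loader itself; the default JAR path assumes you built from source as described above and run the script from the repo root.

```bash
#!/usr/bin/env bash
# Hypothetical launcher: check inputs, then start the loader.
# Argument: config path (defaults to my.conf).
# LOADER_JAR overrides the JAR path; DRY_RUN=1 prints the command instead of running it.
run_loader() {
  local conf="${1:-my.conf}"
  local jar="${LOADER_JAR:-target/scala-2.11/snowplow-s3-loader-0.6.0.jar}"
  if [ ! -f "$jar" ]; then
    echo "JAR not found: $jar" >&2
    return 1
  fi
  if [ ! -f "$conf" ]; then
    echo "Config not found: $conf" >&2
    return 1
  fi
  local cmd=(java -jar "$jar" --config "$conf")
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "${cmd[*]}"
  else
    # exec replaces this shell, so signals such as Ctrl-C go straight to the JVM
    exec "${cmd[@]}"
  fi
}
```

For example, `DRY_RUN=1 run_loader my.conf` shows the exact `java -jar ... --config my.conf` command before you launch it for real.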