
Scala Stream Collector

The Scala Stream Collector is an event collector for Snowplow, written in Scala. It enables near-real-time processing (Enrichment, Storage, Analytics) of a Snowplow raw event stream.

The Scala Stream Collector receives raw Snowplow events over HTTP, serializes them to a Thrift record format, and then writes them to a sink. Currently supported sinks are:

  1. Amazon Kinesis
  2. Google PubSub
  3. Apache Kafka
  4. NSQ
  5. stdout for a custom stream collection process
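To make the write path concrete, the sketch below shows its general shape: a serialized record is handed to whichever sink is configured, with stdout as the simplest possible implementation. This is an illustration only, not the collector's actual internal API; the trait and class names are made up.

import java.nio.charset.StandardCharsets

trait RawEventSink {
  // Write one serialized (Thrift-encoded) event to the underlying stream.
  def store(record: Array[Byte]): Unit
}

// The simplest sink: print Base64-encoded records to stdout, one per line,
// so a custom downstream process can consume them.
class StdoutSink extends RawEventSink {
  def store(record: Array[Byte]): Unit =
    println(java.util.Base64.getEncoder.encodeToString(record))
}

object SinkExample {
  def main(args: Array[String]): Unit = {
    // In the real collector this would be a Kinesis, Pub/Sub, Kafka or NSQ sink.
    val sink: RawEventSink = new StdoutSink
    val fakeThriftRecord = "not-really-thrift".getBytes(StandardCharsets.UTF_8)
    sink.store(fakeThriftRecord)
  }
}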

Like the Clojure Collector, the Scala Stream Collector supports cross-domain Snowplow deployments, setting a user_id (used to identify unique visitors) server side to reliably identify the same user across domains.

Setup

Setup guides for the Scala Stream Collector can be found in the respective AWS and GCP setup guides.

How it works

User identification

The Scala Stream Collector allows the use of a third-party cookie, making user tracking across domains possible. The CloudFront Collector does not support cross-domain tracking of users because user IDs are set client-side, whereas the Scala Stream Collector sets them server-side.

In a nutshell: the Scala Stream Collector receives events from a tracker, sets/updates a third-party user tracking cookie, and returns the pixel to the client. The ID in this third-party user tracking cookie is stored in the network_userid field in Snowplow events.

In pseudocode terms:

if (request contains an "sp" cookie) {
    Record that cookie as the user identifier
    Set that cookie with a now+1 year cookie expiry
    Add the headers and payload to the output array
} else {
    Set the "sp" cookie with a now+1 year cookie expiry
    Add the headers and payload to the output array
}
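
Since the collector is built on akka-http (see "Technical architecture" below), the same logic can be sketched with akka-http directives. This is a simplified illustration, not the collector's actual code: the cookie name, one-year expiry and /i path mirror the pseudocode, and a plain 200 response stands in for the tracking pixel.

import java.util.UUID
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.HttpCookie
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object CookieRoute {
  private val oneYearSeconds = 365L * 24 * 60 * 60

  val route: Route =
    path("i") {
      optionalCookie("sp") { existing =>
        // Reuse the id from an existing "sp" cookie, otherwise mint a new one.
        val networkUserId = existing.map(_.value).getOrElse(UUID.randomUUID().toString)
        // Set (or refresh) the cookie with a one-year expiry.
        setCookie(HttpCookie("sp", networkUserId, maxAge = Some(oneYearSeconds))) {
          // The real collector serializes the payload (headers, querystring,
          // networkUserId, ...) to Thrift, writes it to the sink, and returns
          // a 1x1 transparent GIF; a plain 200 stands in for that here.
          complete(StatusCodes.OK)
        }
      }
    }
}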

Collector logging formats

The Scala Stream Collector produces streams of Snowplow events (records). The data (payload) is serialized using the Apache Thrift framework.

Binary serialization allows for:

  • simpler data structure
  • smaller size
  • faster transfer
  • easier (programmatic) parsing

The Snowplow Thrift raw event format conforms to this Thrift schema. For clarity, the structure of the collector payload is shown below.

struct CollectorPayload {
  31337: string schema

  // Required fields which are intrinsic properties of HTTP
  100: string ipAddress

  // Required fields which are Snowplow-specific
  200: i64 timestamp
  210: string encoding
  220: string collector

  // Optional fields which are intrinsic properties of HTTP
  300: optional string userAgent
  310: optional string refererUri
  320: optional string path
  330: optional string querystring
  340: optional string body
  350: optional list<string> headers
  360: optional string contentType

  // Optional fields which are Snowplow-specific
  400: optional string hostname
  410: optional string networkUserId
}
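
As a rough illustration of what "serialized Thrift record" means in practice, the sketch below round-trips a payload through Thrift's binary protocol using libthrift's TSerializer and TDeserializer. It assumes the CollectorPayload class generated from the schema above is on the classpath (the package name used by Snowplow's published artifacts may differ), and the field values are illustrative.

// The generated class is assumed to be importable, e.g.
// import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload
import org.apache.thrift.{TDeserializer, TSerializer}
import org.apache.thrift.protocol.TBinaryProtocol

object ThriftRoundTrip {
  def main(args: Array[String]): Unit = {
    val payload = new CollectorPayload()   // class generated from the Thrift schema
    payload.setSchema("iglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0")
    payload.setIpAddress("203.0.113.10")   // example values throughout
    payload.setTimestamp(System.currentTimeMillis())
    payload.setEncoding("UTF-8")
    payload.setCollector("scala-stream-collector-example")

    // This byte array is what gets written to the sink (Kinesis, Kafka, ...).
    val serializer = new TSerializer(new TBinaryProtocol.Factory())
    val bytes: Array[Byte] = serializer.serialize(payload)

    // A downstream consumer (e.g. Stream Enrich) decodes it the same way in reverse.
    val decoded = new CollectorPayload()
    new TDeserializer(new TBinaryProtocol.Factory()).deserialize(decoded, bytes)
    println(decoded.getCollector)
  }
}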

It’s important to note that we built stream data processing on the idea of a Lambda architecture, which implies both a speed (real-time) layer and a batch layer. As a result, we provide two consumers on AWS: Stream Enrich and the Snowplow S3 Loader.

Reflecting their different purposes:

  • Stream Enrich reads raw Snowplow events off a Kinesis stream and writes the enriched Snowplow event to another Kinesis stream
  • Snowplow S3 Loader reads records from an Amazon Kinesis stream, encodes and wraps them into Thrift using the ElephantBird library, compresses the data using splittable LZO or GZIP, and writes them to S3

The output of Snowplow S3 Loader is a projection of raw event data (serialized Thrift records, not enriched) in the form of a compressed LZO file.

Each .lzo file has a corresponding .lzo.index file containing the byte offsets for the LZO blocks, so that the blocks can be processed in parallel using Spark.

Generally, the LZO file generated by Snowplow S3 Loader can be thought of as an “onion-like” layered object: an LZO-compressed file wrapping serialized Thrift records, each of which wraps a raw event payload.

The main characteristics of stream-based raw events:

  1. A serialized Thrift record format.
  2. Support for both GET and POST requests.
  3. Support for network_userid.

Technical architecture

The Scala Stream Collector is built on top of akka-http.
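
For orientation, a minimal akka-http service looks roughly like the sketch below (using the newServerAt API from akka-http 10.2+). This is not the collector's actual Main: the host, port and trivial route are placeholders, and a real deployment would mount the pixel (GET /i) and POST event routes instead.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._

object MiniCollector {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("mini-collector")

    // A real collector would mount its event-collection routes here.
    val route = path("health") {
      get {
        complete("OK")
      }
    }

    // Bind on all interfaces; akka-http keeps serving until the actor system stops.
    Http().newServerAt("0.0.0.0", 8080).bind(route)
  }
}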

GitHub repository