The Scala Stream Collector is a Snowplow event collector for Snowplow, written in Scala. The Scala Stream Collector allows near-real time processing (Enrichment, Storage, Analytics) of a Snowplow raw event stream.
The Scala Stream Collector receives raw Snowplow events over HTTP, serializes them to a Thrift record format, and then writes them to a sink. Currently supported sinks are:
- Amazon Kinesis
- Google PubSub
- Apache Kafka
- NSQ
stdout
for a custom stream collection process
Like the Clojure Collector, the Scala Stream Collector supports cross-domain Snowplow deployments, setting a user_id
(used to identify unique visitors) server side to reliably identify the same user across domains.
Setup
Setup guides for the Scala Stream collector can be found in the respective AWS and GCP setup guides.
How it works
User identification
The Scala Stream Collector allows the use of a third-party cookie, making user tracking across domains possible. The CloudFront Collector does not support cross domain tracking of users because user ids are set client-side, whereas the Scala Stream Collector sets them server-side.
In a nutshell: the Scala Stream Collector receives events from a tracker, sets/updates a third-party user tracking cookie, and returns the pixel to the client. The ID in this third-party user tracking cookie is stored in the network_userid
field in Snowplow events.
In pseudocode terms:
if (request contains an "sp" cookie) {
Record that cookie as the user identifier
Set that cookie with a now+1 year cookie expiry
Add the headers and payload to the output array
} else {
Set the "sp" cookie with a now+1 year cookie expiry
Add the headers and payload to the output array
}
Code language: JavaScript (javascript)
Collector logging formats
The Scala Stream collector produces streams of Snowplow events (records). The data (payload) is serialized by utilizing Apache Thrift framefork.
Binary serialization allows for:
- simpler data structure
- smaller size
- faster transfer
- easier (programmatical) parsing
The Snowplow Thrift raw event format conforms to this Thrift schema. For easier perception, the structure of the collector payload is depicted below.
struct CollectorPayload {
31337: string schema
// Required fields which are intrinsic properties of HTTP
100: string ipAddress
// Required fields which are Snowplow-specific
200: i64 timestamp
210: string encoding
220: string collector
// Optional fields which are intrinsic properties of HTTP
300: optional string userAgent
310: optional string refererUri
320: optional string path
330: optional string querystring
340: optional string body
350: optional list<string> headers
360: optional string contentType
// Optional fields which are Snowplow-specific
400: optional string hostname
410: optional string networkUserId
}
Code language: PHP (php)
It’s important to note that we built stream data processing on the idea of a Lambda architecture which implies both a speed (real-time) layer and a batch layer. As a result we provide two consumers on AWS; Stream Enrich and Snowplow S3 Loader.
Due to their nature (purpose):
- Stream Enrich reads raw Snowplow events off a Kinesis stream and writes the enriched Snowplow event to another Kinesis stream
- Snowplow S3 Loader reads records from an Amazon Kinesis stream, encodes and wraps them into Thrift by means of ElephantBird library, compresses the data using splittable LZO or GZIP , and writes them to S3
The output of Snowplow S3 Loader is a projection of raw event data (serialized Thrift records, not enriched) in the form of a compressed LZO file.
Each .lzo
file has a corresponding .lzo.index
file containing the byte offsets for the LZO blocks, so that the blocks can be processed in parallel using Spark.
Generally, the LZO file generated by Snowplow S3 Loader could be depicted as an “onion-like” layered object as shown below.

The main characteristics of stream-based raw events:
- A serialized Thrift record format.
- Support both
GET
andPOST
requests. - Support
network_userid
.
Technical architecture
The Scala Stream Collector is built on top of akka-http.