Configure the Scala Stream Collector

The Scala Stream Collector has a number of configuration options available.

Basic configuration

Template

Download a template configuration file from GitHub: config.hocon.sample.

Now open the config.hocon.sample file in your editor of choice.

Stream configuration

You will need to put in the names of your good and bad streams (a minimal sketch follows this list):

  • collector.streams.good: The name of the good stream/topic of the sink you choose. This is the stream where successfully collected events will be stored.
  • collector.streams.bad: The name of the bad stream/topic of the sink you choose. This is the stream where events that are too big (exceeding the 1 MB Kinesis record limit) will be stored.
  • collector.streams.useIpAddressAsPartitionKey: Whether to use the incoming event’s IP address as the partition key for the good stream/topic.
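
For example, a minimal sketch of this part of the configuration (the stream names below are illustrative placeholders, not defaults):

collector {
  streams {
    # Names of the streams/topics the chosen sink writes to (placeholder names)
    good = "raw-good-events"
    bad = "raw-bad-events"

    # Use the incoming event's IP address as the partition key for the good stream
    useIpAddressAsPartitionKey = false
  }
}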

HTTP settings

Also verify the settings of the HTTP service (an example follows the list):

  • collector.interface
  • collector.port
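
For instance, to bind the HTTP service to all interfaces on port 8080 (illustrative values; adjust to your deployment):

collector {
  # Interface and port the collector's HTTP service listens on
  interface = "0.0.0.0"
  port = 8080
}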

Buffer settings

You will also need to set appropriate limits (an example follows the list) for:

  • collector.streams.buffer.byteLimit
  • collector.streams.buffer.recordLimit
  • collector.streams.buffer.timeLimit
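
For example, a sketch with illustrative values only (tune these to your traffic volume and sink):

collector {
  streams {
    buffer {
      # Flush the buffer when any of these limits is reached
      byteLimit = 4500000  # bytes
      recordLimit = 500    # records
      timeLimit = 5000     # milliseconds
    }
  }
}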

Additional configuration options

Sinks

The collector.streams.sink.enabled setting determines which of the supported sinks to write raw events to:

  • "kinesis" for writing Thrift-serialized records and error rows to a Kinesis stream
  • "kafka" for writing Thrift-serialized records and error rows to a Kafka topic
  • "googlepubsub" for writing Thrift-serialized records and error rows to a Google PubSub topic
  • "stdout" for writing Base64-encoded Thrift-serialized records and error rows to stdout and stderr respectively
  • "nsq" for writing Thrift-serialized records and error rows to NSQ topic

If you switch to "stdout", we recommend setting ‘akka.loglevel = OFF’ and ‘akka.loggers = []’ to prevent Akka debug information from polluting your event stream on stdout.

Fill in the rest of the collector.streams.sink section according to the sink you selected.

To use stdout as a sink, comment out everything in collector.streams.sink except collector.streams.sink.enabled, which should be set to "stdout".
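
As a sketch, a stdout setup combined with the Akka logging settings recommended above could look like this (all other sink settings commented out):

collector {
  streams {
    sink {
      # Write Base64-encoded Thrift records to stdout/stderr
      enabled = stdout
      # ... all other sink settings commented out ...
    }
  }
}

akka {
  # Silence Akka logging so debug output does not pollute the event stream on stdout
  loglevel = OFF
  loggers = []
}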

Setting the P3P policy header

The P3P policy header is set in collector.p3p. If not set, it defaults to:

policyRef="/w3c/p3p.xml", CP="NOI DSP COR NID PSA OUR IND COM NAV STA"
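
For reference, a sketch of the corresponding collector.p3p block (the policyRef and CP key names follow the default header above; double-check them against your config.hocon.sample):

p3p {
  policyRef = "/w3c/p3p.xml"
  CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}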

Setting the domain name

Set the cookie name using the collector.cookie.name setting. To maintain backward compatibility with earlier versions of the collector, use the string “sp” as the cookie name.

The collector responds to valid requests with a Set-Cookie header, which may or may not specify a domain for the cookie.

If no domain is specified, the cookie will be set against the full collector domain, for example collector.snplow.com. That will mean that applications running elsewhere on *.snplow.com won’t be able to access it. If you don’t need to grant access to the cookie from other applications on the domain, then you can ignore the domains and fallbackDomain settings.

In earlier versions, you could specify a domain to tie the cookie to. For example, if set to .snplow.com, the cookie would have been accessible to other applications running on *.snplow.com. To do the same in this version, use the fallbackDomain setting but make sure that you no longer include a leading dot:

fallbackDomain = "snplow.com"

The cookie set by the collector can be treated differently by browsers, depending on whether it’s considered to be a first-party or a third-party cookie. Up until now, if you had two collector endpoints, one on collector.snplow.com and one on collector.snplow.net, you could only specify one of those domains in the configuration. That meant that you were only able to set a first-party cookie server-side on either .snplow.com or .snplow.net, but not on both. From version 0.16.0, you can specify a list of domains to be used for the cookie (note the lack of a leading dot):

domains = [ "snplow.com", "snplow.net" ]

Which domain to be used in the Set-Cookie header is determined by matching the domains from the Origin header of the request to the specified list. The first match is used. If no matches are found, the fallback domain will be used, if configured. If no fallbackDomain is configured, the cookie will be tied to the full collector domain.

If you specify a main domain in the list, all subdomains on it will be matched. If you specify a subdomain, only that subdomain will be matched.

Examples:

  • domain.com will match Origin headers like domain.com, www.domain.com and secure.client.domain.com
  • client.domain.com will match an Origin header like secure.client.domain.com but not domain.com or www.domain.com.
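
Putting this together, a sketch of the relevant part of collector.cookie (domain values are illustrative):

cookie {
  # Matched against the request's Origin header; the first match is used
  domains = [ "snplow.com", "snplow.net" ]
  # Used when no domain in the list matches
  fallbackDomain = "snplow.com"
}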

Configuring Secure, HttpOnly and SameSite attributes for the cookie

You can control the values of those attributes in the Set-Cookie response header by specifying them in collector.cookie:

secure = true     # set to true if you want to enforce secure connections
httpOnly = false  # set to true if you want to make the cookie inaccessible to non-HTTP requests
sameSite = "None" # or "Lax", or "Strict"

The sameSite parameter is optional. If you omit it, the default value None will be assumed.

Setting the cookie duration

The cookie expiration duration is set in collector.cookie.expiration. If no value is provided, cookies are set to expire after one year (i.e. 365 days) by default. If you don’t want to set a third-party cookie at all, you can disable it by setting collector.cookie.enabled to false. Alternatively (from version 0.4.0), you can achieve the same by setting collector.cookie.expiration to 0.
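
For example, a sketch of the cookie duration settings (values are illustrative):

cookie {
  enabled = true
  expiration = "365 days"  # set to 0 to disable the cookie instead (0.4.0+)
}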

Configuring custom paths

The collector responds with a cookie to requests with a path that matches the vendor/version protocol. The expected values are:

  • com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  • r/tp2 for redirects
  • com.snowplowanalytics.iglu/v1 for the Iglu Webhook.

You can also map any valid (i.e. two-segment) path to one of the three defaults via the collector.paths section of the configuration file. Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full valid paths starting with a leading slash:

paths {
  "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
  "/com.acme/redirect" = "/r/tp2"
  "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
}

TLS port binding and certificate

As an additional security measure, it is now possible to terminate the TLS/SSL connection directly within scala-stream-collector using the battle-tested Lightbend SSL Config library. We have introduced several new configuration parameters to accommodate different workflows and configurations. Two configuration sections can be overridden to achieve the expected workflow: collector.ssl and ssl-config.

The former is a high-level section that allows:

  • collector.ssl.enable – whether to turn on TLS/SSL termination
  • collector.ssl.redirect – whether requests should be automatically upgraded from HTTP to HTTPS
  • collector.ssl.port – the port on which the TLS/SSL server should be started

The latter allows for the low-level TLS/SSL configuration exposed by Lightbend SSL Config. For example, to start up an SSL-enabled, auto-upgrade server, the following config can be used:

ssl {
  enable = true
  redirect = true
  port = 443
}

However, this configuration will use environment-defined JVM-attached certificates. In order to override the default behaviour and use a custom certificate, the low-level section can be defined as:

ssl-config {
  keyManager = {
    stores = [
      {type = "PKCS12", classpath = false, path = ${CERT_FILE}, password = "pass" }
    ]
  }
}

Custom & default redirect paths

Changelog

v2.0.0 – Default redirect path set to false by default (enableDefaultRedirect = false)

v0.17.0 (R117) – Allowed users to disable the default redirect endpoint r/tp2 (enableDefaultRedirect)

v0.16.0 (R116) – Added the option to map the default redirect path r/tp2 to custom values e.g. randomstring1/randomstring2

v0.6.0 (R78) – Added click redirect mode

From v2.0.0 onwards, the default redirect path is disabled by default (enableDefaultRedirect = false). Prior to this release it was set to true by default.

If you wish to enable the default redirect path /r/tp2, you need to set enableDefaultRedirect = true in your config.hocon file. Alternatively, you can leave the default endpoint disabled and instead set up a custom user-defined URL for redirects. For example, the following configuration only allows redirects via the custom-defined /com.acme/redirect-me endpoint, whereas the default /r/tp2 will not be available.

enableDefaultRedirect = false
paths {
  "/com.acme/redirect-me" = "/r/tp2"
}

Setting up an SQS buffer (2.0.0+)

The lack of auto-scaling in Kinesis results in throttled streams in the case of traffic spikes, and the Stream Collector then starts accumulating events to retry them later. If this accumulation continues long enough, the Stream Collector will run out of memory. To prevent a broken collector, we decided to make it possible to configure an SQS buffer, which can provide additional assurance during extreme traffic spikes.

SQS is used to queue any message that the Stream Collector failed to send to Kinesis, and the sqs2kinesis application is then responsible for reading the messages from SQS and writing them to Kinesis once it is ready. In the event of any AWS API glitches, a retry mechanism retries sending to the SQS queue up to 10 times.

The keys set up for the Kinesis stream are stored as SQS message attributes in order to preserve that information. Note that SQS messages cannot be as big as Kinesis messages: the limit is 256 KB per message, and because messages are sent Base64-encoded, the effective limit for the original message is 192 KB.

Setting up the SQS queues

To start using this feature, you will first need to set up the SQS queues. Two separate queues are required for good (raw) events and bad events. The collector then needs to be informed of the queue names, which can be done by adding them as entries to config.hocon:

sqsGoodBuffer = {good-sqs-queue-url}
sqsBadBuffer = {bad-sqs-queue-url}
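
As a sketch, these entries typically sit alongside the Kinesis sink settings; the nesting under collector.streams.sink shown here is an assumption, and the queue URLs remain placeholders you must replace:

collector {
  streams {
    sink {
      # SQS buffer used when writes to Kinesis are throttled (assumed placement)
      sqsGoodBuffer = {good-sqs-queue-url}
      sqsBadBuffer = {bad-sqs-queue-url}
    }
  }
}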

Setting up the sqs2kinesis application

The application (sqs2kinesis) is used to read messages from the SQS queue and send them to the Kinesis stream using backpressure. The sqs2kinesis application deals with decoding messages and setting up the appropriate keys for the Kinesis stream.

It can be found on Docker Hub.

To configure the app, either set the environment variables SQS_QUEUE, KINESIS_STREAM_NAME and SENTRY_DSN, or create application.conf with:

sqs2kinesis {
  sqs-queue = ...
  kinesis-stream-name = ...
  sentry-dsn = ...
}

and specify the path of the file by adding -Dconfig.file=path/to/config-file to the Java options.