Configure the Scala Stream Collector

The Scala Stream Collector has a number of configuration options available.

Basic configuration

Template

Download a template configuration file from GitHub: config.hocon.sample.

Now open the config.hocon.sample file in your editor of choice.

Stream configuration

You will need to put in the names of your good and bad streams:

  • collector.streams.good: The name of the good input stream of the tool you choose as a sink. This is the stream where successfully collected events will be stored.
  • collector.streams.bad: The name of the bad input stream of the tool you choose as a sink. This is the stream where bad rows will be stored. The collector can currently produce two types of bad row. A size_violation bad row indicates that an event was too big to meet the stream limits for individual records (1MB for Kinesis and 256KB for SQS). A generic_error bad row wraps errors thrown when the querystring of the request cannot be parsed, e.g. because of illegal characters.
  • collector.streams.useIpAddressAsPartitionKey: Whether to use the incoming event’s IP address as the partition key for the good stream/topic. (This setting applies to the Kinesis sink but not to the SQS sink.)
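For example, with the Kinesis sink, the relevant part of config.hocon might look like the following sketch (the stream names are illustrative placeholders, not defaults):

collector {
  streams {
    good = "raw-good"
    bad = "raw-bad"
    useIpAddressAsPartitionKey = false
  }
}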

HTTP settings

Also verify the settings of the HTTP service:

  • collector.interface
  • collector.port
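For example, to listen on all interfaces on port 8080 (values shown are illustrative):

collector {
  interface = "0.0.0.0"
  port = 8080
}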

Buffer settings

You will also need to set appropriate limits for:

  • collector.streams.buffer.byteLimit (max allowed for Kinesis is 1000000 and for SQS 256000)
  • collector.streams.buffer.recordLimit (max allowed for Kinesis is 500 and for SQS 10)
  • collector.streams.buffer.timeLimit

This buffer gets filled with events as they arrive at the collector and is flushed any time one of the thresholds above is reached. The byteLimit and recordLimit settings ensure that Kinesis and SQS limits for the size of write requests will be respected. There is no limitation on the value of timeLimit.
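A sketch of the buffer section with values that stay within the Kinesis limits above (the numbers are illustrative; in the sample configuration timeLimit is expressed in milliseconds):

collector {
  streams {
    buffer {
      byteLimit = 1000000  # at most 1MB per Kinesis record
      recordLimit = 500    # at most 500 records per write request
      timeLimit = 5000     # flush at least every 5 seconds
    }
  }
}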

Additional configuration options

Sinks

The collector.streams.sink.enabled setting determines which of the supported sinks to write raw events to:

  • "kinesis" for writing Thrift-serialized records and error rows to a Kinesis stream
  • "kafka" for writing Thrift-serialized records and error rows to a Kafka topic
  • "googlepubsub" for writing Thrift-serialized records and error rows to a Google PubSub topic
  • "stdout" for writing Base64-encoded Thrift-serialized records and error rows to stdout and stderr respectively
  • "nsq" for writing Thrift-serialized records and error rows to NSQ topic
  • “sqs” for writing Thrift-serialized records and error rows to an SQS queue.

If you switch to "stdout", we recommend setting akka.loglevel = OFF and akka.loggers = [] to prevent Akka debug information from polluting your event stream on stdout.
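Those Akka settings live in their own top-level section of the configuration file, for example:

akka {
  loglevel = OFF
  loggers = []
}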

Fill in the rest of the collector.streams.sink section according to the sink you selected.

To use stdout as a sink, comment out everything in collector.streams.sink except collector.streams.sink.enabled, which should be set to stdout.
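A minimal sketch of the sink section when stdout is selected (everything else commented out or removed):

collector {
  streams {
    sink {
      enabled = stdout
    }
  }
}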

For SQS, ensure that you configure a standard queue rather than a FIFO one.

Setting the P3P policy header

The P3P policy header is set in collector.p3p. If it is not set, it defaults to:

policyRef="/w3c/p3p.xml", CP="NOI DSP COR NID PSA OUR IND COM NAV STA"
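In the configuration file this is expressed as a p3p block, roughly as follows (key names taken from config.hocon.sample; verify against your version):

collector {
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }
}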

Setting the domain name

Set the cookie name using the collector.cookie.name setting. To maintain backward compatibility with earlier versions of the collector, use the string "sp" as the cookie name.

The collector responds to valid requests with a Set-Cookie header, which may or may not specify a domain for the cookie.

If no domain is specified, the cookie will be set against the full collector domain, for example collector.snplow.com. That will mean that applications running elsewhere on *.snplow.com won’t be able to access it. If you don’t need to grant access to the cookie from other applications on the domain, then you can ignore the domains and fallbackDomain settings.

In earlier versions, you could specify a domain to tie the cookie to. For example, if set to .snplow.com, the cookie would have been accessible to other applications running on *.snplow.com. To do the same in this version, use the fallbackDomain setting but make sure that you no longer include a leading dot:

fallbackDomain = "snplow.com"

The cookie set by the collector can be treated differently by browsers, depending on whether it’s considered to be a first-party or a third-party cookie. In earlier versions (0.15.0 and earlier), if you had two collector endpoints, one on collector.snplow.com and one on collector.snplow.net, you could only specify one of those domains in the configuration. That meant that you were only able to set a first-party cookie server-side on either .snplow.com or .snplow.net, but not on both. From version 0.16.0, you can specify a list of domains to be used for the cookie (note the lack of a leading dot):

domains = [ "snplow.com", "snplow.net" ]

The domain used in the Set-Cookie header is determined by matching the domains from the Origin header of the request against the specified list. The first match is used. If no match is found, the fallback domain will be used, if configured. If no fallbackDomain is configured, the cookie will be tied to the full collector domain.

If you specify a main domain in the list, all subdomains on it will be matched. If you specify a subdomain, only that subdomain will be matched.

Examples:

  • domain.com will match Origin headers like domain.com, www.domain.com and secure.client.domain.com
  • client.domain.com will match an Origin header like secure.client.domain.com but not domain.com or www.domain.com.
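Putting the name and domain settings together, a sketch of the cookie section might look like this (domains are illustrative):

collector {
  cookie {
    enabled = true
    name = sp
    domains = [ "snplow.com", "snplow.net" ]
    fallbackDomain = "snplow.com"
  }
}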

Disabling cookies

Setting a cookie from the collector can be disabled altogether in the configuration file:

cookie {
  enabled = false
  ...
}

Regardless of this setting, if the collector receives a request with the custom SP-Anonymous: * header, no cookie will be set. You can control whether this header is set in your tracking implementation.

Configuring Secure, HttpOnly and SameSite attributes for the cookie

You can control the values of those attributes in the Set-Cookie response header by specifying them in collector.cookie:

secure = true # set to true if you want to enforce secure connections
httpOnly = false # set to true if you want to make the cookie inaccessible to non-HTTP requests
sameSite = "None" # or "Lax", or "Strict"

The sameSite parameter is optional. If you omit it, the default value None will be assumed.

Setting the cookie duration

The cookie expiration duration is set in collector.cookie.expiration. If no value is provided, cookies expire after one year (365 days) by default. If you don’t want to set a third-party cookie at all, disable it by setting collector.cookie.enabled to false. Alternatively, set collector.cookie.expiration to 0 (supported from version 0.4.0).
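For example, to make the one-year default explicit (using the HOCON duration syntax from the sample configuration):

collector {
  cookie {
    enabled = true
    expiration = 365 days
  }
}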

Configuring custom paths

The collector responds with a cookie to requests with a path that matches the vendor/version protocol. The expected values are:

  • com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  • r/tp2 for redirects
  • com.snowplowanalytics.iglu/v1 for the Iglu Webhook.

You can also map any valid (i.e. two-segment) path to one of the three defaults via the collector.paths section of the configuration file. Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full valid paths starting with a leading slash:

paths {
  "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
  "/com.acme/redirect" = "/r/tp2"
  "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
}

TLS port binding and certificate

As an additional security measure, it is now possible to terminate the TLS/SSL connection directly within the Scala Stream Collector, using the battle-tested Lightbend SSL Config library. Several new configuration parameters have been introduced to accommodate different workflows and configurations. Two configuration sections can be overridden to achieve the expected workflow: collector.ssl and ssl-config.

The former is a high-level section that allows:

  • collector.ssl.enable – turns on TLS/SSL termination
  • collector.ssl.redirect – whether an automatic upgrade from HTTP to HTTPS should be performed
  • collector.ssl.port – the port on which the TLS/SSL server should be started

The latter allows for the low-level TLS/SSL configuration exposed by Lightbend SSL Config. For example, to start an SSL-enabled server with automatic upgrade, the following configuration can be used:

ssl {
  enable = true
  redirect = true
  port = 443
}

However, this configuration will use the JVM’s environment-defined certificates. To override the default behaviour and use a custom certificate, the low-level section can be defined as:

ssl-config {
  keyManager = {
    stores = [
      {type = "PKCS12", classpath = false, path = ${CERT_FILE}, password = "pass" }
    ]
  }
}

Custom & default redirect paths

Changelog

v2.0.0 – Default redirect path disabled by default (enableDefaultRedirect = false)

v0.17.0 (R117) – Allowed users to disable the default redirect endpoint r/tp2 (enableDefaultRedirect)

v0.16.0 (R116) – Added the option to map the default redirect path r/tp2 to custom values e.g. randomstring1/randomstring2

v0.6.0 (R78) – Added click redirect mode

From v2.0.0 onwards, the default redirect path is disabled by default (enableDefaultRedirect = false). Prior to this release it was set to true by default.

If you wish to enable the default redirect path /r/tp2, you need to set enableDefaultRedirect = true in your config.hocon file. Alternatively, you can leave the default endpoint disabled and instead set up a custom user-defined URL for redirects. For example, the following configuration only allows redirects via the custom-defined /com.acme/redirect-me endpoint, whereas the default /r/tp2 is not available:

enableDefaultRedirect = false
paths {
  "/com.acme/redirect-me" = "/r/tp2"
}

Setting up an SQS buffer (2.0.0+)

Because Kinesis does not auto-scale, traffic spikes can lead to throttled streams, and the Stream Collector starts accumulating events in memory to retry them later. If this accumulation continues long enough, the Stream Collector will run out of memory. To prevent a broken collector, it is possible to configure an SQS buffer, which provides additional assurance during extreme traffic spikes.

SQS is used to queue any message that the Stream Collector failed to send to Kinesis, and the sqs2kinesis application is then responsible for reading the messages from SQS and writing them to Kinesis once it is available again. In the event of AWS API glitches, there is a retry mechanism which retries sending to the SQS queue 10 times.

The keys set up for the Kinesis stream are stored as SQS message attributes in order to preserve the information. Note that SQS messages cannot be as big as Kinesis messages: the limit is 256KB per message, but because messages are sent Base64-encoded, the effective limit for the original message is 192KB.

Setting up the SQS queues

(This section only applies to the case when SQS is used as a fallback sink when Kinesis is unavailable. If you are using SQS as the primary sink, then the settings below should be ignored and the good and bad streams should be configured as normal under streams.good and streams.bad respectively.)

To start using this feature, you will first need to set up the SQS queues. Two separate queues are required for good (raw) events and bad events. The Collector then needs to be informed about the queue names, and this can be done by adding these as entries to config.hocon:

sqsGoodBuffer = {good-sqs-queue-url}
sqsBadBuffer = {bad-sqs-queue-url}
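In recent collector versions these entries sit alongside the other Kinesis sink settings; a rough sketch follows (queue URLs are placeholders, and the exact location may vary by version, so check config.hocon.sample):

collector {
  streams {
    sink {
      enabled = kinesis
      # substitute your actual SQS queue URLs
      sqsGoodBuffer = {good-sqs-queue-url}
      sqsBadBuffer = {bad-sqs-queue-url}
    }
  }
}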

Setting up the sqs2kinesis application

(This application can be used to move data from SQS to Kinesis, regardless of whether SQS is being used as a fallback or primary sink.)

The application (sqs2kinesis) is used to read messages from the SQS queue and send them to the Kinesis stream using a back-pressured mechanism. The sqs2kinesis application deals with decoding messages and setting up the appropriate keys for the Kinesis stream.

It can be found on Docker Hub.

To configure the app, either set the environment variables SQS_QUEUE, KINESIS_STREAM_NAME and SENTRY_DSN, or create application.conf with:

sqs2kinesis {
  sqs-queue = ...
  kinesis-stream-name = ...
  sentry-dsn = ...
}

and specify the path of the file by adding -Dconfig.file=path/to/config-file to the Java options.