Configure the Stream Collector

The Stream Collector has a number of configuration options available.

Basic configuration

Template

Download a template configuration file from GitHub: config.hocon.sample.

Now open the config.hocon.sample file in your editor of choice.

Stream configuration

You will need to put in the names of your good and bad streams (see the example after this list):

  • collector.streams.good: The name of the good input stream of the tool you choose as a sink. This is the stream where successfully collected events will be stored.
  • collector.streams.bad: The name of the bad input stream of the tool you choose as a sink. This is the stream where bad rows will be stored. The collector can currently produce two types of bad row. A size_violation bad row indicates that an event was too big to meet the stream limits for individual records (1MB for Kinesis and 256KB for SQS). A generic_error bad row wraps errors thrown when the querystring of the request cannot be parsed, e.g. because of illegal characters.
  • collector.streams.useIpAddressAsPartitionKey: Whether to use the incoming event’s ip as the partition key for the good stream/topic. (This setting applies to the Kinesis sink but not to the SQS sink.)
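
For example, a minimal streams section might look like the following sketch (the stream names are placeholders for your own):

collector {
  streams {
    good = "raw-events-good"
    bad = "raw-events-bad"
    useIpAddressAsPartitionKey = false
  }
}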

HTTP settings

Also verify the settings of the HTTP service (see the example after this list):

  • collector.interface
  • collector.port
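
For instance, to listen on all interfaces on port 8080 (the values are illustrative):

collector {
  interface = "0.0.0.0"
  port = 8080
}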

Buffer settings

You will also need to set appropriate limits for:

  • collector.streams.buffer.byteLimit (max allowed for Kinesis is 1000000 and for SQS 256000)
  • collector.streams.buffer.recordLimit (max allowed for Kinesis is 500 and for SQS 10)
  • collector.streams.buffer.timeLimit

This buffer gets filled with events as they arrive at the collector and is flushed any time one of the thresholds above is reached. The byteLimit and recordLimit settings ensure that Kinesis and SQS limits for the size of write requests will be respected. There is no limitation on the value of timeLimit.
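
As an illustration, a buffer section tuned for Kinesis might look like this sketch (the values are illustrative and must stay within the limits above; timeLimit is assumed to be expressed in milliseconds, as in the sample configuration file):

collector.streams.buffer {
  byteLimit = 1000000  # at the Kinesis per-record limit
  recordLimit = 500    # at the Kinesis per-request limit
  timeLimit = 5000     # flush at least every 5 seconds
}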

Additional configuration options

Sinks

The collector.streams.sink.enabled setting determines which of the supported sinks to write raw events to:

  • "kinesis" for writing Thrift-serialized records and error rows to a Kinesis stream
  • "kafka" for writing Thrift-serialized records and error rows to a Kafka topic
  • "googlepubsub" for writing Thrift-serialized records and error rows to a Google PubSub topic
  • "stdout" for writing Base64-encoded Thrift-serialized records and error rows to stdout and stderr respectively
  • "nsq" for writing Thrift-serialized records and error rows to NSQ topic
  • “sqs” for writing Thrift-serialized records and error rows to an SQS queue.

If you switch to "stdout", we recommend setting akka.loglevel = OFF and akka.loggers = [] to prevent Akka debug information from polluting your event stream on stdout.
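
In the config file, that looks like this:

akka {
  loglevel = OFF
  loggers = []
}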

Fill in the rest of the collector.streams.sink section according to the sink you have selected.

To use stdout as a sink, comment out everything in collector.streams.sink except collector.streams.sink.enabled, which should be set to stdout.
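
For example (a minimal sketch of the sink section):

sink {
  enabled = "stdout"
  # all other sink settings commented out
}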

For SQS, ensure that you configure a standard queue rather than a FIFO one.

Setting the P3P policy header

The P3P policy header is set in collector.p3p. If not set, it defaults to:

policyRef="/w3c/p3p.xml", CP="NOI DSP COR NID PSA OUR IND COM NAV STA"
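
In the config file, this corresponds to a section like the following (a sketch matching the default above):

p3p {
  policyRef = "/w3c/p3p.xml"
  CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}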

Setting the domain name

Set the cookie name using the collector.cookie.name setting. To maintain backward compatibility with earlier versions of the collector, use the string "sp" as the cookie name.

The collector responds to valid requests with a Set-Cookie header, which may or may not specify a domain for the cookie.

If no domain is specified, the cookie will be set against the full collector domain, for example collector.snplow.com. That will mean that applications running elsewhere on *.snplow.com won’t be able to access it. If you don’t need to grant access to the cookie from other applications on the domain, then you can ignore the domains and fallbackDomain settings.

In earlier versions, you could specify a domain to tie the cookie to. For example, if set to .snplow.com, the cookie would have been accessible to other applications running on *.snplow.com. To do the same in this version, use the fallbackDomain setting but make sure that you no longer include a leading dot:

fallbackDomain = "snplow.com"

The cookie set by the collector can be treated differently by browsers, depending on whether it’s considered to be a first-party or a third-party cookie. In earlier versions (0.15.0 and earlier), if you had two collector endpoints, one on collector.snplow.com and one on collector.snplow.net, you could only specify one of those domains in the configuration. That meant that you were only able to set a first-party cookie server-side on either .snplow.com or .snplow.net, but not on both. From version 0.16.0, you can specify a list of domains to be used for the cookie (note the lack of a leading dot):

domains = ["snplow.com", "snplow.net"]

The domain used in the Set-Cookie header is determined by matching the domains from the request's Origin header against the specified list. The first match is used. If no match is found, the fallback domain will be used, if configured. If no fallbackDomain is configured, the cookie will be tied to the full collector domain.

If you specify a main domain in the list, all subdomains on it will be matched. If you specify a subdomain, only that subdomain will be matched.

Examples:

  • domain.com will match Origin headers like domain.com, www.domain.com and secure.client.domain.com
  • client.domain.com will match an Origin header like secure.client.domain.com but not domain.com or www.domain.com.
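
Putting these settings together, the cookie section might look like the following sketch (the domains are the examples above):

cookie {
  enabled = true
  name = "sp"
  domains = ["snplow.com", "snplow.net"]
  fallbackDomain = "snplow.com"
}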

Disabling cookies

Setting a cookie from the collector can be disabled altogether in the configuration file:

cookie {
  enabled = false
  ...
}

Regardless of this setting, if the collector receives a request with the custom SP-Anonymous:* header, no cookie will be set. You can control whether this header is set or not in your tracking implementation.

Configuring Secure, HttpOnly and SameSite attributes for the cookie

You can control the values of those attributes in the Set-Cookie response header by specifying them in collector.cookie:

secure = true     # set to true if you want to enforce secure connections
httpOnly = false  # set to true if you want to make the cookie inaccessible to non-HTTP requests
sameSite = "None" # or "Lax", or "Strict"

The sameSite parameter is optional. If you omit it, the default value None will be assumed.

Setting the cookie duration

The cookie expiration duration is set in collector.cookie.expiration. If no value is provided, cookies are set to expire after one year (i.e. 365 days) by default. If you don't want to set a third-party cookie at all, disable it by setting collector.cookie.enabled to false. Alternatively (from version 0.4.0), you can achieve the same by setting collector.cookie.expiration to 0.
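
For example, to shorten the cookie lifetime (the value is illustrative; the duration format follows HOCON conventions):

cookie {
  enabled = true
  expiration = "30 days"
}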

Configuring custom paths

The collector responds with a cookie to requests with a path that matches the vendor/version protocol. The expected values are:

  • com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  • r/tp2 for redirects
  • com.snowplowanalytics.iglu/v1 for the Iglu Webhook.

You can also map any valid (i.e. two-segment) path to one of the three defaults via the collector.paths section of the configuration file. Your custom path must be the key, and the value must be one of the corresponding default paths. Both must be full valid paths starting with a leading slash:

paths {
  "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
  "/com.acme/redirect" = "/r/tp2"
  "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
}

TLS port binding and certificate (pre 2.4.0)

As an additional security measure, it is now possible to terminate the TLS/SSL connection directly within the scala-stream-collector, using the battle-tested Lightbend SSL Config. We have introduced several new configuration parameters to accommodate different workflows and configurations. Two configuration sections can be overridden to achieve the expected workflow: collector.ssl and ssl-config.

The former is a high-level section that allows:

  • collector.ssl.enable – turn on TLS/SSL termination
  • collector.ssl.redirect – whether an automatic upgrade from HTTP to HTTPS should be performed
  • collector.ssl.port – the port on which the TLS/SSL server should be started

The latter allows for low-level TLS/SSL configuration exposed by Lightbend SSL Config. For example, to start up an SSL-enabled, auto-upgrade server, the following config can be used:

ssl {
  enable = true
  redirect = true
  port = 443
}

However, this configuration will use environment-defined JVM-attached certificates. In order to override the default behavior and use a custom certificate, the low-level section can be defined as:

ssl-config {
  keyManager = {
    stores = [
      {type = "PKCS12", classpath = false, path = ${CERT_FILE}, password = "pass" }
    ]
  }
}


TLS port binding and certificate (2.4.0+)

Since 2.4.0, certificate configuration has been moved from the "akka" section of the configuration file to JVM system properties. The "Customizing JSSE" section of the Java 11 JSSE reference documentation explains all the system properties in detail. The ssl configuration block is still required:

ssl {
  enable = true
  redirect = true
  port = 443
}

The following JVM properties are the ones you will need most of the time.

  • javax.net.ssl.keyStore – Default keystore; see Customizing the Default Keystores and Truststores, Store Types, and Store Passwords. Default: None.
  • javax.net.ssl.keyStorePassword – Default keystore password; see Customizing the Default Keystores and Truststores, Store Types, and Store Passwords. Default: None. It is inadvisable to specify the password in a way that exposes it to discovery by other users. The password cannot be empty.
  • javax.net.ssl.keyStoreType – Default keystore type; see Customizing the Default Keystores and Truststores, Store Types, and Store Passwords. Default: PKCS12.
  • jdk.tls.server.cipherSuites – Server-side default enabled cipher suites; see Specifying Default Enabled Cipher Suites. Default: see SunJSSE Cipher Suites to determine which cipher suites are enabled by default. Caution: these system properties can be used to configure weak cipher suites, or the configured cipher suites may be weak in the future; it is not recommended to use them without understanding the risks.
  • jdk.tls.server.protocols – Default handshaking protocols for TLS/DTLS servers; see The SunJSSE Provider. Default: None. To configure the default enabled protocol suite on the server side of a SunJSSE provider, specify the protocols in a comma-separated list within quotation marks. The protocols in this list are standard SSL protocol names as described in Java Security Standard Algorithm Names. Note that this system property impacts only the default protocol suite (SSLContext of the algorithms SSL, TLS and DTLS). If an application uses a version-specific SSLContext (SSLv3, TLSv1, TLSv1.1, TLSv1.2, TLSv1.3, DTLSv1.0, or DTLSv1.2), or sets the enabled protocol versions explicitly, this system property has no impact.
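
As an illustration, these properties are passed on the JVM command line at startup. The keystore path, password and jar name below are placeholders; adjust them to your sink and collector version:

java \
  -Djavax.net.ssl.keyStore=/opt/snowplow/collector.p12 \
  -Djavax.net.ssl.keyStorePassword=changeme \
  -Djavax.net.ssl.keyStoreType=PKCS12 \
  -jar snowplow-stream-collector-kinesis-2.4.0.jar --config config.hocon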

Setting up an SQS buffer (2.0.0+)

Kinesis does not auto-scale, so a traffic spike can throttle the stream, and the Stream Collector then starts accumulating events in memory to retry them later. If the accumulation continues long enough, the Stream Collector will run out of memory. To prevent a broken collector, we made it possible to configure an SQS buffer that provides additional assurance during extreme traffic spikes.

SQS is used to queue any message that the Stream Collector failed to send to Kinesis, and the sqs2kinesis application is then responsible for reading the messages from SQS and writing them to Kinesis once it is ready. In the event of AWS API glitches, a retry mechanism retries sending to the SQS queue up to 10 times.

The partition keys set up for the Kinesis stream are stored as SQS message attributes in order to preserve the information. Note that SQS messages cannot be as big as Kinesis messages: the limit is 256KB per message, but since messages are sent Base64-encoded, the effective limit is 192KB for the original message.

Setting up the SQS queues

(This section only applies to the case when SQS is used as a fallback sink when Kinesis is unavailable. If you are using SQS as the primary sink, then the settings below should be ignored and the good and bad streams should be configured as normal under streams.good and streams.bad respectively.)

To start using this feature, you will first need to set up the SQS queues. Two separate queues are required for good (raw) events and bad events. The collector then needs to be informed about the queues, which is done by adding these entries to config.hocon:

sqsGoodBuffer = {good-sqs-queue-url}
sqsBadBuffer = {bad-sqs-queue-url}
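
For example, with hypothetical queue URLs (the region, account ID and queue names are placeholders):

sqsGoodBuffer = "https://sqs.eu-central-1.amazonaws.com/123456789012/collector-good-buffer"
sqsBadBuffer = "https://sqs.eu-central-1.amazonaws.com/123456789012/collector-bad-buffer"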

Telemetry

Starting with version 2.4.0 of the collector, Snowplow collects heartbeats with some meta-information about the application. This is an opt-out feature, meaning that it has to be explicitly disabled to stop it. The schema is available here.

At its most basic, telemetry sends the application name and version every hour. This is done to help us improve the product: we need to understand what is popular so that we can focus our development effort in the right place. You can help us further by providing a userProvidedId in the config file:

telemetry {
    userProvidedId = myCompany
}

To disable telemetry, put the following entry into your configuration file:

telemetry {
    disable = true
}