Configure the Stream Collector

This is a complete list of the options that can be configured in the collector HOCON config file. The example configs on GitHub show how to prepare an input file. Some features are described in more detail at the bottom of this page.

Common options

collector.interface: Required. E.g. 0.0.0.0. The collector listens for HTTP requests on this interface.
collector.port: Required. The collector listens for HTTP requests on this port.
collector.ssl.enable: Optional, default is false. When enabled, the collector also listens for HTTPS requests on a different port.
collector.ssl.port: Optional, default 443. The port on which to listen for HTTPS requests.
collector.ssl.redirect: Optional, default false. If enabled, the collector redirects HTTP requests to the HTTPS endpoint using a 301 status code.
collector.paths: Optional. Customises the collector’s endpoints: you can map any valid (i.e. two-segment) path to one of the three default paths. There are more details about this feature below.
collector.p3p.policyRef: Optional, defaults to /w3c/p3p.xml. Configures the P3P HTTP header.
collector.p3p.CP: Optional, defaults to NOI DSP COR NID PSA OUR IND COM NAV STA. Configures the P3P HTTP header.
collector.crossDomain.enabled: Optional, default is false. If enabled, the /crossdomain.xml endpoint returns a cross domain policy file.
collector.crossDomain.domains: Optional, default [*], meaning the cross domain policy file allows all domains. You could change this to a list of domains.
collector.crossDomain.secure: Optional, default true. Configures whether the cross domain policy file grants access to just HTTPS or both HTTP and HTTPS sources.
collector.cookie.enabled: Optional, default true. The collector sets a cookie to store the user’s network user id. Change this to false to disable setting cookies. Regardless of this setting, if the collector receives a request with the custom SP-Anonymous:* header, no cookie will be set. You can control whether this header is set or not in your tracking implementation.
collector.cookie.expiration: Optional, default 365 days. Configures the expiry of the collector’s cookie.
collector.cookie.name: Optional, default sp. Configures the name of the collector’s cookie.
collector.cookie.domains: Optional, defaults to no domains. Gives fine control over the cookie’s domain attribute. There are more details about this feature below.
collector.cookie.fallbackDomain: Optional. If set, the fallback domain will be used for the cookie if none of the Origin header hosts matches the list of cookie domains.
collector.cookie.secure: Optional, default true. Sets the secure property of the cookie.
collector.cookie.httpOnly: Optional, default true. Sets the httpOnly property of the cookie.
collector.cookie.sameSite: Optional, default None. Sets the sameSite property of the cookie. It can be Strict, Lax or None.
collector.doNotTrackCookie.enabled: Optional, default false. If enabled, the collector respects a “do not track” cookie. If the cookie is present, the collector returns a 200 status code but does not log the request to the output queue.
collector.doNotTrackCookie.name: Required when the doNotTrackCookie feature is enabled. Configures the name of the cookie in which to check if tracking is disabled.
collector.doNotTrackCookie.value: Required when the doNotTrackCookie feature is enabled. Can be a regular expression. The value of the cookie must match this expression in order for the collector to respect the cookie.
collector.cookieBounce.enabled: Optional, default false. When enabled and the cookie is missing, the collector redirects to itself, using the specified request parameter name, to check whether third-party cookies are blocked. If they are blocked, fallbackNetworkUserId is used instead of generating a new random network user id.
collector.cookieBounce.name: Optional, default n3pc. The name of the request parameter used on redirects to check that third-party cookies work.
collector.cookieBounce.fallbackNetworkUserId: Optional, default 00000000-0000-4000-A000-000000000000. Network user id to use when third-party cookies are blocked.
collector.cookieBounce.forwardedProtocolHeader: Optional. E.g. X-Forwarded-Proto. The header containing the originating protocol for use in the bounce redirect location. Use this if behind a load balancer that performs SSL termination.
collector.enableDefaultRedirect: Optional, default false. When enabled, the collector’s /r endpoint returns a 302 status code with a redirect back to a URL specified with the ?u= query parameter.
collector.redirectMacro.enabled: Optional, default false. When enabled, the redirect URL passed via the u query parameter is scanned for a placeholder token. All occurrences of the placeholder are substituted with the cookie’s network user id.
collector.redirectMacro.placeholder: Optional, default ${SP_NUID}.
collector.rootResponse.enabled: Optional, default false. Enable custom response handling for the root “/” path.
collector.rootResponse.statusCode: Optional, default 302. The HTTP status code to use when root response is enabled.
collector.rootResponse.headers: Optional. A map of key value pairs to include in the root response headers.
collector.rootResponse.body: Optional. The HTTP response body to use when root response is enabled.
collector.cors.accessControlMaxAge: Optional, default “60 minutes”. Configures how long the results of a preflight request can be cached by the browser. -1 seconds disables the cache.
collector.prometheusMetrics.enabled: Optional, default false. When enabled, all requests are logged as Prometheus metrics and the /metrics endpoint returns the report about the metrics.
collector.prometheusMetrics.durationBucketsInSeconds: Optional, e.g. [0.1, 3, 10]. Custom buckets for the http_request_duration_seconds_bucket duration Prometheus metric.
collector.telemetry.disable: Optional, default false. Disable collecting meta-information about the running application. We use telemetry to help us improve the Snowplow product.
collector.telemetry.userProvidedId: Optional. It would help us out a lot if you provide a string unique to you, e.g. a uuid or your company name.
akka.*: Set any standard Akka HTTP option. For example, akka.loglevel = INFO
akka.ssl-config.*: Deprecated since collector version 2.4.0. Since 2.4.0, SSL is instead configured via JVM system properties. See below for details.
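
As an illustration, a few of the options above could be combined in the HOCON file as follows. This is a minimal sketch rather than a complete configuration: the port 8080 and the doNotTrackCookie name and value are hypothetical, and the remaining values simply repeat the defaults listed above.

```hocon
collector {
  # Listen for HTTP requests on all interfaces (the port value is an assumption)
  interface = "0.0.0.0"
  port = 8080

  cookie {
    enabled = true
    name = "sp"
    expiration = "365 days"
    secure = true
    httpOnly = true
    sameSite = "None"
  }

  # Hypothetical cookie name and regular expression
  doNotTrackCookie {
    enabled = true
    name = "opt-out"
    value = "(true|1)"
  }
}
```

The example configs mentioned at the top of this page remain the reference for a full, working file.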

Kinesis collector options

collector.streams.good: Required. Name of the output Kinesis stream for successfully collected events.
collector.streams.bad: Required. Name of the output Kinesis stream for HTTP requests which could not be written to the good stream, for example because the event size exceeds the Kinesis limit of 1MB.
collector.streams.useIpAddressAsPartitionKey: Optional, default false. Whether to use the user’s IP address as the Kinesis partition key.
collector.streams.sink.region: Optional, defaults to eu-central-1. AWS region of the Kinesis streams.
collector.streams.sink.customEndpoint: Optional. Override the AWS Kinesis endpoints. Can be helpful when using localstack for testing.
collector.streams.sink.threadPoolSize: Optional, default 10. Configures the thread pool size used by the collector sink for asynchronous operations.
collector.streams.sink.sqsGoodBuffer: Optional. Set to the name of an SQS queue to enable buffering of good output events. When messages cannot be sent to Kinesis (e.g. because of exceeding API limits), they get sent to SQS as a fallback. Helpful for smoothing over traffic spikes.
collector.streams.sink.sqsBadBuffer: Optional. Like the sqsGoodBuffer but for failed events.
collector.streams.sink.aws.accessKey: Required. Set to default to use the default provider chain; set to iam to use AWS IAM roles; or set to env to use the AWS_ACCESS_KEY_ID environment variable.
collector.streams.sink.aws.secretKey: Required. Set to default to use the default provider chain; set to iam to use AWS IAM roles; or set to env to use the AWS_SECRET_ACCESS_KEY environment variable.
collector.streams.backoffPolicy.minBackoff: Optional, default 3000. Minimum time in milliseconds before retrying sending to Kinesis / SQS after failure.
collector.streams.backoffPolicy.maxBackoff: Optional, default 600000. Maximum time in milliseconds before retrying sending to Kinesis / SQS after failure.
collector.streams.buffer.byteLimit: Optional, default 3145728. Incoming events are stored in an internal buffer before being sent to Kinesis. This configures the maximum total size of pending events.
collector.streams.buffer.recordLimit: Optional, default 50. Configures the maximum number of pending events before flushing to Kinesis.
collector.streams.buffer.timeLimit: Optional, default 5000. Configures the maximum time in milliseconds before flushing pending buffered events to Kinesis.
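
For orientation, the dotted option names above translate into nested HOCON blocks. The fragment below is a sketch only: the stream names are hypothetical, the other values repeat the listed defaults, and a working configuration may need further settings that are not part of this list.

```hocon
collector {
  streams {
    # Hypothetical stream names
    good = "raw-good"
    bad = "raw-bad"
    useIpAddressAsPartitionKey = false

    sink {
      region = "eu-central-1"
      threadPoolSize = 10
      aws {
        # Use the default AWS credentials provider chain
        accessKey = "default"
        secretKey = "default"
      }
    }

    buffer {
      byteLimit = 3145728   # 3 MiB of pending events
      recordLimit = 50      # pending events before a flush
      timeLimit = 5000      # milliseconds before a flush
    }
  }
}
```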

Pubsub collector options

collector.streams.good: Required. Name of the output Pubsub topic for successfully collected events.
collector.streams.bad: Required. Name of the output Pubsub topic for HTTP requests which could not be written to the good stream, for example because the event size exceeds the Pubsub limit of 10MB.
collector.streams.sink.googleProjectId: Required. GCP project name.
collector.streams.sink.backoffPolicy.minBackoff: Optional, default 1000. Minimum time in milliseconds before retrying sending to Pubsub after failure.
collector.streams.sink.backoffPolicy.maxBackoff: Optional, default 1000. Maximum time in milliseconds before retrying sending to Pubsub after failure.
collector.streams.sink.backoffPolicy.totalBackoff: Optional, default 9223372036854. We set this to the maximum value so that we never give up on trying to send a message to Pubsub.
collector.streams.sink.backoffPolicy.multiplier: Optional, default 2. Configures how the time between retries grows after failing to send a message to Pubsub.
collector.streams.sink.backoffPolicy.initialRpcTimeout: Optional, default 10000. Time in milliseconds before an RPC call to Pubsub is aborted and retried.
collector.streams.sink.backoffPolicy.maxRpcTimeout: Optional, default 10000. Maximum time in milliseconds before an RPC call to Pubsub is aborted and retried.
collector.streams.sink.backoffPolicy.rpcTimeoutMultiplier: Optional, default 2. Configures how RPC timeouts are allowed to increase as they are retried.
collector.streams.buffer.byteLimit: Optional, default 1000000. Incoming events are stored in an internal buffer before being sent to Pubsub. This configures the maximum total size of pending events.
collector.streams.buffer.recordLimit: Optional, default 40. Configures the maximum number of pending events before flushing to Pubsub.
collector.streams.buffer.timeLimit: Optional, default 1000. Configures the maximum time in milliseconds before flushing pending buffered events to Pubsub.
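
The Pubsub options nest in the same way. The following fragment is a sketch under the same caveats: the topic names and project id are hypothetical, the numeric values repeat the listed defaults, and only some of the backoff settings are shown.

```hocon
collector {
  streams {
    # Hypothetical topic names
    good = "raw-good"
    bad = "raw-bad"

    sink {
      googleProjectId = "my-gcp-project"   # hypothetical project
      backoffPolicy {
        minBackoff = 1000            # milliseconds
        maxBackoff = 1000            # milliseconds
        totalBackoff = 9223372036854 # effectively never give up
        initialRpcTimeout = 10000    # milliseconds
        maxRpcTimeout = 10000        # milliseconds
      }
    }

    buffer {
      byteLimit = 1000000
      recordLimit = 40
      timeLimit = 1000               # milliseconds
    }
  }
}
```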

Setting the domain name

Set the cookie name using the collector.cookie.name setting. To maintain backward compatibility with earlier versions of the collector, use the string “sp” as the cookie name.

The collector responds to valid requests with a Set-Cookie header, which may or may not specify a domain for the cookie.

If no domain is specified, the cookie will be set against the full collector domain, for example collector.snplow.com. That will mean that applications running elsewhere on *.snplow.com won’t be able to access it. If you don’t need to grant access to the cookie from other applications on the domain, then you can ignore the domains and fallbackDomain settings.

In earlier versions, you could specify a domain to tie the cookie to. For example, if set to .snplow.com, the cookie would have been accessible to other applications running on *.snplow.com. To do the same in this version, use the fallbackDomain setting but make sure that you no longer include a leading dot:

fallbackDomain = "snplow.com"

The cookie set by the collector can be treated differently by browsers, depending on whether it’s considered to be a first-party or a third-party cookie. In earlier versions (0.15.0 and earlier), if you had two collector endpoints, one on collector.snplow.com and one on collector.snplow.net, you could only specify one of those domains in the configuration. That meant that you were only able to set a first-party cookie server-side on either .snplow.com or .snplow.net, but not on both. From version 0.16.0, you can specify a list of domains to be used for the cookie (note the lack of a leading dot):

domains = [
    "snplow.com"
    "snplow.net"
 ]

The domain used in the Set-Cookie header is determined by matching the hosts in the request’s Origin header against the specified list. The first match is used. If no matches are found, the fallback domain will be used, if configured. If no fallbackDomain is configured, the cookie will be tied to the full collector domain.

If you specify a main domain in the list, all subdomains on it will be matched. If you specify a subdomain, only that subdomain will be matched.

Examples:

  • domain.com will match Origin headers like domain.com, www.domain.com and secure.client.domain.com
  • client.domain.com will match an Origin header like secure.client.domain.com but not domain.com or www.domain.com.
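
To make the matching rules concrete, here is a sketch of a cookie section with the outcomes that the rules above imply written as comments. All domain names, including fallback.example, are placeholders.

```hocon
cookie {
  domains = [
    "client.domain.com"
    "snplow.net"
  ]
  fallbackDomain = "fallback.example"
}

# Origin host                 -> domain used for the cookie
# secure.client.domain.com    -> client.domain.com  (falls under a listed entry)
# www.snplow.net              -> snplow.net         (subdomain of a listed main domain)
# www.domain.com              -> fallback.example   (no list entry matches)
```

Without the fallbackDomain line, the last case would leave the cookie tied to the full collector domain.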

Configuring custom paths

The collector responds with a cookie to requests with a path that matches the vendor/version protocol. The expected values are:

  • com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  • r/tp2 for redirects
  • com.snowplowanalytics.iglu/v1 for the Iglu Webhook.

You can also map any valid (i.e. two-segment) path to one of the three defaults via the collector.paths section of the configuration file. Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full valid paths starting with a leading slash:

paths {
    "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    "/com.acme/redirect" = "/r/tp2"
    "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
 }

TLS port binding and certificate (2.4.0+)

Since 2.4.0, TLS certificates are configured using JVM system properties. The “Customizing JSSE” section of the Java 11 JSSE reference documentation explains all system properties in detail.

The following JVM properties are the ones used most often.

javax.net.ssl.keyStore
    Customized item: Default keystore; see Customizing the Default Keystores and Truststores, Store Types, and Store Passwords
    Default: None

javax.net.ssl.keyStorePassword
    Customized item: Default keystore password; see Customizing the Default Keystores and Truststores, Store Types, and Store Passwords
    Default: None
    Notes: It is inadvisable to specify the password in a way that exposes it to discovery by other users. Password can not be empty.

javax.net.ssl.keyStoreType
    Customized item: Default keystore type; see Customizing the Default Keystores and Truststores, Store Types, and Store Passwords
    Default: PKCS12

jdk.tls.server.cipherSuites
    Customized item: Server-side default enabled cipher suites. See Specifying Default Enabled Cipher Suites
    Default: See SunJSSE Cipher Suites to determine which cipher suites are enabled by default
    Notes: Caution: These system properties can be used to configure weak cipher suites, or the configured cipher suites may be weak in the future. It is not recommended that you use these system properties without understanding the risks.

jdk.tls.server.protocols
    Customized item: Default handshaking protocols for TLS/DTLS servers. See The SunJSSE Provider
    Default: None
    Notes: To configure the default enabled protocol suite in the server-side of a SunJSSE provider, specify the protocols in a comma-separated list within quotation marks. The protocols in this list are standard SSL protocol names as described in Java Security Standard Algorithm Names. Note that this System Property impacts only the default protocol suite (SSLContext of the algorithms SSL, TLS and DTLS). If an application uses a version-specific SSLContext (SSLv3, TLSv1, TLSv1.1, TLSv1.2, TLSv1.3, DTLSv1.0, or DTLSv1.2), or sets the enabled protocol version explicitly, this System Property has no impact.
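
Putting the two halves together: HTTPS is switched on in the HOCON file through the collector.ssl options, while the keystore is supplied through JVM system properties, which are passed to the java command as -D flags. The sketch below assumes a hypothetical keystore path and is not a complete setup.

```hocon
collector {
  ssl {
    enable = true
    port = 443
    redirect = false
  }
}

# The keystore is not configured in this file. It is passed as JVM system
# properties when the collector is started, for example:
#   -Djavax.net.ssl.keyStore=/path/to/keystore.p12      (hypothetical path)
#   -Djavax.net.ssl.keyStoreType=PKCS12
#   -Djavax.net.ssl.keyStorePassword=<your-password>    (must not be empty)
```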

Setting up an SQS buffer (2.0.0+)

Kinesis does not auto-scale, so streams are throttled during traffic spikes and the Stream Collector starts accumulating events to retry later. If accumulation continues long enough, the Stream Collector will run out of memory. To prevent this, you can configure an SQS buffer that provides additional assurance during extreme traffic spikes.

SQS is used to queue any message that the Stream Collector failed to send to Kinesis. The sqs2kinesis application is then responsible for reading the messages from SQS and writing them to Kinesis once it is ready. In the event of any AWS API glitches, a retry mechanism retries sending to the SQS queue 10 times.

The keys set up for the Kinesis stream are stored as SQS message attributes in order to preserve the information. Note that SQS messages cannot be as big as Kinesis messages. The limit is 256kB per message, but we send the messages Base64 encoded, so the limit goes down to 192kB for the original message.
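
The 192kB figure follows from how Base64 works: every 3 bytes of input become 4 output characters, so the encoded message is 4/3 the size of the original. Written out:

```latex
\[
  \text{maximum original size} = 256\,\text{kB} \times \frac{3}{4} = 192\,\text{kB}
\]
```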

Setting up the SQS queues

(This section only applies to the case when SQS is used as a fallback sink when Kinesis is unavailable. If you are using SQS as the primary sink, then the settings below should be ignored and the good and bad streams should be configured as normal under streams.good and streams.bad respectively.)

To start using this feature, you will first need to set up the SQS queues. Two separate queues are required for good (raw) events and bad events. The Collector then needs to be informed about the queue names, and this can be done by adding these as entries to config.hocon:

sqsGoodBuffer = {good-sqs-queue-url}
sqsBadBuffer = {bad-sqs-queue-url}
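
Going by the full option names in the Kinesis collector options list (collector.streams.sink.sqsGoodBuffer and collector.streams.sink.sqsBadBuffer), these entries sit inside the sink section. A sketch of the nesting, with the placeholders kept as quoted strings for you to replace:

```hocon
collector {
  streams {
    sink {
      # Placeholders: substitute the values for your own queues
      sqsGoodBuffer = "{good-sqs-queue-url}"
      sqsBadBuffer = "{bad-sqs-queue-url}"
    }
  }
}
```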

Telemetry

Starting with version 2.4.0 of the collector, Snowplow collects heartbeats with some meta-information about the application. This is an opt-out feature, meaning that it has to be explicitly disabled to stop it. The schema is available here.

At its most basic, telemetry sends the application name and version every hour. This helps us improve the product: we need to understand what is popular so that we can focus our development effort in the right place. You can help us by providing userProvidedId in the config file.

telemetry {
    userProvidedId = myCompany
 }

Put the following entry into your configuration file to disable the telemetry.

telemetry {
    disable = true
 }