Configuration

enrich-pubsub

A minimal configuration file can be found in the GitHub repo, as well as a comprehensive one.

input.subscription (Required). PubSub subscription identifier for the collector payloads.
Example: projects/example-project/subscriptions/collectorPayloads
input.parallelPullCount (Optional). Number of threads used internally by the Permutive library to handle incoming messages. These threads do very little “work” apart from writing the message to a concurrent queue.
Default: 1
input.maxQueueSize (Optional). Configures the “max outstanding element count” of PubSub. This is the principal way concurrency is controlled in the app: it puts an upper bound on the number of events in memory at once. An event counts towards this limit from when it is received by the Permutive library until it is acked (after publishing to the output). The value must be large enough that it does not cause the sink to block while it waits for a batch to be completed.
Default: 3000
output.good.topic (Required). Name of the PubSub topic that will receive the enriched events.
Example: projects/example-project/topics/enriched
output.good.attributes (Optional). Enriched event fields to add as PubSub message attributes. For example, if this is [ "app_id" ] then the enriched event’s app_id field will be an attribute of the PubSub message, as well as a field within the message data.
output.good.delayThreshold (Optional). Delay threshold to use for batching. After this amount of time has elapsed (before maxBatchSize and maxBatchBytes have been reached), messages from the buffer will be sent.
Default: 200 milliseconds
output.good.maxBatchSize (Optional). Maximum number of messages sent within a batch. When the buffer reaches this number of messages, they are sent.
Default: 1000 (PubSub maximum)
output.good.maxBatchBytes (Optional). Maximum number of bytes sent within a batch. When the buffer reaches this size, messages are sent.
Default: 10000000 (PubSub maximum of 10 MB)
output.pii.topic (Optional). Should be used in conjunction with the PII pseudonymization enrichment. When configured, enables an extra output topic for writing a pii_transformation event.
Example: projects/test-project/topics/pii
output.pii.attributes. Same as output.good.attributes for pii events.
output.pii.delayThreshold. Same as output.good.delayThreshold for pii events.
output.pii.maxBatchSize. Same as output.good.maxBatchSize for pii events.
output.pii.maxBatchBytes. Same as output.good.maxBatchBytes for pii events.
output.bad.topic (Required). Name of the PubSub topic that will receive the bad rows.
Example: projects/example-project/topics/badrows
output.bad.delayThreshold. Same as output.good.delayThreshold for bad rows.
output.bad.maxBatchSize. Same as output.good.maxBatchSize for bad rows.
output.bad.maxBatchBytes. Same as output.good.maxBatchBytes for bad rows.
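
Putting these parameters together, a minimal enrich-pubsub configuration can be sketched as below. This is an illustrative example, not a definitive reference: the project, subscription and topic identifiers are placeholders, the optional settings repeat their documented defaults, and the "PubSub" type discriminators follow the same convention as the FileSystem examples later on this page.

{
  "input": {
    "type": "PubSub"
    # Placeholder subscription; replace with your own
    "subscription": "projects/example-project/subscriptions/collectorPayloads"
    # Optional; documented defaults shown
    "parallelPullCount": 1
    "maxQueueSize": 3000
  }
  "output": {
    "good": {
      "type": "PubSub"
      "topic": "projects/example-project/topics/enriched"
      # Optional batching settings; documented defaults shown
      "delayThreshold": "200 milliseconds"
      "maxBatchSize": 1000
      "maxBatchBytes": 10000000
    }
    "bad": {
      "type": "PubSub"
      "topic": "projects/example-project/topics/badrows"
    }
  }
}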

enrich-kinesis

A minimal configuration file can be found in the GitHub repo, as well as a comprehensive one.

input.appName (Optional). Name of the application which the KCL daemon should assume. A DynamoDB table with this name will be created.
Default: snowplow-enrich-kinesis
input.streamName (Required). Name of the Kinesis stream with the collector payloads to read from.
input.region (Optional). Region where the Kinesis stream is located. This field is optional if it can be resolved with the AWS region provider chain, which checks places like environment variables, system properties and the AWS profile file.
input.initialPosition.type (Optional). Sets the initial position to consume the Kinesis stream from. Possible values:
LATEST: most recent data
TRIM_HORIZON: oldest available data
AT_TIMESTAMP: start from the record at or after the specified timestamp
Default: TRIM_HORIZON
input.initialPosition.timestamp (Required for AT_TIMESTAMP).
Example: 2020-07-17T10:00:00Z
input.retrievalMode.type (Optional). Sets the mode for retrieving records. Possible values:
Polling
FanOut
Default: Polling
input.retrievalMode.maxRecords (Required for Polling). Maximum size of a batch returned by a call to getRecords. Records are checkpointed after a batch has been fully processed, so the smaller maxRecords is, the more often records can be checkpointed into DynamoDB, possibly at the cost of throughput.
Default: 10000
input.bufferSize (Optional). Size of the internal buffer used when reading messages from Kinesis, each buffer holding up to maxRecords (see above).
Default: 3
input.customEndpoint (Optional). Endpoint URL configuration to override the AWS Kinesis endpoint. Can be used to specify a local endpoint when using Localstack.
Example: http://localhost:4566
input.dynamodbCustomEndpoint (Optional). Endpoint URL configuration to override the AWS DynamoDB endpoint for the Kinesis checkpoints lease table. Can be used to specify a local endpoint when using Localstack.
Example: http://localhost:4569
input.cloudwatchCustomEndpoint (Optional). Endpoint URL configuration to override the AWS CloudWatch endpoint for metrics. Can be used to specify a local endpoint when using Localstack.
Example: http://localhost:4582
output.good.streamName (Required). Name of the Kinesis stream to write the enriched events to.
Example: enriched
output.good.region. Same as input.region for the enriched events stream.
output.good.partitionKey (Optional). How the output stream will be partitioned in Kinesis. Events with the same partition key value will go to the same shard.
Possible partition keys: event_id, event_fingerprint, domain_userid, network_userid, user_ipaddress, domain_sessionid, user_fingerprint.
Refer to this page to know what the possible partition keys correspond to.
If not specified, the partition key will be a random UUID.
output.good.backoffPolicy.minBackoff (Optional). Minimum backoff before retrying when writing fails.
Default: 100 milliseconds
output.good.backoffPolicy.maxBackoff (Optional). Maximum backoff before retrying when writing fails.
Default: 10 seconds
output.good.maxBufferedTime (Optional). Maximum amount of time an enriched event may spend being buffered before it gets sent.
Default: 100 milliseconds
output.good.collection.maxCount (Optional). Maximum number of Kinesis records to pack into a PutRecords request.
Default: 500
output.good.collection.maxSize (Optional). Maximum amount of data, in bytes, to send with a PutRecords request.
Default: 5242880 (5 MB)
output.good.aggregation (Optional). If not specified, aggregation is not activated. Visit this page to learn about aggregation.
output.good.aggregation.maxCount (Required if aggregation is enabled). Maximum number of enriched events to pack into an aggregated Kinesis record.
Example: 10000
output.good.aggregation.maxSize (Required if aggregation is enabled). Maximum number of bytes to pack into an aggregated Kinesis record.
Example: 51200 (50 KB, which is the maximum allowed)
output.good.maxConnections (Optional). Maximum number of connections to open in the backend for the KPL to send the records. HTTP requests are sent in parallel over multiple connections. Setting this too high may impact latency and consume additional resources without increasing throughput.
Default: 24
output.good.logLevel (Optional). Minimum level of logs for the native KPL daemon (binary used under the hood by the KPL). Logs show up on stderr. Possible values:
trace
debug
info
warning
error
Default: warning
output.good.customEndpoint (Optional). Use a custom Kinesis endpoint. Note this does not accept protocols or paths, only host names or IP addresses. There is no way to disable TLS. Needs to be specified along with customPort.
Example: localhost
output.good.customPort (Optional). Server port to connect to for Kinesis. Needs to be specified along with customEndpoint.
Example: 4566
output.good.cloudwatchEndpoint (Optional). Use a custom CloudWatch endpoint. Note this does not accept protocols or paths, only host names or IP addresses. There is no way to disable TLS. Needs to be specified along with cloudwatchPort.
Example: localhost
output.good.cloudwatchPort (Optional). Server port to connect to for CloudWatch. Needs to be specified along with cloudwatchEndpoint.
Example: 4582
output.pii.streamName (Optional). Should be used in conjunction with the PII pseudonymization enrichment. When configured, enables an extra output stream for writing a pii_transformation event.
Example: pii
output.pii.region. Same as output.good.region for pii events.
output.pii.partitionKey. Same as output.good.partitionKey for pii events.
output.pii.backoffPolicy.minBackoff. Same as output.good.backoffPolicy.minBackoff for pii events.
output.pii.backoffPolicy.maxBackoff. Same as output.good.backoffPolicy.maxBackoff for pii events.
output.pii.maxBufferedTime. Same as output.good.maxBufferedTime for pii events.
output.pii.collection.maxCount. Same as output.good.collection.maxCount for pii events.
output.pii.collection.maxSize. Same as output.good.collection.maxSize for pii events.
output.pii.aggregation. Same as output.good.aggregation for pii events.
output.pii.aggregation.maxCount. Same as output.good.aggregation.maxCount for pii events.
output.pii.aggregation.maxSize. Same as output.good.aggregation.maxSize for pii events.
output.pii.maxConnections. Same as output.good.maxConnections for pii events.
output.pii.logLevel. Same as output.good.logLevel for pii events.
output.pii.customEndpoint. Same as output.good.customEndpoint for pii events.
output.pii.customPort. Same as output.good.customPort for pii events.
output.pii.cloudwatchEndpoint. Same as output.good.cloudwatchEndpoint for pii events.
output.pii.cloudwatchPort. Same as output.good.cloudwatchPort for pii events.
output.bad.streamName (Required). Name of the Kinesis stream to write the bad rows to.
Example: bad
output.bad.region. Same as output.good.region for bad rows.
output.bad.backoffPolicy.minBackoff. Same as output.good.backoffPolicy.minBackoff for bad rows.
output.bad.backoffPolicy.maxBackoff. Same as output.good.backoffPolicy.maxBackoff for bad rows.
output.bad.maxBufferedTime. Same as output.good.maxBufferedTime for bad rows.
output.bad.collection.maxCount. Same as output.good.collection.maxCount for bad rows.
output.bad.collection.maxSize. Same as output.good.collection.maxSize for bad rows.
output.bad.aggregation. Same as output.good.aggregation for bad rows.
output.bad.aggregation.maxCount. Same as output.good.aggregation.maxCount for bad rows.
output.bad.aggregation.maxSize. Same as output.good.aggregation.maxSize for bad rows.
output.bad.maxConnections. Same as output.good.maxConnections for bad rows.
output.bad.logLevel. Same as output.good.logLevel for bad rows.
output.bad.customEndpoint. Same as output.good.customEndpoint for bad rows.
output.bad.customPort. Same as output.good.customPort for bad rows.
output.bad.cloudwatchEndpoint. Same as output.good.cloudwatchEndpoint for bad rows.
output.bad.cloudwatchPort. Same as output.good.cloudwatchPort for bad rows.
monitoring.cloudwatch (Optional). Whether KCL and KPL metrics should be sent to CloudWatch.
Default: false
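
Combining the parameters above, a minimal enrich-kinesis configuration can be sketched as below. This is an illustrative example: the stream names and region are placeholders, the optional blocks repeat their documented defaults, the partition key choice is arbitrary, and the "Kinesis" type discriminators follow the same convention as the FileSystem examples later on this page.

{
  "input": {
    "type": "Kinesis"
    # Documented default; a DynamoDB lease table with this name is created
    "appName": "snowplow-enrich-kinesis"
    # Placeholder stream and region; replace with your own
    "streamName": "collector-payloads"
    "region": "eu-central-1"
    "initialPosition": {
      "type": "TRIM_HORIZON"
    }
    "retrievalMode": {
      "type": "Polling"
      "maxRecords": 10000
    }
  }
  "output": {
    "good": {
      "type": "Kinesis"
      "streamName": "enriched"
      # Illustrative choice; if omitted, a random UUID is used
      "partitionKey": "domain_sessionid"
    }
    "bad": {
      "type": "Kinesis"
      "streamName": "bad"
    }
  }
}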

Common parameters

concurrency.enrich (Optional). Number of events that can get enriched at the same time within a chunk (events are processed by chunks in the app).
Default: 256
concurrency.sink (Optional). Number of chunks that can get sunk at the same time.
Default for enrich-pubsub: 3
Default for enrich-kinesis: 1 (WARNING for enrich-kinesis: if greater than 1, records can get checkpointed before they are sunk)
assetsUpdatePeriod (Optional). Period after which enrich assets (e.g. the MaxMind database for the IpLookups enrichment) should be checked for updates. Assets will never be updated if this key is missing.
Example: 7 days
monitoring.sentry.dsn (Optional). To track runtime exceptions in Sentry.
Example: http://sentry.acme.com
monitoring.metrics.statsd.hostname (Optional). Hostname of the StatsD server to send enrichment metrics (latency and event counts) to.
Example: localhost
monitoring.metrics.statsd.port (Optional). Port of the StatsD server.
Example: 8125
monitoring.metrics.statsd.period (Optional). How frequently to report metrics.
Example: 10 seconds
monitoring.metrics.statsd.tags (Optional). Key-value pairs attached to each metric sent to StatsD to provide contextual information.
Example: { "env": "prod" }
monitoring.metrics.statsd.prefix (Optional). Configures the prefix of StatsD metric names.
Default: snowplow.enrich.
monitoring.metrics.stdout.period (Optional). If set, metrics will be printed in the logs with this frequency.
Example: 10 seconds
monitoring.metrics.stdout.prefix (Optional). Prefix for the metrics appearing in the logs.
Default: snowplow.enrich.
telemetry.disable (Optional). Set to true to disable telemetry.
telemetry.interval (Optional). Interval for the heartbeat event.
Example: 1 hour
telemetry.method (Optional). HTTP method used to send the heartbeat event.
Possible values: GET or POST
telemetry.collectorUri (Optional). Hostname of the collector receiving the heartbeat event.
Default: collector-g.snowplowanalytics.com
telemetry.collectorPort (Optional). Port of the collector receiving the heartbeat event.
Example: 443
telemetry.secure (Optional). Whether to use HTTPS or not.
Default: true
telemetry.userProvidedId (Optional). Identifier intended to tie events together across modules, infrastructure and apps when used consistently.
Example: my_company
telemetry.autoGeneratedId (Optional). Intended to identify each independent module and the infrastructure it controls.
telemetry.instanceId (Optional). Unique for each instance of the app running within a module.
telemetry.moduleName (Optional). Name of the Terraform module that deployed the app.
Example: enrich-kinesis-ce
telemetry.moduleVersion (Optional). Version of the Terraform module that deployed the app.
Example: 1.0.0
featureFlags.acceptInvalid (Optional). Enrich 3.0.0 introduces the validation of enriched events against the atomic schema before emitting. If set to false, a bad row will be emitted instead of the enriched event if validation fails. If set to true, invalid enriched events will be emitted, as before. More details here.
Default: false
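
As an illustration, the asset-update and StatsD monitoring parameters above can be combined into blocks like the sketch below. The hostname and tag values are placeholders; the structure simply mirrors the parameter paths documented above.

"assetsUpdatePeriod": "7 days"
"monitoring": {
  "metrics": {
    "statsd": {
      # Placeholder StatsD server
      "hostname": "localhost"
      "port": 8125
      "period": "10 seconds"
      # Placeholder contextual tag
      "tags": { "env": "prod" }
      # Documented default prefix
      "prefix": "snowplow.enrich."
    }
  }
}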

Instead of Kinesis or PubSub, it’s also possible to read collector payloads from files on disk, for instance for testing purposes. In this case the configuration needs to be as below.

input.type. Should be FileSystem.
input.dir. Directory containing collector payloads encoded with Thrift.
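
For example, a file-based input can be sketched as follows (the directory path is a placeholder):

"input": {
  "type": "FileSystem"
  # Placeholder directory containing Thrift-encoded collector payloads
  "dir": "/var/collector-payloads"
}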

Likewise, it’s possible to write enriched events, pii events and bad rows to files instead of PubSub or Kinesis.

To write enriched events to files:

output.good.type. Should be FileSystem.
output.good.file. File where enriched events will be written.
output.good.maxBytes (Optional). Maximum size of a file in bytes. Triggers file rotation.

To write pii events to files:

output.pii.type. Should be FileSystem.
output.pii.file. File where pii events will be written.
output.pii.maxBytes (Optional). Maximum size of a file in bytes. Triggers file rotation.

To write bad rows to files:

output.bad.type. Should be FileSystem.
output.bad.file. File where bad rows will be written.
output.bad.maxBytes (Optional). Maximum size of a file in bytes. Triggers file rotation.
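
Putting the three outputs together, a file-based output section can be sketched as follows (the file paths and the rotation size are placeholders):

"output": {
  "good": {
    "type": "FileSystem"
    "file": "/var/enrich/enriched"
    # Optional; illustrative rotation size of 1 GB
    "maxBytes": 1073741824
  }
  "pii": {
    "type": "FileSystem"
    "file": "/var/enrich/pii"
  }
  "bad": {
    "type": "FileSystem"
    "file": "/var/enrich/bad"
  }
}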

Enriched events validation against atomic schema

Enriched events are expected to match the atomic schema.
However, until 3.0.0, it was never checked that the enriched events emitted by enrich were valid.
If an event is not valid against the atomic schema, a bad row should be emitted instead of the enriched event.
However, this is a breaking change, and we want to give users some time to adapt, in case they are currently working downstream with enriched events that are not valid against atomic.
For this reason, this new validation was added as a feature that can be deactivated like this:

"featureFlags": { "acceptInvalid": true }

In this case, enriched events that are not valid against the atomic schema will still be emitted as before, so that enrich 3.0.0 can be fully backward compatible.
There are two ways to know whether the new validation would have had an impact:

  1. A new metric invalid_enriched has been introduced.
    It reports the number of enriched events that were not valid against the atomic schema. Like the other metrics, it can be seen on stdout and/or StatsD.
  2. Each time an enriched event is invalid against the atomic schema, a line with the bad row will be logged (add -Dorg.slf4j.simpleLogger.log.InvalidEnriched=debug to the JAVA_OPTS to see it).

If acceptInvalid is set to false, a bad row will be emitted instead of the enriched event whenever the event is not valid against the atomic schema.

Once we know that none of our customers have invalid enriched events any more, we will remove the feature flag, and it will become impossible to emit invalid enriched events.

Telemetry

Starting with version 3.0.0 of enrich, Snowplow collects heartbeats with some meta-information about the application (schema here). This is done to help us improve the product: we need to understand what is popular so that we can focus our development effort in the right place.

At its most basic, telemetry sends the application name and version every hour. You can help us by providing a userProvidedId in the config file:

"telemetry" { "userProvidedId": "myCompany" }

Telemetry can be deactivated by putting the following section in the configuration file:

"telemetry": { "disable": true }

Enrichments

The list of enrichments that can be configured can be found on this page.