Lake Loader configuration reference
The configuration reference in this page is written for Lake Loader 0.3.0
Table configurationโ
- Delta Lake
- Iceberg / Glue
- Iceberg / BigLake
- Hudi
Parameter | Description |
---|---|
output.good.location | Required, e.g. gs://mybucket/events . URI of the bucket location to which to write Snowplow enriched events in Delta format. The URI should start with the following prefix:
|
output.good.dataSkippingColumns | Optional. A list of column names which will be brought to the "left-hand-side" of the events table, to enable Delta's data skipping feature. Defaults to the important Snowplow timestamp columns: load_tstamp , collector_tstamp , derived_tstamp , dvce_created_tstamp . |
Parameter | Description |
---|---|
output.good.type | Required, set this to Iceberg |
output.good.catalog.type | Required, set this to Glue |
output.good.location | Required, e.g. s3a://mybucket/events . URI of the bucket location to which to write Snowplow enriched events in Iceberg format. The URI should start with s3a:// |
output.good.database | Required. Name of the database in the Glue catalog |
output.good.table | Required. The name of the table in the Glue database |
output.good.catalog.options.* | Optional. A map of key/value strings which are passed to the catalog configuration. These can be anything from the Iceberg catalog documentation e.g. "glue.id": "1234567" |
Alternative Docker image
To use the Lake Loader with BigLake support, pull the
snowplow/lake-loader-gcp:0.3.0-biglake
image from Docker Hub.Parameter | Description |
---|---|
output.good.type | Required, set this to Iceberg . |
output.good.catalog.type | Required, set this to BigLake |
output.good.location | Required, e.g. gs://mybucket/ . URI of the bucket location to which to write Snowplow enriched events in Iceberg format. The URI should start with gs:// . |
output.good.database | Required. Name of the database in the BigLake catalog |
output.good.table | Required. The name of the table in the BigLake database |
output.good.catalog.project | Required. The GCP project owning the BigLake catalog |
output.good.catalog.name | Required. The name of the BigLake catalog |
output.good.catalog.region | Required. GCP region of the BigLake catalog |
output.good.catalog.options.* | Optional. A map of key/value strings which are passed to the catalog configuration. |
Alternative Docker image
To use the Lake Loader with Hudi support, pull the appropriate alternative image from Docker Hub:
snowplow/lake-loader-aws:0.3.0-hudi
snowplow/lake-loader-gcp:0.3.0-hudi
snowplow/lake-loader-azure:0.3.0-hudi
Parameter | Description |
---|---|
output.good.location | Required, e.g. gs://mybucket/events . URI of the bucket location to which to write Snowplow enriched events in Hudi format. The URI should start with the following prefix:
|
output.good.hudiWriteOptions.* | Optional. A map of key/value strings corresponding to Hudi's configuration options for writing into a table. The default options configure `load_tstamp` as the table's partition field. |
output.good.hudiTableOptions.* | Optional. A map of key/value strings corresponding to Hudi's configuration options for creating a table. The default options configure `load_tstamp` as the table's partition field. |
Streams configurationโ
- AWS
- GCP
- Azure
Parameter | Description |
---|---|
input.streamName | Required. Name of the Kinesis stream with the enriched events |
input.appName | Optional, default snowplow-lake-loader . Name to use for the dynamodb table, used by the underlying Kinesis Consumer Library for managing leases. |
input.initialPosition | Optional, default LATEST . Allowed values are LATEST , TRIM_HORIZON , AT_TIMESTAMP . When the loader is deployed for the first time, this controls from where in the kinesis stream it should start consuming events. On all subsequent deployments of the loader, the loader will resume from the offsets stored in the DynamoDB table. |
input.initialPosition.timestamp | Required if input.initialPosition is AT_TIMESTAMP . A timestamp in ISO8601 format from where the loader should start consuming events. |
input.retrievalMode | Optional, default Polling. Change to FanOut to enable the enhance fan-out feature of Kinesis. |
input.retrievalMode.maxRecords | Optional. Default value 1000. How many events the Kinesis client may fetch in a single poll. Only used when `input.retrievalMode` is Polling. |
input.bufferSize | Optional. Default value 1. The number of batches of events which are pre-fetched from kinesis. The default value is known to work well. |
output.bad.streamName | Required. Name of the Kinesis stream that will receive failed events. |
output.bad.throttledBackoffPolicy.minBackoff | Optional. Default value 100 milliseconds . Initial backoff used to retry sending failed events if we exceed the Kinesis write throughput limits. |
output.bad.throttledBackoffPolicy.maxBackoff | Optional. Default value 1 second . Maximum backoff used to retry sending failed events if we exceed the Kinesis write throughput limits. |
output.bad.recordLimit | Optional. Default value 500. The maximum number of records we are allowed to send to Kinesis in 1 PutRecords request. |
output.bad.byteLimit | Optional. Default value 5242880. The maximum number of bytes we are allowed to send to Kinesis in 1 PutRecords request. |
Parameter | Description |
---|---|
input.subscription | Required, e.g. projects/myproject/subscriptions/snowplow-enriched . Name of the Pub/Sub subscription with the enriched events |
input.parallelPullCount | Optional. Default value 3. Number of threads used internally by the pubsub client library for fetching events |
input.bufferMaxBytes | Optional. Default value 1000000. How many bytes can be buffered by the loader app before blocking the pubsub client library from fetching more events. This is a balance between memory usage vs how efficiently the app can operate. The default value works well. |
input.maxAckExtensionPeriod | Optional. Default value 1 hour. For how long the pubsub client library will continue to re-extend the ack deadline of an unprocessed event. |
input.minDurationPerAckExtension | Optional. Default value 60 seconds. Sets min boundary on the value by which an ack deadline is extended. The actual value used is guided by runtime statistics collected by the pubsub client library. |
input.maxDurationPerAckExtension | Optional. Default value 60 seconds. Sets max boundary on the value by which an ack deadline is extended. The actual value used is guided by runtime statistics collected by the pubsub client library. |
output.bad.topic | Required, e.g. projects/myproject/topics/snowplow-bad . Name of the Pub/Sub topic that will receive failed events. |
output.bad.batchSize | Optional. Default value 100. Bad events are sent to Pub/Sub in batches not exceeding this count. |
output.bad.requestByteThreshold | Optional. Default value 1000000. Bad events are sent to Pub/Sub in batches with a total size not exceeding this byte threshold |
Parameter | Description |
---|---|
input.topicName | Required. Name of the Kafka topic for the source of enriched events. |
input.bootstrapServers | Required. Hostname and port of Kafka bootstrap servers hosting the source of enriched events. |
input.consumerConf.* | Optional. A map of key/value pairs for any standard Kafka consumer configuration option. |
output.bad.topicName | Required. Name of the Kafka topic that will receive failed events. |
output.bad.bootstrapServers | Required. Hostname and port of Kafka bootstrap servers hosting the bad topic |
output.bad.producerConf.* | Optional. A map of key/value pairs for any standard Kafka producer configuration option. |
Event Hubs Authentication
You can use the input.consumerConf
and output.bad.producerConf
options to configure authentication to Azure event hubs using SASL. For example:
"input.consumerConf": {
"security.protocol": "SASL_SSL"
"sasl.mechanism": "PLAIN"
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=<PASSWORD>;"
}
Other configuration optionsโ
Parameter | Description |
---|---|
windowing | Optional. Default value 5 minutes . Controls how often the loader writes/commits pending events to the lake. |
spark.taskRetries | Optional. Default value 3. How many times the internal spark context should be retry a task in case of failure |
spark.conf.* | Optional. A map of key/value strings which are passed to the internal spark context. |
monitoring.metrics.statsd.hostname | Optional. If set, the loader sends statsd metrics over UDP to a server on this host name. |
monitoring.metrics.statsd.port | Optional. Default value 8125. If the statsd server is configured, this UDP port is used for sending metrics. |
monitoring.metrics.statsd.tags.* | Optional. A map of key/value pairs to be sent along with the statsd metric. |
monitoring.metrics.statsd.period | Optional. Default 1 minute . How often to report metrics to statsd. |
monitoring.metrics.statsd.prefix | Optional. Default snowplow.lakeloader . Prefix used for the metric name when sending to statsd. |
sentry.dsn | Optional. Set to a Sentry URI to report unexpected runtime exceptions. |
sentry.tags.* | Optional. A map of key/value strings which are passed as tags when reporting exceptions to Sentry. |
telemetry.disable | Optional. Set to true to disable telemetry. |
telemetry.userProvidedId | Optional. See here for more information. |