BigQuery Loader

A project to load Snowplow enriched events into Google BigQuery.

Setup guide

Configuration file

Both Loader and Mutator use the same configuration file, a self-describing JSON with the iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0 schema, which looks like the following:

{
    "schema": "iglu:com.snowplowanalytics.snowplow.storage/bigquery_config/jsonschema/1-0-0",
    "data": {
        "name": "Alpha BigQuery test",
        "id": "31b1559d-d319-4023-aaae-97698238d808",

        "projectId": "com-acme",
        "datasetId": "snowplow",
        "tableId": "events",

        "input": "enriched-good-sub",
        "typesTopic": "bq-test-types",
        "typesSubscription": "bq-test-types-sub",
        "badRows": "bq-test-bad-rows",
        "failedInserts": "bq-test-bad-inserts",

        "load": {
            "mode": "STREAMING_INSERTS",
            "retry": false
        },

        "purpose": "ENRICHED_EVENTS"
    }
}
  • All topics and subscriptions (input, typesTopic, typesSubscription, badRows and failedInserts) are explained in the topics and message formats section; a gcloud sketch of creating them follows this list
  • projectId is used to group all resources (topics, subscriptions and the BigQuery table)
  • datasetId and tableId (along with projectId) identify the BigQuery table to load
  • name is an arbitrary human-readable description of the storage target
  • id is a unique identifier in UUID format
  • load specifies the loading mode and is explained in the dedicated section below
  • purpose is a standard storage configuration property; the only supported value is ENRICHED_EVENTS
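
For reference, the Pub/Sub resources referenced in the example configuration could be created with the gcloud CLI as sketched below. The enriched-good topic name is an assumption; all other names come from the example above.

gcloud pubsub topics create bq-test-types --project=com-acme
gcloud pubsub topics create bq-test-bad-rows --project=com-acme
gcloud pubsub topics create bq-test-bad-inserts --project=com-acme

# Subscriptions used by Loader and Mutator (enriched-good is an assumed topic name)
gcloud pubsub subscriptions create enriched-good-sub --topic=enriched-good --project=com-acme
gcloud pubsub subscriptions create bq-test-types-sub --topic=bq-test-types --project=com-acme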

Loading mode

BigQuery supports two loading APIs: streaming inserts and load jobs.

To configure BigQuery Loader to use one of these APIs, use the load property.

For streaming inserts it can look like the following:

{
    "load": {
        "mode": "STREAMING_INSERTS",
        "retry": false
    }
}

retry specifies whether the loader should retry failed inserts (e.g. inserts that failed due to mutation lag) indefinitely or send them straight to the failedInserts topic. Note that with retries enabled, if the loader cannot insert a row it will keep trying, which can throttle the whole job to the point where it has to be restarted.

For load jobs it can look like the following:

{
    "load": {
        "mode": "FILE_LOADS",
        "frequency": 60000
    }
}

frequency specifies how often the load job should be performed, in seconds. Unlike the near-real-time streaming inserts API, load jobs are more batch-oriented.

Note that load jobs do not support retry (and streaming inserts do not support frequency).

It is generally recommended to stick with the streaming inserts API without retries (and use the Forwarder job to recover data from failedInserts). However, the load jobs API is cheaper and produces far fewer duplicates.

Command line options

All three apps (Loader, Mutator and Forwarder) accept a path to the configuration file described above and a path to an Iglu resolver configuration.

Loader

Loader accepts these two arguments and any others supported by Google Cloud Dataflow.

./snowplow-bigquery-loader \
    --config=$CONFIG \
    --resolver=$RESOLVER

This can be launched from any machine authenticated to submit Dataflow jobs.
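
For example, a typical launch also passes standard Dataflow pipeline options. The runner, project, region and temporary bucket below are illustrative assumptions, not required values:

./snowplow-bigquery-loader \
    --config=$CONFIG \
    --resolver=$RESOLVER \
    --runner=DataflowRunner \
    --project=com-acme \
    --region=europe-west1 \
    --tempLocation=gs://com-acme-temp/bq-loader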

Mutator

Mutator has three subcommands: listen, create and add-column.

listen

listen is the primary subcommand, used to automate table migrations.

./snowplow-bigquery-mutator \
    listen \
    --config $CONFIG \
    --resolver $RESOLVER \
    --verbose               # Optional, for debug only

add-column

add-column can be used to add a particular column manually, ahead of time. This eliminates the chance of mutation lag for that column and the need to run the Forwarder job.

./snowplow-bigquery-mutator \
    add-column \
    --config $CONFIG \
    --resolver $RESOLVER \
    --shred-property CONTEXTS \
    --schema iglu:com.acme/app_context/jsonschema/1-0-0

The specified schema must be present in one of the Iglu registries in the resolver configuration.
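
For reference, a minimal resolver configuration pointing only at Iglu Central could look like the following sketch (the cache size and registry list are illustrative; add your own registries as needed):

{
    "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
    "data": {
        "cacheSize": 500,
        "repositories": [
            {
                "name": "Iglu Central",
                "priority": 0,
                "vendorPrefixes": [ "com.snowplowanalytics" ],
                "connection": { "http": { "uri": "http://iglucentral.com" } }
            }
        ]
    }
}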

create

create simply creates an empty table with the atomic structure.

./snowplow-bigquery-mutator \
    create \
    --config $CONFIG \
    --resolver $RESOLVER

Forwarder

Forwarder, like Loader, can be submitted from any machine authenticated to submit Dataflow jobs.

./snowplow-bigquery-forwarder \
    --config=$CONFIG \
    --resolver=$RESOLVER \
    --failedInsertsSub=$FAILED_INSERTS_SUB

Its only unique option is failedInsertsSub, a subscription (which must be created upfront) containing failed inserts.
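
Such a subscription can be created upfront with gcloud, for instance (the subscription name is an assumption; the topic name comes from the example configuration):

gcloud pubsub subscriptions create bq-test-bad-inserts-sub \
    --topic=bq-test-bad-inserts \
    --project=com-acme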

Note that by convention both Dataflow jobs (Forwarder and Loader) accept CLI options in camelCase with an = symbol, while Mutator accepts them in UNIX style (without =).

Docker support

All three applications are available as Docker images.

  • snowplow-docker-registry.bintray.io/snowplow/snowplow-bigquery-loader:0.1.0
  • snowplow-docker-registry.bintray.io/snowplow/snowplow-bigquery-forwarder:0.1.0
  • snowplow-docker-registry.bintray.io/snowplow/snowplow-bigquery-mutator:0.1.0
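
For example, Mutator could be launched from its image by mounting a directory with the configuration files. The mount path and file names below are assumptions, and the image is assumed to pass its arguments straight to the application:

# Mount a local directory containing the config and resolver files (paths are assumptions)
docker run \
    -v $PWD/config:/snowplow/config \
    snowplow-docker-registry.bintray.io/snowplow/snowplow-bigquery-mutator:0.1.0 \
    listen \
    --config /snowplow/config/bigquery_config.json \
    --resolver /snowplow/config/resolver.json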

Partitioning

During initial setup it is strongly recommended to set up partitioning on the derived_tstamp property. Mutator's create does not add partitioning automatically yet.
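
As a sketch, a partitioned events table could instead be created manually with the bq CLI before pointing the Loader at it. The schema file path below is a placeholder for the atomic events schema definition:

bq mk \
    --table \
    --time_partitioning_type=DAY \
    --time_partitioning_field=derived_tstamp \
    com-acme:snowplow.events \
    ./atomic_events_schema.json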

This application consists of three independent apps:

  1. Snowplow BigQuery Loader, an Apache Beam job which reads Snowplow enriched data from Google PubSub, transforms it into a BigQuery-friendly format and loads it. It also writes encountered types into the auxiliary typesTopic PubSub topic
  2. Snowplow BigQuery Mutator, a Scala app which reads the typesSubscription and performs the necessary table mutations to add new columns
  3. Snowplow BigQuery Forwarder, an auxiliary Apache Beam job used to recover data from the failedInserts topic (e.g. rows rejected due to mutation lag)

[Diagram: BigQuery Loader architecture]

Snowplow BigQuery Loader

Overview

An Apache Beam job intended to run on Google Cloud Dataflow, loading enriched data from the enriched events PubSub topic into Google BigQuery.

Algorithm

  • Reads Snowplow enriched events from the input PubSub subscription
  • Uses the JSON transformer from the Snowplow Scala Analytics SDK to convert those enriched events into JSONs
  • Uses Iglu Client to fetch JSON Schemas for contexts and self-describing events
  • Uses Iglu Schema DDL to transform contexts and self-describing events into BigQuery format
  • Writes transformed data into BigQuery
  • Writes all encountered Iglu types into the typesTopic topic
  • Writes all data that failed to be processed into the badRows PubSub topic
  • Writes data that was successfully transformed but failed to load into the failedInserts topic

Snowplow BigQuery Mutator

Overview

This is a Scala app which reads data from the typesSubscription PubSub subscription and performs table mutations.

Algorithm

  • Reads messages from typesSubscription
  • Checks (against its internal cache) whether a message contains a type that has not been encountered yet
  • If a message contains a new type, double-checks it against the connected BigQuery table
  • If the type is not in the table, fetches its JSON Schema from an Iglu registry
  • Transforms the JSON Schema into a BigQuery column definition
  • Adds the column to the connected BigQuery table
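
The resulting table mutation is roughly equivalent to updating the table schema by hand with the bq CLI, sketched below. The table reference and the appended column definition are hypothetical:

# Dump the current schema, append the new column definition, then update the table
bq show --schema --format=prettyjson com-acme:snowplow.events > schema.json
# ...edit schema.json to append the new column, e.g. a REPEATED RECORD for a contexts type...
bq update com-acme:snowplow.events schema.json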

Snowplow BigQuery Forwarder

An Apache Beam job intended to run on Google Cloud Dataflow, loading data from failedInserts into BigQuery.

Overview

This is a very generic and primitive Dataflow job. It could be replaced with a standard Dataflow template job, but a standard template job can only accept a topic, not a subscription, which means it would have to run all the time and be idle 99% of the time.

Note: Forwarder is a streaming job, which means it will not terminate automatically once it has loaded everything into BigQuery; it must be stopped manually once it has processed enough records.
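
Stopping it can be done from the Dataflow console or, for instance, with gcloud; the job ID and region below are placeholders:

gcloud dataflow jobs drain $JOB_ID --region=europe-west1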

Mutation lag

BigQuery Loader inserts data into BigQuery in near real time. At the same time, it sinks shredded_type payloads into the types topic approximately every 5 seconds. It can also take up to 10-15 seconds for Mutator to fetch and parse a message and execute an alter-table statement against the table.

If a new type arrives from the input subscription within this window and Mutator has not yet handled it, BigQuery rejects the rows containing it and they are sent to the failedInserts topic. This topic contains JSON objects ready to be loaded into BigQuery (i.e. not in the canonical Snowplow enriched event format).

To load this data from failedInserts into BigQuery, you can use the Forwarder job. It reads from a failedInserts subscription and simply performs insert statements.

Topics and message formats

Snowplow BigQuery Loader uses Google PubSub topics and subscriptions to store intermediate data and to communicate between applications.

  • input subscription - data enriched by Beam Enrich, in the canonical TSV+JSON format
  • types topic - all shredded types encountered by the Loader are sunk here as iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0 self-describing payloads, at ~5 second intervals
  • typesSubscription subscription - a subscription to the types topic used by Mutator, carrying iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0 self-describing payloads
  • badRows topic - data that could not be processed by the Loader due to Iglu registry unavailability, in the bad rows format
  • failedInserts topic - data that was successfully transformed by the Loader but failed to load into BigQuery (usually due to mutation lag), in BigQuery JSON format