Emitters

Tracker instances must be initialized with an emitter. This section covers the Emitter and AsyncEmitter classes in more depth.

Overview

Each tracker instance must be initialized with an Emitter, which is responsible for sending events to a Collector. An Emitter is initialized with two arguments: an endpoint and an optional configuration hash.

A simple example with just an endpoint:

# Create an emitter
my_emitter = SnowplowTracker::Emitter.new('d3rkrsqld9gmqf.cloudfront.net')

A more complex example, using an AsyncEmitter with every setting:

# Create an emitter
my_emitter = SnowplowTracker::AsyncEmitter.new('d3rkrsqld9gmqf.cloudfront.net', {
  :protocol => 'https',
  :method => 'post',
  :port => 443,
  :buffer_size => 0,
  # Double quotes are needed so that #{...} is interpolated
  :on_success => lambda { |success_count|
    puts "#{success_count} events sent successfully"
  },
  :on_failure => lambda { |success_count, failures|
    puts "#{success_count} events sent successfully, #{failures.size} events sent unsuccessfully"
  },
  :thread_count => 10
})

Every setting in the configuration hash is optional. Here is what they do:

  • :protocol determines whether events will be sent using HTTP or HTTPS. It defaults to “http”.
  • :method determines whether events will be sent using GET or POST. It defaults to “get”.
  • :port determines the port to use. If you send events over HTTPS, you should usually set it to 443.
  • :buffer_size is the number of events which will be buffered before they are all sent simultaneously. The process of sending all buffered events is called “flushing”. When using GET, buffer_size defaults to 0 because each request can only contain one event. When using POST, buffer_size defaults to 10, and the buffered events are all sent together in a single request.
  • :on_success is a callback which is called every time the buffer is flushed and every event in it is sent successfully (meaning with status code 200). It should accept one argument: the number of events sent successfully.
  • :on_failure is a callback which is called if the buffer is flushed but not every event is sent successfully. It should accept two arguments: the number of successfully sent events and an array containing the unsuccessful events.
  • :thread_count is only used by the AsyncEmitter. It determines the number of worker threads which will be used to send events.
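To make the interaction between :method, :buffer_size, :on_success and :on_failure concrete, here is a small self-contained model of the behaviour described in the list. It is a hypothetical sketch, not the SnowplowTracker implementation: SketchEmitter and the injected send_fn (a stand-in for one HTTP request that returns true on success) are illustrative assumptions, and no network calls are made.

```ruby
# Hypothetical model of the documented settings, NOT the SnowplowTracker code.
# send_fn is an assumed stand-in for one HTTP request; it returns true on success.
class SketchEmitter
  attr_reader :buffer

  def initialize(send_fn, config = {})
    @send_fn = send_fn
    @method = config.fetch(:method, 'get')
    # Documented defaults: 0 for GET (one event per request), 10 for POST
    @buffer_size = config.fetch(:buffer_size, @method == 'post' ? 10 : 0)
    @on_success = config[:on_success]
    @on_failure = config[:on_failure]
    @buffer = []
  end

  # Buffer the event; flush once the buffer holds more than buffer_size
  # events, so a buffer_size of 0 sends every event straight away
  def input(event)
    @buffer << event
    flush if @buffer.size > @buffer_size
  end

  def flush
    events = @buffer
    @buffer = []
    return if events.empty?

    # GET: one request per event; POST: one request for the whole batch
    batches = @method == 'post' ? [events] : events.map { |e| [e] }
    success_count = 0
    failures = []
    batches.each do |batch|
      if @send_fn.call(batch)
        success_count += batch.size
      else
        failures.concat(batch)
      end
    end

    # on_success only if everything was sent; otherwise report both counts
    if failures.empty?
      @on_success&.call(success_count)
    else
      @on_failure&.call(success_count, failures)
    end
  end
end

# Usage: a POST emitter with a buffer of 2 sends all three events in one request
requests = []
emitter = SketchEmitter.new(
  ->(batch) { requests << batch; true },
  { method: 'post', buffer_size: 2,
    on_success: ->(count) { puts "#{count} events sent successfully" } }
)
3.times { |i| emitter.input({ 'eid' => i.to_s }) }
puts requests.size # prints 1
```

The callback signatures mirror the ones in the configuration example: one count for success, and a count plus the array of failed events for failure.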

The AsyncEmitter class

AsyncEmitter is a subclass of Emitter. Whenever the buffer is flushed, the AsyncEmitter places the flushed events in a work queue, and a thread pool of fixed size sends the events in this queue asynchronously. You can choose the size of this thread pool with the :thread_count setting:

AsyncEmitter.new(ENDPOINT, { thread_count: 5 })

By default, this value is 1.
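The work-queue-plus-fixed-pool pattern described above can be sketched with Ruby's built-in Queue and Thread. This is an illustration of the general technique under stated assumptions, not the gem's source: SketchAsyncSender, enqueue and shutdown are hypothetical names, and send_fn stands in for the HTTP request.

```ruby
# Hypothetical sketch of the pattern described above, not the gem's code:
# flushed batches go onto a work queue that a fixed pool of threads drains.
class SketchAsyncSender
  def initialize(send_fn, thread_count = 1)
    @send_fn = send_fn
    @queue = Queue.new
    @workers = Array.new(thread_count) do
      Thread.new do
        # Queue#pop blocks until work arrives; nil is used as the stop signal
        while (batch = @queue.pop)
          @send_fn.call(batch)
        end
      end
    end
  end

  # Called on every flush: hand the batch to the pool and return at once
  def enqueue(batch)
    @queue << batch
  end

  # Queue one stop signal per worker (after all pending batches),
  # then wait for every worker to finish
  def shutdown
    @workers.size.times { @queue << nil }
    @workers.each(&:join)
  end
end

sent = Queue.new
sender = SketchAsyncSender.new(->(batch) { sent << batch }, 5)
10.times { |i| sender.enqueue([{ 'eid' => i }]) }
sender.shutdown
puts sent.size # prints 10
```

Because the queue is FIFO and the stop signals are queued last, every batch is picked up before any worker exits.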

A note on testing: if you test the AsyncEmitter with a short script that sends an event, you may find that the event is never sent. This is because the process exits before the flushing thread has finished. You can get around this either by adding a sleep(10) to the end of your script or by using a synchronous flush (my_tracker.flush) before the script exits.

Multiple emitters

It is possible to initialize a tracker with an array of emitters, in which case events will be sent to all of them:

# Create a tracker with multiple emitters
my_tracker = SnowplowTracker::Tracker.new([my_sync_emitter, my_async_emitter], 'my_tracker_name', 'my_app_id')

You can also add new emitters after creating a tracker with the add_emitter method:

# Add another emitter to an existing tracker
my_tracker.add_emitter(another_emitter)
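The fan-out behaviour (every event goes to every emitter, including ones added later) can be modelled in a few lines. SketchTracker and RecordingEmitter are hypothetical classes used only for illustration; the assumption that a tracker hands each event to an emitter's input method is made for this sketch.

```ruby
# Hypothetical sketch of fan-out to several emitters; SketchTracker and
# RecordingEmitter are illustrative names, not part of SnowplowTracker.
class RecordingEmitter
  attr_reader :events

  def initialize
    @events = []
  end

  # Stand-in for an emitter's input method: just remember the event
  def input(event)
    @events << event
  end
end

class SketchTracker
  def initialize(emitters)
    # Accept either a single emitter or an array of them
    @emitters = Array(emitters)
  end

  def add_emitter(emitter)
    @emitters << emitter
  end

  # Every tracked event is handed to every emitter
  def track(event)
    @emitters.each { |emitter| emitter.input(event) }
  end
end

first = RecordingEmitter.new
second = RecordingEmitter.new
third = RecordingEmitter.new
tracker = SketchTracker.new([first, second])
tracker.add_emitter(third)
tracker.track({ 'e' => 'pv' })
p [first, second, third].map { |e| e.events.size } # prints [1, 1, 1]
```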

Manual flushing

You may want to force an emitter to send all events in its buffer, even if the buffer is not full. The Tracker class has a flush method which flushes all its emitters. It accepts one argument, async, which defaults to false. Unless you set async to true, the flush will be synchronous: it will block until all queued events have been sent.

# Asynchronous flush
my_tracker.flush(true)

# Synchronous flush
my_tracker.flush
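The difference between the two kinds of flush can be illustrated with a toy buffer. SketchFlusher is a hypothetical class, and the 10 ms sleep in deliver is an assumed stand-in for a request to the collector. The synchronous path returns only after every event is delivered; the asynchronous path returns a thread that a short script would still need to wait for (here with Thread#join) before exiting.

```ruby
# Hypothetical sketch of a flush(async = false) switch, not the gem's code.
class SketchFlusher
  attr_reader :sent

  def initialize
    @buffer = []
    @sent = []
    @lock = Mutex.new
  end

  def input(event)
    @lock.synchronize { @buffer << event }
  end

  # Synchronous by default: returns only once every buffered event is sent.
  # With async = true the sending runs on a new thread, which is returned.
  def flush(async = false)
    events = @lock.synchronize do
      taken = @buffer
      @buffer = []
      taken
    end
    work = -> { events.each { |event| deliver(event) } }
    async ? Thread.new(&work) : work.call
  end

  private

  # Stand-in for the HTTP request to the collector
  def deliver(event)
    sleep 0.01
    @lock.synchronize { @sent << event }
  end
end

flusher = SketchFlusher.new
3.times { |i| flusher.input(i) }
flusher.flush                # blocks until all three events are sent
puts flusher.sent.size       # prints 3
flusher.input(3)
thread = flusher.flush(true) # returns immediately
thread.join                  # wait for it, e.g. before a short script exits
puts flusher.sent.size       # prints 4
```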

Automatically retry sending failed events

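You can use the following lambda as the :on_failure callback to immediately retry failed events by passing them back to the emitter. Note that its first argument is the number of events that were sent successfully, not the number that failed:

# Declare my_emitter first so that the lambda below can refer to it
my_emitter = nil

on_failure_retry = lambda do |success_count, failed_events|
  # possible backoff-and-retry timeout here
  failed_events.each do |e|
    my_emitter.input(e)
  end
end

my_emitter = SnowplowTracker::Emitter.new('d3rkrsqld9gmqf.cloudfront.net', { :on_failure => on_failure_retry })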

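Because this callback retries immediately, a collector that stays unavailable could receive retries with no pause between them. You may wish to add backoff logic to delay the resending.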
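One common form of backoff is exponential: wait base_delay, then twice that, then four times that, up to a cap. The helper below is a generic, hypothetical sketch (retry_with_backoff and its keyword arguments are not part of the gem); it assumes you have some way to attempt a send and learn whether it succeeded, which the block stands in for. Keep in mind that sleeping inside an :on_failure callback blocks whichever thread invoked it.

```ruby
# Hypothetical sketch: retry an operation with exponential backoff.
# retry_with_backoff and its parameters are illustrative, not part of the gem.
def retry_with_backoff(max_attempts: 5, base_delay: 0.5, max_delay: 30.0)
  attempt = 0
  loop do
    attempt += 1
    # The block should return true when the send succeeded
    return true if yield(attempt)
    return false if attempt >= max_attempts

    # Wait base_delay, 2 * base_delay, 4 * base_delay, ... capped at max_delay
    delay = [base_delay * (2**(attempt - 1)), max_delay].min
    sleep(delay)
  end
end

calls = 0
ok = retry_with_backoff(base_delay: 0.01) do |attempt|
  calls += 1
  attempt >= 3 # pretend the collector recovers on the third try
end
puts ok    # prints true
puts calls # prints 3
```

Capping the number of attempts means events are eventually given up on rather than retried forever; whether that is acceptable depends on how much data loss your application can tolerate.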