Tracker instances must be initialized with an emitter. This section will go into more depth about the Emitter class and its subclasses.
The basic Emitter class
At its most basic, the Emitter class only needs a collector URI:
from snowplow_tracker import Emitter
e = Emitter("d3rkrsqld9gmqf.cloudfront.net")
This is the signature of the constructor for the base Emitter class:
def __init__(self, endpoint,
             protocol="http", port=None, method="get",
             buffer_size=None, on_success=None, on_failure=None,
             byte_limit=None):
Argument | Description | Required? | Validation
---|---|---|---
endpoint | The collector URI | Yes | Non-empty string
protocol | Request protocol: “http” or “https” | No | “http” or “https”
port | The port to connect to | No | Positive integer
method | The request method: “get” or “post” | No | “get” or “post”
buffer_size | Number of events to store before flushing | No | Positive integer
on_success | Callback executed when a flush is successful | No | Function taking 1 argument
on_failure | Callback executed when a flush is unsuccessful | No | Function taking 2 arguments
byte_limit | Number of bytes to store before flushing | No | Positive integer
protocol defaults to “http” but can also be “https”.
When the emitter receives an event, it adds it to a buffer. When the buffer is full, all buffered events are sent to the collector. The buffer_size argument lets you customize the size of this buffer. By default, it is 1 for GET requests and 10 for POST requests, so in the case of GET requests, each event is sent as soon as the emitter receives it. If the emitter is configured to send POST requests, then instead of sending one request per event in the buffer, it sends a single request containing all those events in JSON format.
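For example, to batch events into larger POST requests (a minimal sketch; the buffer size of 50 is an arbitrary illustrative value):

from snowplow_tracker import Emitter

# Collect up to 50 events, then send them all in a single POST request
e = Emitter("d3rkrsqld9gmqf.cloudfront.net", method="post", buffer_size=50)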
on_success is an optional callback that executes whenever the buffer is flushed successfully, that is, whenever every request sent has status code 200. It is passed one argument: the number of events that were sent.
on_failure is similar, but executes when the flush is not wholly successful. It is passed two arguments: the number of events that were successfully sent, and an array of unsent requests. (If the emitter is configured to send POST requests, the array will actually be a string, but it can be turned back into an array of Python dictionaries, each corresponding to an event, by using json.loads.)
byte_limit is similar to buffer_size, but instead of counting events, it counts the number of bytes to be sent over the network. Warning: this limit is approximate, with a margin of error below 1%.
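For instance, to flush once roughly 5 KB of event data has accumulated (a sketch; the threshold is arbitrary):

from snowplow_tracker import Emitter

# Flush when the buffered events total about 5120 bytes,
# however many events that turns out to be
e = Emitter("d3rkrsqld9gmqf.cloudfront.net", method="post", byte_limit=5120)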
A fuller example, using the on_success and on_failure callbacks:
from snowplow_tracker import Emitter, Tracker

def f(x):
    print(str(x) + " events sent successfully!")

unsent_events = []

def g(x, y):
    print(str(x) + " events sent successfully!")
    print("These events were not sent successfully and have been stored in unsent_events:")
    for event_dict in y:
        print(event_dict)
        unsent_events.append(event_dict)

e = Emitter("d3rkrsqld9gmqf.cloudfront.net", buffer_size=3, on_success=f, on_failure=g)
t = Tracker(e)
# This doesn't cause the emitter to send a request because the buffer_size was set to 3, not 1
t.track_page_view("http://www.example.com")
t.track_page_view("http://www.example.com/page1")
# This does cause the emitter to try to send all 3 events
t.track_page_view("http://www.example.com/page2")
# Since the method is GET by default, 3 separate requests are sent
# If any of them are unsuccessful, they will be stored in the unsent_events variable
The AsyncEmitter class
from snowplow_tracker import AsyncEmitter
e = AsyncEmitter("d3rkrsqld9gmqf.cloudfront.net", thread_count=10)
The AsyncEmitter class works just like the Emitter class. It has one advantage, though: HTTP(S) requests are sent asynchronously, so the Tracker won’t be blocked while the Emitter waits for a response. For this reason, the AsyncEmitter is recommended over the base Emitter class.
The AsyncEmitter uses a fixed-size thread pool to perform network I/O. By default, this pool contains only one thread, but you can configure the number of threads in the constructor using the thread_count argument.
The CeleryEmitter class
The CeleryEmitter class works just like the base Emitter class, but it registers sending requests as a task for a Celery worker. If there is a module named snowplow_celery_config.py on your PYTHONPATH, it will be used as the Celery configuration file; otherwise, a default configuration will be used. You can run the worker using this command:
celery -A snowplow_tracker.emitters worker --loglevel=debug
Note that on_success and on_failure callbacks cannot be supplied to this emitter.
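Instantiation follows the base class; a minimal sketch, assuming the CeleryEmitter constructor accepts the same endpoint argument as Emitter:

from snowplow_tracker import CeleryEmitter, Tracker

e = CeleryEmitter("d3rkrsqld9gmqf.cloudfront.net")
t = Tracker(e)
# Flushes are now queued as tasks and executed by the Celery worker started above
t.track_page_view("http://www.example.com")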
The RedisEmitter class
Use a RedisEmitter instance to store events in a Redis database for later use. This is the signature of the RedisEmitter constructor:
def __init__(self, rdb=None, key="snowplow"):
rdb should be an instance of either the Redis or StrictRedis class, found in the redis module. If it is not supplied, a default will be used. key is the key used to store events in the database. It defaults to “snowplow”. The format for event storage is a Redis list of JSON strings.
An example:
from snowplow_tracker import RedisEmitter, Tracker
import redis
rdb = redis.StrictRedis(db=2)
e = RedisEmitter(rdb, "my_snowplow_key")
t = Tracker(e)
t.track_page_view("http://www.example.com")
# Check that the event was stored in Redis:
print(rdb.lrange("my_snowplow_key", 0, -1))
# prints something like:
# ['{"tv":"py-0.4.0", "ev": "pv", "url": "http://www.example.com", "dtm": 1400252420261, "tid": 7515828, "p": "pc"}']
Manual flushing
You can flush the emitter manually using the flush method of the Tracker instance which is sending events to the emitter. This is a blocking call which synchronously sends all events in the emitter’s buffer.
t.flush()
You can alternatively perform an asynchronous flush, which tells the tracker to send all buffered events but doesn’t wait for the sending to complete:
t.flush(False)
If you are using the AsyncEmitter, you shouldn’t perform a synchronous flush inside an on_success or on_failure callback function as this can cause a deadlock.
Multiple emitters
You can configure a tracker instance to send events to multiple emitters by passing the Tracker constructor an array of emitters instead of a single emitter, or by using the add_emitter method:
from snowplow_tracker import Tracker, AsyncEmitter, RedisEmitter
import redis
e1 = AsyncEmitter("collector1.cloudfront.net", method="get")
e2 = AsyncEmitter("collector2.cloudfront.net", method="post")
t = Tracker([e1, e2])
rdb = redis.StrictRedis(db=2)
e3 = RedisEmitter(rdb, "my_snowplow_key")
t.add_emitter(e3)
Custom emitters
You can create your own custom emitter class, either from scratch or by subclassing one of the existing classes (with the exception of CeleryEmitter, since it uses the pickle module, which doesn’t work correctly with a class subclassed from a class located in a different module). The only requirement for compatibility is that it must have an input method which accepts a Python dictionary of name-value pairs.
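For example, here is a minimal sketch of a custom emitter that logs each event dictionary instead of sending it over the network; the class name and logging setup are illustrative:

import logging

from snowplow_tracker import Tracker

class LoggingEmitter(object):
    # A custom emitter: the only requirement is an input method
    # accepting a Python dictionary of name-value pairs
    def __init__(self):
        self.logger = logging.getLogger("snowplow")

    def input(self, payload):
        self.logger.info("Received event: %s", payload)

t = Tracker(LoggingEmitter())
t.track_page_view("http://www.example.com")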
Automatically retry sending failed events
You can use the following function as the on_failure callback to immediately retry failed events:
def on_failure_retry(failed_event_count, failed_events):
    # possible backoff-and-retry timeout here
    for e in failed_events:
        my_emitter.input(e)
You may wish to add backoff logic to delay the resending.
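A sketch of one such variant with a fixed delay (the delay value and the my_emitter reference are illustrative; note that for GET emitters, failed_events is a list of event dictionaries):

import time

RETRY_DELAY_SECONDS = 2  # illustrative; consider doubling it on repeated failures

def on_failure_retry_with_backoff(failed_event_count, failed_events):
    # Give a struggling collector time to recover before resending
    time.sleep(RETRY_DELAY_SECONDS)
    for e in failed_events:
        my_emitter.input(e)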
Setting flush timer
You can flush your emitter based on some time interval:
e1 = AsyncEmitter("collector1.cloudfront.net", method="post")
e1.set_flush_timer(5)  # flush every 5 seconds
The automatic flush can also be cancelled:
e1.cancel_flush()