diff --git a/tcp-kafka-dashing/README.md b/tcp-kafka-dashing/README.md new file mode 100644 index 0000000..eabcd44 --- /dev/null +++ b/tcp-kafka-dashing/README.md @@ -0,0 +1,97 @@ +Divolte tcp-kafka-dashing example +================================== + +This example uses Python to create a Kafka consumer that sends events from Divolte to Dashing. To run this, you need: +- The accompanying Javadoc Avro schema installed into your local Maven repository. +- A running HTTP server which serves the static Javadoc HTML files instrumented with the Divolte Collector tag. +- Kafka (including Zookeeper) +- Dashing +- kafka-python (pip install kafka-python) +- Apache Avro (http://avro.apache.org/docs/1.7.6/gettingstartedpython.html#download_install) + +## Building and running + +#### Step 1: install and configure Divolte Collector +Download the latest [Divolte Collector](https://github.com/divolte/divolte-collector) release. Use either the .zip or the .tar.gz archive. Extract the archive to a directory of your choice. In the installation, there is a conf/ directory. In here, create a file called divolte-collector.conf with the following contents: +```hocon +divolte { + kafka_flusher { + enabled = true + threads = 1 + } + + hdfs_flusher { + enabled = false + } + + javascript { + logging = true + debug = true + } + + tracking { + schema_file = /path/to/divolte-examples/avro-schema/src/main/resources/JavadocEventRecord.avsc + schema_mapping { + version = 2 + mapping_script_file = "/path/to/divolte-examples/avro-schema/mapping.groovy" + } + } +} +``` +> *Make sure you correct the paths to the Avro schema and mapping configuration!* + +#### Step 2: download, unpack and run Kafka +Setting up Kafka is beyond the scope of this document. It is however very simple to get Kafka up and running on your local machine using all default settings. 
[Download a Kafka release](https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz), unpack it and run as follows: +```sh +# In one terminal session +cd kafka_2.10-0.8.1.1/bin +./zookeeper-server-start.sh ../config/zookeeper.properties + +# Leave Zookeeper running and in another terminal session, do: +cd kafka_2.10-0.8.1.1/bin +./kafka-server-start.sh ../config/server.properties +``` +#### Step 3: start Divolte Collector +Go into the bin directory of your installation and run: +```sh +cd divolte-collector-0.2/bin +./divolte-collector +``` + +#### Step 4: host your Javadoc files +Set up an HTTP server that serves the Javadoc files that you generated or downloaded for the examples. If you have Python installed, you can use this: +```sh +cd +python -m SimpleHTTPServer +``` + +#### Step 5: build a dashing template based on your avro-schema +```sh +cd divolte-examples/tcp-kafka-dashing +python generate_template.py --schema /path/to/divolte-examples/avro-schema/src/main/resources/JavadocEventRecord.avsc +``` + +#### Step 7: generate a new dashing dashboard and install the required pie widget +```sh +cd +dashing new divolte_dashboard_project +cd divolte_dashboard_project +dashing install 6273841 +bundle +``` + +#### Step 8: copy/paste the template into your new dashboard +Edit dashboards/sample.erb, replace the body of the ul-element with the contents of the template.dashing file you generated in step 5. 
def select_dashing(options):
    """Ask the user on stdin which Dashing widget type to use.

    The prompt is repeated until the answer is one of *options*; an empty
    answer selects the first option, which the prompt shows as the default.

    :param options: non-empty list of widget type names; options[0] is the
        default.
    :return: the chosen widget type name.
    """
    prompt = '\tChoose which between %s, [%s]' % (", ".join(options), options[0])
    while True:
        answer = raw_input(prompt)
        if answer == '':
            return options[0]
        if answer in options:
            return answer
  • +
    +
  • """ + elif value == 'list': + dashing_template = """ +
  • +
    +
  • """ + elif value == 'pie': + dashing_template = """ +
  • +
    +
  • """ + elif value == 'number': + dashing_template = """ +
  • +
    +
  • """ + else: + dashing_template = """ +
  • +
    +
  • """ + + print >> template, dashing_template % (key, key.capitalize()) + + template.close() + properties.close() + +def parse_args(): + def utf8_bytes(s): + return bytes(s, 'utf-8') + + parser = argparse.ArgumentParser(description='Runs the consumer.') + parser.add_argument('--schema', '-s', metavar='SCHEMA', type=str, required=True, help='Avro schema of Kafka messages.') + parser.add_argument('--template', '-t', metavar='TEMPLATE', type=str, required=False, default='template.dashing', help='Where to output the dashing widget-template.') + parser.add_argument('--properties', '-p', metavar='PROPERTIES', type=str, required=False, default='properties.dashing', help='Where to output the property file mapping the avro fields to dashing widgets.') + return parser.parse_args() + +if __name__ == '__main__': + args = parse_args() + generate_mapping(args.schema, args.template, args.properties) diff --git a/tcp-kafka-dashing/main.py b/tcp-kafka-dashing/main.py new file mode 100644 index 0000000..c1195d6 --- /dev/null +++ b/tcp-kafka-dashing/main.py @@ -0,0 +1,99 @@ +import avro.schema +import avro.io +import argparse +from kafka import KafkaConsumer +from StringIO import StringIO +from utils import RunningAverage, StringRunningAverage +from threading import Thread +import requests +import json +import time +from requests.exceptions import ConnectionError + +def construct_averages(input_properties): + values = {} + for line in open(input_properties): + key, dashing_type = line.strip().split() + if dashing_type in ['pie', 'list']: + values[key] = (dashing_type, StringRunningAverage(5, 60)) + else: + values[key] = (dashing_type, RunningAverage(1 if key == 'pageView' else 5, 60)) + return values + +def start_consumer(input_avro, metadata_broker_list, values): + json_schema = "" + with open(input_avro) as fp: + for line in fp: + if not line.startswith("//"): + json_schema += line + + schema = avro.schema.parse(json_schema) + dreader = avro.io.DatumReader(schema, schema) + + def 
parse_message(msg): + return dreader.read(avro.io.BinaryDecoder(StringIO(msg))) + + consumer = KafkaConsumer("divolte", metadata_broker_list=metadata_broker_list, deserializer_class=lambda msg: parse_message(msg)) + for message in consumer: + timestamp = int(message.value['timestamp'] / 1000) + for key, value in message.value.iteritems(): + if key == 'eventType': + key, value = value, 1 + + if key in values: + values[key][1].addValue(timestamp, value) + + consumer.close() + +def start_poster(dashing_ip, dashing_auth, values): + def gen_list(dictionary, x, y, sortByKey=False, sortByValue=False, removeZeros=False): + keys = dictionary.keys() + if sortByValue: + keys.sort(cmp=lambda a, b: cmp(dictionary[a], dictionary[b]), reverse=True) + if sortByKey: + keys.sort() + return [{x: key, y: dictionary[key]} for key in keys if not removeZeros or (dictionary[key] != 0 and key)] + + while True: + for key, value in values.iteritems(): + try: + dashing_type, average = value + if dashing_type == 'list': + payload = {'auth_token': dashing_auth, 'items': gen_list(average.getSumAsDict(), "label", "value", sortByValue=True, removeZeros=True)} + elif dashing_type == 'graph': + payload = {'auth_token': dashing_auth, 'points': gen_list(average.getSumAsDict(), 'x', 'y', sortByKey=True)} + elif dashing_type == 'pie': + payload = {'auth_token': dashing_auth, 'value': gen_list(average.getSumAsDict(), 'label', 'value', removeZeros=True)} + elif dashing_type == 'number': + payload = {'auth_token': dashing_auth, 'current': average.getSum(), 'last': average.getSum()} + else: + raise RuntimeError("'%s' is not supported" % dashing_type) + + requests.post("http://%s/widgets/%s" % (dashing_ip, key), data=json.dumps(payload)) + + except ConnectionError: + pass + + time.sleep(1) + +def parse_args(): + def utf8_bytes(s): + return bytes(s, 'utf-8') + + parser = argparse.ArgumentParser(description='Runs the consumer.') + parser.add_argument('--schema', '-s', metavar='SCHEMA', type=str, required=True, 
help='Avro schema of Kafka messages.') + parser.add_argument('--properties', '-p', metavar='PROPERTIES', type=str, required=False, default='properties.dashing', help='The mapping of the avro fields to dashing widgets.') + parser.add_argument('--dashing', '-d', metavar='DASHING_HOST_PORT', type=str, required=False, default='localhost:3030', help='Dashing hostname + port.') + parser.add_argument('--dashingauth', '-a', metavar='DASHING_AUTH', type=str, required=False, default='YOUR_AUTH_TOKEN', help='Dashing auth token.') + parser.add_argument('--brokers', '-b', metavar='KAFKA_BROKERS', type=str, nargs="+", help='A list of Kafka brokers (host:port).', default=['localhost:9092']) + return parser.parse_args() + +if __name__ == '__main__': + args = parse_args() + + values = construct_averages(args.properties) + + t = Thread(target=start_poster, args=(args.dashing, args.dashingauth, values)) + t.start() + + start_consumer(args.schema, args.brokers, values) diff --git a/tcp-kafka-dashing/test/test_utils.py b/tcp-kafka-dashing/test/test_utils.py new file mode 100644 index 0000000..dec09b7 --- /dev/null +++ b/tcp-kafka-dashing/test/test_utils.py @@ -0,0 +1,57 @@ +from unittest import TestCase +from utils import RunningAverage, StringRunningAverage +from time import time + +class TestUtils(TestCase): + + def setUp(self): + self.r = RunningAverage(5, 60) + + def test_running_average(self): + self.r.addValue(time(), 1) + self.r.addValue(time(), 1) + + assert self.r.getSum() == 2, self.r.getSum() + assert self.r.getAverage() == 2 / 60.0, self.r.getAverage() + + def test_running_average_bin(self): + now = int(time() / 5) * 5 + self.r.addValue(now, 1) + self.r.addValue(now, 1) + self.r.addValue(now - 5, 1) + + assert self.r.getSumAsDict().get(now) == 2, (now, self.r.getSumAsDict()) + assert self.r.getAverageAsDict().get(now) == 2 / 5.0, (now, self.r.getAverageAsDict()) + + def test_running_average_bool(self): + self.r.addValue(time(), True) + self.r.addValue(time(), True) + + 
from time import time
from collections import defaultdict
import sys


class RunningAverage:
    """Sliding-window sum and average over a ring buffer of time bins.

    Time is cut into bins of ``refreshRate`` seconds. A bin is identified by
    its "time factor" ``int(timestamp / refreshRate)`` and stored in slot
    ``timeFactor % nbElements`` of ``values``.
    """

    # refreshrate and period in seconds
    def __init__(self, refreshRate, period):
        self.refreshRate = refreshRate
        self.period = period

        # Floor division keeps the slot count an int on Python 2 and 3 alike.
        self.nbElements = self.period // self.refreshRate + 2
        # Time factor (bin number, NOT seconds) of the newest prepared bin.
        self.lastUpdate = int(time() / self.refreshRate)
        self.values = [0] * self.nbElements

    def addValue(self, timestamp, value):
        """Add *value* to the bin containing *timestamp* (seconds).

        Timestamps older than ``period`` seconds before the newest bin are
        ignored; a note is written to stderr unless Python runs with -O.
        """
        # lastUpdate counts bins, so it is converted back to seconds before
        # comparing. The original compared seconds against bins, so events
        # older than the window were written into slots that belong to
        # current bins.
        if timestamp < self.lastUpdate * self.refreshRate - self.period:
            if __debug__:
                sys.stderr.write("too old, ignoring\n")
            return

        timeFactor = int(timestamp / self.refreshRate)

        # Make sure the slot is prepared before adding to it.
        self._update(timeFactor)
        self.values[timeFactor % self.nbElements] += value

    def getAverage(self):
        """Return getSum() divided by the period in seconds."""
        return self.getSum() / float(self.period)

    def getSum(self):
        """Return the sum over the current bin and the nbElements - 2 before it."""
        timeFactor = int(time() / self.refreshRate)
        self._update(timeFactor)

        # Every slot except the one pre-cleared for the next bin: the range
        # ends on timeFactor + nbElements, which maps to the current slot.
        total = 0
        for i in range(timeFactor + 2, timeFactor + self.nbElements + 1):
            total += self.values[i % self.nbElements]
        return total

    def getAverageAsDict(self):
        """Return getSumAsDict() with every bin value divided by refreshRate."""
        rate = float(self.refreshRate)
        return dict((key, value / rate) for key, value in self.getSumAsDict().items())

    def getSumAsDict(self):
        """Return {bin start in seconds: bin value} for all nbElements slots."""
        timeFactor = int(time() / self.refreshRate)
        self._update(timeFactor)

        sumdict = {}
        # From the current bin backwards, one entry per slot.
        for i in range(timeFactor, timeFactor - self.nbElements, -1):
            sumdict[i * self.refreshRate] = self.values[i % self.nbElements]
        return sumdict

    def _update(self, timeFactor):
        """Advance the ring buffer so that *timeFactor* is the newest bin.

        Clears every slot from the bin after ``lastUpdate`` up to and
        including ``timeFactor + 1``. A time factor that is not newer than
        ``lastUpdate`` changes nothing, so late events never wipe newer bins.
        """
        if self.lastUpdate >= timeFactor:
            return

        # Never clear more than one full turn of the ring, however long ago
        # the last update was.
        first = max(self.lastUpdate + 1, timeFactor + 2 - self.nbElements)

        # The range includes timeFactor itself: after a gap of more than one
        # bin that slot still holds counts from nbElements bins ago (the
        # original skipped it). It also includes timeFactor + 1, so the next
        # bin always reads as zero.
        for i in range(first, timeFactor + 2):
            self.values[i % self.nbElements] = 0

        self.lastUpdate = timeFactor


class StringRunningAverage:
    """Keeps one RunningAverage of occurrence counts per distinct value."""

    def __init__(self, refreshRate, period):
        self.period = period
        self.values = defaultdict(lambda: RunningAverage(refreshRate, period))

    def addValue(self, timestamp, value):
        """Count one occurrence of *value* at *timestamp* (seconds)."""
        self.values[value].addValue(timestamp, 1)

    def getAverage(self):
        """Return the total occurrence count divided by the period."""
        return self.getSum() / float(self.period)

    def getSum(self):
        """Return the occurrence count summed over all values."""
        # Iterate over a snapshot: main.py reads from its poster thread while
        # the consumer thread may insert new keys.
        return sum(value.getSum() for value in list(self.values.values()))

    def getAverageAsDict(self):
        """Return {value: its RunningAverage.getAverage()}."""
        return dict((key, value.getAverage()) for key, value in list(self.values.items()))

    def getSumAsDict(self):
        """Return {value: its occurrence count within the window}."""
        return dict((key, value.getSum()) for key, value in list(self.values.items()))