From 43441a431e3b9e42dca535ace774b3bd667d75d7 Mon Sep 17 00:00:00 2001 From: Ivan Kravets Date: Sun, 14 Dec 2014 00:39:33 +0200 Subject: [PATCH] Improve Telemetry service --- platformio/telemetry.py | 155 ++++++++++++++++++++++++++++------------ 1 file changed, 108 insertions(+), 47 deletions(-) diff --git a/platformio/telemetry.py b/platformio/telemetry.py index 55d1d276..2ec9a180 100644 --- a/platformio/telemetry.py +++ b/platformio/telemetry.py @@ -1,8 +1,10 @@ # Copyright (C) Ivan Kravets # See LICENSE for details. +import atexit import platform import re +import threading import uuid from sys import argv as sys_argv from time import time @@ -57,14 +59,6 @@ class MeasurementProtocol(TelemetryBase): self._prefill_appinfo() self._prefill_custom_data() - @classmethod - def session_instance(cls): - try: - return cls._session_instance - except AttributeError: - cls._session_instance = requests.Session() - return cls._session_instance - def __getitem__(self, name): if name in self.PARAMS_MAP: name = self.PARAMS_MAP[name] @@ -119,22 +113,88 @@ class MeasurementProtocol(TelemetryBase): if "qt" in self._params and isinstance(self['qt'], float): self['qt'] = int((time() - self['qt']) * 1000) + MPDataPusher.get_instance().push(self._params) + + +class MPDataPusher(threading.Thread): + + @classmethod + def get_instance(cls): try: - r = self.session_instance().post( - "https://ssl.google-analytics.com/collect", - data=self._params - ) - r.raise_for_status() - except: # pylint: disable=W0702 - backup_report(self._params) - return False - return True + return cls._thinstance + except AttributeError: + cls._event = threading.Event() + cls._thinstance = cls() + cls._thinstance.start() + return cls._thinstance + + @classmethod + def http_session(cls): + try: + return cls._http_session + except AttributeError: + cls._http_session = requests.Session() + return cls._http_session + + def __init__(self): + threading.Thread.__init__(self) + self._terminate = False + self._server_online = False + self._stack = [] + + def run(self): + while not self._terminate: + self._event.wait() + if self._terminate or not self._stack: + return + self._event.clear() + + data = self._stack.pop() + try: + r = self.http_session().post( + "https://ssl.google-analytics.com/collect", + data=data, + timeout=3 + ) + r.raise_for_status() + self._server_online = True + except: # pylint: disable=W0702 + self._server_online = False + self._stack.append(data) + + def push(self, data): + self._stack.append(data) + self._event.set() + + def is_server_online(self): + return self._server_online + + def get_stack_data(self): + return self._stack + + def join(self, timeout=3): + self._terminate = True + self._event.set() + self.http_session().close() + threading.Thread.join(self, timeout) + + +@atexit.register +def _finalize(): + MAX_RESEND_REPORTS = 10 + mpdp = MPDataPusher.get_instance() + backup_reports(mpdp.get_stack_data()) + + resent_nums = 0 + while mpdp.is_server_online() and resent_nums < MAX_RESEND_REPORTS: + if not resend_backuped_report(): + break + resent_nums += 1 def on_command(ctx): # pylint: disable=W0613 mp = MeasurementProtocol() - if mp.send("screenview"): - resend_backuped_reports() + mp.send("screenview") def on_run_environment(options, targets): @@ -153,53 +213,54 @@ def on_event(category, action, label=None, value=None, screen_name=None): mp['event_value'] = int(value) if screen_name: mp['screen_name'] = screen_name[:2048] - return mp.send("event") + mp.send("event") def on_exception(e): mp = MeasurementProtocol() mp['exd'] = "%s: %s" % (type(e).__name__, e) mp['exf'] = 1 - return mp.send("exception") + mp.send("exception") -def backup_report(params): +def backup_reports(data): + if not data: + return + KEEP_MAX_REPORTS = 1000 tm = app.get_state_item("telemetry", {}) if "backup" not in tm: tm['backup'] = [] - # skip static options - for key in params.keys(): - if key in ("v", "tid", "cid", "cd1", "cd2", "sr", "an"): - del params[key] + for params in data: + # skip static options + for key in params.keys(): + if key in ("v", "tid", "cid", "cd1", "cd2", "sr", "an"): + del params[key] - # store time in UNIX format - if "qt" not in params: - params['qt'] = time() - elif not isinstance(params['qt'], float): - params['qt'] = time() - (params['qt'] / 1000) + # store time in UNIX format + if "qt" not in params: + params['qt'] = time() + elif not isinstance(params['qt'], float): + params['qt'] = time() - (params['qt'] / 1000) + + tm['backup'].append(params) - tm['backup'].append(params) tm['backup'] = tm['backup'][KEEP_MAX_REPORTS*-1:] app.set_state_item("telemetry", tm) -def resend_backuped_reports(): - MAX_RESEND_REPORTS = 10 +def resend_backuped_report(): + tm = app.get_state_item("telemetry", {}) + if "backup" not in tm or not tm['backup']: + return False - resent_nums = 0 - while resent_nums < MAX_RESEND_REPORTS: - tm = app.get_state_item("telemetry", {}) - if "backup" not in tm or not tm['backup']: - break + report = tm['backup'].pop() + app.set_state_item("telemetry", tm) - report = tm['backup'].pop() - app.set_state_item("telemetry", tm) - resent_nums += 1 + mp = MeasurementProtocol() + for key, value in report.items(): + mp[key] = value + mp.send(report['t']) - mp = MeasurementProtocol() - for key, value in report.items(): - mp[key] = value - if not mp.send(report['t']): - break + return True