diff --git a/platformio/telemetry.py b/platformio/telemetry.py index 5064c3dd..0c8adc52 100644 --- a/platformio/telemetry.py +++ b/platformio/telemetry.py @@ -15,59 +15,53 @@ import atexit import hashlib import os -import platform as python_platform import queue import re import threading import time +import traceback from collections import deque -from traceback import format_exc import requests -from platformio import __title__, __version__, app, exception, util +from platformio import __title__, __version__, app, exception, fs, util from platformio.cli import PlatformioCLI from platformio.compat import hashlib_encode_data from platformio.debug.config.base import DebugConfigBase from platformio.http import HTTPSession, ensure_internet_on -from platformio.proc import is_ci, is_container +from platformio.proc import is_ci KEEP_MAX_REPORTS = 100 SEND_MAX_EVENTS = 25 -SESSION_TIMEOUT_DURATION = 30 * 60 # secs class MeasurementProtocol: - def __init__(self): + def __init__(self, events=None): self.client_id = app.get_cid() - self.session_id = start_session() + self._events = events or [] self._user_properties = {} - self._events = [] - caller_id = app.get_session_var("caller_id") - if caller_id: - self.set_user_property("pio_caller_id", caller_id) - self.set_user_property("pio_core_version", __version__) - self.set_user_property( - "pio_human_actor", int(bool(caller_id or not (is_ci() or is_container()))) - ) - self.set_user_property("pio_systype", util.get_systype()) + self.set_user_property("systype", util.get_systype()) created_at = app.get_state_item("created_at", None) if created_at: - self.set_user_property("pio_created_at", int(created_at)) + self.set_user_property("created_at", int(created_at)) + + @staticmethod + def event_to_dict(name, params, timestamp=None): + event = {"name": name, "params": params} + if timestamp is not None: + event["timestamp"] = timestamp + return event def set_user_property(self, name, value): - self._user_properties[name] = {"value": value} + self._user_properties[name] = value def add_event(self, name, params): - params["session_id"] = params.get("session_id", self.session_id) - params["engagement_time_msec"] = params.get("engagement_time_msec", 1) - self._events.append({"name": name, "params": params}) + self._events.append(self.event_to_dict(name, params)) def to_payload(self): return { "client_id": self.client_id, - "non_personalized_ads": True, "user_properties": self._user_properties, "events": self._events, } @@ -75,142 +69,122 @@ class MeasurementProtocol: @util.singleton class TelemetryLogger: - MAX_WORKERS = 5 - def __init__(self): - self._queue = queue.LifoQueue() - self._failedque = deque() + self._events = deque() + + self._sender_thread = None + self._sender_queue = queue.Queue() + self._sender_terminated = False + self._http_session = HTTPSession() self._http_offline = False - self._workers = [] - def log(self, payload): + def close(self): + self._http_session.close() + + def log_event(self, name, params, timestamp=None, instant_sending=False): if not app.get_setting("enable_telemetry") or app.get_session_var( "pause_telemetry" ): return None - - # if network is off-line - if self._http_offline: - self._failedque.append(payload) + timestamp = timestamp or int(time.time()) + self._events.append( + MeasurementProtocol.event_to_dict(name, params, timestamp=timestamp) + ) + if self._http_offline: # if network is off-line return False - - self._queue.put(payload) - self._tune_workers() + if instant_sending: + self.send() return True - def in_wait(self): - return self._queue.unfinished_tasks - - def get_unprocessed(self): - items = list(self._failedque) - try: - while True: - items.append(self._queue.get_nowait()) - except queue.Empty: - pass - return items - - def _tune_workers(self): - for i, w in enumerate(self._workers): - if not w.is_alive(): - del self._workers[i] - - need_nums = min(self._queue.qsize(), self.MAX_WORKERS) - active_nums = len(self._workers) - if need_nums <= active_nums: + def send(self): + if not self._events or self._sender_terminated: return - - for i in range(need_nums - active_nums): - t = threading.Thread(target=self._worker) - t.daemon = True - t.start() - self._workers.append(t) - - def _worker(self): - while True: + if not self._sender_thread: + self._sender_thread = threading.Thread( + target=self._sender_worker, daemon=True + ) + self._sender_thread.start() + while self._events: + events = [] try: - item = self._queue.get() - _item = item.copy() - self._failedque.append(_item) - if self._send(item): - self._failedque.remove(_item) - self._queue.task_done() - except: # pylint: disable=bare-except + while len(events) < SEND_MAX_EVENTS: + events.append(self._events.popleft()) + except IndexError: + pass + self._sender_queue.put(events) + + def _sender_worker(self): + while True: + if self._sender_terminated: + return + try: + events = self._sender_queue.get() + if not self._commit_events(events): + self._events.extend(events) + self._sender_queue.task_done() + except (queue.Empty, ValueError): pass - def _send(self, payload): + def _commit_events(self, events): if self._http_offline: return False + mp = MeasurementProtocol(events) + payload = mp.to_payload() + # print("_commit_payload", payload) try: r = self._http_session.post( - "https://www.google-analytics.com/mp/collect", - params={ - "measurement_id": util.decrypt_message( - __title__, "t5m7rKu6tbqwx8Cw" - ), - "api_secret": util.decrypt_message( - __title__, "48SRy5rmut28ptm7zLjS5sa7tdmhrQ==" - ), - }, + "https://telemetry.platformio.org/collect", json=payload, - timeout=1, + timeout=(2, 5), # connect, read ) r.raise_for_status() return True except requests.exceptions.HTTPError as exc: # skip Bad Request - if 400 >= exc.response.status_code < 500: + if exc.response.status_code >= 400 and exc.response.status_code < 500: return True except: # pylint: disable=bare-except pass self._http_offline = True return False + def terminate_sender(self): + self._sender_terminated = True -@util.memoized("1m") -def start_session(): - with app.State( - app.resolve_state_path("cache_dir", "session.json"), lock=True - ) as state: - state.modified = True - start_at = state.get("start_at") - last_seen_at = state.get("last_seen_at") + def is_sending(self): + return self._sender_queue.unfinished_tasks - if ( - not start_at - or not last_seen_at - or last_seen_at < (time.time() - SESSION_TIMEOUT_DURATION) - ): - start_at = last_seen_at = int(time.time()) - state["start_at"] = state["last_seen_at"] = start_at - else: - state["last_seen_at"] = int(time.time()) - return start_at + def get_unsent_events(self): + result = list(self._events) + try: + while True: + result.extend(self._sender_queue.get_nowait()) + except queue.Empty: + pass + return result + + +def log_event(name, params, instant_sending=False): + TelemetryLogger().log_event(name, params, instant_sending=instant_sending) def on_platformio_start(cmd_ctx): + process_postponed_logs() log_command(cmd_ctx) - resend_postponed_logs() -def log_event(name, params): - mp = MeasurementProtocol() - mp.add_event(name, params) - TelemetryLogger().log(mp.to_payload()) +def on_platformio_end(): + TelemetryLogger().send() def log_command(ctx): - path_args = PlatformioCLI.reveal_cmd_path_args(ctx) params = { - "page_title": " ".join([arg.title() for arg in path_args]), - "page_path": "/".join(path_args), - "pio_user_agent": app.get_user_agent(), - "pio_python_version": python_platform.python_version(), + "path_args": PlatformioCLI.reveal_cmd_path_args(ctx), } if is_ci(): params["ci_actor"] = resolve_ci_actor() or "Unknown" - log_event("page_view", params) + log_event("cmd_run", params) def resolve_ci_actor(): @@ -229,35 +203,6 @@ def resolve_ci_actor(): return None -def log_exception(e): - skip_conditions = [ - isinstance(e, cls) - for cls in ( - IOError, - exception.ReturnErrorCode, - exception.UserSideException, - ) - ] - if any(skip_conditions): - return - is_fatal = any( - [ - not isinstance(e, exception.PlatformioException), - "Error" in e.__class__.__name__, - ] - ) - description = "%s: %s" % ( - type(e).__name__, - " ".join(reversed(format_exc().split("\n"))) if is_fatal else str(e), - ) - params = { - "description": description[:100].strip(), - "is_fatal": int(is_fatal), - "pio_user_agent": app.get_user_agent(), - } - log_event("pio_exception", params) - - def dump_project_env_params(config, env, platform): non_sensitive_data = [ "platform", @@ -270,77 +215,113 @@ def dump_project_env_params(config, env, platform): ] section = f"env:{env}" params = { - f"pio_{option}": config.get(section, option) + option: config.get(section, option) for option in non_sensitive_data if config.has_option(section, option) } - params["pio_pid"] = hashlib.sha1(hashlib_encode_data(config.path)).hexdigest() - params["pio_platform_name"] = platform.name - params["pio_platform_version"] = platform.version - params["pio_framework"] = params.get("pio_framework", "__bare_metal__") - # join multi-value options - for key, value in params.items(): - if isinstance(value, list): - params[key] = ", ".join(value) + params["pid"] = hashlib.sha1(hashlib_encode_data(config.path)).hexdigest() + params["platform_name"] = platform.name + params["platform_version"] = platform.version return params -def log_platform_run( - platform, project_config, project_env, targets=None, elapsed_time=None -): +def log_platform_run(platform, project_config, project_env, targets=None): params = dump_project_env_params(project_config, project_env, platform) if targets: - params["targets"] = ", ".join(targets) - if elapsed_time: - params["engagement_time_msec"] = int(elapsed_time * 1000) - log_event("pio_platform_run", params) + params["targets"] = targets + log_event("platform_run", params, instant_sending=True) + + +def log_exception(exc): + skip_conditions = [ + isinstance(exc, cls) + for cls in ( + IOError, + exception.ReturnErrorCode, + exception.UserSideException, + ) + ] + skip_conditions.append(not isinstance(exc, Exception)) + if any(skip_conditions): + return + is_fatal = any( + [ + not isinstance(exc, exception.PlatformioException), + "Error" in exc.__class__.__name__, + ] + ) + + def _strip_module_path(match): + module_path = match.group(1).replace(fs.get_source_dir() + os.sep, "") + sp_folder_name = "site-packages" + sp_pos = module_path.find(sp_folder_name) + if sp_pos != -1: + module_path = module_path[sp_pos + len(sp_folder_name) + 1 :] + module_path = fs.to_unix_path(module_path) + return f'File "{module_path}",' + + trace = re.sub( + r'File "([^"]+)",', + _strip_module_path, + traceback.format_exc(), + flags=re.MULTILINE, + ) + + params = { + "name": exc.__class__.__name__, + "description": str(exc), + "traceback": trace, + "is_fatal": is_fatal, + } + log_event("exception", params) def log_debug_started(debug_config: DebugConfigBase): log_event( - "pio_debug_started", + "debug_started", dump_project_env_params( debug_config.project_config, debug_config.env_name, debug_config.platform ), ) -def log_debug_exception(description, debug_config: DebugConfigBase): +def log_debug_exception(exc, debug_config: DebugConfigBase): # cleanup sensitive information, such as paths - description = description.replace("Traceback (most recent call last):", "") - description = description.replace("\\", "/") + description = fs.to_unix_path(str(exc)) description = re.sub( r'(^|\s+|")(?:[a-z]\:)?((/[^"/]+)+)(\s+|"|$)', lambda m: " %s " % os.path.join(*m.group(2).split("/")[-2:]), description, re.I | re.M, ) - description = re.sub(r"\s+", " ", description, flags=re.M) params = { - "description": description[:100].strip(), - "pio_user_agent": app.get_user_agent(), + "name": exc.__class__.__name__, + "description": description.strip(), } params.update( dump_project_env_params( debug_config.project_config, debug_config.env_name, debug_config.platform ) ) - log_event("pio_debug_exception", params) + log_event("debug_exception", params) @atexit.register def _finalize(): timeout = 1000 # msec elapsed = 0 + telemetry = TelemetryLogger() + telemetry.terminate_sender() try: while elapsed < timeout: - if not TelemetryLogger().in_wait(): + if not telemetry.is_sending(): break time.sleep(0.2) elapsed += 200 - postpone_logs(TelemetryLogger().get_unprocessed()) except KeyboardInterrupt: pass + postpone_events(telemetry.get_unsent_events()) + telemetry.close() def load_postponed_events(): @@ -353,9 +334,9 @@ def load_postponed_events(): return state.get("events", []) -def save_postponed_events(items): +def save_postponed_events(events): state_path = app.resolve_state_path("cache_dir", "telemetry.json") - if not items: + if not events: try: if os.path.isfile(state_path): os.remove(state_path) @@ -363,33 +344,38 @@ def save_postponed_events(items): pass return None with app.State(state_path, lock=True) as state: - state["events"] = items + state["events"] = events state.modified = True return True -def postpone_logs(payloads): - if not payloads: +def postpone_events(events): + if not events: return None postponed_events = load_postponed_events() or [] - timestamp_micros = int(time.time() * 1000000) - for payload in payloads: - for event in payload.get("events", []): - event["timestamp_micros"] = timestamp_micros - postponed_events.append(event) + timestamp = int(time.time()) + for event in events: + if "timestamp" not in event: + event["timestamp"] = timestamp + postponed_events.append(event) save_postponed_events(postponed_events[KEEP_MAX_REPORTS * -1 :]) return True -def resend_postponed_logs(): - events = load_postponed_events() - if not events or not ensure_internet_on(): +def process_postponed_logs(): + if not ensure_internet_on(): return None - save_postponed_events(events[SEND_MAX_EVENTS:]) # clean - mp = MeasurementProtocol() - payload = mp.to_payload() - payload["events"] = events[0:SEND_MAX_EVENTS] - TelemetryLogger().log(payload) - if len(events) > SEND_MAX_EVENTS: - resend_postponed_logs() + events = load_postponed_events() + if not events: + return None + save_postponed_events([]) # clean + telemetry = TelemetryLogger() + for event in events: + if set(["name", "params", "timestamp"]) <= set(event.keys()): + telemetry.log_event( + event["name"], + event["params"], + timestamp=event["timestamp"], + instant_sending=False, + ) return True