Refactor event logging

This commit is contained in:
Ivan Kravets
2023-06-16 20:09:23 +03:00
parent 378528abfc
commit d017a8197e

View File

@ -15,59 +15,53 @@
import atexit import atexit
import hashlib import hashlib
import os import os
import platform as python_platform
import queue import queue
import re import re
import threading import threading
import time import time
import traceback
from collections import deque from collections import deque
from traceback import format_exc
import requests import requests
from platformio import __title__, __version__, app, exception, util from platformio import __title__, __version__, app, exception, fs, util
from platformio.cli import PlatformioCLI from platformio.cli import PlatformioCLI
from platformio.compat import hashlib_encode_data from platformio.compat import hashlib_encode_data
from platformio.debug.config.base import DebugConfigBase from platformio.debug.config.base import DebugConfigBase
from platformio.http import HTTPSession, ensure_internet_on from platformio.http import HTTPSession, ensure_internet_on
from platformio.proc import is_ci, is_container from platformio.proc import is_ci
KEEP_MAX_REPORTS = 100 KEEP_MAX_REPORTS = 100
SEND_MAX_EVENTS = 25 SEND_MAX_EVENTS = 25
SESSION_TIMEOUT_DURATION = 30 * 60 # secs
class MeasurementProtocol: class MeasurementProtocol:
def __init__(self): def __init__(self, events=None):
self.client_id = app.get_cid() self.client_id = app.get_cid()
self.session_id = start_session() self._events = events or []
self._user_properties = {} self._user_properties = {}
self._events = []
caller_id = app.get_session_var("caller_id") self.set_user_property("systype", util.get_systype())
if caller_id:
self.set_user_property("pio_caller_id", caller_id)
self.set_user_property("pio_core_version", __version__)
self.set_user_property(
"pio_human_actor", int(bool(caller_id or not (is_ci() or is_container())))
)
self.set_user_property("pio_systype", util.get_systype())
created_at = app.get_state_item("created_at", None) created_at = app.get_state_item("created_at", None)
if created_at: if created_at:
self.set_user_property("pio_created_at", int(created_at)) self.set_user_property("created_at", int(created_at))
@staticmethod
def event_to_dict(name, params, timestamp=None):
event = {"name": name, "params": params}
if timestamp is not None:
event["timestamp"] = timestamp
return event
def set_user_property(self, name, value): def set_user_property(self, name, value):
self._user_properties[name] = {"value": value} self._user_properties[name] = value
def add_event(self, name, params): def add_event(self, name, params):
params["session_id"] = params.get("session_id", self.session_id) self._events.append(self.event_to_dict(name, params))
params["engagement_time_msec"] = params.get("engagement_time_msec", 1)
self._events.append({"name": name, "params": params})
def to_payload(self): def to_payload(self):
return { return {
"client_id": self.client_id, "client_id": self.client_id,
"non_personalized_ads": True,
"user_properties": self._user_properties, "user_properties": self._user_properties,
"events": self._events, "events": self._events,
} }
@ -75,142 +69,122 @@ class MeasurementProtocol:
@util.singleton @util.singleton
class TelemetryLogger: class TelemetryLogger:
MAX_WORKERS = 5
def __init__(self): def __init__(self):
self._queue = queue.LifoQueue() self._events = deque()
self._failedque = deque()
self._sender_thread = None
self._sender_queue = queue.Queue()
self._sender_terminated = False
self._http_session = HTTPSession() self._http_session = HTTPSession()
self._http_offline = False self._http_offline = False
self._workers = []
def log(self, payload): def close(self):
self._http_session.close()
def log_event(self, name, params, timestamp=None, instant_sending=False):
if not app.get_setting("enable_telemetry") or app.get_session_var( if not app.get_setting("enable_telemetry") or app.get_session_var(
"pause_telemetry" "pause_telemetry"
): ):
return None return None
timestamp = timestamp or int(time.time())
# if network is off-line self._events.append(
if self._http_offline: MeasurementProtocol.event_to_dict(name, params, timestamp=timestamp)
self._failedque.append(payload) )
if self._http_offline: # if network is off-line
return False return False
if instant_sending:
self._queue.put(payload) self.send()
self._tune_workers()
return True return True
def in_wait(self): def send(self):
return self._queue.unfinished_tasks if not self._events or self._sender_terminated:
def get_unprocessed(self):
items = list(self._failedque)
try:
while True:
items.append(self._queue.get_nowait())
except queue.Empty:
pass
return items
def _tune_workers(self):
for i, w in enumerate(self._workers):
if not w.is_alive():
del self._workers[i]
need_nums = min(self._queue.qsize(), self.MAX_WORKERS)
active_nums = len(self._workers)
if need_nums <= active_nums:
return return
if not self._sender_thread:
for i in range(need_nums - active_nums): self._sender_thread = threading.Thread(
t = threading.Thread(target=self._worker) target=self._sender_worker, daemon=True
t.daemon = True )
t.start() self._sender_thread.start()
self._workers.append(t) while self._events:
events = []
def _worker(self):
while True:
try: try:
item = self._queue.get() while len(events) < SEND_MAX_EVENTS:
_item = item.copy() events.append(self._events.popleft())
self._failedque.append(_item) except IndexError:
if self._send(item): pass
self._failedque.remove(_item) self._sender_queue.put(events)
self._queue.task_done()
except: # pylint: disable=bare-except def _sender_worker(self):
while True:
if self._sender_terminated:
return
try:
events = self._sender_queue.get()
if not self._commit_events(events):
self._events.extend(events)
self._sender_queue.task_done()
except (queue.Empty, ValueError):
pass pass
def _send(self, payload): def _commit_events(self, events):
if self._http_offline: if self._http_offline:
return False return False
mp = MeasurementProtocol(events)
payload = mp.to_payload()
# print("_commit_payload", payload)
try: try:
r = self._http_session.post( r = self._http_session.post(
"https://www.google-analytics.com/mp/collect", "https://telemetry.platformio.org/collect",
params={
"measurement_id": util.decrypt_message(
__title__, "t5m7rKu6tbqwx8Cw"
),
"api_secret": util.decrypt_message(
__title__, "48SRy5rmut28ptm7zLjS5sa7tdmhrQ=="
),
},
json=payload, json=payload,
timeout=1, timeout=(2, 5), # connect, read
) )
r.raise_for_status() r.raise_for_status()
return True return True
except requests.exceptions.HTTPError as exc: except requests.exceptions.HTTPError as exc:
# skip Bad Request # skip Bad Request
if 400 >= exc.response.status_code < 500: if exc.response.status_code >= 400 and exc.response.status_code < 500:
return True return True
except: # pylint: disable=bare-except except: # pylint: disable=bare-except
pass pass
self._http_offline = True self._http_offline = True
return False return False
def terminate_sender(self):
self._sender_terminated = True
@util.memoized("1m") def is_sending(self):
def start_session(): return self._sender_queue.unfinished_tasks
with app.State(
app.resolve_state_path("cache_dir", "session.json"), lock=True
) as state:
state.modified = True
start_at = state.get("start_at")
last_seen_at = state.get("last_seen_at")
if ( def get_unsent_events(self):
not start_at result = list(self._events)
or not last_seen_at try:
or last_seen_at < (time.time() - SESSION_TIMEOUT_DURATION) while True:
): result.extend(self._sender_queue.get_nowait())
start_at = last_seen_at = int(time.time()) except queue.Empty:
state["start_at"] = state["last_seen_at"] = start_at pass
else: return result
state["last_seen_at"] = int(time.time())
return start_at
def log_event(name, params, instant_sending=False):
TelemetryLogger().log_event(name, params, instant_sending=instant_sending)
def on_platformio_start(cmd_ctx): def on_platformio_start(cmd_ctx):
process_postponed_logs()
log_command(cmd_ctx) log_command(cmd_ctx)
resend_postponed_logs()
def log_event(name, params): def on_platformio_end():
mp = MeasurementProtocol() TelemetryLogger().send()
mp.add_event(name, params)
TelemetryLogger().log(mp.to_payload())
def log_command(ctx): def log_command(ctx):
path_args = PlatformioCLI.reveal_cmd_path_args(ctx)
params = { params = {
"page_title": " ".join([arg.title() for arg in path_args]), "path_args": PlatformioCLI.reveal_cmd_path_args(ctx),
"page_path": "/".join(path_args),
"pio_user_agent": app.get_user_agent(),
"pio_python_version": python_platform.python_version(),
} }
if is_ci(): if is_ci():
params["ci_actor"] = resolve_ci_actor() or "Unknown" params["ci_actor"] = resolve_ci_actor() or "Unknown"
log_event("page_view", params) log_event("cmd_run", params)
def resolve_ci_actor(): def resolve_ci_actor():
@ -229,35 +203,6 @@ def resolve_ci_actor():
return None return None
def log_exception(e):
skip_conditions = [
isinstance(e, cls)
for cls in (
IOError,
exception.ReturnErrorCode,
exception.UserSideException,
)
]
if any(skip_conditions):
return
is_fatal = any(
[
not isinstance(e, exception.PlatformioException),
"Error" in e.__class__.__name__,
]
)
description = "%s: %s" % (
type(e).__name__,
" ".join(reversed(format_exc().split("\n"))) if is_fatal else str(e),
)
params = {
"description": description[:100].strip(),
"is_fatal": int(is_fatal),
"pio_user_agent": app.get_user_agent(),
}
log_event("pio_exception", params)
def dump_project_env_params(config, env, platform): def dump_project_env_params(config, env, platform):
non_sensitive_data = [ non_sensitive_data = [
"platform", "platform",
@ -270,77 +215,113 @@ def dump_project_env_params(config, env, platform):
] ]
section = f"env:{env}" section = f"env:{env}"
params = { params = {
f"pio_{option}": config.get(section, option) option: config.get(section, option)
for option in non_sensitive_data for option in non_sensitive_data
if config.has_option(section, option) if config.has_option(section, option)
} }
params["pio_pid"] = hashlib.sha1(hashlib_encode_data(config.path)).hexdigest() params["pid"] = hashlib.sha1(hashlib_encode_data(config.path)).hexdigest()
params["pio_platform_name"] = platform.name params["platform_name"] = platform.name
params["pio_platform_version"] = platform.version params["platform_version"] = platform.version
params["pio_framework"] = params.get("pio_framework", "__bare_metal__")
# join multi-value options
for key, value in params.items():
if isinstance(value, list):
params[key] = ", ".join(value)
return params return params
def log_platform_run( def log_platform_run(platform, project_config, project_env, targets=None):
platform, project_config, project_env, targets=None, elapsed_time=None
):
params = dump_project_env_params(project_config, project_env, platform) params = dump_project_env_params(project_config, project_env, platform)
if targets: if targets:
params["targets"] = ", ".join(targets) params["targets"] = targets
if elapsed_time: log_event("platform_run", params, instant_sending=True)
params["engagement_time_msec"] = int(elapsed_time * 1000)
log_event("pio_platform_run", params)
def log_exception(exc):
skip_conditions = [
isinstance(exc, cls)
for cls in (
IOError,
exception.ReturnErrorCode,
exception.UserSideException,
)
]
skip_conditions.append(not isinstance(exc, Exception))
if any(skip_conditions):
return
is_fatal = any(
[
not isinstance(exc, exception.PlatformioException),
"Error" in exc.__class__.__name__,
]
)
def _strip_module_path(match):
module_path = match.group(1).replace(fs.get_source_dir() + os.sep, "")
sp_folder_name = "site-packages"
sp_pos = module_path.find(sp_folder_name)
if sp_pos != -1:
module_path = module_path[sp_pos + len(sp_folder_name) + 1 :]
module_path = fs.to_unix_path(module_path)
return f'File "{module_path}",'
trace = re.sub(
r'File "([^"]+)",',
_strip_module_path,
traceback.format_exc(),
flags=re.MULTILINE,
)
params = {
"name": exc.__class__.__name__,
"description": str(exc),
"traceback": trace,
"is_fatal": is_fatal,
}
log_event("exception", params)
def log_debug_started(debug_config: DebugConfigBase): def log_debug_started(debug_config: DebugConfigBase):
log_event( log_event(
"pio_debug_started", "debug_started",
dump_project_env_params( dump_project_env_params(
debug_config.project_config, debug_config.env_name, debug_config.platform debug_config.project_config, debug_config.env_name, debug_config.platform
), ),
) )
def log_debug_exception(description, debug_config: DebugConfigBase): def log_debug_exception(exc, debug_config: DebugConfigBase):
# cleanup sensitive information, such as paths # cleanup sensitive information, such as paths
description = description.replace("Traceback (most recent call last):", "") description = fs.to_unix_path(str(exc))
description = description.replace("\\", "/")
description = re.sub( description = re.sub(
r'(^|\s+|")(?:[a-z]\:)?((/[^"/]+)+)(\s+|"|$)', r'(^|\s+|")(?:[a-z]\:)?((/[^"/]+)+)(\s+|"|$)',
lambda m: " %s " % os.path.join(*m.group(2).split("/")[-2:]), lambda m: " %s " % os.path.join(*m.group(2).split("/")[-2:]),
description, description,
re.I | re.M, re.I | re.M,
) )
description = re.sub(r"\s+", " ", description, flags=re.M)
params = { params = {
"description": description[:100].strip(), "name": exc.__class__.__name__,
"pio_user_agent": app.get_user_agent(), "description": description.strip(),
} }
params.update( params.update(
dump_project_env_params( dump_project_env_params(
debug_config.project_config, debug_config.env_name, debug_config.platform debug_config.project_config, debug_config.env_name, debug_config.platform
) )
) )
log_event("pio_debug_exception", params) log_event("debug_exception", params)
@atexit.register @atexit.register
def _finalize(): def _finalize():
timeout = 1000 # msec timeout = 1000 # msec
elapsed = 0 elapsed = 0
telemetry = TelemetryLogger()
telemetry.terminate_sender()
try: try:
while elapsed < timeout: while elapsed < timeout:
if not TelemetryLogger().in_wait(): if not telemetry.is_sending():
break break
time.sleep(0.2) time.sleep(0.2)
elapsed += 200 elapsed += 200
postpone_logs(TelemetryLogger().get_unprocessed())
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
postpone_events(telemetry.get_unsent_events())
telemetry.close()
def load_postponed_events(): def load_postponed_events():
@ -353,9 +334,9 @@ def load_postponed_events():
return state.get("events", []) return state.get("events", [])
def save_postponed_events(items): def save_postponed_events(events):
state_path = app.resolve_state_path("cache_dir", "telemetry.json") state_path = app.resolve_state_path("cache_dir", "telemetry.json")
if not items: if not events:
try: try:
if os.path.isfile(state_path): if os.path.isfile(state_path):
os.remove(state_path) os.remove(state_path)
@ -363,33 +344,38 @@ def save_postponed_events(items):
pass pass
return None return None
with app.State(state_path, lock=True) as state: with app.State(state_path, lock=True) as state:
state["events"] = items state["events"] = events
state.modified = True state.modified = True
return True return True
def postpone_logs(payloads): def postpone_events(events):
if not payloads: if not events:
return None return None
postponed_events = load_postponed_events() or [] postponed_events = load_postponed_events() or []
timestamp_micros = int(time.time() * 1000000) timestamp = int(time.time())
for payload in payloads: for event in events:
for event in payload.get("events", []): if "timestamp" not in event:
event["timestamp_micros"] = timestamp_micros event["timestamp"] = timestamp
postponed_events.append(event) postponed_events.append(event)
save_postponed_events(postponed_events[KEEP_MAX_REPORTS * -1 :]) save_postponed_events(postponed_events[KEEP_MAX_REPORTS * -1 :])
return True return True
def resend_postponed_logs(): def process_postponed_logs():
events = load_postponed_events() if not ensure_internet_on():
if not events or not ensure_internet_on():
return None return None
save_postponed_events(events[SEND_MAX_EVENTS:]) # clean events = load_postponed_events()
mp = MeasurementProtocol() if not events:
payload = mp.to_payload() return None
payload["events"] = events[0:SEND_MAX_EVENTS] save_postponed_events([]) # clean
TelemetryLogger().log(payload) telemetry = TelemetryLogger()
if len(events) > SEND_MAX_EVENTS: for event in events:
resend_postponed_logs() if set(["name", "params", "timestamp"]) <= set(event.keys()):
telemetry.log_event(
event["name"],
event["params"],
timestamp=event["timestamp"],
instant_sending=False,
)
return True return True