From cb65bdf22f2013cee5352a6d8f5ecc982fd9a1e4 Mon Sep 17 00:00:00 2001 From: Ivan Kravets Date: Mon, 5 Jun 2023 18:24:42 +0300 Subject: [PATCH] Enhance user privacy protection through refined telemetry implementation --- platformio/app.py | 14 +- platformio/cli.py | 15 + platformio/debug/process/gdb.py | 20 +- platformio/home/rpc/handlers/app.py | 17 +- platformio/maintenance.py | 51 ++-- platformio/platform/_run.py | 11 +- platformio/platform/board.py | 3 +- platformio/telemetry.py | 441 ++++++++++++---------------- platformio/util.py | 10 + 9 files changed, 257 insertions(+), 325 deletions(-) diff --git a/platformio/app.py b/platformio/app.py index 48e90b97..c01e13c3 100644 --- a/platformio/app.py +++ b/platformio/app.py @@ -18,6 +18,7 @@ import json import os import platform import socket +import time import uuid from platformio import __version__, exception, fs, proc @@ -71,15 +72,19 @@ SESSION_VARS = { } +def resolve_state_path(conf_option_dir, file_name, ensure_dir_exists=True): + state_dir = ProjectConfig.get_instance().get("platformio", conf_option_dir) + if ensure_dir_exists and not os.path.isdir(state_dir): + os.makedirs(state_dir) + return os.path.join(state_dir, file_name) + + class State: def __init__(self, path=None, lock=False): self.path = path self.lock = lock if not self.path: - core_dir = ProjectConfig.get_instance().get("platformio", "core_dir") - if not os.path.isdir(core_dir): - os.makedirs(core_dir) - self.path = os.path.join(core_dir, "appstate.json") + self.path = resolve_state_path("core_dir", "appstate.json") self._storage = {} self._lockfile = None self.modified = False @@ -248,6 +253,7 @@ def get_cid(): cid = str(cid) if IS_WINDOWS or os.getuid() > 0: # pylint: disable=no-member set_state_item("cid", cid) + set_state_item("created_at", int(time.time())) return cid diff --git a/platformio/cli.py b/platformio/cli.py index 41aab522..a2a707ad 100644 --- a/platformio/cli.py +++ b/platformio/cli.py @@ -63,6 +63,21 @@ class PlatformioCLI(click.MultiCommand): ] ) + @classmethod + def reveal_cmd_path_args(cls, ctx): + result = [] + group = ctx.command + args = cls.leftover_args[::] + while args: + cmd_name = args.pop(0) + next_group = group.get_command(ctx, cmd_name) + if next_group: + group = next_group + result.append(cmd_name) + if not hasattr(group, "get_command"): + break + return result + def invoke(self, ctx): PlatformioCLI.leftover_args = ctx.args if hasattr(ctx, "protected_args"): diff --git a/platformio/debug/process/gdb.py b/platformio/debug/process/gdb.py index ce7e82c8..2a1f2f72 100644 --- a/platformio/debug/process/gdb.py +++ b/platformio/debug/process/gdb.py @@ -130,11 +130,7 @@ class GDBClientProcess(DebugClientProcess): self._handle_error(data) # go to init break automatically if self.INIT_COMPLETED_BANNER.encode() in data: - telemetry.send_event( - "Debug", - "Started", - telemetry.dump_run_environment(self.debug_config.env_options), - ) + telemetry.log_debug_started(self.debug_config) self._auto_exec_continue() def console_log(self, msg): @@ -180,13 +176,11 @@ class GDBClientProcess(DebugClientProcess): ): return - last_erros = self._errors_buffer.decode() - last_erros = " ".join(reversed(last_erros.split("\n"))) - last_erros = re.sub(r'((~|&)"|\\n\"|\\t)', " ", last_erros, flags=re.M) - - err = "%s -> %s" % ( - telemetry.dump_run_environment(self.debug_config.env_options), - last_erros, + last_errors = self._errors_buffer.decode() + last_errors = " ".join(reversed(last_errors.split("\n"))) + last_errors = re.sub(r'((~|&)"|\\n\"|\\t)', " ", last_errors, flags=re.M) + telemetry.log_debug_exception( + "DebugInitError: %s" % last_errors, self.debug_config ) - telemetry.send_exception("DebugInitError: %s" % err) + self.transport.close() diff --git a/platformio/home/rpc/handlers/app.py b/platformio/home/rpc/handlers/app.py index 9c79e314..4b6195e4 100644 --- a/platformio/home/rpc/handlers/app.py +++ b/platformio/home/rpc/handlers/app.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os from pathlib import Path from platformio import __version__, app, fs, util from platformio.home.rpc.handlers.base import BaseRPCHandler -from platformio.project.config import ProjectConfig from platformio.project.helpers import is_platformio_project @@ -32,16 +30,11 @@ class AppRPC(BaseRPCHandler): "projectsDir", ] - @staticmethod - def get_state_path(): - core_dir = ProjectConfig.get_instance().get("platformio", "core_dir") - if not os.path.isdir(core_dir): - os.makedirs(core_dir) - return os.path.join(core_dir, "homestate.json") - @staticmethod def load_state(): - with app.State(AppRPC.get_state_path(), lock=True) as state: + with app.State( + app.resolve_state_path("core_dir", "homestate.json"), lock=True + ) as state: storage = state.get("storage", {}) # base data @@ -81,7 +74,9 @@ class AppRPC(BaseRPCHandler): @staticmethod def save_state(state): - with app.State(AppRPC.get_state_path(), lock=True) as s: + with app.State( + app.resolve_state_path("core_dir", "homestate.json"), lock=True + ) as s: s.clear() s.update(state) storage = s.get("storage", {}) diff --git a/platformio/maintenance.py b/platformio/maintenance.py index 3110b2c1..c7a3d3b7 100644 --- a/platformio/maintenance.py +++ b/platformio/maintenance.py @@ -25,8 +25,6 @@ from platformio.cli import PlatformioCLI from platformio.commands.upgrade import get_latest_version from platformio.http import HTTPClientError, InternetConnectionError, ensure_internet_on from platformio.package.manager.core import update_core_packages -from platformio.package.manager.tool import ToolPackageManager -from platformio.package.meta import PackageSpec from platformio.package.version import pepver_to_semver from platformio.system.prune import calculate_unnecessary_system_data @@ -34,7 +32,8 @@ from platformio.system.prune import calculate_unnecessary_system_data def on_platformio_start(ctx, caller): app.set_session_var("command_ctx", ctx) set_caller(caller) - telemetry.on_command() + telemetry.log_command(ctx) + telemetry.resend_postponed_logs() if PlatformioCLI.in_silence(): return @@ -60,8 +59,8 @@ def on_platformio_end(ctx, result): # pylint: disable=unused-argument ) -def on_platformio_exception(e): - telemetry.on_exception(e) +def on_platformio_exception(exc): + telemetry.log_exception(exc) def set_caller(caller=None): @@ -83,7 +82,7 @@ class Upgrader: self.to_version = pepver_to_semver(to_version) self._upgraders = [ - (semantic_version.Version("4.4.0-a.8"), self._update_pkg_metadata), + (semantic_version.Version("6.1.8-a.1"), self._appstate_migration), ] def run(self, ctx): @@ -99,21 +98,22 @@ class Upgrader: return all(result) @staticmethod - def _update_pkg_metadata(_): - pm = ToolPackageManager() - for pkg in pm.get_installed(): - if not pkg.metadata or pkg.metadata.spec.external or pkg.metadata.spec.id: - continue - result = pm.search_registry_packages(PackageSpec(name=pkg.metadata.name)) - if len(result) != 1: - continue - result = result[0] - pkg.metadata.spec = PackageSpec( - id=result["id"], - owner=result["owner"]["username"], - name=result["name"], + def _appstate_migration(_): + state_path = app.resolve_state_path("core_dir", "appstate.json") + if not os.path.isfile(state_path): + return True + app.delete_state_item("telemetry") + created_at = app.get_state_item("created_at", None) + if not created_at: + state_stat = os.stat(state_path) + app.set_state_item( + "created_at", + int( + state_stat.st_birthtime + if hasattr(state_stat, "st_birthtime") + else state_stat.st_ctime + ), ) - pkg.dump_meta() return True @@ -154,10 +154,13 @@ def after_upgrade(ctx): "PlatformIO has been successfully upgraded to %s!\n" % __version__, fg="green", ) - telemetry.send_event( - category="Auto", - action="Upgrade", - label="%s > %s" % (last_version, __version__), + telemetry.log_event( + "pio_upgrade_core", + { + "label": "%s > %s" % (last_version, __version__), + "from_version": last_version, + "to_version": __version__, + }, ) # PlatformIO banner diff --git a/platformio/platform/_run.py b/platformio/platform/_run.py index dfd7c406..6a29d955 100644 --- a/platformio/platform/_run.py +++ b/platformio/platform/_run.py @@ -52,7 +52,6 @@ class PlatformRunMixin: self.ensure_engine_compatible() self.configure_project_packages(variables["pioenv"], targets) - self._report_non_sensitive_data(variables["pioenv"], targets) self.silent = silent self.verbose = verbose or app.get_setting("force_verbose") @@ -64,20 +63,12 @@ class PlatformRunMixin: if not os.path.isfile(variables["build_script"]): raise BuildScriptNotFound(variables["build_script"]) + telemetry.log_platform_run(self, self.config, variables["pioenv"], targets) result = self._run_scons(variables, targets, jobs) assert "returncode" in result return result - def _report_non_sensitive_data(self, env, targets): - options = self.config.items(env=env, as_dict=True) - options["platform_packages"] = [ - dict(name=item["name"], version=item["version"]) - for item in self.dump_used_packages() - ] - options["platform"] = {"name": self.name, "version": self.version} - telemetry.send_run_environment(options, targets) - def _run_scons(self, variables, targets, jobs): scons_dir = get_core_package_dir("tool-scons") args = [ diff --git a/platformio/platform/board.py b/platformio/platform/board.py index 73a3ebd7..4cd102d1 100644 --- a/platformio/platform/board.py +++ b/platformio/platform/board.py @@ -14,7 +14,7 @@ import os -from platformio import fs, telemetry, util +from platformio import fs, util from platformio.compat import MISSING from platformio.debug.exception import DebugInvalidOptionsError, DebugSupportError from platformio.exception import UserSideException @@ -119,7 +119,6 @@ class PlatformBoardConfig: if tool_name == "custom": return tool_name if not debug_tools: - telemetry.send_event("Debug", "Request", self.id) raise DebugSupportError(self._manifest["name"]) if tool_name: if tool_name in debug_tools: diff --git a/platformio/telemetry.py b/platformio/telemetry.py index e9cbcd58..56aa173a 100644 --- a/platformio/telemetry.py +++ b/platformio/telemetry.py @@ -14,12 +14,10 @@ import atexit import hashlib -import json import os +import platform as python_platform import queue import re -import shutil -import sys import threading from collections import deque from time import sleep, time @@ -27,167 +25,50 @@ from traceback import format_exc import requests -from platformio import __version__, app, exception, util +from platformio import __title__, __version__, app, exception, util from platformio.cli import PlatformioCLI -from platformio.compat import hashlib_encode_data, string_types -from platformio.http import HTTPSession +from platformio.compat import hashlib_encode_data +from platformio.debug.config.base import DebugConfigBase +from platformio.http import HTTPSession, ensure_internet_on from platformio.proc import is_ci, is_container -from platformio.project.helpers import is_platformio_project + +KEEP_MAX_REPORTS = 100 +SEND_MAX_EVENTS = 25 -class TelemetryBase: +class MeasurementProtocol: def __init__(self): - self._params = {} + self._user_properties = {} + self._events = [] - def __getitem__(self, name): - return self._params.get(name, None) - - def __setitem__(self, name, value): - self._params[name] = value - - def __delitem__(self, name): - if name in self._params: - del self._params[name] - - def send(self, hittype): - raise NotImplementedError() - - -class MeasurementProtocol(TelemetryBase): - TID = "UA-1768265-9" - PARAMS_MAP = { - "screen_name": "cd", - "event_category": "ec", - "event_action": "ea", - "event_label": "el", - "event_value": "ev", - } - - def __init__(self): - super().__init__() - self["v"] = 1 - self["tid"] = self.TID - self["cid"] = app.get_cid() - - try: - self["sr"] = "%dx%d" % shutil.get_terminal_size() - except ValueError: - pass - - self._prefill_screen_name() - self._prefill_appinfo() - self._prefill_sysargs() - self._prefill_custom_data() - - def __getitem__(self, name): - if name in self.PARAMS_MAP: - name = self.PARAMS_MAP[name] - return super().__getitem__(name) - - def __setitem__(self, name, value): - if name in self.PARAMS_MAP: - name = self.PARAMS_MAP[name] - super().__setitem__(name, value) - - def _prefill_appinfo(self): - self["av"] = __version__ - self["an"] = app.get_user_agent() - - def _prefill_sysargs(self): - args = [] - for arg in sys.argv[1:]: - arg = str(arg) - if arg == "account": # ignore account cmd which can contain username - return - if any(("@" in arg, "/" in arg, "\\" in arg)): - arg = "***" - args.append(arg.lower()) - self["cd3"] = " ".join(args) - - def _prefill_custom_data(self): - caller_id = str(app.get_session_var("caller_id")) - self["cd1"] = util.get_systype() - self["cd4"] = 1 if (not is_ci() and (caller_id or not is_container())) else 0 + caller_id = app.get_session_var("caller_id") if caller_id: - self["cd5"] = caller_id.lower() + self.set_user_property("pio_caller_id", caller_id) + self.set_user_property("pio_core_version", __version__) + self.set_user_property( + "pio_human_actor", int(bool(caller_id or not (is_ci() or is_container()))) + ) + self.set_user_property("pio_systype", util.get_systype()) + created_at = app.get_state_item("created_at", None) + if created_at: + self.set_user_property("pio_created_at", int(created_at)) - def _prefill_screen_name(self): - def _first_arg_from_list(args_, list_): - for _arg in args_: - if _arg in list_: - return _arg - return None + def set_user_property(self, name, value): + self._user_properties[name] = {"value": value} - args = [] - for arg in PlatformioCLI.leftover_args: - if not isinstance(arg, string_types): - arg = str(arg) - if not arg.startswith("-"): - args.append(arg.lower()) - if not args: - return + def add_event(self, name, params): + self._events.append({"name": name, "params": params}) - cmd_path = args[:1] - if args[0] in ( - "access", - "account", - "device", - "org", - "package", - "pkg", - "platform", - "project", - "settings", - "system", - "team", - ): - cmd_path = args[:2] - if args[0] == "lib" and len(args) > 1: - lib_subcmds = ( - "builtin", - "install", - "list", - "register", - "search", - "show", - "stats", - "uninstall", - "update", - ) - sub_cmd = _first_arg_from_list(args[1:], lib_subcmds) - if sub_cmd: - cmd_path.append(sub_cmd) - elif args[0] == "remote" and len(args) > 1: - remote_subcmds = ("agent", "device", "run", "test") - sub_cmd = _first_arg_from_list(args[1:], remote_subcmds) - if sub_cmd: - cmd_path.append(sub_cmd) - if len(args) > 2 and sub_cmd in ("agent", "device"): - remote2_subcmds = ("list", "start", "monitor") - sub_cmd = _first_arg_from_list(args[2:], remote2_subcmds) - if sub_cmd: - cmd_path.append(sub_cmd) - self["screen_name"] = " ".join([p.title() for p in cmd_path]) - - def _ignore_hit(self): - if not app.get_setting("enable_telemetry"): - return True - if self["ea"] in ("Idedata", "__Idedata"): - return True - return False - - def send(self, hittype): - if self._ignore_hit(): - return - self["t"] = hittype - # correct queue time - if "qt" in self._params and isinstance(self["qt"], float): - self["qt"] = int((time() - self["qt"]) * 1000) - MPDataPusher().push(self._params) + def to_payload(self): + return { + "client_id": app.get_cid(), + "user_properties": self._user_properties, + "events": self._events, + } @util.singleton -class MPDataPusher: +class TelemetryLogger: MAX_WORKERS = 5 def __init__(self): @@ -197,21 +78,23 @@ class MPDataPusher: self._http_offline = False self._workers = [] - def push(self, item): + def log(self, payload): + if not app.get_setting("enable_telemetry"): + return None + # if network is off-line if self._http_offline: - if "qt" not in item: - item["qt"] = time() - self._failedque.append(item) - return + self._failedque.append(payload) + return False - self._queue.put(item) + self._queue.put(payload) self._tune_workers() + return True def in_wait(self): return self._queue.unfinished_tasks - def get_items(self): + def get_unprocessed(self): items = list(self._failedque) try: while True: @@ -244,19 +127,27 @@ class MPDataPusher: if "qt" not in _item: _item["qt"] = time() self._failedque.append(_item) - if self._send_data(item): + if self._send(item): self._failedque.remove(_item) self._queue.task_done() except: # pylint: disable=W0702 pass - def _send_data(self, data): + def _send(self, payload): if self._http_offline: return False try: r = self._http_session.post( - "https://ssl.google-analytics.com/collect", - data=data, + "https://www.google-analytics.com/mp/collect", + params={ + "measurement_id": util.decrypt_message( + __title__, "t5m7rKu6tbqwx8Cw" + ), + "api_secret": util.decrypt_message( + __title__, "48SRy5rmut28ptm7zLjS5sa7tdmhrQ==" + ), + }, + json=payload, timeout=1, ) r.raise_for_status() @@ -271,17 +162,42 @@ class MPDataPusher: return False -def on_command(): - resend_backuped_reports() - +def log_event(name, params): mp = MeasurementProtocol() - mp.send("screenview") + mp.add_event(name, params) + TelemetryLogger().log(mp.to_payload()) + +def log_command(ctx): + path_args = PlatformioCLI.reveal_cmd_path_args(ctx) + params = { + "page_title": " ".join([arg.title() for arg in path_args]), + "page_path": "/".join(path_args), + "pio_user_agent": app.get_user_agent(), + "pio_python_version": python_platform.python_version(), + } if is_ci(): - measure_ci() + params["ci_actor"] = resolve_ci_actor() or "Unknown" + log_event("page_view", params) -def on_exception(e): +def resolve_ci_actor(): + known_cis = ( + "GITHUB_ACTIONS", + "TRAVIS", + "APPVEYOR", + "GITLAB_CI", + "CIRCLECI", + "SHIPPABLE", + "DRONE", + ) + for name in known_cis: + if os.getenv(name, "false").lower() == "true": + return name + return None + + +def log_exception(e): skip_conditions = [ isinstance(e, cls) for cls in ( @@ -302,68 +218,58 @@ def on_exception(e): type(e).__name__, " ".join(reversed(format_exc().split("\n"))) if is_fatal else str(e), ) - send_exception(description, is_fatal) + params = { + "description": description[:100].strip(), + "is_fatal": int(is_fatal), + "pio_user_agent": app.get_user_agent(), + } + log_event("pio_exception", params) -def measure_ci(): - event = {"category": "CI", "action": "NoName", "label": None} - known_cis = ( - "GITHUB_ACTIONS", - "TRAVIS", - "APPVEYOR", - "GITLAB_CI", - "CIRCLECI", - "SHIPPABLE", - "DRONE", - ) - for name in known_cis: - if os.getenv(name, "false").lower() == "true": - event["action"] = name - break - send_event(**event) - - -def dump_run_environment(options): +def dump_project_env_params(config, env, platform): non_sensitive_data = [ "platform", - "platform_packages", "framework", "board", "upload_protocol", "check_tool", "debug_tool", - "monitor_filters", "test_framework", ] - safe_options = {k: v for k, v in options.items() if k in non_sensitive_data} - if is_platformio_project(os.getcwd()): - phash = hashlib.sha1(hashlib_encode_data(app.get_cid())) - safe_options["pid"] = phash.hexdigest() - return json.dumps(safe_options, sort_keys=True, ensure_ascii=False) + section = f"env:{env}" + params = { + f"pio_{option}": config.get(section, option) + for option in non_sensitive_data + if config.has_option(section, option) + } + params["pio_pid"] = hashlib.sha1(hashlib_encode_data(config.path)).hexdigest() + params["pio_platform_name"] = platform.name + params["pio_platform_version"] = platform.version + params["pio_framework"] = params.get("pio_framework", "__bare_metal__") + # join multi-value options + for key, value in params.items(): + if isinstance(value, list): + params[key] = ", ".join(value) + return params -def send_run_environment(options, targets): - send_event( - "Env", - " ".join([t.title() for t in targets or ["run"]]), - dump_run_environment(options), +def log_platform_run(platform, project_config, project_env, targets=None): + params = dump_project_env_params(project_config, project_env, platform) + if targets: + params["targets"] = ", ".join(targets) + log_event("pio_platform_run", params) + + +def log_debug_started(debug_config: DebugConfigBase): + log_event( + "pio_debug_started", + dump_project_env_params( + debug_config.project_config, debug_config.env_name, debug_config.platform + ), ) -def send_event(category, action, label=None, value=None, screen_name=None): - mp = MeasurementProtocol() - mp["event_category"] = category[:150] - mp["event_action"] = action[:500] - if label: - mp["event_label"] = label[:500] - if value: - mp["event_value"] = int(value) - if screen_name: - mp["screen_name"] = screen_name[:2048] - mp.send("event") - - -def send_exception(description, is_fatal=False): +def log_debug_exception(description, debug_config: DebugConfigBase): # cleanup sensitive information, such as paths description = description.replace("Traceback (most recent call last):", "") description = description.replace("\\", "/") @@ -374,11 +280,16 @@ def send_exception(description, is_fatal=False): re.I | re.M, ) description = re.sub(r"\s+", " ", description, flags=re.M) - - mp = MeasurementProtocol() - mp["exd"] = description[:8192].strip() - mp["exf"] = 1 if is_fatal else 0 - mp.send("exception") + params = { + "description": description[:100].strip(), + "pio_user_agent": app.get_user_agent(), + } + params.update( + dump_project_env_params( + debug_config.project_config, debug_config.env_name, debug_config.platform + ) + ) + log_event("pio_debug_exception", params) @atexit.register @@ -387,54 +298,62 @@ def _finalize(): elapsed = 0 try: while elapsed < timeout: - if not MPDataPusher().in_wait(): + if not TelemetryLogger().in_wait(): break sleep(0.2) elapsed += 200 - backup_reports(MPDataPusher().get_items()) + postpone_logs(TelemetryLogger().get_unprocessed()) except KeyboardInterrupt: pass -def backup_reports(items): +def load_postponed_events(): + state_path = app.resolve_state_path( + "cache_dir", "telemetry.json", ensure_dir_exists=False + ) + if not os.path.isfile(state_path): + return [] + with app.State(state_path) as state: + return state.get("events", []) + + +def save_postponed_events(items): + state_path = app.resolve_state_path("cache_dir", "telemetry.json") if not items: - return - - KEEP_MAX_REPORTS = 100 - tm = app.get_state_item("telemetry", {}) - if "backup" not in tm: - tm["backup"] = [] - - for params in items: - # skip static options - for key in list(params.keys()): - if key in ("v", "tid", "cid", "cd1", "cd2", "sr", "an"): - del params[key] - - # store time in UNIX format - if "qt" not in params: - params["qt"] = time() - elif not isinstance(params["qt"], float): - params["qt"] = time() - (params["qt"] / 1000) - - tm["backup"].append(params) - - tm["backup"] = tm["backup"][KEEP_MAX_REPORTS * -1 :] - app.set_state_item("telemetry", tm) - - -def resend_backuped_reports(): - tm = app.get_state_item("telemetry", {}) - if "backup" not in tm or not tm["backup"]: - return False - - for report in tm["backup"]: - mp = MeasurementProtocol() - for key, value in report.items(): - mp[key] = value - mp.send(report["t"]) - - # clean - tm["backup"] = [] - app.set_state_item("telemetry", tm) + try: + if os.path.isfile(state_path): + os.remove(state_path) + except: # pylint: disable=bare-except + pass + return None + with app.State(state_path, lock=True) as state: + state["events"] = items + state.modified = True + return True + + +def postpone_logs(payloads): + if not payloads: + return None + postponed_events = load_postponed_events() or [] + timestamp_micros = int(time() * 1000000) + for payload in payloads: + for event in payload.get("events", []): + event["timestamp_micros"] = timestamp_micros + postponed_events.append(event) + save_postponed_events(postponed_events[KEEP_MAX_REPORTS * -1 :]) + return True + + +def resend_postponed_logs(): + events = load_postponed_events() + if not events or not ensure_internet_on(): + return None + save_postponed_events(events[SEND_MAX_EVENTS:]) # clean + mp = MeasurementProtocol() + payload = mp.to_payload() + payload["events"] = events[0:SEND_MAX_EVENTS] + TelemetryLogger().log(payload) + if len(events) > SEND_MAX_EVENTS: + resend_postponed_logs() return True diff --git a/platformio/util.py b/platformio/util.py index c981384a..549bccf1 100644 --- a/platformio/util.py +++ b/platformio/util.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import functools import math import platform @@ -206,3 +207,12 @@ def humanize_duration_time(duration): def strip_ansi_codes(text): # pylint: disable=protected-access return click._compat.strip_ansi(text) + + +def decrypt_message(key, message): + result = "" + message = bytearray(base64.b64decode(message)) + for i, c in enumerate(message): + key_c = key[i % len(key)] + result += chr((256 + c - ord(key_c)) % 256) + return result