From 0d8d5fdb32872b593ce18d5d2daa1063d39099e4 Mon Sep 17 00:00:00 2001 From: Ivan Kravets Date: Thu, 3 Sep 2015 19:04:09 +0300 Subject: [PATCH] Allow multiple instances of @PlatformIO --- .isort.cfg | 2 +- platformio/__init__.py | 2 +- platformio/__main__.py | 11 +- platformio/app.py | 24 +++-- platformio/maintenance.py | 13 ++- platformio/telemetry.py | 215 ++++++++++++++++++++++---------------- platformio/util.py | 11 ++ requirements.txt | 3 +- setup.py | 3 +- 9 files changed, 173 insertions(+), 111 deletions(-) diff --git a/.isort.cfg b/.isort.cfg index 3dbd6d91..907a398d 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -1,3 +1,3 @@ [settings] line_length=79 -known_third_party=click,requests,serial,SCons,pytest,bottle +known_third_party=bottle,click,lockfile,pytest,requests,serial,SCons diff --git a/platformio/__init__.py b/platformio/__init__.py index 8b0740cd..553922c4 100644 --- a/platformio/__init__.py +++ b/platformio/__init__.py @@ -1,7 +1,7 @@ # Copyright (C) Ivan Kravets # See LICENSE for details. -VERSION = (2, 3, "0a3") +VERSION = (2, 3, "0b1") __version__ = ".".join([str(s) for s in VERSION]) __title__ = "platformio" diff --git a/platformio/__main__.py b/platformio/__main__.py index f2ed3308..33a2391d 100644 --- a/platformio/__main__.py +++ b/platformio/__main__.py @@ -55,15 +55,16 @@ class PlatformioCLI(click.MultiCommand): # pylint: disable=R0904 context_settings=dict(help_option_names=["-h", "--help"])) @click.version_option(__version__, prog_name="PlatformIO") @click.option("--force", "-f", is_flag=True, - help="Force to accept any confirmation prompts") + help="Force to accept any confirmation prompts.") +@click.option("--caller", "-c", help="Caller ID (service).") @click.pass_context -def cli(ctx, force): - maintenance.on_platformio_start(ctx, force) +def cli(ctx, force, caller): + maintenance.on_platformio_start(ctx, force, caller) @cli.resultcallback() @click.pass_context -def process_result(ctx, result, force): # pylint: disable=W0613 +def process_result(ctx, result, force, caller): # pylint: disable=W0613 maintenance.on_platformio_end(ctx, result) @@ -81,7 +82,7 @@ def main(): ) return 1 - cli(None, None) + cli(None, None, None) except Exception as e: # pylint: disable=W0703 if not isinstance(e, exception.ReturnErrorCode): maintenance.on_platformio_exception(e) diff --git a/platformio/app.py b/platformio/app.py index b77e481d..6a6f7ee8 100644 --- a/platformio/app.py +++ b/platformio/app.py @@ -5,6 +5,8 @@ import json from os import environ, getenv from os.path import isfile, join +from lockfile import LockFile + from platformio import __version__ from platformio.exception import InvalidSettingName, InvalidSettingValue from platformio.util import get_home_dir, is_ci @@ -48,7 +50,9 @@ DEFAULT_SETTINGS = { SESSION_VARS = { - "force_option": False + "command_ctx": None, + "force_option": False, + "caller_id": None } @@ -59,22 +63,30 @@ class State(object): if not self.path: self.path = join(get_home_dir(), "appstate.json") self._state = {} + self._prev_state = {} + self._lock = None def __enter__(self): try: if isfile(self.path): + self._lock = LockFile(self.path) + self._lock.acquire() with open(self.path, "r") as fp: self._state = json.load(fp) except ValueError: self._state = {} + self._prev_state = self._state.copy() return self._state def __exit__(self, type_, value, traceback): - with open(self.path, "w") as fp: - if "dev" in __version__: - json.dump(self._state, fp, indent=4) - else: - json.dump(self._state, fp) + if self._prev_state != self._state: + with open(self.path, "w") as fp: + if "dev" in __version__: + json.dump(self._state, fp, indent=4) + else: + json.dump(self._state, fp) + if self._lock: + self._lock.release() def sanitize_setting(name, value): diff --git a/platformio/maintenance.py b/platformio/maintenance.py index b2089384..5efec017 100644 --- a/platformio/maintenance.py +++ b/platformio/maintenance.py @@ -3,7 +3,6 @@ import re import struct -import sys from os import remove from os.path import isdir, isfile, join from shutil import rmtree @@ -23,14 +22,14 @@ from platformio.platforms.base import PlatformFactory from platformio.util import get_home_dir -def on_platformio_start(ctx, force): +def on_platformio_start(ctx, force, caller): + app.set_session_var("command_ctx", ctx) app.set_session_var("force_option", force) - telemetry.on_command(ctx) + app.set_session_var("caller_id", caller) + telemetry.on_command() - # skip any check operations when upgrade process - args = [str(s).lower() for s in sys.argv[1:] - if not str(s).startswith("-")] - if len(args) > 1 and args[1] == "upgrade": + # skip any check operations when upgrade command + if len(ctx.args or []) and ctx.args[0] == "upgrade": return after_upgrade(ctx) diff --git a/platformio/telemetry.py b/platformio/telemetry.py index 3426e9d5..e43a3f56 100644 --- a/platformio/telemetry.py +++ b/platformio/telemetry.py @@ -3,17 +3,20 @@ import atexit import platform +import Queue import re import sys import threading import uuid +from collections import deque from os import getenv -from time import time +from time import sleep, time import click import requests from platformio import __version__, app, exception, util +from platformio.ide.projectgenerator import ProjectGenerator class TelemetryBase(object): @@ -75,7 +78,8 @@ class MeasurementProtocol(TelemetryBase): # gather dependent packages dpdata = [] dpdata.append("Click/%s" % click.__version__) - # dpdata.append("Requests/%s" % requests.__version__) + if app.get_session_var("caller_id"): + dpdata.append("Caller/%s" % app.get_session_var("caller_id")) try: result = util.exec_command(["scons", "--version"]) match = re.search(r"engine: v([\d\.]+)", result['out']) @@ -89,21 +93,21 @@ class MeasurementProtocol(TelemetryBase): self['cd1'] = util.get_systype() self['cd2'] = "Python/%s %s" % (platform.python_version(), platform.platform()) - self['cd4'] = 1 if app.get_setting("enable_prompts") else 0 + self['cd4'] = (1 if app.get_setting("enable_prompts") and + not app.get_session_var("caller_id") else 0) def _prefill_screen_name(self): - args = [str(s).lower() for s in sys.argv[1:] - if not str(s).startswith("-")] + self['cd3'] = " ".join([str(s).lower() for s in sys.argv[1:]]) + + ctx_args = app.get_session_var("command_ctx").args or [] + args = [str(s).lower() for s in ctx_args if not str(s).startswith("-")] if not args: return - if args[0] in ("lib", "platforms", "serialports", "settings"): cmd_path = args[:2] else: cmd_path = args[:1] - self['screen_name'] = " ".join([p.title() for p in cmd_path]) - self['cd3'] = " ".join([str(s).lower() for s in sys.argv[1:]]) def send(self, hittype): if not app.get_setting("enable_telemetry"): @@ -115,92 +119,101 @@ class MeasurementProtocol(TelemetryBase): if "qt" in self._params and isinstance(self['qt'], float): self['qt'] = int((time() - self['qt']) * 1000) - MPDataPusher.get_instance().push(self._params) + MPDataPusher().push(self._params) -class MPDataPusher(threading.Thread): +@util.singleton +class MPDataPusher(object): - @classmethod - def get_instance(cls): - try: - return cls._thinstance - except AttributeError: - cls._event = threading.Event() - cls._thinstance = cls() - cls._thinstance.start() - return cls._thinstance - - @classmethod - def http_session(cls): - try: - return cls._http_session - except AttributeError: - cls._http_session = requests.Session() - return cls._http_session + MAX_WORKERS = 5 def __init__(self): - threading.Thread.__init__(self) - self._terminate = False - self._server_online = False - self._stack = [] + self._queue = Queue.LifoQueue() + self._failedque = deque() + self._http_session = requests.Session() + self._http_offline = False + self._workers = [] - def run(self): - while not self._terminate: - self._event.wait() - if self._terminate or not self._stack: - return - self._event.clear() + def push(self, item): + # if network is off-line + if self._http_offline: + if "qt" not in item: + item['qt'] = time() + self._failedque.append(item) + return - data = self._stack.pop() - try: - r = self.http_session().post( - "https://ssl.google-analytics.com/collect", - data=data, - headers=util.get_request_defheaders(), - timeout=3 - ) - r.raise_for_status() - self._server_online = True - except: # pylint: disable=W0702 - self._server_online = False - self._stack.append(data) + self._queue.put(item) + self._tune_workers() - def push(self, data): - self._stack.append(data) - self._event.set() + def in_wait(self): + return self._queue.unfinished_tasks - def is_server_online(self): - return self._server_online + def get_items(self): + items = list(self._failedque) + try: + while True: + items.append(self._queue.get_nowait()) + except Queue.Empty: + pass + return items - def get_stack_data(self): - return self._stack + def _tune_workers(self): + for i, w in enumerate(self._workers): + if not w.is_alive(): + del self._workers[i] - def join(self, timeout=0.1): - self._terminate = True - self._event.set() - self.http_session().close() - threading.Thread.join(self, timeout) + need_nums = min(self._queue.qsize(), self.MAX_WORKERS) + active_nums = len(self._workers) + if need_nums <= active_nums: + return + + for i in range(need_nums - active_nums): + t = threading.Thread(target=self._worker) + t.daemon = True + t.start() + self._workers.append(t) + + def _worker(self): + while True: + item = self._queue.get() + _item = item.copy() + if "qt" not in _item: + _item['qt'] = time() + self._failedque.append(_item) + if self._send_data(item): + self._failedque.remove(_item) + else: + self._http_offline = True + self._queue.task_done() + + def _send_data(self, data): + result = False + try: + r = self._http_session.post( + "https://ssl.google-analytics.com/collect", + data=data, + headers=util.get_request_defheaders(), + timeout=2 + ) + r.raise_for_status() + result = True + except: # pylint: disable=W0702 + pass + return result -@atexit.register -def _finalize(): - MAX_RESEND_REPORTS = 10 - mpdp = MPDataPusher.get_instance() - backup_reports(mpdp.get_stack_data()) +def on_command(): + resend_backuped_reports() - resent_nums = 0 - while mpdp.is_server_online() and resent_nums < MAX_RESEND_REPORTS: - if not resend_backuped_report(): - break - resent_nums += 1 - - -def on_command(ctx): # pylint: disable=W0613 mp = MeasurementProtocol() mp.send("screenview") + if util.is_ci(): measure_ci() + if app.get_session_var("calller_id"): + measure_caller(app.get_session_var("calller_id")) + def measure_ci(): event = { @@ -226,6 +239,18 @@ def measure_ci(): on_event(**event) +def measure_caller(calller_id): + calller_id = str(calller_id)[:20].lower() + event = { + "category": "Caller", + "action": "Misc", + "label": calller_id + } + if calller_id in ProjectGenerator.get_supported_ides(): + event['action'] = "IDE" + on_event(**event) + + def on_run_environment(options, targets): opts = ["%s=%s" % (opt, value) for opt, value in sorted(options.items())] targets = [t.title() for t in targets or ["run"]] @@ -254,16 +279,28 @@ def on_exception(e): mp.send("exception") -def backup_reports(data): - if not data: +@atexit.register +def _finalize(): + timeout = 1000 # msec + elapsed = 0 + while elapsed < timeout: + if not MPDataPusher().in_wait(): + break + sleep(0.2) + elapsed += 200 + backup_reports(MPDataPusher().get_items()) + + +def backup_reports(items): + if not items: return - KEEP_MAX_REPORTS = 1000 + KEEP_MAX_REPORTS = 100 tm = app.get_state_item("telemetry", {}) if "backup" not in tm: tm['backup'] = [] - for params in data: + for params in items: # skip static options for key in params.keys(): if key in ("v", "tid", "cid", "cd1", "cd2", "sr", "an"): @@ -277,21 +314,21 @@ def backup_reports(data): tm['backup'].append(params) - tm['backup'] = tm['backup'][KEEP_MAX_REPORTS*-1:] + tm['backup'] = tm['backup'][KEEP_MAX_REPORTS * -1:] app.set_state_item("telemetry", tm) -def resend_backuped_report(): +def resend_backuped_reports(): tm = app.get_state_item("telemetry", {}) if "backup" not in tm or not tm['backup']: return False - report = tm['backup'].pop() + for report in tm['backup']: + mp = MeasurementProtocol() + for key, value in report.items(): + mp[key] = value + mp.send(report['t']) + + # clean + tm['backup'] = [] app.set_state_item("telemetry", tm) - - mp = MeasurementProtocol() - for key, value in report.items(): - mp[key] = value - mp.send(report['t']) - - return True diff --git a/platformio/util.py b/platformio/util.py index adc1e696..870fe00b 100644 --- a/platformio/util.py +++ b/platformio/util.py @@ -100,6 +100,17 @@ class memoized(object): return functools.partial(self.__call__, obj) +def singleton(cls): + """ From PEP-318 http://www.python.org/dev/peps/pep-0318/#examples """ + _instances = {} + + def get_instance(*args, **kwargs): + if cls not in _instances: + _instances[cls] = cls(*args, **kwargs) + return _instances[cls] + return get_instance + + def get_systype(): data = uname() systype = data[0] diff --git a/requirements.txt b/requirements.txt index 0ffb6172..c2c1efc3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ bottle==0.12.8 -click==5.0 +click==5.1 colorama==0.3.3 +lockfile==0.10.2 pyserial==2.7 requests==2.7.0 scons==2.3.6 diff --git a/setup.py b/setup.py index f068f99a..1d72d87f 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,8 @@ setup( license=__license__, install_requires=[ "bottle", - "click>=3.0", + "click>=3.2", + "lockfile", "pyserial", "requests>=2.4.0", "SCons"