Remove unused code

This commit is contained in:
Ivan Kravets
2023-07-21 16:35:42 +03:00
parent 5f75e36efd
commit 6313042291
5 changed files with 109 additions and 482 deletions

View File

@ -0,0 +1,102 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import functools
from platformio import __main__, __version__, app, proc, util
from platformio.compat import (
IS_WINDOWS,
aio_create_task,
aio_get_running_loop,
get_locale_encoding,
)
from platformio.home.rpc.handlers.base import BaseRPCHandler
class PIOCoreProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future, on_data_callback=None):
self.exit_future = exit_future
self.on_data_callback = on_data_callback
self.stdout = ""
self.stderr = ""
self._is_exited = False
self._encoding = get_locale_encoding()
def pipe_data_received(self, fd, data):
data = data.decode(self._encoding, "replace")
pipe = ["stdin", "stdout", "stderr"][fd]
if pipe == "stdout":
self.stdout += data
if pipe == "stderr":
self.stderr += data
if self.on_data_callback:
self.on_data_callback(pipe=pipe, data=data)
def connection_lost(self, exc):
self.process_exited()
def process_exited(self):
if self._is_exited:
return
self.exit_future.set_result(True)
self._is_exited = True
@util.memoized(expire="60s")
def get_core_fullpath():
return proc.where_is_program("platformio" + (".exe" if IS_WINDOWS else ""))
class CoreRPC(BaseRPCHandler):
NAMESPACE = "core"
@staticmethod
def version():
return __version__
async def exec(self, args, options=None):
loop = aio_get_running_loop()
exit_future = loop.create_future()
data_callback = functools.partial(
self._on_exec_data_received, exec_options=options
)
if args[0] != "--caller" and app.get_session_var("caller_id"):
args = ["--caller", app.get_session_var("caller_id")] + args
transport, protocol = await loop.subprocess_exec(
lambda: PIOCoreProtocol(exit_future, data_callback),
get_core_fullpath(),
*args,
stdin=None,
**options.get("spawn", {}),
)
await exit_future
transport.close()
return {
"stdout": protocol.stdout,
"stderr": protocol.stderr,
"returncode": transport.get_returncode(),
}
def _on_exec_data_received(self, exec_options, pipe, data):
notification_method = exec_options.get(f"{pipe}NotificationMethod")
if not notification_method:
return
aio_create_task(
self.factory.notify_clients(
method=notification_method,
params=[data],
actor="frontend",
)
)

View File

@ -15,7 +15,6 @@
import glob
import io
import os
import shutil
from functools import cmp_to_key
import click
@ -67,9 +66,10 @@ class OSRPC(BaseRPCHandler):
cc.set(cache_key, result, cache_valid)
return result
def request_content(self, uri, data=None, headers=None, cache_valid=None):
@classmethod
def request_content(cls, uri, data=None, headers=None, cache_valid=None):
if uri.startswith("http"):
return self.fetch_content(uri, data, headers, cache_valid)
return cls.fetch_content(uri, data, headers, cache_valid)
local_path = uri[7:] if uri.startswith("file://") else uri
with io.open(local_path, encoding="utf-8") as fp:
return fp.read()
@ -102,22 +102,10 @@ class OSRPC(BaseRPCHandler):
def is_dir(path):
return os.path.isdir(path)
@staticmethod
def make_dirs(path):
return os.makedirs(path)
@staticmethod
def get_file_mtime(path):
return os.path.getmtime(path)
@staticmethod
def rename(src, dst):
return os.rename(src, dst)
@staticmethod
def copy(src, dst):
return shutil.copytree(src, dst, symlinks=True)
@staticmethod
def glob(pathnames, root=None):
if not isinstance(pathnames, list):

View File

@ -1,231 +0,0 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import functools
import io
import json
import os
import sys
import threading
import click
from ajsonrpc.core import JSONRPC20DispatchException
from platformio import __main__, __version__, app, fs, proc, util
from platformio.compat import (
IS_WINDOWS,
aio_create_task,
aio_get_running_loop,
aio_to_thread,
get_locale_encoding,
is_bytes,
)
from platformio.exception import PlatformioException
from platformio.home.rpc.handlers.base import BaseRPCHandler
class PIOCoreProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future, on_data_callback=None):
self.exit_future = exit_future
self.on_data_callback = on_data_callback
self.stdout = ""
self.stderr = ""
self._is_exited = False
self._encoding = get_locale_encoding()
def pipe_data_received(self, fd, data):
data = data.decode(self._encoding, "replace")
pipe = ["stdin", "stdout", "stderr"][fd]
if pipe == "stdout":
self.stdout += data
if pipe == "stderr":
self.stderr += data
if self.on_data_callback:
self.on_data_callback(pipe=pipe, data=data)
def connection_lost(self, exc):
self.process_exited()
def process_exited(self):
if self._is_exited:
return
self.exit_future.set_result(True)
self._is_exited = True
class MultiThreadingStdStream:
def __init__(self, parent_stream):
self._buffers = {threading.get_ident(): parent_stream}
def __getattr__(self, name):
thread_id = threading.get_ident()
self._ensure_thread_buffer(thread_id)
return getattr(self._buffers[thread_id], name)
def _ensure_thread_buffer(self, thread_id):
if thread_id not in self._buffers:
self._buffers[thread_id] = io.StringIO()
def write(self, value):
thread_id = threading.get_ident()
self._ensure_thread_buffer(thread_id)
return self._buffers[thread_id].write(
value.decode() if is_bytes(value) else value
)
def get_value_and_reset(self):
result = ""
try:
result = self.getvalue()
self.seek(0)
self.truncate(0)
except AttributeError:
pass
return result
@util.memoized(expire="60s")
def get_core_fullpath():
return proc.where_is_program("platformio" + (".exe" if IS_WINDOWS else ""))
class PIOCoreRPC(BaseRPCHandler):
NAMESPACE = "core"
@staticmethod
def version():
return __version__
async def exec(self, args, options=None):
loop = aio_get_running_loop()
exit_future = loop.create_future()
data_callback = functools.partial(
self._on_exec_data_received, exec_options=options
)
if args[0] != "--caller" and app.get_session_var("caller_id"):
args = ["--caller", app.get_session_var("caller_id")] + args
transport, protocol = await loop.subprocess_exec(
lambda: PIOCoreProtocol(exit_future, data_callback),
get_core_fullpath(),
*args,
stdin=None,
**options.get("spawn", {}),
)
await exit_future
transport.close()
return {
"stdout": protocol.stdout,
"stderr": protocol.stderr,
"returncode": transport.get_returncode(),
}
def _on_exec_data_received(self, exec_options, pipe, data):
notification_method = exec_options.get(f"{pipe}NotificationMethod")
if not notification_method:
return
aio_create_task(
self.factory.notify_clients(
method=notification_method,
params=[data],
actor="frontend",
)
)
@staticmethod
def setup_multithreading_std_streams():
if isinstance(sys.stdout, MultiThreadingStdStream):
return
PIOCoreRPC.thread_stdout = MultiThreadingStdStream(sys.stdout)
PIOCoreRPC.thread_stderr = MultiThreadingStdStream(sys.stderr)
sys.stdout = PIOCoreRPC.thread_stdout
sys.stderr = PIOCoreRPC.thread_stderr
@staticmethod
async def call(args, options=None):
for i, arg in enumerate(args):
if not isinstance(arg, str):
args[i] = str(arg)
options = options or {}
to_json = "--json-output" in args
try:
if options.get("force_subprocess"):
result = await PIOCoreRPC._call_subprocess(args, options)
return PIOCoreRPC._process_result(result, to_json)
result = await PIOCoreRPC._call_inline(args, options)
try:
return PIOCoreRPC._process_result(result, to_json)
except ValueError:
# fall-back to subprocess method
result = await PIOCoreRPC._call_subprocess(args, options)
return PIOCoreRPC._process_result(result, to_json)
except Exception as exc: # pylint: disable=bare-except
raise JSONRPC20DispatchException(
code=5000, message="PIO Core Call Error", data=str(exc)
) from exc
@staticmethod
async def _call_subprocess(args, options):
result = await aio_to_thread(
proc.exec_command,
[get_core_fullpath()] + args,
cwd=options.get("cwd") or os.getcwd(),
)
return (result["out"], result["err"], result["returncode"])
@staticmethod
async def _call_inline(args, options):
PIOCoreRPC.setup_multithreading_std_streams()
def _thread_safe_call(args, cwd):
with fs.cd(cwd):
exit_code = __main__.main(["-c"] + args)
return (
PIOCoreRPC.thread_stdout.get_value_and_reset(),
PIOCoreRPC.thread_stderr.get_value_and_reset(),
exit_code,
)
return await aio_to_thread(
_thread_safe_call, args=args, cwd=options.get("cwd") or os.getcwd()
)
@staticmethod
def _process_result(result, to_json=False):
out, err, code = result
if out and is_bytes(out):
out = out.decode(get_locale_encoding())
if err and is_bytes(err):
err = err.decode(get_locale_encoding())
text = ("%s\n\n%s" % (out, err)).strip()
if code != 0:
raise PlatformioException(text)
if not to_json:
return text
try:
return json.loads(out)
except ValueError as exc:
click.secho("%s => `%s`" % (exc, out), fg="red", err=True)
# if PIO Core prints unhandled warnings
for line in out.split("\n"):
line = line.strip()
if not line:
continue
try:
return json.loads(line)
except ValueError:
pass
raise exc

View File

@ -13,24 +13,17 @@
# limitations under the License.
import os
import shutil
import time
from pathlib import Path
import semantic_version
from ajsonrpc.core import JSONRPC20DispatchException
from platformio import app, exception, fs
from platformio.home.rpc.handlers.app import AppRPC
from platformio import app, fs
from platformio.home.rpc.handlers.base import BaseRPCHandler
from platformio.home.rpc.handlers.piocore import PIOCoreRPC
from platformio.package.manager.platform import PlatformPackageManager
from platformio.platform.factory import PlatformFactory
from platformio.project.config import ProjectConfig
from platformio.project.exception import ProjectError
from platformio.project.helpers import get_project_dir, is_platformio_project
from platformio.project.integration.generator import ProjectGenerator
from platformio.project.options import get_config_options_schema
class ProjectRPC(BaseRPCHandler):
@ -50,232 +43,7 @@ class ProjectRPC(BaseRPCHandler):
with fs.cd(project_dir):
return getattr(ProjectConfig(**init_kwargs), method)(*args)
@staticmethod
def config_load(path):
return ProjectConfig(
path, parse_extra=False, expand_interpolations=False
).as_tuple()
@staticmethod
def config_dump(path, data):
config = ProjectConfig(path, parse_extra=False, expand_interpolations=False)
config.update(data, clear=True)
return config.save()
@staticmethod
def config_update_description(path, text):
config = ProjectConfig(path, parse_extra=False, expand_interpolations=False)
if not config.has_section("platformio"):
config.add_section("platformio")
if text:
config.set("platformio", "description", text)
else:
if config.has_option("platformio", "description"):
config.remove_option("platformio", "description")
if not config.options("platformio"):
config.remove_section("platformio")
return config.save()
@staticmethod
def get_config_schema():
return get_config_options_schema()
@staticmethod
def get_projects():
def _get_project_data():
data = {"boards": [], "envLibdepsDirs": [], "libExtraDirs": []}
config = ProjectConfig()
data["envs"] = config.envs()
data["description"] = config.get("platformio", "description")
data["libExtraDirs"].extend(config.get("platformio", "lib_extra_dirs", []))
libdeps_dir = config.get("platformio", "libdeps_dir")
for section in config.sections():
if not section.startswith("env:"):
continue
data["envLibdepsDirs"].append(os.path.join(libdeps_dir, section[4:]))
if config.has_option(section, "board"):
data["boards"].append(config.get(section, "board"))
data["libExtraDirs"].extend(config.get(section, "lib_extra_dirs", []))
# skip non existing folders and resolve full path
for key in ("envLibdepsDirs", "libExtraDirs"):
data[key] = [
fs.expanduser(d) if d.startswith("~") else os.path.abspath(d)
for d in data[key]
if os.path.isdir(d)
]
return data
def _path_to_name(path):
return (os.path.sep).join(path.split(os.path.sep)[-2:])
result = []
pm = PlatformPackageManager()
for project_dir in AppRPC.load_state()["storage"]["recentProjects"]:
if not os.path.isdir(project_dir):
continue
data = {}
boards = []
try:
with fs.cd(project_dir):
data = _get_project_data()
except ProjectError:
continue
for board_id in data.get("boards", []):
name = board_id
try:
name = pm.board_config(board_id)["name"]
except exception.PlatformioException:
pass
boards.append({"id": board_id, "name": name})
result.append(
{
"path": project_dir,
"name": _path_to_name(project_dir),
"modified": int(os.path.getmtime(project_dir)),
"boards": boards,
"description": data.get("description"),
"envs": data.get("envs", []),
"envLibStorages": [
{"name": os.path.basename(d), "path": d}
for d in data.get("envLibdepsDirs", [])
],
"extraLibStorages": [
{"name": _path_to_name(d), "path": d}
for d in data.get("libExtraDirs", [])
],
}
)
return result
@staticmethod
def get_project_examples():
result = []
pm = PlatformPackageManager()
for pkg in pm.get_installed():
examples_dir = os.path.join(pkg.path, "examples")
if not os.path.isdir(examples_dir):
continue
items = []
for project_dir, _, __ in os.walk(examples_dir):
project_description = None
try:
config = ProjectConfig(os.path.join(project_dir, "platformio.ini"))
config.validate(silent=True)
project_description = config.get("platformio", "description")
except ProjectError:
continue
path_tokens = project_dir.split(os.path.sep)
items.append(
{
"name": "/".join(
path_tokens[path_tokens.index("examples") + 1 :]
),
"path": project_dir,
"description": project_description,
}
)
manifest = pm.load_manifest(pkg)
result.append(
{
"platform": {
"title": manifest["title"],
"version": manifest["version"],
},
"items": sorted(items, key=lambda item: item["name"]),
}
)
return sorted(result, key=lambda data: data["platform"]["title"])
async def init(self, board, framework, project_dir):
assert project_dir
if not os.path.isdir(project_dir):
os.makedirs(project_dir)
args = ["init", "--board", board, "--sample-code"]
if framework:
args.extend(["--project-option", "framework = %s" % framework])
ide = app.get_session_var("caller_id")
if ide in ProjectGenerator.get_supported_ides():
args.extend(["--ide", ide])
await PIOCoreRPC.call(
args, options={"cwd": project_dir, "force_subprocess": True}
)
return project_dir
@staticmethod
async def import_arduino(board, use_arduino_libs, arduino_project_dir):
board = str(board)
# don't import PIO Project
if is_platformio_project(arduino_project_dir):
return arduino_project_dir
is_arduino_project = any(
os.path.isfile(
os.path.join(
arduino_project_dir,
"%s.%s" % (os.path.basename(arduino_project_dir), ext),
)
)
for ext in ("ino", "pde")
)
if not is_arduino_project:
raise JSONRPC20DispatchException(
code=4000, message="Not an Arduino project: %s" % arduino_project_dir
)
state = AppRPC.load_state()
project_dir = os.path.join(
state["storage"]["projectsDir"], time.strftime("%y%m%d-%H%M%S-") + board
)
if not os.path.isdir(project_dir):
os.makedirs(project_dir)
args = ["init", "--board", board]
args.extend(["--project-option", "framework = arduino"])
if use_arduino_libs:
args.extend(
["--project-option", "lib_extra_dirs = ~/Documents/Arduino/libraries"]
)
ide = app.get_session_var("caller_id")
if ide in ProjectGenerator.get_supported_ides():
args.extend(["--ide", ide])
await PIOCoreRPC.call(
args, options={"cwd": project_dir, "force_subprocess": True}
)
with fs.cd(project_dir):
config = ProjectConfig()
src_dir = config.get("platformio", "src_dir")
if os.path.isdir(src_dir):
fs.rmtree(src_dir)
shutil.copytree(arduino_project_dir, src_dir, symlinks=True)
return project_dir
@staticmethod
async def import_pio(project_dir):
if not project_dir or not is_platformio_project(project_dir):
raise JSONRPC20DispatchException(
code=4001, message="Not an PlatformIO project: %s" % project_dir
)
new_project_dir = os.path.join(
AppRPC.load_state()["storage"]["projectsDir"],
time.strftime("%y%m%d-%H%M%S-") + os.path.basename(project_dir),
)
shutil.copytree(project_dir, new_project_dir, symlinks=True)
args = ["init"]
ide = app.get_session_var("caller_id")
if ide in ProjectGenerator.get_supported_ides():
args.extend(["--ide", ide])
await PIOCoreRPC.call(
args, options={"cwd": new_project_dir, "force_subprocess": True}
)
return new_project_dir
async def init_v2(self, configuration, options=None):
async def init(self, configuration, options=None):
project_dir = os.path.join(configuration["location"], configuration["name"])
if not os.path.isdir(project_dir):
os.makedirs(project_dir)

View File

@ -28,10 +28,10 @@ from platformio.compat import aio_get_running_loop
from platformio.exception import PlatformioException
from platformio.home.rpc.handlers.account import AccountRPC
from platformio.home.rpc.handlers.app import AppRPC
from platformio.home.rpc.handlers.core import CoreRPC
from platformio.home.rpc.handlers.ide import IDERPC
from platformio.home.rpc.handlers.misc import MiscRPC
from platformio.home.rpc.handlers.os import OSRPC
from platformio.home.rpc.handlers.piocore import PIOCoreRPC
from platformio.home.rpc.handlers.platform import PlatformRPC
from platformio.home.rpc.handlers.project import ProjectRPC
from platformio.home.rpc.handlers.registry import RegistryRPC
@ -72,7 +72,7 @@ def run_server(host, port, no_open, shutdown_timeout, home_url):
ws_rpc_factory.add_object_handler(IDERPC())
ws_rpc_factory.add_object_handler(MiscRPC())
ws_rpc_factory.add_object_handler(OSRPC())
ws_rpc_factory.add_object_handler(PIOCoreRPC())
ws_rpc_factory.add_object_handler(CoreRPC())
ws_rpc_factory.add_object_handler(ProjectRPC())
ws_rpc_factory.add_object_handler(PlatformRPC())
ws_rpc_factory.add_object_handler(RegistryRPC())