forked from platformio/platformio-core
* CLI to manage organizations. Resolve #3532 * fix tests * fix test * add org owner test * fix org test * fix invalid username/orgname error text * refactor auth request in clients * fix * fix send auth request * fix regexp * remove duplicated code. minor fixes. * Remove space Co-authored-by: Ivan Kravets <me@ikravets.com>
This commit is contained in:
@ -35,7 +35,7 @@ class AccountAlreadyAuthorized(AccountError):
|
|||||||
MESSAGE = "You are already authorized with {0} account."
|
MESSAGE = "You are already authorized with {0} account."
|
||||||
|
|
||||||
|
|
||||||
class AccountClient(RESTClient):
|
class AccountClient(RESTClient): # pylint:disable=too-many-public-methods
|
||||||
|
|
||||||
SUMMARY_CACHE_TTL = 60 * 60 * 24 * 7
|
SUMMARY_CACHE_TTL = 60 * 60 * 24 * 7
|
||||||
|
|
||||||
@ -61,6 +61,14 @@ class AccountClient(RESTClient):
|
|||||||
del account[key]
|
del account[key]
|
||||||
app.set_state_item("account", account)
|
app.set_state_item("account", account)
|
||||||
|
|
||||||
|
def send_auth_request(self, *args, **kwargs):
|
||||||
|
headers = kwargs.get("headers", {})
|
||||||
|
if "Authorization" not in headers:
|
||||||
|
token = self.fetch_authentication_token()
|
||||||
|
headers["Authorization"] = "Bearer %s" % token
|
||||||
|
kwargs["headers"] = headers
|
||||||
|
return self.send_request(*args, **kwargs)
|
||||||
|
|
||||||
def login(self, username, password):
|
def login(self, username, password):
|
||||||
try:
|
try:
|
||||||
self.fetch_authentication_token()
|
self.fetch_authentication_token()
|
||||||
@ -107,11 +115,9 @@ class AccountClient(RESTClient):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def change_password(self, old_password, new_password):
|
def change_password(self, old_password, new_password):
|
||||||
token = self.fetch_authentication_token()
|
self.send_auth_request(
|
||||||
self.send_request(
|
|
||||||
"post",
|
"post",
|
||||||
"/v1/password",
|
"/v1/password",
|
||||||
headers={"Authorization": "Bearer %s" % token},
|
|
||||||
data={"old_password": old_password, "new_password": new_password},
|
data={"old_password": old_password, "new_password": new_password},
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
@ -141,11 +147,9 @@ class AccountClient(RESTClient):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def auth_token(self, password, regenerate):
|
def auth_token(self, password, regenerate):
|
||||||
token = self.fetch_authentication_token()
|
result = self.send_auth_request(
|
||||||
result = self.send_request(
|
|
||||||
"post",
|
"post",
|
||||||
"/v1/token",
|
"/v1/token",
|
||||||
headers={"Authorization": "Bearer %s" % token},
|
|
||||||
data={"password": password, "regenerate": 1 if regenerate else 0},
|
data={"password": password, "regenerate": 1 if regenerate else 0},
|
||||||
)
|
)
|
||||||
return result.get("auth_token")
|
return result.get("auth_token")
|
||||||
@ -154,21 +158,12 @@ class AccountClient(RESTClient):
|
|||||||
return self.send_request("post", "/v1/forgot", data={"username": username},)
|
return self.send_request("post", "/v1/forgot", data={"username": username},)
|
||||||
|
|
||||||
def get_profile(self):
|
def get_profile(self):
|
||||||
token = self.fetch_authentication_token()
|
return self.send_auth_request("get", "/v1/profile",)
|
||||||
return self.send_request(
|
|
||||||
"get", "/v1/profile", headers={"Authorization": "Bearer %s" % token},
|
|
||||||
)
|
|
||||||
|
|
||||||
def update_profile(self, profile, current_password):
|
def update_profile(self, profile, current_password):
|
||||||
token = self.fetch_authentication_token()
|
|
||||||
profile["current_password"] = current_password
|
profile["current_password"] = current_password
|
||||||
self.delete_local_state("summary")
|
self.delete_local_state("summary")
|
||||||
response = self.send_request(
|
response = self.send_auth_request("put", "/v1/profile", data=profile,)
|
||||||
"put",
|
|
||||||
"/v1/profile",
|
|
||||||
headers={"Authorization": "Bearer %s" % token},
|
|
||||||
data=profile,
|
|
||||||
)
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def get_account_info(self, offline=False):
|
def get_account_info(self, offline=False):
|
||||||
@ -187,10 +182,7 @@ class AccountClient(RESTClient):
|
|||||||
"username": account.get("username"),
|
"username": account.get("username"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
token = self.fetch_authentication_token()
|
result = self.send_auth_request("get", "/v1/summary",)
|
||||||
result = self.send_request(
|
|
||||||
"get", "/v1/summary", headers={"Authorization": "Bearer %s" % token},
|
|
||||||
)
|
|
||||||
account["summary"] = dict(
|
account["summary"] = dict(
|
||||||
profile=result.get("profile"),
|
profile=result.get("profile"),
|
||||||
packages=result.get("packages"),
|
packages=result.get("packages"),
|
||||||
@ -201,6 +193,40 @@ class AccountClient(RESTClient):
|
|||||||
app.set_state_item("account", account)
|
app.set_state_item("account", account)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
def create_org(self, orgname, email, display_name):
|
||||||
|
response = self.send_auth_request(
|
||||||
|
"post",
|
||||||
|
"/v1/orgs",
|
||||||
|
data={"orgname": orgname, "email": email, "displayname": display_name},
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def list_orgs(self):
|
||||||
|
response = self.send_auth_request("get", "/v1/orgs",)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def update_org(self, orgname, data):
|
||||||
|
response = self.send_auth_request(
|
||||||
|
"put", "/v1/orgs/%s" % orgname, data={k: v for k, v in data.items() if v}
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def add_org_owner(self, orgname, username):
|
||||||
|
response = self.send_auth_request(
|
||||||
|
"post", "/v1/orgs/%s/owners" % orgname, data={"username": username},
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def list_org_owners(self, orgname):
|
||||||
|
response = self.send_auth_request("get", "/v1/orgs/%s/owners" % orgname,)
|
||||||
|
return response
|
||||||
|
|
||||||
|
def remove_org_owner(self, orgname, username):
|
||||||
|
response = self.send_auth_request(
|
||||||
|
"delete", "/v1/orgs/%s/owners" % orgname, data={"username": username},
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
def fetch_authentication_token(self):
|
def fetch_authentication_token(self):
|
||||||
if "PLATFORMIO_AUTH_TOKEN" in os.environ:
|
if "PLATFORMIO_AUTH_TOKEN" in os.environ:
|
||||||
return os.environ["PLATFORMIO_AUTH_TOKEN"]
|
return os.environ["PLATFORMIO_AUTH_TOKEN"]
|
||||||
|
@ -24,6 +24,14 @@ class RegistryClient(RESTClient):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(RegistryClient, self).__init__(base_url=__registry_api__)
|
super(RegistryClient, self).__init__(base_url=__registry_api__)
|
||||||
|
|
||||||
|
def send_auth_request(self, *args, **kwargs):
|
||||||
|
headers = kwargs.get("headers", {})
|
||||||
|
if "Authorization" not in headers:
|
||||||
|
token = AccountClient().fetch_authentication_token()
|
||||||
|
headers["Authorization"] = "Bearer %s" % token
|
||||||
|
kwargs["headers"] = headers
|
||||||
|
return self.send_request(*args, **kwargs)
|
||||||
|
|
||||||
def publish_package(
|
def publish_package(
|
||||||
self, archive_path, owner=None, released_at=None, private=False, notify=True
|
self, archive_path, owner=None, released_at=None, private=False, notify=True
|
||||||
):
|
):
|
||||||
@ -33,7 +41,7 @@ class RegistryClient(RESTClient):
|
|||||||
account.get_account_info(offline=True).get("profile").get("username")
|
account.get_account_info(offline=True).get("profile").get("username")
|
||||||
)
|
)
|
||||||
with open(archive_path, "rb") as fp:
|
with open(archive_path, "rb") as fp:
|
||||||
response = self.send_request(
|
response = self.send_auth_request(
|
||||||
"post",
|
"post",
|
||||||
"/v3/package/%s/%s" % (owner, PackageType.from_archive(archive_path)),
|
"/v3/package/%s/%s" % (owner, PackageType.from_archive(archive_path)),
|
||||||
params={
|
params={
|
||||||
@ -42,7 +50,6 @@ class RegistryClient(RESTClient):
|
|||||||
"released_at": released_at,
|
"released_at": released_at,
|
||||||
},
|
},
|
||||||
headers={
|
headers={
|
||||||
"Authorization": "Bearer %s" % account.fetch_authentication_token(),
|
|
||||||
"Content-Type": "application/octet-stream",
|
"Content-Type": "application/octet-stream",
|
||||||
"X-PIO-Content-SHA256": fs.calculate_file_hashsum(
|
"X-PIO-Content-SHA256": fs.calculate_file_hashsum(
|
||||||
"sha256", archive_path
|
"sha256", archive_path
|
||||||
@ -63,12 +70,7 @@ class RegistryClient(RESTClient):
|
|||||||
path = "/v3/package/%s/%s/%s" % (owner, type, name)
|
path = "/v3/package/%s/%s/%s" % (owner, type, name)
|
||||||
if version:
|
if version:
|
||||||
path = path + "/version/" + version
|
path = path + "/version/" + version
|
||||||
response = self.send_request(
|
response = self.send_auth_request(
|
||||||
"delete",
|
"delete", path, params={"undo": 1 if undo else 0},
|
||||||
path,
|
|
||||||
params={"undo": 1 if undo else 0},
|
|
||||||
headers={
|
|
||||||
"Authorization": "Bearer %s" % account.fetch_authentication_token()
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
return response
|
return response
|
||||||
|
@ -29,13 +29,15 @@ def cli():
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def validate_username(value):
|
def validate_username(value, field="username"):
|
||||||
value = str(value).strip()
|
value = str(value).strip()
|
||||||
if not re.match(r"^[a-z\d](?:[a-z\d]|-(?=[a-z\d])){3,38}$", value, flags=re.I):
|
if not re.match(r"^[a-z\d](?:[a-z\d]|-(?=[a-z\d])){0,37}$", value, flags=re.I):
|
||||||
raise click.BadParameter(
|
raise click.BadParameter(
|
||||||
"Invalid username format. "
|
"Invalid %s format. "
|
||||||
"Username must contain at least 4 characters including single hyphens,"
|
"%s may only contain alphanumeric characters "
|
||||||
" and cannot begin or end with a hyphen"
|
"or single hyphens, cannot begin or end with a hyphen, "
|
||||||
|
"and must not be longer than 38 characters."
|
||||||
|
% (field.lower(), field.capitalize())
|
||||||
)
|
)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
128
platformio/commands/org.py
Normal file
128
platformio/commands/org.py
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
import click
|
||||||
|
from tabulate import tabulate
|
||||||
|
|
||||||
|
from platformio.clients.account import AccountClient
|
||||||
|
from platformio.commands.account import validate_email, validate_username
|
||||||
|
|
||||||
|
|
||||||
|
@click.group("org", short_help="Manage Organizations")
|
||||||
|
def cli():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def validate_orgname(value):
|
||||||
|
return validate_username(value, "Organization name")
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command("create", short_help="Create a new organization")
|
||||||
|
@click.argument(
|
||||||
|
"orgname", callback=lambda _, __, value: validate_orgname(value),
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--email", callback=lambda _, __, value: validate_email(value) if value else value
|
||||||
|
)
|
||||||
|
@click.option("--display-name",)
|
||||||
|
def org_create(orgname, email, display_name):
|
||||||
|
client = AccountClient()
|
||||||
|
client.create_org(orgname, email, display_name)
|
||||||
|
return click.secho("An organization has been successfully created.", fg="green",)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command("list", short_help="List organizations")
|
||||||
|
@click.option("--json-output", is_flag=True)
|
||||||
|
def org_list(json_output):
|
||||||
|
client = AccountClient()
|
||||||
|
orgs = client.list_orgs()
|
||||||
|
if json_output:
|
||||||
|
return click.echo(json.dumps(orgs))
|
||||||
|
click.echo()
|
||||||
|
click.secho("Organizations", fg="cyan")
|
||||||
|
click.echo("=" * len("Organizations"))
|
||||||
|
for org in orgs:
|
||||||
|
click.echo()
|
||||||
|
click.secho(org.get("orgname"), bold=True)
|
||||||
|
click.echo("-" * len(org.get("orgname")))
|
||||||
|
data = []
|
||||||
|
if org.get("displayname"):
|
||||||
|
data.append(("Display Name:", org.get("displayname")))
|
||||||
|
if org.get("email"):
|
||||||
|
data.append(("Email:", org.get("email")))
|
||||||
|
data.append(
|
||||||
|
(
|
||||||
|
"Owners:",
|
||||||
|
", ".join((owner.get("username") for owner in org.get("owners"))),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
click.echo(tabulate(data, tablefmt="plain"))
|
||||||
|
return click.echo()
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command("update", short_help="Update organization")
|
||||||
|
@click.argument("orgname")
|
||||||
|
@click.option("--new-orgname")
|
||||||
|
@click.option("--email")
|
||||||
|
@click.option("--display-name",)
|
||||||
|
def org_update(orgname, **kwargs):
|
||||||
|
client = AccountClient()
|
||||||
|
org = next(
|
||||||
|
(org for org in client.list_orgs() if org.get("orgname") == orgname), None
|
||||||
|
)
|
||||||
|
if not org:
|
||||||
|
return click.ClickException("Organization '%s' not found" % orgname)
|
||||||
|
del org["owners"]
|
||||||
|
new_org = org.copy()
|
||||||
|
if not any(kwargs.values()):
|
||||||
|
for field in org:
|
||||||
|
new_org[field] = click.prompt(
|
||||||
|
field.replace("_", " ").capitalize(), default=org[field]
|
||||||
|
)
|
||||||
|
if field == "email":
|
||||||
|
validate_email(new_org[field])
|
||||||
|
if field == "orgname":
|
||||||
|
validate_orgname(new_org[field])
|
||||||
|
else:
|
||||||
|
new_org.update(
|
||||||
|
{key.replace("new_", ""): value for key, value in kwargs.items() if value}
|
||||||
|
)
|
||||||
|
client.update_org(orgname, new_org)
|
||||||
|
return click.secho("An organization has been successfully updated.", fg="green",)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command("add", short_help="Add a new owner to organization")
|
||||||
|
@click.argument("orgname",)
|
||||||
|
@click.argument("username",)
|
||||||
|
def org_add_owner(orgname, username):
|
||||||
|
client = AccountClient()
|
||||||
|
client.add_org_owner(orgname, username)
|
||||||
|
return click.secho(
|
||||||
|
"A new owner has been successfully added to organization.", fg="green",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command("remove", short_help="Remove an owner from organization")
|
||||||
|
@click.argument("orgname",)
|
||||||
|
@click.argument("username",)
|
||||||
|
def org_remove_owner(orgname, username):
|
||||||
|
client = AccountClient()
|
||||||
|
client.remove_org_owner(orgname, username)
|
||||||
|
return click.secho(
|
||||||
|
"An owner has been successfully removed from organization.", fg="green",
|
||||||
|
)
|
@ -145,8 +145,12 @@ def test_account_password_change_with_invalid_old_password(
|
|||||||
)
|
)
|
||||||
assert result.exit_code > 0
|
assert result.exit_code > 0
|
||||||
assert result.exception
|
assert result.exception
|
||||||
assert "Invalid user password" in str(result.exception)
|
assert (
|
||||||
|
"Invalid request data for new_password -> "
|
||||||
|
"'Password must contain at least 8 "
|
||||||
|
"characters including a number and a lowercase letter'"
|
||||||
|
in str(result.exception)
|
||||||
|
)
|
||||||
finally:
|
finally:
|
||||||
clirunner.invoke(cmd_account, ["logout"])
|
clirunner.invoke(cmd_account, ["logout"])
|
||||||
|
|
||||||
@ -174,9 +178,9 @@ def test_account_password_change_with_invalid_new_password_format(
|
|||||||
assert result.exit_code > 0
|
assert result.exit_code > 0
|
||||||
assert result.exception
|
assert result.exception
|
||||||
assert (
|
assert (
|
||||||
"Invalid password format. Password must contain at"
|
"Invalid request data for new_password -> "
|
||||||
" least 8 characters including a number and a lowercase letter"
|
"'Password must contain at least 8 characters"
|
||||||
in str(result.exception)
|
" including a number and a lowercase letter'" in str(result.exception)
|
||||||
)
|
)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
191
tests/commands/test_orgs.py
Normal file
191
tests/commands/test_orgs.py
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from platformio.commands.account import cli as cmd_account
|
||||||
|
from platformio.commands.org import cli as cmd_org
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.skipif(
|
||||||
|
not (
|
||||||
|
os.environ.get("PLATFORMIO_TEST_ACCOUNT_LOGIN")
|
||||||
|
and os.environ.get("PLATFORMIO_TEST_ACCOUNT_PASSWORD")
|
||||||
|
),
|
||||||
|
reason="requires PLATFORMIO_TEST_ACCOUNT_LOGIN, PLATFORMIO_TEST_ACCOUNT_PASSWORD environ variables",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def credentials():
|
||||||
|
return {
|
||||||
|
"login": os.environ["PLATFORMIO_TEST_ACCOUNT_LOGIN"],
|
||||||
|
"password": os.environ["PLATFORMIO_TEST_ACCOUNT_PASSWORD"],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_org_add(clirunner, credentials, validate_cliresult, isolated_pio_home):
|
||||||
|
try:
|
||||||
|
result = clirunner.invoke(
|
||||||
|
cmd_account,
|
||||||
|
["login", "-u", credentials["login"], "-p", credentials["password"]],
|
||||||
|
)
|
||||||
|
validate_cliresult(result)
|
||||||
|
assert "Successfully logged in!" in result.output
|
||||||
|
|
||||||
|
result = clirunner.invoke(cmd_org, ["list", "--json-output"],)
|
||||||
|
validate_cliresult(result)
|
||||||
|
json_result = json.loads(result.output.strip())
|
||||||
|
if len(json_result) < 3:
|
||||||
|
for i in range(3 - len(json_result)):
|
||||||
|
result = clirunner.invoke(
|
||||||
|
cmd_org,
|
||||||
|
[
|
||||||
|
"create",
|
||||||
|
"%s-%s" % (i, credentials["login"]),
|
||||||
|
"--email",
|
||||||
|
"test@test.com",
|
||||||
|
"--display-name",
|
||||||
|
"TEST ORG %s" % i,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
validate_cliresult(result)
|
||||||
|
result = clirunner.invoke(cmd_org, ["list", "--json-output"],)
|
||||||
|
validate_cliresult(result)
|
||||||
|
json_result = json.loads(result.output.strip())
|
||||||
|
assert len(json_result) == 3
|
||||||
|
finally:
|
||||||
|
clirunner.invoke(cmd_account, ["logout"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_org_list(clirunner, credentials, validate_cliresult, isolated_pio_home):
|
||||||
|
try:
|
||||||
|
result = clirunner.invoke(
|
||||||
|
cmd_account,
|
||||||
|
["login", "-u", credentials["login"], "-p", credentials["password"]],
|
||||||
|
)
|
||||||
|
validate_cliresult(result)
|
||||||
|
assert "Successfully logged in!" in result.output
|
||||||
|
result = clirunner.invoke(cmd_org, ["list", "--json-output"],)
|
||||||
|
validate_cliresult(result)
|
||||||
|
json_result = json.loads(result.output.strip())
|
||||||
|
assert len(json_result) == 3
|
||||||
|
check = False
|
||||||
|
for org in json_result:
|
||||||
|
assert "orgname" in org
|
||||||
|
assert "displayname" in org
|
||||||
|
assert "email" in org
|
||||||
|
assert "owners" in org
|
||||||
|
for owner in org.get("owners"):
|
||||||
|
assert "username" in owner
|
||||||
|
check = owner["username"] == credentials["login"] if not check else True
|
||||||
|
assert "firstname" in owner
|
||||||
|
assert "lastname" in owner
|
||||||
|
assert check
|
||||||
|
finally:
|
||||||
|
clirunner.invoke(cmd_account, ["logout"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_org_update(clirunner, credentials, validate_cliresult, isolated_pio_home):
|
||||||
|
try:
|
||||||
|
result = clirunner.invoke(
|
||||||
|
cmd_account,
|
||||||
|
["login", "-u", credentials["login"], "-p", credentials["password"]],
|
||||||
|
)
|
||||||
|
validate_cliresult(result)
|
||||||
|
assert "Successfully logged in!" in result.output
|
||||||
|
|
||||||
|
result = clirunner.invoke(cmd_org, ["list", "--json-output"],)
|
||||||
|
validate_cliresult(result)
|
||||||
|
json_result = json.loads(result.output.strip())
|
||||||
|
assert len(json_result) == 3
|
||||||
|
org = json_result[0]
|
||||||
|
assert "orgname" in org
|
||||||
|
assert "displayname" in org
|
||||||
|
assert "email" in org
|
||||||
|
assert "owners" in org
|
||||||
|
|
||||||
|
old_orgname = org["orgname"]
|
||||||
|
if len(old_orgname) > 10:
|
||||||
|
new_orgname = "neworg" + org["orgname"][6:]
|
||||||
|
|
||||||
|
result = clirunner.invoke(
|
||||||
|
cmd_org, ["update", old_orgname, "--new-orgname", new_orgname],
|
||||||
|
)
|
||||||
|
validate_cliresult(result)
|
||||||
|
|
||||||
|
result = clirunner.invoke(
|
||||||
|
cmd_org, ["update", new_orgname, "--new-orgname", old_orgname],
|
||||||
|
)
|
||||||
|
validate_cliresult(result)
|
||||||
|
|
||||||
|
result = clirunner.invoke(cmd_org, ["list", "--json-output"],)
|
||||||
|
validate_cliresult(result)
|
||||||
|
assert json.loads(result.output.strip()) == json_result
|
||||||
|
finally:
|
||||||
|
clirunner.invoke(cmd_account, ["logout"])
|
||||||
|
|
||||||
|
|
||||||
|
def test_org_owner(clirunner, credentials, validate_cliresult, isolated_pio_home):
|
||||||
|
try:
|
||||||
|
result = clirunner.invoke(
|
||||||
|
cmd_account,
|
||||||
|
["login", "-u", credentials["login"], "-p", credentials["password"]],
|
||||||
|
)
|
||||||
|
validate_cliresult(result)
|
||||||
|
assert "Successfully logged in!" in result.output
|
||||||
|
result = clirunner.invoke(cmd_org, ["list", "--json-output"],)
|
||||||
|
validate_cliresult(result)
|
||||||
|
json_result = json.loads(result.output.strip())
|
||||||
|
assert len(json_result) == 3
|
||||||
|
org = json_result[0]
|
||||||
|
assert "orgname" in org
|
||||||
|
assert "displayname" in org
|
||||||
|
assert "email" in org
|
||||||
|
assert "owners" in org
|
||||||
|
|
||||||
|
result = clirunner.invoke(cmd_org, ["add", org["orgname"], "platformio"],)
|
||||||
|
validate_cliresult(result)
|
||||||
|
|
||||||
|
result = clirunner.invoke(cmd_org, ["list", "--json-output"],)
|
||||||
|
validate_cliresult(result)
|
||||||
|
json_result = json.loads(result.output.strip())
|
||||||
|
assert len(json_result) == 3
|
||||||
|
check = False
|
||||||
|
for item in json_result:
|
||||||
|
if item["orgname"] != org["orgname"]:
|
||||||
|
continue
|
||||||
|
for owner in item.get("owners"):
|
||||||
|
check = owner["username"] == "platformio" if not check else True
|
||||||
|
assert check
|
||||||
|
|
||||||
|
result = clirunner.invoke(cmd_org, ["remove", org["orgname"], "platformio"],)
|
||||||
|
validate_cliresult(result)
|
||||||
|
|
||||||
|
result = clirunner.invoke(cmd_org, ["list", "--json-output"],)
|
||||||
|
validate_cliresult(result)
|
||||||
|
json_result = json.loads(result.output.strip())
|
||||||
|
assert len(json_result) == 3
|
||||||
|
check = False
|
||||||
|
for item in json_result:
|
||||||
|
if item["orgname"] != org["orgname"]:
|
||||||
|
continue
|
||||||
|
for owner in item.get("owners"):
|
||||||
|
check = owner["username"] == "platformio" if not check else True
|
||||||
|
assert not check
|
||||||
|
|
||||||
|
finally:
|
||||||
|
clirunner.invoke(cmd_account, ["logout"])
|
Reference in New Issue
Block a user