Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 91 additions & 70 deletions src/manage/install_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def download_package(cmd, install, dest, cache, *, on_progress=None, urlopen=_ur
LOGGER.verbose("Using bundled file at %s", bundled)
return bundled

unlink(dest, "Removing old download is taking some time. " +
unlink(dest, "Removing old download is taking some time. " +
"Please continue to wait, or press Ctrl+C to abort.")

def _find_creds(url):
Expand Down Expand Up @@ -528,36 +528,6 @@ def _restore_site(cmd, state):
LOGGER.verbose("TRACEBACK", exc_info=True)


def _sanitise_install(cmd, install):
"""Prepares install metadata for storing locally.

This includes:
* filtering out disabled shortcuts
* preserving original shortcuts
* sanitising URLs
"""

if "shortcuts" in install:
# This saves our original set of shortcuts, so a later repair operation
# can enable those that were originally disabled.
shortcuts = install.setdefault("__original-shortcuts", install["shortcuts"])
if cmd.enable_shortcut_kinds or cmd.disable_shortcut_kinds:
orig_shortcuts = shortcuts
shortcuts = []
for s in orig_shortcuts:
if cmd.enable_shortcut_kinds and s["kind"] not in cmd.enable_shortcut_kinds:
continue
if cmd.disable_shortcut_kinds and s["kind"] in cmd.disable_shortcut_kinds:
continue
shortcuts.append(s)
install["shortcuts"] = shortcuts

install["url"] = sanitise_url(install["url"])
# If there's a non-empty and non-default source, sanitise it
if install.get("source") and install["source"] != cmd.fallback_source:
install["source"] = sanitise_url(install["source"])


def _install_one(cmd, source, install, *, target=None):
if cmd.repair:
LOGGER.info("Repairing %s.", install['display-name'])
Expand All @@ -574,46 +544,87 @@ def _install_one(cmd, source, install, *, target=None):
package = _download_one(cmd, source, install, cmd.download_dir)

dest = target or (cmd.install_dir / install["id"])
metadata_dest = dest / "__install__.json"

preserved_site = _preserve_site(cmd, dest, install)

LOGGER.verbose("Extracting %s to %s", package, dest)
if not cmd.repair:
try:
rmtree(
dest,
"Removing the previous install is taking some time. " +
"Ensure Python is not running, and continue to wait " +
"or press Ctrl+C to abort.",
remove_ext_first=("exe", "dll", "json"),
)
except FileExistsError:
LOGGER.error(
"Unable to remove previous install. " +
"Please check your packages directory at %s for issues.",
dest.parent
)
raise
except FilesInUseError:
LOGGER.error(
"Unable to remove previous install because files are still in use. " +
"Please ensure Python is not currently running."
try:
if not cmd.repair:
_remove_existing(dest)

with ProgressPrinter("Extracting", maxwidth=CONSOLE_MAX_WIDTH) as on_progress:
extract_package(package, dest, on_progress=on_progress, repair=cmd.repair)

if target:
unlink(
metadata_dest,
"Removing metadata from the install is taking some time. Please " +
"continue to wait, or press Ctrl+C to abort."
)
raise
else:
_finalize_metadata(cmd, install, metadata_dest)

with ProgressPrinter("Extracting", maxwidth=CONSOLE_MAX_WIDTH) as on_progress:
extract_package(package, dest, on_progress=on_progress, repair=cmd.repair)
LOGGER.debug("Write __install__.json to %s", dest)
with open(metadata_dest, "w", encoding="utf-8") as f:
json.dump(install, f, default=str)

if target:
unlink(
dest / "__install__.json",
"Removing metadata from the install is taking some time. Please " +
"continue to wait, or press Ctrl+C to abort."
finally:
# May be letting an exception bubble out here, so we'll handle and log
# here rather than letting any new ones leave.
try:
if dest.is_dir():
# Install may be broken at this point, but we'll put site back anyway
_restore_site(cmd, preserved_site)
else:
# Install is certainly broken, but we don't want to delete user files
# Just warn, until we come up with a better idea
LOGGER.warn("This runtime has been lost due to an error, you will "
"need to reinstall.")
except Exception:
LOGGER.warn("Unexpected failure finalizing install. See log file for details")
LOGGER.verbose("TRACEBACK", exc_info=True)

LOGGER.verbose("Install complete")


def _remove_existing(install_dir):
try:
rmtree(
install_dir,
"Removing the previous install is taking some time. " +
"Ensure Python is not running, and continue to wait " +
"or press Ctrl+C to abort.",
remove_ext_first=("exe", "dll", "json"),
)
else:
except FileExistsError:
LOGGER.error(
"Unable to remove previous install. " +
"Please check your packages directory at %s for issues.",
install_dir.parent
)
raise
except FilesInUseError:
LOGGER.error(
"Unable to remove previous install because files are still in use. " +
"Please ensure Python is not currently running."
)
raise


def _finalize_metadata(cmd, install, merge_from=None):
"""Prepares install metadata for storing locally.

This includes:
* filtering out disabled shortcuts
* preserving original shortcuts
* sanitising URLs
"""

if merge_from:
try:
with open(dest / "__install__.json", "r", encoding="utf-8-sig") as f:
LOGGER.debug("Updating from __install__.json in %s", dest)
with open(merge_from, "r", encoding="utf-8-sig") as f:
LOGGER.debug("Updating from __install__.json in %s", merge_from.parent)
for k, v in json.load(f).items():
if not install.setdefault(k, v):
install[k] = v
Expand All @@ -626,15 +637,25 @@ def _install_one(cmd, source, install, *, target=None):
)
raise

_sanitise_install(cmd, install)

LOGGER.debug("Write __install__.json to %s", dest)
with open(dest / "__install__.json", "w", encoding="utf-8") as f:
json.dump(install, f, default=str)

_restore_site(cmd, preserved_site)
if "shortcuts" in install:
# This saves our original set of shortcuts, so a later repair operation
# can enable those that were originally disabled.
shortcuts = install.setdefault("__original-shortcuts", install["shortcuts"])
if cmd.enable_shortcut_kinds or cmd.disable_shortcut_kinds:
orig_shortcuts = shortcuts
shortcuts = []
for s in orig_shortcuts:
if cmd.enable_shortcut_kinds and s["kind"] not in cmd.enable_shortcut_kinds:
continue
if cmd.disable_shortcut_kinds and s["kind"] in cmd.disable_shortcut_kinds:
continue
shortcuts.append(s)
install["shortcuts"] = shortcuts

LOGGER.verbose("Install complete")
install["url"] = sanitise_url(install["url"])
# If there's a non-empty and non-default source, sanitise it
if install.get("source") and install["source"] != cmd.fallback_source:
install["source"] = sanitise_url(install["source"])


def _merge_existing_index(versions, index_json):
Expand Down
20 changes: 10 additions & 10 deletions tests/test_install_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def test_install_from_script(tmp_path, assert_log):
)


def test_sanitise_install_urls():
def test_finalize_metadata_urls():
class Cmd:
enable_shortcut_kinds = []
disable_shortcut_kinds = []
Expand All @@ -341,13 +341,13 @@ class Cmd:
"source": "http://user:placeholder@example.com/index.json",
}

IC._sanitise_install(Cmd, i)
IC._finalize_metadata(Cmd, i)

assert i["url"] == "http://example.com/package.zip"
assert i["source"] == "http://example.com/index.json"


def test_sanitise_install_fallback_urls():
def test_finalize_metadata_fallback_urls():
class Cmd:
enable_shortcut_kinds = []
disable_shortcut_kinds = []
Expand All @@ -358,13 +358,13 @@ class Cmd:
"source": "http://user:placeholder@example.com/index.json",
}

IC._sanitise_install(Cmd, i)
IC._finalize_metadata(Cmd, i)

assert i["url"] == "http://example.com/package.zip"
assert i["source"] == "http://user:placeholder@example.com/index.json"


def test_sanitise_install_shortcuts():
def test_finalize_metadata_shortcuts():
class Cmd:
enable_shortcut_kinds = []
disable_shortcut_kinds = []
Expand All @@ -375,13 +375,13 @@ class Cmd:
"shortcuts": [dict(kind=a) for a in "abc"],
}

IC._sanitise_install(Cmd, i)
IC._finalize_metadata(Cmd, i)

assert [j["kind"] for j in i["shortcuts"]] == ["a", "b", "c"]
assert [j["kind"] for j in i["__original-shortcuts"]] == ["a", "b", "c"]


def test_sanitise_install_shortcuts_disable():
def test_finalize_metadata_shortcuts_disable():
class Cmd:
enable_shortcut_kinds = []
disable_shortcut_kinds = ["b"]
Expand All @@ -392,13 +392,13 @@ class Cmd:
"shortcuts": [dict(kind=a) for a in "abc"],
}

IC._sanitise_install(Cmd, i)
IC._finalize_metadata(Cmd, i)

assert [j["kind"] for j in i["shortcuts"]] == ["a", "c"]
assert [j["kind"] for j in i["__original-shortcuts"]] == ["a", "b", "c"]


def test_sanitise_install_shortcuts_enable():
def test_finalize_metadata_shortcuts_enable():
class Cmd:
enable_shortcut_kinds = ["b"]
disable_shortcut_kinds = []
Expand All @@ -409,7 +409,7 @@ class Cmd:
"shortcuts": [dict(kind=a) for a in "abc"],
}

IC._sanitise_install(Cmd, i)
IC._finalize_metadata(Cmd, i)

assert [j["kind"] for j in i["shortcuts"]] == ["b"]
assert [j["kind"] for j in i["__original-shortcuts"]] == ["a", "b", "c"]