Addon Manager: Create new git handling mechanism
This commit is contained in:
138
src/Mod/AddonManager/AddonManagerTest/app/test_git.py
Normal file
138
src/Mod/AddonManager/AddonManagerTest/app/test_git.py
Normal file
@@ -0,0 +1,138 @@
|
||||
# ***************************************************************************
|
||||
# * *
|
||||
# * Copyright (c) 2022 FreeCAD Project Association *
|
||||
# * *
|
||||
# * This file is part of FreeCAD. *
|
||||
# * *
|
||||
# * FreeCAD is free software: you can redistribute it and/or modify it *
|
||||
# * under the terms of the GNU Lesser General Public License as *
|
||||
# * published by the Free Software Foundation, either version 2.1 of the *
|
||||
# * License, or (at your option) any later version. *
|
||||
# * *
|
||||
# * FreeCAD is distributed in the hope that it will be useful, but *
|
||||
# * WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
|
||||
# * Lesser General Public License for more details. *
|
||||
# * *
|
||||
# * You should have received a copy of the GNU Lesser General Public *
|
||||
# * License along with FreeCAD. If not, see *
|
||||
# * <https://www.gnu.org/licenses/>. *
|
||||
# * *
|
||||
# ***************************************************************************
|
||||
|
||||
|
||||
import unittest
|
||||
import os
|
||||
import shutil
|
||||
import stat
|
||||
import tempfile
|
||||
from zipfile import ZipFile
|
||||
import FreeCAD
|
||||
|
||||
from typing import Dict
|
||||
|
||||
from addonmanager_git import GitManager, NoGitFound, GitFailed
|
||||
|
||||
|
||||
class TestGit(unittest.TestCase):
|
||||
|
||||
MODULE = "test_git" # file name without extension
|
||||
|
||||
def setUp(self):
|
||||
"""Set up the test case: called by the unit test system"""
|
||||
test_data_dir = os.path.join(
|
||||
FreeCAD.getHomePath(), "Mod", "AddonManager", "AddonManagerTest", "data"
|
||||
)
|
||||
git_repo_zip = os.path.join(test_data_dir, "test_repo.zip")
|
||||
self.test_dir = os.path.join(
|
||||
tempfile.gettempdir(), "FreeCADTesting", "AddonManagerTests", "Git"
|
||||
)
|
||||
os.makedirs(self.test_dir, exist_ok=True)
|
||||
self.test_repo_remote = os.path.join(self.test_dir, "TEST_REPO_REMOTE")
|
||||
self._rmdir(self.test_repo_remote)
|
||||
|
||||
if not os.path.exists(git_repo_zip):
|
||||
self.skipTest("Can't find test repo")
|
||||
return
|
||||
|
||||
with ZipFile(git_repo_zip, "r") as zip_repo:
|
||||
zip_repo.extractall(self.test_repo_remote)
|
||||
self.test_repo_remote = os.path.join(self.test_repo_remote, "test_repo")
|
||||
|
||||
try:
|
||||
self.git = GitManager()
|
||||
except NoGitFound:
|
||||
self.skipTest("No git found")
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after the test"""
|
||||
# self._rmdir(self.test_dir)
|
||||
|
||||
def test_clone(self):
|
||||
"""Test git clone"""
|
||||
checkout_dir = self._clone_test_repo()
|
||||
self.assertTrue(os.path.exists(checkout_dir))
|
||||
self.assertTrue(os.path.exists(os.path.join(checkout_dir, ".git")))
|
||||
|
||||
def test_checkout(self):
|
||||
"""Test git checkout"""
|
||||
checkout_dir = self._clone_test_repo()
|
||||
|
||||
self.git.checkout(checkout_dir, "HEAD~1")
|
||||
status = self.git.status(checkout_dir).strip()
|
||||
expected_status = "## HEAD (no branch)"
|
||||
self.assertEqual(status, expected_status)
|
||||
|
||||
def test_update(self):
|
||||
"""Test using git to update the local repo"""
|
||||
checkout_dir = self._clone_test_repo()
|
||||
|
||||
self.git.reset(checkout_dir, ["--hard", "HEAD~1"])
|
||||
self.assertTrue(self.git.update_available(checkout_dir))
|
||||
self.git.update(checkout_dir)
|
||||
self.assertFalse(self.git.update_available(checkout_dir))
|
||||
|
||||
def test_tag_and_branch(self):
|
||||
"""Test checking the currently checked-out tag"""
|
||||
checkout_dir = self._clone_test_repo()
|
||||
|
||||
expected_tag = "TestTag"
|
||||
self.git.checkout(checkout_dir, expected_tag)
|
||||
found_tag = self.git.current_tag(checkout_dir)
|
||||
self.assertEqual(found_tag, expected_tag)
|
||||
self.assertFalse(self.git.update_available(checkout_dir))
|
||||
|
||||
expected_branch = "TestBranch"
|
||||
self.git.checkout(checkout_dir, expected_branch)
|
||||
found_branch = self.git.current_branch(checkout_dir)
|
||||
self.assertEqual(found_branch, expected_branch)
|
||||
self.assertFalse(self.git.update_available(checkout_dir))
|
||||
|
||||
expected_branch = "master"
|
||||
self.git.checkout(checkout_dir, expected_branch)
|
||||
found_branch = self.git.current_branch(checkout_dir)
|
||||
self.assertEqual(found_branch, expected_branch)
|
||||
self.assertFalse(self.git.update_available(checkout_dir))
|
||||
|
||||
def _rmdir(self, path):
|
||||
try:
|
||||
shutil.rmtree(path, onerror=self._remove_readonly)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def _remove_readonly(self, func, path, _) -> None:
|
||||
"""Remove a read-only file."""
|
||||
|
||||
os.chmod(path, stat.S_IWRITE)
|
||||
func(path)
|
||||
|
||||
def _clone_test_repo(self) -> str:
|
||||
checkout_dir = os.path.join(self.test_dir, "test_repo")
|
||||
try:
|
||||
# Git won't clone to an existing directory, so make sure to remove it first
|
||||
if os.path.exists(checkout_dir):
|
||||
self._rmdir(checkout_dir)
|
||||
self.git.clone(self.test_repo_remote, checkout_dir)
|
||||
except GitFailed as e:
|
||||
self.fail(str(e))
|
||||
return checkout_dir
|
||||
BIN
src/Mod/AddonManager/AddonManagerTest/data/test_repo.zip
Normal file
BIN
src/Mod/AddonManager/AddonManagerTest/data/test_repo.zip
Normal file
Binary file not shown.
@@ -7,6 +7,7 @@ SET(AddonManager_SRCS
|
||||
Addon.py
|
||||
AddonManager.py
|
||||
AddonManager.ui
|
||||
addonmanager_git.py
|
||||
addonmanager_macro.py
|
||||
addonmanager_utilities.py
|
||||
addonmanager_workers_startup.py
|
||||
@@ -46,6 +47,7 @@ SET(AddonManagerTests_SRCS
|
||||
SET(AddonManagerTestsApp_SRCS
|
||||
AddonManagerTest/app/__init__.py
|
||||
AddonManagerTest/app/test_addon.py
|
||||
AddonManagerTest/app/test_git.py
|
||||
AddonManagerTest/app/test_macro.py
|
||||
AddonManagerTest/app/test_utilities.py
|
||||
)
|
||||
@@ -70,6 +72,7 @@ SET(AddonManagerTestsFiles_SRCS
|
||||
AddonManagerTest/data/macro_template.FCStd
|
||||
AddonManagerTest/data/missing_macro_metadata.FCStd
|
||||
AddonManagerTest/data/prefpack_only.xml
|
||||
AddonManagerTest/data/test_repo.zip
|
||||
AddonManagerTest/data/test_version_detection.xml
|
||||
AddonManagerTest/data/workbench_only.xml
|
||||
)
|
||||
|
||||
@@ -32,8 +32,12 @@ from AddonManagerTest.app.test_addon import (
|
||||
from AddonManagerTest.app.test_macro import (
|
||||
TestMacro as AddonManagerTestMacro,
|
||||
)
|
||||
from AddonManagerTest.app.test_git import (
|
||||
TestGit as AddonManagerTestGit,
|
||||
)
|
||||
|
||||
# dummy usage to get flake8 and lgtm quiet
|
||||
False if AddonManagerTestUtilities.__name__ else True
|
||||
False if AddonManagerTestAddon.__name__ else True
|
||||
False if AddonManagerTestMacro.__name__ else True
|
||||
False if AddonManagerTestGit.__name__ else True
|
||||
|
||||
191
src/Mod/AddonManager/addonmanager_git.py
Normal file
191
src/Mod/AddonManager/addonmanager_git.py
Normal file
@@ -0,0 +1,191 @@
|
||||
# ***************************************************************************
|
||||
# * *
|
||||
# * Copyright (c) 2022 FreeCAD Project Association *
|
||||
# * *
|
||||
# * This file is part of FreeCAD. *
|
||||
# * *
|
||||
# * FreeCAD is free software: you can redistribute it and/or modify it *
|
||||
# * under the terms of the GNU Lesser General Public License as *
|
||||
# * published by the Free Software Foundation, either version 2.1 of the *
|
||||
# * License, or (at your option) any later version. *
|
||||
# * *
|
||||
# * FreeCAD is distributed in the hope that it will be useful, but *
|
||||
# * WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
|
||||
# * Lesser General Public License for more details. *
|
||||
# * *
|
||||
# * You should have received a copy of the GNU Lesser General Public *
|
||||
# * License along with FreeCAD. If not, see *
|
||||
# * <https://www.gnu.org/licenses/>. *
|
||||
# * *
|
||||
# ***************************************************************************
|
||||
|
||||
""" Wrapper around git executable to simplify calling git commands from Python. """
|
||||
|
||||
# pylint: disable=too-few-public-methods
|
||||
|
||||
import os
|
||||
import platform
|
||||
import shutil
|
||||
import subprocess
|
||||
from typing import List
|
||||
import FreeCAD
|
||||
|
||||
|
||||
class NoGitFound(RuntimeError):
|
||||
"""Could not locate the git executable on this system."""
|
||||
|
||||
|
||||
class GitFailed(RuntimeError):
|
||||
"""The call to git returned an error of some kind"""
|
||||
|
||||
|
||||
class GitManager:
|
||||
"""A class to manage access to git: mostly just provides a simple wrapper around the basic
|
||||
command-line calls. Provides optional asynchronous access to clone and update."""
|
||||
|
||||
def __init__(self):
|
||||
self.git_exe = None
|
||||
self._find_git()
|
||||
if not self.git_exe:
|
||||
raise NoGitFound()
|
||||
|
||||
def clone(self, remote, local_path, args: List[str] = None):
|
||||
"""Clones the remote to the local path"""
|
||||
final_args = ["clone", "--recurse-submodules"]
|
||||
if args:
|
||||
final_args.extend(args)
|
||||
final_args.extend([remote, local_path])
|
||||
self._synchronous_call_git(final_args)
|
||||
|
||||
def async_clone(self, remote, local_path, progress_monitor, args: List[str] = None):
|
||||
"""Clones the remote to the local path, sending periodic progress updates
|
||||
to the passed progress_monitor. Returns a handle that can be used to
|
||||
cancel the job."""
|
||||
|
||||
def checkout(self, local_path, spec, args: List[str] = None):
|
||||
"""Checks out a specific git revision, tag, or branch. Any valid argument to
|
||||
git checkout can be submitted."""
|
||||
old_dir = os.getcwd()
|
||||
os.chdir(local_path)
|
||||
final_args = ["checkout"]
|
||||
if args:
|
||||
final_args.extend(args)
|
||||
final_args.append(spec)
|
||||
self._synchronous_call_git(final_args)
|
||||
os.chdir(old_dir)
|
||||
|
||||
def update(self, local_path):
|
||||
"""Fetches and pulls the local_path from its remote"""
|
||||
old_dir = os.getcwd()
|
||||
os.chdir(local_path)
|
||||
self._synchronous_call_git(["fetch"])
|
||||
self._synchronous_call_git(["pull"])
|
||||
self._synchronous_call_git(["submodule", "update", "--init", "--recursive"])
|
||||
os.chdir(old_dir)
|
||||
|
||||
def status(self, local_path) -> str:
|
||||
"""Gets the v1 porcelain status"""
|
||||
old_dir = os.getcwd()
|
||||
os.chdir(local_path)
|
||||
status = self._synchronous_call_git(["status", "-sb", "--porcelain"])
|
||||
os.chdir(old_dir)
|
||||
return status
|
||||
|
||||
def reset(self, local_path, args: List[str] = None):
|
||||
"""Executes the git reset command"""
|
||||
old_dir = os.getcwd()
|
||||
os.chdir(local_path)
|
||||
final_args = ["reset"]
|
||||
if args:
|
||||
final_args.extend(args)
|
||||
self._synchronous_call_git(final_args)
|
||||
os.chdir(old_dir)
|
||||
|
||||
def async_fetch_and_update(self, local_path, progress_monitor, args=None):
|
||||
"""Same as fetch_and_update, but asynchronous"""
|
||||
|
||||
def update_available(self, local_path) -> bool:
|
||||
"""Returns True if an update is available from the remote, or false if not"""
|
||||
old_dir = os.getcwd()
|
||||
os.chdir(local_path)
|
||||
self._synchronous_call_git(["fetch"])
|
||||
status = self._synchronous_call_git(["status", "-sb", "--porcelain"])
|
||||
os.chdir(old_dir)
|
||||
return "behind" in status
|
||||
|
||||
def current_tag(self, local_path) -> str:
|
||||
"""Get the name of the currently checked-out tag if HEAD is detached"""
|
||||
old_dir = os.getcwd()
|
||||
os.chdir(local_path)
|
||||
tag = self._synchronous_call_git(["describe", "--tags"]).strip()
|
||||
os.chdir(old_dir)
|
||||
return tag
|
||||
|
||||
def current_branch(self, local_path) -> str:
|
||||
"""Get the name of the current branch"""
|
||||
old_dir = os.getcwd()
|
||||
os.chdir(local_path)
|
||||
branch = self._synchronous_call_git(["branch", "--show-current"]).strip()
|
||||
os.chdir(old_dir)
|
||||
return branch
|
||||
|
||||
def repair(self, remote, local_path):
|
||||
"""Assumes that local_path is supposed to be a local clone of the given remote, and
|
||||
ensures that it is. Note that any local changes in local_path will be destroyed. This
|
||||
is achieved by archiving the old path, cloning an entirely new copy, and then deleting
|
||||
the old directory."""
|
||||
old_path = local_path + ".bak"
|
||||
os.rename(local_path, old_path)
|
||||
try:
|
||||
self.clone(remote, local_path)
|
||||
except GitFailed as e:
|
||||
shutil.rmtree(local_path)
|
||||
os.rename(old_path, local_path)
|
||||
raise e
|
||||
|
||||
def _find_git(self):
|
||||
# Find git. In preference order
|
||||
# A) The value of the GitExecutable user preference
|
||||
# B) The executable located in the same bin directory as FreeCAD and called "git"
|
||||
# C) The result of an shutil search for your system's "git" executable
|
||||
prefs = FreeCAD.ParamGet("User parameter:BaseApp/Preferences/Addons")
|
||||
git_exe = prefs.GetString("GitExecutable", "Not set")
|
||||
if not git_exe or git_exe == "Not set" or not os.path.exists(git_exe):
|
||||
fc_dir = FreeCAD.getHomePath()
|
||||
git_exe = os.path.join(fc_dir, "bin", "git")
|
||||
if "Windows" in platform.system():
|
||||
git_exe += ".exe"
|
||||
|
||||
if not git_exe or not os.path.exists(git_exe):
|
||||
git_exe = shutil.which("git")
|
||||
|
||||
if not git_exe or not os.path.exists(git_exe):
|
||||
return
|
||||
|
||||
prefs.SetString("GitExecutable", git_exe)
|
||||
self.git_exe = git_exe
|
||||
|
||||
def _synchronous_call_git(self, args: List[str]) -> str:
|
||||
"""Calls git and returns its output."""
|
||||
final_args = [self.git_exe]
|
||||
final_args.extend(args)
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
final_args,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
shell=True,
|
||||
check=True,
|
||||
)
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise GitFailed(str(e)) from e
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise GitFailed(
|
||||
f"Git returned a non-zero exit status: {proc.returncode}\n"
|
||||
+ f"Called with: {' '.join(final_args)}\n\n"
|
||||
+ f"Returned stderr:\n{proc.stderr.decode()}"
|
||||
)
|
||||
|
||||
return proc.stdout.decode()
|
||||
Reference in New Issue
Block a user