Addon Manager: Worker refactor (round 1)

This commit is contained in:
Chris Hennes
2022-08-02 12:44:07 -05:00
parent 71dc5ead2f
commit 2e51954b2b
16 changed files with 2145 additions and 1814 deletions

View File

@@ -29,13 +29,31 @@ import shutil
import stat
import tempfile
import hashlib
import threading
import json
from datetime import date, timedelta
from typing import Dict, List
from PySide2 import QtGui, QtCore, QtWidgets
import FreeCAD
import FreeCADGui
from addonmanager_workers import *
from addonmanager_workers_startup import (
CreateAddonListWorker,
LoadPackagesFromCacheWorker,
LoadMacrosFromCacheWorker,
CheckWorkbenchesForUpdatesWorker,
CacheMacroCodeWorker,
GetMacroDetailsWorker,
)
from addonmanager_workers_installation import (
InstallWorkbenchWorker,
DependencyInstallationWorker,
UpdateMetadataCacheWorker,
UpdateAllWorker,
UpdateSingleWorker,
)
from addonmanager_workers_utility import ConnectionChecker
import addonmanager_utilities as utils
import AddonManager_rc
from package_list import PackageList, PackageListItemModel
@@ -51,7 +69,7 @@ from manage_python_dependencies import (
PythonPackageManager,
)
from NetworkManager import HAVE_QTNETWORK, InitializeNetworkManager
import NetworkManager
translate = FreeCAD.Qt.translate
@@ -91,7 +109,7 @@ class CommandAddonManager:
workers = [
"connection_checker",
"update_worker",
"create_addon_list_worker",
"check_worker",
"show_worker",
"showmacro_worker",
@@ -151,7 +169,7 @@ class CommandAddonManager:
def Activated(self) -> None:
InitializeNetworkManager()
NetworkManager.InitializeNetworkManager()
# display first use dialog if needed
pref = FreeCAD.ParamGet("User parameter:BaseApp/Preferences/Addons")
@@ -454,7 +472,7 @@ class CommandAddonManager:
self.update_cache = True
elif not os.path.isdir(am_path):
self.update_cache = True
stopfile = self.get_cache_file_name("CACHE_UPDATE_INTERRUPTED")
stopfile = utils.get_cache_file_name("CACHE_UPDATE_INTERRUPTED")
if os.path.exists(stopfile):
self.update_cache = True
os.remove(stopfile)
@@ -619,19 +637,13 @@ class CommandAddonManager:
pref.SetString("LastCacheUpdate", date.today().isoformat())
self.packageList.item_filter.invalidateFilter()
def get_cache_file_name(self, file: str) -> str:
cache_path = FreeCAD.getUserCachePath()
am_path = os.path.join(cache_path, "AddonManager")
os.makedirs(am_path, exist_ok=True)
return os.path.join(am_path, file)
def populate_packages_table(self) -> None:
self.item_model.clear()
use_cache = not self.update_cache
if use_cache:
if os.path.isfile(self.get_cache_file_name("package_cache.json")):
with open(self.get_cache_file_name("package_cache.json"), "r") as f:
if os.path.isfile(utils.get_cache_file_name("package_cache.json")):
with open(utils.get_cache_file_name("package_cache.json"), "r") as f:
data = f.read()
try:
from_json = json.loads(data)
@@ -644,24 +656,24 @@ class CommandAddonManager:
if not use_cache:
self.update_cache = True # Make sure to trigger the other cache updates, if the json file was missing
self.update_worker = UpdateWorker()
self.update_worker.status_message.connect(self.show_information)
self.update_worker.addon_repo.connect(self.add_addon_repo)
self.create_addon_list_worker = CreateAddonListWorker()
self.create_addon_list_worker.status_message.connect(self.show_information)
self.create_addon_list_worker.addon_repo.connect(self.add_addon_repo)
self.update_progress_bar(10, 100)
self.update_worker.finished.connect(
self.create_addon_list_worker.finished.connect(
self.do_next_startup_phase
) # Link to step 2
self.update_worker.start()
self.create_addon_list_worker.start()
else:
self.update_worker = LoadPackagesFromCacheWorker(
self.get_cache_file_name("package_cache.json")
self.create_addon_list_worker = LoadPackagesFromCacheWorker(
utils.get_cache_file_name("package_cache.json")
)
self.update_worker.addon_repo.connect(self.add_addon_repo)
self.create_addon_list_worker.addon_repo.connect(self.add_addon_repo)
self.update_progress_bar(10, 100)
self.update_worker.finished.connect(
self.create_addon_list_worker.finished.connect(
self.do_next_startup_phase
) # Link to step 2
self.update_worker.start()
self.create_addon_list_worker.start()
def cache_package(self, repo: Addon):
if not hasattr(self, "package_cache"):
@@ -670,7 +682,7 @@ class CommandAddonManager:
def write_package_cache(self):
if hasattr(self, "package_cache"):
package_cache_path = self.get_cache_file_name("package_cache.json")
package_cache_path = utils.get_cache_file_name("package_cache.json")
with open(package_cache_path, "w") as f:
f.write(json.dumps(self.package_cache, indent=" "))
@@ -680,23 +692,29 @@ class CommandAddonManager:
self.do_next_startup_phase()
def populate_macros(self) -> None:
macro_cache_file = self.get_cache_file_name("macro_cache.json")
macro_cache_file = utils.get_cache_file_name("macro_cache.json")
cache_is_bad = True
if os.path.isfile(macro_cache_file):
size = os.path.getsize(macro_cache_file)
if size > 1000: # Make sure there is actually data in there
cache_is_bad = False
if self.update_cache or cache_is_bad:
self.update_cache = True
self.macro_worker = FillMacroListWorker(self.get_cache_file_name("Macros"))
self.macro_worker.status_message_signal.connect(self.show_information)
self.macro_worker.progress_made.connect(self.update_progress_bar)
self.macro_worker.add_macro_signal.connect(self.add_addon_repo)
self.macro_worker.finished.connect(self.do_next_startup_phase)
self.macro_worker.start()
if cache_is_bad:
if not self.update_cache:
self.update_cache = True # Make sure to trigger the other cache updates, if the json file was missing
self.create_addon_list_worker = CreateAddonListWorker()
self.create_addon_list_worker.status_message.connect(self.show_information)
self.create_addon_list_worker.addon_repo.connect(self.add_addon_repo)
self.update_progress_bar(10, 100)
self.create_addon_list_worker.finished.connect(
self.do_next_startup_phase
) # Link to step 2
self.create_addon_list_worker.start()
else:
# It's already been done in the previous step (TODO: Refactor to eliminate this step)
self.do_next_startup_phase()
else:
self.macro_worker = LoadMacrosFromCacheWorker(
self.get_cache_file_name("macro_cache.json")
utils.get_cache_file_name("macro_cache.json")
)
self.macro_worker.add_macro_signal.connect(self.add_addon_repo)
self.macro_worker.finished.connect(self.do_next_startup_phase)
@@ -715,7 +733,7 @@ class CommandAddonManager:
def write_macro_cache(self):
if not hasattr(self, "macro_cache"):
return
macro_cache_path = self.get_cache_file_name("macro_cache.json")
macro_cache_path = utils.get_cache_file_name("macro_cache.json")
with open(macro_cache_path, "w") as f:
f.write(json.dumps(self.macro_cache, indent=" "))
self.macro_cache = []
@@ -768,7 +786,9 @@ class CommandAddonManager:
def load_macro_metadata(self) -> None:
if self.update_cache:
self.load_macro_metadata_worker = CacheMacroCode(self.item_model.repos)
self.load_macro_metadata_worker = CacheMacroCodeWorker(
self.item_model.repos
)
self.load_macro_metadata_worker.status_message.connect(
self.show_information
)
@@ -1551,7 +1571,7 @@ class CommandAddonManager:
)
def write_cache_stopfile(self) -> None:
stopfile = self.get_cache_file_name("CACHE_UPDATE_INTERRUPTED")
stopfile = utils.get_cache_file_name("CACHE_UPDATE_INTERRUPTED")
with open(stopfile, "w", encoding="utf8") as f:
f.write(
"This file indicates that a cache operation was interrupted, and "

View File

@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# ***************************************************************************
# * Copyright (c) 2022 FreeCAD Project Association *
# * *
# * This file is part of the FreeCAD CAx development system. *
# * *
# * This library is free software; you can redistribute it and/or *
# * modify it under the terms of the GNU Lesser General Public *
# * License as published by the Free Software Foundation; either *
# * version 2.1 of the License, or (at your option) any later version. *
# * *
# * This library is distributed in the hope that it will be useful, *
# * but WITHOUT ANY WARRANTY; without even the implied warranty of *
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
# * Lesser General Public License for more details. *
# * *
# * You should have received a copy of the GNU Lesser General Public *
# * License along with this library; if not, write to the Free Software *
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA *
# * 02110-1301 USA *
# * *
# ***************************************************************************
import unittest
import os
import tempfile
import FreeCAD
class TestWorkersInstallation(unittest.TestCase):
MODULE = "test_workers_installation" # file name without extension
def setUp(self):
pass

View File

@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# ***************************************************************************
# * Copyright (c) 2022 FreeCAD Project Association *
# * *
# * This file is part of the FreeCAD CAx development system. *
# * *
# * This library is free software; you can redistribute it and/or *
# * modify it under the terms of the GNU Lesser General Public *
# * License as published by the Free Software Foundation; either *
# * version 2.1 of the License, or (at your option) any later version. *
# * *
# * This library is distributed in the hope that it will be useful, *
# * but WITHOUT ANY WARRANTY; without even the implied warranty of *
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
# * Lesser General Public License for more details. *
# * *
# * You should have received a copy of the GNU Lesser General Public *
# * License along with this library; if not, write to the Free Software *
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA *
# * 02110-1301 USA *
# * *
# ***************************************************************************
import unittest
import os
import tempfile
import FreeCAD
class TestWorkersStartup(unittest.TestCase):
MODULE = "test_workers_startup" # file name without extension
def setUp(self):
pass

View File

@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
# ***************************************************************************
# * Copyright (c) 2022 FreeCAD Project Association *
# * *
# * This file is part of the FreeCAD CAx development system. *
# * *
# * This library is free software; you can redistribute it and/or *
# * modify it under the terms of the GNU Lesser General Public *
# * License as published by the Free Software Foundation; either *
# * version 2.1 of the License, or (at your option) any later version. *
# * *
# * This library is distributed in the hope that it will be useful, *
# * but WITHOUT ANY WARRANTY; without even the implied warranty of *
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
# * Lesser General Public License for more details. *
# * *
# * You should have received a copy of the GNU Lesser General Public *
# * License along with this library; if not, write to the Free Software *
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA *
# * 02110-1301 USA *
# * *
# ***************************************************************************
import unittest
import os
import FreeCAD
from addonmanager_workers_utility import ConnectionChecker
from PySide2 import QtCore
import NetworkManager
class TestWorkersUtility(unittest.TestCase):
MODULE = "test_workers_utility" # file name without extension
def setUp(self):
self.test_dir = os.path.join(FreeCAD.getHomePath(), "Mod", "AddonManager", "AddonManagerTest", "data")
self.last_result = None
url = "https://api.github.com/zen"
NetworkManager.InitializeNetworkManager()
result = NetworkManager.AM_NETWORK_MANAGER.blocking_get(url)
if result is None:
self.skipTest("No active internet connection detected")
def test_connection_checker_basic(self):
""" Tests the connection checking worker's basic operation: does not exit until worker thread completes """
worker = ConnectionChecker()
worker.success.connect(self.connection_succeeded)
worker.failure.connect(self.connection_failed)
self.last_result = None
worker.start()
while worker.isRunning():
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 50)
self.assertEqual(self.last_result,"SUCCESS")
def test_connection_checker_thread_interrupt(self):
worker = ConnectionChecker()
worker.success.connect(self.connection_succeeded)
worker.failure.connect(self.connection_failed)
self.last_result = None
worker.start()
worker.requestInterruption()
while worker.isRunning():
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 50)
self.assertIsNone(self.last_result, "Requesting interruption of thread failed to interrupt")
def connection_succeeded(self):
self.last_result = "SUCCESS"
def connection_failed(self):
self.last_result = "FAILURE"

View File

@@ -9,7 +9,9 @@ SET(AddonManager_SRCS
AddonManager.ui
addonmanager_macro.py
addonmanager_utilities.py
addonmanager_workers.py
addonmanager_workers_startup.py
addonmanager_workers_installation.py
addonmanager_workers_utility.py
AddonManagerOptions.ui
ALLOWED_PYTHON_PACKAGES.txt
change_branch.py
@@ -51,6 +53,9 @@ SET(AddonManagerTestsApp_SRCS
SET(AddonManagerTestsGui_SRCS
AddonManagerTest/gui/__init__.py
AddonManagerTest/gui/test_gui.py
AddonManagerTest/gui/test_workers_installation.py
AddonManagerTest/gui/test_workers_startup.py
AddonManagerTest/gui/test_workers_utility.py
)
SET(AddonManagerTestsFiles_SRCS

View File

@@ -75,6 +75,7 @@ except ImportError:
HAVE_FREECAD = False
from PySide2 import QtCore
if FreeCAD.GuiUp:
from PySide2 import QtWidgets
@@ -154,7 +155,8 @@ if HAVE_QTNETWORK:
self.synchronous_result_data: Dict[int, QtCore.QByteArray] = {}
# Make sure we exit nicely on quit
QtCore.QCoreApplication.instance().aboutToQuit.connect(self.__aboutToQuit)
if QtCore.QCoreApplication.instance() is not None:
QtCore.QCoreApplication.instance().aboutToQuit.connect(self.__aboutToQuit)
# Create the QNAM on this thread:
self.QNAM = QtNetwork.QNetworkAccessManager()
@@ -179,13 +181,23 @@ if HAVE_QTNETWORK:
self.__request_queued.connect(self.__setup_network_request)
def _setup_proxy(self):
""" Set up the proxy based on user preferences or prompts on command line """
"""Set up the proxy based on user preferences or prompts on command line"""
# Set up the proxy, if necesssary:
if HAVE_FREECAD:
noProxyCheck, systemProxyCheck, userProxyCheck, proxy_string = self._setup_proxy_freecad()
(
noProxyCheck,
systemProxyCheck,
userProxyCheck,
proxy_string,
) = self._setup_proxy_freecad()
else:
noProxyCheck, systemProxyCheck, userProxyCheck, proxy_string = self._setup_proxy_standalone()
(
noProxyCheck,
systemProxyCheck,
userProxyCheck,
proxy_string,
) = self._setup_proxy_standalone()
if noProxyCheck:
pass
@@ -208,7 +220,7 @@ if HAVE_QTNETWORK:
self.QNAM.setProxy(proxy)
def _setup_proxy_freecad(self):
""" If we are running within FreeCAD, this uses the config data to set up the proxy """
"""If we are running within FreeCAD, this uses the config data to set up the proxy"""
noProxyCheck = True
systemProxyCheck = False
userProxyCheck = False
@@ -252,7 +264,7 @@ if HAVE_QTNETWORK:
return noProxyCheck, systemProxyCheck, userProxyCheck, proxy_string
def _setup_proxy_standalone(self):
""" If we are NOT running inside FreeCAD, prompt the user for proxy information """
"""If we are NOT running inside FreeCAD, prompt the user for proxy information"""
noProxyCheck = True
systemProxyCheck = False
userProxyCheck = False

View File

@@ -25,5 +25,19 @@
# Unit test for the Addon Manager module GUI
from AddonManagerTest.gui.test_gui import TestGui as AddonManagerTestGui
from AddonManagerTest.gui.test_workers_utility import (
TestWorkersUtility as AddonManagerTestWorkersUtility,
)
from AddonManagerTest.gui.test_workers_startup import (
TestWorkersStartup as AddonManagerTestWorkersStartup,
)
from AddonManagerTest.gui.test_workers_installation import (
TestWorkersInstallation as AddonManagerTestWorkersInstallation,
)
# dummy usage to get flake8 and lgtm quiet
False if AddonManagerTestGui.__name__ else True
False if AddonManagerTestWorkersUtility.__name__ else True
False if AddonManagerTestWorkersStartup.__name__ else True
False if AddonManagerTestWorkersInstallation.__name__ else True

View File

@@ -431,7 +431,8 @@ class Macro(object):
translate(
"AddonsInstaller",
"Unable to fetch macro-specified file {} from {}",
).format(other_file, fetch_url) + "\n"
).format(other_file, fetch_url)
+ "\n"
)
else:
warnings.append(

View File

@@ -381,3 +381,9 @@ def get_python_exe() -> str:
prefs.SetString("PythonExecutableForPip", python_exe)
return python_exe
def get_cache_file_name(file: str) -> str:
cache_path = FreeCAD.getUserCachePath()
am_path = os.path.join(cache_path, "AddonManager")
os.makedirs(am_path, exist_ok=True)
return os.path.join(am_path, file)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,898 @@
# ***************************************************************************
# * *
# * Copyright (c) 2022 FreeCAD Project Association *
# * Copyright (c) 2019 Yorik van Havre <yorik@uncreated.net> *
# * *
# * This library is free software; you can redistribute it and/or *
# * modify it under the terms of the GNU Lesser General Public *
# * License as published by the Free Software Foundation; either *
# * version 2.1 of the License, or (at your option) any later version. *
# * *
# * This library is distributed in the hope that it will be useful, *
# * but WITHOUT ANY WARRANTY; without even the implied warranty of *
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
# * Lesser General Public License for more details. *
# * *
# * You should have received a copy of the GNU Lesser General Public *
# * License along with this library; if not, write to the Free Software *
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA *
# * 02110-1301 USA *
# * *
# ***************************************************************************
""" Worker thread classes for Addon Manager installation and removal """
import hashlib
import io
import itertools
import json
import os
import platform
import queue
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from datetime import datetime
from typing import Union, List, Dict
from enum import Enum, auto
from PySide2 import QtCore
import FreeCAD
import addonmanager_utilities as utils
from addonmanager_macro import Macro
from Addon import Addon
import NetworkManager
have_git = False
try:
import git
# Some types of Python installation will fall back to finding a directory called "git"
# in certain locations instead of a Python package called git: that directory is unlikely
# to have the "Repo" attribute unless it is a real installation, however, so this check
# should catch that. (Bug #4072)
have_git = hasattr(git, "Repo")
if not have_git:
FreeCAD.Console.PrintMessage(
"Unable to locate a viable GitPython installation: falling back to ZIP installation."
)
except ImportError:
pass
translate = FreeCAD.Qt.translate
# @package AddonManager_workers
# \ingroup ADDONMANAGER
# \brief Multithread workers for the addon manager
# @{
NOGIT = False # for debugging purposes, set this to True to always use http downloads
class InstallWorkbenchWorker(QtCore.QThread):
"This worker installs a workbench"
status_message = QtCore.Signal(str)
progress_made = QtCore.Signal(int, int)
success = QtCore.Signal(Addon, str)
failure = QtCore.Signal(Addon, str)
def __init__(self, repo: Addon):
QtCore.QThread.__init__(self)
self.repo = repo
self.update_timer = QtCore.QTimer()
self.update_timer.setInterval(100)
self.update_timer.timeout.connect(self.update_status)
self.update_timer.start()
def run(self):
"installs or updates the selected addon"
if not self.repo:
return
if not have_git or NOGIT:
FreeCAD.Console.PrintLog(
translate(
"AddonsInstaller",
"GitPython not found. Using ZIP file download instead.",
)
+ "\n"
)
if not have_zip:
FreeCAD.Console.PrintError(
translate(
"AddonsInstaller",
"Your version of Python doesn't appear to support ZIP files. Unable to proceed.",
)
+ "\n"
)
return
basedir = FreeCAD.getUserAppDataDir()
moddir = basedir + os.sep + "Mod"
if not os.path.exists(moddir):
os.makedirs(moddir)
target_dir = moddir + os.sep + self.repo.name
if have_git and not NOGIT:
# Do the git process...
self.run_git(target_dir)
else:
# The zip process uses an event loop, since the download can potentially be quite large
self.launch_zip(target_dir)
self.zip_complete = False
current_thread = QtCore.QThread.currentThread()
while not self.zip_complete:
if current_thread.isInterruptionRequested():
return
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 50)
def update_status(self) -> None:
if hasattr(self, "git_progress") and self.isRunning():
self.progress_made.emit(self.git_progress.current, self.git_progress.total)
self.status_message.emit(self.git_progress.message)
def run_git(self, clonedir: str) -> None:
if NOGIT or not have_git:
FreeCAD.Console.PrintLog(
translate(
"AddonsInstaller",
"No Git Python installed, skipping git operations",
)
+ "\n"
)
return
self.git_progress = GitProgressMonitor()
if os.path.exists(clonedir):
self.run_git_update(clonedir)
else:
self.run_git_clone(clonedir)
def run_git_update(self, clonedir: str) -> None:
self.status_message.emit("Updating module...")
with self.repo.git_lock:
if not os.path.exists(clonedir + os.sep + ".git"):
utils.repair_git_repo(self.repo.url, clonedir)
repo = git.Git(clonedir)
try:
repo.pull("--ff-only") # Refuses to take a progress object?
if self.repo.contains_workbench():
answer = translate(
"AddonsInstaller",
"Workbench successfully updated. Please restart FreeCAD to apply the changes.",
)
else:
answer = translate(
"AddonsInstaller",
"Workbench successfully updated.",
)
except Exception as e:
answer = (
translate("AddonsInstaller", "Error updating module")
+ " "
+ self.repo.name
+ " - "
+ translate("AddonsInstaller", "Please fix manually")
+ " -- \n"
)
answer += str(e)
self.failure.emit(self.repo, answer)
else:
# Update the submodules for this repository
repo_sms = git.Repo(clonedir)
self.status_message.emit("Updating submodules...")
for submodule in repo_sms.submodules:
submodule.update(init=True, recursive=True)
self.update_metadata()
self.success.emit(self.repo, answer)
def run_git_clone(self, clonedir: str) -> None:
self.status_message.emit("Cloning module...")
current_thread = QtCore.QThread.currentThread()
FreeCAD.Console.PrintMessage("Cloning repo...\n")
if self.repo.git_lock.locked():
FreeCAD.Console.PrintMessage("Waiting for lock to be released to us...\n")
if not self.repo.git_lock.acquire(timeout=2):
FreeCAD.Console.PrintError(
"Timeout waiting for a lock on the git process, failed to clone repo\n"
)
return
else:
self.repo.git_lock.release()
with self.repo.git_lock:
FreeCAD.Console.PrintMessage("Lock acquired...\n")
# NOTE: There is no way to interrupt this process in GitPython: someday we should
# support pygit2/libgit2 so we can actually interrupt this properly.
repo = git.Repo.clone_from(
self.repo.url, clonedir, progress=self.git_progress
)
FreeCAD.Console.PrintMessage("Initial clone complete...\n")
if current_thread.isInterruptionRequested():
return
# Make sure to clone all the submodules as well
if repo.submodules:
FreeCAD.Console.PrintMessage("Updating submodules...\n")
repo.submodule_update(recursive=True)
if current_thread.isInterruptionRequested():
return
if self.repo.branch in repo.heads:
FreeCAD.Console.PrintMessage("Checking out HEAD...\n")
repo.heads[self.repo.branch].checkout()
FreeCAD.Console.PrintMessage("Clone complete\n")
if self.repo.contains_workbench():
answer = translate(
"AddonsInstaller",
"Workbench successfully installed. Please restart FreeCAD to apply the changes.",
)
else:
answer = translate(
"AddonsInstaller",
"Addon successfully installed.",
)
if self.repo.repo_type == Addon.Kind.WORKBENCH:
# symlink any macro contained in the module to the macros folder
macro_dir = FreeCAD.getUserMacroDir(True)
if not os.path.exists(macro_dir):
os.makedirs(macro_dir)
if os.path.exists(clonedir):
for f in os.listdir(clonedir):
if f.lower().endswith(".fcmacro"):
try:
utils.symlink(
os.path.join(clonedir, f), os.path.join(macro_dir, f)
)
except OSError:
# If the symlink failed (e.g. for a non-admin user on Windows), copy the macro instead
shutil.copy(
os.path.join(clonedir, f), os.path.join(macro_dir, f)
)
FreeCAD.ParamGet(
"User parameter:Plugins/" + self.repo.name
).SetString("destination", clonedir)
answer += "\n\n" + translate(
"AddonsInstaller",
"A macro has been installed and is available under Macro -> Macros menu",
)
answer += ":\n<b>" + f + "</b>"
self.update_metadata()
self.success.emit(self.repo, answer)
def launch_zip(self, zipdir: str) -> None:
"downloads and unzip a zip version from a git repo"
bakdir = None
if os.path.exists(zipdir):
bakdir = zipdir + ".bak"
if os.path.exists(bakdir):
shutil.rmtree(bakdir)
os.rename(zipdir, bakdir)
os.makedirs(zipdir)
zipurl = utils.get_zip_url(self.repo)
if not zipurl:
self.failure.emit(
self.repo,
translate("AddonsInstaller", "Error: Unable to locate ZIP from")
+ " "
+ self.repo.name,
)
return
self.zipdir = zipdir
self.bakdir = bakdir
NetworkManager.AM_NETWORK_MANAGER.progress_made.connect(self.update_zip_status)
NetworkManager.AM_NETWORK_MANAGER.progress_complete.connect(self.finish_zip)
self.zip_download_index = (
NetworkManager.AM_NETWORK_MANAGER.submit_monitored_get(zipurl)
)
def update_zip_status(self, index: int, bytes_read: int, data_size: int):
if index == self.zip_download_index:
locale = QtCore.QLocale()
if data_size > 10 * 1024 * 1024: # To avoid overflows, show MB instead
MB_read = bytes_read / 1024 / 1024
MB_total = data_size / 1024 / 1024
self.progress_made.emit(MB_read, MB_total)
mbytes_str = locale.toString(MB_read)
mbytes_total_str = locale.toString(MB_total)
percent = int(100 * float(MB_read / MB_total))
self.status_message.emit(
translate(
"AddonsInstaller",
"Downloading: {mbytes_str}MB of {mbytes_total_str}MB ({percent}%)",
).format(
mbytes_str=mbytes_str,
mbytes_total_str=mbytes_total_str,
percent=percent,
)
)
elif data_size > 0:
self.progress_made.emit(bytes_read, data_size)
bytes_str = locale.toString(bytes_read)
bytes_total_str = locale.toString(data_size)
percent = int(100 * float(bytes_read / data_size))
self.status_message.emit(
translate(
"AddonsInstaller",
"Downloading: {bytes_str} of {bytes_total_str} bytes ({percent}%)",
).format(
bytes_str=bytes_str,
bytes_total_str=bytes_total_str,
percent=percent,
)
)
else:
MB_read = bytes_read / 1024 / 1024
bytes_str = locale.toString(MB_read)
self.status_message.emit(
translate(
"AddonsInstaller",
"Downloading: {bytes_str}MB of unknown total",
).format(bytes_str=bytes_str)
)
def finish_zip(self, index: int, response_code: int, filename: os.PathLike):
self.zip_complete = True
if response_code != 200:
self.failure.emit(
self.repo,
translate(
"AddonsInstaller",
"Error: Error while downloading ZIP file for {}",
).format(self.repo.display_name),
)
return
with zipfile.ZipFile(filename, "r") as zfile:
master = zfile.namelist()[0] # github will put everything in a subfolder
self.status_message.emit(
translate("AddonsInstaller", f"Download complete. Unzipping file...")
)
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents)
zfile.extractall(self.zipdir)
for filename in os.listdir(self.zipdir + os.sep + master):
shutil.move(
self.zipdir + os.sep + master + os.sep + filename,
self.zipdir + os.sep + filename,
)
os.rmdir(self.zipdir + os.sep + master)
if self.bakdir:
shutil.rmtree(self.bakdir)
self.update_metadata()
self.success.emit(
self.repo,
translate(
"AddonsInstaller",
"Successfully installed {} from ZIP file",
).format(self.repo.display_name),
)
def update_metadata(self):
basedir = FreeCAD.getUserAppDataDir()
package_xml = os.path.join(basedir, "Mod", self.repo.name, "package.xml")
if os.path.isfile(package_xml):
self.repo.load_metadata_file(package_xml)
self.repo.installed_version = self.repo.metadata.Version
self.repo.updated_timestamp = os.path.getmtime(package_xml)
class DependencyInstallationWorker(QtCore.QThread):
"""Install dependencies using Addonmanager for FreeCAD, and pip for python"""
no_python_exe = QtCore.Signal()
no_pip = QtCore.Signal(str) # Attempted command
failure = QtCore.Signal(str, str) # Short message, detailed message
success = QtCore.Signal()
def __init__(self, addons, python_required, python_optional):
QtCore.QThread.__init__(self)
self.addons = addons
self.python_required = python_required
self.python_optional = python_optional
def run(self):
for repo in self.addons:
if QtCore.QThread.currentThread().isInterruptionRequested():
return
worker = InstallWorkbenchWorker(repo)
worker.start()
while worker.isRunning():
if QtCore.QThread.currentThread().isInterruptionRequested():
worker.requestInterruption()
worker.wait()
return
time.sleep(0.1)
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 50)
if self.python_required or self.python_optional:
python_exe = utils.get_python_exe()
pip_failed = False
if python_exe:
try:
proc = subprocess.run(
[python_exe, "-m", "pip", "--version"], stdout=subprocess.PIPE
)
except subprocess.CalledProcessError as e:
pip_failed = True
if proc.returncode != 0:
pip_failed = True
else:
pip_failed = True
if pip_failed:
self.no_pip.emit(f"{python_exe} -m pip --version")
return
FreeCAD.Console.PrintMessage(proc.stdout)
FreeCAD.Console.PrintWarning(proc.stderr)
result = proc.stdout
FreeCAD.Console.PrintMessage(result.decode())
vendor_path = os.path.join(
FreeCAD.getUserAppDataDir(), "AdditionalPythonPackages"
)
if not os.path.exists(vendor_path):
os.makedirs(vendor_path)
for pymod in self.python_required:
if QtCore.QThread.currentThread().isInterruptionRequested():
return
proc = subprocess.run(
[
python_exe,
"-m",
"pip",
"install",
"--disable-pip-version-check",
"--target",
vendor_path,
pymod,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Note to self: how to list installed packages
# ./python.exe -m pip list --path ~/AppData/Roaming/FreeCAD/AdditionalPythonPackages
FreeCAD.Console.PrintMessage(proc.stdout.decode())
if proc.returncode != 0:
self.failure.emit(
translate(
"AddonsInstaller",
"Installation of Python package {} failed",
).format(pymod),
proc.stderr,
)
return
for pymod in self.python_optional:
if QtCore.QThread.currentThread().isInterruptionRequested():
return
proc = subprocess.run(
[python_exe, "-m", "pip", "install", "--target", vendor_path, pymod],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
FreeCAD.Console.PrintMessage(proc.stdout.decode())
if proc.returncode != 0:
self.failure.emit(
translate(
"AddonsInstaller",
"Installation of Python package {} failed",
).format(pymod),
proc.stderr,
)
return
self.success.emit()
class UpdateMetadataCacheWorker(QtCore.QThread):
"Scan through all available packages and see if our local copy of package.xml needs to be updated"
status_message = QtCore.Signal(str)
progress_made = QtCore.Signal(int, int)
package_updated = QtCore.Signal(Addon)
class RequestType(Enum):
PACKAGE_XML = auto()
METADATA_TXT = auto()
REQUIREMENTS_TXT = auto()
ICON = auto()
def __init__(self, repos):
QtCore.QThread.__init__(self)
self.repos = repos
self.requests: Dict[int, (Addon, UpdateMetadataCacheWorker.RequestType)] = {}
NetworkManager.AM_NETWORK_MANAGER.completed.connect(self.download_completed)
self.requests_completed = 0
self.total_requests = 0
self.store = os.path.join(
FreeCAD.getUserCachePath(), "AddonManager", "PackageMetadata"
)
self.updated_repos = set()
def run(self):
current_thread = QtCore.QThread.currentThread()
for repo in self.repos:
if repo.url and utils.recognized_git_location(repo):
# package.xml
index = NetworkManager.AM_NETWORK_MANAGER.submit_unmonitored_get(
utils.construct_git_url(repo, "package.xml")
)
self.requests[index] = (
repo,
UpdateMetadataCacheWorker.RequestType.PACKAGE_XML,
)
self.total_requests += 1
# metadata.txt
index = NetworkManager.AM_NETWORK_MANAGER.submit_unmonitored_get(
utils.construct_git_url(repo, "metadata.txt")
)
self.requests[index] = (
repo,
UpdateMetadataCacheWorker.RequestType.METADATA_TXT,
)
self.total_requests += 1
# requirements.txt
index = NetworkManager.AM_NETWORK_MANAGER.submit_unmonitored_get(
utils.construct_git_url(repo, "requirements.txt")
)
self.requests[index] = (
repo,
UpdateMetadataCacheWorker.RequestType.REQUIREMENTS_TXT,
)
self.total_requests += 1
while self.requests:
if current_thread.isInterruptionRequested():
NetworkManager.AM_NETWORK_MANAGER.completed.disconnect(
self.download_completed
)
for request in self.requests.keys():
NetworkManager.AM_NETWORK_MANAGER.abort(request)
return
# 50 ms maximum between checks for interruption
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 50)
# This set contains one copy of each of the repos that got some kind of data in
# this process. For those repos, tell the main Addon Manager code that it needs
# to update its copy of the repo, and redraw its information.
for repo in self.updated_repos:
self.package_updated.emit(repo)
def download_completed(
self, index: int, code: int, data: QtCore.QByteArray
) -> None:
if index in self.requests:
self.requests_completed += 1
self.progress_made.emit(self.requests_completed, self.total_requests)
request = self.requests.pop(index)
if code == 200: # HTTP success
self.updated_repos.add(request[0]) # mark this repo as updated
if request[1] == UpdateMetadataCacheWorker.RequestType.PACKAGE_XML:
self.process_package_xml(request[0], data)
elif request[1] == UpdateMetadataCacheWorker.RequestType.METADATA_TXT:
self.process_metadata_txt(request[0], data)
elif (
request[1] == UpdateMetadataCacheWorker.RequestType.REQUIREMENTS_TXT
):
self.process_requirements_txt(request[0], data)
elif request[1] == UpdateMetadataCacheWorker.RequestType.ICON:
self.process_icon(request[0], data)
def process_package_xml(self, repo: Addon, data: QtCore.QByteArray):
repo.repo_type = Addon.Kind.PACKAGE # By definition
package_cache_directory = os.path.join(self.store, repo.name)
if not os.path.exists(package_cache_directory):
os.makedirs(package_cache_directory)
new_xml_file = os.path.join(package_cache_directory, "package.xml")
with open(new_xml_file, "wb") as f:
f.write(data.data())
metadata = FreeCAD.Metadata(new_xml_file)
repo.metadata = metadata
self.status_message.emit(
translate("AddonsInstaller", "Downloaded package.xml for {}").format(
repo.name
)
)
# Grab a new copy of the icon as well: we couldn't enqueue this earlier because
# we didn't know the path to it, which is stored in the package.xml file.
icon = metadata.Icon
if not icon:
# If there is no icon set for the entire package, see if there are
# any workbenches, which are required to have icons, and grab the first
# one we find:
content = repo.metadata.Content
if "workbench" in content:
wb = content["workbench"][0]
if wb.Icon:
if wb.Subdirectory:
subdir = wb.Subdirectory
else:
subdir = wb.Name
repo.Icon = subdir + wb.Icon
icon = repo.Icon
icon_url = utils.construct_git_url(repo, icon)
index = NetworkManager.AM_NETWORK_MANAGER.submit_unmonitored_get(icon_url)
self.requests[index] = (repo, UpdateMetadataCacheWorker.RequestType.ICON)
self.total_requests += 1
def process_metadata_txt(self, repo: Addon, data: QtCore.QByteArray):
self.status_message.emit(
translate("AddonsInstaller", "Downloaded metadata.txt for {}").format(
repo.display_name
)
)
f = io.StringIO(data.data().decode("utf8"))
while True:
line = f.readline()
if not line:
break
if line.startswith("workbenches="):
depswb = line.split("=")[1].split(",")
for wb in depswb:
wb_name = wb.strip()
if wb_name:
repo.requires.add(wb_name)
FreeCAD.Console.PrintLog(
f"{repo.display_name} requires FreeCAD Addon '{wb_name}'\n"
)
elif line.startswith("pylibs="):
depspy = line.split("=")[1].split(",")
for pl in depspy:
dep = pl.strip()
if dep:
repo.python_requires.add(dep)
FreeCAD.Console.PrintLog(
f"{repo.display_name} requires python package '{dep}'\n"
)
elif line.startswith("optionalpylibs="):
opspy = line.split("=")[1].split(",")
for pl in opspy:
dep = pl.strip()
if dep:
repo.python_optional.add(dep)
FreeCAD.Console.PrintLog(
f"{repo.display_name} optionally imports python package '{pl.strip()}'\n"
)
# For review and debugging purposes, store the file locally
package_cache_directory = os.path.join(self.store, repo.name)
if not os.path.exists(package_cache_directory):
os.makedirs(package_cache_directory)
new_xml_file = os.path.join(package_cache_directory, "metadata.txt")
with open(new_xml_file, "wb") as f:
f.write(data.data())
def process_requirements_txt(self, repo: Addon, data: QtCore.QByteArray):
self.status_message.emit(
translate(
"AddonsInstaller",
"Downloaded requirements.txt for {}",
).format(repo.display_name)
)
f = io.StringIO(data.data().decode("utf8"))
lines = f.readlines()
for line in lines:
break_chars = " <>=~!+#"
package = line
for n, c in enumerate(line):
if c in break_chars:
package = line[:n].strip()
break
if package:
repo.python_requires.add(package)
# For review and debugging purposes, store the file locally
package_cache_directory = os.path.join(self.store, repo.name)
if not os.path.exists(package_cache_directory):
os.makedirs(package_cache_directory)
new_xml_file = os.path.join(package_cache_directory, "requirements.txt")
with open(new_xml_file, "wb") as f:
f.write(data.data())
def process_icon(self, repo: Addon, data: QtCore.QByteArray):
self.status_message.emit(
translate("AddonsInstaller", "Downloaded icon for {}").format(
repo.display_name
)
)
cache_file = repo.get_cached_icon_filename()
with open(cache_file, "wb") as icon_file:
icon_file.write(data.data())
repo.cached_icon_filename = cache_file
if have_git and not NOGIT:
class GitProgressMonitor(git.RemoteProgress):
"""An object that receives git progress updates and stores them for later display"""
def __init__(self):
super().__init__()
self.current = 0
self.total = 100
self.message = ""
def update(
self,
_: int,
cur_count: Union[str, float],
max_count: Union[str, float, None] = None,
message: str = "",
) -> None:
if max_count:
self.current = int(cur_count)
self.total = int(max_count)
if message:
self.message = message
class UpdateAllWorker(QtCore.QThread):
"""Update all listed packages, of any kind"""
progress_made = QtCore.Signal(int, int)
status_message = QtCore.Signal(str)
success = QtCore.Signal(Addon)
failure = QtCore.Signal(Addon)
# TODO: This should be re-written to be solidly single-threaded, some of the called code is not re-entrant
def __init__(self, repos):
super().__init__()
self.repos = repos
def run(self):
self.progress_made.emit(0, len(self.repos))
self.repo_queue = queue.Queue()
current_thread = QtCore.QThread.currentThread()
for repo in self.repos:
self.repo_queue.put(repo)
FreeCAD.Console.PrintLog(
f" UPDATER: Adding '{repo.name}' to update queue\n"
)
# The original design called for multiple update threads at the same time, but the updater
# itself is not thread-safe, so for the time being only spawn one update thread.
workers = []
for _ in range(1):
FreeCAD.Console.PrintLog(f" UPDATER: Starting worker\n")
worker = UpdateSingleWorker(self.repo_queue)
worker.success.connect(self.on_success)
worker.failure.connect(self.on_failure)
worker.start()
workers.append(worker)
while not self.repo_queue.empty():
if current_thread.isInterruptionRequested():
for worker in workers:
worker.blockSignals(True)
worker.requestInterruption()
worker.wait()
return
# Ensure our signals propagate out by running an internal thread-local event loop
QtCore.QCoreApplication.processEvents()
self.repo_queue.join()
# Make sure all of our child threads have fully exited:
for worker in workers:
worker.wait()
def on_success(self, repo: Addon) -> None:
FreeCAD.Console.PrintLog(
f" UPDATER: Main thread received notice that worker successfully updated {repo.name}\n"
)
self.progress_made.emit(
len(self.repos) - self.repo_queue.qsize(), len(self.repos)
)
self.success.emit(repo)
def on_failure(self, repo: Addon) -> None:
FreeCAD.Console.PrintLog(
f" UPDATER: Main thread received notice that worker failed to update {repo.name}\n"
)
self.progress_made.emit(
len(self.repos) - self.repo_queue.qsize(), len(self.repos)
)
self.failure.emit(repo)
class UpdateSingleWorker(QtCore.QThread):
success = QtCore.Signal(Addon)
failure = QtCore.Signal(Addon)
def __init__(self, repo_queue: queue.Queue):
super().__init__()
self.repo_queue = repo_queue
def run(self):
current_thread = QtCore.QThread.currentThread()
while True:
if current_thread.isInterruptionRequested():
FreeCAD.Console.PrintLog(
f" UPDATER: Interruption requested, stopping all updates\n"
)
return
try:
repo = self.repo_queue.get_nowait()
FreeCAD.Console.PrintLog(
f" UPDATER: Pulling {repo.name} from the update queue\n"
)
except queue.Empty:
FreeCAD.Console.PrintLog(
f" UPDATER: Worker thread queue is empty, exiting thread\n"
)
return
if repo.repo_type == Addon.Kind.MACRO:
FreeCAD.Console.PrintLog(f" UPDATER: Updating macro '{repo.name}'\n")
self.update_macro(repo)
else:
FreeCAD.Console.PrintLog(f" UPDATER: Updating addon '{repo.name}'\n")
self.update_package(repo)
self.repo_queue.task_done()
FreeCAD.Console.PrintLog(
f" UPDATER: Worker thread completed action for '{repo.name}' and reported result to main thread\n"
)
def update_macro(self, repo: Addon):
"""Updating a macro happens in this function, in the current thread"""
cache_path = os.path.join(
FreeCAD.getUserCachePath(), "AddonManager", "MacroCache"
)
os.makedirs(cache_path, exist_ok=True)
install_succeeded, _ = repo.macro.install(cache_path)
if install_succeeded:
install_succeeded, _ = repo.macro.install(FreeCAD.getUserMacroDir(True))
utils.update_macro_installation_details(repo)
if install_succeeded:
self.success.emit(repo)
else:
self.failure.emit(repo)
def update_package(self, repo: Addon):
"""Updating a package re-uses the package installation worker, so actually spawns another thread that we block on"""
worker = InstallWorkbenchWorker(repo)
worker.success.connect(lambda repo, _: self.success.emit(repo))
worker.failure.connect(lambda repo, _: self.failure.emit(repo))
worker.start()
while True:
# Ensure our signals propagate out by running an internal thread-local event loop
QtCore.QCoreApplication.processEvents()
if not worker.isRunning():
break
time.sleep(0.1) # Give the signal a moment to propagate to the other threads
QtCore.QCoreApplication.processEvents()
# @}

View File

@@ -0,0 +1,843 @@
# ***************************************************************************
# * *
# * Copyright (c) 2022 FreeCAD Project Association *
# * Copyright (c) 2019 Yorik van Havre <yorik@uncreated.net> *
# * *
# * This library is free software; you can redistribute it and/or *
# * modify it under the terms of the GNU Lesser General Public *
# * License as published by the Free Software Foundation; either *
# * version 2.1 of the License, or (at your option) any later version. *
# * *
# * This library is distributed in the hope that it will be useful, *
# * but WITHOUT ANY WARRANTY; without even the implied warranty of *
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
# * Lesser General Public License for more details. *
# * *
# * You should have received a copy of the GNU Lesser General Public *
# * License along with this library; if not, write to the Free Software *
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA *
# * 02110-1301 USA *
# * *
# ***************************************************************************
""" Worker thread classes for Addon Manager startup """
import hashlib
import io
import itertools
import json
import os
import platform
import queue
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from datetime import datetime
from typing import Union, List, Dict
from enum import Enum, auto
from PySide2 import QtCore
import FreeCAD
import addonmanager_utilities as utils
from addonmanager_macro import Macro
from Addon import Addon
import NetworkManager
have_git = False
try:
import git
# Some types of Python installation will fall back to finding a directory called "git"
# in certain locations instead of a Python package called git: that directory is unlikely
# to have the "Repo" attribute unless it is a real installation, however, so this check
# should catch that. (Bug #4072)
have_git = hasattr(git, "Repo")
if not have_git:
FreeCAD.Console.PrintMessage(
"Unable to locate a viable GitPython installation: falling back to ZIP installation."
)
except ImportError:
pass
NOGIT = False # for debugging purposes, set this to True to always use http downloads
translate = FreeCAD.Qt.translate
class CreateAddonListWorker(QtCore.QThread):
"""This worker updates the list of available workbenches, emitting an "addon_repo"
signal for each Addon as they are processed."""
status_message = QtCore.Signal(str)
addon_repo = QtCore.Signal(object)
def __init__(self):
QtCore.QThread.__init__(self)
# reject_listed addons
self.macros_reject_list = []
self.mod_reject_list = []
# These addons will print an additional message informing the user
self.obsolete = []
# These addons will print an additional message informing the user Python2 only
self.py2only = []
self.package_names = []
self.moddir = os.path.join(FreeCAD.getUserAppDataDir(), "Mod")
def run(self):
"populates the list of addons"
self.current_thread = QtCore.QThread.currentThread()
try:
self._get_freecad_addon_repo_data()
except ConnectionError as e:
return
self._get_custom_addons()
self._get_official_addons()
self._retrieve_macros_from_git()
self._retrieve_macros_from_wiki()
def _get_freecad_addon_repo_data(self):
# update info lists
p = NetworkManager.AM_NETWORK_MANAGER.blocking_get(
"https://raw.githubusercontent.com/FreeCAD/FreeCAD-addons/master/addonflags.json"
)
if p:
p = p.data().decode("utf8")
j = json.loads(p)
if "obsolete" in j and "Mod" in j["obsolete"]:
self.obsolete = j["obsolete"]["Mod"]
if "blacklisted" in j and "Macro" in j["blacklisted"]:
self.macros_reject_list = j["blacklisted"]["Macro"]
if "blacklisted" in j and "Mod" in j["blacklisted"]:
self.mod_reject_list = j["blacklisted"]["Mod"]
if "py2only" in j and "Mod" in j["py2only"]:
self.py2only = j["py2only"]["Mod"]
if "deprecated" in j:
fc_major = int(FreeCAD.Version()[0])
fc_minor = int(FreeCAD.Version()[1])
for item in j["deprecated"]:
if "as_of" in item and "name" in item:
try:
version_components = item["as_of"].split(".")
major = int(version_components[0])
if len(version_components) > 1:
minor = int(version_components[1])
else:
minor = 0
if major < fc_major or (
major == fc_major and minor <= fc_minor
):
if "kind" not in item or item["kind"] == "mod":
self.obsolete.append(item["name"])
elif item["kind"] == "macro":
self.macros_reject_list.append(item["name"])
else:
FreeCAD.Console.PrintMessage(
f'Unrecognized Addon kind {item["kind"]} in deprecation list.'
)
except Exception:
FreeCAD.Console.PrintMessage(
f"Exception caught when parsing deprecated Addon {item['name']}, version {item['as_of']}"
)
else:
message = translate(
"AddonsInstaller",
"Failed to connect to GitHub. Check your connection and proxy settings.",
)
FreeCAD.Console.PrintError(message + "\n")
self.status_message.emit(message)
raise ConnectionError
def _get_custom_addons(self):
# querying custom addons first
addon_list = (
FreeCAD.ParamGet("User parameter:BaseApp/Preferences/Addons")
.GetString("CustomRepositories", "")
.split("\n")
)
custom_addons = []
for addon in addon_list:
if " " in addon:
addon_and_branch = addon.split(" ")
custom_addons.append(
{"url": addon_and_branch[0], "branch": addon_and_branch[1]}
)
else:
custom_addons.append({"url": addon, "branch": "master"})
for addon in custom_addons:
if self.current_thread.isInterruptionRequested():
return
if addon and addon["url"]:
if addon["url"][-1] == "/":
addon["url"] = addon["url"][0:-1] # Strip trailing slash
addon["url"] = addon["url"].split(".git")[0] # Remove .git
name = addon["url"].split("/")[-1]
if name in self.package_names:
# We already have something with this name, skip this one
continue
self.package_names.append(name)
addondir = os.path.join(self.moddir, name)
if os.path.exists(addondir) and os.listdir(addondir):
state = Addon.Status.UNCHECKED
else:
state = Addon.Status.NOT_INSTALLED
repo = Addon(name, addon["url"], state, addon["branch"])
md_file = os.path.join(addondir, "package.xml")
if os.path.isfile(md_file):
repo.load_metadata_file(md_file)
repo.installed_version = repo.metadata.Version
repo.updated_timestamp = os.path.getmtime(md_file)
repo.verify_url_and_branch(addon["url"], addon["branch"])
self.addon_repo.emit(repo)
def _get_official_addons(self):
# querying official addons
p = NetworkManager.AM_NETWORK_MANAGER.blocking_get(
"https://raw.githubusercontent.com/FreeCAD/FreeCAD-addons/master/.gitmodules"
)
if not p:
return
p = p.data().decode("utf8")
p = re.findall(
(
r'(?m)\[submodule\s*"(?P<name>.*)"\]\s*'
r"path\s*=\s*(?P<path>.+)\s*"
r"url\s*=\s*(?P<url>https?://.*)\s*"
r"(branch\s*=\s*(?P<branch>[^\s]*)\s*)?"
),
p,
)
for name, _, url, _, branch in p:
if self.current_thread.isInterruptionRequested():
return
if name in self.package_names:
# We already have something with this name, skip this one
continue
self.package_names.append(name)
if branch is None or len(branch) == 0:
branch = "master"
url = url.split(".git")[0]
addondir = os.path.join(self.moddir, name)
if os.path.exists(addondir) and os.listdir(addondir):
# make sure the folder exists and it contains files!
state = Addon.Status.UNCHECKED
else:
state = Addon.Status.NOT_INSTALLED
repo = Addon(name, url, state, branch)
md_file = os.path.join(addondir, "package.xml")
if os.path.isfile(md_file):
repo.load_metadata_file(md_file)
repo.installed_version = repo.metadata.Version
repo.updated_timestamp = os.path.getmtime(md_file)
repo.verify_url_and_branch(url, branch)
if name in self.py2only:
repo.python2 = True
if name in self.mod_reject_list:
repo.rejected = True
if name in self.obsolete:
repo.obsolete = True
self.addon_repo.emit(repo)
self.status_message.emit(
translate("AddonsInstaller", "Workbenches list was updated.")
)
def _retrieve_macros_from_git(self):
"""Retrieve macros from FreeCAD-macros.git
Emits a signal for each macro in
https://github.com/FreeCAD/FreeCAD-macros.git
"""
macro_cache_location = utils.get_cache_file_name("Macros")
if not have_git or NOGIT:
message = translate(
"AddonsInstaller",
"Failed to execute Git Python command: check installation of GitPython and/or git",
)
self.status_message_signal.emit(message)
FreeCAD.Console.PrintWarning(message + "\n")
return
try:
if os.path.exists(macro_cache_location):
if not os.path.exists(os.path.join(macro_cache_location, ".git")):
FreeCAD.Console.PrintWarning(
translate(
"AddonsInstaller",
"Attempting to change non-git Macro setup to use git\n",
)
)
utils.repair_git_repo(
"https://github.com/FreeCAD/FreeCAD-macros.git", macro_cache_location
)
gitrepo = git.Git(macro_cache_location)
gitrepo.pull("--ff-only")
else:
git.Repo.clone_from(
"https://github.com/FreeCAD/FreeCAD-macros.git", macro_cache_location
)
except Exception as e:
FreeCAD.Console.PrintMessage(
translate(
"AddonsInstaller",
"An error occurred updating macros from GitHub, trying clean checkout...",
)
+ f":\n{e}\n"
)
FreeCAD.Console.PrintMessage(f"{macro_cache_location}\n")
FreeCAD.Console.PrintMessage(
translate("AddonsInstaller", "Attempting to do a clean checkout...")
+ "\n"
)
try:
shutil.rmtree(macro_cache_location, onerror=self._remove_readonly)
git.Repo.clone_from(
"https://github.com/FreeCAD/FreeCAD-macros.git", macro_cache_location
)
FreeCAD.Console.PrintMessage(
translate("AddonsInstaller", "Clean checkout succeeded") + "\n"
)
except Exception as e:
FreeCAD.Console.PrintWarning(
translate(
"AddonsInstaller",
"Failed to update macros from GitHub -- try clearing the Addon Manager's cache.",
)
+ f":\n{str(e)}\n"
)
return
n_files = 0
for _, _, filenames in os.walk(macro_cache_location):
n_files += len(filenames)
counter = 0
for dirpath, _, filenames in os.walk(macro_cache_location):
counter += 1
if self.current_thread.isInterruptionRequested():
return
if ".git" in dirpath:
continue
for filename in filenames:
if self.current_thread.isInterruptionRequested():
return
if filename.lower().endswith(".fcmacro"):
macro = Macro(filename[:-8]) # Remove ".FCMacro".
macro.on_git = True
macro.src_filename = os.path.join(dirpath, filename)
macro.fill_details_from_file(macro.src_filename)
repo = Addon.from_macro(macro)
FreeCAD.Console.PrintLog(f"Found macro {repo.name}\n")
repo.url = "https://github.com/FreeCAD/FreeCAD-macros.git"
utils.update_macro_installation_details(repo)
def _retrieve_macros_from_wiki(self):
"""Retrieve macros from the wiki
Read the wiki and emit a signal for each found macro.
Reads only the page https://wiki.freecad.org/Macros_recipes
"""
p = NetworkManager.AM_NETWORK_MANAGER.blocking_get(
"https://wiki.freecad.org/Macros_recipes"
)
if not p:
FreeCAD.Console.PrintWarning(
translate(
"AddonsInstaller",
"Error connecting to the Wiki, FreeCAD cannot retrieve the Wiki macro list at this time",
)
+ "\n"
)
return
p = p.data().decode("utf8")
macros = re.findall('title="(Macro.*?)"', p)
macros = [mac for mac in macros if "translated" not in mac]
macro_names = []
for i, mac in enumerate(macros):
if self.current_thread.isInterruptionRequested():
return
macname = mac[6:] # Remove "Macro ".
macname = macname.replace("&amp;", "&")
if not macname:
continue
if (
(macname not in self.macros_reject_list)
and ("recipes" not in macname.lower())
and (macname not in macro_names)
):
macro_names.append(macname)
macro = Macro(macname)
macro.on_wiki = True
macro.parsed = False
repo = Addon.from_macro(macro)
repo.url = "https://wiki.freecad.org/Macros_recipes"
utils.update_macro_installation_details(repo)
def _remove_readonly(self, func, path, _) -> None:
"""Remove a read-only file."""
os.chmod(path, stat.S_IWRITE)
func(path)
class LoadPackagesFromCacheWorker(QtCore.QThread):
addon_repo = QtCore.Signal(object)
def __init__(self, cache_file: str):
QtCore.QThread.__init__(self)
self.cache_file = cache_file
def run(self):
metadata_cache_path = os.path.join(
FreeCAD.getUserCachePath(), "AddonManager", "PackageMetadata"
)
with open(self.cache_file, "r") as f:
data = f.read()
if data:
dict_data = json.loads(data)
for item in dict_data.values():
if QtCore.QThread.currentThread().isInterruptionRequested():
return
repo = Addon.from_cache(item)
repo_metadata_cache_path = os.path.join(
metadata_cache_path, repo.name, "package.xml"
)
if os.path.isfile(repo_metadata_cache_path):
try:
repo.load_metadata_file(repo_metadata_cache_path)
repo.installed_version = repo.metadata.Version
repo.updated_timestamp = os.path.getmtime(
repo_metadata_cache_path
)
except Exception:
FreeCAD.Console.PrintLog(
f"Failed loading {repo_metadata_cache_path}\n"
)
self.addon_repo.emit(repo)
class LoadMacrosFromCacheWorker(QtCore.QThread):
add_macro_signal = QtCore.Signal(object)
def __init__(self, cache_file: str):
QtCore.QThread.__init__(self)
self.cache_file = cache_file
def run(self):
with open(self.cache_file, "r") as f:
data = f.read()
dict_data = json.loads(data)
for item in dict_data:
if QtCore.QThread.currentThread().isInterruptionRequested():
return
new_macro = Macro.from_cache(item)
repo = Addon.from_macro(new_macro)
utils.update_macro_installation_details(repo)
self.add_macro_signal.emit(repo)
class CheckSingleUpdateWorker(QtCore.QObject):
"""This worker is a little different from the others: the actual recommended way of
running in a QThread is to make a worker object that gets moved into the thread."""
update_status = QtCore.Signal(int)
def __init__(self, repo: Addon, parent: QtCore.QObject = None):
super().__init__(parent)
self.repo = repo
def do_work(self):
# Borrow the function from another class:
checker = UpdateChecker()
if self.repo.repo_type == Addon.Kind.WORKBENCH:
checker.check_workbench(self.repo)
elif self.repo.repo_type == Addon.Kind.MACRO:
checker.check_macro(self.repo)
elif self.repo.repo_type == Addon.Kind.PACKAGE:
checker.check_package(self.repo)
self.update_status.emit(self.repo.update_status)
class CheckWorkbenchesForUpdatesWorker(QtCore.QThread):
"""This worker checks for available updates for all workbenches"""
update_status = QtCore.Signal(Addon)
progress_made = QtCore.Signal(int, int)
def __init__(self, repos: List[Addon]):
QtCore.QThread.__init__(self)
self.repos = repos
def run(self):
self.current_thread = QtCore.QThread.currentThread()
self.basedir = FreeCAD.getUserAppDataDir()
self.moddir = os.path.join(self.basedir, "Mod")
checker = UpdateChecker()
count = 1
for repo in self.repos:
if self.current_thread.isInterruptionRequested():
return
self.progress_made.emit(count, len(self.repos))
count += 1
if repo.status() == Addon.Status.UNCHECKED:
if repo.repo_type == Addon.Kind.WORKBENCH:
checker.check_workbench(repo)
self.update_status.emit(repo)
elif repo.repo_type == Addon.Kind.MACRO:
checker.check_macro(repo)
self.update_status.emit(repo)
elif repo.repo_type == Addon.Kind.PACKAGE:
checker.check_package(repo)
self.update_status.emit(repo)
class UpdateChecker:
def __init__(self):
self.basedir = FreeCAD.getUserAppDataDir()
self.moddir = os.path.join(self.basedir, "Mod")
def check_workbench(self, wb):
if not have_git or NOGIT:
wb.set_status(Addon.Status.CANNOT_CHECK)
return
clonedir = os.path.join(self.moddir, wb.name)
if os.path.exists(clonedir):
# mark as already installed AND already checked for updates
if not os.path.exists(os.path.join(clonedir, ".git")):
with wb.git_lock:
utils.repair_git_repo(wb.url, clonedir)
with wb.git_lock:
gitrepo = git.Repo(clonedir)
try:
if gitrepo.head.is_detached:
# By definition, in a detached-head state we cannot
# update, so don't even bother checking.
wb.set_status(Addon.Status.NO_UPDATE_AVAILABLE)
if hasattr(gitrepo.head, "ref"):
wb.branch = gitrepo.head.ref.name
else:
wb.branch = gitrepo.head.name
return
gitrepo.git.fetch()
except Exception as e:
FreeCAD.Console.PrintWarning(
"AddonManager: "
+ translate(
"AddonsInstaller",
"Unable to fetch git updates for workbench {}",
).format(wb.name)
+ "\n"
)
FreeCAD.Console.PrintWarning(str(e) + "\n")
wb.set_status(Addon.Status.CANNOT_CHECK)
else:
try:
if "git pull" in gitrepo.git.status():
wb.set_status(Addon.Status.UPDATE_AVAILABLE)
else:
wb.set_status(Addon.Status.NO_UPDATE_AVAILABLE)
except Exception:
FreeCAD.Console.PrintWarning(
translate(
"AddonsInstaller", "git fetch failed for {}"
).format(wb.name)
+ "\n"
)
wb.set_status(Addon.Status.CANNOT_CHECK)
def check_package(self, package: Addon) -> None:
clonedir = self.moddir + os.sep + package.name
if os.path.exists(clonedir):
# First, try to just do a git-based update, which will give the most accurate results:
if have_git and not NOGIT:
self.check_workbench(package)
if package.status() != Addon.Status.CANNOT_CHECK:
# It worked, just exit now
return
# If we were unable to do a git-based update, try using the package.xml file instead:
installed_metadata_file = os.path.join(clonedir, "package.xml")
if not os.path.isfile(installed_metadata_file):
# If there is no package.xml file, then it's because the package author added it after the last time
# the local installation was updated. By definition, then, there is an update available, if only to
# download the new XML file.
package.set_status(Addon.Status.UPDATE_AVAILABLE)
package.installed_version = None
return
else:
package.updated_timestamp = os.path.getmtime(installed_metadata_file)
try:
installed_metadata = FreeCAD.Metadata(installed_metadata_file)
package.installed_version = installed_metadata.Version
# Packages are considered up-to-date if the metadata version matches. Authors should update
# their version string when they want the addon manager to alert users of a new version.
if package.metadata.Version != installed_metadata.Version:
package.set_status(Addon.Status.UPDATE_AVAILABLE)
else:
package.set_status(Addon.Status.NO_UPDATE_AVAILABLE)
except Exception:
FreeCAD.Console.PrintWarning(
translate(
"AddonsInstaller",
"Failed to read metadata from {name}",
).format(name=installed_metadata_file)
+ "\n"
)
package.set_status(Addon.Status.CANNOT_CHECK)
def check_macro(self, macro_wrapper: Addon) -> None:
# Make sure this macro has its code downloaded:
try:
if not macro_wrapper.macro.parsed and macro_wrapper.macro.on_git:
macro_wrapper.macro.fill_details_from_file(
macro_wrapper.macro.src_filename
)
elif not macro_wrapper.macro.parsed and macro_wrapper.macro.on_wiki:
mac = macro_wrapper.macro.name.replace(" ", "_")
mac = mac.replace("&", "%26")
mac = mac.replace("+", "%2B")
url = "https://wiki.freecad.org/Macro_" + mac
macro_wrapper.macro.fill_details_from_wiki(url)
except Exception:
FreeCAD.Console.PrintWarning(
translate(
"AddonsInstaller",
"Failed to fetch code for macro '{name}'",
).format(name=macro_wrapper.macro.name)
+ "\n"
)
macro_wrapper.set_status(Addon.Status.CANNOT_CHECK)
return
hasher1 = hashlib.sha1()
hasher2 = hashlib.sha1()
hasher1.update(macro_wrapper.macro.code.encode("utf-8"))
new_sha1 = hasher1.hexdigest()
test_file_one = os.path.join(
FreeCAD.getUserMacroDir(True), macro_wrapper.macro.filename
)
test_file_two = os.path.join(
FreeCAD.getUserMacroDir(True), "Macro_" + macro_wrapper.macro.filename
)
if os.path.exists(test_file_one):
with open(test_file_one, "rb") as f:
contents = f.read()
hasher2.update(contents)
old_sha1 = hasher2.hexdigest()
elif os.path.exists(test_file_two):
with open(test_file_two, "rb") as f:
contents = f.read()
hasher2.update(contents)
old_sha1 = hasher2.hexdigest()
else:
return
if new_sha1 == old_sha1:
macro_wrapper.set_status(Addon.Status.NO_UPDATE_AVAILABLE)
else:
macro_wrapper.set_status(Addon.Status.UPDATE_AVAILABLE)
class CacheMacroCodeWorker(QtCore.QThread):
"""Download and cache the macro code, and parse its internal metadata"""
status_message = QtCore.Signal(str)
update_macro = QtCore.Signal(Addon)
progress_made = QtCore.Signal(int, int)
def __init__(self, repos: List[Addon]) -> None:
QtCore.QThread.__init__(self)
self.repos = repos
self.workers = []
self.terminators = []
self.lock = threading.Lock()
self.failed = []
self.counter = 0
def run(self):
self.status_message.emit(translate("AddonsInstaller", "Caching macro code..."))
self.repo_queue = queue.Queue()
current_thread = QtCore.QThread.currentThread()
num_macros = 0
for repo in self.repos:
if repo.macro is not None:
self.repo_queue.put(repo)
num_macros += 1
# Emulate QNetworkAccessManager and spool up six connections:
for _ in range(6):
self.update_and_advance(None)
while True:
if current_thread.isInterruptionRequested():
for worker in self.workers:
worker.blockSignals(True)
worker.requestInterruption()
if not worker.wait(100):
FreeCAD.Console.PrintWarning(
translate(
"AddonsInstaller",
"Addon Manager: a worker process failed to halt ({name})",
).format(name=worker.macro.name)
+ "\n"
)
return
# Ensure our signals propagate out by running an internal thread-local event loop
QtCore.QCoreApplication.processEvents()
with self.lock:
if self.counter >= num_macros:
break
time.sleep(0.1)
# Make sure all of our child threads have fully exited:
for worker in self.workers:
worker.wait(50)
if not worker.isFinished():
FreeCAD.Console.PrintError(
translate(
"AddonsInstaller",
"Addon Manager: a worker process failed to complete while fetching {name}",
).format(name=worker.macro.name)
+ "\n"
)
self.repo_queue.join()
for terminator in self.terminators:
if terminator and terminator.isActive():
terminator.stop()
if len(self.failed) > 0:
num_failed = len(self.failed)
FreeCAD.Console.PrintWarning(
translate(
"AddonsInstaller",
"Out of {num_macros} macros, {num_failed} timed out while processing",
).format(num_macros=num_macros, num_failed=num_failed)
)
def update_and_advance(self, repo: Addon) -> None:
if repo is not None:
if repo.macro.name not in self.failed:
self.update_macro.emit(repo)
self.repo_queue.task_done()
with self.lock:
self.counter += 1
if QtCore.QThread.currentThread().isInterruptionRequested():
return
self.progress_made.emit(
len(self.repos) - self.repo_queue.qsize(), len(self.repos)
)
try:
next_repo = self.repo_queue.get_nowait()
worker = GetMacroDetailsWorker(next_repo)
worker.finished.connect(lambda: self.update_and_advance(next_repo))
with self.lock:
self.workers.append(worker)
self.terminators.append(
QtCore.QTimer.singleShot(10000, lambda: self.terminate(worker))
)
self.status_message.emit(
translate(
"AddonsInstaller",
"Getting metadata from macro {}",
).format(next_repo.macro.name)
)
worker.start()
except queue.Empty:
pass
def terminate(self, worker) -> None:
if not worker.isFinished():
macro_name = worker.macro.name
FreeCAD.Console.PrintWarning(
translate(
"AddonsInstaller",
"Timeout while fetching metadata for macro {}",
).format(macro_name)
+ "\n"
)
worker.blockSignals(True)
worker.requestInterruption()
worker.wait(100)
if worker.isRunning():
FreeCAD.Console.PrintError(
translate(
"AddonsInstaller",
"Failed to kill process for macro {}!\n",
).format(macro_name)
)
with self.lock:
self.failed.append(macro_name)
class GetMacroDetailsWorker(QtCore.QThread):
"""Retrieve the macro details for a macro"""
status_message = QtCore.Signal(str)
readme_updated = QtCore.Signal(str)
def __init__(self, repo):
QtCore.QThread.__init__(self)
self.macro = repo.macro
def run(self):
self.status_message.emit(
translate("AddonsInstaller", "Retrieving macro description...")
)
if not self.macro.parsed and self.macro.on_git:
self.status_message.emit(
translate("AddonsInstaller", "Retrieving info from git")
)
self.macro.fill_details_from_file(self.macro.src_filename)
if not self.macro.parsed and self.macro.on_wiki:
self.status_message.emit(
translate("AddonsInstaller", "Retrieving info from wiki")
)
mac = self.macro.name.replace(" ", "_")
mac = mac.replace("&", "%26")
mac = mac.replace("+", "%2B")
url = "https://wiki.freecad.org/Macro_" + mac
self.macro.fill_details_from_wiki(url)
message = (
"<h1>"
+ self.macro.name
+ "</h1>"
+ self.macro.desc
+ '<br/><br/>Macro location: <a href="'
+ self.macro.url
+ '">'
+ self.macro.url
+ "</a>"
)
if QtCore.QThread.currentThread().isInterruptionRequested():
return
self.readme_updated.emit(message)

View File

@@ -0,0 +1,68 @@
# ***************************************************************************
# * *
# * Copyright (c) 2022 FreeCAD Project Association *
# * *
# * This library is free software; you can redistribute it and/or *
# * modify it under the terms of the GNU Lesser General Public *
# * License as published by the Free Software Foundation; either *
# * version 2.1 of the License, or (at your option) any later version. *
# * *
# * This library is distributed in the hope that it will be useful, *
# * but WITHOUT ANY WARRANTY; without even the implied warranty of *
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
# * Lesser General Public License for more details. *
# * *
# * You should have received a copy of the GNU Lesser General Public *
# * License along with this library; if not, write to the Free Software *
# * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA *
# * 02110-1301 USA *
# * *
# ***************************************************************************
""" Misc. worker thread classes for the FreeCAD Addon Manager. """
from typing import Optional
import FreeCAD
from PySide2 import QtCore
import NetworkManager
class ConnectionChecker(QtCore.QThread):
"""A worker thread for checking the connection to GitHub as a proxy for overall
network connectivity. It has two signals: success() and failure(str). The failure
signal contains a translated error message suitable for display to an end user."""
success = QtCore.Signal()
failure = QtCore.Signal(str)
def __init__(self):
QtCore.QThread.__init__(self)
def run(self):
"""Not generally called directly: create a new ConnectionChecker object and
call start() on it to spawn a child thread."""
FreeCAD.Console.PrintLog("Checking network connection...\n")
result = self.check_network_connection()
if QtCore.QThread.currentThread().isInterruptionRequested():
return
if not result:
self.failure.emit(
translate(
"AddonsInstaller",
"Unable to read data from GitHub: check your internet connection and proxy settings and try again.",
)
)
return
FreeCAD.Console.PrintLog(f"GitHub's zen message response: {result}\n")
self.success.emit()
def check_network_connection(self) -> Optional[str]:
""" The main work of this object: returns the decoded result of the connection request, or
None if the request failed """
url = "https://api.github.com/zen"
result = NetworkManager.AM_NETWORK_MANAGER.blocking_get(url)
if result:
return result.data().decode("utf8")
return None

View File

@@ -32,8 +32,8 @@ translate = FreeCAD.Qt.translate
def ask_to_install_toolbar_button(repo: Addon) -> None:
""" Presents a dialog to the user asking if they want to install a toolbar button for
a particular macro, and walks through that process if they agree to do so. """
"""Presents a dialog to the user asking if they want to install a toolbar button for
a particular macro, and walks through that process if they agree to do so."""
pref = FreeCAD.ParamGet("User parameter:BaseApp/Preferences/Addons")
do_not_show_dialog = pref.GetBool("dontShowAddMacroButtonDialog", False)
button_exists = check_for_button(repo)
@@ -52,7 +52,7 @@ def ask_to_install_toolbar_button(repo: Addon) -> None:
def check_for_button(repo: Addon) -> bool:
""" Returns True if a button already exists for this macro, or False if not. """
"""Returns True if a button already exists for this macro, or False if not."""
command = FreeCADGui.Command.findCustomCommand(repo.macro.filename)
if not command:
return False
@@ -67,12 +67,10 @@ def check_for_button(repo: Addon) -> bool:
return False
def ask_for_toolbar(
repo: Addon, custom_toolbars
) -> object:
""" Determine what toolbar to add the icon to. The first time it is called it prompts the
user to select or create a toolbar. After that, the prompt is optional and can be configured
via a preference. Returns the pref group for the new toolbar."""
def ask_for_toolbar(repo: Addon, custom_toolbars) -> object:
"""Determine what toolbar to add the icon to. The first time it is called it prompts the
user to select or create a toolbar. After that, the prompt is optional and can be configured
via a preference. Returns the pref group for the new toolbar."""
pref = FreeCAD.ParamGet("User parameter:BaseApp/Preferences/Addons")
# In this one spot, default True: if this is the first time we got to
@@ -126,8 +124,8 @@ def ask_for_toolbar(
def get_toolbar_with_name(name: str) -> object:
""" Try to find a toolbar with a given name. Returns the preference group for the toolbar
if found, or None if it does not exist. """
"""Try to find a toolbar with a given name. Returns the preference group for the toolbar
if found, or None if it does not exist."""
top_group = FreeCAD.ParamGet("User parameter:BaseApp/Workbench/Global/Toolbar")
custom_toolbars = top_group.GetGroups()
for toolbar in custom_toolbars:
@@ -141,7 +139,7 @@ def get_toolbar_with_name(name: str) -> object:
def create_new_custom_toolbar() -> object:
""" Create a new custom toolbar and returns its preference group. """
"""Create a new custom toolbar and returns its preference group."""
# We need two names: the name of the auto-created toolbar, as it will be displayed to the
# user in various menus, and the underlying name of the toolbar group. Both must be
@@ -177,13 +175,13 @@ def create_new_custom_toolbar() -> object:
def check_for_toolbar(toolbar_name: str) -> bool:
""" Returns True if the toolbar exists, otherwise False """
"""Returns True if the toolbar exists, otherwise False"""
return get_toolbar_with_name(toolbar_name) is not None
def install_toolbar_button(repo: Addon) -> None:
""" If the user has requested a toolbar button be installed, this function is called
to continue the process and request any additional required information. """
"""If the user has requested a toolbar button be installed, this function is called
to continue the process and request any additional required information."""
pref = FreeCAD.ParamGet("User parameter:BaseApp/Preferences/Addons")
custom_toolbar_name = pref.GetString(
"CustomToolbarName", "Auto-Created Macro Toolbar"
@@ -232,7 +230,7 @@ def install_toolbar_button(repo: Addon) -> None:
def install_macro_to_toolbar(repo: Addon, toolbar: object) -> None:
""" Adds an icon for the given macro to the given toolbar. """
"""Adds an icon for the given macro to the given toolbar."""
menuText = repo.display_name
tooltipText = f"<b>{repo.display_name}</b>"
if repo.macro.comment:

View File

@@ -70,8 +70,8 @@ except ImportError:
class PackageDetails(QWidget):
""" The PackageDetails QWidget shows package README information and provides
install, uninstall, and update buttons. """
"""The PackageDetails QWidget shows package README information and provides
install, uninstall, and update buttons."""
back = Signal()
install = Signal(Addon)
@@ -114,9 +114,9 @@ class PackageDetails(QWidget):
self.ui.webView.hide()
def show_repo(self, repo: Addon, reload: bool = False) -> None:
""" The main entry point for this class, shows the package details and related buttons
for the provided repo. If reload is true, then even if this is already the current repo
the data is reloaded. """
"""The main entry point for this class, shows the package details and related buttons
for the provided repo. If reload is true, then even if this is already the current repo
the data is reloaded."""
# If this is the same repo we were already showing, we do not have to do the
# expensive refetch unless reload is true
@@ -155,13 +155,15 @@ class PackageDetails(QWidget):
if repo.status() == Addon.Status.UNCHECKED:
if not self.status_update_thread:
self.status_update_thread = QThread()
self.status_update_worker = CheckSingleUpdateWorker(repo)
self.status_update_worker.moveToThread(self.status_update_thread)
self.status_create_addon_list_worker = CheckSingleUpdateWorker(repo)
self.status_create_addon_list_worker.moveToThread(self.status_update_thread)
self.status_update_thread.finished.connect(
self.status_update_worker.deleteLater
self.status_create_addon_list_worker.deleteLater
)
self.check_for_update.connect(self.status_create_addon_list_worker.do_work)
self.status_create_addon_list_worker.update_status.connect(
self.display_repo_status
)
self.check_for_update.connect(self.status_update_worker.do_work)
self.status_update_worker.update_status.connect(self.display_repo_status)
self.status_update_thread.start()
self.check_for_update.emit(self.repo)
@@ -390,9 +392,9 @@ class PackageDetails(QWidget):
self.ui.labelWarningInfo.hide()
def requires_newer_freecad(self) -> Optional[str]:
""" If the current package is not installed, returns the first supported version of
FreeCAD, if one is set, or None if no information is available (or if the package is
already installed). """
"""If the current package is not installed, returns the first supported version of
FreeCAD, if one is set, or None if no information is available (or if the package is
already installed)."""
# If it's not installed, check to see if it's for a newer version of FreeCAD
if self.repo.status() == Addon.Status.NOT_INSTALLED and self.repo.metadata:
@@ -449,7 +451,7 @@ class PackageDetails(QWidget):
self.ui.buttonChangeBranch.show()
def set_disable_button_state(self):
""" Set up the enable/disable button based on the enabled/disabled state of the addon """
"""Set up the enable/disable button based on the enabled/disabled state of the addon"""
self.ui.buttonEnable.hide()
self.ui.buttonDisable.hide()
status = self.repo.status()
@@ -503,7 +505,7 @@ class PackageDetails(QWidget):
self.macro_readme_updated()
def macro_readme_updated(self):
""" Update the display of a Macro's README data. """
"""Update the display of a Macro's README data."""
url = self.repo.macro.wiki
if not url:
url = self.repo.macro.url
@@ -588,12 +590,12 @@ class PackageDetails(QWidget):
self.ui.webView.page().runJavaScript(s)
def load_started(self):
""" Called when loading is started: sets up the progress bar """
"""Called when loading is started: sets up the progress bar"""
self.ui.progressBar.show()
self.ui.progressBar.setValue(0)
def load_progress(self, progress: int):
""" Called during load to update the progress bar """
"""Called during load to update the progress bar"""
self.ui.progressBar.setValue(progress)
def load_finished(self, load_succeeded: bool):
@@ -624,14 +626,14 @@ class PackageDetails(QWidget):
self.show_error_for(url)
def long_load_running(self):
""" Displays a message about loading taking a long time. """
"""Displays a message about loading taking a long time."""
if hasattr(self.ui, "webView") and self.ui.webView.isHidden():
self.ui.slowLoadLabel.show()
self.ui.loadingLabel.hide()
self.ui.webView.show()
def show_error_for(self, url: QUrl) -> None:
""" Displays error information. """
"""Displays error information."""
m = translate(
"AddonsInstaller", "Could not load README data from URL {}"
).format(url.toString())
@@ -639,7 +641,7 @@ class PackageDetails(QWidget):
self.ui.webView.setHtml(html)
def change_branch_clicked(self) -> None:
""" Loads the branch-switching dialog """
"""Loads the branch-switching dialog"""
basedir = FreeCAD.getUserAppDataDir()
path_to_repo = os.path.join(basedir, "Mod", self.repo.name)
change_branch_dialog = ChangeBranchDialog(path_to_repo, self)
@@ -647,7 +649,7 @@ class PackageDetails(QWidget):
change_branch_dialog.exec()
def enable_clicked(self) -> None:
""" Called by the Enable button, enables this Addon and updates GUI to reflect that status. """
"""Called by the Enable button, enables this Addon and updates GUI to reflect that status."""
self.repo.enable()
self.repo.set_status(Addon.Status.PENDING_RESTART)
self.set_disable_button_state()
@@ -664,7 +666,7 @@ class PackageDetails(QWidget):
self.ui.labelWarningInfo.setStyleSheet("color:" + utils.bright_color_string())
def disable_clicked(self) -> None:
""" Called by the Disable button, disables this Addon and updates the GUI to reflect that status. """
"""Called by the Disable button, disables this Addon and updates the GUI to reflect that status."""
self.repo.disable()
self.repo.set_status(Addon.Status.PENDING_RESTART)
self.set_disable_button_state()
@@ -683,7 +685,7 @@ class PackageDetails(QWidget):
)
def branch_changed(self, name: str) -> None:
""" Displays a dialog confirming the branch changed, and tries to access the metadata file from that branch. """
"""Displays a dialog confirming the branch changed, and tries to access the metadata file from that branch."""
QMessageBox.information(
self,
translate("AddonsInstaller", "Success"),
@@ -728,8 +730,8 @@ if HAS_QTWEBENGINE:
self.settings().setAttribute(QWebEngineSettings.ErrorPageEnabled, False)
def acceptNavigationRequest(self, url, _type, isMainFrame):
""" A callback for navigation requests: this widget will only display navigation requests to the
FreeCAD Wiki (for translation purposes) -- anything else will open in a new window. """
"""A callback for navigation requests: this widget will only display navigation requests to the
FreeCAD Wiki (for translation purposes) -- anything else will open in a new window."""
if _type == QWebEnginePage.NavigationTypeLinkClicked:
# See if the link is to a FreeCAD Wiki page -- if so, follow it, otherwise ask the OS to open it
@@ -743,9 +745,9 @@ if HAS_QTWEBENGINE:
return super().acceptNavigationRequest(url, _type, isMainFrame)
def javaScriptConsoleMessage(self, level, message, lineNumber, _):
""" Handle JavaScript console messages by optionally outputting them to the FreeCAD Console. This
must be manually enabled in this Python file by setting the global show_javascript_console_output
to true. """
"""Handle JavaScript console messages by optionally outputting them to the FreeCAD Console. This
must be manually enabled in this Python file by setting the global show_javascript_console_output
to true."""
global show_javascript_console_output
if show_javascript_console_output:
tag = translate("AddonsInstaller", "Page JavaScript reported")
@@ -758,8 +760,7 @@ if HAS_QTWEBENGINE:
class Ui_PackageDetails(object):
""" The generated UI from the Qt Designer UI file """
"""The generated UI from the Qt Designer UI file"""
def setupUi(self, PackageDetails):
if not PackageDetails.objectName():

View File

@@ -44,13 +44,15 @@ translate = FreeCAD.Qt.translate
class ListDisplayStyle(IntEnum):
""" The display mode of the list """
"""The display mode of the list"""
COMPACT = 0
EXPANDED = 1
class StatusFilter(IntEnum):
""" Predefined filers """
"""Predefined filers"""
ANY = 0
INSTALLED = 1
NOT_INSTALLED = 2
@@ -97,7 +99,7 @@ class PackageList(QWidget):
self.item_model = None
def setModel(self, model):
""" This is a model-view-controller widget: set its model. """
"""This is a model-view-controller widget: set its model."""
self.item_model = model
self.item_filter.setSourceModel(self.item_model)
self.item_filter.sort(0)
@@ -117,8 +119,8 @@ class PackageList(QWidget):
)
def on_listPackages_clicked(self, index: QModelIndex):
""" Determine what addon was selected and emit the itemSelected signal with it as
an argument. """
"""Determine what addon was selected and emit the itemSelected signal with it as
an argument."""
source_selection = self.item_filter.mapToSource(index)
selected_repo = self.item_model.repos[source_selection.row()]
self.itemSelected.emit(selected_repo)
@@ -176,7 +178,7 @@ class PackageList(QWidget):
self.item_filter.setFilterRegExp(text_filter)
def set_view_style(self, style: ListDisplayStyle) -> None:
""" Set the style (compact or expanded) of the list """
"""Set the style (compact or expanded) of the list"""
self.item_model.layoutAboutToBeChanged.emit()
self.item_delegate.set_view(style)
if style == ListDisplayStyle.COMPACT:
@@ -190,7 +192,7 @@ class PackageList(QWidget):
class PackageListItemModel(QAbstractListModel):
""" The model for use with the PackageList class. """
"""The model for use with the PackageList class."""
repos = []
write_lock = threading.Lock()
@@ -200,19 +202,19 @@ class PackageListItemModel(QAbstractListModel):
IconUpdateRole = Qt.UserRole + 2
def rowCount(self, parent: QModelIndex = QModelIndex()) -> int:
""" The number of rows """
"""The number of rows"""
if parent.isValid():
return 0
return len(self.repos)
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
""" Only one column, always returns 1. """
"""Only one column, always returns 1."""
if parent.isValid():
return 0
return 1
def data(self, index: QModelIndex, role: int = Qt.DisplayRole):
""" Get the data for a given index and role. """
"""Get the data for a given index and role."""
if not index.isValid():
return None
row = index.row()
@@ -235,7 +237,7 @@ class PackageListItemModel(QAbstractListModel):
return self.repos[row]
def headerData(self, _unused1, _unused2, _role=Qt.DisplayRole):
""" No header in this implementation: always returns None. """
"""No header in this implementation: always returns None."""
return None
def setData(self, index: QModelIndex, value, role=Qt.EditRole) -> None:
@@ -259,7 +261,7 @@ class PackageListItemModel(QAbstractListModel):
)
def append_item(self, repo: Addon) -> None:
""" Adds this addon to the end of the model. Thread safe. """
"""Adds this addon to the end of the model. Thread safe."""
if repo in self.repos:
# Cowardly refuse to insert the same repo a second time
return
@@ -269,7 +271,7 @@ class PackageListItemModel(QAbstractListModel):
self.endInsertRows()
def clear(self) -> None:
""" Clear the model, removing all rows. Thread safe. """
"""Clear the model, removing all rows. Thread safe."""
if self.rowCount() > 0:
with self.write_lock:
self.beginRemoveRows(QModelIndex(), 0, self.rowCount() - 1)
@@ -277,7 +279,7 @@ class PackageListItemModel(QAbstractListModel):
self.endRemoveRows()
def update_item_status(self, name: str, status: Addon.Status) -> None:
""" Set the status of addon with name to status. """
"""Set the status of addon with name to status."""
for row, item in enumerate(self.repos):
if item.name == name:
self.setData(
@@ -286,7 +288,7 @@ class PackageListItemModel(QAbstractListModel):
return
def update_item_icon(self, name: str, icon: QIcon) -> None:
""" Set the icon for Addon with name to icon """
"""Set the icon for Addon with name to icon"""
for row, item in enumerate(self.repos):
if item.name == name:
self.setData(
@@ -295,7 +297,7 @@ class PackageListItemModel(QAbstractListModel):
return
def reload_item(self, repo: Addon) -> None:
""" Sets the addon data for the given addon (based on its name) """
"""Sets the addon data for the given addon (based on its name)"""
for index, item in enumerate(self.repos):
if item.name == repo.name:
with self.write_lock:
@@ -336,17 +338,17 @@ class PackageListItemDelegate(QStyledItemDelegate):
self.widget = self.expanded
def set_view(self, style: ListDisplayStyle) -> None:
""" Set the view of to style """
"""Set the view of to style"""
if not self.displayStyle == style:
self.displayStyle = style
def sizeHint(self, _option, index):
""" Attempt to figure out the correct height for the widget based on its current contents. """
"""Attempt to figure out the correct height for the widget based on its current contents."""
self.update_content(index)
return self.widget.sizeHint()
def update_content(self, index):
""" Creates the display of the content for a given index. """
"""Creates the display of the content for a given index."""
repo = index.data(PackageListItemModel.DataAccessRole)
if self.displayStyle == ListDisplayStyle.EXPANDED:
self.widget = self.expanded
@@ -381,8 +383,8 @@ class PackageListItemDelegate(QStyledItemDelegate):
self.widget.adjustSize()
def _setup_expanded_package (self, repo:Addon):
""" Set up the display for a package in expanded view """
def _setup_expanded_package(self, repo: Addon):
"""Set up the display for a package in expanded view"""
maintainers = repo.metadata.Maintainer
maintainers_string = ""
if len(maintainers) == 1:
@@ -392,23 +394,17 @@ class PackageListItemDelegate(QStyledItemDelegate):
)
elif len(maintainers) > 1:
n = len(maintainers)
maintainers_string = translate(
"AddonsInstaller", "Maintainers:", "", n
)
maintainers_string = translate("AddonsInstaller", "Maintainers:", "", n)
for maintainer in maintainers:
maintainers_string += (
f"\n{maintainer['name']} <{maintainer['email']}>"
)
maintainers_string += f"\n{maintainer['name']} <{maintainer['email']}>"
self.widget.ui.labelMaintainer.setText(maintainers_string)
if repo.tags:
self.widget.ui.labelTags.setText(
translate("AddonsInstaller", "Tags")
+ ": "
+ ", ".join(repo.tags)
translate("AddonsInstaller", "Tags") + ": " + ", ".join(repo.tags)
)
def _setup_macro(self, repo:Addon):
""" Set up the display for a macro """
def _setup_macro(self, repo: Addon):
"""Set up the display for a macro"""
self.widget.ui.labelDescription.setText(repo.macro.comment)
version_string = ""
if repo.macro.version:
@@ -526,8 +522,8 @@ class PackageListItemDelegate(QStyledItemDelegate):
return result
def paint(self, painter: QPainter, option: QStyleOptionViewItem, _: QModelIndex):
""" Main paint function: renders this widget into a given rectangle, successively drawing
all of its children. """
"""Main paint function: renders this widget into a given rectangle, successively drawing
all of its children."""
painter.save()
self.widget.resize(option.rect.size())
painter.translate(option.rect.topLeft())
@@ -550,42 +546,42 @@ class PackageListFilter(QSortFilterProxyModel):
def setPackageFilter(
self, package_type: int
) -> None: # 0=All, 1=Workbenches, 2=Macros, 3=Preference Packs
""" Set the package filter to package_type and refreshes. """
"""Set the package filter to package_type and refreshes."""
self.package_type = package_type
self.invalidateFilter()
def setStatusFilter(
self, status: int
) -> None: # 0=Any, 1=Installed, 2=Not installed, 3=Update available
""" Sets the status filter to status and refreshes. """
"""Sets the status filter to status and refreshes."""
self.status = status
self.invalidateFilter()
def setHidePy2(self, hide_py2: bool) -> None:
""" Sets whether or not to hide Python 2-only Addons """
"""Sets whether or not to hide Python 2-only Addons"""
self.hide_py2 = hide_py2
self.invalidateFilter()
def setHideObsolete(self, hide_obsolete: bool) -> None:
""" Sets whether or not to hide Addons marked obsolete """
"""Sets whether or not to hide Addons marked obsolete"""
self.hide_obsolete = hide_obsolete
self.invalidateFilter()
def setHideNewerFreeCADRequired(self, hide_nfr: bool) -> None:
""" Sets whether or not to hide packages that have indicated they need a newer version
of FreeCAD than the one currently running. """
"""Sets whether or not to hide packages that have indicated they need a newer version
of FreeCAD than the one currently running."""
self.hide_newer_freecad_required = hide_nfr
self.invalidateFilter()
def lessThan(self, left, right) -> bool:
""" Enable sorting of display name (not case sensitive). """
"""Enable sorting of display name (not case sensitive)."""
l = self.sourceModel().data(left, PackageListItemModel.DataAccessRole)
r = self.sourceModel().data(right, PackageListItemModel.DataAccessRole)
return l.display_name.lower() < r.display_name.lower()
def filterAcceptsRow(self, row, parent=QModelIndex()):
""" Do the actual filtering (called automatically by Qt when drawing the list) """
"""Do the actual filtering (called automatically by Qt when drawing the list)"""
index = self.sourceModel().createIndex(row, 0)
data = self.sourceModel().data(index, PackageListItemModel.DataAccessRole)
if self.package_type == 1: