Addon Manager: Add more unit test mocks

This commit is contained in:
Chris Hennes
2023-02-10 12:43:48 -08:00
committed by Chris Hennes
parent b3322a4556
commit 2aba1be25e
2 changed files with 388 additions and 37 deletions

View File

@@ -1,6 +1,6 @@
# ***************************************************************************
# * *
# * Copyright (c) 2022 FreeCAD Project Association *
# * Copyright (c) 2022-2023 FreeCAD Project Association *
# * *
# * This file is part of FreeCAD. *
# * *
@@ -23,38 +23,167 @@
"""Mock objects for use when testing the addon manager non-GUI code."""
import os
import shutil
from typing import Union, List
import xml.etree.ElementTree as ElemTree
import FreeCAD
class GitFailed (RuntimeError):
pass
class MockConsole:
"""Mock for the FreeCAD.Console -- does NOT print anything out, just logs it."""
def __init__(self):
self.log = []
self.messages = []
self.warnings = []
self.errors = []
def PrintLog(self, data: str):
self.log.append(data)
def PrintMessage(self, data: str):
self.messages.append(data)
def PrintWarning(self, data: str):
self.warnings.append(data)
def PrintError(self, data: str):
self.errors.append(data)
def missing_newlines(self) -> int:
"""In most cases, all console entries should end with newlines: this is a
convenience function for unit testing that is true."""
counter = 0
counter += self._count_missing_newlines(self.log)
counter += self._count_missing_newlines(self.messages)
counter += self._count_missing_newlines(self.warnings)
counter += self._count_missing_newlines(self.errors)
return counter
@staticmethod
def _count_missing_newlines(some_list) -> int:
counter = 0
for line in some_list:
if line[-1] != "\n":
counter += 1
return counter
class MockMetadata:
def __init__(self):
self.Name = "MockMetadata"
self.Urls = {"repository": {"location": "file://localhost/", "branch": "main"}}
self.Description = "Mock metadata object for testing"
self.Icon = None
self.Version = "1.2.3beta"
self.Content = {}
def minimal_file_scan(self, file: Union[os.PathLike, bytes]):
"""Don't use the real metadata class, but try to read in the parameters we care about
from the given metadata file (or file-like object, as the case probably is). This
allows us to test whether the data is being passed around correctly."""
xml = None
root = None
try:
if os.path.exists(file):
xml = ElemTree.parse(file)
root = xml.getroot()
except TypeError:
pass
if xml is None:
root = ElemTree.fromstring(file)
if root is None:
raise RuntimeError("Failed to parse XML data")
accepted_namespaces = ["", "{https://wiki.freecad.org/Package_Metadata}"]
for ns in accepted_namespaces:
for child in root:
if child.tag == ns + "name":
self.Name = child.text
elif child.tag == ns + "description":
self.Description = child.text
elif child.tag == ns + "icon":
self.Icon = child.text
elif child.tag == ns + "url":
if "type" in child.attrib and child.attrib["type"] == "repository":
url = child.text
if "branch" in child.attrib:
branch = child.attrib["branch"]
else:
branch = "master"
self.Urls["repository"]["location"] = url
self.Urls["repository"]["branch"] = branch
class MockAddon:
"""Minimal Addon class"""
def __init__(self):
test_dir = os.path.join(
FreeCAD.getHomePath(), "Mod", "AddonManager", "AddonManagerTest", "data"
)
self.name = "MockAddon"
self.display_name = "Mock Addon"
self.url = os.path.join(test_dir, "test_simple_repo.zip")
self.branch = "main"
def __init__(
self,
name: str = None,
url: str = None,
status: object = None,
branch: str = "main",
):
test_dir = os.path.join(os.path.dirname(__file__), "..", "data")
if name:
self.name = name
self.display_name = name
else:
self.name = "MockAddon"
self.display_name = "Mock Addon"
self.url = url if url else os.path.join(test_dir, "test_simple_repo.zip")
self.branch = branch
self.status = status
self.macro = None
self.status = None
self.update_status = None
self.metadata = None
self.icon_file = None
self.last_updated = None
self.requires = set()
self.python_requires = set()
self.python_optional = set()
self.on_git = False
self.on_wiki = True
def set_status(self, status):
self.status = status
self.update_status = status
def set_metadata(self, metadata_like: MockMetadata):
"""Set (some) of the metadata, but don't use a real Metadata object"""
self.metadata = metadata_like
if "repository" in self.metadata.Urls:
self.branch = self.metadata.Urls["repository"]["branch"]
self.url = self.metadata.Urls["repository"]["location"]
def load_metadata_file(self, metadata_file: os.PathLike):
if os.path.exists(metadata_file):
self.metadata = MockMetadata()
self.metadata.minimal_file_scan(metadata_file)
@staticmethod
def get_best_icon_relative_path():
return ""
class MockMacro:
"""Minimal Macro class"""
def __init__(self):
self.name = "MockMacro"
def __init__(self, name="MockMacro"):
self.name = name
self.filename = self.name + ".FCMacro"
self.icon = "" # If set, should just be fake filename, doesn't have to exist
self.xpm = ""
self.code = ""
self.raw_code_url = ""
self.other_files = [] # If set, should be fake names, don't have to exist
self.details_filled_from_file = False
self.details_filled_from_code = False
self.parsed_wiki_page = False
self.on_git = False
self.on_wiki = True
def install(self, location: os.PathLike):
"""Installer function for the mock macro object: creates a file with the src_filename
@@ -62,9 +191,9 @@ class MockMacro:
is not usable and serves only as a placeholder for the existence of the files."""
with open(
os.path.join(location, self.filename),
"w",
encoding="utf-8",
os.path.join(location, self.filename),
"w",
encoding="utf-8",
) as f:
f.write("Test file for macro installation unit tests")
if self.icon:
@@ -72,7 +201,7 @@ class MockMacro:
f.write(b"Fake icon data - nothing to see here\n")
if self.xpm:
with open(
os.path.join(location, "MockMacro_icon.xpm"), "w", encoding="utf-8"
os.path.join(location, "MockMacro_icon.xpm"), "w", encoding="utf-8"
) as f:
f.write(self.xpm)
for name in self.other_files:
@@ -82,3 +211,215 @@ class MockMacro:
with open(os.path.join(location, name), "w", encoding="utf-8") as f:
f.write("# Fake macro data for unit testing\n")
return True, []
def fill_details_from_file(self, _):
"""Tracks that this function was called, but otherwise does nothing"""
self.details_filled_from_file = True
def fill_details_from_code(self, _):
self.details_filled_from_code = True
def parse_wiki_page(self, _):
self.parsed_wiki_page = True
class SignalCatcher:
def __init__(self):
self.caught = False
self.killed = False
self.args = None
def catch_signal(self, *args):
self.caught = True
self.args = args
def die(self):
self.killed = True
class AddonSignalCatcher:
def __init__(self):
self.addons = []
def catch_signal(self, addon):
self.addons.append(addon)
class CallCatcher:
def __init__(self):
self.called = False
self.call_count = 0
self.args = None
def catch_call(self, *args):
self.called = True
self.call_count += 1
self.args = args
class MockGitManager:
"""A mock git manager: does NOT require a git installation. Takes no actions, only records
which functions are called for instrumentation purposes. Can be forced to appear to fail as
needed. Various member variables can be set to emulate necessary return responses."""
def __init__(self):
self.called_methods = []
self.update_available_response = False
self.current_tag_response = "main"
self.current_branch_response = "main"
self.get_remote_response = "No remote set"
self.get_branches_response = ["main"]
self.get_last_committers_response = {
"John Doe": {"email": "jdoe@freecad.org", "count": 1}
}
self.get_last_authors_response = {
"Jane Doe": {"email": "jdoe@freecad.org", "count": 1}
}
self.should_fail = False
self.fail_once = False # Switch back to success after the simulated failure
def _check_for_failure(self):
if self.should_fail:
if self.fail_once:
self.should_fail = False
raise GitFailed("Unit test forced failure")
def clone(self, _remote, _local_path, _args: List[str] = None):
self.called_methods.append("clone")
self._check_for_failure()
def async_clone(self, _remote, _local_path, _progress_monitor, _args: List[str] = None):
self.called_methods.append("async_clone")
self._check_for_failure()
def checkout(self, _local_path, _spec, _args: List[str] = None):
self.called_methods.append("checkout")
self._check_for_failure()
def update(self, _local_path):
self.called_methods.append("update")
self._check_for_failure()
def status(self, _local_path) -> str:
self.called_methods.append("status")
self._check_for_failure()
return "Up-to-date"
def reset(self, _local_path, _args: List[str] = None):
self.called_methods.append("reset")
self._check_for_failure()
def async_fetch_and_update(self, _local_path, _progress_monitor, _args=None):
self.called_methods.append("async_fetch_and_update")
self._check_for_failure()
def update_available(self, _local_path) -> bool:
self.called_methods.append("update_available")
self._check_for_failure()
return self.update_available_response
def current_tag(self, _local_path) -> str:
self.called_methods.append("current_tag")
self._check_for_failure()
return self.current_tag_response
def current_branch(self, _local_path) -> str:
self.called_methods.append("current_branch")
self._check_for_failure()
return self.current_branch_response
def repair(self, _remote, _local_path):
self.called_methods.append("repair")
self._check_for_failure()
def get_remote(self, _local_path) -> str:
self.called_methods.append("get_remote")
self._check_for_failure()
return self.get_remote_response
def get_branches(self, _local_path) -> List[str]:
self.called_methods.append("get_branches")
self._check_for_failure()
return self.get_branches_response
def get_last_committers(self, _local_path, _n=10):
self.called_methods.append("get_last_committers")
self._check_for_failure()
return self.get_last_committers_response
def get_last_authors(self, _local_path, _n=10):
self.called_methods.append("get_last_authors")
self._check_for_failure()
return self.get_last_authors_response
class MockSignal:
"""A purely synchronous signal, instrumented and intended only for use in unit testing.
emit() is semi-functional, but does not use queued slots so cannot be used across
threads."""
def __init__(self, *args):
self.expected_types = args
self.connections = []
self.disconnections = []
self.emitted = False
def connect(self, func):
self.connections.append(func)
def disconnect(self, func):
if func in self.connections:
self.connections.remove(func)
self.disconnections.append(func)
def emit(self, *args):
self.emitted = True
for connection in self.connections:
connection(args)
class MockNetworkManager:
"""Instrumented mock for the NetworkManager. Does no network access, is not asynchronous, and
does not require a running event loop. No submitted requests ever complete."""
def __init__(self):
self.urls = []
self.aborted = []
self.data = MockByteArray()
self.called_methods = []
self.completed = MockSignal(int, int, MockByteArray)
self.progress_made = MockSignal(int, int, int)
self.progress_complete = MockSignal(int, int, os.PathLike)
def submit_unmonitored_get(self, url: str) -> int:
self.urls.append(url)
self.called_methods.append("submit_unmonitored_get")
return len(self.urls) - 1
def submit_monitored_get(self, url: str) -> int:
self.urls.append(url)
self.called_methods.append("submit_monitored_get")
return len(self.urls) - 1
def blocking_get(self, url: str):
self.urls.append(url)
self.called_methods.append("blocking_get")
return self.data
def abort_all(self):
self.called_methods.append("abort_all")
for url in self.urls:
self.aborted.append(url)
def abort(self, index: int):
self.called_methods.append("abort")
self.aborted.append(self.urls[index])
class MockByteArray:
def __init__(self, data_to_wrap="data".encode("utf-8")):
self.wrapped = data_to_wrap
def data(self) -> bytes:
return self.wrapped

View File

@@ -1,6 +1,6 @@
# ***************************************************************************
# * *
# * Copyright (c) 2022 FreeCAD Project Association *
# * Copyright (c) 2022-2023 FreeCAD Project Association *
# * *
# * This file is part of FreeCAD. *
# * *
@@ -22,10 +22,12 @@
from PySide import QtCore, QtWidgets
from AddonManagerTest.app.mocks import SignalCatcher
class DialogInteractor(QtCore.QObject):
"""Takes the title of the dialog, a button string, and a callable. The callable is passed
the widget we found and can do whatever it wants to it. Whatever it does should eventually
"""Takes the title of the dialog and a callable. The callable is passed the widget
we found and can do whatever it wants to it. Whatever it does should eventually
close the dialog, however."""
def __init__(self, dialog_to_watch_for, interaction):
@@ -53,21 +55,6 @@ class DialogInteractor(QtCore.QObject):
self.dialog_found = True
self.timer.stop()
if self.execution_counter > 25 and not self.dialog_found:
# OK, it wasn't the active modal widget... was it some other window, and never became
# active? That's an error, but we should get it closed anyway.
windows = QtWidgets.QApplication.topLevelWidgets()
for widget in windows:
if (
hasattr(widget, "windowTitle")
and callable(widget.windowTitle)
and widget.windowTitle() == self.dialog_to_watch_for
):
if self.interaction is not None and callable(self.interaction):
self.interaction(widget)
self.timer.stop()
print("Found a window with the expected title, but it was not the active modal dialog.")
self.has_run = True
self.execution_counter += 1
if self.execution_counter > 100:
@@ -93,6 +80,7 @@ class DialogWatcher(DialogInteractor):
a one-shot QTimer to allow the dialog time to open up. If the specified dialog is found, but
it does not contain the expected button, button_found will be false, and the dialog will be
closed with a reject() slot."""
def __init__(self, dialog_to_watch_for, button=QtWidgets.QDialogButtonBox.NoButton):
super().__init__(dialog_to_watch_for, self.click_button)
if button != QtWidgets.QDialogButtonBox.NoButton:
@@ -132,3 +120,25 @@ class MockThread:
def isRunning(self):
return False
class AsynchronousMonitor:
"""Watch for a signal to be emitted for at most some given number of milliseconds"""
def __init__(self, signal):
self.signal = signal
self.signal_catcher = SignalCatcher()
self.signal.connect(self.signal_catcher.catch_signal)
self.kill_timer = QtCore.QTimer()
self.kill_timer.setSingleShot(True)
self.kill_timer.timeout.connect(self.signal_catcher.die)
def wait_for_at_most(self, max_wait_millis) -> None:
self.kill_timer.setInterval(max_wait_millis)
self.kill_timer.start()
while not self.signal_catcher.caught and not self.signal_catcher.killed:
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 10)
self.kill_timer.stop()
def good(self) -> bool:
return self.signal_catcher.caught and not self.signal_catcher.killed