Addon Manager: Correct run_interruptable_subprocess
communicate() has to be called after a final kill() to get the output
This commit is contained in:
committed by
Yorik van Havre
parent
e607b5757e
commit
6254cb910b
@@ -22,16 +22,26 @@
|
||||
# ***************************************************************************
|
||||
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
import os
|
||||
import FreeCAD
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
from Addon import Addon
|
||||
try:
|
||||
import FreeCAD
|
||||
except ImportError:
|
||||
FreeCAD = None
|
||||
|
||||
sys.path.append("../..")
|
||||
|
||||
from AddonManagerTest.app.mocks import MockAddon as Addon
|
||||
|
||||
from addonmanager_utilities import (
|
||||
recognized_git_location,
|
||||
get_readme_url,
|
||||
get_assigned_string_literal,
|
||||
get_macro_version_from_file,
|
||||
run_interruptable_subprocess,
|
||||
)
|
||||
|
||||
|
||||
@@ -40,9 +50,7 @@ class TestUtilities(unittest.TestCase):
|
||||
MODULE = "test_utilities" # file name without extension
|
||||
|
||||
def setUp(self):
|
||||
self.test_dir = os.path.join(
|
||||
FreeCAD.getHomePath(), "Mod", "AddonManager", "AddonManagerTest", "data"
|
||||
)
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
@@ -59,7 +67,7 @@ class TestUtilities(unittest.TestCase):
|
||||
"https://salsa.debian.org/science-team/freecad",
|
||||
]
|
||||
for url in recognized_urls:
|
||||
repo = Addon("Test Repo", url, Addon.Status.NOT_INSTALLED, "branch")
|
||||
repo = Addon("Test Repo", url, "Addon.Status.NOT_INSTALLED", "branch")
|
||||
self.assertTrue(recognized_git_location(repo), f"{url} was unexpectedly not recognized")
|
||||
|
||||
unrecognized_urls = [
|
||||
@@ -69,7 +77,7 @@ class TestUtilities(unittest.TestCase):
|
||||
"https://github.com.malware.com/",
|
||||
]
|
||||
for url in unrecognized_urls:
|
||||
repo = Addon("Test Repo", url, Addon.Status.NOT_INSTALLED, "branch")
|
||||
repo = Addon("Test Repo", url, "Addon.Status.NOT_INSTALLED", "branch")
|
||||
self.assertFalse(recognized_git_location(repo), f"{url} was unexpectedly recognized")
|
||||
|
||||
def test_get_readme_url(self):
|
||||
@@ -90,14 +98,14 @@ class TestUtilities(unittest.TestCase):
|
||||
for url in github_urls:
|
||||
branch = "branchname"
|
||||
expected_result = f"{url}/raw/{branch}/README.md"
|
||||
repo = Addon("Test Repo", url, Addon.Status.NOT_INSTALLED, branch)
|
||||
repo = Addon("Test Repo", url, "Addon.Status.NOT_INSTALLED", branch)
|
||||
actual_result = get_readme_url(repo)
|
||||
self.assertEqual(actual_result, expected_result)
|
||||
|
||||
for url in gitlab_urls:
|
||||
branch = "branchname"
|
||||
expected_result = f"{url}/-/raw/{branch}/README.md"
|
||||
repo = Addon("Test Repo", url, Addon.Status.NOT_INSTALLED, branch)
|
||||
repo = Addon("Test Repo", url, "Addon.Status.NOT_INSTALLED", branch)
|
||||
actual_result = get_readme_url(repo)
|
||||
self.assertEqual(actual_result, expected_result)
|
||||
|
||||
@@ -125,14 +133,88 @@ class TestUtilities(unittest.TestCase):
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_get_macro_version_from_file(self):
|
||||
good_file = os.path.join(self.test_dir, "good_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(good_file)
|
||||
self.assertEqual(version, "1.2.3")
|
||||
if FreeCAD:
|
||||
test_dir = os.path.join(
|
||||
FreeCAD.getHomePath(), "Mod", "AddonManager", "AddonManagerTest", "data"
|
||||
)
|
||||
good_file = os.path.join(test_dir, "good_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(good_file)
|
||||
self.assertEqual(version, "1.2.3")
|
||||
|
||||
bad_file = os.path.join(self.test_dir, "bad_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(bad_file)
|
||||
self.assertEqual(version, "", "Bad version did not yield empty string")
|
||||
bad_file = os.path.join(test_dir, "bad_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(bad_file)
|
||||
self.assertEqual(version, "", "Bad version did not yield empty string")
|
||||
|
||||
empty_file = os.path.join(self.test_dir, "missing_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(empty_file)
|
||||
self.assertEqual(version, "", "Missing version did not yield empty string")
|
||||
empty_file = os.path.join(test_dir, "missing_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(empty_file)
|
||||
self.assertEqual(version, "", "Missing version did not yield empty string")
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_run_interruptable_subprocess_success_instant_return(self, mock_popen):
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate.return_value = ("Mocked stdout", "Mocked stderr")
|
||||
mock_process.returncode = 0
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
completed_process = run_interruptable_subprocess(["arg0", "arg1"])
|
||||
|
||||
self.assertEqual(completed_process.returncode, 0)
|
||||
self.assertEqual(completed_process.stdout, "Mocked stdout")
|
||||
self.assertEqual(completed_process.stderr, "Mocked stderr")
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_run_interruptable_subprocess_returns_nonzero(self, mock_popen):
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate.return_value = ("Mocked stdout", "Mocked stderr")
|
||||
mock_process.returncode = 1
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
run_interruptable_subprocess(["arg0", "arg1"])
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_run_interruptable_subprocess_timeout_five_times(self, mock_popen):
|
||||
"""Five times is below the limit for an error to be raised"""
|
||||
|
||||
def raises_first_five_times(timeout):
|
||||
raises_first_five_times.counter += 1
|
||||
if raises_first_five_times.counter <= 5:
|
||||
raise subprocess.TimeoutExpired("Test", timeout)
|
||||
return "Mocked stdout", None
|
||||
|
||||
raises_first_five_times.counter = 0
|
||||
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate = raises_first_five_times
|
||||
mock_process.returncode = 0
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
result = run_interruptable_subprocess(["arg0", "arg1"], 10)
|
||||
|
||||
self.assertEqual(result.returncode, 0)
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_run_interruptable_subprocess_timeout_ten_times(self, mock_popen):
|
||||
"""Ten times is the limit for an error to be raised (e.g. the real timeout is ten seconds)"""
|
||||
|
||||
def raises_first_ten_times(timeout=0):
|
||||
raises_first_ten_times.counter += 1
|
||||
if not raises_first_ten_times.mock_access.kill.called:
|
||||
if raises_first_ten_times.counter <= 10:
|
||||
raise subprocess.TimeoutExpired("Test", timeout)
|
||||
return "Mocked stdout", "Mocked stderr"
|
||||
|
||||
raises_first_ten_times.counter = 0
|
||||
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate = raises_first_ten_times
|
||||
raises_first_ten_times.mock_access = mock_process
|
||||
mock_process.returncode = None
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
run_interruptable_subprocess(["arg0", "arg1"], 10)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -38,9 +38,10 @@ from urllib.parse import urlparse
|
||||
try:
|
||||
from PySide import QtCore, QtGui, QtWidgets
|
||||
except ImportError:
|
||||
QtCore = None
|
||||
QtWidgets = None
|
||||
QtGui = None
|
||||
try:
|
||||
from PySide6 import QtCore, QtGui, QtWidgets
|
||||
except ImportError:
|
||||
from PySide2 import QtCore, QtGui, QtWidgets
|
||||
|
||||
import addonmanager_freecad_interface as fci
|
||||
|
||||
@@ -397,7 +398,7 @@ def blocking_get(url: str, method=None) -> bytes:
|
||||
return p
|
||||
|
||||
|
||||
def run_interruptable_subprocess(args) -> subprocess.CompletedProcess:
|
||||
def run_interruptable_subprocess(args, timeout_secs: int = 10) -> subprocess.CompletedProcess:
|
||||
"""Wrap subprocess call so it can be interrupted gracefully."""
|
||||
creation_flags = 0
|
||||
if hasattr(subprocess, "CREATE_NO_WINDOW"):
|
||||
@@ -417,14 +418,25 @@ def run_interruptable_subprocess(args) -> subprocess.CompletedProcess:
|
||||
stdout = ""
|
||||
stderr = ""
|
||||
return_code = None
|
||||
counter = 0
|
||||
while return_code is None:
|
||||
counter += 1
|
||||
try:
|
||||
stdout, stderr = p.communicate(timeout=10)
|
||||
stdout, stderr = p.communicate(
|
||||
timeout=1
|
||||
) # one second timeout allows interrupting the run once per second
|
||||
return_code = p.returncode
|
||||
except subprocess.TimeoutExpired:
|
||||
if QtCore.QThread.currentThread().isInterruptionRequested():
|
||||
if (
|
||||
hasattr(QtCore, "QThread")
|
||||
and QtCore.QThread.currentThread().isInterruptionRequested()
|
||||
):
|
||||
p.kill()
|
||||
raise ProcessInterrupted()
|
||||
if counter >= timeout_secs: # The real timeout
|
||||
p.kill()
|
||||
stdout, stderr = p.communicate()
|
||||
return_code = -1
|
||||
if return_code is None or return_code != 0:
|
||||
raise subprocess.CalledProcessError(
|
||||
return_code if return_code is not None else -1, args, stdout, stderr
|
||||
|
||||
Reference in New Issue
Block a user