Addon Manager: Correct run_interruptable_subprocess
communicate() has to be called after a final kill() to get the output
This commit is contained in:
committed by
Yorik van Havre
parent
3969fc56db
commit
795af3f8d0
@@ -22,16 +22,26 @@
|
||||
# ***************************************************************************
|
||||
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
import os
|
||||
import FreeCAD
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
from Addon import Addon
|
||||
try:
|
||||
import FreeCAD
|
||||
except ImportError:
|
||||
FreeCAD = None
|
||||
|
||||
sys.path.append("../..")
|
||||
|
||||
from AddonManagerTest.app.mocks import MockAddon as Addon
|
||||
|
||||
from addonmanager_utilities import (
|
||||
recognized_git_location,
|
||||
get_readme_url,
|
||||
get_assigned_string_literal,
|
||||
get_macro_version_from_file,
|
||||
run_interruptable_subprocess,
|
||||
)
|
||||
|
||||
|
||||
@@ -40,9 +50,7 @@ class TestUtilities(unittest.TestCase):
|
||||
MODULE = "test_utilities" # file name without extension
|
||||
|
||||
def setUp(self):
|
||||
self.test_dir = os.path.join(
|
||||
FreeCAD.getHomePath(), "Mod", "AddonManager", "AddonManagerTest", "data"
|
||||
)
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
@@ -59,7 +67,7 @@ class TestUtilities(unittest.TestCase):
|
||||
"https://salsa.debian.org/science-team/freecad",
|
||||
]
|
||||
for url in recognized_urls:
|
||||
repo = Addon("Test Repo", url, Addon.Status.NOT_INSTALLED, "branch")
|
||||
repo = Addon("Test Repo", url, "Addon.Status.NOT_INSTALLED", "branch")
|
||||
self.assertTrue(recognized_git_location(repo), f"{url} was unexpectedly not recognized")
|
||||
|
||||
unrecognized_urls = [
|
||||
@@ -69,7 +77,7 @@ class TestUtilities(unittest.TestCase):
|
||||
"https://github.com.malware.com/",
|
||||
]
|
||||
for url in unrecognized_urls:
|
||||
repo = Addon("Test Repo", url, Addon.Status.NOT_INSTALLED, "branch")
|
||||
repo = Addon("Test Repo", url, "Addon.Status.NOT_INSTALLED", "branch")
|
||||
self.assertFalse(recognized_git_location(repo), f"{url} was unexpectedly recognized")
|
||||
|
||||
def test_get_readme_url(self):
|
||||
@@ -90,14 +98,14 @@ class TestUtilities(unittest.TestCase):
|
||||
for url in github_urls:
|
||||
branch = "branchname"
|
||||
expected_result = f"{url}/raw/{branch}/README.md"
|
||||
repo = Addon("Test Repo", url, Addon.Status.NOT_INSTALLED, branch)
|
||||
repo = Addon("Test Repo", url, "Addon.Status.NOT_INSTALLED", branch)
|
||||
actual_result = get_readme_url(repo)
|
||||
self.assertEqual(actual_result, expected_result)
|
||||
|
||||
for url in gitlab_urls:
|
||||
branch = "branchname"
|
||||
expected_result = f"{url}/-/raw/{branch}/README.md"
|
||||
repo = Addon("Test Repo", url, Addon.Status.NOT_INSTALLED, branch)
|
||||
repo = Addon("Test Repo", url, "Addon.Status.NOT_INSTALLED", branch)
|
||||
actual_result = get_readme_url(repo)
|
||||
self.assertEqual(actual_result, expected_result)
|
||||
|
||||
@@ -125,14 +133,88 @@ class TestUtilities(unittest.TestCase):
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_get_macro_version_from_file(self):
|
||||
good_file = os.path.join(self.test_dir, "good_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(good_file)
|
||||
self.assertEqual(version, "1.2.3")
|
||||
if FreeCAD:
|
||||
test_dir = os.path.join(
|
||||
FreeCAD.getHomePath(), "Mod", "AddonManager", "AddonManagerTest", "data"
|
||||
)
|
||||
good_file = os.path.join(test_dir, "good_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(good_file)
|
||||
self.assertEqual(version, "1.2.3")
|
||||
|
||||
bad_file = os.path.join(self.test_dir, "bad_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(bad_file)
|
||||
self.assertEqual(version, "", "Bad version did not yield empty string")
|
||||
bad_file = os.path.join(test_dir, "bad_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(bad_file)
|
||||
self.assertEqual(version, "", "Bad version did not yield empty string")
|
||||
|
||||
empty_file = os.path.join(self.test_dir, "missing_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(empty_file)
|
||||
self.assertEqual(version, "", "Missing version did not yield empty string")
|
||||
empty_file = os.path.join(test_dir, "missing_macro_metadata.FCStd")
|
||||
version = get_macro_version_from_file(empty_file)
|
||||
self.assertEqual(version, "", "Missing version did not yield empty string")
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_run_interruptable_subprocess_success_instant_return(self, mock_popen):
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate.return_value = ("Mocked stdout", "Mocked stderr")
|
||||
mock_process.returncode = 0
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
completed_process = run_interruptable_subprocess(["arg0", "arg1"])
|
||||
|
||||
self.assertEqual(completed_process.returncode, 0)
|
||||
self.assertEqual(completed_process.stdout, "Mocked stdout")
|
||||
self.assertEqual(completed_process.stderr, "Mocked stderr")
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_run_interruptable_subprocess_returns_nonzero(self, mock_popen):
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate.return_value = ("Mocked stdout", "Mocked stderr")
|
||||
mock_process.returncode = 1
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
run_interruptable_subprocess(["arg0", "arg1"])
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_run_interruptable_subprocess_timeout_five_times(self, mock_popen):
|
||||
"""Five times is below the limit for an error to be raised"""
|
||||
|
||||
def raises_first_five_times(timeout):
|
||||
raises_first_five_times.counter += 1
|
||||
if raises_first_five_times.counter <= 5:
|
||||
raise subprocess.TimeoutExpired("Test", timeout)
|
||||
return "Mocked stdout", None
|
||||
|
||||
raises_first_five_times.counter = 0
|
||||
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate = raises_first_five_times
|
||||
mock_process.returncode = 0
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
result = run_interruptable_subprocess(["arg0", "arg1"], 10)
|
||||
|
||||
self.assertEqual(result.returncode, 0)
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_run_interruptable_subprocess_timeout_ten_times(self, mock_popen):
|
||||
"""Ten times is the limit for an error to be raised (e.g. the real timeout is ten seconds)"""
|
||||
|
||||
def raises_first_ten_times(timeout=0):
|
||||
raises_first_ten_times.counter += 1
|
||||
if not raises_first_ten_times.mock_access.kill.called:
|
||||
if raises_first_ten_times.counter <= 10:
|
||||
raise subprocess.TimeoutExpired("Test", timeout)
|
||||
return "Mocked stdout", "Mocked stderr"
|
||||
|
||||
raises_first_ten_times.counter = 0
|
||||
|
||||
mock_process = MagicMock()
|
||||
mock_process.communicate = raises_first_ten_times
|
||||
raises_first_ten_times.mock_access = mock_process
|
||||
mock_process.returncode = None
|
||||
mock_popen.return_value = mock_process
|
||||
|
||||
with self.assertRaises(subprocess.CalledProcessError):
|
||||
run_interruptable_subprocess(["arg0", "arg1"], 10)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user