From 4fc6c80352fda8facba570d1ea8caa43011fa20f Mon Sep 17 00:00:00 2001 From: git_admin Date: Mon, 27 Apr 2026 08:06:21 +0000 Subject: [PATCH] Tower: upload cetmix_tower_server_queue 16.0.2.0.0 (via marketplace) --- .../tests/test_command.py | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 addons/cetmix_tower_server_queue/tests/test_command.py diff --git a/addons/cetmix_tower_server_queue/tests/test_command.py b/addons/cetmix_tower_server_queue/tests/test_command.py new file mode 100644 index 0000000..2f043d9 --- /dev/null +++ b/addons/cetmix_tower_server_queue/tests/test_command.py @@ -0,0 +1,145 @@ +from datetime import timedelta +from unittest.mock import patch + +from odoo.fields import Datetime +from odoo.tools import mute_logger + +from odoo.addons.cetmix_tower_server.tests.common import TestTowerCommon + + +class TestTowerCommand(TestTowerCommon): + """Test suite for verifying zombie command detection and related + queue job cancellation. + + Tests in this class verify that commands which have been running + longer than the timeout are properly detected as zombies, and their + associated queue jobs are cancelled. + """ + + @classmethod + def setUpClass(cls): + super().setUpClass() + # Set command timeout to 10 seconds + cls.env["ir.config_parameter"].sudo().set_param( + "cetmix_tower_server.command_timeout", "10" + ) + # Set old time to 20 seconds ago (older than timeout) + # to simulate running command in past + now = Datetime.now() + cls.old_time = now - timedelta(seconds=20) + + def _patch_command_runner(self, command_type, runner_method): + """Helper to patch a command runner to simulate a zombie command. + + Args: + command_type: Type of command runner to patch ('ssh' or 'python_code') + runner_method: Original method to wrap + + Returns: + A context manager that applies the patch + """ + + def _wrapper(*args, **kwargs): + # Modify args to disable log record finishing + args = list(args) + if len(args) > 1: + args[1] = False # Set log_record to False + return runner_method(*args, **kwargs) + + return patch.object( + self.registry["cx.tower.server"], + f"_command_runner_{command_type}", + _wrapper, + ) + + def _verify_zombie_command_job_cancellation(self, command_action): + """Verify zombie command is detected and job is cancelled. + + Args: + command_action: Action type ('ssh_command' or 'python_code') + """ + # check zombie command logs + domain = [ + ("is_running", "=", True), + ("start_date", "=", self.old_time), + ("command_action", "=", command_action), + ] + zombie_command_logs = self.env["cx.tower.command.log"].search(domain) + + self.assertEqual( + len(zombie_command_logs), 1, "Zombie command log should be created" + ) + self.assertTrue( + zombie_command_logs.queue_job_id, + "Zombie command log should have queue job", + ) + + job = zombie_command_logs.queue_job_id + self.assertTrue(job.exists(), "Zombie command job should exist") + + self.assertEqual(job.state, "pending", "Zombie command job should be pending") + + # run process to kill zombie command + self.server_test_1._check_zombie_commands() + + # check that command log is cancelled + self.assertEqual( + job.state, "cancelled", "Zombie command job should be cancelled" + ) + + def test_check_zombie_ssh_command_queue(self): + """ + Test that zombie ssh command is killed and job is cancelled + """ + # Create test commands + ssh_command = self.Command.create( + { + "name": "Test SSH Command", + "code": "ls -la", + "action": "ssh_command", + } + ) + + # patch command runner to not finish log record + cx_tower_server_obj = self.registry["cx.tower.server"] + _command_runner_ssh_super = cx_tower_server_obj._command_runner_ssh + + with self._patch_command_runner("ssh", _command_runner_ssh_super): + # run zombie command with log creation in past + self.server_test_1.run_command( + ssh_command, log={"start_date": self.old_time} + ) + + # check zombie command logs + self._verify_zombie_command_job_cancellation("ssh_command") + + @mute_logger("py.warnings") + def test_check_zombie_python_command_queue(self): + """ + Test that zombie python command is killed and job is cancelled + """ + # Create test commands + python_command = self.Command.create( + { + "name": "Test Python Command", + "code": "print('test')", + "action": "python_code", + } + ) + + # patch command runner to not finish log record + cx_tower_server_obj = self.registry["cx.tower.server"] + _command_runner_python_code_super = ( + cx_tower_server_obj._command_runner_python_code + ) + + with self._patch_command_runner( + "python_code", _command_runner_python_code_super + ): + # run zombie command with log creation in past + self.server_test_1.run_command( + python_command, log={"start_date": self.old_time} + ) + + # check zombie command logs + self._verify_zombie_command_job_cancellation("python_code")