Tower: upload cetmix_tower_server_queue 16.0.2.0.0 (via marketplace)
This commit is contained in:
145
addons/cetmix_tower_server_queue/tests/test_command.py
Normal file
145
addons/cetmix_tower_server_queue/tests/test_command.py
Normal file
@@ -0,0 +1,145 @@
|
||||
from datetime import timedelta
|
||||
from unittest.mock import patch
|
||||
|
||||
from odoo.fields import Datetime
|
||||
from odoo.tools import mute_logger
|
||||
|
||||
from odoo.addons.cetmix_tower_server.tests.common import TestTowerCommon
|
||||
|
||||
|
||||
class TestTowerCommand(TestTowerCommon):
|
||||
"""Test suite for verifying zombie command detection and related
|
||||
queue job cancellation.
|
||||
|
||||
Tests in this class verify that commands which have been running
|
||||
longer than the timeout are properly detected as zombies, and their
|
||||
associated queue jobs are cancelled.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
# Set command timeout to 10 seconds
|
||||
cls.env["ir.config_parameter"].sudo().set_param(
|
||||
"cetmix_tower_server.command_timeout", "10"
|
||||
)
|
||||
# Set old time to 20 seconds ago (older than timeout)
|
||||
# to simulate running command in past
|
||||
now = Datetime.now()
|
||||
cls.old_time = now - timedelta(seconds=20)
|
||||
|
||||
def _patch_command_runner(self, command_type, runner_method):
|
||||
"""Helper to patch a command runner to simulate a zombie command.
|
||||
|
||||
Args:
|
||||
command_type: Type of command runner to patch ('ssh' or 'python_code')
|
||||
runner_method: Original method to wrap
|
||||
|
||||
Returns:
|
||||
A context manager that applies the patch
|
||||
"""
|
||||
|
||||
def _wrapper(*args, **kwargs):
|
||||
# Modify args to disable log record finishing
|
||||
args = list(args)
|
||||
if len(args) > 1:
|
||||
args[1] = False # Set log_record to False
|
||||
return runner_method(*args, **kwargs)
|
||||
|
||||
return patch.object(
|
||||
self.registry["cx.tower.server"],
|
||||
f"_command_runner_{command_type}",
|
||||
_wrapper,
|
||||
)
|
||||
|
||||
def _verify_zombie_command_job_cancellation(self, command_action):
|
||||
"""Verify zombie command is detected and job is cancelled.
|
||||
|
||||
Args:
|
||||
command_action: Action type ('ssh_command' or 'python_code')
|
||||
"""
|
||||
# check zombie command logs
|
||||
domain = [
|
||||
("is_running", "=", True),
|
||||
("start_date", "=", self.old_time),
|
||||
("command_action", "=", command_action),
|
||||
]
|
||||
zombie_command_logs = self.env["cx.tower.command.log"].search(domain)
|
||||
|
||||
self.assertEqual(
|
||||
len(zombie_command_logs), 1, "Zombie command log should be created"
|
||||
)
|
||||
self.assertTrue(
|
||||
zombie_command_logs.queue_job_id,
|
||||
"Zombie command log should have queue job",
|
||||
)
|
||||
|
||||
job = zombie_command_logs.queue_job_id
|
||||
self.assertTrue(job.exists(), "Zombie command job should exist")
|
||||
|
||||
self.assertEqual(job.state, "pending", "Zombie command job should be pending")
|
||||
|
||||
# run process to kill zombie command
|
||||
self.server_test_1._check_zombie_commands()
|
||||
|
||||
# check that command log is cancelled
|
||||
self.assertEqual(
|
||||
job.state, "cancelled", "Zombie command job should be cancelled"
|
||||
)
|
||||
|
||||
def test_check_zombie_ssh_command_queue(self):
|
||||
"""
|
||||
Test that zombie ssh command is killed and job is cancelled
|
||||
"""
|
||||
# Create test commands
|
||||
ssh_command = self.Command.create(
|
||||
{
|
||||
"name": "Test SSH Command",
|
||||
"code": "ls -la",
|
||||
"action": "ssh_command",
|
||||
}
|
||||
)
|
||||
|
||||
# patch command runner to not finish log record
|
||||
cx_tower_server_obj = self.registry["cx.tower.server"]
|
||||
_command_runner_ssh_super = cx_tower_server_obj._command_runner_ssh
|
||||
|
||||
with self._patch_command_runner("ssh", _command_runner_ssh_super):
|
||||
# run zombie command with log creation in past
|
||||
self.server_test_1.run_command(
|
||||
ssh_command, log={"start_date": self.old_time}
|
||||
)
|
||||
|
||||
# check zombie command logs
|
||||
self._verify_zombie_command_job_cancellation("ssh_command")
|
||||
|
||||
@mute_logger("py.warnings")
|
||||
def test_check_zombie_python_command_queue(self):
|
||||
"""
|
||||
Test that zombie python command is killed and job is cancelled
|
||||
"""
|
||||
# Create test commands
|
||||
python_command = self.Command.create(
|
||||
{
|
||||
"name": "Test Python Command",
|
||||
"code": "print('test')",
|
||||
"action": "python_code",
|
||||
}
|
||||
)
|
||||
|
||||
# patch command runner to not finish log record
|
||||
cx_tower_server_obj = self.registry["cx.tower.server"]
|
||||
_command_runner_python_code_super = (
|
||||
cx_tower_server_obj._command_runner_python_code
|
||||
)
|
||||
|
||||
with self._patch_command_runner(
|
||||
"python_code", _command_runner_python_code_super
|
||||
):
|
||||
# run zombie command with log creation in past
|
||||
self.server_test_1.run_command(
|
||||
python_command, log={"start_date": self.old_time}
|
||||
)
|
||||
|
||||
# check zombie command logs
|
||||
self._verify_zombie_command_job_cancellation("python_code")
|
||||
Reference in New Issue
Block a user