From 495bb536f17ab0e8c2219e5026fe0842e3620398 Mon Sep 17 00:00:00 2001 From: git_admin Date: Mon, 27 Apr 2026 08:43:30 +0000 Subject: [PATCH] Tower: upload cetmix_tower_server 16.0.2.2.9 (via marketplace) --- .../tests/test_scheduled_task.py | 288 ++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100644 addons/cetmix_tower_server/tests/test_scheduled_task.py diff --git a/addons/cetmix_tower_server/tests/test_scheduled_task.py b/addons/cetmix_tower_server/tests/test_scheduled_task.py new file mode 100644 index 0000000..88ba407 --- /dev/null +++ b/addons/cetmix_tower_server/tests/test_scheduled_task.py @@ -0,0 +1,288 @@ +# Copyright (C) 2025 Cetmix OÜ +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). +from odoo import fields +from odoo.exceptions import AccessError + +from .common import TestTowerCommon + + +class TestCxTowerScheduledTask(TestTowerCommon): + """Test the cx.tower.scheduled.task model.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + + # Create an additional server for multi-server command test + cls.server_test_2 = cls.Server.create( + { + "name": "Test 2", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "host_key": "test_key", + "os_id": cls.os_debian_10.id, + } + ) + + # Scheduled task: command (multi-server) + cls.command_scheduled_task = cls.ScheduledTask.create( + { + "name": "Test Command Scheduled Task", + "action": "command", + "command_id": cls.command_list_dir.id, + "interval_number": 1, + "interval_type": "days", + "next_call": fields.Datetime.now(), + "server_ids": [(6, 0, [cls.server_test_1.id, cls.server_test_2.id])], + } + ) + + # Scheduled task: plan (single server) + cls.plan_scheduled_task = cls.ScheduledTask.create( + { + "name": "Test Plan Scheduled Task", + "action": "plan", + "plan_id": cls.plan_1.id, + "interval_number": 1, + "interval_type": "days", + "next_call": fields.Datetime.now(), + "server_ids": [(6, 0, [cls.server_test_1.id])], + } + ) + + # Custom variable for task (option type) + cls.variable_odoo_versions = cls.Variable.create( + { + "name": "odoo_versions", + "variable_type": "o", + } + ) + cls.variable_option_16_0 = cls.VariableOption.create( + { + "name": "16.0", + "value_char": "16.0", + "variable_id": cls.variable_odoo_versions.id, + } + ) + + # Add custom variables to tasks + cls.scheduled_task_cv_os = cls.ScheduledTaskCv.create( + { + "scheduled_task_id": cls.command_scheduled_task.id, + "variable_id": cls.variable_os.id, + "value_char": "Windows 2k", + } + ) + cls.scheduled_task_cv_version = cls.ScheduledTaskCv.create( + { + "scheduled_task_id": cls.command_scheduled_task.id, + "variable_id": cls.variable_odoo_versions.id, + "option_id": cls.variable_option_16_0.id, + } + ) + cls.scheduled_task_cv_version_plan = cls.ScheduledTaskCv.create( + { + "scheduled_task_id": cls.plan_scheduled_task.id, + "variable_id": cls.variable_odoo_versions.id, + "option_id": cls.variable_option_16_0.id, + } + ) + + def _assert_log_records(self, log_model, scheduled_task, expected_count): + """Helper: Assert that log records exist for the task""" + logs = log_model.search([("scheduled_task_id", "=", scheduled_task.id)]) + self.assertTrue(logs, f"{log_model._name} logs should be created after run.") + self.assertEqual( + len(logs), + expected_count, + f"Expected {expected_count} logs for {scheduled_task.display_name}, " + f"got {len(logs)}.", + ) + + def _assert_next_and_last_call_changed( + self, task, last_call_before, next_call_before + ): + """Helper: Assert next_call and last_call changed after run""" + task.invalidate_recordset() + self.assertNotEqual( + task.last_call, last_call_before, "last_call must be changed after run." + ) + self.assertNotEqual( + task.next_call, next_call_before, "next_call must be changed after run." + ) + + def test_reserve_tasks_atomic(self): + """Scheduled Task: reserve_tasks must only lock available""" + tasks = self.command_scheduled_task + self.plan_scheduled_task + reserved = tasks._reserve_tasks() + self.assertEqual( + set(reserved.ids), set(tasks.ids), "Both tasks should be reserved" + ) + # Repeated reservation should return empty (already running) + tasks.invalidate_recordset() + reserved_again = tasks._reserve_tasks() + self.assertFalse( + reserved_again, "Already reserved tasks must not be reserved again" + ) + + def test_run_task_command(self): + """Running a scheduled command task creates logs per server.""" + logs_before = self.CommandLog.search( + [("scheduled_task_id", "=", self.command_scheduled_task.id)] + ) + self.assertFalse(logs_before, "No command logs should exist before run.") + + last_call_before = self.command_scheduled_task.last_call + next_call_before = self.command_scheduled_task.next_call + + self.command_scheduled_task._run() + self._assert_next_and_last_call_changed( + self.command_scheduled_task, last_call_before, next_call_before + ) + self._assert_log_records( + self.CommandLog, + self.command_scheduled_task, + expected_count=len(self.command_scheduled_task.server_ids), + ) + + def test_run_task_plan(self): + """Running a scheduled plan task creates one log per server.""" + logs_before = self.PlanLog.search( + [("scheduled_task_id", "=", self.plan_scheduled_task.id)] + ) + self.assertFalse(logs_before, "No plan logs should exist before run.") + + last_call_before = self.plan_scheduled_task.last_call + next_call_before = self.plan_scheduled_task.next_call + + self.plan_scheduled_task._run() + self._assert_next_and_last_call_changed( + self.plan_scheduled_task, last_call_before, next_call_before + ) + self._assert_log_records( + self.PlanLog, + self.plan_scheduled_task, + expected_count=len(self.plan_scheduled_task.server_ids), + ) + + def test_user_write_create_unlink_access(self): + """User: cannot create, write or unlink scheduled tasks.""" + with self.assertRaises(AccessError): + self.ScheduledTask.with_user(self.user).create( + { + "name": "Test", + "action": "command", + "command_id": self.command_list_dir.id, + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + with self.assertRaises(AccessError): + self.command_scheduled_task.with_user(self.user).write({"sequence": 33}) + with self.assertRaises(AccessError): + self.command_scheduled_task.with_user(self.user).unlink() + + def test_manager_read_access(self): + """Manager: can read scheduled task if in manager_ids or in server's + manager_ids/user_ids.""" + self.command_scheduled_task.manager_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.command_scheduled_task.id)] + ) + self.assertIn( + self.command_scheduled_task, + tasks, + "Manager should be able to read their task.", + ) + + # Remove from manager_ids, but add to server manager_ids + self.command_scheduled_task.manager_ids = [(6, 0, [])] + self.server_test_1.manager_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.command_scheduled_task.id)] + ) + self.assertIn( + self.command_scheduled_task, + tasks, + "Manager should be able to read task via server manager_ids.", + ) + + # Remove manager from everywhere + self.server_test_1.manager_ids = [(6, 0, [])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.command_scheduled_task.id)] + ) + self.assertNotIn( + self.command_scheduled_task, + tasks, + "Manager should NOT be able to read task without relation.", + ) + + def test_manager_write_create_access(self): + """Manager: can create/write if in manager_ids, else denied.""" + # Create as manager + task = self.ScheduledTask.with_user(self.manager).create( + { + "name": "Test", + "action": "command", + "command_id": self.command_list_dir.id, + "manager_ids": [(6, 0, [self.manager.id])], + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + try: + task.with_user(self.manager).write({"sequence": 77}) + except AccessError: + self.fail("Manager should be able to write their own scheduled tasks.") + + # Should fail if not in manager_ids + self.command_scheduled_task.manager_ids = [(6, 0, [])] + with self.assertRaises(AccessError): + self.command_scheduled_task.with_user(self.manager).write({"sequence": 11}) + + def test_manager_unlink_access(self): + """Manager: can unlink only their own tasks (in manager_ids & creator).""" + # Create as manager + task = self.ScheduledTask.with_user(self.manager).create( + { + "name": "Test", + "action": "command", + "command_id": self.command_list_dir.id, + "manager_ids": [(6, 0, [self.manager.id])], + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + try: + task.with_user(self.manager).unlink() + except AccessError: + self.fail("Manager should be able to unlink their own task.") + + # Not creator + with self.assertRaises(AccessError): + self.command_scheduled_task.with_user(self.manager).unlink() + + def test_root_unrestricted_access(self): + """Root: full unrestricted access to all scheduled tasks.""" + # Read + tasks = self.ScheduledTask.with_user(self.root).search( + [("id", "=", self.command_scheduled_task.id)] + ) + self.assertIn( + self.command_scheduled_task, tasks, "Root should be able to read any task." + ) + + # Create + task = self.ScheduledTask.with_user(self.root).create( + { + "name": "Test", + "action": "command", + "command_id": self.command_list_dir.id, + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + try: + task.with_user(self.root).write({"sequence": 123}) + task.with_user(self.root).unlink() + except AccessError: + self.fail("Root should be able to write/unlink any scheduled task.")