diff --git a/addons/cetmix_tower_server/tests/test_scheduled_task.py b/addons/cetmix_tower_server/tests/test_scheduled_task.py new file mode 100644 index 0000000..f93f5a7 --- /dev/null +++ b/addons/cetmix_tower_server/tests/test_scheduled_task.py @@ -0,0 +1,893 @@ +# Copyright (C) 2025 Cetmix OÜ +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). +from datetime import datetime + +from odoo import fields +from odoo.exceptions import AccessError, ValidationError + +from .common import TestTowerCommon + + +class TestTowerScheduledTask(TestTowerCommon): + """Test the cx.tower.scheduled.task model.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + + # Create an additional server for multi-server command test + cls.server_test_2 = cls.Server.create( + { + "name": "Test 2", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "host_key": "test_key", + "os_id": cls.os_debian_10.id, + } + ) + + # Scheduled task: command (multi-server) + cls.command_scheduled_task = cls.ScheduledTask.create( + { + "name": "Test Command Scheduled Task", + "action": "command", + "command_id": cls.command_list_dir.id, + "interval_number": 1, + "interval_type": "days", + "next_call": fields.Datetime.now(), + "server_ids": [(6, 0, [cls.server_test_1.id, cls.server_test_2.id])], + } + ) + + # Scheduled task: plan (single server) + cls.plan_scheduled_task = cls.ScheduledTask.create( + { + "name": "Test Plan Scheduled Task", + "action": "plan", + "plan_id": cls.plan_1.id, + "interval_number": 1, + "interval_type": "days", + "next_call": fields.Datetime.now(), + "server_ids": [(6, 0, [cls.server_test_1.id])], + } + ) + + # Custom variable for task (option type) + cls.variable_odoo_versions = cls.Variable.create( + { + "name": "odoo_versions", + "variable_type": "o", + } + ) + cls.variable_option_16_0 = cls.VariableOption.create( + { + "name": "16.0", + "value_char": "16.0", + "variable_id": cls.variable_odoo_versions.id, + } + ) + + # Add custom variables to tasks + cls.scheduled_task_cv_os = cls.ScheduledTaskCv.create( + { + "scheduled_task_id": cls.command_scheduled_task.id, + "variable_id": cls.variable_os.id, + "value_char": "Windows 2k", + } + ) + cls.scheduled_task_cv_version = cls.ScheduledTaskCv.create( + { + "scheduled_task_id": cls.command_scheduled_task.id, + "variable_id": cls.variable_odoo_versions.id, + "option_id": cls.variable_option_16_0.id, + } + ) + cls.scheduled_task_cv_version_plan = cls.ScheduledTaskCv.create( + { + "scheduled_task_id": cls.plan_scheduled_task.id, + "variable_id": cls.variable_odoo_versions.id, + "option_id": cls.variable_option_16_0.id, + } + ) + + # Create additional Jet Template for access testing + cls.jet_template_test_access = cls.JetTemplate.create( + { + "name": "Test Jet Template for Access", + "server_ids": [(4, cls.server_test_1.id)], + } + ) + + # Create additional Jet for access testing + cls.jet_test_access = cls.Jet.create( + { + "name": "Test Jet for Access", + "jet_template_id": cls.jet_template_test_access.id, + "server_id": cls.server_test_1.id, + } + ) + + # Scheduled task with Jet and Jet Template for access testing + cls.jet_scheduled_task = cls.ScheduledTask.create( + { + "name": "Test Jet Scheduled Task", + "action": "command", + "command_id": cls.command_list_dir.id, + "interval_number": 1, + "interval_type": "days", + "next_call": fields.Datetime.now(), + "jet_ids": [(6, 0, [cls.jet_test_access.id])], + "jet_template_ids": [(6, 0, [cls.jet_template_test_access.id])], + } + ) + + def _assert_log_records(self, log_model, scheduled_task, expected_count): + """Helper: Assert that log records exist for the task""" + logs = log_model.search([("scheduled_task_id", "=", scheduled_task.id)]) + self.assertTrue(logs, f"{log_model._name} logs should be created after run.") + self.assertEqual( + len(logs), + expected_count, + f"Expected {expected_count} logs for {scheduled_task.display_name}, " + f"got {len(logs)}.", + ) + + def _assert_next_and_last_call_changed( + self, task, last_call_before, next_call_before + ): + """Helper: Assert next_call and last_call changed after run""" + task.invalidate_recordset() + self.assertNotEqual( + task.last_call, last_call_before, "last_call must be changed after run." + ) + self.assertNotEqual( + task.next_call, next_call_before, "next_call must be changed after run." + ) + + def _clear_all_access( + self, + scheduled_task, + jet=None, + jet_template=None, + server=None, + server_template=None, + ): + """Helper: Clear all access paths for a scheduled task and related objects.""" + scheduled_task.manager_ids = [(5, 0, 0)] + scheduled_task.user_ids = [(5, 0, 0)] + if jet: + jet.manager_ids = [(5, 0, 0)] + jet.user_ids = [(5, 0, 0)] + if jet_template: + jet_template.manager_ids = [(5, 0, 0)] + jet_template.user_ids = [(5, 0, 0)] + if server: + server.manager_ids = [(5, 0, 0)] + server.user_ids = [(5, 0, 0)] + if server_template: + server_template.manager_ids = [(5, 0, 0)] + server_template.user_ids = [(5, 0, 0)] + + def test_reserve_tasks_atomic(self): + """Scheduled Task: reserve_tasks must only lock available""" + tasks = self.command_scheduled_task + self.plan_scheduled_task + reserved = tasks._reserve_tasks() + self.assertEqual( + set(reserved.ids), set(tasks.ids), "Both tasks should be reserved" + ) + # Repeated reservation should return empty (already running) + tasks.invalidate_recordset() + reserved_again = tasks._reserve_tasks() + self.assertFalse( + reserved_again, "Already reserved tasks must not be reserved again" + ) + + def test_run_task_command(self): + """Running a scheduled command task creates logs per server.""" + logs_before = self.CommandLog.search( + [("scheduled_task_id", "=", self.command_scheduled_task.id)] + ) + self.assertFalse(logs_before, "No command logs should exist before run.") + + last_call_before = self.command_scheduled_task.last_call + next_call_before = self.command_scheduled_task.next_call + + self.command_scheduled_task._run() + self._assert_next_and_last_call_changed( + self.command_scheduled_task, last_call_before, next_call_before + ) + self._assert_log_records( + self.CommandLog, + self.command_scheduled_task, + expected_count=len(self.command_scheduled_task.server_ids), + ) + + def test_run_task_plan(self): + """Running a scheduled plan task creates one log per server.""" + logs_before = self.PlanLog.search( + [("scheduled_task_id", "=", self.plan_scheduled_task.id)] + ) + self.assertFalse(logs_before, "No plan logs should exist before run.") + + last_call_before = self.plan_scheduled_task.last_call + next_call_before = self.plan_scheduled_task.next_call + + self.plan_scheduled_task._run() + self._assert_next_and_last_call_changed( + self.plan_scheduled_task, last_call_before, next_call_before + ) + self._assert_log_records( + self.PlanLog, + self.plan_scheduled_task, + expected_count=len(self.plan_scheduled_task.server_ids), + ) + + def test_user_write_create_unlink_access(self): + """User: cannot create, write or unlink scheduled tasks.""" + with self.assertRaises(AccessError): + self.ScheduledTask.with_user(self.user).create( + { + "name": "Test", + "action": "command", + "command_id": self.command_list_dir.id, + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + with self.assertRaises(AccessError): + self.command_scheduled_task.with_user(self.user).write({"sequence": 33}) + with self.assertRaises(AccessError): + self.command_scheduled_task.with_user(self.user).unlink() + + def test_manager_read_access(self): + """Manager: can read scheduled task if in manager_ids or in server's + manager_ids/user_ids.""" + self.command_scheduled_task.manager_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.command_scheduled_task.id)] + ) + self.assertIn( + self.command_scheduled_task, + tasks, + "Manager should be able to read their task.", + ) + + # Remove from manager_ids, but add to server manager_ids + self.command_scheduled_task.manager_ids = [(5, 0, 0)] + self.server_test_1.manager_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.command_scheduled_task.id)] + ) + self.assertIn( + self.command_scheduled_task, + tasks, + "Manager should be able to read task via server manager_ids.", + ) + + # Test server user_ids access + self.server_test_1.manager_ids = [(5, 0, 0)] + self.server_test_1.user_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.command_scheduled_task.id)] + ) + self.assertIn( + self.command_scheduled_task, + tasks, + "Manager should be able to read task via server user_ids.", + ) + + # Remove manager from everywhere + self._clear_all_access(self.command_scheduled_task, server=self.server_test_1) + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.command_scheduled_task.id)] + ) + self.assertNotIn( + self.command_scheduled_task, + tasks, + "Manager should NOT be able to read task without relation.", + ) + + def test_manager_read_access_via_jet(self): + """Manager: can read scheduled task if in jet's user_ids/manager_ids.""" + # Test access via jet manager_ids + self.jet_test_access.manager_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.jet_scheduled_task.id)] + ) + self.assertIn( + self.jet_scheduled_task, + tasks, + "Manager should be able to read task via jet manager_ids.", + ) + + # Test access via jet user_ids + self.jet_test_access.manager_ids = [(5, 0, 0)] + self.jet_test_access.user_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.jet_scheduled_task.id)] + ) + self.assertIn( + self.jet_scheduled_task, + tasks, + "Manager should be able to read task via jet user_ids.", + ) + + # Test access via jet_template manager_ids + self.jet_test_access.user_ids = [(5, 0, 0)] + self.jet_template_test_access.manager_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.jet_scheduled_task.id)] + ) + self.assertIn( + self.jet_scheduled_task, + tasks, + "Manager should be able to read task via jet_template manager_ids.", + ) + + # Test access via jet_template user_ids + self.jet_template_test_access.manager_ids = [(5, 0, 0)] + self.jet_template_test_access.user_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.jet_scheduled_task.id)] + ) + self.assertIn( + self.jet_scheduled_task, + tasks, + "Manager should be able to read task via jet_template user_ids.", + ) + + # Remove manager from everywhere + self._clear_all_access( + self.jet_scheduled_task, + jet=self.jet_test_access, + jet_template=self.jet_template_test_access, + server=self.server_test_1, + ) + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", self.jet_scheduled_task.id)] + ) + self.assertNotIn( + self.jet_scheduled_task, + tasks, + "Manager should NOT be able to read task without relation.", + ) + + def test_manager_read_access_via_server_template(self): + """Manager: can read scheduled task if in server_template's + user_ids/manager_ids.""" + # Create scheduled task with server template + server_template_task = self.ScheduledTask.create( + { + "name": "Test Server Template Scheduled Task", + "action": "command", + "command_id": self.command_list_dir.id, + "interval_number": 1, + "interval_type": "days", + "next_call": fields.Datetime.now(), + "server_template_ids": [(6, 0, [self.server_template_sample.id])], + } + ) + + # Test access via server_template manager_ids + self.server_template_sample.manager_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", server_template_task.id)] + ) + self.assertIn( + server_template_task, + tasks, + "Manager should be able to read task via server_template manager_ids.", + ) + + # Test access via server_template user_ids + self.server_template_sample.manager_ids = [(5, 0, 0)] + self.server_template_sample.user_ids = [(6, 0, [self.manager.id])] + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", server_template_task.id)] + ) + self.assertIn( + server_template_task, + tasks, + "Manager should be able to read task via server_template user_ids.", + ) + + # Remove manager from everywhere + self._clear_all_access( + server_template_task, + server_template=self.server_template_sample, + server=self.server_test_1, + ) + tasks = self.ScheduledTask.with_user(self.manager).search( + [("id", "=", server_template_task.id)] + ) + self.assertNotIn( + server_template_task, + tasks, + "Manager should NOT be able to read task without relation.", + ) + + def test_manager_write_create_access(self): + """Manager: can create/write if in manager_ids, else denied.""" + # Create as manager + task = self.ScheduledTask.with_user(self.manager).create( + { + "name": "Test", + "action": "command", + "command_id": self.command_list_dir.id, + "manager_ids": [(6, 0, [self.manager.id])], + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + try: + task.with_user(self.manager).write({"sequence": 77}) + except AccessError: + self.fail("Manager should be able to write their own scheduled tasks.") + + # Should fail if not in manager_ids + self.command_scheduled_task.manager_ids = [(5, 0, 0)] + with self.assertRaises(AccessError): + self.command_scheduled_task.with_user(self.manager).write({"sequence": 11}) + + def test_manager_unlink_access(self): + """Manager: can unlink only their own tasks (in manager_ids & creator).""" + # Create as manager + task = self.ScheduledTask.with_user(self.manager).create( + { + "name": "Test", + "action": "command", + "command_id": self.command_list_dir.id, + "manager_ids": [(6, 0, [self.manager.id])], + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + try: + task.with_user(self.manager).unlink() + except AccessError: + self.fail("Manager should be able to unlink their own task.") + + # Not creator + with self.assertRaises(AccessError): + self.command_scheduled_task.with_user(self.manager).unlink() + + def test_root_unrestricted_access(self): + """Root: full unrestricted access to all scheduled tasks.""" + # Read + tasks = self.ScheduledTask.with_user(self.root).search( + [("id", "=", self.command_scheduled_task.id)] + ) + self.assertIn( + self.command_scheduled_task, tasks, "Root should be able to read any task." + ) + + # Create + task = self.ScheduledTask.with_user(self.root).create( + { + "name": "Test", + "action": "command", + "command_id": self.command_list_dir.id, + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + try: + task.with_user(self.root).write({"sequence": 123}) + task.with_user(self.root).unlink() + except AccessError: + self.fail("Root should be able to write/unlink any scheduled task.") + + def test_get_next_call_dow_wednesday(self): + """Test _get_next_call_dow when today is Wednesday. + Task runs Monday, Wednesday, Friday -> should return Friday.""" + # Create task with Monday, Wednesday, Friday selected + task = self.ScheduledTask.create( + { + "name": "Test DOW Task", + "action": "command", + "command_id": self.command_list_dir.id, + "interval_type": "dow", + "monday": True, + "wednesday": True, + "friday": True, + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + + # Create a Wednesday datetime (2024-01-03 is a Wednesday) + # Set time to 10:30:45 + wednesday_date = datetime(2024, 1, 3, 10, 30, 45) + + # Calculate next call + next_call = task._get_next_call_dow(task, wednesday_date) + + # Should be Friday (2 days ahead) at the same time + expected_friday = datetime(2024, 1, 5, 10, 30, 45) + self.assertEqual( + next_call, + expected_friday, + "Next call from Wednesday should be Friday at the same time.", + ) + + def test_get_next_call_dow_friday(self): + """Test _get_next_call_dow when today is Friday. + Task runs Monday, Wednesday, Friday -> should return Monday (next week).""" + # Create task with Monday, Wednesday, Friday selected + task = self.ScheduledTask.create( + { + "name": "Test DOW Task", + "action": "command", + "command_id": self.command_list_dir.id, + "interval_type": "dow", + "monday": True, + "wednesday": True, + "friday": True, + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + + # Create a Friday datetime (2024-01-05 is a Friday) + # Set time to 14:15:30 + friday_date = datetime(2024, 1, 5, 14, 15, 30) + + # Calculate next call + next_call = task._get_next_call_dow(task, friday_date) + + # Should be Monday next week (3 days ahead) at the same time + expected_monday = datetime(2024, 1, 8, 14, 15, 30) + self.assertEqual( + next_call, + expected_monday, + "Next call from Friday should be Monday next week at the same time.", + ) + + def test_check_days_of_week_constraint(self): + """ + Test _check_days_of_week constraint: + no days selected should raise ValidationError. + """ + # Try to create a task with interval_type="dow" but no days selected + with self.assertRaises(ValidationError) as context: + self.ScheduledTask.create( + { + "name": "Test DOW Task No Days", + "action": "command", + "command_id": self.command_list_dir.id, + "interval_type": "dow", + "monday": False, + "tuesday": False, + "wednesday": False, + "thursday": False, + "friday": False, + "saturday": False, + "sunday": False, + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + self.assertIn( + "At least one day of week must be selected", + str(context.exception), + "ValidationError should mention that at " "least one day must be selected.", + ) + + # Try to update an existing task to have no days selected + task = self.ScheduledTask.create( + { + "name": "Test DOW Task", + "action": "command", + "command_id": self.command_list_dir.id, + "interval_type": "dow", + "monday": True, + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + with self.assertRaises(ValidationError): + task.write( + { + "monday": False, + "tuesday": False, + "wednesday": False, + "thursday": False, + "friday": False, + "saturday": False, + "sunday": False, + } + ) + + def test_get_next_call_dow_single_day_monday(self): + """Test _get_next_call_dow edge case: only Monday selected, + current day is Monday. + Should wrap to next week's Monday.""" + # Create task with only Monday selected + task = self.ScheduledTask.create( + { + "name": "Test DOW Task Single Day", + "action": "command", + "command_id": self.command_list_dir.id, + "interval_type": "dow", + "monday": True, + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + + # Create a Monday datetime (2024-01-01 is a Monday) + # Set time to 09:00:00 + monday_date = datetime(2024, 1, 1, 9, 0, 0) + + # Calculate next call + next_call = task._get_next_call_dow(task, monday_date) + + # Should be Monday next week (7 days ahead) at the same time + expected_next_monday = datetime(2024, 1, 8, 9, 0, 0) + self.assertEqual( + next_call, + expected_next_monday, + "Next call from Monday (only day selected) should be" + " next Monday at the same time.", + ) + + def test_scheduled_task_cv_manager_read_access(self): + """Manager: can read scheduled task CV if in scheduled task's + manager_ids/user_ids or via server's manager_ids/user_ids.""" + # Test access via scheduled task manager_ids + self.command_scheduled_task.manager_ids = [(6, 0, [self.manager.id])] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", self.scheduled_task_cv_os.id)] + ) + self.assertIn( + self.scheduled_task_cv_os, + cvs, + "Manager should be able to read CV via scheduled task manager_ids.", + ) + + # Test access via scheduled task user_ids + self.command_scheduled_task.manager_ids = [(5, 0, 0)] + self.command_scheduled_task.user_ids = [(6, 0, [self.manager.id])] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", self.scheduled_task_cv_os.id)] + ) + self.assertIn( + self.scheduled_task_cv_os, + cvs, + "Manager should be able to read CV via scheduled task user_ids.", + ) + + # Test access via server manager_ids + self.command_scheduled_task.user_ids = [(5, 0, 0)] + self.server_test_1.manager_ids = [(6, 0, [self.manager.id])] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", self.scheduled_task_cv_os.id)] + ) + self.assertIn( + self.scheduled_task_cv_os, + cvs, + "Manager should be able to read CV via server manager_ids.", + ) + + # Test access via server user_ids + self.server_test_1.manager_ids = [(5, 0, 0)] + self.server_test_1.user_ids = [(6, 0, [self.manager.id])] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", self.scheduled_task_cv_os.id)] + ) + self.assertIn( + self.scheduled_task_cv_os, + cvs, + "Manager should be able to read CV via server user_ids.", + ) + + # Remove manager from everywhere + self.server_test_1.user_ids = [(5, 0, 0)] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", self.scheduled_task_cv_os.id)] + ) + self.assertNotIn( + self.scheduled_task_cv_os, + cvs, + "Manager should NOT be able to read CV without relation.", + ) + + def test_scheduled_task_cv_manager_read_access_via_jet(self): + """Manager: can read scheduled task CV if in jet's user_ids/manager_ids.""" + # Create CV for jet scheduled task + jet_cv = self.ScheduledTaskCv.create( + { + "scheduled_task_id": self.jet_scheduled_task.id, + "variable_id": self.variable_os.id, + "value_char": "Linux", + } + ) + + # Test access via jet manager_ids + self.jet_test_access.manager_ids = [(6, 0, [self.manager.id])] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", jet_cv.id)] + ) + self.assertIn( + jet_cv, + cvs, + "Manager should be able to read CV via jet manager_ids.", + ) + + # Test access via jet user_ids + self.jet_test_access.manager_ids = [(5, 0, 0)] + self.jet_test_access.user_ids = [(6, 0, [self.manager.id])] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", jet_cv.id)] + ) + self.assertIn( + jet_cv, + cvs, + "Manager should be able to read CV via jet user_ids.", + ) + + # Test access via jet_template manager_ids + self.jet_test_access.user_ids = [(5, 0, 0)] + self.jet_template_test_access.manager_ids = [(6, 0, [self.manager.id])] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", jet_cv.id)] + ) + self.assertIn( + jet_cv, + cvs, + "Manager should be able to read CV via jet_template manager_ids.", + ) + + # Test access via jet_template user_ids + self.jet_template_test_access.manager_ids = [(5, 0, 0)] + self.jet_template_test_access.user_ids = [(6, 0, [self.manager.id])] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", jet_cv.id)] + ) + self.assertIn( + jet_cv, + cvs, + "Manager should be able to read CV via jet_template user_ids.", + ) + + # Remove manager from everywhere + self._clear_all_access( + self.jet_scheduled_task, + jet=self.jet_test_access, + jet_template=self.jet_template_test_access, + server=self.server_test_1, + ) + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", jet_cv.id)] + ) + self.assertNotIn( + jet_cv, + cvs, + "Manager should NOT be able to read CV without relation.", + ) + + def test_scheduled_task_cv_manager_read_access_via_server_template(self): + """Manager: can read scheduled task CV if in server_template's + user_ids/manager_ids.""" + # Create scheduled task with server template + server_template_task = self.ScheduledTask.create( + { + "name": "Test Server Template Scheduled Task for CV", + "action": "command", + "command_id": self.command_list_dir.id, + "interval_number": 1, + "interval_type": "days", + "next_call": fields.Datetime.now(), + "server_template_ids": [(6, 0, [self.server_template_sample.id])], + } + ) + server_template_cv = self.ScheduledTaskCv.create( + { + "scheduled_task_id": server_template_task.id, + "variable_id": self.variable_os.id, + "value_char": "Debian", + } + ) + + # Test access via server_template manager_ids + self.server_template_sample.manager_ids = [(6, 0, [self.manager.id])] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", server_template_cv.id)] + ) + self.assertIn( + server_template_cv, + cvs, + "Manager should be able to read CV via server_template manager_ids.", + ) + + # Test access via server_template user_ids + self.server_template_sample.manager_ids = [(5, 0, 0)] + self.server_template_sample.user_ids = [(6, 0, [self.manager.id])] + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", server_template_cv.id)] + ) + self.assertIn( + server_template_cv, + cvs, + "Manager should be able to read CV via server_template user_ids.", + ) + + # Remove manager from everywhere + self._clear_all_access( + server_template_task, + server_template=self.server_template_sample, + server=self.server_test_1, + ) + cvs = self.ScheduledTaskCv.with_user(self.manager).search( + [("id", "=", server_template_cv.id)] + ) + self.assertNotIn( + server_template_cv, + cvs, + "Manager should NOT be able to read CV without relation.", + ) + + def test_scheduled_task_cv_manager_write_create_access(self): + """Manager: can create/write CV if in scheduled task's manager_ids.""" + # Create CV as manager + self.command_scheduled_task.manager_ids = [(6, 0, [self.manager.id])] + cv = self.ScheduledTaskCv.with_user(self.manager).create( + { + "scheduled_task_id": self.command_scheduled_task.id, + "variable_id": self.variable_os.id, + "value_char": "Ubuntu", + } + ) + try: + cv.with_user(self.manager).write({"value_char": "Fedora"}) + except AccessError: + self.fail( + "Manager should be able to write CV if in scheduled task manager_ids." + ) + + # Should fail if not in manager_ids + self.command_scheduled_task.manager_ids = [(5, 0, 0)] + with self.assertRaises(AccessError): + self.scheduled_task_cv_os.with_user(self.manager).write( + {"value_char": "CentOS"} + ) + + def test_scheduled_task_cv_manager_unlink_access(self): + """Manager: can unlink CV only if in scheduled task's manager_ids & creator.""" + # Create CV as manager + self.command_scheduled_task.manager_ids = [(6, 0, [self.manager.id])] + cv = self.ScheduledTaskCv.with_user(self.manager).create( + { + "scheduled_task_id": self.command_scheduled_task.id, + "variable_id": self.variable_os.id, + "value_char": "Arch", + } + ) + try: + cv.with_user(self.manager).unlink() + except AccessError: + self.fail("Manager should be able to unlink CV they created.") + + # Not creator + self.command_scheduled_task.manager_ids = [(6, 0, [self.manager.id])] + with self.assertRaises(AccessError): + self.scheduled_task_cv_os.with_user(self.manager).unlink() + + def test_scheduled_task_cv_root_unrestricted_access(self): + """Root: full unrestricted access to all scheduled task CVs.""" + # Read + cvs = self.ScheduledTaskCv.with_user(self.root).search( + [("id", "=", self.scheduled_task_cv_os.id)] + ) + self.assertIn( + self.scheduled_task_cv_os, + cvs, + "Root should be able to read any CV.", + ) + + # Create + cv = self.ScheduledTaskCv.with_user(self.root).create( + { + "scheduled_task_id": self.command_scheduled_task.id, + "variable_id": self.variable_os.id, + "value_char": "SUSE", + } + ) + try: + cv.with_user(self.root).write({"value_char": "OpenSUSE"}) + cv.with_user(self.root).unlink() + except AccessError: + self.fail("Root should be able to write/unlink any scheduled task CV.")