Files
odoo-addons/addons/cetmix_tower_server/tests/test_scheduled_task.py

289 lines
11 KiB
Python

# Copyright (C) 2025 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import fields
from odoo.exceptions import AccessError
from .common import TestTowerCommon
class TestCxTowerScheduledTask(TestTowerCommon):
    """Test the cx.tower.scheduled.task model.

    Covers task reservation, running command and plan tasks, and the
    access rules applied to users, managers and root.

    Fixtures referenced here but not created in this class (for example
    ``server_test_1``, ``command_list_dir``, ``plan_1``, ``user``,
    ``manager``, ``root`` and model shortcuts such as ``ScheduledTask``)
    are inherited from the base test class.
    """

    @classmethod
    def setUpClass(cls):
        """Create a second server, two scheduled tasks and their variables."""
        super().setUpClass()

        # Create an additional server for multi-server command test
        cls.server_test_2 = cls.Server.create(
            {
                "name": "Test 2",
                "ip_v4_address": "localhost",
                "ssh_username": "admin",
                "ssh_password": "password",
                "ssh_auth_mode": "p",
                "host_key": "test_key",
                "os_id": cls.os_debian_10.id,
            }
        )

        # Scheduled task: command (multi-server)
        cls.command_scheduled_task = cls.ScheduledTask.create(
            {
                "name": "Test Command Scheduled Task",
                "action": "command",
                "command_id": cls.command_list_dir.id,
                "interval_number": 1,
                "interval_type": "days",
                "next_call": fields.Datetime.now(),
                "server_ids": [(6, 0, [cls.server_test_1.id, cls.server_test_2.id])],
            }
        )

        # Scheduled task: plan (single server)
        cls.plan_scheduled_task = cls.ScheduledTask.create(
            {
                "name": "Test Plan Scheduled Task",
                "action": "plan",
                "plan_id": cls.plan_1.id,
                "interval_number": 1,
                "interval_type": "days",
                "next_call": fields.Datetime.now(),
                "server_ids": [(6, 0, [cls.server_test_1.id])],
            }
        )

        # Custom variable for task (option type)
        cls.variable_odoo_versions = cls.Variable.create(
            {
                "name": "odoo_versions",
                "variable_type": "o",
            }
        )
        cls.variable_option_16_0 = cls.VariableOption.create(
            {
                "name": "16.0",
                "value_char": "16.0",
                "variable_id": cls.variable_odoo_versions.id,
            }
        )

        # Add custom variables to tasks:
        # - the command task gets a plain char value and an option value
        # - the plan task gets the same option value
        cls.scheduled_task_cv_os = cls.ScheduledTaskCv.create(
            {
                "scheduled_task_id": cls.command_scheduled_task.id,
                "variable_id": cls.variable_os.id,
                "value_char": "Windows 2k",
            }
        )
        cls.scheduled_task_cv_version = cls.ScheduledTaskCv.create(
            {
                "scheduled_task_id": cls.command_scheduled_task.id,
                "variable_id": cls.variable_odoo_versions.id,
                "option_id": cls.variable_option_16_0.id,
            }
        )
        cls.scheduled_task_cv_version_plan = cls.ScheduledTaskCv.create(
            {
                "scheduled_task_id": cls.plan_scheduled_task.id,
                "variable_id": cls.variable_odoo_versions.id,
                "option_id": cls.variable_option_16_0.id,
            }
        )

    def _assert_log_records(self, log_model, scheduled_task, expected_count):
        """Helper: Assert that log records exist for the task.

        Args:
            log_model: log model to search; it must expose a
                ``scheduled_task_id`` field.
            scheduled_task: scheduled task record whose logs are checked.
            expected_count (int): exact number of logs expected.
        """
        logs = log_model.search([("scheduled_task_id", "=", scheduled_task.id)])
        self.assertTrue(logs, f"{log_model._name} logs should be created after run.")
        self.assertEqual(
            len(logs),
            expected_count,
            f"Expected {expected_count} logs for {scheduled_task.display_name}, "
            f"got {len(logs)}.",
        )

    def _assert_next_and_last_call_changed(
        self, task, last_call_before, next_call_before
    ):
        """Helper: Assert next_call and last_call changed after run.

        Args:
            task: scheduled task record that has just been run.
            last_call_before: value of ``last_call`` captured before the run.
            next_call_before: value of ``next_call`` captured before the run.
        """
        # Invalidate the record before comparing so stale values are not used
        task.invalidate_recordset()
        self.assertNotEqual(
            task.last_call, last_call_before, "last_call must be changed after run."
        )
        self.assertNotEqual(
            task.next_call, next_call_before, "next_call must be changed after run."
        )

    def test_reserve_tasks_atomic(self):
        """Scheduled Task: reserve_tasks must only lock available tasks."""
        tasks = self.command_scheduled_task + self.plan_scheduled_task
        reserved = tasks._reserve_tasks()
        self.assertEqual(
            set(reserved.ids), set(tasks.ids), "Both tasks should be reserved"
        )

        # Repeated reservation should return empty (already running)
        tasks.invalidate_recordset()
        reserved_again = tasks._reserve_tasks()
        self.assertFalse(
            reserved_again, "Already reserved tasks must not be reserved again"
        )

    def test_run_task_command(self):
        """Running a scheduled command task creates logs per server."""
        logs_before = self.CommandLog.search(
            [("scheduled_task_id", "=", self.command_scheduled_task.id)]
        )
        self.assertFalse(logs_before, "No command logs should exist before run.")

        # Capture the schedule state so the change can be detected after run
        last_call_before = self.command_scheduled_task.last_call
        next_call_before = self.command_scheduled_task.next_call

        self.command_scheduled_task._run()

        self._assert_next_and_last_call_changed(
            self.command_scheduled_task, last_call_before, next_call_before
        )
        self._assert_log_records(
            self.CommandLog,
            self.command_scheduled_task,
            expected_count=len(self.command_scheduled_task.server_ids),
        )

    def test_run_task_plan(self):
        """Running a scheduled plan task creates one log per server."""
        logs_before = self.PlanLog.search(
            [("scheduled_task_id", "=", self.plan_scheduled_task.id)]
        )
        self.assertFalse(logs_before, "No plan logs should exist before run.")

        # Capture the schedule state so the change can be detected after run
        last_call_before = self.plan_scheduled_task.last_call
        next_call_before = self.plan_scheduled_task.next_call

        self.plan_scheduled_task._run()

        self._assert_next_and_last_call_changed(
            self.plan_scheduled_task, last_call_before, next_call_before
        )
        self._assert_log_records(
            self.PlanLog,
            self.plan_scheduled_task,
            expected_count=len(self.plan_scheduled_task.server_ids),
        )

    def test_user_write_create_unlink_access(self):
        """User: cannot create, write or unlink scheduled tasks."""
        with self.assertRaises(AccessError):
            self.ScheduledTask.with_user(self.user).create(
                {
                    "name": "Test",
                    "action": "command",
                    "command_id": self.command_list_dir.id,
                    "server_ids": [(6, 0, [self.server_test_1.id])],
                }
            )
        with self.assertRaises(AccessError):
            self.command_scheduled_task.with_user(self.user).write({"sequence": 33})
        with self.assertRaises(AccessError):
            self.command_scheduled_task.with_user(self.user).unlink()

    def test_manager_read_access(self):
        """Manager: can read scheduled task if in manager_ids or in server's
        manager_ids/user_ids."""
        # Manager is listed directly on the task
        self.command_scheduled_task.manager_ids = [(6, 0, [self.manager.id])]
        tasks = self.ScheduledTask.with_user(self.manager).search(
            [("id", "=", self.command_scheduled_task.id)]
        )
        self.assertIn(
            self.command_scheduled_task,
            tasks,
            "Manager should be able to read their task.",
        )

        # Remove from manager_ids, but add to server manager_ids
        self.command_scheduled_task.manager_ids = [(6, 0, [])]
        self.server_test_1.manager_ids = [(6, 0, [self.manager.id])]
        tasks = self.ScheduledTask.with_user(self.manager).search(
            [("id", "=", self.command_scheduled_task.id)]
        )
        self.assertIn(
            self.command_scheduled_task,
            tasks,
            "Manager should be able to read task via server manager_ids.",
        )

        # Remove manager from everywhere
        self.server_test_1.manager_ids = [(6, 0, [])]
        tasks = self.ScheduledTask.with_user(self.manager).search(
            [("id", "=", self.command_scheduled_task.id)]
        )
        self.assertNotIn(
            self.command_scheduled_task,
            tasks,
            "Manager should NOT be able to read task without relation.",
        )

    def test_manager_write_create_access(self):
        """Manager: can create/write if in manager_ids, else denied."""
        # Create as manager
        task = self.ScheduledTask.with_user(self.manager).create(
            {
                "name": "Test",
                "action": "command",
                "command_id": self.command_list_dir.id,
                "manager_ids": [(6, 0, [self.manager.id])],
                "server_ids": [(6, 0, [self.server_test_1.id])],
            }
        )
        # Turn an unexpected AccessError into a test failure with a clear message
        try:
            task.with_user(self.manager).write({"sequence": 77})
        except AccessError:
            self.fail("Manager should be able to write their own scheduled tasks.")

        # Should fail if not in manager_ids
        self.command_scheduled_task.manager_ids = [(6, 0, [])]
        with self.assertRaises(AccessError):
            self.command_scheduled_task.with_user(self.manager).write({"sequence": 11})

    def test_manager_unlink_access(self):
        """Manager: can unlink only their own tasks (in manager_ids & creator)."""
        # Create as manager
        task = self.ScheduledTask.with_user(self.manager).create(
            {
                "name": "Test",
                "action": "command",
                "command_id": self.command_list_dir.id,
                "manager_ids": [(6, 0, [self.manager.id])],
                "server_ids": [(6, 0, [self.server_test_1.id])],
            }
        )
        # Turn an unexpected AccessError into a test failure with a clear message
        try:
            task.with_user(self.manager).unlink()
        except AccessError:
            self.fail("Manager should be able to unlink their own task.")

        # Not creator
        with self.assertRaises(AccessError):
            self.command_scheduled_task.with_user(self.manager).unlink()

    def test_root_unrestricted_access(self):
        """Root: full unrestricted access to all scheduled tasks."""
        # Read
        tasks = self.ScheduledTask.with_user(self.root).search(
            [("id", "=", self.command_scheduled_task.id)]
        )
        self.assertIn(
            self.command_scheduled_task, tasks, "Root should be able to read any task."
        )

        # Create
        task = self.ScheduledTask.with_user(self.root).create(
            {
                "name": "Test",
                "action": "command",
                "command_id": self.command_list_dir.id,
                "server_ids": [(6, 0, [self.server_test_1.id])],
            }
        )

        # Write and unlink
        try:
            task.with_user(self.root).write({"sequence": 123})
            task.with_user(self.root).unlink()
        except AccessError:
            self.fail("Root should be able to write/unlink any scheduled task.")