Tower: upload cetmix_tower_server 16.0.2.2.9 (via marketplace)
This commit is contained in:
288
addons/cetmix_tower_server/tests/test_scheduled_task.py
Normal file
288
addons/cetmix_tower_server/tests/test_scheduled_task.py
Normal file
@@ -0,0 +1,288 @@
|
||||
# Copyright (C) 2025 Cetmix OÜ
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
from odoo import fields
|
||||
from odoo.exceptions import AccessError
|
||||
|
||||
from .common import TestTowerCommon
|
||||
|
||||
|
||||
class TestCxTowerScheduledTask(TestTowerCommon):
|
||||
"""Test the cx.tower.scheduled.task model."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
|
||||
# Create an additional server for multi-server command test
|
||||
cls.server_test_2 = cls.Server.create(
|
||||
{
|
||||
"name": "Test 2",
|
||||
"ip_v4_address": "localhost",
|
||||
"ssh_username": "admin",
|
||||
"ssh_password": "password",
|
||||
"ssh_auth_mode": "p",
|
||||
"host_key": "test_key",
|
||||
"os_id": cls.os_debian_10.id,
|
||||
}
|
||||
)
|
||||
|
||||
# Scheduled task: command (multi-server)
|
||||
cls.command_scheduled_task = cls.ScheduledTask.create(
|
||||
{
|
||||
"name": "Test Command Scheduled Task",
|
||||
"action": "command",
|
||||
"command_id": cls.command_list_dir.id,
|
||||
"interval_number": 1,
|
||||
"interval_type": "days",
|
||||
"next_call": fields.Datetime.now(),
|
||||
"server_ids": [(6, 0, [cls.server_test_1.id, cls.server_test_2.id])],
|
||||
}
|
||||
)
|
||||
|
||||
# Scheduled task: plan (single server)
|
||||
cls.plan_scheduled_task = cls.ScheduledTask.create(
|
||||
{
|
||||
"name": "Test Plan Scheduled Task",
|
||||
"action": "plan",
|
||||
"plan_id": cls.plan_1.id,
|
||||
"interval_number": 1,
|
||||
"interval_type": "days",
|
||||
"next_call": fields.Datetime.now(),
|
||||
"server_ids": [(6, 0, [cls.server_test_1.id])],
|
||||
}
|
||||
)
|
||||
|
||||
# Custom variable for task (option type)
|
||||
cls.variable_odoo_versions = cls.Variable.create(
|
||||
{
|
||||
"name": "odoo_versions",
|
||||
"variable_type": "o",
|
||||
}
|
||||
)
|
||||
cls.variable_option_16_0 = cls.VariableOption.create(
|
||||
{
|
||||
"name": "16.0",
|
||||
"value_char": "16.0",
|
||||
"variable_id": cls.variable_odoo_versions.id,
|
||||
}
|
||||
)
|
||||
|
||||
# Add custom variables to tasks
|
||||
cls.scheduled_task_cv_os = cls.ScheduledTaskCv.create(
|
||||
{
|
||||
"scheduled_task_id": cls.command_scheduled_task.id,
|
||||
"variable_id": cls.variable_os.id,
|
||||
"value_char": "Windows 2k",
|
||||
}
|
||||
)
|
||||
cls.scheduled_task_cv_version = cls.ScheduledTaskCv.create(
|
||||
{
|
||||
"scheduled_task_id": cls.command_scheduled_task.id,
|
||||
"variable_id": cls.variable_odoo_versions.id,
|
||||
"option_id": cls.variable_option_16_0.id,
|
||||
}
|
||||
)
|
||||
cls.scheduled_task_cv_version_plan = cls.ScheduledTaskCv.create(
|
||||
{
|
||||
"scheduled_task_id": cls.plan_scheduled_task.id,
|
||||
"variable_id": cls.variable_odoo_versions.id,
|
||||
"option_id": cls.variable_option_16_0.id,
|
||||
}
|
||||
)
|
||||
|
||||
def _assert_log_records(self, log_model, scheduled_task, expected_count):
|
||||
"""Helper: Assert that log records exist for the task"""
|
||||
logs = log_model.search([("scheduled_task_id", "=", scheduled_task.id)])
|
||||
self.assertTrue(logs, f"{log_model._name} logs should be created after run.")
|
||||
self.assertEqual(
|
||||
len(logs),
|
||||
expected_count,
|
||||
f"Expected {expected_count} logs for {scheduled_task.display_name}, "
|
||||
f"got {len(logs)}.",
|
||||
)
|
||||
|
||||
def _assert_next_and_last_call_changed(
|
||||
self, task, last_call_before, next_call_before
|
||||
):
|
||||
"""Helper: Assert next_call and last_call changed after run"""
|
||||
task.invalidate_recordset()
|
||||
self.assertNotEqual(
|
||||
task.last_call, last_call_before, "last_call must be changed after run."
|
||||
)
|
||||
self.assertNotEqual(
|
||||
task.next_call, next_call_before, "next_call must be changed after run."
|
||||
)
|
||||
|
||||
def test_reserve_tasks_atomic(self):
|
||||
"""Scheduled Task: reserve_tasks must only lock available"""
|
||||
tasks = self.command_scheduled_task + self.plan_scheduled_task
|
||||
reserved = tasks._reserve_tasks()
|
||||
self.assertEqual(
|
||||
set(reserved.ids), set(tasks.ids), "Both tasks should be reserved"
|
||||
)
|
||||
# Repeated reservation should return empty (already running)
|
||||
tasks.invalidate_recordset()
|
||||
reserved_again = tasks._reserve_tasks()
|
||||
self.assertFalse(
|
||||
reserved_again, "Already reserved tasks must not be reserved again"
|
||||
)
|
||||
|
||||
def test_run_task_command(self):
|
||||
"""Running a scheduled command task creates logs per server."""
|
||||
logs_before = self.CommandLog.search(
|
||||
[("scheduled_task_id", "=", self.command_scheduled_task.id)]
|
||||
)
|
||||
self.assertFalse(logs_before, "No command logs should exist before run.")
|
||||
|
||||
last_call_before = self.command_scheduled_task.last_call
|
||||
next_call_before = self.command_scheduled_task.next_call
|
||||
|
||||
self.command_scheduled_task._run()
|
||||
self._assert_next_and_last_call_changed(
|
||||
self.command_scheduled_task, last_call_before, next_call_before
|
||||
)
|
||||
self._assert_log_records(
|
||||
self.CommandLog,
|
||||
self.command_scheduled_task,
|
||||
expected_count=len(self.command_scheduled_task.server_ids),
|
||||
)
|
||||
|
||||
def test_run_task_plan(self):
|
||||
"""Running a scheduled plan task creates one log per server."""
|
||||
logs_before = self.PlanLog.search(
|
||||
[("scheduled_task_id", "=", self.plan_scheduled_task.id)]
|
||||
)
|
||||
self.assertFalse(logs_before, "No plan logs should exist before run.")
|
||||
|
||||
last_call_before = self.plan_scheduled_task.last_call
|
||||
next_call_before = self.plan_scheduled_task.next_call
|
||||
|
||||
self.plan_scheduled_task._run()
|
||||
self._assert_next_and_last_call_changed(
|
||||
self.plan_scheduled_task, last_call_before, next_call_before
|
||||
)
|
||||
self._assert_log_records(
|
||||
self.PlanLog,
|
||||
self.plan_scheduled_task,
|
||||
expected_count=len(self.plan_scheduled_task.server_ids),
|
||||
)
|
||||
|
||||
def test_user_write_create_unlink_access(self):
|
||||
"""User: cannot create, write or unlink scheduled tasks."""
|
||||
with self.assertRaises(AccessError):
|
||||
self.ScheduledTask.with_user(self.user).create(
|
||||
{
|
||||
"name": "Test",
|
||||
"action": "command",
|
||||
"command_id": self.command_list_dir.id,
|
||||
"server_ids": [(6, 0, [self.server_test_1.id])],
|
||||
}
|
||||
)
|
||||
with self.assertRaises(AccessError):
|
||||
self.command_scheduled_task.with_user(self.user).write({"sequence": 33})
|
||||
with self.assertRaises(AccessError):
|
||||
self.command_scheduled_task.with_user(self.user).unlink()
|
||||
|
||||
def test_manager_read_access(self):
|
||||
"""Manager: can read scheduled task if in manager_ids or in server's
|
||||
manager_ids/user_ids."""
|
||||
self.command_scheduled_task.manager_ids = [(6, 0, [self.manager.id])]
|
||||
tasks = self.ScheduledTask.with_user(self.manager).search(
|
||||
[("id", "=", self.command_scheduled_task.id)]
|
||||
)
|
||||
self.assertIn(
|
||||
self.command_scheduled_task,
|
||||
tasks,
|
||||
"Manager should be able to read their task.",
|
||||
)
|
||||
|
||||
# Remove from manager_ids, but add to server manager_ids
|
||||
self.command_scheduled_task.manager_ids = [(6, 0, [])]
|
||||
self.server_test_1.manager_ids = [(6, 0, [self.manager.id])]
|
||||
tasks = self.ScheduledTask.with_user(self.manager).search(
|
||||
[("id", "=", self.command_scheduled_task.id)]
|
||||
)
|
||||
self.assertIn(
|
||||
self.command_scheduled_task,
|
||||
tasks,
|
||||
"Manager should be able to read task via server manager_ids.",
|
||||
)
|
||||
|
||||
# Remove manager from everywhere
|
||||
self.server_test_1.manager_ids = [(6, 0, [])]
|
||||
tasks = self.ScheduledTask.with_user(self.manager).search(
|
||||
[("id", "=", self.command_scheduled_task.id)]
|
||||
)
|
||||
self.assertNotIn(
|
||||
self.command_scheduled_task,
|
||||
tasks,
|
||||
"Manager should NOT be able to read task without relation.",
|
||||
)
|
||||
|
||||
def test_manager_write_create_access(self):
|
||||
"""Manager: can create/write if in manager_ids, else denied."""
|
||||
# Create as manager
|
||||
task = self.ScheduledTask.with_user(self.manager).create(
|
||||
{
|
||||
"name": "Test",
|
||||
"action": "command",
|
||||
"command_id": self.command_list_dir.id,
|
||||
"manager_ids": [(6, 0, [self.manager.id])],
|
||||
"server_ids": [(6, 0, [self.server_test_1.id])],
|
||||
}
|
||||
)
|
||||
try:
|
||||
task.with_user(self.manager).write({"sequence": 77})
|
||||
except AccessError:
|
||||
self.fail("Manager should be able to write their own scheduled tasks.")
|
||||
|
||||
# Should fail if not in manager_ids
|
||||
self.command_scheduled_task.manager_ids = [(6, 0, [])]
|
||||
with self.assertRaises(AccessError):
|
||||
self.command_scheduled_task.with_user(self.manager).write({"sequence": 11})
|
||||
|
||||
def test_manager_unlink_access(self):
|
||||
"""Manager: can unlink only their own tasks (in manager_ids & creator)."""
|
||||
# Create as manager
|
||||
task = self.ScheduledTask.with_user(self.manager).create(
|
||||
{
|
||||
"name": "Test",
|
||||
"action": "command",
|
||||
"command_id": self.command_list_dir.id,
|
||||
"manager_ids": [(6, 0, [self.manager.id])],
|
||||
"server_ids": [(6, 0, [self.server_test_1.id])],
|
||||
}
|
||||
)
|
||||
try:
|
||||
task.with_user(self.manager).unlink()
|
||||
except AccessError:
|
||||
self.fail("Manager should be able to unlink their own task.")
|
||||
|
||||
# Not creator
|
||||
with self.assertRaises(AccessError):
|
||||
self.command_scheduled_task.with_user(self.manager).unlink()
|
||||
|
||||
def test_root_unrestricted_access(self):
|
||||
"""Root: full unrestricted access to all scheduled tasks."""
|
||||
# Read
|
||||
tasks = self.ScheduledTask.with_user(self.root).search(
|
||||
[("id", "=", self.command_scheduled_task.id)]
|
||||
)
|
||||
self.assertIn(
|
||||
self.command_scheduled_task, tasks, "Root should be able to read any task."
|
||||
)
|
||||
|
||||
# Create
|
||||
task = self.ScheduledTask.with_user(self.root).create(
|
||||
{
|
||||
"name": "Test",
|
||||
"action": "command",
|
||||
"command_id": self.command_list_dir.id,
|
||||
"server_ids": [(6, 0, [self.server_test_1.id])],
|
||||
}
|
||||
)
|
||||
try:
|
||||
task.with_user(self.root).write({"sequence": 123})
|
||||
task.with_user(self.root).unlink()
|
||||
except AccessError:
|
||||
self.fail("Root should be able to write/unlink any scheduled task.")
|
||||
Reference in New Issue
Block a user