From f8be9708f29eeccd7266bee95c0282a5a6ad264f Mon Sep 17 00:00:00 2001 From: git_admin Date: Mon, 27 Apr 2026 08:18:28 +0000 Subject: [PATCH] Tower: upload cetmix_tower_server 16.0.3.0.1 (via marketplace) --- addons/cetmix_tower_server/tests/test_plan.py | 2899 +++++++++++++++++ 1 file changed, 2899 insertions(+) create mode 100644 addons/cetmix_tower_server/tests/test_plan.py diff --git a/addons/cetmix_tower_server/tests/test_plan.py b/addons/cetmix_tower_server/tests/test_plan.py new file mode 100644 index 0000000..da8970e --- /dev/null +++ b/addons/cetmix_tower_server/tests/test_plan.py @@ -0,0 +1,2899 @@ +# Copyright (C) 2022 Cetmix OÜ +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). +from unittest.mock import patch + +from odoo import _, fields +from odoo.exceptions import AccessError, ValidationError +from odoo.tools.misc import mute_logger + +from ..models.constants import ( + ANOTHER_PLAN_RUNNING, + GENERAL_ERROR, + PLAN_IS_EMPTY, + PLAN_LINE_CONDITION_CHECK_FAILED, + PLAN_NOT_COMPATIBLE_WITH_SERVER, + PLAN_STOPPED, +) +from .common import TestTowerCommon + + +class TestTowerPlan(TestTowerCommon): + """Test the cx.tower.plan model.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + + # Commands + cls.command_run_flight_plan_1 = cls.Command.create( + { + "name": "Run Flight Plan", + "action": "plan", + "flight_plan_id": cls.plan_1.id, + } + ) + cls.command_python_custom_variable_values_1 = cls.Command.create( + { + "name": "Python command to set custom variable values", + "action": "python_code", + "code": """ +custom_values['test_path_'] = '/test_path' +custom_values['test_dir'] = 'test_dir' +custom_values['_my_value'] = 'Just To Test' +""", + } + ) + cls.command_python_custom_variable_values_2 = cls.Command.create( + { + "name": "Python command to update custom variable values", + "action": "python_code", + "code": f""" +custom_values['test_path_'] = '/another_test_path' +custom_values['random_var_reference'] = 'random_var_value' +custom_values['{cls.variable_url.reference}'] = 'https://www.cetmix.com' +""", + } + ) + # Flight plan + cls.plan_2 = cls.Plan.create( + { + "name": "Test plan 2", + "note": "Run another flight plan", + } + ) + cls.plan_2_line_1 = cls.plan_line.create( + { + "sequence": 5, + "plan_id": cls.plan_2.id, + "command_id": cls.command_run_flight_plan_1.id, + } + ) + cls.plan_2_line_2 = cls.plan_line.create( + { + "sequence": 10, + "plan_id": cls.plan_2.id, + "command_id": cls.command_create_dir.id, + } + ) + # Flight plan with access level 1 to test user access rights + cls.plan_3 = cls.Plan.create( + { + "name": "Test plan 3", + "note": "Test user access rights", + "access_level": "1", + "line_ids": [ + (0, 0, {"command_id": cls.command_create_dir.id, "sequence": 1}), + ], + } + ) + # Create line for plan 3 + cls.plan_3_line_1 = cls.plan_line.create( + { + "plan_id": cls.plan_3.id, + "command_id": cls.command_create_dir.id, + "sequence": 10, + } + ) + cls.plan_3_line_1_action = cls.env["cx.tower.plan.line.action"].create( + { + "line_id": cls.plan_3_line_1.id, + "condition": "==", + "value_char": "test", + "action": "e", + } + ) + cls.variable_value = cls.env["cx.tower.variable.value"].create( + { + "variable_id": cls.variable_os.id, + "value_char": "Windows 2k", + "plan_line_action_id": cls.plan_3_line_1_action.id, + } + ) + cls.server = cls.Server.create( + { + "name": "Plan Test Server", + "ssh_username": "test", + "ssh_password": "test", + "ip_v4_address": "localhost", + "ssh_port": 22, + "user_ids": [(6, 0, [cls.user.id])], + "manager_ids": [(6, 0, [cls.manager.id])], + "skip_host_key": True, + } + ) + + def _create_plan(self, **kwargs): + """Helper method to create a flight plan.""" + vals = { + "name": "Test Flight Plan", + "access_level": "1", # override default for user tests + "user_ids": [(6, 0, [])], + "manager_ids": [(6, 0, [])], + "server_ids": [(6, 0, [])], + } + if kwargs: + vals.update(kwargs) + return self.Plan.create(vals) + + def test_user_read_access(self): + """ + For a user: + Read access is allowed if access_level == "1" and + either the plan's own user_ids includes the user + OR at least one related server (via server_ids) + includes the user in its user_ids. + """ + # Case 1: Plan with access_level "1" and user + # included in plan.user_ids. + plan1 = self._create_plan( + **{ + "access_level": "1", + "user_ids": [(6, 0, [self.user.id])], + } + ) + recs1 = self.Plan.with_user(self.user).search([("id", "=", plan1.id)]) + self.assertIn( + plan1, + recs1, + "User should see the plan if in " "plan.user_ids and access_level == '1'.", + ) + + # Case 2: Plan with access_level "1" with no direct user_ids, + # but with a related server that grants access. + plan2 = self._create_plan( + **{ + "access_level": "1", + "user_ids": [(6, 0, [])], + "server_ids": [(6, 0, [self.server.id])], + } + ) + recs2 = self.Plan.with_user(self.user).search([("id", "=", plan2.id)]) + self.assertIn( + plan2, + recs2, + "User should see the plan if a " + "related server.user_ids includes the user.", + ) + + # Negative: Plan with access_level "1" + # with neither direct nor server-based access. + plan3 = self._create_plan( + **{ + "access_level": "1", + "user_ids": [(6, 0, [])], + "server_ids": [(6, 0, [])], + } + ) + recs3 = self.Plan.with_user(self.user).search([("id", "=", plan3.id)]) + self.assertNotIn( + plan3, + recs3, + "User should not see the plan if not granted access.", + ) + + # Also, a user should not be allowed to create a plan. + with self.assertRaises(AccessError): + self.Plan.with_user(self.user).create( + { + "name": "Test Plan", + "access_level": "1", + "user_ids": [(6, 0, [self.user.id])], + } + ) + # ...and modify a plan that they have access to. + with self.assertRaises(AccessError): + plan1.with_user(self.user).write({"name": "User Updated Plan"}) + + def test_manager_read_access(self): + """ + For a manager: + Read access is allowed if access_level <= "2" AND + EITHER the plan itself grants access + (its user_ids or manager_ids includes the manager) + OR either there are no related servers OR a related server + grants access (its user_ids or manager_ids includes the manager). + """ + # Case 1: Plan with access_level "2" and plan.manager_ids + # includes the manager. + plan1 = self._create_plan( + **{ + "access_level": "2", + "manager_ids": [(6, 0, [self.manager.id])], + } + ) + recs1 = self.Plan.with_user(self.manager).search([("id", "=", plan1.id)]) + self.assertIn( + plan1, + recs1, + "Manager should see the plan if in " + "plan.manager_ids and access_level <= '2'.", + ) + + # Case 2: Plan with access_level "2" that does not grant direct access, + # but a related server grants access via its manager_ids. + plan2 = self._create_plan( + **{ + "access_level": "2", + "user_ids": [(6, 0, [])], + "manager_ids": [(6, 0, [])], + "server_ids": [(6, 0, [self.server.id])], + } + ) + recs2 = self.Plan.with_user(self.manager).search([("id", "=", plan2.id)]) + self.assertIn( + plan2, + recs2, + "Manager should see the plan if related " + "server.manager_ids includes the manager.", + ) + + # Case 3 negative: Plan with access_level "2" with no granted access + # if it's linked to a server that does not grant access. + plan3 = self._create_plan( + **{ + "access_level": "2", + "user_ids": [(6, 0, [])], + "manager_ids": [(6, 0, [])], + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + recs3 = self.Plan.with_user(self.manager).search([("id", "=", plan3.id)]) + self.assertNotIn( + plan3, + recs3, + "Manager should not see the plan " + "if not granted access to related server.", + ) + + # Case 4 positive: Plan with access_level "2" with no linked servers + # and no related servers that grant access. + plan4 = self._create_plan( + **{ + "access_level": "2", + "user_ids": [(6, 0, [])], + "manager_ids": [(6, 0, [])], + "server_ids": [(6, 0, [])], + } + ) + recs4 = self.Plan.with_user(self.manager).search([("id", "=", plan4.id)]) + self.assertIn( + plan4, + recs4, + "Manager should see the plan if not linked to any servers.", + ) + + # Case 5 negative: raise access level to 3 + # and check if manager can see the plan + plan4.access_level = "3" + recs5 = self.Plan.with_user(self.manager).search([("id", "=", plan4.id)]) + self.assertNotIn( + plan4, + recs5, + "Manager should not see the plan " "if access level is raised to 3.", + ) + + def test_manager_write_create_access(self): + """ + For a manager: + Write (update) and create access are allowed if access_level <= "2" AND + the plan's own manager_ids includes the manager. + """ + # Case 1: Plan with access_level "2" and plan.manager_ids + # includes the manager should allow to update the plan. + plan1 = self._create_plan( + **{ + "access_level": "2", + "manager_ids": [(6, 0, [self.manager.id])], + } + ) + try: + plan1.with_user(self.manager).write({"name": "Manager Updated Plan"}) + except AccessError: + self.fail( + "Manager should be able to update the plan if " "in plan.manager_ids.", + ) + self.assertEqual( + plan1.with_user(self.manager).name, + "Manager Updated Plan", + ) + + # Case 2: Attempt to create a plan as a manager without + # including their ID in manager_ids should fail. + with self.assertRaises(AccessError): + self.Plan.with_user(self.manager).create( + { + "name": "Manager Created Plan", + "access_level": "2", + "manager_ids": [(6, 0, [])], + } + ) + + # Case 3: Create a plan with manager added to manager_ids + # should be allowed. + try: + self.Plan.with_user(self.manager).create( + { + "name": "Manager Created Plan", + "access_level": "2", + "manager_ids": [(6, 0, [self.manager.id])], + } + ) + except AccessError: + self.fail( + "Manager should be able to create a plan " + "with himself added to manager_ids.", + ) + + def test_manager_unlink_access(self): + """ + For a manager: + Unlink (delete) access is allowed if access_level <= "2", + the current user is the record creator, + AND the plan's own manager_ids includes the manager. + """ + # Scenario 1: Plan created by the manager with plan.manager_ids + # including the manager. + plan1 = self.Plan.with_user(self.manager).create( + { + "name": "Manager Created Plan", + "access_level": "2", + } + ) + try: + plan1.unlink() + except AccessError: + self.fail( + "Manager should be able to delete the plan " + "they created if in plan.manager_ids.", + ) + + # Scenario 2: Plan created by another user, even if + # plan.manager_ids includes the manager. + plan2 = self._create_plan( + **{ + "access_level": "2", + "manager_ids": [(6, 0, [self.manager.id])], + } + ) + with self.assertRaises(AccessError): + plan2.with_user(self.manager).unlink() + + def test_root_unrestricted_access(self): + """ + For a root user: + Unlimited access: root can read, write, create, and delete plans + regardless of access_level or related servers. + """ + plan = self._create_plan( + **{ + "access_level": "3", # above threshold for managers + } + ) + recs = self.Plan.with_user(self.root).search([("id", "=", plan.id)]) + self.assertIn( + plan, + recs, + "Root should see the plan regardless of restrictions.", + ) + try: + plan.with_user(self.root).write({"name": "Root Updated Plan"}) + except AccessError: + self.fail("Root should be able to update the plan without restrictions.") + self.assertEqual(plan.with_user(self.root).name, "Root Updated Plan") + plan2 = self.Plan.with_user(self.root).create( + { + "name": "Root Created Plan", + "access_level": "3", + } + ) + self.assertTrue( + plan2, + "Root should be able to create a plan without restrictions.", + ) + plan2.with_user(self.root).unlink() + recs_after = self.Plan.with_user(self.root).search([("id", "=", plan2.id)]) + self.assertFalse( + recs_after, + "Root should be able to delete the plan without restrictions.", + ) + + def test_plan_line_action_name(self): + """Test plan line action naming""" + + # Add new line + plan_line_1 = self.plan_line.create( + { + "plan_id": self.plan_1.id, + "command_id": self.command_create_dir.id, + "sequence": 10, + } + ) + + # Add new action with custom + action_1 = self.plan_line_action.create( + { + "line_id": plan_line_1.id, + "condition": "==", + "value_char": "35", + "action": "e", + } + ) + + # Check if action name is composed correctly + expected_action_string = _( + "If exit code == 35 then Exit with command exit code" + ) + self.assertEqual( + action_1.name, + expected_action_string, + msg="Action name doesn't match expected one", + ) + + def test_plan_get_next_action_values(self): + """Test _get_next_action_values() + + NB: This test relies on demo data and might fail if it is modified + """ + # Ensure demo date integrity just in case demo date is modified + self.assertEqual( + self.plan_1.line_ids[0].action_ids[1].custom_exit_code, + 255, + "Plan 1 line #1 action #2 custom exit code must be equal to 255", + ) + + # Create a new plan log. + plan_line_1 = self.plan_1.line_ids[0] # Using command 1 from Plan 1 + plan_log = self.PlanLog.create( + { + "server_id": self.server_test_1.id, + "plan_id": self.plan_1.id, + "is_running": True, + "start_date": fields.Datetime.now(), + "plan_line_executed_id": plan_line_1.id, + } + ) + + # ************************ + # Test with exit code == 0 + # Must run the next command + # ************************ + command_log = self.CommandLog.create( + { + "plan_log_id": plan_log.id, + "server_id": self.server_test_1.id, + "command_id": plan_line_1.command_id.id, + "command_response": "Ok", + "command_status": 0, # Error code + } + ) + action, exit_code, next_line_id = self.plan_1._get_next_action_values( + command_log + ) + self.assertEqual(action, "n", msg="Action must be 'Run next action'") + self.assertEqual(exit_code, 0, msg="Exit code must be equal to 0") + self.assertEqual( + next_line_id, + self.plan_line_2, + msg="Next line must be Line #2", + ) + + # ************************ + # Test with exit code == 8 + # Must exit with custom code + # ************************ + command_log.command_status = 8 + + action, exit_code, next_line_id = self.plan_1._get_next_action_values( + command_log + ) + self.assertEqual(action, "ec", msg="Action must be 'Exit with custom code'") + self.assertEqual(exit_code, 255, msg="Exit code must be equal to 255") + self.assertIsNone(next_line_id, msg="Next line must be None") + + # ************************ + # Test with exit code == -12 + # Plan on error action must be triggered because no action condition is matched + # ************************ + command_log.command_status = -12 + + action, exit_code, next_line_id = self.plan_1._get_next_action_values( + command_log + ) + self.assertEqual(action, "e", msg="Action must be 'Exit with command code'") + self.assertEqual(exit_code, -12, msg="Exit code must be equal to -12") + self.assertIsNone(next_line_id, msg="Next line must be None") + + # ************************ + # Change Plan 'On error action' of the plan to 'Run next command' + # Next line must be Line #2 + # ************************ + + command_log.command_status = -12 + self.plan_1.on_error_action = "n" + + action, exit_code, next_line_id = self.plan_1._get_next_action_values( + command_log + ) + self.assertEqual(action, "n", msg="Action must be 'Run next action'") + self.assertEqual(exit_code, -12, msg="Exit code must be equal to -12") + self.assertEqual( + next_line_id, + self.plan_line_2, + msg="Next line must be Line #2", + ) + + # ************************ + # Run Line 2 (the last one). + # Action 2 will be triggered which is "Run next line". + # However because this is the last line of the plan must exit with command code. + # ************************ + + plan_line_2 = self.plan_1.line_ids[1] + plan_log.plan_line_executed_id = plan_line_2.id + command_log.command_status = 3 + + action, exit_code, next_line_id = self.plan_1._get_next_action_values( + command_log + ) + self.assertEqual(action, "e", msg="Action must be 'Exit with command code'") + self.assertEqual(exit_code, 3, msg="Exit code must be equal to 3") + self.assertIsNone(next_line_id, msg="Next line must be None") + + # ************************ + # Run Line 2 (the last one). + # Fallback plan action must be triggered because no action condition is matched + # However because this is the last line of the plan must exit with command code. + # ************************ + + command_log.command_status = 1 + + action, exit_code, next_line_id = self.plan_1._get_next_action_values( + command_log + ) + self.assertEqual(action, "e", msg="Action must be 'Exit with command code'") + self.assertEqual(exit_code, 1, msg="Exit code must be equal to 1") + self.assertIsNone(next_line_id, msg="Next line must be None") + + def test_plan_run_single(self): + """Test plan execution results""" + + # Add user as user to Server1 + self.server_test_1.user_ids = [(4, self.user_bob.id)] + + # Ensure that access error is raised + # Because user_bob is not in any Tower group + with self.assertRaises(AccessError): + self.plan_1.with_user(self.user_bob)._run_single(self.server_test_1) + + # Add user to the "User" group + self.add_to_group(self.user_bob, "cetmix_tower_server.group_user") + + # Ensure that access error is raised + # Because plan access level is "Manager" and user_bob is in "User" group + with self.assertRaises(AccessError): + self.plan_1.with_user(self.user_bob)._run_single(self.server_test_1) + + # Set access level to 1 and link to server1 + # so Bob can execute the plan + self.write_and_invalidate( + self.plan_1, + **{"access_level": "1", "server_ids": [(4, self.server_test_1.id)]}, + ) + + self.env["ir.rule"].invalidate_model() + # Run plan + self.plan_1.with_user(self.user_bob)._run_single(self.server_test_1) + + # Check plan log + plan_log_rec = self.PlanLog.search([("server_id", "=", self.server_test_1.id)]) + + # Must be a single record + self.assertEqual(len(plan_log_rec), 1, msg="Must be a single plan record") + + # Ensure all commands were triggered + expected_command_count = 2 + self.assertEqual( + len(plan_log_rec.command_log_ids), + expected_command_count, + msg=f"Must run {expected_command_count} commands", + ) + + # Check plan status + expected_plan_status = 0 + self.assertEqual( + plan_log_rec.plan_status, + expected_plan_status, + msg=f"Plan status must be equal to {expected_plan_status}", + ) + + # ************************ + # Change condition in line #1. + # Action 1 will be triggered which is "Exit with custom code" 29. + # ************************ + action_to_tweak = self.plan_line_1_action_1 + action_to_tweak.write({"custom_exit_code": 29, "action": "ec"}) + + # Run plan + self.plan_1._run_single(self.server_test_1) + + # Check plan log + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + + # Must be two plan log record + self.assertEqual(len(plan_log_records), 2, msg="Must be 2 plan log records") + plan_log_rec = plan_log_records[0] + + # Ensure all commands were triggered + expected_command_count = 1 + self.assertEqual( + len(plan_log_rec.command_log_ids), + expected_command_count, + msg=f"Must run {expected_command_count} commands", + ) + + # Check plan status + expected_plan_status = 29 + self.assertEqual( + plan_log_rec.plan_status, + expected_plan_status, + msg=f"Plan status must be equal to {expected_plan_status}", + ) + + # Ensure 'path' was substituted with the plan line custom 'path' + self.assertEqual( + self.plan_line_1.path, + plan_log_rec.command_log_ids.path, + "Path in command log must be the same as in the flight plan line", + ) + + def test_plan_and_command_access_level(self): + # Remove userbob from all cxtower_server groups + self.remove_from_group( + self.user_bob, + [ + "cetmix_tower_server.group_user", + "cetmix_tower_server.group_manager", + "cetmix_tower_server.group_root", + ], + ) + + # Add user_bob to group_manager + self.add_to_group(self.user_bob, "cetmix_tower_server.group_manager") + + # Add user_bob as manager to the plan + self.plan_1.manager_ids = [(4, self.user_bob.id)] + + # check if plan and commands included has same access level + self.assertEqual(self.plan_1.access_level, "2") + self.assertEqual(self.command_create_dir.access_level, "2") + self.assertEqual(self.command_list_dir.access_level, "2") + + # check that if we modify plan access level to make it lower than the + # access_level of the commands related with it access level, + # access_level_warn_msg will be created + self.plan_1.with_user(self.user_bob).write({"access_level": "1"}) + self.assertTrue(self.plan_1.access_level_warn_msg) + + # Add user_bob to group_root + self.add_to_group(self.user_bob, "cetmix_tower_server.group_root") + + # check if user_bob can make plan access leve higher than commands access level + self.plan_1.with_user(self.user_bob).write({"access_level": "3"}) + self.assertEqual(self.plan_1.access_level, "3") + + # check that if we create a new plan with an access_level lower than + # the access_level of the command related with access_level_warn_msg + # will be created + command_1 = self.Command.create( + {"name": "New Test Command", "access_level": "3"} + ) + + self.plan_2 = self.Plan.create( + { + "name": "Test plan 2", + "note": "Create directory and list its content", + } + ) + self.plan_line_2_1 = self.plan_line.create( + { + "sequence": 5, + "plan_id": self.plan_2.id, + "command_id": command_1.id, + } + ) + self.assertTrue(self.plan_2.access_level_warn_msg) + + def test_multiple_plan_create_write(self): + """Test multiple plan create/write cases""" + # Create multiple plans at once + plans_data = [ + { + "name": "Test Plan 1", + "note": "Plan 1 Note", + "tag_ids": [(6, 0, [self.tag_test_staging.id])], + }, + { + "name": "Test Plan 2", + "note": "Plan 2 Note", + "tag_ids": [(6, 0, [self.tag_test_production.id])], + }, + { + "name": "Test Plan 3", + "note": "Plan 3 Note", + "tag_ids": [(6, 0, [self.tag_test_staging.id])], + }, + ] + created_plans = self.Plan.create(plans_data) + # Check that all plans are created successfully + self.assertTrue(all(created_plans)) + # Update the access level of the created plans + created_plans.write({"access_level": "3"}) + # Check that all plans are updated successfully + self.assertTrue(all(plan.access_level == "3" for plan in created_plans)) + + def test_plan_with_first_not_executable_condition(self): + """ + Test plan with not executable condition for first plan line + """ + # Add condition for the first plan line + self.plan_line_1.condition = "{{ odoo_version }} == '14.0'" + # Run plan + self.plan_1._run_single(self.server_test_1) + # Check plan log + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual( + len(plan_log_records.command_log_ids), + 2, + msg="Must be two command records", + ) + self.assertTrue( + plan_log_records.command_log_ids[0].is_skipped, + msg="First command must be skipped", + ) + self.assertFalse( + plan_log_records.command_log_ids[1].is_skipped, + msg="Second command not must be skipped", + ) + + def test_plan_with_second_not_executable_condition(self): + """ + Test plan with not executable condition for second plan line + """ + # Add condition for second plan line + self.plan_line_2.condition = "{{ odoo_version }} == '14.0'" + # Run plan + self.plan_1._run_single(self.server_test_1) + # Check plan log + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual( + len(plan_log_records.command_log_ids), + 2, + msg="Must be two command records", + ) + self.assertTrue( + plan_log_records.command_log_ids[1].is_skipped, + msg="Second command must be skipped", + ) + self.assertFalse( + plan_log_records.command_log_ids[0].is_skipped, + msg="First command not must be skipped", + ) + + def test_plan_with_executable_condition(self): + """ + Test plan with executable condition for plan line + """ + # Add condition for first plan line + self.plan_line_1.condition = "1 == 1" + # Create a global value for the 'Version' variable + self.VariableValue.create( + {"variable_id": self.variable_version.id, "value_char": "14.0"} + ) + # Add condition with variable + self.plan_line_2.condition = ( + "{{ " + self.variable_version.name + " }} == '14.0'" + ) + # Run plan + self.plan_1._run_single(self.server_test_1) + # Check commands + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual( + len(plan_log_records.command_log_ids), + 2, + msg="Must be two command records", + ) + self.assertTrue( + all(not command.is_skipped for command in plan_log_records.command_log_ids), + msg="All command should be executed", + ) + + def test_plan_with_update_variables(self): + """ + Test plan updates custom (in-flight) values + """ + # Add new variable to server + self.VariableValue.create( + { + "variable_id": self.variable_version.id, + "value_char": "14.0", + "server_id": self.server_test_1.id, + } + ) + # Create new variable value on action + self.VariableValue.create( + { + "variable_id": self.variable_version.id, + "value_char": "16.0", + "plan_line_action_id": self.plan_line_1_action_1.id, + } + ) + # Add a new variable value on action for a variable absent on the server + self.VariableValue.create( + { + "variable_id": self.variable_os.id, + "value_char": "Ubuntu", + "plan_line_action_id": self.plan_line_1_action_1.id, + } + ) + # Pre-run sanity: server holds initial value and no OS value + exist_server_values = self.server_test_1.variable_value_ids.filtered( + lambda rec: rec.variable_id == self.variable_version + ) + self.assertEqual( + len(exist_server_values), + 1, + "The server should have only one value for the variable", + ) + self.assertEqual( + exist_server_values.value_char, + "14.0", + "The server variable value should be '14.0'", + ) + exist_server_values = self.server_test_1.variable_value_ids.filtered( + lambda rec: rec.variable_id == self.variable_os + ) + self.assertFalse( + exist_server_values, "The server should not have this variable" + ) + # Run plan + self.plan_1._run_single(self.server_test_1) + # After run: server values MUST remain unchanged + server_version_val = self.server_test_1.variable_value_ids.filtered( + lambda rec: rec.variable_id == self.variable_version + ) + self.assertEqual( + server_version_val.value_char, + "14.0", + "Server variable value must remain unchanged", + ) + self.assertFalse( + self.server_test_1.variable_value_ids.filtered( + lambda rec: rec.variable_id == self.variable_os + ), + "Server must not receive new variable from action", + ) + + # But custom (in-flight) values MUST be updated in logs + plan_log = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)], order="id desc", limit=1 + ) + self.assertTrue(plan_log, "Plan log should exist after run") + self.assertEqual( + plan_log.variable_values[self.variable_version.reference], + "16.0", + "Plan log must contain updated custom value", + ) + self.assertEqual( + plan_log.variable_values[self.variable_os.reference], + "Ubuntu", + "Plan log must contain new custom value", + ) + + last_command_log = plan_log.command_log_ids and plan_log.command_log_ids[-1] + self.assertTrue(last_command_log, "Command log should exist after run") + self.assertEqual( + last_command_log.variable_values[self.variable_version.reference], + "16.0", + "Command log must contain updated custom value", + ) + + def test_plan_with_action_variables_for_condition(self): + """ + Test plan with update server variables and use new + value as condition for next plan line + """ + # Add new variable to server + self.VariableValue.create( + { + "variable_id": self.variable_version.id, + "value_char": "14.0", + "server_id": self.server_test_1.id, + } + ) + # Create new variable value to action to update existing server variable + self.VariableValue.create( + { + "variable_id": self.variable_version.id, + "value_char": "16.0", + "plan_line_action_id": self.plan_line_1_action_1.id, + } + ) + # Add condition with variable + self.plan_line_2.condition = ( + "{{ " + self.variable_version.name + " }} == '14.0'" + ) + # Run plan + self.plan_1._run_single(self.server_test_1) + # Check commands + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + # The second line of the plan should be skipped because the + # first line of the plan updated the value of the variable + self.assertTrue( + plan_log_records.command_log_ids[1].is_skipped, + msg="Second command must be skipped", + ) + + # Change condition for plan line + self.plan_line_2.condition = ( + "{{ " + self.variable_version.name + " }} == '16.0'" + ) + # Run plan + self.plan_1._run_single(self.server_test_1) + # Check commands + new_plan_log_records = ( + self.PlanLog.search([("server_id", "=", self.server_test_1.id)]) + - plan_log_records + ) + # The second line of the plan should be skipped because the + # first line of the plan updated the value of the variable + self.assertFalse( + new_plan_log_records.command_log_ids[1].is_skipped, + msg="The second plan line should not be skipped", + ) + + def test_flight_plan_copy(self): + """Test duplicating a Flight Plan with lines, actions, and variable values""" + + # Create a Flight Plan + plan = self.Plan.create( + { + "name": "Test Flight Plan", + "note": "Test Note", + } + ) + + # Create a command for the plan line + command = self.Command.create( + { + "name": "Test Command", + # Command to get Linux kernel version + "code": "uname -r", + } + ) + + # Create a Flight Plan Line + plan_line = self.plan_line.create( + { + "plan_id": plan.id, + "command_id": command.id, + "path": "/test/path", + # Condition based on Linux version + "condition": '{{ test_linux_version }} >= "5.0"', + } + ) + + # Create a variable for the action + variable = self.Variable.create({"name": "test_linux_version"}) + + # Create an Action for the Plan Line + action = self.plan_line_action.create( + { + "line_id": plan_line.id, + "action": "n", # next action + "condition": "==", + "value_char": "0", # condition for success + } + ) + + # Create a Variable Value for the Action + self.env["cx.tower.variable.value"].create( + { + "variable_id": variable.id, + "value_char": "5.0", + "plan_line_action_id": action.id, + } + ) + + # Duplicate the Flight Plan + copied_plan = plan.copy() + + # Ensure the new Flight Plan was created with a new ID + self.assertNotEqual( + copied_plan.id, + plan.id, + "Copied plan should have a different ID from the original", + ) + + # Check that the copied plan has the same number of lines + self.assertEqual( + len(copied_plan.line_ids), + len(plan.line_ids), + "Copied plan should have the same number of lines as the original", + ) + + # Check that the copied plan's lines have the same actions as the original + original_line = plan.line_ids + copied_line = copied_plan.line_ids + + # Ensure the command, condition, and custom path are copied correctly + self.assertEqual( + copied_line.command_id.id, + original_line.command_id.id, + "Command should be the same in copied line", + ) + self.assertEqual( + copied_line.path, + original_line.path, + "Custom path should be the same in copied line", + ) + self.assertEqual( + copied_line.condition, + original_line.condition, + "Condition should be the same in copied line", + ) + + # Ensure actions were copied correctly + self.assertEqual( + len(copied_line.action_ids), + len(original_line.action_ids), + "Number of actions should be the same in the copied line", + ) + self.assertEqual( + copied_line.action_ids.action, + original_line.action_ids.action, + "Action should be the same in the copied line", + ) + self.assertEqual( + copied_line.action_ids.condition, + original_line.action_ids.condition, + "Action condition should be the same in the copied line", + ) + self.assertEqual( + copied_line.action_ids.value_char, + original_line.action_ids.value_char, + "Action value should be the same in the copied line", + ) + + # Check that variable values were copied correctly + original_action = original_line.action_ids + copied_action = copied_line.action_ids + + self.assertEqual( + len(copied_action.variable_value_ids), + len(original_action.variable_value_ids), + "Number of variable values should be the same in the copied action", + ) + + self.assertEqual( + copied_action.variable_value_ids.variable_id.id, + original_action.variable_value_ids.variable_id.id, + "Variable should be the same in the copied action", + ) + self.assertEqual( + copied_action.variable_value_ids.value_char, + original_action.variable_value_ids.value_char, + "Variable value should be the same in the copied action", + ) + + def test_plan_with_another_plan(self): + """ + Test to check running another plan from current plan + """ + # Check plan logs + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual(len(plan_log_records), 0, "Plan logs should be empty") + # Run plan + self.plan_2._run_single(self.server_test_1) + # Check plan logs after execute command with plan action + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual(len(plan_log_records), 2, msg="Should be 2 plan logs") + + parent_plan_log = plan_log_records.filtered( + lambda rec: rec.plan_id == self.plan_2 + ) + self.assertTrue(parent_plan_log, "The log for Plan 2 must exist!") + self.assertEqual( + parent_plan_log.plan_status, 0, "Plan log should success status" + ) + + child_plan_log = plan_log_records - parent_plan_log + self.assertEqual( + child_plan_log.parent_flight_plan_log_id, + parent_plan_log, + "Second plan log should contain parent log link", + ) + triggering = parent_plan_log.command_log_ids.filtered( + lambda log: log.triggered_plan_log_id + ) + self.assertEqual( + len(triggering), 1, "Expected exactly one triggering command log" + ) + self.assertEqual( + child_plan_log.plan_status, + triggering.command_status, + "Parent run-plan command status must equal child plan status", + ) + self.assertEqual( + parent_plan_log.command_log_ids.triggered_plan_log_id, + child_plan_log, + "The command triggered plan line should be equal to child plan", + ) + + # Check that we cannot add recursive plan + with self.assertRaisesRegex( + ValidationError, "Recursive plan call detected in plan.*" + ): + self.plan_line.create( + { + "sequence": 20, + "plan_id": self.plan_1.id, + "command_id": self.command_run_flight_plan_1.id, + } + ) + + # Delete plan lines from first plan + self.plan_1.line_ids = False + # Run plan + self.plan_2._run_single(self.server_test_1) + plan_log_records = ( + self.PlanLog.search([("server_id", "=", self.server_test_1.id)]) + - plan_log_records + ) + + parent_plan_log = plan_log_records.filtered( + lambda rec: rec.plan_id == self.plan_2 + ) + self.assertTrue(parent_plan_log, "The log for Plan 2 must exist!") + self.assertEqual( + parent_plan_log.plan_status, PLAN_IS_EMPTY, "Plan log should failed status" + ) + + child_plan_log = plan_log_records - parent_plan_log + self.assertEqual( + child_plan_log.parent_flight_plan_log_id, + parent_plan_log, + "Second plan log should contain parent log link", + ) + self.assertEqual( + child_plan_log.plan_status, + parent_plan_log.command_log_ids.command_status, + "The command status of parent plan should be equal " + "of status second flight plan", + ) + + def test_plan_with_two_plans(self): + """ + Test to check two plans from plan + """ + self.plan_line.create( + { + "sequence": 15, + "plan_id": self.plan_2.id, + "command_id": self.command_run_flight_plan_1.id, + } + ) + # Check plan logs + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual(len(plan_log_records), 0, "Plan logs should be empty") + # Run plan + self.plan_2._run_single(self.server_test_1) + # Check plan logs after execute command with plan action + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual(len(plan_log_records), 3, msg="Should be 3 plan logs") + + def test_plan_with_nested_plans(self): + """ + Test to check two plans from plan + """ + command_run_flight_plan_2 = self.Command.create( + { + "name": "Run Flight Plan", + "action": "plan", + "flight_plan_id": self.plan_2.id, + } + ) + plan_3 = self.Plan.create( + { + "name": "Test plan 3", + "note": "Run flight plan 2", + } + ) + self.plan_line.create( + { + "sequence": 5, + "plan_id": plan_3.id, + "command_id": command_run_flight_plan_2.id, + } + ) + # Check plan logs + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual(len(plan_log_records), 0, "Plan logs should be empty") + # Run plan + plan_3._run_single(self.server_test_1) + # Check plan logs after execute command with plan action + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual(len(plan_log_records), 3, msg="Should be 3 plan logs") + + last_child_plan_log = plan_log_records.filtered( + lambda rec: rec.plan_id == self.plan_1 + ) + self.assertTrue(last_child_plan_log, "The log for Plan 1 must exist!") + self.assertEqual( + last_child_plan_log.plan_status, 0, "Plan log should success status" + ) + + self.assertIn( + last_child_plan_log.parent_flight_plan_log_id, + plan_log_records, + "Parent plan logs should exist", + ) + self.assertEqual( + last_child_plan_log.parent_flight_plan_log_id.plan_id, + self.plan_2, + "Parent plan should be equal to plan 2", + ) + + child_plan_log = plan_log_records.filtered( + lambda rec: rec.plan_id == self.plan_2 + ) + self.assertIn( + child_plan_log.parent_flight_plan_log_id, + plan_log_records, + "Parent plan logs should exist", + ) + self.assertEqual( + child_plan_log.parent_flight_plan_log_id.plan_id, + plan_3, + "Parent plan should be equal to plan 3", + ) + self.assertEqual( + child_plan_log.command_log_ids.triggered_plan_log_id, + last_child_plan_log, + "The command triggered plan line should be equal to last child plan", + ) + self.assertEqual( + child_plan_log.command_log_ids.triggered_plan_log_id, + last_child_plan_log, + "The command triggered plan line should be equal to last child plan", + ) + parent_plan_log = plan_log_records - child_plan_log - last_child_plan_log + self.assertEqual( + parent_plan_log.command_log_ids.triggered_plan_log_id, + child_plan_log, + "The command triggered plan line from parent plan " + "should be equal to child plan", + ) + + # Check that we cannot change command with existing plan, + # because it's recursive plan + with self.assertRaisesRegex( + ValidationError, "Recursive plan call detected in plan.*" + ): + self.plan_line_1.write( + { + "command_id": command_run_flight_plan_2.id, + } + ) + + # Set the previous command back + + self.plan_line_1.write( + { + "command_id": self.command_create_dir.id, + } + ) + # --- Check server dependency handling + + # Remove all existing flight plan logs + self.PlanLog.search([]).unlink() + + # Set server dependency for plan 2 + self.plan_2.write( + { + "server_ids": [(6, 0, [self.server.id])], + } + ) + plan_log = self.server_test_1.run_flight_plan(self.plan_2) + self.assertEqual(plan_log.plan_status, PLAN_NOT_COMPATIBLE_WITH_SERVER) + + # Run plan on allowed server + plan_log = self.server.run_flight_plan(self.plan_2) + self.assertEqual(plan_log.plan_status, 0) + + def test_failed_first_child_plan_with_another_plan(self): + """ + Check that child plan was failed then parent plan is failed too + """ + # Add new plan line + self.plan_line.create( + { + "sequence": 15, + "plan_id": self.plan_2.id, + "command_id": self.command_run_flight_plan_1.id, + } + ) + # Check plan logs + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual(len(plan_log_records), 0, "Plan logs should be empty") + + # Simulate a failed Plan 1. To achieve this, we need to update the command + # associated with Plan 1 to apply the desired side effect. + self.plan_1.line_ids.command_id[0].code = "fail" + + # Run plan + self.plan_2._run_single(self.server_test_1) + + # Check plan logs after execute command with plan action + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + # 2 logs only because plan should exist with error after first failed command + self.assertEqual(len(plan_log_records), 2, msg="Should be 2 plan logs") + + parent_plan_log = plan_log_records.filtered( + lambda rec: rec.plan_id == self.plan_2 + ) + self.assertTrue(parent_plan_log, "The log for Plan 2 must exist!") + self.assertEqual( + parent_plan_log.plan_status, GENERAL_ERROR, "Plan log should failed status" + ) + + child_plan_log = plan_log_records - parent_plan_log + self.assertEqual( + child_plan_log.parent_flight_plan_log_id, + parent_plan_log, + "Second plan log should contain parent log link", + ) + self.assertEqual( + child_plan_log.plan_status, + parent_plan_log.command_log_ids.command_status, + "The command status of main plan should be equal " + "of status second flight plan", + ) + + def test_failed_second_child_plan_with_another_plan(self): + """ + Check that child plan was failed then parent plan is failed too + """ + # Add new plan line + line = self.plan_line.create( + { + "sequence": 15, + "plan_id": self.plan_2.id, + "command_id": self.command_run_flight_plan_1.id, + } + ) + + cx_tower_plan_obj = self.registry["cx.tower.plan"] + _run_single_super = cx_tower_plan_obj._run_single + + def _run_single(this, *args, **kwargs): + if ( + this == self.plan_1 + and this.env["cx.tower.plan.log"] + .browse(kwargs["log"]["plan_log_id"]) + .plan_line_executed_id + == line + ): + # Simulate a failed Plan 1. To achieve this, we need to update + # the command associated with Plan 1 to apply the desired side effect. + self.plan_1.line_ids.command_id[0].code = "fail" + return _run_single_super(this, *args, **kwargs) + + with patch.object(cx_tower_plan_obj, "_run_single", _run_single): + # Run plan + self.plan_2._run_single(self.server_test_1) + + # Check plan logs after execute command with plan action + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + # 3 logs because plan should exist with error after second failed command + self.assertEqual(len(plan_log_records), 3, msg="Should be 3 plan logs") + + parent_plan_log = plan_log_records.filtered( + lambda rec: rec.plan_id == self.plan_2 + ) + self.assertTrue(parent_plan_log, "The log for Plan 2 must exist!") + self.assertEqual( + parent_plan_log.plan_status, GENERAL_ERROR, "Plan log should failed status" + ) + + child_plan_log = plan_log_records - parent_plan_log + self.assertEqual( + child_plan_log.parent_flight_plan_log_id, + parent_plan_log, + "Second plan log should contain parent log link", + ) + self.assertEqual( + len(child_plan_log), + 2, + "Must be 2 child plan logs", + ) + self.assertIn( + GENERAL_ERROR, + child_plan_log.mapped("plan_status"), + "One of plan status of child plan must be GENERAL_ERROR", + ) + self.assertIn( + 0, + child_plan_log.mapped("plan_status"), + "One of plan status of child plan must be GENERAL_ERROR", + ) + + def test_plan_with_another_plan_with_condition(self): + """ + Test that parent plan will success finished + if child plan executable by condition + """ + # Add condition for first plan line + self.plan_line_1.condition = "1 == 1" + # Check plan logs + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual(len(plan_log_records), 0, "Plan logs should be empty") + # Run plan + self.plan_2._run_single(self.server_test_1) + # Check plan logs after execute command with plan action + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + + self.assertEqual(len(plan_log_records), 2, msg="Should be 2 plan logs") + + parent_plan_log = plan_log_records.filtered( + lambda rec: rec.plan_id == self.plan_2 + ) + self.assertTrue(parent_plan_log, "The log for Plan 2 must exist!") + self.assertEqual( + parent_plan_log.plan_status, 0, "Plan log should success status" + ) + + child_plan_log = plan_log_records - parent_plan_log + self.assertEqual( + child_plan_log.parent_flight_plan_log_id, + parent_plan_log, + "Second plan log should contain parent log link", + ) + self.assertEqual( + child_plan_log.plan_status, + parent_plan_log.command_log_ids.filtered( + lambda log: log.triggered_plan_log_id + ).command_status, + "The command status of main plan should be equal " + "of status second flight plan", + ) + + def test_plan_with_another_plan_with_not_executable_condition(self): + """ + Test plan with not executable condition for second plan line + """ + # Add condition for first plan line + self.plan_line_1.condition = "{{ odoo_version }} == '14.0'" + # Check plan logs + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual(len(plan_log_records), 0, "Plan logs should be empty") + # Run plan + self.plan_2._run_single(self.server_test_1) + + # Check plan logs after execute command with plan action + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + + self.assertEqual(len(plan_log_records), 2, msg="Should be 2 plan logs") + + self.assertIn( + PLAN_LINE_CONDITION_CHECK_FAILED, + plan_log_records.command_log_ids.mapped("command_status"), + "One of commands should be skipped", + ) + + def test_plan_with_another_plan_with_all_not_executable_condition(self): + """ + Test plan with not executable condition for second plan line + """ + # Add condition for all plan lines + self.plan_line_1.condition = "{{ odoo_version }} == '14.0'" + self.plan_line_2.condition = "{{ odoo_version }} == '14.0'" + + self.plan_2_line_1.condition = "{{ odoo_version }} == '14.0'" + self.plan_2_line_2.condition = "{{ odoo_version }} == '14.0'" + + self.plan_2._run_single(self.server_test_1) + + # Check plan logs after execute command with plan action + plan_log_records = self.PlanLog.search( + [("server_id", "=", self.server_test_1.id)] + ) + + self.assertEqual(len(plan_log_records), 1, msg="Should be 1 plan logs") + self.assertEqual( + PLAN_LINE_CONDITION_CHECK_FAILED, + plan_log_records.command_log_ids.filtered( + lambda log: log.command_id == self.command_run_flight_plan_1 + ).command_status, + "Command status should be skipped", + ) + + def test_plan_unlink(self): + plan = self.plan_1.copy() + plan_id = plan.id + plan_line_ids = plan.line_ids + plan_line_action_ids = plan.mapped("line_ids.action_ids") + + plan.unlink() + + self.assertFalse( + self.Plan.search([("id", "=", plan_id)]), msg="Plan should be deleted" + ) + self.assertFalse( + self.plan_line.search([("id", "in", plan_line_ids.ids)]), + msg="Plan line should be deleted when Plan is deleted", + ) + self.assertFalse( + self.plan_line_action.search([("id", "in", plan_line_action_ids.ids)]), + msg="Plan line action should be deleted when Plan line is deleted", + ) + + def test_plan_command_server_compatibility(self): + """Test plan execution with server-restricted flight plans""" + # Create a new test server + test_server = self.Server.create( + { + "name": "Test Server", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "host_key": "test_key", + } + ) + + # Create a flight plan restricted to the test server + plan = self.Plan.create( + { + "name": "Server Restricted Plan", + "server_ids": [(6, 0, [test_server.id])], + "line_ids": [ + (0, 0, {"command_id": self.command_create_dir.id, "sequence": 1}) + ], + } + ) + + # Should fail when executing on non-allowed server + plan_log = plan._run_single(self.server_test_1) + self.assertEqual(plan_log.plan_status, PLAN_NOT_COMPATIBLE_WITH_SERVER) + + # Should work on allowed server + plan._run_single(test_server) + plan_log = self.PlanLog.search( + [("plan_id", "=", plan.id), ("server_id", "=", test_server.id)], limit=1 + ) + self.assertEqual(plan_log.command_log_ids.command_status, 0) + + def test_another_plan_running(self): + """Test the parallel plan running""" + + # Ensure that the plan doesn't allow parallel running + self.plan_1.write({"allow_parallel_run": False}) + + # Create a new plan log with a plan that is already running + self.PlanLog.create( + { + "plan_id": self.plan_1.id, + "server_id": self.server_test_1.id, + "start_date": fields.Datetime.now(), + } + ) + + # Launch the same plan on the same server + plan_log = self.server_test_1.run_flight_plan(self.plan_1) + self.assertEqual(plan_log.plan_status, ANOTHER_PLAN_RUNNING) + + # Now allow parallel running + self.plan_1.write({"allow_parallel_run": True}) + + # Launch the same plan on the same server + plan_log = self.server_test_1.run_flight_plan(self.plan_1) + self.assertEqual(plan_log.plan_status, 0) + + def test_plan_custom_variables(self): + """Test plan with custom variables""" + command_python_1_id = self.command_python_custom_variable_values_1.id + command_python_2_id = self.command_python_custom_variable_values_2.id + + plan = self._create_plan( + **{ + "name": "Plan with custom variables", + "line_ids": [ + ( + 0, + 0, + { + "command_id": command_python_1_id, + "sequence": 1, + }, + ), + (0, 0, {"command_id": self.command_create_dir.id, "sequence": 2}), + ( + 0, + 0, + { + "command_id": command_python_2_id, + "sequence": 3, + }, + ), + (0, 0, {"command_id": self.command_create_dir.id, "sequence": 4}), + ], + } + ) + + # Run plan + plan_log = self.server_test_1.run_flight_plan(plan) + + # Check that custom variable values were updated correctly + # (The log of plan should contain the last updatedvalues) + self.assertEqual(plan_log.variable_values["test_path_"], "/another_test_path") + self.assertEqual(plan_log.variable_values["test_dir"], "test_dir") + self.assertEqual( + plan_log.variable_values["random_var_reference"], "random_var_value" + ) + self.assertEqual(plan_log.variable_values["_my_value"], "Just To Test") + + command_logs = plan_log.command_log_ids + self.assertEqual( + len(command_logs), + len(plan.line_ids), + f"Should be {len(plan.line_ids)} command logs.", + ) + + # Check that custom variable values were created correctly + # in first python command log + command_python_command_1_log = command_logs.filtered( + lambda log: log.command_id.id == command_python_1_id + ) + self.assertEqual( + command_python_command_1_log.variable_values["test_path_"], "/test_path" + ) + self.assertEqual( + command_python_command_1_log.variable_values["test_dir"], "test_dir" + ) + self.assertEqual( + command_python_command_1_log.variable_values["_my_value"], "Just To Test" + ) + + # Check that custom variable values used in rendered command code + command_create_dir_logs = command_logs.filtered( + lambda log: log.command_id == self.command_create_dir + ) + first_command_create_dir_log = command_create_dir_logs[0] + second_command_create_dir_log = command_create_dir_logs[1] + + # the first_command_create_dir_log.code is equal to + # 'cd /test_path && mkdir test_dir' + # because rendered code contains custom variable values updated + # from first python command + self.assertEqual( + first_command_create_dir_log.code, "cd /test_path && mkdir test_dir" + ) + + # Check that custom variable values were updated correctly in command logs + command_python_command_2_log = command_logs.filtered( + lambda log: log.command_id.id == command_python_2_id + ) + self.assertEqual( + command_python_command_2_log.variable_values["test_path_"], + "/another_test_path", + ) + self.assertEqual( + command_python_command_2_log.variable_values["test_dir"], "test_dir" + ) + self.assertEqual( + command_python_command_2_log.variable_values["random_var_reference"], + "random_var_value", + ) + self.assertEqual( + command_python_command_2_log.variable_values["_my_value"], "Just To Test" + ) + self.assertEqual( + command_python_command_2_log.variable_values[self.variable_url.reference], + "https://www.cetmix.com", + ) + + # the second_command_create_dir_log.code is equal to + # 'cd /another_test_path && mkdir test_dir' + # because rendered code contains custom variable values updated + # from second python command + self.assertEqual( + second_command_create_dir_log.code, + "cd /another_test_path && mkdir test_dir", + ) + + def test_plan_custom_variables_wizard(self): + """Test plan with custom variables from wizard""" + command_python_1_id = self.command_python_custom_variable_values_1.id + command_python_2_id = self.command_python_custom_variable_values_2.id + plan = self._create_plan( + **{ + "name": "Plan with custom variables", + "line_ids": [ + ( + 0, + 0, + { + "command_id": command_python_1_id, + "sequence": 1, + }, + ), + (0, 0, {"command_id": self.command_create_dir.id, "sequence": 2}), + ( + 0, + 0, + { + "command_id": command_python_2_id, + "sequence": 3, + }, + ), + (0, 0, {"command_id": self.command_create_dir.id, "sequence": 4}), + ], + } + ) + + # Create wizard with custom variable values + wizard = self.env["cx.tower.plan.run.wizard"].create( + { + "plan_id": plan.id, + "server_ids": [(6, 0, [self.server_test_1.id])], + "custom_variable_value_ids": [ + ( + 0, + 0, + { + "variable_id": self.variable_version.id, + "value_char": "16.0", + }, + ), + ], + } + ) + + # Run wizard + action = wizard.run_flight_plan() + plan_log = self.PlanLog.search( + [("label", "=", action["context"]["search_default_label"])], + limit=1, + ) + self.assertTrue(plan_log, "Plan log should be created") + + # Check that custom variable values were updated correctly + # (The log of plan should contain the last updated + # values + custom variable value from wizard) + self.assertEqual(plan_log.variable_values["test_path_"], "/another_test_path") + self.assertEqual(plan_log.variable_values["test_dir"], "test_dir") + self.assertEqual( + plan_log.variable_values["random_var_reference"], "random_var_value" + ) + self.assertEqual(plan_log.variable_values["_my_value"], "Just To Test") + self.assertEqual( + plan_log.variable_values[self.variable_version.reference], "16.0" + ) + + def test_plan_with_another_plan_custom_variables(self): + """Test plan with another plan with custom variables""" + # Create plan with next structure: + # Plan 1: + # - Command 1: Run plan 2 + # - Command 2: Run Python command to set custom variable values + # - Command 3: Create directory + # Plan 2: + # - Command 1: Python command to set custom variable values + # - Command 2: Create directory + # - Command 3: Python command to update custom variable values + + command_python_1_id = self.command_python_custom_variable_values_1.id + command_python_2_id = self.command_python_custom_variable_values_2.id + plan2 = self._create_plan( + **{ + "name": "Plan 2", + "line_ids": [ + ( + 0, + 0, + { + "command_id": command_python_1_id, + "sequence": 1, + }, + ), + (0, 0, {"command_id": self.command_create_dir.id, "sequence": 2}), + ( + 0, + 0, + { + "command_id": command_python_2_id, + "sequence": 3, + }, + ), + ], + } + ) + + command_run_plan_2 = self.Command.create( + { + "name": "Run Flight Plan", + "action": "plan", + "flight_plan_id": plan2.id, + } + ) + command_python_custom_variable_values_3 = self.Command.create( + { + "name": "Python command to update custom variable values", + "action": "python_code", + "code": """ +custom_values['random_var_reference'] = 'another_random_var_value' +""", + } + ) + + plan1 = self._create_plan( + **{ + "name": "Plan 1", + "line_ids": [ + (0, 0, {"command_id": command_run_plan_2.id, "sequence": 1}), + ( + 0, + 0, + { + "command_id": command_python_custom_variable_values_3.id, + "sequence": 2, + }, + ), + (0, 0, {"command_id": self.command_create_dir.id, "sequence": 3}), + ], + } + ) + + # Create wizard with custom variable values + wizard = self.env["cx.tower.plan.run.wizard"].create( + { + "plan_id": plan1.id, + "server_ids": [(6, 0, [self.server_test_1.id])], + "custom_variable_value_ids": [ + ( + 0, + 0, + { + "variable_id": self.variable_version.id, + "value_char": "16.0", + }, + ), + ( + 0, + 0, + { + "variable_id": self.variable_url.id, + "value_char": "https://www.test.com", + }, + ), + ], + } + ) + + # Run wizard + action = wizard.run_flight_plan() + plan_log = self.PlanLog.search( + [("label", "=", action["context"]["search_default_label"])], + limit=1, + ) + self.assertTrue(plan_log, "Plan log should be created") + + command_logs = plan_log.command_log_ids + self.assertEqual( + len(command_logs), + len(plan1.line_ids), + f"Should be {len(plan1.line_ids)} command logs.", + ) + + # First command log is run plan 2 log that contains custom variable values + # updated from plan 2. This command log should contain the same custom + # variable values as from plan 2 log + run_plan2_command_log = command_logs[0] + run_plan2_command_log_variable_values = run_plan2_command_log.variable_values + + plan2_log = run_plan2_command_log.triggered_plan_log_id + plan2_log_variable_values = plan2_log.variable_values + + # check that variable values are the same + self.assertEqual( + run_plan2_command_log_variable_values, plan2_log_variable_values + ) + + # Before finished command (run child plan): we have next variable values: + # {'test_version': '16.0', 'test_url': 'https://www.test.com'} + + # After finished command (run child plan): we have next variable values: + # { + # 'test_version': '16.0', + # 'test_url': 'https://www.cetmix.com', + # 'test_path_': '/another_test_path', + # 'test_dir': 'test_dir', + # '_my_value': 'Just To Test', + # 'random_var_reference': 'random_var_value' + # } + self.assertEqual( + run_plan2_command_log_variable_values["test_path_"], "/another_test_path" + ) + self.assertEqual(run_plan2_command_log_variable_values["test_dir"], "test_dir") + self.assertEqual( + run_plan2_command_log_variable_values["_my_value"], "Just To Test" + ) + self.assertEqual( + run_plan2_command_log_variable_values["random_var_reference"], + "random_var_value", + ) + self.assertEqual( + run_plan2_command_log_variable_values[self.variable_version.reference], + "16.0", + ) + self.assertEqual( + run_plan2_command_log_variable_values[self.variable_url.reference], + "https://www.cetmix.com", + ) + + # After finished main plan: we have next variable values: + # { + # 'test_version': '16.0', + # 'test_url': 'https://www.cetmix.com', + # 'test_path_': '/another_test_path', + # 'test_dir': 'test_dir', + # '_my_value': 'Just To Test', + # 'random_var_reference': 'another_random_var_value' + # } + self.assertEqual(plan_log.variable_values["test_path_"], "/another_test_path") + self.assertEqual(plan_log.variable_values["test_dir"], "test_dir") + self.assertEqual(plan_log.variable_values["_my_value"], "Just To Test") + self.assertEqual( + plan_log.variable_values["random_var_reference"], "another_random_var_value" + ) + self.assertEqual( + plan_log.variable_values[self.variable_version.reference], "16.0" + ) + self.assertEqual( + plan_log.variable_values[self.variable_url.reference], + "https://www.cetmix.com", + ) + + @mute_logger("odoo.addons.cetmix_tower_server.models.cetmix_tower") + def test_plan_render_jet_template(self): + """Test plan rendering jet template""" + plan_log_record_count = self.PlanLog.search_count( + [("server_id", "=", self.server_test_1.id)] + ) + self.assertEqual(plan_log_record_count, 0, "Plan logs should be empty") + + # Set variable values for the server + res = self.CetmixTower.server_set_variable_value( + self.server_test_1.reference, "test_path_", "/opt/tower" + ) + self.assertEqual(res["exit_code"], 0, "Variable 'test_path_' not found/updated") + res = self.CetmixTower.server_set_variable_value( + self.server_test_1.reference, "test_dir", "server1" + ) + self.assertEqual(res["exit_code"], 0, "Variable 'test_dir' not found/updated") + + # -- 1-- + # Run plan without jet template + self.server_test_1.run_flight_plan(self.plan_2) + + plan_log = self.PlanLog.search( + [ + ("plan_id", "=", self.plan_2.id), + ("server_id", "=", self.server_test_1.id), + ], + ) + self.assertEqual(len(plan_log), 1, "A single plan log should be created") + self.assertEqual( + len(plan_log.command_log_ids), 2, "Two commands should be executed" + ) + self.assertFalse(plan_log.jet_template_id, "Jet template should be empty") + + # Check the SSH command output. Second command + rendered_code_expected = "cd /opt/tower && mkdir server1" + ssh_command_log = plan_log.command_log_ids[1] + self.assertEqual( + ssh_command_log.code, rendered_code_expected, "SSH command should succeed" + ) + + # Check the nested plan command output. + # This is needed to ensure that the nested plan commands + # are rendered properly. + nested_ssh_command_log = plan_log.command_log_ids[ + 0 + ].triggered_plan_log_id.command_log_ids[0] + self.assertEqual( + nested_ssh_command_log.code, + rendered_code_expected, + "SSH command should succeed", + ) + + # -- 2 -- + # Run plan with jet template + + # Delete previous plan log + plan_log.unlink() + + self.server_test_1.run_flight_plan( + self.plan_2, jet_template=self.jet_template_sample + ) + + plan_log = self.PlanLog.search( + [ + ("plan_id", "=", self.plan_2.id), + ("server_id", "=", self.server_test_1.id), + ], + ) + self.assertEqual(len(plan_log), 1, "A single plan log should be created") + self.assertEqual( + len(plan_log.command_log_ids), 2, "Two commands should be executed" + ) + self.assertEqual( + plan_log.jet_template_id, + self.jet_template_sample, + "Jet template doesn't match", + ) + + # Check the SSH command output. Second command + rendered_code_expected = "cd /jets/templates/template1 && mkdir jet_templates" + ssh_command_log = plan_log.command_log_ids[1] + self.assertEqual( + ssh_command_log.code, rendered_code_expected, "SSH command should succeed" + ) + + # Check the nested plan command output. + # This is needed to ensure that the nested plan commands + # are rendered properly. + nested_ssh_command_log = plan_log.command_log_ids[ + 0 + ].triggered_plan_log_id.command_log_ids[0] + self.assertEqual( + nested_ssh_command_log.code, + rendered_code_expected, + "SSH command should succeed", + ) + + # -- 3 -- + # Run plan with jet + # Delete previous plan log + plan_log.unlink() + + self.server_test_1.run_flight_plan(self.plan_2, jet=self.jet_sample) + + plan_log = self.PlanLog.search( + [ + ("plan_id", "=", self.plan_2.id), + ("server_id", "=", self.server_test_1.id), + ], + ) + self.assertEqual(len(plan_log), 1, "A single plan log should be created") + self.assertEqual( + len(plan_log.command_log_ids), 2, "Two commands should be executed" + ) + self.assertEqual( + plan_log.jet_template_id, + self.jet_template_sample, + "Jet template doesn't match", + ) + self.assertEqual(plan_log.jet_id, self.jet_sample, "Jet doesn't match") + + # Check the SSH command output. Second command + rendered_code_expected = "cd /jets/jet1 && mkdir jet_templates" + ssh_command_log = plan_log.command_log_ids[1] + self.assertEqual( + ssh_command_log.code, rendered_code_expected, "SSH command should succeed" + ) + + # Check the nested plan command output. + # This is needed to ensure that the nested plan commands + # are rendered properly. + nested_ssh_command_log = plan_log.command_log_ids[ + 0 + ].triggered_plan_log_id.command_log_ids[0] + self.assertEqual( + nested_ssh_command_log.code, + rendered_code_expected, + "SSH command should succeed", + ) + + def test_plan_with_custom_values_in_condition(self): + """ + Ensure that plan line conditions see updated custom_values + produced by previous commands. + + 1) python sets test_path_ = '/test_path' + 2) create_dir with condition "{{ test_path_ }} == '/test_path'" -> executes + 3) python updates test_path_ = '/another_test_path' + 4) create_dir with condition "{{ test_path_ }} == '/another_test_path'" + -> executes + Then invert conditions and check both lines are skipped appropriately. + """ + command_python_1_id = self.command_python_custom_variable_values_1.id + command_python_2_id = self.command_python_custom_variable_values_2.id + + plan = self._create_plan( + **{ + "name": "Plan with custom_values in condition", + "line_ids": [ + (0, 0, {"command_id": command_python_1_id, "sequence": 1}), + ( + 0, + 0, + { + "command_id": self.command_create_dir.id, + "sequence": 2, + "condition": "{{ test_path_ }} == '/test_path'", + }, + ), + (0, 0, {"command_id": command_python_2_id, "sequence": 3}), + ( + 0, + 0, + { + "command_id": self.command_create_dir.id, + "sequence": 4, + "condition": "{{ test_path_ }} == '/another_test_path'", + }, + ), + ], + } + ) + + plan_log = self.server_test_1.run_flight_plan(plan) + + logs = plan_log.command_log_ids + self.assertEqual(len(logs), 4, "Should be 4 command logs") + + create_dir_logs = logs.filtered( + lambda line: line.command_id == self.command_create_dir + ) + self.assertEqual(len(create_dir_logs), 2, "Should be 2 create_dir logs") + + self.assertFalse( + create_dir_logs[0].is_skipped, "First create_dir must be executed" + ) + self.assertFalse( + create_dir_logs[1].is_skipped, "Second create_dir must be executed" + ) + + self.assertIn("/test_path", create_dir_logs[0].code) + self.assertIn("/another_test_path", create_dir_logs[1].code) + + def test_plan_stop_mid_execution(self): + """ + Test that plan is correctly marked as stopped and + further commands are not executed. + """ + plan = self._create_plan( + name="Test Plan Stop", + line_ids=[ + (0, 0, {"command_id": self.command_create_dir.id, "sequence": 1}), + (0, 0, {"command_id": self.command_list_dir.id, "sequence": 2}), + ], + ) + server = self.server_test_1 + + cx_tower_plan_line_obj = self.registry["cx.tower.plan.line"] + _run_super = cx_tower_plan_line_obj._run + + # Save plan_log for control is_running + plan_log_holder = {} + + def fake_run(self, server, plan_log_record, **kwargs): + # Save plan_log for control is_running + plan_log_holder["log"] = plan_log_record + + # Call stop() after first command + if len(plan_log_record.command_log_ids) == 0: + plan_log_record.stop() + # After this call plan_log should be stopped, + # and finish_date should be filled + # Continue execution in standard way + return _run_super(self, server, plan_log_record, **kwargs) + + with patch.object(cx_tower_plan_line_obj, "_run", new=fake_run): + plan_log = plan._run_single(server) + + self.assertTrue(plan_log.is_stopped, "Plan should be stopped") + self.assertFalse(plan_log.is_running, "Plan should not be in running status") + self.assertEqual( + plan_log.plan_status, PLAN_STOPPED, "Status should be PLAN_STOPPED" + ) + self.assertTrue(plan_log.finish_date, "Finish date should be filled") + self.assertLessEqual( + len(plan_log.command_log_ids), + 1, + "There should be maximum one command in the log", + ) + + def test_flight_plan_reference_update(self): + """Test flight plan reference update cascades to dependent models""" + # 1. Add a variable value to plan_line_1_action_2 + variable_value = self.VariableValue.create( + { + "variable_id": self.variable_os.id, + "value_char": "Ubuntu 20.04", + "plan_line_action_id": self.plan_line_1_action_2.id, + } + ) + + # Store original references for comparison + original_plan_reference = self.plan_1.reference + original_plan_line_1_reference = self.plan_line_1.reference + original_plan_line_2_reference = self.plan_line_2.reference + original_plan_line_1_action_1_reference = self.plan_line_1_action_1.reference + original_plan_line_1_action_2_reference = self.plan_line_1_action_2.reference + original_plan_line_2_action_1_reference = self.plan_line_2_action_1.reference + original_plan_line_2_action_2_reference = self.plan_line_2_action_2.reference + original_variable_value_reference = variable_value.reference + + # 2. Change the reference for plan_1 to "nice_new_plan" + self.plan_1.write({"reference": "nice_new_plan"}) + + # 3. Verify that references are updated for plan lines + # Invalidate models to refresh all references + self.env["cx.tower.plan"].invalidate_model(["reference"]) + self.env["cx.tower.plan.line"].invalidate_model(["reference"]) + self.env["cx.tower.plan.line.action"].invalidate_model(["reference"]) + self.env["cx.tower.variable.value"].invalidate_model(["reference"]) + + # Check that plan reference was updated + self.assertEqual(self.plan_1.reference, "nice_new_plan") + self.assertNotEqual(self.plan_1.reference, original_plan_reference) + + # Check that plan line references were updated to include the new plan reference + self.assertIn("nice_new_plan", self.plan_line_1.reference) + self.assertIn("nice_new_plan", self.plan_line_2.reference) + self.assertNotEqual(self.plan_line_1.reference, original_plan_line_1_reference) + self.assertNotEqual(self.plan_line_2.reference, original_plan_line_2_reference) + + # Check that plan line action references were updated + self.assertIn("nice_new_plan", self.plan_line_1_action_1.reference) + self.assertIn("nice_new_plan", self.plan_line_1_action_2.reference) + self.assertIn("nice_new_plan", self.plan_line_2_action_1.reference) + self.assertIn("nice_new_plan", self.plan_line_2_action_2.reference) + self.assertNotEqual( + self.plan_line_1_action_1.reference, original_plan_line_1_action_1_reference + ) + self.assertNotEqual( + self.plan_line_1_action_2.reference, original_plan_line_1_action_2_reference + ) + self.assertNotEqual( + self.plan_line_2_action_1.reference, original_plan_line_2_action_1_reference + ) + self.assertNotEqual( + self.plan_line_2_action_2.reference, original_plan_line_2_action_2_reference + ) + + # Check that variable value reference was updated + # to include the new plan reference + self.assertIn("nice_new_plan", variable_value.reference) + self.assertNotEqual(variable_value.reference, original_variable_value_reference) + + # Verify the reference pattern for variable value follows the expected format: + # ___ # noqa: E501 + expected_pattern = ( + f"{self.variable_os.reference}_variable_value_plan_line_action_" + f"{self.plan_line_1_action_2.reference}" + ) + self.assertEqual(variable_value.reference, expected_pattern) + + def test_flight_plan_with_child_plan_command_exception(self): + """ + Test flight plan with child plan where command exception occurs. + + Scenario: + - Main flight plan has 2 commands: + 1. Simple python command (success) + 2. Child flight plan with 2 commands where first fails with command exception + - Verify error propagation: command -> child plan -> main plan + - The command exception is simulated using the existing mocking system + that raises exceptions when commands contain "raise" + """ + + # Create child flight plan with 2 commands + child_plan = self.Plan.create( + { + "name": "Child Plan with Error", + "note": "Child plan that will fail on first command", + } + ) + + # Command 1 of child plan - will fail with command exception + child_command_1 = self.Command.create( + { + "name": "Child Command 1 - Command Exception", + "action": "ssh_command", + "code": "raise", # This will trigger command exception in mock + } + ) + + # Command 2 of child plan - should not execute due to error in command 1 + child_command_2 = self.Command.create( + { + "name": "Child Command 2 - Should Not Run", + "action": "python_code", + "code": """ +result = { + "exit_code": 0, + "message": "This should not execute" +} + """, + } + ) + + # Create plan lines for child plan + self.plan_line.create( + { + "sequence": 10, + "plan_id": child_plan.id, + "command_id": child_command_1.id, + } + ) + self.plan_line.create( + { + "sequence": 20, + "plan_id": child_plan.id, + "command_id": child_command_2.id, + } + ) + + # Create command to run child plan + run_child_plan_command = self.Command.create( + { + "name": "Run Child Plan", + "action": "plan", + "flight_plan_id": child_plan.id, + } + ) + + # Create main flight plan with 2 commands + main_plan = self.Plan.create( + { + "name": "Main Plan with Child Plan", + "note": "Main plan with python command and child plan", + } + ) + + # Command 1 of main plan - simple python command (should succeed) + main_command_1 = self.Command.create( + { + "name": "Main Command 1 - Python Success", + "action": "python_code", + "code": """ +result = { + "exit_code": 0, + "message": "Main plan python command executed successfully" +} + """, + } + ) + + # Command 2 of main plan - run child plan (will fail) + main_command_2 = run_child_plan_command + + # Create plan lines for main plan + self.plan_line.create( + { + "sequence": 10, + "plan_id": main_plan.id, + "command_id": main_command_1.id, + } + ) + self.plan_line.create( + { + "sequence": 20, + "plan_id": main_plan.id, + "command_id": main_command_2.id, + } + ) + # Run the first command again + self.plan_line.create( + { + "sequence": 30, + "plan_id": main_plan.id, + "command_id": main_command_1.id, + } + ) + + # Run the main flight plan + plan_log = self.server_test_1.run_flight_plan(main_plan) + + # Verify main plan finished with error + self.assertNotEqual( + plan_log.plan_status, 0, "Main plan should not finish successfully" + ) + + # Get all plan logs for verification + all_plan_logs = plan_log | self.PlanLog.search( + [("parent_flight_plan_log_id", "=", plan_log.id)] + ) + + # Should have 2 plan logs: main plan and child plan + self.assertEqual( + len(all_plan_logs), 2, "Should have 2 plan logs: main and child" + ) + + main_plan_log = all_plan_logs.filtered(lambda log: log.plan_id == main_plan) + child_plan_log = all_plan_logs.filtered( + lambda log: log.parent_flight_plan_log_id == main_plan_log + ) + + self.assertTrue(main_plan_log, "Main plan log should exist") + self.assertTrue(child_plan_log, "Child plan log should exist") + + # Verify child plan finished with error + # The child plan should finish with an error + # (either SSH_CONNECTION_ERROR or GENERAL_ERROR) + self.assertNotEqual( + child_plan_log.plan_status, + 0, + "Child plan should not finish successfully", + ) + + # Get command logs for verification + all_command_logs = self.CommandLog.search( + [("plan_log_id", "in", all_plan_logs.ids)] + ) + + # Should have 3 command logs: main python, + # run child plan, child command exception + self.assertEqual(len(all_command_logs), 3, "Should have 3 command logs") + + # Find specific command logs + main_python_log = all_command_logs.filtered( + lambda log: log.command_id == main_command_1 + ) + run_child_plan_log = all_command_logs.filtered( + lambda log: log.command_id == main_command_2 + ) + child_ssh_error_log = all_command_logs.filtered( + lambda log: log.command_id == child_command_1 + ) + + # Verify main python command succeeded + self.assertEqual( + main_python_log.command_status, 0, "Main python command should succeed" + ) + self.assertEqual( + main_python_log.command_response, + "Main plan python command executed successfully", + "Main python command should have correct response", + ) + + # Verify run child plan command failed + # The command should fail with an error + # (either SSH_CONNECTION_ERROR or GENERAL_ERROR) + self.assertNotEqual( + run_child_plan_log.command_status, + 0, + "Run child plan command should fail", + ) + + # Verify child SSH command failed + # The SSH command should fail with an error status + # (could be GENERAL_ERROR -100 or 255 depending on how the exception is handled) + self.assertNotEqual( + child_ssh_error_log.command_status, 0, "Child SSH command should fail" + ) + # The error message should contain information about + # the SSH connection failure + # The exact error message may vary depending + # on how the exception is handled + self.assertTrue( + child_ssh_error_log.command_error, + "Child SSH command should have an error message", + ) + + # Verify that child command 2 was not executed (no log for it) + child_command_2_log = all_command_logs.filtered( + lambda log: log.command_id == child_command_2 + ) + self.assertFalse( + child_command_2_log, "Child command 2 should not have been executed" + ) + + # Verify plan log relationships + self.assertEqual( + main_plan_log.command_log_ids, + main_python_log | run_child_plan_log, + "Main plan should have correct command logs", + ) + + self.assertEqual( + child_plan_log.command_log_ids, + child_ssh_error_log, + "Child plan should have only the failed command log", + ) + + # Verify that the error propagated correctly through the hierarchy + # The error should propagate from command -> child plan -> main plan + # The specific error codes may vary depending + # on how the system handles the error + self.assertNotEqual( + main_plan_log.plan_status, + 0, + "Error should propagate from child to main plan", + ) + self.assertNotEqual( + child_plan_log.plan_status, 0, "Error should be present in child plan" + ) + self.assertNotEqual( + child_ssh_error_log.command_status, + 0, + "SSH command should have an error status", + ) + self.assertEqual( + child_ssh_error_log.command_status, + child_plan_log.plan_status, + "Child plan should have the same error status as the SSH command", + ) + self.assertEqual( + child_ssh_error_log.command_status, + main_plan_log.plan_status, + "Main plan should have the same error status as the SSH command", + ) + + def test_skip_command_error_flow(self): + """Plan flow: + 1) success, 2) success, 3) error -> sets command_error variable, + 4) skipped if not var, 5) runs if var and exits -1. + """ + # Create commands + command_success = self.Command.create( + { + "name": "Command -> Success", + "action": "python_code", + "code": "# Just return default values", + } + ) + command_error = self.Command.create( + { + "name": "Command -> Error", + "action": "python_code", + "code": "result = {'exit_code': -100, 'message': 'Error'}", + } + ) + command_after_failed = self.Command.create( + { + "name": "Command -> After failed", + "action": "python_code", + "code": ( + "name = server.name + ' --after-failed-- '\n" + "server.write({'name': name})" + ), + } + ) + command_last_one = self.Command.create( + { + "name": "Command -> The last one", + "action": "python_code", + "code": ( + "name = server.name + ' --last-one-- '\n" + "server.write({'name': name})" + ), + } + ) + + # Variable used in conditions + variable_command_error = self.Variable.create( + { + "name": "command_error", + "reference": "test_command_error", + "variable_type": "s", + } + ) + + # Plan and lines + plan = self.Plan.create( + { + "name": "Test skip command error", + "on_error_action": "e", + "custom_exit_code": 0, + } + ) + + self.plan_line.create( + {"sequence": 10, "plan_id": plan.id, "command_id": command_success.id} + ) + self.plan_line.create( + {"sequence": 20, "plan_id": plan.id, "command_id": command_success.id} + ) + + line3 = self.plan_line.create( + {"sequence": 30, "plan_id": plan.id, "command_id": command_error.id} + ) + action3 = self.plan_line_action.create( + { + "line_id": line3.id, + "sequence": 10, + "condition": "!=", + "value_char": "0", + "action": "n", + } + ) + + self.VariableValue.create( + { + "variable_id": variable_command_error.id, + "value_char": "1", + "plan_line_action_id": action3.id, + } + ) + + self.plan_line.create( + { + "sequence": 40, + "plan_id": plan.id, + "command_id": command_after_failed.id, + "condition": "not {{ test_command_error }}", + "variable_ids": [(6, 0, [variable_command_error.id])], + } + ) + + line5 = self.plan_line.create( + { + "sequence": 50, + "plan_id": plan.id, + "command_id": command_last_one.id, + "condition": "{{ test_command_error }}", + "variable_ids": [(6, 0, [variable_command_error.id])], + } + ) + self.plan_line_action.create( + { + "line_id": line5.id, + "sequence": 10, + "condition": "==", + "value_char": "0", + "action": "ec", + "custom_exit_code": -1, + } + ) + + plan_log = self.server_test_1.run_flight_plan(plan) + + self.assertEqual(len(plan_log.command_log_ids), 5) + logs = plan_log.command_log_ids + self.assertTrue( + all( + log.command_status == 0 + for log in logs.filtered(lambda log: log.command_id == command_success) + ) + ) + + error_log = logs.filtered(lambda log: log.command_id == command_error) + self.assertIn(variable_command_error.reference, error_log.variable_values) + self.assertTrue(error_log.command_status == GENERAL_ERROR) + + self.assertTrue( + logs.filtered(lambda log: log.command_id == command_after_failed).mapped( + "command_status" + )[0] + == PLAN_LINE_CONDITION_CHECK_FAILED + ) + self.assertTrue( + logs.filtered(lambda log: log.command_id == command_last_one).mapped( + "command_status" + )[0] + == 0 + ) + + # Final plan status must be custom exit code -1 from line 5 action + self.assertEqual(plan_log.plan_status, -1) + + def test_plan_line_condition_error(self): + """Test plan line condition error + First line is skipped because of condition error + Second line is executed successfully + """ + # Create commands + command_success = self.Command.create( + { + "name": "Command -> Success", + "action": "python_code", + "code": "# Just return default values", + } + ) + + # Plan and lines + plan = self.Plan.create( + { + "name": "Test plan line condition error", + } + ) + + self.plan_line.create( + { + "sequence": 10, + "plan_id": plan.id, + "command_id": command_success.id, + "condition": "=q", + }, + ) + self.plan_line.create( + {"sequence": 20, "plan_id": plan.id, "command_id": command_success.id} + ) + + with mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_plan_line"): + plan_log = self.server_test_1.run_flight_plan(plan) + + # Must be 2 command logs + self.assertEqual(len(plan_log.command_log_ids), 2) + logs = plan_log.command_log_ids + self.assertTrue(logs[0].is_skipped) + self.assertTrue(logs[1].command_status == 0) + + def test_custom_values_not_defined_but_updated(self): + """Test custom values not defined but updated + First command is executed successfully + Second command is executed successfully and updates custom values + """ + # Create commands + command_1 = self.Command.create( + { + "name": "Command -> Success", + "action": "python_code", + "code": "# Just return default values", + } + ) + command_2 = self.Command.create( + { + "name": "Command -> Success", + "action": "python_code", + "code": "custom_values.update({'some_value': '1'})", + } + ) + + # Plan and lines + plan = self.Plan.create( + { + "name": "Test custom values not defined but updated", + } + ) + + self.plan_line.create( + { + "sequence": 10, + "plan_id": plan.id, + "command_id": command_1.id, + }, + ) + + self.plan_line.create( + { + "sequence": 20, + "plan_id": plan.id, + "command_id": command_2.id, + }, + ) + plan_log = self.server_test_1.run_flight_plan(plan) + + # Must be 2 command logs + self.assertEqual(len(plan_log.command_log_ids), 2) + logs = plan_log.command_log_ids + # Both commands should be successful + self.assertEqual(logs[0].command_status, 0) + self.assertEqual(logs[1].command_status, 0) + # Custom values should be updated + self.assertEqual(plan_log.variable_values, {"some_value": "1"}) + + def test_last_flight_plan_line_post_run_action_is_executed(self): + """ + Test last flight plan line post run action is executed + """ + # Create commands + command_error = self.Command.create( + { + "name": "Command -> Error", + "action": "python_code", + "code": "result = {'exit_code': -100, 'message': 'Error'}", + } + ) + + # Plan and lines + plan = self.Plan.create( + { + "name": "Test post run action", + "on_error_action": "e", + "custom_exit_code": 0, + } + ) + + line1 = self.plan_line.create( + {"sequence": 10, "plan_id": plan.id, "command_id": command_error.id} + ) + self.plan_line_action.create( + { + "line_id": line1.id, + "sequence": 10, + "condition": "!=", + "value_char": "0", + "action": "n", + } + ) + line2 = self.plan_line.create( + {"sequence": 20, "plan_id": plan.id, "command_id": command_error.id} + ) + self.plan_line_action.create( + { + "line_id": line2.id, + "sequence": 10, + "condition": "!=", + "value_char": "0", + "action": "ec", + "custom_exit_code": 0, + } + ) + + plan_log = self.server_test_1.run_flight_plan(plan) + + self.assertEqual(len(plan_log.command_log_ids), 2) + + # Final plan status must be custom exit code 0 + self.assertEqual(plan_log.plan_status, 0)