From d0059616aa8349027dc4526d205e6bb63b8dee14 Mon Sep 17 00:00:00 2001 From: git_admin Date: Mon, 27 Apr 2026 08:43:17 +0000 Subject: [PATCH] Tower: upload cetmix_tower_server 16.0.2.2.9 (via marketplace) --- .../cetmix_tower_server/tests/test_command.py | 1911 +++++++++++++++++ 1 file changed, 1911 insertions(+) create mode 100644 addons/cetmix_tower_server/tests/test_command.py diff --git a/addons/cetmix_tower_server/tests/test_command.py b/addons/cetmix_tower_server/tests/test_command.py new file mode 100644 index 0000000..b2042ee --- /dev/null +++ b/addons/cetmix_tower_server/tests/test_command.py @@ -0,0 +1,1911 @@ +# Copyright (C) 2022 Cetmix OÜ +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). +from datetime import timedelta +from unittest.mock import patch + +from odoo.exceptions import AccessError, ValidationError +from odoo.fields import Datetime +from odoo.tests.common import Form +from odoo.tools import mute_logger + +from ..models.constants import ( + ANOTHER_COMMAND_RUNNING, + COMMAND_TIMED_OUT, + COMMAND_TIMED_OUT_MESSAGE, + GENERAL_ERROR, +) +from .common import TestTowerCommon + + +class TestTowerCommand(TestTowerCommon): + """ + Test the command model. + + Important! + As this model inherits from the `cx.tower.template.mixin` + we will tests template rendering methods in this class too. + + """ + + @classmethod + def setUpClass(cls): + super().setUpClass() + + # Save variable values for Server 1 + with Form(cls.server_test_1) as f: + with f.variable_value_ids.new() as line: + line.variable_id = cls.variable_dir + line.value_char = "test-odoo-1" + with f.variable_value_ids.new() as line: + line.variable_id = cls.variable_path + line.value_char = "/opt/tower" + f.save() + + # Secret key + cls.secret_folder_key = cls.Key.create( + { + "name": "Folder", + "reference": "FOLDER", + "secret_value": "secretFolder", + "key_type": "s", + } + ) + cls.secret_python_key = cls.Key.create( + { + "name": "python", + "reference": "PYTHON", + "secret_value": "secretPythonCode", + "key_type": "s", + } + ) + + # secret value as multi line string + cls.python_ssh_key = cls.Key.create( + { + "name": "Test Python SSH Key", + "reference": "test_python_ssh_key", + "key_type": "s", + "secret_value": """ + Python + much + key + """, + } + ) + + cls.secret_test_rsa_key = cls.Key.create( + { + "name": "test rsa", + "reference": "test_rsa", + "secret_value": """-----BEGIN RSA PRIVATE KEY----- +MIIBOgIBAAJBAKj34GkxFhD90vcNLYLInFEX6Ppy1tPf9Cnzj4p4WGeKLs1Pt8Qu +KUpRKfFLfRYC9AIKjbJTWit+CqvjWYzvQwECAwEAAQJAIJLixBy2qpFoS4DSmoEm +o3qGy0t6z09AIJtH+5OeRV1be+N4cDYJKffGzDa88vQENZiRm0GRq6a+HPGQMd2k +TQIhAKMSvzIBnni7ot/OSie2TmJLY4SwTQAevXysE2RbFDYdAiEBCUEaRQnMnbp7 +9mxDXDf6AU0cN/RPBjb9qSHDcWZHGzUCIG2Es59z8ugGrDY+pxLQnwfotadxd+Uy +v/Ow5T0q5gIJAiEAyS4RaI9YG8EWx/2w0T67ZUVAw8eOMB6BIUg0Xcu+3okCIBOs +/5OiPgoTdSy7bcF9IGpSE8ZgGKzgYQVZeN97YE00 +-----END RSA PRIVATE KEY----- """, + "key_type": "s", + } + ) + # Command + cls.command_create_new_command = cls.Command.create( + { + "name": "Create new command", + "action": "python_code", + "code": """ +server_name = {{ tower.server.name }} +if server_name and #!cxtower.secret.FOLDER!# == "secretFolder": + # We don't actually create a new command because it will raise + # access error if user doesn't have access to 'create' operation. + # Instead we just return a dummy command result. + command = "new command" + result = {"exit_code": 0, "message": "New command was created"} +else: + result = {"exit_code": %s, "message": "error"} + """ + % GENERAL_ERROR, + } + ) + + cls.command_python_command_1 = cls.Command.create( + { + "name": "Python command with secret #1", + "action": "python_code", + "code": """ +result = { + "exit_code": 0, + "message": #!cxtower.secret.PYTHON!#, +} + """, + } + ) + + cls.command_python_command_2 = cls.Command.create( + { + "name": "Python command with secret #2", + "action": "python_code", + "code": """ +result = { + "exit_code": 0, + "message": 'We use #!cxtower.secret.PYTHON!#' , +} + """, + } + ) + + cls.command_python_command_3 = cls.Command.create( + { + "name": "Python command with secret #3", + "action": "python_code", + "code": """ +result = { + "exit_code": 0, + "message": ""#!cxtower.secret.test_rsa!#"" , +} + """, + } + ) + + cls.command_python_command_4 = cls.Command.create( + { + "name": "Python command with secret #4", + "action": "python_code", + "code": """ +top_secret = #!cxtower.secret.test_python_ssh_key!# +result = { + "exit_code": 0, + "message": top_secret , +} + """, + } + ) + cls.server = cls.Server.create( + { + "name": "Test Server", + "user_ids": [(6, 0, [cls.user.id])], + "manager_ids": [(6, 0, [cls.manager.id])], + "ssh_username": "test", + "ssh_password": "test", + "ip_v4_address": "127.0.0.1", + } + ) + + def _create_command(self, **kwargs): + """Helper to create a command record with default values.""" + vals = { + "name": "Test Command", + "access_level": "1", # override default + "user_ids": [(6, 0, [])], + "manager_ids": [(6, 0, [])], + "server_ids": [(6, 0, [])], + } + if kwargs: + vals.update(kwargs) + return self.Command.create(vals) + + def test_user_read_access(self): + """ + For a user: + Read access is allowed if access_level == "1" and either the command's + own user_ids includes the user OR a related server (via server_ids) + includes the user in its user_ids. + """ + # Case 1: Command with access_level "1" and user in command.user_ids. + cmd1 = self._create_command( + **{ + "access_level": "1", + "user_ids": [(6, 0, [self.user.id])], + } + ) + recs1 = self.Command.with_user(self.user).search([("id", "=", cmd1.id)]) + self.assertIn( + cmd1, + recs1, + "User should see the command if in command.user_ids" + " and access_level == '1'.", + ) + + # Case 2: Command with access_level "1" and user not in command.user_ids + # but in a related server. + cmd2 = self._create_command( + **{ + "access_level": "1", + "user_ids": [(6, 0, [])], + "server_ids": [(6, 0, [self.server.id])], + } + ) + recs2 = self.Command.with_user(self.user).search([("id", "=", cmd2.id)]) + self.assertIn( + cmd2, + recs2, + "User should see the command if related server.user_ids includes the user.", + ) + + # Negative: If access_level is "1" but neither command.user_ids + # nor server_ids.user_ids includes the user. + cmd3 = self._create_command( + **{ + "access_level": "1", + "user_ids": [(6, 0, [])], + "server_ids": [(6, 0, [])], + } + ) + recs3 = self.Command.with_user(self.user).search([("id", "=", cmd3.id)]) + self.assertNotIn( + cmd3, + recs3, + "User should not see the command if not granted access.", + ) + + def test_manager_read_access(self): + """ + For a manager: + Allowed to read a command if access_level <= "2" AND + (either the command itself grants access via user_ids or manager_ids + OR there are no related servers OR a related server grants access via + its user_ids or manager_ids). + """ + # Case 1: Command with access_level "2" and command.manager_ids + # includes the manager but the server is not related to the command. + another_server = self.Server.create( + { + "name": "Another Server", + "ip_v4_address": "127.0.0.2", + "ssh_username": "test", + "ssh_password": "test", + "user_ids": [(6, 0, [])], + "manager_ids": [(6, 0, [])], + } + ) + cmd1 = self._create_command( + **{ + "access_level": "2", + "manager_ids": [(6, 0, [self.manager.id])], + "server_ids": [(6, 0, [another_server.id])], + } + ) + recs1 = self.Command.with_user(self.manager).search([("id", "=", cmd1.id)]) + self.assertIn( + cmd1, + recs1, + "Manager should see the command if in command.manager_ids" + " and access_level <= '2'.", + ) + + # Case 2: Command with access_level "2" that does not grant access + # on the command itself, but a related server grants access via + # but a related server grants access via its manager_ids. + cmd2 = self._create_command( + **{ + "access_level": "2", + "user_ids": [(6, 0, [])], + "manager_ids": [(6, 0, [])], + "server_ids": [(6, 0, [self.server.id])], + } + ) + recs2 = self.Command.with_user(self.manager).search([("id", "=", cmd2.id)]) + self.assertIn( + cmd2, + recs2, + "Manager should see the command if related server.manager_ids" + " includes the manager.", + ) + + # Positive: Command with access_level "2" without any granted access. + cmd3 = self._create_command( + **{ + "access_level": "2", + "user_ids": [(6, 0, [])], + "manager_ids": [(6, 0, [])], + "server_ids": [(6, 0, [])], + } + ) + recs3 = self.Command.with_user(self.manager).search([("id", "=", cmd3.id)]) + self.assertIn( + cmd3, + recs3, + "Manager should see the command if not granted access " + "but not related to any server.", + ) + + # Case 3: Remove from manager in the cmd1. + # Should not see the command because it belongs to another server. + cmd1.manager_ids = [(3, self.manager.id)] + recs4 = self.Command.with_user(self.manager).search([("id", "=", cmd1.id)]) + self.assertNotIn( + cmd1, + recs4, + "Manager should not see the command if " + "removed from command.manager_ids." + " and command belongs to another server.", + ) + + def test_manager_write_create_access(self): + """ + For a manager: + Allowed to write and create a command if access_level <= "2" AND + the command's own manager_ids includes the manager. + """ + # Case: Command with access_level "2" and manager_ids includes the manager. + cmd1 = self._create_command( + **{ + "access_level": "2", + "manager_ids": [(6, 0, [self.manager.id])], + } + ) + try: + cmd1.with_user(self.manager).write({"name": "Manager Updated Command"}) + except AccessError: + self.fail( + "Manager should be able to update the command " + "if in command.manager_ids." + ) + self.assertEqual(cmd1.with_user(self.manager).name, "Manager Updated Command") + + # Attempt to create a command as manager without including their ID + # in manager_ids should fail. + cmd_invalid_vals = { + "name": "Invalid Manager Create", + "access_level": "2", + "manager_ids": [(6, 0, [])], + "action": "python_code", + "code": "print('dummy')", + } + with self.assertRaises(AccessError): + self.Command.with_user(self.manager).create(cmd_invalid_vals) + + def test_manager_unlink_access(self): + """ + For a manager: + Allowed to delete a command if access_level <= "2", + the current user is the record creator, + AND the command's own manager_ids includes the manager. + """ + # Scenario 1: Command created by the manager with manager_ids + # including the manager. + cmd1 = self.Command.with_user(self.manager).create( + { + "name": "Manager Created Command", + "access_level": "2", + } + ) + try: + cmd1.unlink() + except AccessError: + self.fail( + "Manager should be able to delete a command " + "they created if in command.manager_ids." + ) + + # Scenario 2: Command created by someone else + # even if manager_ids includes the manager. + cmd2 = self._create_command( + **{ + "access_level": "2", + "manager_ids": [(6, 0, [self.manager.id])], + } + ) + with self.assertRaises(AccessError): + cmd2.with_user(self.manager).unlink() + + def test_root_unrestricted_access(self): + """ + For a root user: + Unlimited access: root can read, write, create, and delete commands + regardless of access_level or related servers. + """ + cmd = self._create_command( + **{ + "access_level": "3", # above the threshold for managers + } + ) + recs = self.Command.with_user(self.root).search([("id", "=", cmd.id)]) + self.assertIn( + cmd, + recs, + "Root should see the command regardless of restrictions.", + ) + try: + cmd.with_user(self.root).write({"name": "Root Updated Command"}) + except AccessError: + self.fail( + "Root should be able to update the command " "without restrictions." + ) + self.assertEqual(cmd.with_user(self.root).name, "Root Updated Command") + cmd2 = self.Command.with_user(self.root).create( + { + "name": "Root Created Command", + "access_level": "3", + "action": "python_code", + "code": "print('root')", + } + ) + self.assertTrue( + cmd2, + "Root should be able to create a command " "without restrictions.", + ) + cmd2.with_user(self.root).unlink() + recs_after = self.Command.with_user(self.root).search([("id", "=", cmd2.id)]) + self.assertFalse( + recs_after, + "Root should be able to delete the command without restrictions.", + ) + + def test_ssh_command_prepare_method_without_path(self): + """Test ssh command preparation in different modes without path""" + + server = self.server_test_1 + + single_command = "ls -a /tmp" + multiple_commands = "ls -a /tmp && mkdir /tmp/test" + + sudo_mode = "p" + + # Prepare single command for sudo with password + cmd = server._prepare_ssh_command(single_command, path=None, sudo=sudo_mode) + self.assertEqual( + cmd, + [f"{self.sudo_prefix} {single_command}"], + msg=( + "Single command for sudo with password should be " + "equal to list with the original command" + "as an only element" + ), + ) + + # Prepare multiple commands for sudo with password + cmd = server._prepare_ssh_command(multiple_commands, path=None, sudo=sudo_mode) + self.assertEqual( + cmd, + [ + f"{self.sudo_prefix} ls -a /tmp", + f"{self.sudo_prefix} mkdir /tmp/test", + ], + msg=( + "Multiple commands with sudo with password should be " + "a list of separated commands from original line" + ), + ) + + sudo_mode = "n" + + # Prepare single command for sudo without password + cmd = server._prepare_ssh_command(single_command, path=None, sudo=sudo_mode) + self.assertEqual( + cmd, + f"{self.sudo_prefix} {single_command}", + msg=( + "Single command with sudo without password should be " + f'equal to the original command prefixed with "{self.sudo_prefix}"' + ), + ) + + # Prepare multiple commands for sudo without password + cmd = server._prepare_ssh_command(multiple_commands, path=None, sudo=sudo_mode) + self.assertEqual( + cmd, + f"{self.sudo_prefix} ls -a /tmp && {self.sudo_prefix} mkdir /tmp/test", + msg=( + "Multiple commands with sudo with password should be " + "a re-joined string from list of separated original " + f'each prefixed with "{self.sudo_prefix}"' + ), + ) + + # Prepare single command without sudo + cmd = server._prepare_ssh_command(single_command) + self.assertEqual( + cmd, + single_command, + msg=( + "Single command without sudo should be " + "equal to the original command " + ), + ) + + # Prepare multiple without sudo + cmd = server._prepare_ssh_command(multiple_commands) + self.assertEqual( + cmd, + multiple_commands, + msg=( + "Multiple commands without sudo should be " + "equal to the original line of commands" + ), + ) + + def test_ssh_command_prepare_method_with_path(self): + """Test command preparation in different modes without path""" + + server = self.server_test_1 + + single_command = "ls -a /tmp" + multiple_commands = "ls -a /tmp && mkdir /tmp/test" + path = "/home/doge" + + sudo_mode = "p" + + # Prepare single command for sudo with password + cmd = server._prepare_ssh_command(single_command, path=path, sudo=sudo_mode) + self.assertEqual( + cmd, + [f"cd {path}", f"{self.sudo_prefix} {single_command}"], + msg=( + "Single command for sudo with password should be " + "equal to list of two elements:" + " change directory and original command" + ), + ) + + # Prepare multiple commands for sudo with password + cmd = server._prepare_ssh_command(multiple_commands, path=path, sudo=sudo_mode) + self.assertEqual( + cmd, + [ + f"cd {path}", + f"{self.sudo_prefix} ls -a /tmp", + f"{self.sudo_prefix} mkdir /tmp/test", + ], + msg=( + "Multiple commands with sudo with password should be " + "a list of separated commands from original line" + ), + ) + + sudo_mode = "n" + + # Prepare single command for sudo without password + cmd = server._prepare_ssh_command(single_command, path=path, sudo=sudo_mode) + self.assertEqual( + cmd, + f"cd {path} && {self.sudo_prefix} {single_command}", + msg=( + "Single command with sudo without password should be " + f'equal to the original command prefixed with "{self.sudo_prefix}"' + ), + ) + + # Prepare multiple commands for sudo without password + cmd = server._prepare_ssh_command(multiple_commands, path=path, sudo=sudo_mode) + self.assertEqual( + cmd, + f"cd {path} && {self.sudo_prefix} ls -a /tmp && {self.sudo_prefix} mkdir /tmp/test", # noqa + msg=( + "Multiple commands with sudo with password should be " + "a re-joined string from list of separated original " + f'each prefixed with "{self.sudo_prefix}"' + ), + ) + + # Prepare single command without sudo + cmd = server._prepare_ssh_command(single_command, path=path) + self.assertEqual( + cmd, + f"cd {path} && {single_command}", + msg=( + "Single command for without sudo should be " + "equal to the the original command" + "with 'cd {{ path }} && ' prefix" + ), + ) + + # Prepare multiple commands without sudo + cmd = server._prepare_ssh_command(multiple_commands, path=path) + self.assertEqual( + cmd, + f"cd {path} && {multiple_commands}", # noqa + msg=( + "Multiple commands without sudo should be " + "original command with 'change directory' command prepended" + ), + ) + + def test_ssh_command_no_split_for_sudo_without_path(self): + """If no_split_for_sudo=True, even '&&' shouldn’t split into a list.""" + server = self.server_test_1 + cmd_line = "echo a && echo b" + sudo_mode = "p" + result = server._prepare_ssh_command( + cmd_line, sudo=sudo_mode, no_split_for_sudo=True + ) + expected = [f"{self.sudo_prefix} {cmd_line}"] + self.assertEqual( + result, expected, "With no_split_for_sudo, '&&' must not produce a list" + ) + + def test_ssh_command_no_split_for_sudo_with_path(self): + """Same, but with a custom cwd prefix.""" + server = self.server_test_1 + cmd_line = "echo a && echo b" + path = "/tmp" + sudo_mode = "p" + result = server._prepare_ssh_command( + cmd_line, path=path, sudo=sudo_mode, no_split_for_sudo=True + ) + expected = [f"cd {path}", f"{self.sudo_prefix} {cmd_line}"] + self.assertEqual( + result, + expected, + "With no_split_for_sudo and path, the entire '&&' string remains un-split", + ) + + def test_server_render_command(self): + """Test rendering command using `_render_command` method + of cx.tower.server + """ + + # -- 1 -- + # Test with default path + rendered_command = self.server_test_1._render_command(self.command_create_dir) + rendered_code_expected = "cd /opt/tower && mkdir test-odoo-1" + rendered_path_expected = f"/home/{self.server_test_1.ssh_username}" + + self.assertEqual( + rendered_command["rendered_code"], + rendered_code_expected, + "Rendered code doesn't match", + ) + self.assertEqual( + rendered_command["rendered_path"], + rendered_path_expected, + "Rendered path doesn't match", + ) + + # -- 2 -- + # Test with custom path + rendered_command = self.server_test_1._render_command( + self.command_create_dir, path="/such/much/path" + ) + rendered_code_expected = "cd /opt/tower && mkdir test-odoo-1" + rendered_path_expected = "/such/much/path" + + self.assertEqual( + rendered_command["rendered_code"], + rendered_code_expected, + "Rendered code doesn't match", + ) + self.assertEqual( + rendered_command["rendered_path"], + rendered_path_expected, + "Rendered path doesn't match", + ) + + # -- 3 -- + # Set variable_path to None and check again + variable_value_path = self.server_test_1.variable_value_ids.filtered( + lambda var_val: var_val.variable_id.id == self.variable_path.id + ) + variable_value_path.value_char = None + rendered_command = self.server_test_1._render_command(self.command_create_dir) + rendered_code_expected = "cd False && mkdir test-odoo-1" + rendered_path_expected = f"/home/{self.server_test_1.ssh_username}" + + self.assertEqual( + rendered_command["rendered_code"], + rendered_code_expected, + "Rendered code doesn't match", + ) + self.assertEqual( + rendered_command["rendered_path"], + rendered_path_expected, + "Rendered path doesn't match", + ) + + # -- 4 -- + # Set both path and code to None + self.write_and_invalidate( + self.command_create_dir, **{"code": None, "path": None} + ) + rendered_command = self.server_test_1._render_command(self.command_create_dir) + + self.assertFalse( + rendered_command["rendered_code"], "Rendered code doesn't match" + ) + self.assertFalse( + rendered_command["rendered_path"], "Rendered path doesn't match" + ) + + def test_server_render_command_with_custom_variable_values(self): + """Test rendering command using `_render_command` method + of cx.tower.server with custom variable values + """ + self.write_and_invalidate( + self.server_test_1, + **{"user_ids": [(4, self.user.id)], "manager_ids": [(4, self.manager.id)]}, + ) + # -- 1 -- + # Set custom variable values + custom_variable_values = { + "test_path_": "/pepe/memes", + "other_path": "/etc/chad", + } + + # Modify command path + self.write_and_invalidate( + self.command_create_dir, + **{"path": "{{ other_path }}/{{ tower.server.username }}"}, + ) + + # Render command + rendered_command = self.server_test_1.with_user(self.manager)._render_command( + self.command_create_dir, custom_variable_values=custom_variable_values + ) + rendered_code_expected = "cd /pepe/memes && mkdir test-odoo-1" + rendered_path_expected = f"/etc/chad/{self.server_test_1.ssh_username}" + + self.assertEqual( + rendered_command["rendered_code"], + rendered_code_expected, + "Rendered code doesn't match", + ) + self.assertEqual( + rendered_command["rendered_path"], + rendered_path_expected, + "Rendered path doesn't match", + ) + + # -- 2 -- + # Test with user who doesn't have access to the server + rendered_command = self.server_test_1.with_user(self.user)._render_command( + self.command_create_dir, custom_variable_values=custom_variable_values + ) + rendered_code_expected = "cd /opt/tower && mkdir test-odoo-1" + rendered_path_expected = f"None/{self.server_test_1.ssh_username}" + + self.assertEqual( + rendered_command["rendered_code"], + rendered_code_expected, + "Rendered code doesn't match", + ) + self.assertEqual( + rendered_command["rendered_path"], + rendered_path_expected, + "Rendered path doesn't match", + ) + + def test_server_render_command_variable_with_value_modifier(self): + """Test rendering command using `_render_command` method + of cx.tower.server. + Use variable with value modifier for testing. + """ + + # -- 1 -- + # Set modifiers for variables + modifier_for_path = """ +if 'opt' in value: + result = value.replace('opt', 'home') +else: + result = value +""" + self.variable_path.applied_expression = modifier_for_path + + modifier_for_dir = """ +pattern = r'(?i)odoo' +replacement = 'sap' +result = re.sub(pattern, replacement, value) +""" + self.variable_dir.applied_expression = modifier_for_dir + + # -- 1 -- + # Test with default path + rendered_command = self.server_test_1._render_command(self.command_create_dir) + rendered_code_expected = "cd /home/tower && mkdir test-sap-1" + rendered_path_expected = f"/home/{self.server_test_1.ssh_username}" + + self.assertEqual( + rendered_command["rendered_code"], + rendered_code_expected, + "Rendered code doesn't match", + ) + self.assertEqual( + rendered_command["rendered_path"], + rendered_path_expected, + "Rendered path doesn't match", + ) + + # -- 2 -- + # Set invalid expression modifier + self.variable_path.applied_expression = "invalid" + with mute_logger( + "odoo.addons.cetmix_tower_server.models.cx_tower_variable_mixin" + ): + rendered_command = self.server_test_1._render_command( + self.command_create_dir + ) + rendered_code_expected = "cd /opt/tower && mkdir test-sap-1" + rendered_path_expected = f"/home/{self.server_test_1.ssh_username}" + + self.assertEqual( + rendered_command["rendered_code"], + rendered_code_expected, + "Rendered code doesn't match", + ) + self.assertEqual( + rendered_command["rendered_path"], + rendered_path_expected, + "Rendered path doesn't match", + ) + + # -- 3 -- + # Test with variable in variable value + complex_variable = self.Variable.create( + { + "name": "Complex Variable", + "applied_expression": "result = value.replace('opt', 'meme')", + } + ) + # Create a complex variable value + self.VariableValue.create( + { + "variable_id": complex_variable.id, + "value_char": "{{ test_path_ }}/{{ test_dir }}", + } + ) + command_with_complex_variable = self.Command.create( + { + "name": "Command with complex variable", + "code": "cd {{ complex_variable }}", + "action": "ssh_command", + } + ) + with mute_logger( + "odoo.addons.cetmix_tower_server.models.cx_tower_variable_mixin" + ): + rendered_command = self.server_test_1._render_command( + command_with_complex_variable + ) + rendered_code_expected = "cd /meme/tower/test-sap-1" + self.assertEqual( + rendered_command["rendered_code"], + rendered_code_expected, + "Rendered code doesn't match", + ) + + # -- 4 -- + # Remove modifier from variable "Path" and check again + self.variable_dir.applied_expression = None + with mute_logger( + "odoo.addons.cetmix_tower_server.models.cx_tower_variable_mixin" + ): + rendered_command = self.server_test_1._render_command( + command_with_complex_variable + ) + rendered_code_expected = "cd /meme/tower/test-odoo-1" + + self.assertEqual( + rendered_command["rendered_code"], + rendered_code_expected, + "Rendered code doesn't match", + ) + + def test_render_code_generic(self): + """Test generic (aka ssh) code template direct rendering""" + + # Only 'test_path_' must be rendered + args = {"test_path_": "/tmp", "test_os": "debian"} + res = self.command_create_dir.render_code(**args) + rendered_code = res.get(self.command_create_dir.id) + rendered_code_expected = "cd /tmp && mkdir " + self.assertEqual( + rendered_code, + rendered_code_expected, + msg=f"Must be rendered as '{rendered_code_expected}'", + ) + + # 'test_path_' and 'dir' must be rendered + args = {"test_path_": "/tmp", "os": "debian", "test_dir": "odoo"} + res = self.command_create_dir.render_code(**args) + rendered_code = res.get(self.command_create_dir.id) + self.assertEqual( + rendered_code, + "cd /tmp && mkdir odoo", + msg="Must be rendered as 'cd /tmp && mkdir odoo'", + ) + + def test_run_command_with_variables(self): + """Test code execution using command log records""" + + x = 1 # Used to distinguish labels + + # Check with all available "sudo" option + for sudo in [False, "n", "p"]: + # Add label to track command log + self.server_test_1.use_sudo = sudo + command_label = f"Test Command {x}" + custom_values = {"log": {"label": command_label}} + + # Run command for Server 1 + self.server_test_1.run_command( + self.command_create_dir, sudo=sudo, **custom_values + ) + + # Expected rendered command code + rendered_code_expected = "cd /opt/tower && mkdir test-odoo-1" + + # Get command log + log_record = self.CommandLog.search([("label", "=", command_label)]) + + # Check log values + self.assertEqual(len(log_record), 1, msg="Must be a single log record") + self.assertEqual( + log_record.server_id.id, + self.server_test_1.id, + msg="Record must belong to Test 1", + ) + self.assertEqual( + log_record.command_id.id, + self.command_create_dir.id, + msg="Record must belong to command 'Create dir'", + ) + self.assertEqual( + log_record.code, + rendered_code_expected, + msg=f"Rendered code must be '{rendered_code_expected}'", + ) + self.assertEqual( + log_record.command_status, 0, msg="Command status must be equal to 0" + ) + self.assertEqual( + log_record.use_sudo, + sudo, + msg="'sudo' param in log doesn't match the command one", + ) + + # Increment label counter + x += 1 + + def test_run_command_with_keys(self): + """Test command with keys in code""" + + # Command + code = "cd {{ test_path_ }} && mkdir #!cxtower.secret.FOLDER!#" + command_with_keys = self.Command.create( + {"name": "Command with keys", "code": code} + ) + + # Parse command with key parser to ensure key is parsed correctly + code_parsed_expected = "cd {{ test_path_ }} && mkdir secretFolder" + code_parsed = self.Key._parse_code(code) + self.assertEqual( + code_parsed, + code_parsed_expected, + msg="Parsed code doesn't match expected one", + ) + + # Add label to track command log + command_label = "Test Command with keys" + custom_values = {"log": {"label": command_label}} + + # Run command for Server 1 + self.server_test_1.run_command(command_with_keys, **custom_values) + + # Expected rendered command code + rendered_code_expected = "cd /opt/tower && mkdir #!cxtower.secret.FOLDER!#" + + # Get command log + log_record = self.CommandLog.search([("label", "=", command_label)]) + + # Check log values + self.assertEqual(len(log_record), 1, msg="Must be a single log record") + self.assertEqual( + log_record.server_id.id, + self.server_test_1.id, + msg=("Record must belong %s", self.server_test_1.name), + ) + self.assertEqual( + log_record.command_id.id, + command_with_keys.id, + msg=("Record must belong to command %s", command_with_keys.name), + ) + self.assertEqual( + log_record.code, + rendered_code_expected, + msg=f"Rendered code must be '{rendered_code_expected}'", + ) + self.assertEqual( + log_record.command_status, 0, msg="Command status must be equal to 0" + ) + + def test_parse_ssh_command_result(self): + """Test ssh command result parsing""" + + placeholder = self.Key.SECRET_VALUE_PLACEHOLDER + # ------------------------------------------------------- + # Case 1: regular command execution result with no error + # We are testing secret value placeholder here + # ------------------------------------------------------- + status = 0 + response = ["Such much", f"Doge like SSH {placeholder}"] + error = [] + + ssh_command_result = self.Server._parse_command_results( + status, response, error, key_values=[f"{self.secret_2.secret_value}"] + ) + + # Get result + result_status = ssh_command_result["status"] + result_response = ssh_command_result["response"] + result_error = ssh_command_result["error"] + + self.assertEqual( + result_status, + result_status, + "Status in result must be the same as the initial one", + ) + self.assertEqual( + result_response, + f"Such muchDoge like SSH {placeholder}", + "Response in result doesn't match expected", + ) + self.assertIsNone(result_error, "Error in response must be set to None") + + # ------------------------------------------------------- + # Case 2: no response but an error + # ------------------------------------------------------- + status = 1 + response = [] + error = ["Ooops", "I did", "it again"] + + ssh_command_result = self.Server._parse_command_results(status, response, error) + + # Get result + result_status = ssh_command_result["status"] + result_response = ssh_command_result["response"] + result_error = ssh_command_result["error"] + + self.assertEqual( + result_status, + result_status, + "Status in result must be the same as the initial one", + ) + self.assertIsNone(result_response, "Response in response must be set to None") + self.assertEqual( + result_error, "OoopsI didit again", "Error in result doesn't match expected" + ) + + # ------------------------------------------------------- + # Case 3: several codes all 0, no response but an error + # ------------------------------------------------------- + status = [0, 0, 0] + response = [] + error = ["Ooops", "I did", "it again"] + + ssh_command_result = self.Server._parse_command_results(status, response, error) + + # Get result + result_status = ssh_command_result["status"] + result_response = ssh_command_result["response"] + result_error = ssh_command_result["error"] + + self.assertEqual( + result_status, 0, "Status in result doesn't match expected one" + ) + self.assertIsNone(result_response, "Response in response must be set to None") + self.assertEqual( + result_error, "OoopsI didit again", "Error in result doesn't match expected" + ) + + # ------------------------------------------------------- + # Case 4: codes [0,1,0,4,0], no response but an error + # ------------------------------------------------------- + status = [0, 1, 0, 4, 0] + response = [] + error = ["Ooops", "I did", "it again"] + + ssh_command_result = self.Server._parse_command_results(status, response, error) + + # Get result + result_status = ssh_command_result["status"] + result_response = ssh_command_result["response"] + result_error = ssh_command_result["error"] + + self.assertEqual( + result_status, 4, "Status in result doesn't match expected one" + ) + self.assertIsNone(result_response, "Response in response must be set to None") + self.assertEqual( + result_error, "OoopsI didit again", "Error in result doesn't match expected" + ) + + # ------------------------------------------------------- + # Case 5: regular command execution result with no error + # However the command result is saved in the "error" value. + # For example this happens in 'docker build'. + # ------------------------------------------------------- + status = 0 + error = ["Such much", f"Doge like SSH {placeholder}"] + response = [] + + ssh_command_result = self.Server._parse_command_results( + status, response, error, key_values=[f"{self.secret_2.secret_value}"] + ) + + # Get result + result_status = ssh_command_result["status"] + result_response = ssh_command_result["response"] + result_error = ssh_command_result["error"] + + self.assertEqual( + result_status, + result_status, + "Status in result must be the same as the initial one", + ) + self.assertEqual( + result_error, + f"Such muchDoge like SSH {placeholder}", + "Response in result doesn't match expected", + ) + self.assertIsNone(result_response, "Error in response must be set to None") + + def test_tower_command_action_file_using_template(self): + """ + Test action file using template for tower source + """ + with patch( + "odoo.addons.cetmix_tower_server.models.cx_tower_server.CxTowerServer.upload_file", + return_value="ok", + ): + self.server_test_1.run_command( + self.command_create_file_with_template_tower_source + ) + + log_text_create_success = "File created and uploaded successfully" + log_text_file_exists = "An error occurred: File already exists on server." + + # Get command log + log_record = self.CommandLog.search( + [ + ("server_id", "=", self.server_test_1.id), + ( + "command_id", + "=", + self.command_create_file_with_template_tower_source.id, + ), + ("command_response", "=", log_text_create_success), + ] + ) + + self.assertEqual(len(log_record), 1, msg="Must be a single log record") + + with patch( + "odoo.addons.cetmix_tower_server.models.cx_tower_server.CxTowerServer.upload_file", + return_value="ok", + ): + self.server_test_1.run_command( + self.command_create_file_with_template_tower_source + ) + + log_record_2 = self.CommandLog.search( + [ + ("server_id", "=", self.server_test_1.id), + ( + "command_id", + "=", + self.command_create_file_with_template_tower_source.id, + ), + ("command_error", "=", log_text_file_exists), + ] + ) + + self.assertEqual(len(log_record_2), 1, msg="Must be a single log record") + + def test_server_command_action_file_using_template(self): + """ + Test action file using template for server source + """ + self.assertFalse(self.template_file_server.file_ids) + + def download_file(this, remote_path): + return b"Hello, world!" + + cx_tower_server_obj = self.registry["cx.tower.server"] + + with patch.object(cx_tower_server_obj, "download_file", download_file): + self.server_test_1.run_command( + self.command_create_file_with_template_server_source + ) + + log_text_create_success = "File created and uploaded successfully" + log_text_file_exists = "An error occurred: File already exists on server." + + # Get command log + log_record = self.CommandLog.search( + [ + ("server_id", "=", self.server_test_1.id), + ( + "command_id", + "=", + self.command_create_file_with_template_server_source.id, + ), + ("command_response", "=", log_text_create_success), + ] + ) + + self.assertEqual(len(log_record), 1, msg="Must be a single log record") + self.assertEqual( + len(self.template_file_server.file_ids), 1, msg="Must be one file!" + ) + self.assertEqual( + self.template_file_server.file_ids.source, + "server", + msg="The File source must be 'server'", + ) + + with patch.object(cx_tower_server_obj, "download_file", download_file): + self.server_test_1.run_command( + self.command_create_file_with_template_server_source + ) + + log_record_2 = self.CommandLog.search( + [ + ("server_id", "=", self.server_test_1.id), + ( + "command_id", + "=", + self.command_create_file_with_template_server_source.id, + ), + ("command_error", "=", log_text_file_exists), + ] + ) + + self.assertEqual(len(log_record_2), 1, msg="Must be a single log record") + + def test_run_command_no_command_log(self): + """Run command without creating a log record. + Such commands return execution result directly. + """ + # Add label to track command log + command_label = "Test Command with keys" + custom_values = {"log": {"label": command_label}} + + # Run command for Server 1 + command_result = self.server_test_1.with_context( + no_command_log=True + ).run_command(self.command_create_dir, **custom_values) + self.assertEqual( + command_result["status"], 0, "Command status doesn't match expected one" + ) + self.assertEqual( + command_result["response"], + "ok", + "Command response doesn't match expected one", + ) + self.assertIsNone( + command_result["error"], "Command error doesn't match expected one" + ) + + def test_another_command_is_running(self): + """Test a case when another command is running on the same server""" + + # Remove all existing command logs + self.CommandLog.search([]).unlink() + + # Create a new command log + initial_command_log = self.CommandLog.create( + { + "server_id": self.server_test_1.id, + "command_id": self.command_create_new_command.id, + "start_date": Datetime.now(), + } + ) + + # Run the command without creating a log record + command_result = self.server_test_1.with_context( + no_command_log=True + ).run_command(self.command_create_new_command) + self.assertEqual(command_result["status"], ANOTHER_COMMAND_RUNNING) + + # Run the command with creating a log record + command_result = self.server_test_1.run_command(self.command_create_new_command) + + # Get the command log + command_log = self.CommandLog.search( + [ + ("server_id", "=", self.server_test_1.id), + ("command_id", "=", self.command_create_new_command.id), + ("id", "!=", initial_command_log.id), + ] + ) + self.assertEqual(len(command_log), 1, "Must be a single log record") + self.assertEqual(command_log.command_status, ANOTHER_COMMAND_RUNNING) + + def test_file_using_template_create_if_exists(self): + """Test uploading file using template if it exists on server.""" + + command = self.command_create_file_with_template_server_source + command.write({"if_file_exists": "skip"}) + + # Create file to make sure that it exists on the server + file_template = command.file_template_id + orig_file = file_template.create_file( + server=self.server_test_1, + server_dir=file_template.server_dir, + if_file_exists=command.if_file_exists, + ) + + self.assertTrue(orig_file, "File must be created on the server") + + # Test if file exists and command is set to "skip" + skipped_file = file_template.create_file( + server=self.server_test_1, + server_dir=file_template.server_dir, + if_file_exists=command.if_file_exists, + ) + self.assertEqual( + orig_file, + skipped_file, + "Skip should return the existing file, not create a new one", + ) + self.assertEqual( + self.env["cx.tower.file"].search_count( + [ + ("template_id", "=", file_template.id), + ("server_id", "=", self.server_test_1.id), + ] + ), + 1, + "There must be exactly one physical file record after skip", + ) + + # Change command to raise an error if file exists + command.write({"if_file_exists": "raise"}) + with self.assertRaisesRegex( + ValidationError, + "File already exists on server.", + ): + file_template.create_file( + server=self.server_test_1, + server_dir=file_template.server_dir, + if_file_exists=command.if_file_exists, + ) + # Change command to "overwrite" file if it exists + command.write({"if_file_exists": "overwrite"}) + # Run command again, it should overwrite the file + file_template.create_file( + server=self.server_test_1, + server_dir=file_template.server_dir, + if_file_exists=command.if_file_exists, + ) + self.assertEqual( + self.env["cx.tower.file"].search_count( + [ + ("template_id", "=", file_template.id), + ("server_id", "=", self.server_test_1.id), + ("server_dir", "=", file_template.server_dir), + ] + ), + 1, + "There must be exactly one physical file record after overwrite", + ) + self.assertEqual( + orig_file.code, + file_template.code, + "File code must match template after overwrite", + ) + self.assertEqual( + orig_file.name, + file_template.file_name, + "File name must match template after overwrite", + ) + self.assertEqual( + orig_file.source, + file_template.source, + "File source must match template after overwrite", + ) + + def test_is_file_disconnected_from_template(self): + """Test if file is disconnected from template after being created.""" + + initial_files = self.server_test_1.file_ids + command = self.command_create_file_with_template_server_source + + command.disconnect_file = True + self.server_test_1.run_command(command=command) + + new_files = self.server_test_1.file_ids - initial_files + self.assertEqual(len(new_files), 1, "Must be one new file created") + self.assertEqual( + new_files.code_on_server, + command.file_template_id.code, + "File code must match template", + ) + self.assertFalse( + new_files.template_id, "File must be disconnected from template" + ) + + # --------------------- + # ********************* + # Python commands + # ********************* + # --------------------- + + def test_render_code_python(self): + """Test Python code template direct rendering""" + + rendered_command = self.server_test_1._render_command( + self.command_create_new_command + ) + + # Note: this is rendered as for Server Test 1 + rendered_code_pythonic = ( + f""" +server_name = "{self.server_test_1.name}" +if server_name and #!cxtower.secret.FOLDER!# == "secretFolder": + # We don't actually create a new command because it will raise + # access error if user doesn't have access to 'create' operation. + # Instead we just return a dummy command result. + command = "new command" + result = {{"exit_code": 0, "message": "New command was created"}} +else: + result = {{"exit_code": %s, "message": "error"}} + """ + % GENERAL_ERROR + ) + + self.assertEqual( + rendered_command["rendered_code"], + rendered_code_pythonic, + "Rendered code doesn't match", + ) + + def test_execute_python_command(self): + """ + Run command with python action. + """ + command_result = self.server_test_1.with_context( + no_command_log=True + ).run_command(self.command_create_new_command) + self.assertEqual( + command_result["status"], 0, "The command result status must be 0" + ) + self.assertEqual( + command_result["response"], + "New command was created", + "The response must be text", + ) + + # Check error is raises + self.secret_folder_key.secret_value = "not_a_secretFolder" + command_result = self.server_test_1.with_context( + no_command_log=True + ).run_command(self.command_create_new_command) + self.assertEqual( + command_result["status"], + GENERAL_ERROR, + "The command result status must be GENERAL_ERROR", + ) + self.assertEqual( + command_result["error"], + "error", + "The error response must be contain text - error", + ) + + def test_run_python_code(self): + """ + Test python execution code + """ + rendered_command = self.server_test_1._render_command( + self.command_create_new_command + ) + + command_result = self.server_test_1._run_python_code( + rendered_command["rendered_code"] + ) + self.assertEqual( + command_result["status"], 0, "The command result status must be 0" + ) + self.assertEqual( + command_result["response"], + "New command was created", + "The response must be text", + ) + self.assertIsNone( + command_result["error"], + "Error in command result must be set to None", + ) + + def test_run_command_without_set_server_status(self): + """ + Test command execution without setting server status + """ + # Set command access level to "user" + self.command_create_new_command.write({"access_level": "1"}) + + # Add user to command + self.write_and_invalidate( + self.server_test_1, **{"user_ids": [(4, self.user.id)]} + ) + + # Reset access rule cache + self.env["ir.rule"].invalidate_recordset() + + # Run command + server_status = self.server_test_1.status + + result = ( + self.server_test_1.with_context(no_command_log=True) + .with_user(self.user) + .run_command(self.command_create_new_command) + ) + + # Check command result + self.assertEqual(result["status"], 0, "Command status must be 0") + self.assertEqual( + self.server_test_1.status, server_status, "Server status must be 'running'" + ) + + def test_run_command_with_set_server_status(self): + """ + Test command execution with setting server status + """ + # Set server status to "down" + self.command_create_new_command.write({"server_status": "stopping"}) + + # Run command + self.server_test_1.with_context(no_command_log=True).run_command( + self.command_create_new_command + ) + + # Check command result + self.assertEqual( + self.server_test_1.status, "stopping", "Server status must be 'stopping'" + ) + + def test_run_python_code_with_secret(self): + """ + Test execution of Python code with a secret value. + This test ensures that a command is rendered and executed correctly, + and that the secret value is correctly handled and replaced in the output. + """ + + placeholder = self.Key.SECRET_VALUE_PLACEHOLDER + # Case 1 + # Render the command using server_test_1 + rendered_command = self.server_test_1._render_command( + self.command_python_command_1 + ) + + # Run the rendered Python code + command_result = self.server_test_1._run_python_code( + rendered_command["rendered_code"] + ) + + # Assert that the command execution status is 0 (indicating success) + self.assertEqual( + command_result["status"], 0, "The command result status must be 0" + ) + + # Assert that the response contains the secret spoiler text + self.assertEqual( + command_result["response"], + placeholder, + "The response must correctly include the secret value placeholder", + ) + + # Assert that no error occurred during execution (error should be None) + self.assertIsNone( + command_result["error"], + "The error in command result must be None", + ) + + # Case 2 + # Render the command using server_test_1 + rendered_command = self.server_test_1._render_command( + self.command_python_command_2 + ) + + # Run the rendered Python code + command_result = self.server_test_1._run_python_code( + rendered_command["rendered_code"] + ) + + # Assert that the command execution status is 0 (indicating success) + self.assertEqual( + command_result["status"], 0, "The command result status must be 0" + ) + + # Assert that the response contains the secret spoiler text + self.assertEqual( + command_result["response"], + f'We use "{placeholder}"', + "The response must correctly include the secret value placeholder", + ) + + # Assert that no error occurred during execution (error should be None) + self.assertIsNone( + command_result["error"], + "The error in command result must be None", + ) + + # Case 3 + # Render the command using server_test_1 + rendered_command = self.server_test_1._render_command( + self.command_python_command_3 + ) + + # Run the rendered Python code + command_result = self.server_test_1._run_python_code( + rendered_command["rendered_code"] + ) + + # Assert that the command execution status is 0 (indicating success) + self.assertEqual( + command_result["status"], 0, "The command result status must be 0" + ) + + # Assert that the response contains the secret spoiler text + self.assertEqual( + command_result["response"], + placeholder, + "The response must correctly include the secret value placeholder", + ) + + # Assert that no error occurred during execution (error should be None) + self.assertIsNone( + command_result["error"], + "The error in command result must be None", + ) + + # Case 4 + # Render the command using server_test_1 + rendered_command = self.server_test_1._render_command( + self.command_python_command_4 + ) + + # Run the rendered Python code + # SSH keys are not parsed inline, so we should raise a validation error + command_result = self.server_test_1._run_python_code( + rendered_command["rendered_code"] + ) + + # Assert that the command execution status is 0 (indicating success) + self.assertEqual( + command_result["status"], 0, "The command result status must be 0" + ) + + # Assert that the response contains the secret spoiler text + self.assertEqual( + command_result["response"], + placeholder, + "The response must correctly include the secret value placeholder", + ) + + # Assert that no error occurred during execution (error should be None) + self.assertIsNone( + command_result["error"], + "The error in command result must be None", + ) + + def test_command_with_secret(self): + """ + Test case to verify that when a command includes a secret reference, + the secret key is automatically linked with the command. + """ + + # Command with a secret reference + code = "cd {{ test_path_ }} && mkdir #!cxtower.secret.FOLDER!#" + + secrets = self.Command._extract_secret_ids(code) + secret_folder_key = self.secret_folder_key + self.assertIn( + secret_folder_key, + secrets, + msg=( + f"The expected secret ID #{secret_folder_key.id} " + "was not found in the provided code." + ), + ) + + command_with_keys = self.Command.create( + {"name": "Command with keys", "code": code} + ) + + # -- 1 -- + # Assert that the secret key is linked with the command + self.assertIn( + secret_folder_key, + command_with_keys.secret_ids, + msg="The secret key is not linked with the command.", + ) + + # -- 2 -- + # Update the command's code to remove the secret reference + updated_code = "cd {{ test_path_ }} && mkdir new_folder" + command_with_keys.code = updated_code + + self.assertFalse( + command_with_keys.secret_ids, + msg=( + "The secret_ids field should be empty after " + "removing the secret reference from command." + ), + ) + + # -- 3 -- + # Create a secret with the same reference but connected to another server + another_server = self.server_test_1.copy({"name": "another server"}) + another_secret = self.Key.create( + { + "name": "another secret", + "reference": secret_folder_key.reference, + "key_type": "s", + } + ) + another_secret_value = self.KeyValue.create( + { + "key_id": another_secret.id, + "server_id": another_server.id, + "secret_value": "another secret value", + } + ) + # Set original code again + command_with_keys.code = code + self.assertEqual( + len(command_with_keys.secret_ids), + 1, + msg="Must be only one secret", + ) + self.assertIn( + secret_folder_key, + command_with_keys.secret_ids, + msg="The secret key is not linked with the command.", + ) + self.assertNotIn( + another_secret, + command_with_keys.secret_ids, + msg="The another secret is linked with the command.", + ) + + # -- 4 -- + # Connect command to server and secret to another server + # and ensure it's unlinked from the command. + yet_one_more_server = self.server_test_1.copy({"name": "yet one more server"}) + + self.write_and_invalidate( + another_secret_value, **{"server_id": yet_one_more_server.id} + ) + self.write_and_invalidate( + command_with_keys, **{"server_ids": self.server_test_1} + ) + self.assertEqual( + len(command_with_keys.secret_ids), + 1, + msg="Must be one secret", + ) + + def test_check_zombie_commands(self): + """Test checking and marking zombie commands""" + # Create test commands + ssh_command = self.Command.create( + { + "name": "Test SSH Command", + "code": "ls -la", + "action": "ssh_command", + } + ) + python_command = self.Command.create( + { + "name": "Test Python Command", + "code": "print('test')", + "action": "python_code", + } + ) + plan_command = self.Command.create( + { + "name": "Test Plan Command", + "code": "test plan", + "action": "plan", + } + ) + + # Set command timeout to 10 seconds + self.env["ir.config_parameter"].sudo().set_param( + "cetmix_tower_server.command_timeout", "10" + ) + + # Create command logs with different start times + now = Datetime.now() + old_time = now - timedelta(seconds=20) # Older than timeout + recent_time = now - timedelta(seconds=5) # Within timeout + + # Create zombie SSH command log + zombie_ssh_log = self.CommandLog.create( + { + "command_id": ssh_command.id, + "server_id": self.server_test_1.id, + "start_date": old_time, + } + ) + + # Create zombie Python command log + zombie_python_log = self.CommandLog.create( + { + "command_id": python_command.id, + "server_id": self.server_test_1.id, + "start_date": old_time, + } + ) + + # Create non-zombie command logs + active_ssh_log = self.CommandLog.create( + { + "command_id": ssh_command.id, + "server_id": self.server_test_1.id, + "start_date": recent_time, + } + ) + + plan_log = self.CommandLog.create( + { + "command_id": plan_command.id, + "server_id": self.server_test_1.id, + "start_date": old_time, + } + ) + + # Test with timeout set + self.server_test_1._check_zombie_commands() + + # Check zombie commands are marked as finished + self.assertFalse( + zombie_ssh_log.is_running, "Zombie SSH command should be marked as finished" + ) + self.assertFalse( + zombie_python_log.is_running, + "Zombie Python command should be marked as finished", + ) + self.assertEqual( + zombie_ssh_log.command_status, + COMMAND_TIMED_OUT, + "Zombie SSH command should have timed out status", + ) + self.assertEqual( + zombie_python_log.command_error, + COMMAND_TIMED_OUT_MESSAGE, + "Zombie Python command should have timeout error message", + ) + + # Check non-zombie commands are still running + self.assertTrue( + active_ssh_log.is_running, "Recent command should still be running" + ) + self.assertTrue( + plan_log.is_running, "Plan command should not be affected by timeout" + ) + + # Test with timeout disabled + self.env["ir.config_parameter"].sudo().set_param( + "cetmix_tower_server.command_timeout", "0" + ) + + # Create new zombie command log + new_zombie_log = self.CommandLog.create( + { + "command_id": ssh_command.id, + "server_id": self.server_test_1.id, + "start_date": old_time, + } + ) + + self.server_test_1._check_zombie_commands() + self.assertNotEqual( + new_zombie_log.command_status, + COMMAND_TIMED_OUT, + "Commands should not be marked as timed out when timeout is disabled", + ) + + def test_command_with_malformed_code(self): + """Test rendering command using `_render_command` method + of cx.tower.server with malformed code + """ + + with self.assertRaises(ValidationError): + self.Command.create( + { + "name": "Test Malformed Command", + "code": "cd {{ !@238203 }} && mkdir #!cxtower.secret.FOLDER!#", + "action": "ssh_command", + } + )