From ca0996089da5da80b71d9d0f43136ed9b5d69745 Mon Sep 17 00:00:00 2001 From: git_admin Date: Mon, 27 Apr 2026 08:18:31 +0000 Subject: [PATCH] Tower: upload cetmix_tower_server 16.0.3.0.1 (via marketplace) --- .../cetmix_tower_server/tests/test_server.py | 890 ++++++++++++++++++ 1 file changed, 890 insertions(+) create mode 100644 addons/cetmix_tower_server/tests/test_server.py diff --git a/addons/cetmix_tower_server/tests/test_server.py b/addons/cetmix_tower_server/tests/test_server.py new file mode 100644 index 0000000..d279423 --- /dev/null +++ b/addons/cetmix_tower_server/tests/test_server.py @@ -0,0 +1,890 @@ +from odoo.exceptions import AccessError, ValidationError + +from ..models.constants import COMMAND_NOT_COMPATIBLE_WITH_SERVER +from .common import TestTowerCommon + + +class TestTowerServer(TestTowerCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls.os_ubuntu_20_04 = cls.env["cx.tower.os"].create({"name": "Ubuntu 20.04"}) + + # Define model variables to avoid unsubscriptable errors + Key = cls.env["cx.tower.key"] + Server = cls.env["cx.tower.server"] + + secret_1 = Key.create( + { + "name": "Secret 1", + "secret_value": "secret_value_1", + "key_type": "s", + }, + ) + secret_2 = Key.create( + { + "name": "Secret 2", + "secret_value": "secret_value_2", + "key_type": "s", + }, + ) + cls.server_test_2 = Server.create( + { + "name": "Test Server #2", + "color": 2, + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "k", + "host_key": "test_key", + "use_sudo": "p", + "ssh_key_id": cls.key_1.id, + "os_id": cls.os_ubuntu_20_04.id, + "secret_ids": [ + ( + 0, + 0, + { + "key_id": secret_1.id, + "secret_value": "secret_value_1", + }, + ), + ( + 0, + 0, + { + "key_id": secret_2.id, + "secret_value": "secret_value_2", + }, + ), + ], + "tag_ids": [(6, 0, [cls.tag_test_production.id])], + } + ) + + # Files + File = cls.env["cx.tower.file"] + cls.server_test_2_file = File.create( + { + "name": "tower_demo_without_template_{{ branch }}.txt", + "source": "tower", + "server_id": cls.server_test_2.id, + "server_dir": "{{ test_path }}", + "code": "Please, check url: {{ url }}", + } + ) + + # Flight plan to delete the server + Command = cls.env["cx.tower.command"] + Plan = cls.env["cx.tower.plan"] + + # Add a command to delete the server + cls.command_delete_server = Command.create( + { + "name": "Python command for deleting server", + "action": "python_code", + "code": """ +partner = env["res.partner"].create({"name": "Partner 1", "ref": "delete_server"}) +result = { + "exit_code": 0, + "message": partner.name, +} + """, + } + ) + + cls.plan_delete_server = Plan.create( + { + "name": "Delete server", + "line_ids": [ + (0, 0, {"command_id": cls.command_delete_server.id, "sequence": 1}), + ], + } + ) + + # Create two test users that belong only to the "User" group. + cls.user1 = cls.Users.create( + { + "name": "Test User 1", + "login": "test_user1", + "email": "test_user1@example.com", + "groups_id": [(6, 0, [cls.group_user.id])], + } + ) + cls.user2 = cls.Users.create( + { + "name": "Test User 2", + "login": "test_user2", + "email": "test_user2@example.com", + "groups_id": [(6, 0, [cls.group_user.id])], + } + ) + # Create two "Manager" group users. + cls.manager1 = cls.Users.create( + { + "name": "Manager 1", + "login": "manager1", + "email": "manager1@example.com", + "groups_id": [(6, 0, [cls.group_manager.id])], + } + ) + cls.manager2 = cls.Users.create( + { + "name": "Manager 2", + "login": "manager2", + "email": "manager2@example.com", + "groups_id": [(6, 0, [cls.group_manager.id])], + } + ) + + def test_server_copy(self): + """Test server copy""" + + # Let's say we have auto sync enabled on one of the files in server 2 + self.server_test_2_file.auto_sync = True + fields_to_check = [ + "ip_v4_address", + "ip_v6_address", + "ssh_username", + "ssh_password", + "ssh_key_id", + ] + + # Crete a log from file of type 'server' + file_for_log = self.File.create( + { + "source": "server", + "name": "test.log", + "server_dir": "/tmp", + "server_id": self.server_test_2.id, + "code": "Some log record - server", + } + ) + + server_log_server = self.ServerLog.create( + { + "name": "Log from file", + "server_id": self.server_test_2.id, + "log_type": "file", + "file_id": file_for_log.id, + } + ) + # Add variable values to server + self.env["cx.tower.variable.value"].create( + { + "server_id": self.server_test_2.id, + "variable_id": self.variable_dir.id, + "value_char": "test", + } + ) + + # Copy server 2 + server_test_2_copy = self.server_test_2.copy() + + # The name of copy should contain '~ (copy)' suffix + self.assertTrue( + server_test_2_copy.name == self.server_test_2.name + " (copy)", + msg="Server name should contain '~ (copy)' suffix!", + ) + + # Check server logs + # Check that the copied server has the same number of server logs + self.assertEqual( + len(server_test_2_copy.server_log_ids), + len(self.server_test_2.server_log_ids), + ( + "Copied template should have the same " + "number of server logs as the original" + ), + ) + + # Ensure the first server log in the copied server matches the original + copied_log = server_test_2_copy.server_log_ids + self.assertEqual( + copied_log.name, + server_log_server.name, + "Server log name should be the same in the copied server", + ) + self.assertEqual( + copied_log.command_id.id, + server_log_server.command_id.id, + "Command ID should be the same in the copied server log", + ) + self.assertEqual( + copied_log.command_id.code, + server_log_server.command_id.code, + "Command code should be the same in the copied server log", + ) + + # Check fields match list + for field_ in fields_to_check: + self.assertTrue( + getattr(server_test_2_copy, field_) + == getattr(self.server_test_2, field_), + msg=( + f"Field {field_} value on server copy " + "does not match with the source!" + ), + ) + + # Check if auto sync is disabled on the all the files + # in the copied server + self.assertTrue( + all([not file.auto_sync for file in server_test_2_copy.file_ids]), + msg="Auto sync should be disabled on all the files in the copied server!", + ) + + # Check if 'keep_when_deleted' option is enabled on all the files + # in the copied server + self.assertTrue( + all([file.keep_when_deleted for file in server_test_2_copy.file_ids]), + msg=( + "keep_when_deleted option should be enabled on all the files " + "in the copied server!" + ), + ) + + # Check if secret values of keys in the copied server are the same + # as in source server + self.assertTrue( + all( + [ + key_copy.secret_value == key_src.secret_value + for key_src, key_copy in zip( # noqa: B905 we need to run on Python 3.10 + self.server_test_2.secret_ids.sudo(), + server_test_2_copy.secret_ids.sudo(), + ) + ] + ), + msg=( + "Secret values of keys in the copied server " + "should be the same as in source server!" + ), + ) + + # Variable names and values in server copy should be the same + # as in source server + self.assertTrue( + all( + [ + var_copy.variable_reference == var_src.variable_reference + and var_copy.value_char == var_src.value_char + for var_src, var_copy in zip( # noqa: B905 we need to run on Python 3.10 + self.server_test_2.variable_value_ids, + server_test_2_copy.variable_value_ids, + ) + ] + ), + msg=( + "Variable names and values in server copy " + "should be the same as in source server!" + ), + ) + + # Copy copied server + server_test_2_new_copy = server_test_2_copy.copy() + # Variable names and values in server copy should be the same + # as in source server + self.assertTrue( + all( + [ + var_copy.variable_reference == var_src.variable_reference + and var_copy.value_char == var_src.value_char + and var_copy.reference == f"{var_src.reference}_copy" + for var_src, var_copy in zip( # noqa: B905 we need to run on Python 3.10 + server_test_2_copy.variable_value_ids, + server_test_2_new_copy.variable_value_ids, + ) + ] + ), + msg=( + "Variable names and values in server copy " + "should be the same as in source server!" + ), + ) + + def test_server_archive_unarchive(self): + """Test Server archived/unarchived""" + server = self.server_test_1.copy() + self.assertTrue(server, msg="Server must be unarchived") + server.toggle_active() + server.toggle_active() + self.assertTrue(server, msg="Server must be unarchived") + + def test_server_unlink(self): + """ + Test cascading deletion of server and its related records. + """ + secret_1 = self.Key.create( + { + "name": "Secret 1", + "secret_value": "secret_value_1", + "key_type": "s", + }, + ) + # Create a test server + server = self.Server.create( + { + "name": "Test Server #3", + "color": 3, + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "k", + "use_sudo": "p", + "ssh_key_id": self.key_1.id, + "host_key": "test_key", + "os_id": self.os_ubuntu_20_04.id, + "secret_ids": [ + ( + 0, + 0, + { + "key_id": secret_1.id, + "secret_value": "secret_value_1", + }, + ), + ], + } + ) + + # Create related file + file = self.File.create( + {"name": "Test File", "server_id": server.id, "source": "server"} + ) + + # Related secret + secret = server.secret_ids[0] + + variable_meme = self.Variable.create({"name": "meme"}) + + # Create related variable value + variable_value = self.env["cx.tower.variable.value"].create( + { + "variable_id": variable_meme.id, # Replace with valid reference + "value_char": "Test Value", + "server_id": server.id, + } + ) + plan_1 = self.Plan.create( + { + "name": "Test plan", + "note": "Create directory and list its content", + } + ) + # Create a related plan log + plan_log = self.PlanLog.create( + { + "server_id": server.id, + "plan_id": plan_1.id, # Replace with valid reference + } + ) + + # Check that all records are created + self.assertTrue(server, "Server should be created successfully") + self.assertTrue(file, "File should be created successfully") + self.assertTrue(secret, "Secret should be created successfully") + self.assertTrue(variable_value, "Variable Value should be created successfully") + self.assertTrue(plan_log, "Plan Log should be created successfully") + + # Collect IDs for verification post-deletion + file_id = file.id + variable_value_id = variable_value.id + plan_log_id = plan_log.id + + # Delete the server + server.unlink() + + # Verify that the server is deleted + self.assertFalse( + self.Server.search([("id", "=", server.id)]), + msg="Server should be deleted", + ) + # Verify that related records are deleted + self.assertFalse( + self.File.search([("id", "=", file_id)]), + msg="File should be deleted when server is deleted", + ) + # Verify that unrelated records are not affected + self.assertTrue( + self.Plan.search([("id", "=", plan_1.id)]), + msg="Unrelated plan should not be deleted when server is deleted", + ) + self.assertFalse( + self.KeyValue.search([("id", "=", secret.id)]), + msg="Secret should be deleted when server is deleted", + ) + self.assertFalse( + self.VariableValue.search([("id", "=", variable_value_id)]), + msg="Variable Value should be deleted when server is deleted", + ) + self.assertFalse( + self.PlanLog.search([("id", "=", plan_log_id)]), + msg="Plan Log should be deleted when server is deleted", + ) + + def test_server_delete_plan_success(self): + """Test server delete plan""" + + # Set plan to delete the server + self.server_test_2.plan_delete_id = self.plan_delete_server.id + + # Delete the server + self.server_test_2.unlink() + + # Check if the server has been deleted + self.assertFalse( + self.server_test_2.exists(), + msg="Server should be deleted", + ) + + # Check if the partner has been created + self.assertTrue( + self.env["res.partner"].search([("ref", "=", "delete_server")]), + msg="Partner should be created", + ) + + def test_server_delete_plan_error(self): + """Test server delete plan error""" + + # Modify the command to fail + self.command_delete_server.code = """ +result = { + "exit_code": 4, + "message": 'Such much error', +} + """ + # Set plan to delete the server + self.server_test_2.plan_delete_id = self.plan_delete_server.id + + # Delete the server + self.server_test_2.unlink() + + # Check if the server has been deleted + self.assertTrue( + self.server_test_2.exists(), + msg="Server should not be deleted", + ) + + self.assertEqual( + self.server_test_2.status, + "delete_error", + msg="Server status should be delete_error", + ) + + # ------------------------------------------------------------ + # ---- Access + # ------------------------------------------------------------ + def test_user_record_not_visible_without_user_ids(self): + """ + Test that a user in the 'cetmix_tower_server.group_user' group cannot see + a Tower Server record if not added to user_ids. + """ + # Create a Tower Server record without any user_ids. + record = self.Server.create( + { + "name": "User Visibility Test", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "user_ids": [(5, 0, 0)], + } + ) + # As user1, search for the record. Since user1's partner is not subscribed, + # the record should not be returned. + records = self.Server.with_user(self.user1).search([("id", "=", record.id)]) + self.assertFalse( + records, + "User1 should not see the record if not added to user_ids.", + ) + + def test_user_record_visible_after_added_to_user_ids(self): + """ + Test that a user sees a Tower Server record after being added to user_ids. + """ + record = self.Server.create( + { + "name": "User Visibility Test", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "user_ids": [(4, self.user1.id)], + } + ) + # Now, as user1 the record should be visible. + records = self.Server.with_user(self.user1).search([("id", "=", record.id)]) + self.assertTrue( + records, + "User1 should see the record after being added to message_partner_ids.", + ) + + def test_only_added_user_can_see(self): + """ + Test that only the added user can see the Tower Server record. + """ + record = self.Server.create( + { + "name": "User Visibility Test", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "user_ids": [(4, self.user1.id)], + } + ) + # Subscribe only user1's partner. + records_user1 = self.Server.with_user(self.user1).search( + [("id", "=", record.id)] + ) + records_user2 = self.Server.with_user(self.user2).search( + [("id", "=", record.id)] + ) + self.assertTrue( + records_user1, "User1 should see the record after being added to user_ids." + ) + self.assertFalse( + records_user2, + "User2 should not see the record if they are not added to user_ids.", + ) + + def test_manager_read_access_as_follower(self): + """A manager should be able to read a record if his partner is a follower.""" + + # Create a record without any managers in manager_ids. + record = self.Server.create( + { + "name": "Test Server (Follower)", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + # Explicitly clear manager_ids + "manager_ids": [(6, 0, [])], + } + ) + # Subscribe manager1 to the record so that his partner becomes a follower. + record.write({"user_ids": [(4, self.manager1.id)]}) + + # As manager1 (a follower) the record should be visible. + records = self.Server.with_user(self.manager1).search([("id", "=", record.id)]) + self.assertTrue(records, "Manager1 (user) must be able to read the record.") + + # As manager2 (not a follower and not in manager_ids) + # the record should not be visible. + records = self.Server.with_user(self.manager2).search([("id", "=", record.id)]) + self.assertFalse( + records, + "Manager2 (not user_ids and not in manager_ids) must not see the record.", + ) + + def test_manager_read_access_as_manager_ids(self): + """A manager should be able to read a record if he is added to manager_ids.""" + + # Create a record with manager2 added to manager_ids. + record = self.Server.create( + { + "name": "Test Server (Manager)", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "manager_ids": [(6, 0, [self.manager2.id])], + } + ) + # Without adding to user_ids, manager2 should be able to see the record. + records = self.Server.with_user(self.manager2).search([("id", "=", record.id)]) + self.assertTrue( + records, "Manager2 (in manager_ids) must be able to read the record." + ) + + # Manager1 is not added to user_ids nor in manager_ids + # so should not see the record. + records = self.Server.with_user(self.manager1).search([("id", "=", record.id)]) + self.assertFalse( + records, + "Manager1 (neither user_ids nor in manager_ids) must not see the record.", + ) + + # Add manager1 to user_ids + record.write({"user_ids": [(4, self.manager1.id)]}) + records = self.Server.with_user(self.manager1).search([("id", "=", record.id)]) + self.assertTrue( + records, + "Manager1 (added to user_ids) must be able to see the record.", + ) + + def test_manager_write_access(self): + """A manager should be able to update a record only if he is in manager_ids.""" + + # Create a record with no managers. + record = self.Server.create( + { + "name": "Test Server (Write)", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "manager_ids": [(6, 0, [])], + } + ) + + # Manager1 (not in manager_ids) tries to update: should raise an AccessError. + with self.assertRaises(AccessError): + record.with_user(self.manager1).write({"name": "Updated Name"}) + + # Update the record to include manager1 in manager_ids. + record.write({"manager_ids": [(4, self.manager1.id)]}) + try: + record.with_user(self.manager1).write({"name": "Updated Name"}) + except AccessError: + self.fail( + "Manager1 must be able to update the " + "record after being added to manager_ids." + ) + + def test_manager_create_access(self): + """ + A manager should be allowed to create a record only if he is added + in the "Managers". + """ + # Manager1 attempts to create a record without including himself in manager_ids. + with self.assertRaises(AccessError): + self.Server.with_user(self.manager1).create( + { + "name": "Test Server (Create Denied)", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "manager_ids": [(6, 0, [])], + } + ) + + # Manager1 creates a record with himself added to manager_ids. + try: + record = self.Server.with_user(self.manager1).create( + { + "name": "Test Server (Create Allowed)", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "manager_ids": [(6, 0, [self.manager1.id])], + } + ) + self.assertTrue( + record, + "Manager1 must be able to create the record if he is in manager_ids.", + ) + except AccessError: + self.fail( + "Manager1 should be allowed to create a " + "record when included in manager_ids." + ) + + def test_manager_delete_access(self): + """ + A manager should be allowed to delete a record only if: + - He is in the manager_ids field, and + - He is the creator of the record. + """ + + # -- Scenario 1: Manager1 creates a record with himself in manager_ids. + record = self.Server.with_user(self.manager1).create( + { + "name": "Test Server (Delete Allowed)", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "manager_ids": [(6, 0, [self.manager1.id])], + } + ) + # Manager1 should be able to delete his own record. + try: + record.with_user(self.manager1).unlink() + except AccessError: + self.fail( + "Manager1 must be able to delete his own record if in manager_ids." + ) + + # -- Scenario 2: Manager2 creates a record (with himself in manager_ids). + record2 = self.Server.with_user(self.manager2).create( + { + "name": "Test Server (Delete Denied - Not Creator)", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "manager_ids": [(6, 0, [self.manager2.id, self.manager1.id])], + } + ) + # Manager1, should not be able to delete record2. + with self.assertRaises(AccessError): + record2.with_user(self.manager1).unlink() + + # Remove manager2 from manager_ids. + record2.write({"manager_ids": [(6, 0, [])]}) + + # Manager2 should not be able to delete record2 now + # because he is not in manager_ids. + with self.assertRaises(AccessError): + record2.with_user(self.manager2).unlink() + + def test_command_server_compatibility(self): + """Test command compatibility with servers""" + # Create a command restricted to specific servers + command = self.Command.create( + { + "name": "Restricted Command", + "action": "ssh_command", + "code": "echo 'test'", + "server_ids": [(6, 0, [self.server_test_1.id])], + } + ) + + # Should work on allowed server + try: + self.server_test_1.run_command(command) + except Exception as e: + self.fail(f"Command should execute on allowed server but failed: {e}") + + # Should fail on non-allowed server + command_result = self.server_test_2.with_context( + no_command_log=True + ).run_command(command) + self.assertEqual( + command_result["status"], + COMMAND_NOT_COMPATIBLE_WITH_SERVER, + "Command should not execute on non-allowed server", + ) + + # Clear all existing command logs + self.CommandLog.search([]).unlink() + # Same test but with command log + self.server_test_2.run_command(command) + + command_log = self.CommandLog.search([]) + self.assertEqual(len(command_log), 1, "Must be a single log record") + self.assertEqual( + command_log.command_status, + COMMAND_NOT_COMPATIBLE_WITH_SERVER, + "Command should not execute on non-allowed server", + ) + + # Command without server restrictions should work on any server + unrestricted_command = self.Command.create( + { + "name": "Unrestricted Command", + "action": "ssh_command", + "code": "echo 'test'", + } + ) + + try: + self.server_test_1.run_command(unrestricted_command) + self.server_test_2.run_command(unrestricted_command) + except Exception as e: + self.fail( + f"Unrestricted command should execute on any server but failed: {e}" + ) + + def test_server_host_key_validation(self): + """Test server host key validation""" + server = self.Server.create( + { + "name": "Test Server", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "host_key": "test_key", + "skip_host_key": False, + } + ) + # Test with host key + server.test_ssh_connection() + + # Test without host key + server.host_key = None + with self.assertRaises(ValidationError): + server.test_ssh_connection() + + # Test with skip_host_key + server.skip_host_key = True + server.test_ssh_connection() + + def test_server_reference_update(self): + """Test server reference update cascades to dependent models""" + # 1. Add a variable value to server_test_1 + variable_value = self.VariableValue.create( + { + "variable_id": self.variable_os.id, + "value_char": "Ubuntu 20.04", + "server_id": self.server_test_1.id, + } + ) + + # 2. Add a file to server_test_1 + server_file = self.File.create( + { + "name": "test_file.txt", + "server_id": self.server_test_1.id, + "source": "tower", + "code": "Test file content", + } + ) + + # Store original references for comparison + original_server_reference = self.server_test_1.reference + original_variable_value_reference = variable_value.reference + original_file_reference = server_file.reference + + # 3. Change the reference for server_test_1 to "awesome_server" + self.server_test_1.write({"reference": "awesome_server"}) + + # 4. Verify that references are updated for dependent models + # Invalidate models to refresh all references + self.env["cx.tower.server"].invalidate_model(["reference"]) + self.env["cx.tower.variable.value"].invalidate_model(["reference"]) + self.env["cx.tower.file"].invalidate_model(["reference"]) + + # Check that server reference was updated + self.assertEqual(self.server_test_1.reference, "awesome_server") + self.assertNotEqual(self.server_test_1.reference, original_server_reference) + + # Check that variable value reference was updated + # to include the new server reference + self.assertIn("awesome_server", variable_value.reference) + self.assertNotEqual(variable_value.reference, original_variable_value_reference) + + # Check that file reference was updated to include the new server reference + self.assertIn("awesome_server", server_file.reference) + self.assertNotEqual(server_file.reference, original_file_reference) + + # Verify the reference pattern for variable value follows the expected format: + # ___ # noqa: E501 + expected_variable_pattern = ( + f"{self.variable_os.reference}_variable_value_server_" + f"{self.server_test_1.reference}" + ) + self.assertEqual(variable_value.reference, expected_variable_pattern) + + # Verify the reference pattern for file follows the expected format: + # __ + expected_file_pattern = f"{self.server_test_1.reference}_file_1" + self.assertEqual(server_file.reference, expected_file_pattern)