diff --git a/addons/cetmix_tower_server/tests/test_vault_mixin.py b/addons/cetmix_tower_server/tests/test_vault_mixin.py new file mode 100644 index 0000000..6f2e6af --- /dev/null +++ b/addons/cetmix_tower_server/tests/test_vault_mixin.py @@ -0,0 +1,506 @@ +# Copyright (C) 2022 Cetmix OÜ +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + + +from .common import TestTowerCommon + + +class TestVaultMixin(TestTowerCommon): + """Test vault mixin functionality.""" + + def test_vault_mixin_secret_fields(self): + """Test vault mixin functionality for secret fields + (host_key and ssh_password)""" + # Create a server with initial secret values + initial_password = "initial_password" + initial_host_key = "initial_host_key" + + server = self.Server.create( + { + "name": "Vault Test Server", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": initial_password, + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "host_key": initial_host_key, + "skip_host_key": False, + } + ) + + # Test 1: Verify initial values are stored in vault and accessible + # Read values using common way - should return placeholder + self.assertEqual( + server.ssh_password, + self.Server.SECRET_VALUE_PLACEHOLDER, + "ssh_password should return placeholder value when read normally", + ) + self.assertEqual( + server.host_key, + self.Server.SECRET_VALUE_PLACEHOLDER, + "host_key should return placeholder value when read normally", + ) + + # Read using _get_secret_values() - should return actual initial values + secret_values = server._get_secret_values() + self.assertIsNotNone(secret_values, "secret_values should not be None") + self.assertIn(server.id, secret_values, "Server ID should be in secret values") + + server_secrets = secret_values[server.id] + self.assertIn( + "ssh_password", server_secrets, "ssh_password should be in secret values" + ) + self.assertIn("host_key", server_secrets, "host_key should be in secret values") + + self.assertEqual( + server_secrets["ssh_password"], + initial_password, + "ssh_password should return initial value from vault", + ) + self.assertEqual( + server_secrets["host_key"], + initial_host_key, + "host_key should return initial value from vault", + ) + + # Read individual fields using _get_secret_value() + # should return initial values + retrieved_password = server._get_secret_value("ssh_password") + retrieved_host_key = server._get_secret_value("host_key") + + self.assertEqual( + retrieved_password, + initial_password, + "_get_secret_value should return correct initial ssh_password", + ) + self.assertEqual( + retrieved_host_key, + initial_host_key, + "_get_secret_value should return correct initial host_key", + ) + + # Test 2: Save new values to secret fields + new_password = "new_secure_password_123" + new_host_key = "new_host_key_456" + + server.write( + { + "ssh_password": new_password, + "host_key": new_host_key, + } + ) + + # Test 3: Read values using common way after update - should return placeholder + # Note: In Odoo, we need to re-read the record to see updated values + server = self.Server.browse(server.id) + self.assertEqual( + server.ssh_password, + self.Server.SECRET_VALUE_PLACEHOLDER, + "ssh_password should return placeholder value when read normally " + "after update", + ) + self.assertEqual( + server.host_key, + self.Server.SECRET_VALUE_PLACEHOLDER, + "host_key should return placeholder value when read normally " + "after update", + ) + + # Test 4: Read using _get_secret_values() after update + # should return new values + secret_values = server._get_secret_values() + self.assertIsNotNone( + secret_values, "secret_values should not be None after update" + ) + self.assertIn( + server.id, + secret_values, + "Server ID should be in secret values after update", + ) + + server_secrets = secret_values[server.id] + self.assertIn( + "ssh_password", + server_secrets, + "ssh_password should be in secret values after update", + ) + self.assertIn( + "host_key", + server_secrets, + "host_key should be in secret values after update", + ) + + self.assertEqual( + server_secrets["ssh_password"], + new_password, + "ssh_password should return new value from vault after update", + ) + self.assertEqual( + server_secrets["host_key"], + new_host_key, + "host_key should return new value from vault after update", + ) + + # Test 5: Read individual fields using _get_secret_value() after update + # Get both values in one call using _get_secret_values() + secret_values = server._get_secret_values() + self.assertIsNotNone( + secret_values, "secret_values should not be None for individual field test" + ) + self.assertIn( + server.id, + secret_values, + "Server ID should be in secret values for individual field test", + ) + + server_secrets = secret_values[server.id] + retrieved_password = server_secrets["ssh_password"] + retrieved_host_key = server_secrets["host_key"] + + self.assertEqual( + retrieved_password, + new_password, + "_get_secret_values should return correct new ssh_password after update", + ) + self.assertEqual( + retrieved_host_key, + new_host_key, + "_get_secret_values should return correct new host_key after update", + ) + + # Test 6: Verify that non-secret fields are not affected + self.assertEqual( + server.name, + "Vault Test Server", + "Non-secret field should not be affected by vault mixin", + ) + self.assertEqual( + server.ssh_username, + "admin", + "Non-secret field should not be affected by vault mixin", + ) + + def test_vault_mixin_create_with_secret_fields(self): + """Test vault mixin functionality when creating records with secret fields""" + # Create a server with secret fields + server = self.Server.create( + { + "name": "Create Test Server", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "create_password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "host_key": "create_host_key", + "skip_host_key": False, + } + ) + + # Verify secret fields are stored in vault and not in main table + self.assertEqual( + server.ssh_password, + self.Server.SECRET_VALUE_PLACEHOLDER, + "ssh_password should return placeholder after creation", + ) + self.assertEqual( + server.host_key, + self.Server.SECRET_VALUE_PLACEHOLDER, + "host_key should return placeholder after creation", + ) + + # Verify actual values are accessible via vault methods + secret_values = server._get_secret_values() + self.assertIn( + server.id, + secret_values, + "Server ID should be in secret values after creation", + ) + + server_secrets = secret_values[server.id] + self.assertEqual( + server_secrets["ssh_password"], + "create_password", + "ssh_password should be stored in vault after creation", + ) + self.assertEqual( + server_secrets["host_key"], + "create_host_key", + "host_key should be stored in vault after creation", + ) + + def test_vault_mixin_delete_secret_fields(self): + """Test vault mixin functionality when deleting secret field values""" + # Create a server with secret fields + server = self.Server.create( + { + "name": "Delete Test Server", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "delete_password", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "host_key": "delete_host_key", + "skip_host_key": False, + } + ) + + # Verify initial values exist + secret_values = server._get_secret_values() + self.assertIn( + "ssh_password", + secret_values[server.id], + "ssh_password should exist initially", + ) + self.assertIn( + "host_key", secret_values[server.id], "host_key should exist initially" + ) + + # Delete secret field values + server.write( + { + "ssh_password": False, + "host_key": False, + } + ) + + # Verify values are removed from vault + secret_values = server._get_secret_values() + server_secrets = secret_values.get(server.id, {}) + + self.assertNotIn( + "ssh_password", server_secrets, "ssh_password should be removed from vault" + ) + self.assertNotIn( + "host_key", server_secrets, "host_key should be removed from vault" + ) + + # Verify normal field access still returns placeholders + server = self.Server.browse(server.id) + self.assertEqual( + server.ssh_password, + self.Server.SECRET_VALUE_PLACEHOLDER, + "ssh_password should return placeholder after deletion", + ) + self.assertEqual( + server.host_key, + self.Server.SECRET_VALUE_PLACEHOLDER, + "host_key should return placeholder after deletion", + ) + + def test_vault_mixin_bulk_create_with_secret_fields(self): + """Test vault mixin functionality when creating multiple servers with different + secret field configurations""" + placeholder = self.Server.SECRET_VALUE_PLACEHOLDER + # Create 3 servers with different secret field configurations + servers_data = [ + { + "name": "Server 1 - Both Fields", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password1", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "host_key": "host_key1", + "skip_host_key": False, + }, + { + "name": "Server 2 - Host Key Only", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_auth_mode": "k", + "os_id": self.os_debian_10.id, + "host_key": "host_key2", + "skip_host_key": False, + "ssh_key_id": self.key_1.id, + }, + { + "name": "Server 3 - SSH Password Only", + "ip_v4_address": "localhost", + "ssh_username": "admin", + "ssh_password": "password3", + "ssh_auth_mode": "p", + "os_id": self.os_debian_10.id, + "skip_host_key": True, + }, + ] + + # Create all servers in one call + servers = self.Server.create(servers_data) + + # Verify we have 3 servers + self.assertEqual(len(servers), 3, "Should have created 3 servers") + + # Test 1: Get values for all 3 servers regular way - should return placeholders + for server in servers: + self.assertEqual( + server.ssh_password, + placeholder, + f"Server {server.name} ssh_password should return placeholder " + f"when read normally", + ) + + self.assertEqual( + server.host_key, + placeholder, + f"Server {server.name} host_key should return placeholder " + f"when read normally", + ) + + # Test 2: Get values for all 3 servers at once using _get_secret_values() + all_secret_values = servers._get_secret_values() + self.assertIsNotNone(all_secret_values, "all_secret_values should not be None") + + # Verify Server 1 (both fields) + server1 = servers[0] + self.assertIn( + server1.id, all_secret_values, "Server 1 should be in secret values" + ) + server1_secrets = all_secret_values[server1.id] + + self.assertEqual( + server1_secrets.get("ssh_password"), + "password1", + "Server 1 ssh_password should be preserved correctly in vault", + ) + self.assertEqual( + server1_secrets.get("host_key"), + "host_key1", + "Server 1 host_key should be preserved correctly in vault", + ) + + # Verify Server 2 (host key only) + server2 = servers[1] + self.assertIn( + server2.id, all_secret_values, "Server 2 should be in secret values" + ) + server2_secrets = all_secret_values[server2.id] + + self.assertIsNone( + server2_secrets.get("ssh_password"), + "Server 2 should not have ssh_password in vault", + ) + self.assertEqual( + server2_secrets.get("host_key"), + "host_key2", + "Server 2 host_key should be preserved correctly in vault", + ) + + # Verify Server 3 (ssh password only) + server3 = servers[2] + self.assertIn( + server3.id, all_secret_values, "Server 3 should be in secret values" + ) + server3_secrets = all_secret_values[server3.id] + + self.assertEqual( + server3_secrets.get("ssh_password"), + "password3", + "Server 3 ssh_password should be preserved correctly in vault", + ) + self.assertIsNone( + server3_secrets.get("host_key"), + "Server 3 should not have host_key in vault", + ) + + # Test 3: Verify that non-secret fields are not affected + for server in servers: + self.assertIsNotNone( + server.name, + f"Server {server.id} name should not be affected by vault mixin", + ) + self.assertIsNotNone( + server.ssh_username, + f"Server {server.id} ssh_username should not be affected " + f"by vault mixin", + ) + self.assertIsNotNone( + server.ip_v4_address, + f"Server {server.id} ip_v4_address should not be affected " + f"by vault mixin", + ) + + # Test 4: Modify secret fields and verify changes are handled correctly + # Change the ssh password and remove the host key from Server 1 + server1 = servers.filtered(lambda s: s.name == "Server 1 - Both Fields") + server1.write( + { + "ssh_password": "updated_password1", + "host_key": False, + } + ) + + # Remove host key and add an ssh password in Server 2 + server2 = servers.filtered(lambda s: s.name == "Server 2 - Host Key Only") + server2.write( + { + "host_key": False, + "ssh_password": "new_password2", + } + ) + + # Remove ssh password from Server 3 + server3 = servers.filtered(lambda s: s.name == "Server 3 - SSH Password Only") + server3.write( + { + "ssh_password": False, + } + ) + + # Test 5: Get values for all 3 servers regular way after modifications + # Ensure that all values are replaced with placeholders + for server in servers: + self.assertEqual( + server.ssh_password, + placeholder, + f"Server {server.id} ssh_password should return placeholder " + f"after modifications", + ) + self.assertEqual( + server.host_key, + placeholder, + f"Server {server.id} host_key should return placeholder " + f"after modifications", + ) + + # Test 6: Get values for all 3 servers at once using _get_secret_values() + # Ensure that all values are preserved correctly after modifications + all_secret_values = servers._get_secret_values() + self.assertIsNotNone( + all_secret_values, + "all_secret_values should not be None after modifications", + ) + + # Verify Server 1 (updated password, no host key) + server1 = servers[0] + server1_secrets = all_secret_values[server1.id] + + self.assertEqual( + server1_secrets.get("ssh_password"), + "updated_password1", + "Server 1 ssh_password should be updated correctly in vault", + ) + self.assertIsNone( + server1_secrets.get("host_key"), + "Server 1 host_key should be removed from vault", + ) + + # Verify Server 2 (new password, no host key) + server2_secrets = all_secret_values[server2.id] + + self.assertEqual( + server2_secrets.get("ssh_password"), + "new_password2", + "Server 2 ssh_password should be added correctly in vault", + ) + self.assertIsNone( + server2_secrets.get("host_key"), + "Server 2 host_key should be removed from vault", + ) + + # Verify Server 3 (no ssh password, no host key) + # Server 3 should not be in the result since it has no secret values + self.assertNotIn( + server3.id, + all_secret_values, + "Server 3 should not be in secret values since it has no secret fields", + )