# Copyright (C) 2024 Cetmix OÜ # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). from unittest.mock import patch from odoo import fields from odoo.exceptions import AccessError, ValidationError from odoo.tools import mute_logger from .common_jets import TestTowerJetsCommon class TestTowerJet(TestTowerJetsCommon): """ Test the Jet model functionality """ # All jet-related test data is now inherited from TestTowerJetsCommon # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # _on_is_available Tests # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ def test_on_is_available_explicit_request_marked_processing_before_dispatch(self): """ Regression: explicit request must be attached to the jet and set to processing before transition dispatch starts. We patch _bring_to_state (the actual dispatch) rather than _serve_jet_request so that _serve_jet_request runs for real and its side-effects (served_jet_request_id, request.state) are observable. A side_effect captures both values at the exact moment dispatch is triggered, proving ordering rather than just eventual state. """ self.jet_test.write( {"state_id": self.state_initial.id, "target_state_id": False} ) # Isolate the scenario: keep only the request created in this test. preexisting_new_requests = self.env["cx.tower.jet.request"].search( [("jet_id", "=", self.jet_test.id), ("state", "=", "new")] ) if preexisting_new_requests: preexisting_new_requests.unlink() request = self.env["cx.tower.jet.request"].create( { "server_id": self.server_test_1.id, "jet_id": self.jet_test.id, "jet_template_id": self.jet_test.jet_template_id.id, "state_requested_id": self.state_running.id, "state": "new", } ) # Capture the observable state of jet + request at dispatch time. observed = {} def capture(jet_self, target_state): jet_self.invalidate_recordset(["served_jet_request_id"]) request.invalidate_recordset(["state"]) observed["served_request_id"] = jet_self.served_jet_request_id.id observed["request_state"] = request.state with patch( "odoo.addons.cetmix_tower_server.models.cx_tower_jet.CxTowerJet._bring_to_state", autospec=True, side_effect=capture, ): self.jet_test._on_is_available() self.assertTrue( observed, "_bring_to_state must have been called; check that the request " "targets a different state than the jet's current state", ) self.assertEqual( observed["served_request_id"], request.id, "Request must be saved to served_jet_request_id before dispatch", ) self.assertEqual( observed["request_state"], "processing", "Request must be set to 'processing' before _bring_to_state is called", ) # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # _compute_available_actions Tests # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ def test_compute_available_actions_no_state(self): """ Test _compute_available_actions when jet has no current state """ # Jet has template but no state self.jet_test.state_id = False # action_available_ids should include only the create action self.assertEqual( len(self.jet_test.action_available_ids), 1, "Available actions should include create action when jet has no state", ) self.assertEqual( {action.id for action in self.jet_test.action_available_ids}, {self.action_create.id}, "Available action should be the create action", ) def test_compute_available_actions_with_state_running(self): """ Test _compute_available_actions when jet has state running. Create action is not available (no state_from_id); destroy and transition actions are available. """ self.jet_test.state_id = self.state_running expected_actions = ( self.action_running_to_stopped | self.action_running_to_error | self.action_destroy ) actual_ids = {action.id for action in self.jet_test.action_available_ids} self.assertEqual( len(actual_ids), 3, "Should have 3 available actions from running state", ) self.assertNotIn( self.action_create.id, actual_ids, "Create action should not be available when jet has state", ) self.assertIn( self.action_destroy.id, actual_ids, "Destroy action should be available", ) self.assertEqual( actual_ids, {action.id for action in expected_actions}, "Should have exact set: running_to_stopped, running_to_error, destroy", ) def test_compute_available_actions_complex_scenario(self): """ Test _compute_available_actions with complex scenario """ # Use common actions from setup # Test different states test_cases = [ (self.state_initial, [self.action_initial_to_running]), ( self.state_running, [ self.action_running_to_stopped, self.action_running_to_error, self.action_destroy, ], ), (self.state_stopped, [self.action_stopped_to_running]), (self.state_error, [self.action_error_to_running]), ] for state, expected_actions in test_cases: self.jet_test.state_id = state actual_actions = self.jet_test.action_available_ids expected_actions_set = {action.id for action in expected_actions} actual_actions_set = {action.id for action in actual_actions} self.assertEqual( actual_actions_set, expected_actions_set, f"State {state.name} should have correct available actions", ) def test_compute_available_actions_dependencies(self): """ Test that _compute_available_actions has correct dependencies """ # Use existing action from common setup action = self.action_running_to_stopped # Set initial state self.jet_test.state_id = self.state_running # Should have all actions from running state expected_actions = ( self.action_running_to_stopped | self.action_running_to_error | self.action_destroy ) self.assertEqual( {action.id for action in self.jet_test.action_available_ids}, {action.id for action in expected_actions}, "Should have all actions from running state initially", ) # Change action's state_from_id (this should trigger recomputation) action.state_from_id = self.state_stopped # Jet should no longer have this specific action available # but should still have other actions from running state expected_remaining_actions = self.action_running_to_error | self.action_destroy self.assertEqual( {action.id for action in self.jet_test.action_available_ids}, {action.id for action in expected_remaining_actions}, "Should have remaining actions after changing one action's state_from_id", ) # Change jet state to match action's new state_from_id self.jet_test.state_id = self.state_stopped # Now the modified action should be available again, # plus any other actions from stopped state expected_actions = action | self.action_stopped_to_running self.assertEqual( {action.id for action in self.jet_test.action_available_ids}, {action.id for action in expected_actions}, "Should have the modified action plus other actions from stopped state", ) def test_compute_available_actions_cross_template_isolation(self): """ Test that jets only see actions from their own template """ # Create action for Odoo template odoo_action = self.JetAction.create( { "name": "Odoo Action", "reference": "odoo_action", "jet_template_id": self.jet_template_odoo.id, "state_from_id": self.state_running.id, "state_to_id": self.state_stopped.id, "state_transit_id": self.state_stopping.id, "priority": 10, } ) # Create action for WordPress template wp_action = self.JetAction.create( { "name": "WordPress Action", "reference": "wordpress_action", "jet_template_id": self.jet_template_wordpress.id, "state_from_id": self.state_running.id, "state_to_id": self.state_stopped.id, "state_transit_id": self.state_stopping.id, "priority": 10, } ) # Set both jets to running state self.jet_odoo.state_id = self.state_running self.jet_wordpress.state_id = self.state_running # Each jet should only see its own template's actions self.assertEqual( {action.id for action in self.jet_odoo.action_available_ids}, {odoo_action.id}, "Odoo jet should only see Odoo actions", ) self.assertEqual( {action.id for action in self.jet_wordpress.action_available_ids}, {wp_action.id}, "WordPress jet should only see WordPress actions", ) # Odoo jet should not see WordPress actions self.assertNotIn( wp_action.id, {action.id for action in self.jet_odoo.action_available_ids}, "Odoo jet should not see WordPress actions", ) # WordPress jet should not see Odoo actions self.assertNotIn( odoo_action.id, {action.id for action in self.jet_wordpress.action_available_ids}, "WordPress jet should not see Odoo actions", ) # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # Complex Template Hierarchy Tests # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ def test_jet_template_domain_computation(self): """ Test _compute_jet_template_domain method """ # Test with server set jet_with_server = self.Jet.create( { "name": "Jet With Server", "reference": "jet_with_server", "jet_template_id": self.jet_template_test.id, "server_id": self.server_test_1.id, } ) domain = jet_with_server.jet_template_domain expected_domain = [("server_ids", "in", [self.server_test_1.id])] self.assertEqual(domain, expected_domain, "Domain should include server filter") # Test domain computation with a different server server_test_2 = self.Server.create( { "name": "Test Server 2", "ip_v4_address": "192.168.1.2", "ssh_username": "admin", "ssh_password": "password", "ssh_auth_mode": "p", "host_key": "test_key_2", "os_id": self.os_debian_10.id, } ) jet_with_different_server = self.Jet.create( { "name": "Jet With Different Server", "reference": "jet_with_different_server", "jet_template_id": self.jet_template_test.id, "server_id": server_test_2.id, } ) domain = jet_with_different_server.jet_template_domain expected_domain = [("server_ids", "in", [server_test_2.id])] self.assertEqual( domain, expected_domain, "Domain should include server filter for different server", ) # Test the domain computation method directly to verify the else branch # Create a temporary jet object to test the method without saving temp_jet = self.Jet.new( { "name": "Temp Jet", "jet_template_id": self.jet_template_test.id, "server_id": False, } ) temp_jet._compute_jet_template_domain() self.assertEqual( temp_jet.jet_template_domain, [], "Domain should be empty when server_id is False", ) def test_jet_requires_ids_computation(self): """ Test _compute_jet_requires_ids method with complex dependencies """ # Test Odoo jet dependencies odoo_deps = self.jet_odoo.jet_requires_ids self.assertEqual( len(odoo_deps), 2, "Odoo jet should have 2 direct dependencies" ) # Check that dependencies are for postgres and nginx dep_template_ids = odoo_deps.mapped( "jet_template_dependency_id.template_required_id.id" ) expected_ids = {self.jet_template_postgres.id, self.jet_template_nginx.id} self.assertEqual( set(dep_template_ids), expected_ids, "Should depend on postgres and nginx" ) # Test WooCommerce jet dependencies # (should include both Odoo and WordPress deps) woocommerce_deps = self.jet_woocommerce.jet_requires_ids self.assertEqual( len(woocommerce_deps), 2, "WooCommerce jet should have 2 direct dependencies", ) # Check that dependencies are for wordpress and odoo dep_template_ids = woocommerce_deps.mapped( "jet_template_dependency_id.template_required_id.id" ) expected_ids = {self.jet_template_wordpress.id, self.jet_template_odoo.id} self.assertEqual( set(dep_template_ids), expected_ids, "Should depend on wordpress and odoo" ) def test_jet_limit_per_server_same_server_rejected(self): """Constraint rejects creating more jets than template limit per server.""" template = self.JetTemplate.create( { "name": "Template With Limit", "reference": "template_with_limit", "limit_per_server": 1, } ) self.Jet.create( { "name": "Limited Jet 1", "reference": "limited_jet_1", "jet_template_id": template.id, "server_id": self.server_test_1.id, } ) with self.assertRaisesRegex(ValidationError, "Jet limit per server reached"): self.Jet.create( { "name": "Limited Jet 2", "reference": "limited_jet_2", "jet_template_id": template.id, "server_id": self.server_test_1.id, } ) def test_jet_limit_per_server_different_servers_allowed(self): """ Constraint allows same template on different servers but within per-server limit. """ template = self.JetTemplate.create( { "name": "Template With Per-Server Limit", "reference": "template_with_per_server_limit", "limit_per_server": 1, } ) server_test_2 = self.Server.create( { "name": "Jet Limit Test Server 2", "ip_v4_address": "192.168.1.22", "ssh_username": "admin", "ssh_password": "password", "ssh_auth_mode": "p", "host_key": "jet_limit_test_server_2_key", "os_id": self.os_debian_10.id, } ) jet_on_server_1 = self.Jet.create( { "name": "Limited Jet Server 1", "reference": "limited_jet_server_1", "jet_template_id": template.id, "server_id": self.server_test_1.id, } ) jet_on_server_2 = self.Jet.create( { "name": "Limited Jet Server 2", "reference": "limited_jet_server_2", "jet_template_id": template.id, "server_id": server_test_2.id, } ) self.assertTrue( jet_on_server_1.exists(), "Jet on first server should be created" ) self.assertTrue( jet_on_server_2.exists(), "Jet on second server should be created" ) def test_jet_requires_ids_template_change(self): """ Test _compute_jet_requires_ids for different templates """ # Create jets for different templates jet_tower_core = self.Jet.create( { "name": "Tower Core Jet", "reference": "tower_core_jet", "jet_template_id": self.jet_template_tower_core.id, "server_id": self.server_test_1.id, } ) self.assertEqual( len(jet_tower_core.jet_requires_ids), 0, "Tower core should have no dependencies", ) jet_odoo = self.Jet.create( { "name": "Odoo Jet Test", "reference": "odoo_jet_test", "jet_template_id": self.jet_template_odoo.id, "server_id": self.server_test_1.id, } ) self.assertEqual( len(jet_odoo.jet_requires_ids), 2, "Odoo should have 2 dependencies" ) jet_woocommerce = self.Jet.create( { "name": "WooCommerce Jet Test", "reference": "woocommerce_jet_test", "jet_template_id": self.jet_template_woocommerce_odoo.id, "server_id": self.server_test_1.id, } ) self.assertEqual( len(jet_woocommerce.jet_requires_ids), 2, "WooCommerce should have 2 dependencies", ) def test_jet_requires_ids_dependency_removal(self): """ Test _compute_jet_requires_ids when template dependencies are removed """ # Create jet with Odoo template jet_odoo = self.Jet.create( { "name": "Odoo Jet Test", "reference": "odoo_jet_test", "jet_template_id": self.jet_template_odoo.id, "server_id": self.server_test_1.id, } ) initial_deps = len(jet_odoo.jet_requires_ids) self.assertEqual(initial_deps, 2, "Should have 2 dependencies initially") # Remove one dependency from template postgres_dep = self.JetTemplateDependency.search( [ ("template_id", "=", self.jet_template_odoo.id), ("template_required_id", "=", self.jet_template_postgres.id), ] ) postgres_dep.unlink() # Jet dependencies should be updated self.assertEqual( len(jet_odoo.jet_requires_ids), 1, "Should have 1 dependency after removal" ) remaining_dep = jet_odoo.jet_requires_ids[0] self.assertEqual( remaining_dep.jet_template_dependency_id.template_required_id, self.jet_template_nginx, "Remaining dependency should be nginx", ) def test_jet_requires_ids_dependency_addition(self): """ Test _compute_jet_requires_ids when template dependencies are added """ # Create jet with tower core (no dependencies) jet_tower_core = self.Jet.create( { "name": "Tower Core Jet", "reference": "tower_core_jet", "jet_template_id": self.jet_template_tower_core.id, "server_id": self.server_test_1.id, } ) self.assertEqual( len(jet_tower_core.jet_requires_ids), 0, "Should have no dependencies initially", ) # Add dependency to tower core # (use a template that won't create circular dependency) new_dep = self.JetTemplateDependency.create( { "template_id": self.jet_template_tower_core.id, "template_required_id": self.jet_template_test.id, "state_required_id": self.state_running.id, } ) # Jet dependencies should be updated self.assertEqual( len(jet_tower_core.jet_requires_ids), 1, "Should have 1 dependency after addition", ) added_dep = jet_tower_core.jet_requires_ids[0] self.assertEqual( added_dep.jet_template_dependency_id, new_dep, "Added dependency should match the new dependency", ) def test_jet_requires_ids_multiple_jets_same_template(self): """ Test _compute_jet_requires_ids with multiple jets using same template """ # Create another Odoo jet jet_odoo_2 = self.Jet.create( { "name": "Odoo Jet 2", "reference": "odoo_jet_2", "jet_template_id": self.jet_template_odoo.id, "server_id": self.server_test_1.id, } ) # Both jets should have same dependencies deps_1 = self.jet_odoo.jet_requires_ids deps_2 = jet_odoo_2.jet_requires_ids self.assertEqual( len(deps_1), len(deps_2), "Both jets should have same number of dependencies", ) # Check that dependencies are the same deps_1_template_ids = deps_1.mapped( "jet_template_dependency_id.template_required_id.id" ) deps_2_template_ids = deps_2.mapped( "jet_template_dependency_id.template_required_id.id" ) self.assertEqual( set(deps_1_template_ids), set(deps_2_template_ids), "Both jets should have same dependency templates", ) def test_jet_requires_ids_consistency_with_template(self): """ Test that jet dependencies are consistent with template dependencies """ # Test with different templates templates_to_test = [ (self.jet_template_tower_core, 0), (self.jet_template_docker, 1), (self.jet_template_nginx, 1), (self.jet_template_postgres, 1), (self.jet_template_mariadb, 1), (self.jet_template_odoo, 2), (self.jet_template_wordpress, 2), (self.jet_template_woocommerce_odoo, 2), ] for template, expected_dep_count in templates_to_test: # Create a jet with this template test_jet = self.Jet.create( { "name": f"Test Jet for {template.name}", "reference": f"test_jet_{template.reference}", "jet_template_id": template.id, "server_id": self.server_test_1.id, } ) # Check dependency count actual_dep_count = len(test_jet.jet_requires_ids) self.assertEqual( actual_dep_count, expected_dep_count, f"{template.name} should have {expected_dep_count} " f"dependencies, got {actual_dep_count}", ) # Verify that all jet dependencies correspond to template dependencies template_deps = template.template_requires_ids jet_deps = test_jet.jet_requires_ids if template_deps: self.assertEqual( len(jet_deps), len(template_deps), "Jet dependencies count should match" f" template dependencies for {template.name}", ) # Check that each jet dependency corresponds to a template dependency jet_dep_template_ids = jet_deps.mapped("jet_template_dependency_id.id") template_dep_ids = template_deps.ids self.assertEqual( set(jet_dep_template_ids), set(template_dep_ids), "Jet dependencies should match template" f" dependencies for {template.name}", ) # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # bring_to_state Tests # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ def test_bring_to_state_success_user_level(self): """ Test bring_to_state succeeds when user has sufficient access level. User (level 1) can access state with level 1. """ # Use existing state and set it to User access level (1) self.state_running.access_level = "1" self.state_running.invalidate_recordset(["access_level"]) # Ensure user has access to the jet self.jet_test.write({"user_ids": [(4, self.user.id)]}) self.server_test_1.write({"user_ids": [(4, self.user.id)]}) # Set jet to initial state self.jet_test.write({"state_id": self.state_initial.id}) self.jet_test.invalidate_recordset(["state_id"]) # User should be able to bring jet to user-level state self.jet_test.with_user(self.user).with_context( cetmix_tower_no_commit=True ).bring_to_state("test_running") self.assertEqual( self.jet_test.state_id, self.state_running, "Jet should be brought to user-level state by user", ) def test_bring_to_state_success_manager_level(self): """ Test bring_to_state succeeds when manager has sufficient access level. Manager (level 2) can access state with level 2. """ # Use existing state and set it to Manager access level (2) self.state_stopped.access_level = "2" self.state_stopped.invalidate_recordset(["access_level"]) # Ensure manager has access to the jet self.jet_test.write({"manager_ids": [(4, self.manager.id)]}) self.server_test_1.write({"manager_ids": [(4, self.manager.id)]}) # Set jet to running state (which has action to stopped) self.jet_test.write({"state_id": self.state_running.id}) self.jet_test.invalidate_recordset(["state_id"]) # Manager should be able to bring jet to manager-level state self.jet_test.with_user(self.manager).with_context( cetmix_tower_no_commit=True ).bring_to_state("test_stopped") self.assertEqual( self.jet_test.state_id, self.state_stopped, "Jet should be brought to manager-level state by manager", ) def test_bring_to_state_success_root_level(self): """ Test bring_to_state succeeds when root has sufficient access level. Root (level 3) can access state with level 3. """ # Use existing state and set it to Root access level (3) self.state_error.access_level = "3" self.state_error.invalidate_recordset(["access_level"]) # Root has full access, but ensure access for consistency self.jet_test.write({"manager_ids": [(4, self.root.id)]}) self.server_test_1.write({"manager_ids": [(4, self.root.id)]}) # Set jet to running state (which has action to error) self.jet_test.write({"state_id": self.state_running.id}) self.jet_test.invalidate_recordset(["state_id"]) # Root should be able to bring jet to root-level state self.jet_test.with_user(self.root).with_context( cetmix_tower_no_commit=True ).bring_to_state("test_error") self.assertEqual( self.jet_test.state_id, self.state_error, "Jet should be brought to root-level state by root", ) def test_bring_to_state_access_error_user_to_manager(self): """ Test bring_to_state raises AccessError when user (level 1) tries to access manager-level state (level 2). """ # Use existing state and set it to Manager access level (2) self.state_stopped.access_level = "2" self.state_stopped.invalidate_recordset(["access_level"]) # Ensure user has access to the jet (for the access check to work) self.jet_test.write({"user_ids": [(4, self.user.id)]}) self.server_test_1.write({"user_ids": [(4, self.user.id)]}) # Set jet to running state (which has action to stopped) self.jet_test.write({"state_id": self.state_running.id}) self.jet_test.invalidate_recordset(["state_id"]) # User should not be able to bring jet to manager-level state with self.assertRaises(AccessError) as context: self.jet_test.with_user(self.user).with_context( cetmix_tower_no_commit=True ).bring_to_state("test_stopped") self.assertIn( "You are not allowed to set the", str(context.exception), "Should raise AccessError with appropriate message", ) self.assertIn( self.state_stopped.name, str(context.exception), "Error message should include state name", ) def test_bring_to_state_access_error_user_to_root(self): """ Test bring_to_state raises AccessError when user (level 1) tries to access root-level state (level 3). """ # Use existing state and set it to Root access level (3) self.state_error.access_level = "3" self.state_error.invalidate_recordset(["access_level"]) # Ensure user has access to the jet (for the access check to work) self.jet_test.write({"user_ids": [(4, self.user.id)]}) self.server_test_1.write({"user_ids": [(4, self.user.id)]}) # Set jet to running state (which has action to error) self.jet_test.write({"state_id": self.state_running.id}) self.jet_test.invalidate_recordset(["state_id"]) # User should not be able to bring jet to root-level state with self.assertRaises(AccessError) as context: self.jet_test.with_user(self.user).with_context( cetmix_tower_no_commit=True ).bring_to_state("test_error") self.assertIn( "You are not allowed to set the", str(context.exception), "Should raise AccessError with appropriate message", ) self.assertIn( self.state_error.name, str(context.exception), "Error message should include state name", ) def test_bring_to_state_access_error_manager_to_root(self): """ Test bring_to_state raises AccessError when manager (level 2) tries to access root-level state (level 3). """ # Use existing state and set it to Root access level (3) self.state_error.access_level = "3" self.state_error.invalidate_recordset(["access_level"]) # Ensure manager has access to the jet (for the access check to work) self.jet_test.write({"manager_ids": [(4, self.manager.id)]}) self.server_test_1.write({"manager_ids": [(4, self.manager.id)]}) # Set jet to running state (which has action to error) self.jet_test.write({"state_id": self.state_running.id}) self.jet_test.invalidate_recordset(["state_id"]) # Manager should not be able to bring jet to root-level state with self.assertRaises(AccessError) as context: self.jet_test.with_user(self.manager).with_context( cetmix_tower_no_commit=True ).bring_to_state("test_error") self.assertIn( "You are not allowed to set the", str(context.exception), "Should raise AccessError with appropriate message", ) self.assertIn( self.state_error.name, str(context.exception), "Error message should include state name", ) def test_bring_to_state_manager_can_access_user_level(self): """ Test bring_to_state succeeds when manager (level 2) who IS in manager_ids accesses user-level state (level 1). Higher access levels can access lower level states. """ # Use existing state and set it to User access level (1) self.state_running.access_level = "1" self.state_running.invalidate_recordset(["access_level"]) # Ensure manager has access to the jet # Manager IS in manager_ids, so they keep their manager access level (2) self.jet_test.write({"manager_ids": [(4, self.manager.id)]}) self.server_test_1.write({"manager_ids": [(4, self.manager.id)]}) # Set jet to initial state self.jet_test.write({"state_id": self.state_initial.id}) self.jet_test.invalidate_recordset(["state_id"]) # Manager should be able to bring jet to user-level state self.jet_test.with_user(self.manager).with_context( cetmix_tower_no_commit=True ).bring_to_state("test_running") self.assertEqual( self.jet_test.state_id, self.state_running, "Manager should be able to access user-level state", ) def test_bring_to_state_manager_not_in_manager_ids_treated_as_user(self): """ Test bring_to_state treats manager (level 2) who is NOT in manager_ids as user (level 1). Manager should be able to set user-level state but not manager-level state. """ # Use existing state and set it to User access level (1) self.state_running.access_level = "1" self.state_running.invalidate_recordset(["access_level"]) # Ensure manager has access to the jet via user_ids but NOT via manager_ids self.jet_test.write({"user_ids": [(4, self.manager.id)]}) self.server_test_1.write({"user_ids": [(4, self.manager.id)]}) # Explicitly ensure manager is NOT in manager_ids self.jet_test.write({"manager_ids": [(5, 0, 0)]}) # Set jet to initial state self.jet_test.write({"state_id": self.state_initial.id}) self.jet_test.invalidate_recordset(["state_id"]) # Manager (treated as user) should be able to bring jet to user-level state self.jet_test.with_user(self.manager).with_context( cetmix_tower_no_commit=True ).bring_to_state("test_running") self.assertEqual( self.jet_test.state_id, self.state_running, "Manager not in manager_ids should be able to access user-level state", ) def test_bring_to_state_manager_not_in_manager_ids_cannot_access_manager_level( self ): """ Test bring_to_state raises AccessError when manager (level 2) who is NOT in manager_ids tries to access manager-level state (level 2). Manager should be treated as user (level 1) and cannot access level 2. """ # Use existing state and set it to Manager access level (2) self.state_stopped.access_level = "2" self.state_stopped.invalidate_recordset(["access_level"]) # Ensure manager has access to the jet via user_ids but NOT via manager_ids self.jet_test.write({"user_ids": [(4, self.manager.id)]}) self.server_test_1.write({"user_ids": [(4, self.manager.id)]}) # Explicitly ensure manager is NOT in manager_ids self.jet_test.write({"manager_ids": [(5, 0, 0)]}) # Set jet to running state (which has action to stopped) self.jet_test.write({"state_id": self.state_running.id}) self.jet_test.invalidate_recordset(["state_id"]) # Manager (treated as user) should not be able to bring jet # to manager-level state with self.assertRaises(AccessError) as context: self.jet_test.with_user(self.manager).with_context( cetmix_tower_no_commit=True ).bring_to_state("test_stopped") self.assertIn( "You are not allowed to set the", str(context.exception), "Should raise AccessError with appropriate message", ) self.assertIn( self.state_stopped.name, str(context.exception), "Error message should include state name", ) def test_bring_to_state_root_can_access_manager_level(self): """ Test bring_to_state succeeds when root (level 3) accesses manager-level state (level 2). Higher access levels can access lower level states. """ # Use existing state and set it to Manager access level (2) self.state_stopped.access_level = "2" self.state_stopped.invalidate_recordset(["access_level"]) # Root has full access, but ensure access for consistency self.jet_test.write({"manager_ids": [(4, self.root.id)]}) self.server_test_1.write({"manager_ids": [(4, self.root.id)]}) # Set jet to running state (which has action to stopped) self.jet_test.write({"state_id": self.state_running.id}) self.jet_test.invalidate_recordset(["state_id"]) # Root should be able to bring jet to manager-level state self.jet_test.with_user(self.root).with_context( cetmix_tower_no_commit=True ).bring_to_state("test_stopped") self.assertEqual( self.jet_test.state_id, self.state_stopped, "Root should be able to access manager-level state", ) def test_bring_to_state_invalid_reference(self): """ Test bring_to_state raises ValidationError when state reference is invalid. """ # Set jet to initial state self.jet_test.state_id = self.state_initial # Should raise ValidationError for invalid state reference with self.assertRaises(ValidationError) as context: self.jet_test.with_context(cetmix_tower_no_commit=True).bring_to_state( "invalid_state_reference" ) self.assertIn( "State 'invalid_state_reference' not found", str(context.exception), "Should raise ValidationError with appropriate message", ) self.assertIn( self.jet_test.display_name, str(context.exception), "Error message should include jet display name", ) # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # _get_user_effective_access_level Tests # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ def test_get_user_effective_access_level_user(self): """ Test _get_user_effective_access_level returns "1" for user. """ # Ensure user has access to the jet self.jet_test.write({"user_ids": [(4, self.user.id)]}) # User should have effective access level "1" effective_level = self.jet_test.with_user( self.user )._get_user_effective_access_level() self.assertEqual( effective_level, "1", "User should have effective access level 1", ) def test_get_user_effective_access_level_manager_in_manager_ids(self): """ Test _get_user_effective_access_level returns "2" for manager who IS in manager_ids. """ # Ensure manager has access to the jet and IS in manager_ids self.jet_test.write({"manager_ids": [(4, self.manager.id)]}) # Manager in manager_ids should have effective access level "2" effective_level = self.jet_test.with_user( self.manager )._get_user_effective_access_level() self.assertEqual( effective_level, "2", "Manager in manager_ids should have effective access level 2", ) def test_get_user_effective_access_level_manager_not_in_manager_ids(self): """ Test _get_user_effective_access_level returns "1" for manager who is NOT in manager_ids (downgraded to user level). """ # Ensure manager has access to the jet via user_ids but NOT via manager_ids self.jet_test.write({"user_ids": [(4, self.manager.id)]}) # Explicitly ensure manager is NOT in manager_ids self.jet_test.write({"manager_ids": [(5, 0, 0)]}) # Manager not in manager_ids should have effective access level "1" effective_level = self.jet_test.with_user( self.manager )._get_user_effective_access_level() self.assertEqual( effective_level, "1", "Manager not in manager_ids should have effective access level 1", ) def test_get_user_effective_access_level_root(self): """ Test _get_user_effective_access_level returns "3" for root. """ # Root should have effective access level "3" regardless of manager_ids effective_level = self.jet_test.with_user( self.root )._get_user_effective_access_level() self.assertEqual( effective_level, "3", "Root should have effective access level 3", ) # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # unlink Tests # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ def test_unlink_deletable_jet_with_files(self): """ Test unlink succeeds when jet is deletable and has files. Files should be unlinked after the jet is deleted. """ # Create a deletable jet (deletable defaults to True) jet = self._create_jet( "Deletable Jet", "deletable_jet", ) # Create files linked to the jet file1 = self.File.create( { "name": "test_file_1.txt", "source": "tower", "server_id": self.server_test_1.id, "server_dir": "/tmp", "jet_id": jet.id, "file_type": "text", } ) file2 = self.File.create( { "name": "test_file_2.txt", "source": "tower", "server_id": self.server_test_1.id, "server_dir": "/tmp", "jet_id": jet.id, "file_type": "text", } ) # Verify files exist self.assertEqual(len(jet.file_ids), 2, "Jet should have 2 files") self.assertIn(file1, jet.file_ids, "File 1 should be linked to jet") self.assertIn(file2, jet.file_ids, "File 2 should be linked to jet") # Store file IDs before deletion file_ids = {file1.id, file2.id} # Unlink the jet jet.unlink() # Verify jet is deleted self.assertFalse(jet.exists(), "Jet should be deleted") # Verify files are also deleted remaining_files = self.File.browse(list(file_ids)) self.assertFalse( remaining_files.exists(), "Files should be unlinked after jet deletion", ) def test_unlink_deletable_jet_without_files(self): """ Test unlink succeeds when jet is deletable but has no files. """ # Create a deletable jet without files (deletable defaults to True) jet = self._create_jet( "Deletable Jet No Files", "deletable_jet_no_files", ) # Verify jet has no files self.assertEqual(len(jet.file_ids), 0, "Jet should have no files") # Unlink the jet jet.unlink() # Verify jet is deleted self.assertFalse(jet.exists(), "Jet should be deleted") def test_unlink_not_deletable_jet_raises_error(self): """ Test unlink raises ValidationError when jet is not deletable. """ # Create a non-deletable jet jet = self._create_jet( "Not Deletable Jet", "not_deletable_jet", ) jet.write({"deletable": False}) # Attempt to unlink should raise ValidationError with self.assertRaises(ValidationError) as context: jet.unlink() # Verify error message contains jet display name self.assertIn( "cannot be deleted", str(context.exception), "Error message should mention deletion restriction", ) self.assertIn( jet.display_name, str(context.exception), "Error message should include jet display name", ) # Verify jet still exists self.assertTrue(jet.exists(), "Jet should not be deleted") def test_unlink_multiple_jets_mixed_deletable(self): """ Test unlink with multiple jets where some are deletable and some are not. Should raise ValidationError listing non-deletable jets. """ # Create deletable jet (deletable defaults to True) deletable_jet = self._create_jet( "Deletable Jet", "deletable_jet_multi", ) # Create non-deletable jet not_deletable_jet = self._create_jet( "Not Deletable Jet", "not_deletable_jet_multi", ) not_deletable_jet.write({"deletable": False}) # Attempt to unlink both should raise ValidationError jets = deletable_jet | not_deletable_jet with self.assertRaises(ValidationError) as context: jets.unlink() # Verify error message contains non-deletable jet display name self.assertIn( "cannot be deleted", str(context.exception), "Error message should mention deletion restriction", ) self.assertIn( not_deletable_jet.display_name, str(context.exception), "Error message should include non-deletable jet display name", ) # Verify both jets still exist self.assertTrue(deletable_jet.exists(), "Deletable jet should not be deleted") self.assertTrue( not_deletable_jet.exists(), "Non-deletable jet should not be deleted" ) # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # create_waypoint Tests # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ def test_create_waypoint_with_record_template(self): """ Test create_waypoint with waypoint template record """ # Get the default name from the helper function default_vals = self.jet_test._prepare_waypoint_values( self.waypoint_template, name=None ) expected_default_name = default_vals["name"] # Create waypoint using template record waypoint = self.jet_test.create_waypoint(self.waypoint_template) # Should return a waypoint record self.assertTrue(waypoint, "Should return a waypoint record") self.assertTrue(waypoint.exists(), "Waypoint should exist") self.assertEqual( waypoint.jet_id.id, self.jet_test.id, "Waypoint should belong to the jet", ) self.assertEqual( waypoint.waypoint_template_id.id, self.waypoint_template.id, "Waypoint should use the correct template", ) self.assertEqual( waypoint.name, expected_default_name, "Waypoint should have default name from helper function", ) # Reference is auto-generated, so just verify it exists and is not empty self.assertTrue( waypoint.reference, "Waypoint should have an auto-generated reference", ) def test_create_waypoint_with_string_reference(self): """ Test create_waypoint with waypoint template string reference """ # Use the template's reference (mandatory field, always present) template_reference = self.waypoint_template.reference # Create waypoint using string reference waypoint = self.jet_test.create_waypoint(template_reference) # Should return a waypoint record self.assertTrue(waypoint, "Should return a waypoint record") self.assertTrue(waypoint.exists(), "Waypoint should exist") self.assertEqual( waypoint.waypoint_template_id.id, self.waypoint_template.id, "Waypoint should use the correct template from reference", ) # Reference is auto-generated, so just verify it exists and is not empty self.assertTrue( waypoint.reference, "Waypoint should have an auto-generated reference", ) def test_create_waypoint_with_name(self): """ Test create_waypoint with custom name """ # Create waypoint with custom name waypoint = self.jet_test.create_waypoint( self.waypoint_template, name="Custom Waypoint Name" ) # Should return a waypoint record with custom name self.assertTrue(waypoint, "Should return a waypoint record") self.assertEqual( waypoint.name, "Custom Waypoint Name", "Waypoint should have the custom name", ) # Reference is auto-generated, so just verify it exists and is not empty self.assertTrue( waypoint.reference, "Waypoint should have an auto-generated reference", ) def test_create_waypoint_with_fly_here(self): """ Test create_waypoint with fly_here parameter Note: fly_here should set is_destination=True, and after prepare() the waypoint should automatically fly to if is_destination is True """ # Create waypoint with fly_here=True waypoint = self.jet_test.create_waypoint(self.waypoint_template, fly_here=True) # Should return a waypoint record self.assertTrue(waypoint, "Should return a waypoint record") self.assertTrue(waypoint.exists(), "Waypoint should exist") # Verify that the waypoint flew to # (state should be "current" in synchronous tests) self.assertEqual( waypoint.state, "current", "Waypoint should have flown to and " "become current (tests run synchronously)", ) # Verify jet's waypoint_id was updated self.assertEqual( self.jet_test.waypoint_id.id, waypoint.id, "Jet's waypoint_id should be updated to the flown-to waypoint", ) @mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_jet") def test_create_waypoint_jet_busy(self): """ Test create_waypoint when jet is busy (has target_state_id) """ # Set jet to busy state (has target_state_id) self.jet_test.target_state_id = self.state_running # Try to create waypoint with self.assertRaises(ValidationError): self.jet_test.create_waypoint(self.waypoint_template) @mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_jet") def test_create_waypoint_template_not_found(self): """ Test create_waypoint with non-existent template reference """ # Mute logger error for this test with self.assertRaises(ValidationError): self.jet_test.create_waypoint("non_existent_reference") @mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_jet") def test_create_waypoint_template_wrong_jet_template(self): """ Test create_waypoint with template from different jet template """ # Create a waypoint template for a different jet template other_jet_template = self.JetTemplate.create( { "name": "Other Jet Template", "reference": "other_jet_template", } ) other_waypoint_template = self.JetWaypointTemplate.create( { "name": "Other Waypoint Template", "jet_template_id": other_jet_template.id, } ) # Mute logger error for this test with self.assertRaises(ValidationError): # Try to create waypoint with template from different jet template self.jet_test.create_waypoint(other_waypoint_template) # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ # Create a Waypoint command (flight plan) tests # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ def test_create_waypoint_command_success_fly_here_false(self): """Create a Waypoint command from flight plan: waypoint created, log finished by callback.""" command = self.Command.create( { "name": "Create waypoint command", "action": "create_waypoint", "waypoint_template_id": self.waypoint_template.id, "fly_here": False, } ) plan = self.Plan.create({"name": "Plan create waypoint"}) self.plan_line.create( { "plan_id": plan.id, "sequence": 10, "command_id": command.id, } ) initial_waypoint_count = len(self.jet_test.waypoint_ids) plan_log = self.server_test_1.sudo().run_flight_plan(plan, jet=self.jet_test) self.assertTrue(plan_log, "Plan log should be created") command_logs = plan_log.command_log_ids.filtered( lambda log: log.command_id == command ) self.assertEqual( len(command_logs), 1, "Exactly one command log for create_waypoint" ) log_record = command_logs[0] self.assertTrue( log_record.finish_date, "Command log should be finished by waypoint callback", ) self.assertEqual( log_record.command_status, 0, "Command should finish with success", ) self.assertEqual( len(self.jet_test.waypoint_ids), initial_waypoint_count + 1, "One new waypoint should be created", ) new_waypoint = self.jet_test.waypoint_ids.filtered( lambda w: w.created_from_command_log_id == log_record ) self.assertEqual(len(new_waypoint), 1, "One waypoint linked to command log") new_waypoint = new_waypoint[0] self.assertEqual( new_waypoint.state, "ready", "Waypoint should be in ready state (fly_here=False)", ) self.assertEqual( new_waypoint.created_from_command_log_id, log_record, "Waypoint should reference the command log", ) def test_create_waypoint_command_success_fly_here_true(self): """Create a Waypoint command with fly_here: waypoint becomes current.""" command = self.Command.create( { "name": "Create waypoint fly here", "action": "create_waypoint", "waypoint_template_id": self.waypoint_template.id, "fly_here": True, } ) plan = self.Plan.create({"name": "Plan create waypoint fly here"}) self.plan_line.create( { "plan_id": plan.id, "sequence": 10, "command_id": command.id, } ) plan_log = self.server_test_1.sudo().run_flight_plan(plan, jet=self.jet_test) command_logs = plan_log.command_log_ids.filtered( lambda log: log.command_id == command ) log_record = command_logs[0] self.assertTrue(log_record.finish_date, "Command log should be finished") self.assertEqual(log_record.command_status, 0, "Command should succeed") waypoints_with_log = self.jet_test.waypoint_ids.filtered( lambda w: w.created_from_command_log_id == log_record ) self.assertEqual( len(waypoints_with_log), 1, "One waypoint created from command", ) self.assertEqual( waypoints_with_log.state, "current", "Waypoint should be current when fly_here=True", ) self.assertEqual( self.jet_test.waypoint_id, waypoints_with_log, "Jet waypoint_id should point to the new waypoint", ) def test_create_waypoint_command_no_jet(self): """Create a Waypoint command run without jet: command log finished with JET_NOT_FOUND.""" from ..models.constants import JET_NOT_FOUND command = self.Command.create( { "name": "Create waypoint no jet", "action": "create_waypoint", "waypoint_template_id": self.waypoint_template.id, "fly_here": False, } ) plan = self.Plan.create({"name": "Plan no jet"}) self.plan_line.create( {"plan_id": plan.id, "sequence": 10, "command_id": command.id} ) plan_log = self.server_test_1.sudo().run_flight_plan(plan) command_logs = plan_log.command_log_ids.filtered( lambda log: log.command_id == command ) self.assertEqual(len(command_logs), 1) self.assertEqual( command_logs.command_status, JET_NOT_FOUND, "Should finish with JET_NOT_FOUND when no jet in plan", ) self.assertTrue(command_logs.finish_date) def test_create_waypoint_command_no_template(self): """Create a Waypoint command without waypoint template: WAYPOINT_TEMPLATE_NOT_FOUND.""" from ..models.constants import WAYPOINT_TEMPLATE_NOT_FOUND command = self.Command.create( { "name": "Create waypoint no template", "action": "create_waypoint", "fly_here": False, } ) plan = self.Plan.create({"name": "Plan no template"}) self.plan_line.create( {"plan_id": plan.id, "sequence": 10, "command_id": command.id} ) plan_log = self.server_test_1.sudo().run_flight_plan(plan, jet=self.jet_test) command_logs = plan_log.command_log_ids.filtered( lambda log: log.command_id == command ) self.assertEqual(len(command_logs), 1) self.assertEqual( command_logs.command_status, WAYPOINT_TEMPLATE_NOT_FOUND, "Should finish with WAYPOINT_TEMPLATE_NOT_FOUND", ) def test_create_waypoint_command_jet_busy(self): """ Create a Waypoint when jet is busy (e.g. from flight plan): ignore_busy=True, waypoint created, log success. """ self.jet_test.target_state_id = self.state_running command = self.Command.create( { "name": "Create waypoint jet busy", "action": "create_waypoint", "waypoint_template_id": self.waypoint_template.id, "fly_here": False, } ) plan = self.Plan.create({"name": "Plan jet busy"}) self.plan_line.create( {"plan_id": plan.id, "sequence": 10, "command_id": command.id} ) initial_waypoint_count = len(self.jet_test.waypoint_ids) with mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_jet"): plan_log = self.server_test_1.sudo().run_flight_plan( plan, jet=self.jet_test ) command_logs = plan_log.command_log_ids.filtered( lambda log: log.command_id == command ) self.assertEqual(len(command_logs), 1) self.assertTrue( command_logs.finish_date, "Command log should be finished by waypoint callback when jet busy", ) self.assertEqual( command_logs.command_status, 0, "Create waypoint command should succeed when jet is busy " "(ignore_busy=True)", ) self.assertEqual( len(self.jet_test.waypoint_ids), initial_waypoint_count + 1, "One new waypoint should be created despite jet busy", ) self.jet_test.target_state_id = False def test_create_waypoint_command_wrong_jet_template(self): """Create a Waypoint with template for another jet template: False and WAYPOINT_CREATE_FAILED.""" from ..models.constants import WAYPOINT_CREATE_FAILED other_jet_template = self.JetTemplate.create( { "name": "Other Jet Template", "reference": "other_jet_template_cmd", } ) other_waypoint_template = self.JetWaypointTemplate.create( { "name": "Other Waypoint Template", "jet_template_id": other_jet_template.id, } ) command = self.Command.create( { "name": "Create waypoint wrong template", "action": "create_waypoint", "waypoint_template_id": other_waypoint_template.id, "fly_here": False, } ) plan = self.Plan.create({"name": "Plan wrong template"}) self.plan_line.create( {"plan_id": plan.id, "sequence": 10, "command_id": command.id} ) with mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_jet"): plan_log = self.server_test_1.sudo().run_flight_plan( plan, jet=self.jet_test ) command_logs = plan_log.command_log_ids.filtered( lambda log: log.command_id == command ) self.assertEqual(len(command_logs), 1) self.assertEqual( command_logs.command_status, WAYPOINT_CREATE_FAILED, "Should finish with WAYPOINT_CREATE_FAILED when template is " "for another jet template", ) self.assertTrue(command_logs.finish_date) def test_create_waypoint_command_waypoint_reaches_error(self): """Create plan fails: waypoint goes to error, callback finishes command log with error.""" from ..models.constants import WAYPOINT_CREATE_FAILED fail_command = self.Command.create( { "name": "Fail command", "action": "python_code", "code": "result = {'exit_code': 1, 'message': 'fail'}", } ) fail_plan = self.Plan.create({"name": "Plan that fails"}) self.plan_line.create( { "plan_id": fail_plan.id, "sequence": 10, "command_id": fail_command.id, } ) waypoint_template_with_failing_plan = self.JetWaypointTemplate.create( { "name": "Waypoint template with failing create plan", "jet_template_id": self.jet_template_test.id, "plan_create_id": fail_plan.id, } ) command = self.Command.create( { "name": "Create waypoint with failing plan", "action": "create_waypoint", "waypoint_template_id": waypoint_template_with_failing_plan.id, "fly_here": False, } ) plan = self.Plan.create({"name": "Plan create waypoint error"}) self.plan_line.create( {"plan_id": plan.id, "sequence": 10, "command_id": command.id} ) plan_log = self.server_test_1.sudo().run_flight_plan(plan, jet=self.jet_test) command_logs = plan_log.command_log_ids.filtered( lambda log: log.command_id == command ) self.assertEqual(len(command_logs), 1) log_record = command_logs[0] self.assertTrue( log_record.finish_date, "Command log should be finished by waypoint callback when " "waypoint reaches error", ) self.assertNotEqual( log_record.command_status, 0, "Command should finish with error status", ) self.assertEqual( log_record.command_status, WAYPOINT_CREATE_FAILED, "Callback should use WAYPOINT_CREATE_FAILED when plan fails", ) waypoints_with_log = self.jet_test.waypoint_ids.filtered( lambda w: w.created_from_command_log_id == log_record ) self.assertEqual(len(waypoints_with_log), 1) self.assertEqual( waypoints_with_log.state, "error", "Waypoint should be in error state after create plan fails", ) def test_finalize_create_waypoint_command_log_double_finish_guard(self): """Calling _finalize_create_waypoint_command_log twice does not double-finish.""" waypoint = self.jet_test.create_waypoint( self.waypoint_template, created_from_command_log=None, ) log_record = self.CommandLog.create( { "server_id": self.server_test_1.id, "command_id": self.Command.create( { "name": "Dummy create waypoint", "action": "create_waypoint", "waypoint_template_id": self.waypoint_template.id, } ).id, "start_date": fields.Datetime.now(), } ) waypoint.created_from_command_log_id = log_record self.assertFalse(log_record.finish_date, "Log should not be finished yet") waypoint._finalize_create_waypoint_command_log(success=True) self.assertTrue(log_record.finish_date, "Log should be finished once") finish_date_first = log_record.finish_date waypoint._finalize_create_waypoint_command_log(success=True) self.assertEqual( log_record.finish_date, finish_date_first, "Second call should not change finish_date (guard)", )