1751 lines
66 KiB
Python
1751 lines
66 KiB
Python
# Copyright (C) 2024 Cetmix OÜ
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
|
|
|
from unittest.mock import patch
|
|
|
|
from odoo import fields
|
|
from odoo.exceptions import AccessError, ValidationError
|
|
from odoo.tools import mute_logger
|
|
|
|
from .common_jets import TestTowerJetsCommon
|
|
|
|
|
|
class TestTowerJet(TestTowerJetsCommon):
|
|
"""
|
|
Test the Jet model functionality
|
|
"""
|
|
|
|
# All jet-related test data is now inherited from TestTowerJetsCommon
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# _on_is_available Tests
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
|
|
def test_on_is_available_explicit_request_marked_processing_before_dispatch(self):
|
|
"""
|
|
Regression: explicit request must be attached to the jet and set to
|
|
processing before transition dispatch starts.
|
|
|
|
We patch _bring_to_state (the actual dispatch) rather than
|
|
_serve_jet_request so that _serve_jet_request runs for real and its
|
|
side-effects (served_jet_request_id, request.state) are observable.
|
|
A side_effect captures both values at the exact moment dispatch is
|
|
triggered, proving ordering rather than just eventual state.
|
|
"""
|
|
self.jet_test.write(
|
|
{"state_id": self.state_initial.id, "target_state_id": False}
|
|
)
|
|
# Isolate the scenario: keep only the request created in this test.
|
|
preexisting_new_requests = self.env["cx.tower.jet.request"].search(
|
|
[("jet_id", "=", self.jet_test.id), ("state", "=", "new")]
|
|
)
|
|
if preexisting_new_requests:
|
|
preexisting_new_requests.unlink()
|
|
request = self.env["cx.tower.jet.request"].create(
|
|
{
|
|
"server_id": self.server_test_1.id,
|
|
"jet_id": self.jet_test.id,
|
|
"jet_template_id": self.jet_test.jet_template_id.id,
|
|
"state_requested_id": self.state_running.id,
|
|
"state": "new",
|
|
}
|
|
)
|
|
|
|
# Capture the observable state of jet + request at dispatch time.
|
|
observed = {}
|
|
|
|
def capture(jet_self, target_state):
|
|
jet_self.invalidate_recordset(["served_jet_request_id"])
|
|
request.invalidate_recordset(["state"])
|
|
observed["served_request_id"] = jet_self.served_jet_request_id.id
|
|
observed["request_state"] = request.state
|
|
|
|
with patch(
|
|
"odoo.addons.cetmix_tower_server.models.cx_tower_jet.CxTowerJet._bring_to_state",
|
|
autospec=True,
|
|
side_effect=capture,
|
|
):
|
|
self.jet_test._on_is_available()
|
|
|
|
self.assertTrue(
|
|
observed,
|
|
"_bring_to_state must have been called; check that the request "
|
|
"targets a different state than the jet's current state",
|
|
)
|
|
self.assertEqual(
|
|
observed["served_request_id"],
|
|
request.id,
|
|
"Request must be saved to served_jet_request_id before dispatch",
|
|
)
|
|
self.assertEqual(
|
|
observed["request_state"],
|
|
"processing",
|
|
"Request must be set to 'processing' before _bring_to_state is called",
|
|
)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# _compute_available_actions Tests
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
|
|
def test_compute_available_actions_no_state(self):
|
|
"""
|
|
Test _compute_available_actions when jet has no current state
|
|
"""
|
|
# Jet has template but no state
|
|
self.jet_test.state_id = False
|
|
|
|
# action_available_ids should include only the create action
|
|
self.assertEqual(
|
|
len(self.jet_test.action_available_ids),
|
|
1,
|
|
"Available actions should include create action when jet has no state",
|
|
)
|
|
self.assertEqual(
|
|
{action.id for action in self.jet_test.action_available_ids},
|
|
{self.action_create.id},
|
|
"Available action should be the create action",
|
|
)
|
|
|
|
def test_compute_available_actions_with_state_running(self):
|
|
"""
|
|
Test _compute_available_actions when jet has state running.
|
|
Create action is not available (no state_from_id); destroy and
|
|
transition actions are available.
|
|
"""
|
|
self.jet_test.state_id = self.state_running
|
|
|
|
expected_actions = (
|
|
self.action_running_to_stopped
|
|
| self.action_running_to_error
|
|
| self.action_destroy
|
|
)
|
|
actual_ids = {action.id for action in self.jet_test.action_available_ids}
|
|
|
|
self.assertEqual(
|
|
len(actual_ids),
|
|
3,
|
|
"Should have 3 available actions from running state",
|
|
)
|
|
self.assertNotIn(
|
|
self.action_create.id,
|
|
actual_ids,
|
|
"Create action should not be available when jet has state",
|
|
)
|
|
self.assertIn(
|
|
self.action_destroy.id,
|
|
actual_ids,
|
|
"Destroy action should be available",
|
|
)
|
|
self.assertEqual(
|
|
actual_ids,
|
|
{action.id for action in expected_actions},
|
|
"Should have exact set: running_to_stopped, running_to_error, destroy",
|
|
)
|
|
|
|
def test_compute_available_actions_complex_scenario(self):
|
|
"""
|
|
Test _compute_available_actions with complex scenario
|
|
"""
|
|
# Use common actions from setup
|
|
|
|
# Test different states
|
|
test_cases = [
|
|
(self.state_initial, [self.action_initial_to_running]),
|
|
(
|
|
self.state_running,
|
|
[
|
|
self.action_running_to_stopped,
|
|
self.action_running_to_error,
|
|
self.action_destroy,
|
|
],
|
|
),
|
|
(self.state_stopped, [self.action_stopped_to_running]),
|
|
(self.state_error, [self.action_error_to_running]),
|
|
]
|
|
|
|
for state, expected_actions in test_cases:
|
|
self.jet_test.state_id = state
|
|
actual_actions = self.jet_test.action_available_ids
|
|
expected_actions_set = {action.id for action in expected_actions}
|
|
actual_actions_set = {action.id for action in actual_actions}
|
|
|
|
self.assertEqual(
|
|
actual_actions_set,
|
|
expected_actions_set,
|
|
f"State {state.name} should have correct available actions",
|
|
)
|
|
|
|
def test_compute_available_actions_dependencies(self):
|
|
"""
|
|
Test that _compute_available_actions has correct dependencies
|
|
"""
|
|
# Use existing action from common setup
|
|
action = self.action_running_to_stopped
|
|
|
|
# Set initial state
|
|
self.jet_test.state_id = self.state_running
|
|
# Should have all actions from running state
|
|
expected_actions = (
|
|
self.action_running_to_stopped
|
|
| self.action_running_to_error
|
|
| self.action_destroy
|
|
)
|
|
self.assertEqual(
|
|
{action.id for action in self.jet_test.action_available_ids},
|
|
{action.id for action in expected_actions},
|
|
"Should have all actions from running state initially",
|
|
)
|
|
|
|
# Change action's state_from_id (this should trigger recomputation)
|
|
action.state_from_id = self.state_stopped
|
|
|
|
# Jet should no longer have this specific action available
|
|
# but should still have other actions from running state
|
|
expected_remaining_actions = self.action_running_to_error | self.action_destroy
|
|
self.assertEqual(
|
|
{action.id for action in self.jet_test.action_available_ids},
|
|
{action.id for action in expected_remaining_actions},
|
|
"Should have remaining actions after changing one action's state_from_id",
|
|
)
|
|
|
|
# Change jet state to match action's new state_from_id
|
|
self.jet_test.state_id = self.state_stopped
|
|
|
|
# Now the modified action should be available again,
|
|
# plus any other actions from stopped state
|
|
expected_actions = action | self.action_stopped_to_running
|
|
self.assertEqual(
|
|
{action.id for action in self.jet_test.action_available_ids},
|
|
{action.id for action in expected_actions},
|
|
"Should have the modified action plus other actions from stopped state",
|
|
)
|
|
|
|
def test_compute_available_actions_cross_template_isolation(self):
|
|
"""
|
|
Test that jets only see actions from their own template
|
|
"""
|
|
# Create action for Odoo template
|
|
odoo_action = self.JetAction.create(
|
|
{
|
|
"name": "Odoo Action",
|
|
"reference": "odoo_action",
|
|
"jet_template_id": self.jet_template_odoo.id,
|
|
"state_from_id": self.state_running.id,
|
|
"state_to_id": self.state_stopped.id,
|
|
"state_transit_id": self.state_stopping.id,
|
|
"priority": 10,
|
|
}
|
|
)
|
|
|
|
# Create action for WordPress template
|
|
wp_action = self.JetAction.create(
|
|
{
|
|
"name": "WordPress Action",
|
|
"reference": "wordpress_action",
|
|
"jet_template_id": self.jet_template_wordpress.id,
|
|
"state_from_id": self.state_running.id,
|
|
"state_to_id": self.state_stopped.id,
|
|
"state_transit_id": self.state_stopping.id,
|
|
"priority": 10,
|
|
}
|
|
)
|
|
|
|
# Set both jets to running state
|
|
self.jet_odoo.state_id = self.state_running
|
|
self.jet_wordpress.state_id = self.state_running
|
|
|
|
# Each jet should only see its own template's actions
|
|
self.assertEqual(
|
|
{action.id for action in self.jet_odoo.action_available_ids},
|
|
{odoo_action.id},
|
|
"Odoo jet should only see Odoo actions",
|
|
)
|
|
self.assertEqual(
|
|
{action.id for action in self.jet_wordpress.action_available_ids},
|
|
{wp_action.id},
|
|
"WordPress jet should only see WordPress actions",
|
|
)
|
|
|
|
# Odoo jet should not see WordPress actions
|
|
self.assertNotIn(
|
|
wp_action.id,
|
|
{action.id for action in self.jet_odoo.action_available_ids},
|
|
"Odoo jet should not see WordPress actions",
|
|
)
|
|
# WordPress jet should not see Odoo actions
|
|
self.assertNotIn(
|
|
odoo_action.id,
|
|
{action.id for action in self.jet_wordpress.action_available_ids},
|
|
"WordPress jet should not see Odoo actions",
|
|
)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Complex Template Hierarchy Tests
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
|
|
def test_jet_template_domain_computation(self):
|
|
"""
|
|
Test _compute_jet_template_domain method
|
|
"""
|
|
# Test with server set
|
|
jet_with_server = self.Jet.create(
|
|
{
|
|
"name": "Jet With Server",
|
|
"reference": "jet_with_server",
|
|
"jet_template_id": self.jet_template_test.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
domain = jet_with_server.jet_template_domain
|
|
expected_domain = [("server_ids", "in", [self.server_test_1.id])]
|
|
self.assertEqual(domain, expected_domain, "Domain should include server filter")
|
|
|
|
# Test domain computation with a different server
|
|
server_test_2 = self.Server.create(
|
|
{
|
|
"name": "Test Server 2",
|
|
"ip_v4_address": "192.168.1.2",
|
|
"ssh_username": "admin",
|
|
"ssh_password": "password",
|
|
"ssh_auth_mode": "p",
|
|
"host_key": "test_key_2",
|
|
"os_id": self.os_debian_10.id,
|
|
}
|
|
)
|
|
jet_with_different_server = self.Jet.create(
|
|
{
|
|
"name": "Jet With Different Server",
|
|
"reference": "jet_with_different_server",
|
|
"jet_template_id": self.jet_template_test.id,
|
|
"server_id": server_test_2.id,
|
|
}
|
|
)
|
|
domain = jet_with_different_server.jet_template_domain
|
|
expected_domain = [("server_ids", "in", [server_test_2.id])]
|
|
self.assertEqual(
|
|
domain,
|
|
expected_domain,
|
|
"Domain should include server filter for different server",
|
|
)
|
|
|
|
# Test the domain computation method directly to verify the else branch
|
|
# Create a temporary jet object to test the method without saving
|
|
temp_jet = self.Jet.new(
|
|
{
|
|
"name": "Temp Jet",
|
|
"jet_template_id": self.jet_template_test.id,
|
|
"server_id": False,
|
|
}
|
|
)
|
|
temp_jet._compute_jet_template_domain()
|
|
self.assertEqual(
|
|
temp_jet.jet_template_domain,
|
|
[],
|
|
"Domain should be empty when server_id is False",
|
|
)
|
|
|
|
def test_jet_requires_ids_computation(self):
|
|
"""
|
|
Test _compute_jet_requires_ids method with complex dependencies
|
|
"""
|
|
# Test Odoo jet dependencies
|
|
odoo_deps = self.jet_odoo.jet_requires_ids
|
|
self.assertEqual(
|
|
len(odoo_deps), 2, "Odoo jet should have 2 direct dependencies"
|
|
)
|
|
|
|
# Check that dependencies are for postgres and nginx
|
|
dep_template_ids = odoo_deps.mapped(
|
|
"jet_template_dependency_id.template_required_id.id"
|
|
)
|
|
expected_ids = {self.jet_template_postgres.id, self.jet_template_nginx.id}
|
|
self.assertEqual(
|
|
set(dep_template_ids), expected_ids, "Should depend on postgres and nginx"
|
|
)
|
|
|
|
# Test WooCommerce jet dependencies
|
|
# (should include both Odoo and WordPress deps)
|
|
woocommerce_deps = self.jet_woocommerce.jet_requires_ids
|
|
self.assertEqual(
|
|
len(woocommerce_deps),
|
|
2,
|
|
"WooCommerce jet should have 2 direct dependencies",
|
|
)
|
|
|
|
# Check that dependencies are for wordpress and odoo
|
|
dep_template_ids = woocommerce_deps.mapped(
|
|
"jet_template_dependency_id.template_required_id.id"
|
|
)
|
|
expected_ids = {self.jet_template_wordpress.id, self.jet_template_odoo.id}
|
|
self.assertEqual(
|
|
set(dep_template_ids), expected_ids, "Should depend on wordpress and odoo"
|
|
)
|
|
|
|
def test_jet_limit_per_server_same_server_rejected(self):
|
|
"""Constraint rejects creating more jets than template limit per server."""
|
|
template = self.JetTemplate.create(
|
|
{
|
|
"name": "Template With Limit",
|
|
"reference": "template_with_limit",
|
|
"limit_per_server": 1,
|
|
}
|
|
)
|
|
self.Jet.create(
|
|
{
|
|
"name": "Limited Jet 1",
|
|
"reference": "limited_jet_1",
|
|
"jet_template_id": template.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
|
|
with self.assertRaisesRegex(ValidationError, "Jet limit per server reached"):
|
|
self.Jet.create(
|
|
{
|
|
"name": "Limited Jet 2",
|
|
"reference": "limited_jet_2",
|
|
"jet_template_id": template.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
|
|
def test_jet_limit_per_server_different_servers_allowed(self):
|
|
"""
|
|
Constraint allows same template on different servers
|
|
but within per-server limit.
|
|
"""
|
|
template = self.JetTemplate.create(
|
|
{
|
|
"name": "Template With Per-Server Limit",
|
|
"reference": "template_with_per_server_limit",
|
|
"limit_per_server": 1,
|
|
}
|
|
)
|
|
server_test_2 = self.Server.create(
|
|
{
|
|
"name": "Jet Limit Test Server 2",
|
|
"ip_v4_address": "192.168.1.22",
|
|
"ssh_username": "admin",
|
|
"ssh_password": "password",
|
|
"ssh_auth_mode": "p",
|
|
"host_key": "jet_limit_test_server_2_key",
|
|
"os_id": self.os_debian_10.id,
|
|
}
|
|
)
|
|
|
|
jet_on_server_1 = self.Jet.create(
|
|
{
|
|
"name": "Limited Jet Server 1",
|
|
"reference": "limited_jet_server_1",
|
|
"jet_template_id": template.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
jet_on_server_2 = self.Jet.create(
|
|
{
|
|
"name": "Limited Jet Server 2",
|
|
"reference": "limited_jet_server_2",
|
|
"jet_template_id": template.id,
|
|
"server_id": server_test_2.id,
|
|
}
|
|
)
|
|
|
|
self.assertTrue(
|
|
jet_on_server_1.exists(), "Jet on first server should be created"
|
|
)
|
|
self.assertTrue(
|
|
jet_on_server_2.exists(), "Jet on second server should be created"
|
|
)
|
|
|
|
def test_jet_requires_ids_template_change(self):
|
|
"""
|
|
Test _compute_jet_requires_ids for different templates
|
|
"""
|
|
# Create jets for different templates
|
|
jet_tower_core = self.Jet.create(
|
|
{
|
|
"name": "Tower Core Jet",
|
|
"reference": "tower_core_jet",
|
|
"jet_template_id": self.jet_template_tower_core.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
self.assertEqual(
|
|
len(jet_tower_core.jet_requires_ids),
|
|
0,
|
|
"Tower core should have no dependencies",
|
|
)
|
|
|
|
jet_odoo = self.Jet.create(
|
|
{
|
|
"name": "Odoo Jet Test",
|
|
"reference": "odoo_jet_test",
|
|
"jet_template_id": self.jet_template_odoo.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
self.assertEqual(
|
|
len(jet_odoo.jet_requires_ids), 2, "Odoo should have 2 dependencies"
|
|
)
|
|
|
|
jet_woocommerce = self.Jet.create(
|
|
{
|
|
"name": "WooCommerce Jet Test",
|
|
"reference": "woocommerce_jet_test",
|
|
"jet_template_id": self.jet_template_woocommerce_odoo.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
self.assertEqual(
|
|
len(jet_woocommerce.jet_requires_ids),
|
|
2,
|
|
"WooCommerce should have 2 dependencies",
|
|
)
|
|
|
|
def test_jet_requires_ids_dependency_removal(self):
|
|
"""
|
|
Test _compute_jet_requires_ids when template dependencies are removed
|
|
"""
|
|
# Create jet with Odoo template
|
|
jet_odoo = self.Jet.create(
|
|
{
|
|
"name": "Odoo Jet Test",
|
|
"reference": "odoo_jet_test",
|
|
"jet_template_id": self.jet_template_odoo.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
initial_deps = len(jet_odoo.jet_requires_ids)
|
|
self.assertEqual(initial_deps, 2, "Should have 2 dependencies initially")
|
|
|
|
# Remove one dependency from template
|
|
postgres_dep = self.JetTemplateDependency.search(
|
|
[
|
|
("template_id", "=", self.jet_template_odoo.id),
|
|
("template_required_id", "=", self.jet_template_postgres.id),
|
|
]
|
|
)
|
|
postgres_dep.unlink()
|
|
|
|
# Jet dependencies should be updated
|
|
self.assertEqual(
|
|
len(jet_odoo.jet_requires_ids), 1, "Should have 1 dependency after removal"
|
|
)
|
|
remaining_dep = jet_odoo.jet_requires_ids[0]
|
|
self.assertEqual(
|
|
remaining_dep.jet_template_dependency_id.template_required_id,
|
|
self.jet_template_nginx,
|
|
"Remaining dependency should be nginx",
|
|
)
|
|
|
|
def test_jet_requires_ids_dependency_addition(self):
|
|
"""
|
|
Test _compute_jet_requires_ids when template dependencies are added
|
|
"""
|
|
# Create jet with tower core (no dependencies)
|
|
jet_tower_core = self.Jet.create(
|
|
{
|
|
"name": "Tower Core Jet",
|
|
"reference": "tower_core_jet",
|
|
"jet_template_id": self.jet_template_tower_core.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
self.assertEqual(
|
|
len(jet_tower_core.jet_requires_ids),
|
|
0,
|
|
"Should have no dependencies initially",
|
|
)
|
|
|
|
# Add dependency to tower core
|
|
# (use a template that won't create circular dependency)
|
|
new_dep = self.JetTemplateDependency.create(
|
|
{
|
|
"template_id": self.jet_template_tower_core.id,
|
|
"template_required_id": self.jet_template_test.id,
|
|
"state_required_id": self.state_running.id,
|
|
}
|
|
)
|
|
|
|
# Jet dependencies should be updated
|
|
self.assertEqual(
|
|
len(jet_tower_core.jet_requires_ids),
|
|
1,
|
|
"Should have 1 dependency after addition",
|
|
)
|
|
added_dep = jet_tower_core.jet_requires_ids[0]
|
|
self.assertEqual(
|
|
added_dep.jet_template_dependency_id,
|
|
new_dep,
|
|
"Added dependency should match the new dependency",
|
|
)
|
|
|
|
def test_jet_requires_ids_multiple_jets_same_template(self):
|
|
"""
|
|
Test _compute_jet_requires_ids with multiple jets using same template
|
|
"""
|
|
# Create another Odoo jet
|
|
jet_odoo_2 = self.Jet.create(
|
|
{
|
|
"name": "Odoo Jet 2",
|
|
"reference": "odoo_jet_2",
|
|
"jet_template_id": self.jet_template_odoo.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
|
|
# Both jets should have same dependencies
|
|
deps_1 = self.jet_odoo.jet_requires_ids
|
|
deps_2 = jet_odoo_2.jet_requires_ids
|
|
|
|
self.assertEqual(
|
|
len(deps_1),
|
|
len(deps_2),
|
|
"Both jets should have same number of dependencies",
|
|
)
|
|
|
|
# Check that dependencies are the same
|
|
deps_1_template_ids = deps_1.mapped(
|
|
"jet_template_dependency_id.template_required_id.id"
|
|
)
|
|
deps_2_template_ids = deps_2.mapped(
|
|
"jet_template_dependency_id.template_required_id.id"
|
|
)
|
|
self.assertEqual(
|
|
set(deps_1_template_ids),
|
|
set(deps_2_template_ids),
|
|
"Both jets should have same dependency templates",
|
|
)
|
|
|
|
def test_jet_requires_ids_consistency_with_template(self):
|
|
"""
|
|
Test that jet dependencies are consistent with template dependencies
|
|
"""
|
|
# Test with different templates
|
|
templates_to_test = [
|
|
(self.jet_template_tower_core, 0),
|
|
(self.jet_template_docker, 1),
|
|
(self.jet_template_nginx, 1),
|
|
(self.jet_template_postgres, 1),
|
|
(self.jet_template_mariadb, 1),
|
|
(self.jet_template_odoo, 2),
|
|
(self.jet_template_wordpress, 2),
|
|
(self.jet_template_woocommerce_odoo, 2),
|
|
]
|
|
|
|
for template, expected_dep_count in templates_to_test:
|
|
# Create a jet with this template
|
|
test_jet = self.Jet.create(
|
|
{
|
|
"name": f"Test Jet for {template.name}",
|
|
"reference": f"test_jet_{template.reference}",
|
|
"jet_template_id": template.id,
|
|
"server_id": self.server_test_1.id,
|
|
}
|
|
)
|
|
|
|
# Check dependency count
|
|
actual_dep_count = len(test_jet.jet_requires_ids)
|
|
self.assertEqual(
|
|
actual_dep_count,
|
|
expected_dep_count,
|
|
f"{template.name} should have {expected_dep_count} "
|
|
f"dependencies, got {actual_dep_count}",
|
|
)
|
|
|
|
# Verify that all jet dependencies correspond to template dependencies
|
|
template_deps = template.template_requires_ids
|
|
jet_deps = test_jet.jet_requires_ids
|
|
|
|
if template_deps:
|
|
self.assertEqual(
|
|
len(jet_deps),
|
|
len(template_deps),
|
|
"Jet dependencies count should match"
|
|
f" template dependencies for {template.name}",
|
|
)
|
|
|
|
# Check that each jet dependency corresponds to a template dependency
|
|
jet_dep_template_ids = jet_deps.mapped("jet_template_dependency_id.id")
|
|
template_dep_ids = template_deps.ids
|
|
self.assertEqual(
|
|
set(jet_dep_template_ids),
|
|
set(template_dep_ids),
|
|
"Jet dependencies should match template"
|
|
f" dependencies for {template.name}",
|
|
)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# bring_to_state Tests
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
|
|
def test_bring_to_state_success_user_level(self):
|
|
"""
|
|
Test bring_to_state succeeds when user has sufficient access level.
|
|
User (level 1) can access state with level 1.
|
|
"""
|
|
# Use existing state and set it to User access level (1)
|
|
self.state_running.access_level = "1"
|
|
self.state_running.invalidate_recordset(["access_level"])
|
|
|
|
# Ensure user has access to the jet
|
|
self.jet_test.write({"user_ids": [(4, self.user.id)]})
|
|
self.server_test_1.write({"user_ids": [(4, self.user.id)]})
|
|
|
|
# Set jet to initial state
|
|
self.jet_test.write({"state_id": self.state_initial.id})
|
|
self.jet_test.invalidate_recordset(["state_id"])
|
|
|
|
# User should be able to bring jet to user-level state
|
|
self.jet_test.with_user(self.user).with_context(
|
|
cetmix_tower_no_commit=True
|
|
).bring_to_state("test_running")
|
|
self.assertEqual(
|
|
self.jet_test.state_id,
|
|
self.state_running,
|
|
"Jet should be brought to user-level state by user",
|
|
)
|
|
|
|
def test_bring_to_state_success_manager_level(self):
|
|
"""
|
|
Test bring_to_state succeeds when manager has sufficient access level.
|
|
Manager (level 2) can access state with level 2.
|
|
"""
|
|
# Use existing state and set it to Manager access level (2)
|
|
self.state_stopped.access_level = "2"
|
|
self.state_stopped.invalidate_recordset(["access_level"])
|
|
|
|
# Ensure manager has access to the jet
|
|
self.jet_test.write({"manager_ids": [(4, self.manager.id)]})
|
|
self.server_test_1.write({"manager_ids": [(4, self.manager.id)]})
|
|
|
|
# Set jet to running state (which has action to stopped)
|
|
self.jet_test.write({"state_id": self.state_running.id})
|
|
self.jet_test.invalidate_recordset(["state_id"])
|
|
|
|
# Manager should be able to bring jet to manager-level state
|
|
self.jet_test.with_user(self.manager).with_context(
|
|
cetmix_tower_no_commit=True
|
|
).bring_to_state("test_stopped")
|
|
self.assertEqual(
|
|
self.jet_test.state_id,
|
|
self.state_stopped,
|
|
"Jet should be brought to manager-level state by manager",
|
|
)
|
|
|
|
def test_bring_to_state_success_root_level(self):
|
|
"""
|
|
Test bring_to_state succeeds when root has sufficient access level.
|
|
Root (level 3) can access state with level 3.
|
|
"""
|
|
# Use existing state and set it to Root access level (3)
|
|
self.state_error.access_level = "3"
|
|
self.state_error.invalidate_recordset(["access_level"])
|
|
|
|
# Root has full access, but ensure access for consistency
|
|
self.jet_test.write({"manager_ids": [(4, self.root.id)]})
|
|
self.server_test_1.write({"manager_ids": [(4, self.root.id)]})
|
|
|
|
# Set jet to running state (which has action to error)
|
|
self.jet_test.write({"state_id": self.state_running.id})
|
|
self.jet_test.invalidate_recordset(["state_id"])
|
|
|
|
# Root should be able to bring jet to root-level state
|
|
self.jet_test.with_user(self.root).with_context(
|
|
cetmix_tower_no_commit=True
|
|
).bring_to_state("test_error")
|
|
self.assertEqual(
|
|
self.jet_test.state_id,
|
|
self.state_error,
|
|
"Jet should be brought to root-level state by root",
|
|
)
|
|
|
|
def test_bring_to_state_access_error_user_to_manager(self):
|
|
"""
|
|
Test bring_to_state raises AccessError when user (level 1)
|
|
tries to access manager-level state (level 2).
|
|
"""
|
|
# Use existing state and set it to Manager access level (2)
|
|
self.state_stopped.access_level = "2"
|
|
self.state_stopped.invalidate_recordset(["access_level"])
|
|
|
|
# Ensure user has access to the jet (for the access check to work)
|
|
self.jet_test.write({"user_ids": [(4, self.user.id)]})
|
|
self.server_test_1.write({"user_ids": [(4, self.user.id)]})
|
|
|
|
# Set jet to running state (which has action to stopped)
|
|
self.jet_test.write({"state_id": self.state_running.id})
|
|
self.jet_test.invalidate_recordset(["state_id"])
|
|
|
|
# User should not be able to bring jet to manager-level state
|
|
with self.assertRaises(AccessError) as context:
|
|
self.jet_test.with_user(self.user).with_context(
|
|
cetmix_tower_no_commit=True
|
|
).bring_to_state("test_stopped")
|
|
|
|
self.assertIn(
|
|
"You are not allowed to set the",
|
|
str(context.exception),
|
|
"Should raise AccessError with appropriate message",
|
|
)
|
|
self.assertIn(
|
|
self.state_stopped.name,
|
|
str(context.exception),
|
|
"Error message should include state name",
|
|
)
|
|
|
|
def test_bring_to_state_access_error_user_to_root(self):
|
|
"""
|
|
Test bring_to_state raises AccessError when user (level 1)
|
|
tries to access root-level state (level 3).
|
|
"""
|
|
# Use existing state and set it to Root access level (3)
|
|
self.state_error.access_level = "3"
|
|
self.state_error.invalidate_recordset(["access_level"])
|
|
|
|
# Ensure user has access to the jet (for the access check to work)
|
|
self.jet_test.write({"user_ids": [(4, self.user.id)]})
|
|
self.server_test_1.write({"user_ids": [(4, self.user.id)]})
|
|
|
|
# Set jet to running state (which has action to error)
|
|
self.jet_test.write({"state_id": self.state_running.id})
|
|
self.jet_test.invalidate_recordset(["state_id"])
|
|
|
|
# User should not be able to bring jet to root-level state
|
|
with self.assertRaises(AccessError) as context:
|
|
self.jet_test.with_user(self.user).with_context(
|
|
cetmix_tower_no_commit=True
|
|
).bring_to_state("test_error")
|
|
|
|
self.assertIn(
|
|
"You are not allowed to set the",
|
|
str(context.exception),
|
|
"Should raise AccessError with appropriate message",
|
|
)
|
|
self.assertIn(
|
|
self.state_error.name,
|
|
str(context.exception),
|
|
"Error message should include state name",
|
|
)
|
|
|
|
def test_bring_to_state_access_error_manager_to_root(self):
|
|
"""
|
|
Test bring_to_state raises AccessError when manager (level 2)
|
|
tries to access root-level state (level 3).
|
|
"""
|
|
# Use existing state and set it to Root access level (3)
|
|
self.state_error.access_level = "3"
|
|
self.state_error.invalidate_recordset(["access_level"])
|
|
|
|
# Ensure manager has access to the jet (for the access check to work)
|
|
self.jet_test.write({"manager_ids": [(4, self.manager.id)]})
|
|
self.server_test_1.write({"manager_ids": [(4, self.manager.id)]})
|
|
|
|
# Set jet to running state (which has action to error)
|
|
self.jet_test.write({"state_id": self.state_running.id})
|
|
self.jet_test.invalidate_recordset(["state_id"])
|
|
|
|
# Manager should not be able to bring jet to root-level state
|
|
with self.assertRaises(AccessError) as context:
|
|
self.jet_test.with_user(self.manager).with_context(
|
|
cetmix_tower_no_commit=True
|
|
).bring_to_state("test_error")
|
|
|
|
self.assertIn(
|
|
"You are not allowed to set the",
|
|
str(context.exception),
|
|
"Should raise AccessError with appropriate message",
|
|
)
|
|
self.assertIn(
|
|
self.state_error.name,
|
|
str(context.exception),
|
|
"Error message should include state name",
|
|
)
|
|
|
|
def test_bring_to_state_manager_can_access_user_level(self):
|
|
"""
|
|
Test bring_to_state succeeds when manager (level 2) who IS in manager_ids
|
|
accesses user-level state (level 1).
|
|
Higher access levels can access lower level states.
|
|
"""
|
|
# Use existing state and set it to User access level (1)
|
|
self.state_running.access_level = "1"
|
|
self.state_running.invalidate_recordset(["access_level"])
|
|
|
|
# Ensure manager has access to the jet
|
|
# Manager IS in manager_ids, so they keep their manager access level (2)
|
|
self.jet_test.write({"manager_ids": [(4, self.manager.id)]})
|
|
self.server_test_1.write({"manager_ids": [(4, self.manager.id)]})
|
|
|
|
# Set jet to initial state
|
|
self.jet_test.write({"state_id": self.state_initial.id})
|
|
self.jet_test.invalidate_recordset(["state_id"])
|
|
|
|
# Manager should be able to bring jet to user-level state
|
|
self.jet_test.with_user(self.manager).with_context(
|
|
cetmix_tower_no_commit=True
|
|
).bring_to_state("test_running")
|
|
self.assertEqual(
|
|
self.jet_test.state_id,
|
|
self.state_running,
|
|
"Manager should be able to access user-level state",
|
|
)
|
|
|
|
def test_bring_to_state_manager_not_in_manager_ids_treated_as_user(self):
|
|
"""
|
|
Test bring_to_state treats manager (level 2) who is NOT in manager_ids
|
|
as user (level 1).
|
|
Manager should be able to set user-level state but not manager-level state.
|
|
"""
|
|
# Use existing state and set it to User access level (1)
|
|
self.state_running.access_level = "1"
|
|
self.state_running.invalidate_recordset(["access_level"])
|
|
|
|
# Ensure manager has access to the jet via user_ids but NOT via manager_ids
|
|
self.jet_test.write({"user_ids": [(4, self.manager.id)]})
|
|
self.server_test_1.write({"user_ids": [(4, self.manager.id)]})
|
|
# Explicitly ensure manager is NOT in manager_ids
|
|
self.jet_test.write({"manager_ids": [(5, 0, 0)]})
|
|
|
|
# Set jet to initial state
|
|
self.jet_test.write({"state_id": self.state_initial.id})
|
|
self.jet_test.invalidate_recordset(["state_id"])
|
|
|
|
# Manager (treated as user) should be able to bring jet to user-level state
|
|
self.jet_test.with_user(self.manager).with_context(
|
|
cetmix_tower_no_commit=True
|
|
).bring_to_state("test_running")
|
|
self.assertEqual(
|
|
self.jet_test.state_id,
|
|
self.state_running,
|
|
"Manager not in manager_ids should be able to access user-level state",
|
|
)
|
|
|
|
def test_bring_to_state_manager_not_in_manager_ids_cannot_access_manager_level(
|
|
self
|
|
):
|
|
"""
|
|
Test bring_to_state raises AccessError when manager (level 2) who is NOT
|
|
in manager_ids tries to access manager-level state (level 2).
|
|
Manager should be treated as user (level 1) and cannot access level 2.
|
|
"""
|
|
# Use existing state and set it to Manager access level (2)
|
|
self.state_stopped.access_level = "2"
|
|
self.state_stopped.invalidate_recordset(["access_level"])
|
|
|
|
# Ensure manager has access to the jet via user_ids but NOT via manager_ids
|
|
self.jet_test.write({"user_ids": [(4, self.manager.id)]})
|
|
self.server_test_1.write({"user_ids": [(4, self.manager.id)]})
|
|
# Explicitly ensure manager is NOT in manager_ids
|
|
self.jet_test.write({"manager_ids": [(5, 0, 0)]})
|
|
|
|
# Set jet to running state (which has action to stopped)
|
|
self.jet_test.write({"state_id": self.state_running.id})
|
|
self.jet_test.invalidate_recordset(["state_id"])
|
|
|
|
# Manager (treated as user) should not be able to bring jet
|
|
# to manager-level state
|
|
with self.assertRaises(AccessError) as context:
|
|
self.jet_test.with_user(self.manager).with_context(
|
|
cetmix_tower_no_commit=True
|
|
).bring_to_state("test_stopped")
|
|
|
|
self.assertIn(
|
|
"You are not allowed to set the",
|
|
str(context.exception),
|
|
"Should raise AccessError with appropriate message",
|
|
)
|
|
self.assertIn(
|
|
self.state_stopped.name,
|
|
str(context.exception),
|
|
"Error message should include state name",
|
|
)
|
|
|
|
def test_bring_to_state_root_can_access_manager_level(self):
|
|
"""
|
|
Test bring_to_state succeeds when root (level 3)
|
|
accesses manager-level state (level 2).
|
|
Higher access levels can access lower level states.
|
|
"""
|
|
# Use existing state and set it to Manager access level (2)
|
|
self.state_stopped.access_level = "2"
|
|
self.state_stopped.invalidate_recordset(["access_level"])
|
|
|
|
# Root has full access, but ensure access for consistency
|
|
self.jet_test.write({"manager_ids": [(4, self.root.id)]})
|
|
self.server_test_1.write({"manager_ids": [(4, self.root.id)]})
|
|
|
|
# Set jet to running state (which has action to stopped)
|
|
self.jet_test.write({"state_id": self.state_running.id})
|
|
self.jet_test.invalidate_recordset(["state_id"])
|
|
|
|
# Root should be able to bring jet to manager-level state
|
|
self.jet_test.with_user(self.root).with_context(
|
|
cetmix_tower_no_commit=True
|
|
).bring_to_state("test_stopped")
|
|
self.assertEqual(
|
|
self.jet_test.state_id,
|
|
self.state_stopped,
|
|
"Root should be able to access manager-level state",
|
|
)
|
|
|
|
def test_bring_to_state_invalid_reference(self):
|
|
"""
|
|
Test bring_to_state raises ValidationError when state reference is invalid.
|
|
"""
|
|
# Set jet to initial state
|
|
self.jet_test.state_id = self.state_initial
|
|
|
|
# Should raise ValidationError for invalid state reference
|
|
with self.assertRaises(ValidationError) as context:
|
|
self.jet_test.with_context(cetmix_tower_no_commit=True).bring_to_state(
|
|
"invalid_state_reference"
|
|
)
|
|
|
|
self.assertIn(
|
|
"State 'invalid_state_reference' not found",
|
|
str(context.exception),
|
|
"Should raise ValidationError with appropriate message",
|
|
)
|
|
self.assertIn(
|
|
self.jet_test.display_name,
|
|
str(context.exception),
|
|
"Error message should include jet display name",
|
|
)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# _get_user_effective_access_level Tests
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
|
|
def test_get_user_effective_access_level_user(self):
|
|
"""
|
|
Test _get_user_effective_access_level returns "1" for user.
|
|
"""
|
|
# Ensure user has access to the jet
|
|
self.jet_test.write({"user_ids": [(4, self.user.id)]})
|
|
|
|
# User should have effective access level "1"
|
|
effective_level = self.jet_test.with_user(
|
|
self.user
|
|
)._get_user_effective_access_level()
|
|
self.assertEqual(
|
|
effective_level,
|
|
"1",
|
|
"User should have effective access level 1",
|
|
)
|
|
|
|
def test_get_user_effective_access_level_manager_in_manager_ids(self):
|
|
"""
|
|
Test _get_user_effective_access_level returns "2" for manager
|
|
who IS in manager_ids.
|
|
"""
|
|
# Ensure manager has access to the jet and IS in manager_ids
|
|
self.jet_test.write({"manager_ids": [(4, self.manager.id)]})
|
|
|
|
# Manager in manager_ids should have effective access level "2"
|
|
effective_level = self.jet_test.with_user(
|
|
self.manager
|
|
)._get_user_effective_access_level()
|
|
self.assertEqual(
|
|
effective_level,
|
|
"2",
|
|
"Manager in manager_ids should have effective access level 2",
|
|
)
|
|
|
|
def test_get_user_effective_access_level_manager_not_in_manager_ids(self):
|
|
"""
|
|
Test _get_user_effective_access_level returns "1" for manager
|
|
who is NOT in manager_ids (downgraded to user level).
|
|
"""
|
|
# Ensure manager has access to the jet via user_ids but NOT via manager_ids
|
|
self.jet_test.write({"user_ids": [(4, self.manager.id)]})
|
|
# Explicitly ensure manager is NOT in manager_ids
|
|
self.jet_test.write({"manager_ids": [(5, 0, 0)]})
|
|
|
|
# Manager not in manager_ids should have effective access level "1"
|
|
effective_level = self.jet_test.with_user(
|
|
self.manager
|
|
)._get_user_effective_access_level()
|
|
self.assertEqual(
|
|
effective_level,
|
|
"1",
|
|
"Manager not in manager_ids should have effective access level 1",
|
|
)
|
|
|
|
def test_get_user_effective_access_level_root(self):
|
|
"""
|
|
Test _get_user_effective_access_level returns "3" for root.
|
|
"""
|
|
# Root should have effective access level "3" regardless of manager_ids
|
|
effective_level = self.jet_test.with_user(
|
|
self.root
|
|
)._get_user_effective_access_level()
|
|
self.assertEqual(
|
|
effective_level,
|
|
"3",
|
|
"Root should have effective access level 3",
|
|
)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# unlink Tests
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
|
|
def test_unlink_deletable_jet_with_files(self):
|
|
"""
|
|
Test unlink succeeds when jet is deletable and has files.
|
|
Files should be unlinked after the jet is deleted.
|
|
"""
|
|
# Create a deletable jet (deletable defaults to True)
|
|
jet = self._create_jet(
|
|
"Deletable Jet",
|
|
"deletable_jet",
|
|
)
|
|
|
|
# Create files linked to the jet
|
|
file1 = self.File.create(
|
|
{
|
|
"name": "test_file_1.txt",
|
|
"source": "tower",
|
|
"server_id": self.server_test_1.id,
|
|
"server_dir": "/tmp",
|
|
"jet_id": jet.id,
|
|
"file_type": "text",
|
|
}
|
|
)
|
|
file2 = self.File.create(
|
|
{
|
|
"name": "test_file_2.txt",
|
|
"source": "tower",
|
|
"server_id": self.server_test_1.id,
|
|
"server_dir": "/tmp",
|
|
"jet_id": jet.id,
|
|
"file_type": "text",
|
|
}
|
|
)
|
|
|
|
# Verify files exist
|
|
self.assertEqual(len(jet.file_ids), 2, "Jet should have 2 files")
|
|
self.assertIn(file1, jet.file_ids, "File 1 should be linked to jet")
|
|
self.assertIn(file2, jet.file_ids, "File 2 should be linked to jet")
|
|
|
|
# Store file IDs before deletion
|
|
file_ids = {file1.id, file2.id}
|
|
|
|
# Unlink the jet
|
|
jet.unlink()
|
|
|
|
# Verify jet is deleted
|
|
self.assertFalse(jet.exists(), "Jet should be deleted")
|
|
|
|
# Verify files are also deleted
|
|
remaining_files = self.File.browse(list(file_ids))
|
|
self.assertFalse(
|
|
remaining_files.exists(),
|
|
"Files should be unlinked after jet deletion",
|
|
)
|
|
|
|
def test_unlink_deletable_jet_without_files(self):
|
|
"""
|
|
Test unlink succeeds when jet is deletable but has no files.
|
|
"""
|
|
# Create a deletable jet without files (deletable defaults to True)
|
|
jet = self._create_jet(
|
|
"Deletable Jet No Files",
|
|
"deletable_jet_no_files",
|
|
)
|
|
|
|
# Verify jet has no files
|
|
self.assertEqual(len(jet.file_ids), 0, "Jet should have no files")
|
|
|
|
# Unlink the jet
|
|
jet.unlink()
|
|
|
|
# Verify jet is deleted
|
|
self.assertFalse(jet.exists(), "Jet should be deleted")
|
|
|
|
def test_unlink_not_deletable_jet_raises_error(self):
|
|
"""
|
|
Test unlink raises ValidationError when jet is not deletable.
|
|
"""
|
|
# Create a non-deletable jet
|
|
jet = self._create_jet(
|
|
"Not Deletable Jet",
|
|
"not_deletable_jet",
|
|
)
|
|
jet.write({"deletable": False})
|
|
|
|
# Attempt to unlink should raise ValidationError
|
|
with self.assertRaises(ValidationError) as context:
|
|
jet.unlink()
|
|
|
|
# Verify error message contains jet display name
|
|
self.assertIn(
|
|
"cannot be deleted",
|
|
str(context.exception),
|
|
"Error message should mention deletion restriction",
|
|
)
|
|
self.assertIn(
|
|
jet.display_name,
|
|
str(context.exception),
|
|
"Error message should include jet display name",
|
|
)
|
|
|
|
# Verify jet still exists
|
|
self.assertTrue(jet.exists(), "Jet should not be deleted")
|
|
|
|
def test_unlink_multiple_jets_mixed_deletable(self):
|
|
"""
|
|
Test unlink with multiple jets where some are deletable and some are not.
|
|
Should raise ValidationError listing non-deletable jets.
|
|
"""
|
|
# Create deletable jet (deletable defaults to True)
|
|
deletable_jet = self._create_jet(
|
|
"Deletable Jet",
|
|
"deletable_jet_multi",
|
|
)
|
|
|
|
# Create non-deletable jet
|
|
not_deletable_jet = self._create_jet(
|
|
"Not Deletable Jet",
|
|
"not_deletable_jet_multi",
|
|
)
|
|
not_deletable_jet.write({"deletable": False})
|
|
|
|
# Attempt to unlink both should raise ValidationError
|
|
jets = deletable_jet | not_deletable_jet
|
|
with self.assertRaises(ValidationError) as context:
|
|
jets.unlink()
|
|
|
|
# Verify error message contains non-deletable jet display name
|
|
self.assertIn(
|
|
"cannot be deleted",
|
|
str(context.exception),
|
|
"Error message should mention deletion restriction",
|
|
)
|
|
self.assertIn(
|
|
not_deletable_jet.display_name,
|
|
str(context.exception),
|
|
"Error message should include non-deletable jet display name",
|
|
)
|
|
|
|
# Verify both jets still exist
|
|
self.assertTrue(deletable_jet.exists(), "Deletable jet should not be deleted")
|
|
self.assertTrue(
|
|
not_deletable_jet.exists(), "Non-deletable jet should not be deleted"
|
|
)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# create_waypoint Tests
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
|
|
def test_create_waypoint_with_record_template(self):
|
|
"""
|
|
Test create_waypoint with waypoint template record
|
|
"""
|
|
# Get the default name from the helper function
|
|
default_vals = self.jet_test._prepare_waypoint_values(
|
|
self.waypoint_template, name=None
|
|
)
|
|
expected_default_name = default_vals["name"]
|
|
|
|
# Create waypoint using template record
|
|
waypoint = self.jet_test.create_waypoint(self.waypoint_template)
|
|
|
|
# Should return a waypoint record
|
|
self.assertTrue(waypoint, "Should return a waypoint record")
|
|
self.assertTrue(waypoint.exists(), "Waypoint should exist")
|
|
self.assertEqual(
|
|
waypoint.jet_id.id,
|
|
self.jet_test.id,
|
|
"Waypoint should belong to the jet",
|
|
)
|
|
self.assertEqual(
|
|
waypoint.waypoint_template_id.id,
|
|
self.waypoint_template.id,
|
|
"Waypoint should use the correct template",
|
|
)
|
|
self.assertEqual(
|
|
waypoint.name,
|
|
expected_default_name,
|
|
"Waypoint should have default name from helper function",
|
|
)
|
|
# Reference is auto-generated, so just verify it exists and is not empty
|
|
self.assertTrue(
|
|
waypoint.reference,
|
|
"Waypoint should have an auto-generated reference",
|
|
)
|
|
|
|
def test_create_waypoint_with_string_reference(self):
|
|
"""
|
|
Test create_waypoint with waypoint template string reference
|
|
"""
|
|
# Use the template's reference (mandatory field, always present)
|
|
template_reference = self.waypoint_template.reference
|
|
|
|
# Create waypoint using string reference
|
|
waypoint = self.jet_test.create_waypoint(template_reference)
|
|
|
|
# Should return a waypoint record
|
|
self.assertTrue(waypoint, "Should return a waypoint record")
|
|
self.assertTrue(waypoint.exists(), "Waypoint should exist")
|
|
self.assertEqual(
|
|
waypoint.waypoint_template_id.id,
|
|
self.waypoint_template.id,
|
|
"Waypoint should use the correct template from reference",
|
|
)
|
|
# Reference is auto-generated, so just verify it exists and is not empty
|
|
self.assertTrue(
|
|
waypoint.reference,
|
|
"Waypoint should have an auto-generated reference",
|
|
)
|
|
|
|
def test_create_waypoint_with_name(self):
|
|
"""
|
|
Test create_waypoint with custom name
|
|
"""
|
|
# Create waypoint with custom name
|
|
waypoint = self.jet_test.create_waypoint(
|
|
self.waypoint_template, name="Custom Waypoint Name"
|
|
)
|
|
|
|
# Should return a waypoint record with custom name
|
|
self.assertTrue(waypoint, "Should return a waypoint record")
|
|
self.assertEqual(
|
|
waypoint.name,
|
|
"Custom Waypoint Name",
|
|
"Waypoint should have the custom name",
|
|
)
|
|
# Reference is auto-generated, so just verify it exists and is not empty
|
|
self.assertTrue(
|
|
waypoint.reference,
|
|
"Waypoint should have an auto-generated reference",
|
|
)
|
|
|
|
def test_create_waypoint_with_fly_here(self):
|
|
"""
|
|
Test create_waypoint with fly_here parameter
|
|
Note: fly_here should set is_destination=True, and after prepare()
|
|
the waypoint should automatically fly to if is_destination is True
|
|
"""
|
|
# Create waypoint with fly_here=True
|
|
waypoint = self.jet_test.create_waypoint(self.waypoint_template, fly_here=True)
|
|
|
|
# Should return a waypoint record
|
|
self.assertTrue(waypoint, "Should return a waypoint record")
|
|
self.assertTrue(waypoint.exists(), "Waypoint should exist")
|
|
|
|
# Verify that the waypoint flew to
|
|
# (state should be "current" in synchronous tests)
|
|
self.assertEqual(
|
|
waypoint.state,
|
|
"current",
|
|
"Waypoint should have flown to and "
|
|
"become current (tests run synchronously)",
|
|
)
|
|
|
|
# Verify jet's waypoint_id was updated
|
|
self.assertEqual(
|
|
self.jet_test.waypoint_id.id,
|
|
waypoint.id,
|
|
"Jet's waypoint_id should be updated to the flown-to waypoint",
|
|
)
|
|
|
|
@mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_jet")
|
|
def test_create_waypoint_jet_busy(self):
|
|
"""
|
|
Test create_waypoint when jet is busy (has target_state_id)
|
|
"""
|
|
# Set jet to busy state (has target_state_id)
|
|
self.jet_test.target_state_id = self.state_running
|
|
|
|
# Try to create waypoint
|
|
with self.assertRaises(ValidationError):
|
|
self.jet_test.create_waypoint(self.waypoint_template)
|
|
|
|
@mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_jet")
|
|
def test_create_waypoint_template_not_found(self):
|
|
"""
|
|
Test create_waypoint with non-existent template reference
|
|
"""
|
|
# Mute logger error for this test
|
|
with self.assertRaises(ValidationError):
|
|
self.jet_test.create_waypoint("non_existent_reference")
|
|
|
|
@mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_jet")
|
|
def test_create_waypoint_template_wrong_jet_template(self):
|
|
"""
|
|
Test create_waypoint with template from different jet template
|
|
"""
|
|
# Create a waypoint template for a different jet template
|
|
other_jet_template = self.JetTemplate.create(
|
|
{
|
|
"name": "Other Jet Template",
|
|
"reference": "other_jet_template",
|
|
}
|
|
)
|
|
other_waypoint_template = self.JetWaypointTemplate.create(
|
|
{
|
|
"name": "Other Waypoint Template",
|
|
"jet_template_id": other_jet_template.id,
|
|
}
|
|
)
|
|
|
|
# Mute logger error for this test
|
|
with self.assertRaises(ValidationError):
|
|
# Try to create waypoint with template from different jet template
|
|
self.jet_test.create_waypoint(other_waypoint_template)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Create a Waypoint command (flight plan) tests
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
|
|
def test_create_waypoint_command_success_fly_here_false(self):
|
|
"""Create a Waypoint command from flight plan: waypoint created, log
|
|
finished by callback."""
|
|
command = self.Command.create(
|
|
{
|
|
"name": "Create waypoint command",
|
|
"action": "create_waypoint",
|
|
"waypoint_template_id": self.waypoint_template.id,
|
|
"fly_here": False,
|
|
}
|
|
)
|
|
plan = self.Plan.create({"name": "Plan create waypoint"})
|
|
self.plan_line.create(
|
|
{
|
|
"plan_id": plan.id,
|
|
"sequence": 10,
|
|
"command_id": command.id,
|
|
}
|
|
)
|
|
initial_waypoint_count = len(self.jet_test.waypoint_ids)
|
|
plan_log = self.server_test_1.sudo().run_flight_plan(plan, jet=self.jet_test)
|
|
self.assertTrue(plan_log, "Plan log should be created")
|
|
command_logs = plan_log.command_log_ids.filtered(
|
|
lambda log: log.command_id == command
|
|
)
|
|
self.assertEqual(
|
|
len(command_logs), 1, "Exactly one command log for create_waypoint"
|
|
)
|
|
log_record = command_logs[0]
|
|
self.assertTrue(
|
|
log_record.finish_date,
|
|
"Command log should be finished by waypoint callback",
|
|
)
|
|
self.assertEqual(
|
|
log_record.command_status,
|
|
0,
|
|
"Command should finish with success",
|
|
)
|
|
self.assertEqual(
|
|
len(self.jet_test.waypoint_ids),
|
|
initial_waypoint_count + 1,
|
|
"One new waypoint should be created",
|
|
)
|
|
new_waypoint = self.jet_test.waypoint_ids.filtered(
|
|
lambda w: w.created_from_command_log_id == log_record
|
|
)
|
|
self.assertEqual(len(new_waypoint), 1, "One waypoint linked to command log")
|
|
new_waypoint = new_waypoint[0]
|
|
self.assertEqual(
|
|
new_waypoint.state,
|
|
"ready",
|
|
"Waypoint should be in ready state (fly_here=False)",
|
|
)
|
|
self.assertEqual(
|
|
new_waypoint.created_from_command_log_id,
|
|
log_record,
|
|
"Waypoint should reference the command log",
|
|
)
|
|
|
|
def test_create_waypoint_command_success_fly_here_true(self):
|
|
"""Create a Waypoint command with fly_here: waypoint becomes current."""
|
|
command = self.Command.create(
|
|
{
|
|
"name": "Create waypoint fly here",
|
|
"action": "create_waypoint",
|
|
"waypoint_template_id": self.waypoint_template.id,
|
|
"fly_here": True,
|
|
}
|
|
)
|
|
plan = self.Plan.create({"name": "Plan create waypoint fly here"})
|
|
self.plan_line.create(
|
|
{
|
|
"plan_id": plan.id,
|
|
"sequence": 10,
|
|
"command_id": command.id,
|
|
}
|
|
)
|
|
plan_log = self.server_test_1.sudo().run_flight_plan(plan, jet=self.jet_test)
|
|
command_logs = plan_log.command_log_ids.filtered(
|
|
lambda log: log.command_id == command
|
|
)
|
|
log_record = command_logs[0]
|
|
self.assertTrue(log_record.finish_date, "Command log should be finished")
|
|
self.assertEqual(log_record.command_status, 0, "Command should succeed")
|
|
waypoints_with_log = self.jet_test.waypoint_ids.filtered(
|
|
lambda w: w.created_from_command_log_id == log_record
|
|
)
|
|
self.assertEqual(
|
|
len(waypoints_with_log),
|
|
1,
|
|
"One waypoint created from command",
|
|
)
|
|
self.assertEqual(
|
|
waypoints_with_log.state,
|
|
"current",
|
|
"Waypoint should be current when fly_here=True",
|
|
)
|
|
self.assertEqual(
|
|
self.jet_test.waypoint_id,
|
|
waypoints_with_log,
|
|
"Jet waypoint_id should point to the new waypoint",
|
|
)
|
|
|
|
def test_create_waypoint_command_no_jet(self):
|
|
"""Create a Waypoint command run without jet: command log finished
|
|
with JET_NOT_FOUND."""
|
|
from ..models.constants import JET_NOT_FOUND
|
|
|
|
command = self.Command.create(
|
|
{
|
|
"name": "Create waypoint no jet",
|
|
"action": "create_waypoint",
|
|
"waypoint_template_id": self.waypoint_template.id,
|
|
"fly_here": False,
|
|
}
|
|
)
|
|
plan = self.Plan.create({"name": "Plan no jet"})
|
|
self.plan_line.create(
|
|
{"plan_id": plan.id, "sequence": 10, "command_id": command.id}
|
|
)
|
|
plan_log = self.server_test_1.sudo().run_flight_plan(plan)
|
|
command_logs = plan_log.command_log_ids.filtered(
|
|
lambda log: log.command_id == command
|
|
)
|
|
self.assertEqual(len(command_logs), 1)
|
|
self.assertEqual(
|
|
command_logs.command_status,
|
|
JET_NOT_FOUND,
|
|
"Should finish with JET_NOT_FOUND when no jet in plan",
|
|
)
|
|
self.assertTrue(command_logs.finish_date)
|
|
|
|
def test_create_waypoint_command_no_template(self):
|
|
"""Create a Waypoint command without waypoint template:
|
|
WAYPOINT_TEMPLATE_NOT_FOUND."""
|
|
from ..models.constants import WAYPOINT_TEMPLATE_NOT_FOUND
|
|
|
|
command = self.Command.create(
|
|
{
|
|
"name": "Create waypoint no template",
|
|
"action": "create_waypoint",
|
|
"fly_here": False,
|
|
}
|
|
)
|
|
plan = self.Plan.create({"name": "Plan no template"})
|
|
self.plan_line.create(
|
|
{"plan_id": plan.id, "sequence": 10, "command_id": command.id}
|
|
)
|
|
plan_log = self.server_test_1.sudo().run_flight_plan(plan, jet=self.jet_test)
|
|
command_logs = plan_log.command_log_ids.filtered(
|
|
lambda log: log.command_id == command
|
|
)
|
|
self.assertEqual(len(command_logs), 1)
|
|
self.assertEqual(
|
|
command_logs.command_status,
|
|
WAYPOINT_TEMPLATE_NOT_FOUND,
|
|
"Should finish with WAYPOINT_TEMPLATE_NOT_FOUND",
|
|
)
|
|
|
|
def test_create_waypoint_command_jet_busy(self):
|
|
"""
|
|
Create a Waypoint when jet is busy (e.g. from flight plan):
|
|
ignore_busy=True, waypoint created, log success.
|
|
"""
|
|
self.jet_test.target_state_id = self.state_running
|
|
command = self.Command.create(
|
|
{
|
|
"name": "Create waypoint jet busy",
|
|
"action": "create_waypoint",
|
|
"waypoint_template_id": self.waypoint_template.id,
|
|
"fly_here": False,
|
|
}
|
|
)
|
|
plan = self.Plan.create({"name": "Plan jet busy"})
|
|
self.plan_line.create(
|
|
{"plan_id": plan.id, "sequence": 10, "command_id": command.id}
|
|
)
|
|
initial_waypoint_count = len(self.jet_test.waypoint_ids)
|
|
with mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_jet"):
|
|
plan_log = self.server_test_1.sudo().run_flight_plan(
|
|
plan, jet=self.jet_test
|
|
)
|
|
command_logs = plan_log.command_log_ids.filtered(
|
|
lambda log: log.command_id == command
|
|
)
|
|
self.assertEqual(len(command_logs), 1)
|
|
self.assertTrue(
|
|
command_logs.finish_date,
|
|
"Command log should be finished by waypoint callback when jet busy",
|
|
)
|
|
self.assertEqual(
|
|
command_logs.command_status,
|
|
0,
|
|
"Create waypoint command should succeed when jet is busy "
|
|
"(ignore_busy=True)",
|
|
)
|
|
self.assertEqual(
|
|
len(self.jet_test.waypoint_ids),
|
|
initial_waypoint_count + 1,
|
|
"One new waypoint should be created despite jet busy",
|
|
)
|
|
self.jet_test.target_state_id = False
|
|
|
|
def test_create_waypoint_command_wrong_jet_template(self):
|
|
"""Create a Waypoint with template for another jet template: False
|
|
and WAYPOINT_CREATE_FAILED."""
|
|
from ..models.constants import WAYPOINT_CREATE_FAILED
|
|
|
|
other_jet_template = self.JetTemplate.create(
|
|
{
|
|
"name": "Other Jet Template",
|
|
"reference": "other_jet_template_cmd",
|
|
}
|
|
)
|
|
other_waypoint_template = self.JetWaypointTemplate.create(
|
|
{
|
|
"name": "Other Waypoint Template",
|
|
"jet_template_id": other_jet_template.id,
|
|
}
|
|
)
|
|
command = self.Command.create(
|
|
{
|
|
"name": "Create waypoint wrong template",
|
|
"action": "create_waypoint",
|
|
"waypoint_template_id": other_waypoint_template.id,
|
|
"fly_here": False,
|
|
}
|
|
)
|
|
plan = self.Plan.create({"name": "Plan wrong template"})
|
|
self.plan_line.create(
|
|
{"plan_id": plan.id, "sequence": 10, "command_id": command.id}
|
|
)
|
|
with mute_logger("odoo.addons.cetmix_tower_server.models.cx_tower_jet"):
|
|
plan_log = self.server_test_1.sudo().run_flight_plan(
|
|
plan, jet=self.jet_test
|
|
)
|
|
command_logs = plan_log.command_log_ids.filtered(
|
|
lambda log: log.command_id == command
|
|
)
|
|
self.assertEqual(len(command_logs), 1)
|
|
self.assertEqual(
|
|
command_logs.command_status,
|
|
WAYPOINT_CREATE_FAILED,
|
|
"Should finish with WAYPOINT_CREATE_FAILED when template is "
|
|
"for another jet template",
|
|
)
|
|
self.assertTrue(command_logs.finish_date)
|
|
|
|
def test_create_waypoint_command_waypoint_reaches_error(self):
|
|
"""Create plan fails: waypoint goes to error, callback finishes
|
|
command log with error."""
|
|
from ..models.constants import WAYPOINT_CREATE_FAILED
|
|
|
|
fail_command = self.Command.create(
|
|
{
|
|
"name": "Fail command",
|
|
"action": "python_code",
|
|
"code": "result = {'exit_code': 1, 'message': 'fail'}",
|
|
}
|
|
)
|
|
fail_plan = self.Plan.create({"name": "Plan that fails"})
|
|
self.plan_line.create(
|
|
{
|
|
"plan_id": fail_plan.id,
|
|
"sequence": 10,
|
|
"command_id": fail_command.id,
|
|
}
|
|
)
|
|
waypoint_template_with_failing_plan = self.JetWaypointTemplate.create(
|
|
{
|
|
"name": "Waypoint template with failing create plan",
|
|
"jet_template_id": self.jet_template_test.id,
|
|
"plan_create_id": fail_plan.id,
|
|
}
|
|
)
|
|
command = self.Command.create(
|
|
{
|
|
"name": "Create waypoint with failing plan",
|
|
"action": "create_waypoint",
|
|
"waypoint_template_id": waypoint_template_with_failing_plan.id,
|
|
"fly_here": False,
|
|
}
|
|
)
|
|
plan = self.Plan.create({"name": "Plan create waypoint error"})
|
|
self.plan_line.create(
|
|
{"plan_id": plan.id, "sequence": 10, "command_id": command.id}
|
|
)
|
|
plan_log = self.server_test_1.sudo().run_flight_plan(plan, jet=self.jet_test)
|
|
command_logs = plan_log.command_log_ids.filtered(
|
|
lambda log: log.command_id == command
|
|
)
|
|
self.assertEqual(len(command_logs), 1)
|
|
log_record = command_logs[0]
|
|
self.assertTrue(
|
|
log_record.finish_date,
|
|
"Command log should be finished by waypoint callback when "
|
|
"waypoint reaches error",
|
|
)
|
|
self.assertNotEqual(
|
|
log_record.command_status,
|
|
0,
|
|
"Command should finish with error status",
|
|
)
|
|
self.assertEqual(
|
|
log_record.command_status,
|
|
WAYPOINT_CREATE_FAILED,
|
|
"Callback should use WAYPOINT_CREATE_FAILED when plan fails",
|
|
)
|
|
waypoints_with_log = self.jet_test.waypoint_ids.filtered(
|
|
lambda w: w.created_from_command_log_id == log_record
|
|
)
|
|
self.assertEqual(len(waypoints_with_log), 1)
|
|
self.assertEqual(
|
|
waypoints_with_log.state,
|
|
"error",
|
|
"Waypoint should be in error state after create plan fails",
|
|
)
|
|
|
|
def test_finalize_create_waypoint_command_log_double_finish_guard(self):
|
|
"""Calling _finalize_create_waypoint_command_log twice does not
|
|
double-finish."""
|
|
waypoint = self.jet_test.create_waypoint(
|
|
self.waypoint_template,
|
|
created_from_command_log=None,
|
|
)
|
|
log_record = self.CommandLog.create(
|
|
{
|
|
"server_id": self.server_test_1.id,
|
|
"command_id": self.Command.create(
|
|
{
|
|
"name": "Dummy create waypoint",
|
|
"action": "create_waypoint",
|
|
"waypoint_template_id": self.waypoint_template.id,
|
|
}
|
|
).id,
|
|
"start_date": fields.Datetime.now(),
|
|
}
|
|
)
|
|
waypoint.created_from_command_log_id = log_record
|
|
self.assertFalse(log_record.finish_date, "Log should not be finished yet")
|
|
waypoint._finalize_create_waypoint_command_log(success=True)
|
|
self.assertTrue(log_record.finish_date, "Log should be finished once")
|
|
finish_date_first = log_record.finish_date
|
|
waypoint._finalize_create_waypoint_command_log(success=True)
|
|
self.assertEqual(
|
|
log_record.finish_date,
|
|
finish_date_first,
|
|
"Second call should not change finish_date (guard)",
|
|
)
|