1704 lines
61 KiB
Python
1704 lines
61 KiB
Python
# Copyright (C) 2024 Cetmix OÜ
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
|
import ast
|
|
import logging
|
|
|
|
from odoo import _, api, fields, models
|
|
from odoo.exceptions import AccessError, ValidationError
|
|
|
|
from .constants import (
|
|
JET_ACTION_NOT_AVAILABLE,
|
|
JET_DEPENDENCIES_NOT_SATISFIED,
|
|
JET_STATE_ERROR,
|
|
)
|
|
from .tools import generate_random_id
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CxTowerJet(models.Model):
|
|
"""Jets represent application instances that can be managed independently"""
|
|
|
|
_name = "cx.tower.jet"
|
|
_description = "Cetmix Tower Jet"
|
|
_inherit = [
|
|
"cx.tower.reference.mixin",
|
|
"cx.tower.variable.mixin",
|
|
"cx.tower.metadata.mixin",
|
|
"mail.thread",
|
|
"mail.activity.mixin",
|
|
"cx.tower.tag.mixin",
|
|
"cx.tower.access.role.mixin",
|
|
]
|
|
_order = "sequence, name"
|
|
_mail_post_access = "read"
|
|
|
|
active = fields.Boolean(default=True)
|
|
deletable = fields.Boolean(
|
|
readonly=True,
|
|
default=True,
|
|
help="This field is set by the jet actions. "
|
|
"If enabled, the jet can be deleted",
|
|
)
|
|
url = fields.Char(string="URL", help="Jet URL, eg 'https://meme.example.com'")
|
|
color = fields.Integer(related="state_id.color", readonly=True)
|
|
icon = fields.Image(
|
|
string="Icon image",
|
|
related="jet_template_id.icon",
|
|
readonly=True,
|
|
store=False,
|
|
help="Jet icon, computed from the template by default",
|
|
)
|
|
sequence = fields.Integer(default=10, help="Used to sort jets in views")
|
|
partner_id = fields.Many2one(
|
|
comodel_name="res.partner",
|
|
help="Partner associated with this jet",
|
|
)
|
|
note = fields.Text()
|
|
|
|
jet_cloned_from_id = fields.Many2one(
|
|
comodel_name="cx.tower.jet",
|
|
string="Cloned from",
|
|
readonly=True,
|
|
copy=False,
|
|
help="Jet this jet was cloned from. "
|
|
"This field is set when the jet is cloned from another jet.",
|
|
)
|
|
|
|
jet_template_id = fields.Many2one(
|
|
comodel_name="cx.tower.jet.template",
|
|
required=True,
|
|
ondelete="restrict",
|
|
help="Template that this jet is based on",
|
|
)
|
|
jet_template_domain = fields.Binary(
|
|
compute="_compute_jet_template_domain",
|
|
)
|
|
server_id = fields.Many2one(
|
|
comodel_name="cx.tower.server",
|
|
required=True,
|
|
ondelete="restrict",
|
|
help="Server where this jet is running",
|
|
)
|
|
server_allowed_domain = fields.Binary(
|
|
compute="_compute_server_allowed_domain",
|
|
)
|
|
file_ids = fields.One2many(
|
|
comodel_name="cx.tower.file",
|
|
inverse_name="jet_id",
|
|
string="Files",
|
|
help="Files of this jet",
|
|
)
|
|
server_log_ids = fields.One2many(
|
|
comodel_name="cx.tower.server.log",
|
|
inverse_name="jet_id",
|
|
copy=False,
|
|
)
|
|
scheduled_task_ids = fields.Many2many(
|
|
comodel_name="cx.tower.scheduled.task",
|
|
relation="cx_tower_scheduled_task_jet_rel",
|
|
column1="jet_id",
|
|
column2="scheduled_task_id",
|
|
string="Scheduled Tasks",
|
|
)
|
|
|
|
# -- Jet Requests
|
|
served_jet_request_id = fields.Many2one(
|
|
comodel_name="cx.tower.jet.request",
|
|
help="Request this jet is currently serving",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
|
|
# -- Dependencies
|
|
jet_requires_ids = fields.One2many(
|
|
comodel_name="cx.tower.jet.dependency",
|
|
inverse_name="jet_id",
|
|
string="Requires",
|
|
help="Other jets this jet depends on",
|
|
compute="_compute_jet_requires_ids",
|
|
store=True,
|
|
groups="cetmix_tower_server.group_manager",
|
|
copy=False,
|
|
)
|
|
jet_required_by_ids = fields.One2many(
|
|
comodel_name="cx.tower.jet.dependency",
|
|
inverse_name="jet_depends_on_id",
|
|
string="Required By",
|
|
help="Jets that depend on this jet",
|
|
groups="cetmix_tower_server.group_manager",
|
|
copy=False,
|
|
readonly=True,
|
|
)
|
|
|
|
# -- States and actions
|
|
state_id = fields.Many2one(
|
|
comodel_name="cx.tower.jet.state",
|
|
string="Current State",
|
|
tracking=True,
|
|
domain="[('id', 'in', jet_template_state_ids)]",
|
|
copy=False,
|
|
)
|
|
state = fields.Char(
|
|
related="state_id.reference",
|
|
readonly=True,
|
|
store=True,
|
|
index=True,
|
|
string="State Reference",
|
|
help="Current state of the jet. "
|
|
"NB: this is "
|
|
"the reference of the state, not the name.",
|
|
)
|
|
jet_template_state_ids = fields.One2many(
|
|
comodel_name="cx.tower.jet.state",
|
|
compute="_compute_state_available_ids",
|
|
)
|
|
state_available_ids = fields.One2many(
|
|
comodel_name="cx.tower.jet.state",
|
|
compute="_compute_state_available_ids",
|
|
help="Available states for the jet. "
|
|
"Click on the button to transition to the state.",
|
|
copy=False,
|
|
)
|
|
|
|
target_state_id = fields.Many2one(
|
|
comodel_name="cx.tower.jet.state",
|
|
string="Target State",
|
|
readonly=True,
|
|
copy=False,
|
|
help="Destination state to which the jet is currently transitioning",
|
|
)
|
|
show_available_states = fields.Boolean(
|
|
help="Show available states in the jet view",
|
|
compute="_compute_show_available_states",
|
|
inverse="_inverse_show_available_states",
|
|
groups="cetmix_tower_server.group_manager",
|
|
)
|
|
action_available_ids = fields.Many2many(
|
|
comodel_name="cx.tower.jet.action",
|
|
compute="_compute_available_actions",
|
|
string="Available Actions",
|
|
help="Available actions for the jet. "
|
|
"Click on the button to trigger the action.",
|
|
)
|
|
current_action_id = fields.Many2one(
|
|
comodel_name="cx.tower.jet.action",
|
|
string="Executing Action",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
current_command_log_id = fields.Many2one(
|
|
comodel_name="cx.tower.command.log",
|
|
string="Executing Command Log",
|
|
groups="cetmix_tower_server.group_manager",
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
|
|
# -- Waypoints
|
|
is_waypoints_available = fields.Boolean(
|
|
compute="_compute_is_waypoints_available",
|
|
readonly=True,
|
|
)
|
|
waypoint_ids = fields.One2many(
|
|
comodel_name="cx.tower.jet.waypoint",
|
|
inverse_name="jet_id",
|
|
string="Waypoints",
|
|
help="Waypoints of the jet",
|
|
copy=False,
|
|
)
|
|
waypoint_id = fields.Many2one(
|
|
comodel_name="cx.tower.jet.waypoint",
|
|
help="Current waypoint of the jet",
|
|
readonly=True,
|
|
copy=False,
|
|
tracking=True,
|
|
)
|
|
|
|
# -- Variables used for configuration
|
|
variable_value_ids = fields.One2many(
|
|
inverse_name="jet_id",
|
|
)
|
|
|
|
# -- Logs
|
|
command_log_ids = fields.One2many(
|
|
comodel_name="cx.tower.command.log",
|
|
inverse_name="jet_id",
|
|
copy=False,
|
|
)
|
|
plan_log_ids = fields.One2many(
|
|
comodel_name="cx.tower.plan.log",
|
|
inverse_name="jet_id",
|
|
copy=False,
|
|
)
|
|
|
|
# -- Access. Add relation for mixin fields
|
|
user_ids = fields.Many2many(
|
|
relation="cx_tower_jet_user_rel",
|
|
)
|
|
manager_ids = fields.Many2many(
|
|
relation="cx_tower_jet_manager_rel",
|
|
)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Compute methods
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
@api.depends("name", "state_id")
|
|
def _compute_display_name(self):
|
|
"""Compute the display name of the jet"""
|
|
for jet in self:
|
|
jet.display_name = f"{jet.name} ({jet.state})" if jet.state else jet.name
|
|
|
|
@api.depends("server_id")
|
|
def _compute_jet_template_domain(self):
|
|
"""Compute the domain of the jet template"""
|
|
for jet in self:
|
|
jet.jet_template_domain = (
|
|
[("server_ids", "in", [jet.server_id.id])] if jet.server_id else []
|
|
)
|
|
|
|
@api.depends("jet_template_id")
|
|
def _compute_server_allowed_domain(self):
|
|
"""Compute the domain of the server allowed"""
|
|
for jet in self:
|
|
jet.server_allowed_domain = (
|
|
[("id", "in", jet.jet_template_id.server_ids.ids)]
|
|
if jet.jet_template_id and jet.jet_template_id.server_ids
|
|
else []
|
|
)
|
|
|
|
@api.depends("jet_template_id", "jet_template_id.action_ids")
|
|
def _compute_state_available_ids(self):
|
|
"""Compute the available states for the jet"""
|
|
for jet in self:
|
|
if not jet.jet_template_id:
|
|
jet.update(
|
|
{
|
|
"jet_template_state_ids": False,
|
|
"state_available_ids": False,
|
|
}
|
|
)
|
|
continue
|
|
actions = jet.jet_template_id.action_ids
|
|
if not actions:
|
|
jet.update(
|
|
{
|
|
"jet_template_state_ids": False,
|
|
"state_available_ids": False,
|
|
}
|
|
)
|
|
continue
|
|
# Compute effective access level for the user
|
|
effective_user_access_level = jet._get_user_effective_access_level()
|
|
jet.update(
|
|
{
|
|
"jet_template_state_ids": actions.state_from_id
|
|
| actions.state_transit_id
|
|
| actions.state_to_id,
|
|
"state_available_ids": (
|
|
actions.state_to_id - jet.state_id
|
|
).filtered(
|
|
lambda s,
|
|
access_level=effective_user_access_level: s.access_level
|
|
<= access_level
|
|
),
|
|
}
|
|
)
|
|
|
|
@api.depends(
|
|
"state_id",
|
|
"jet_template_id",
|
|
"jet_template_id.action_ids",
|
|
"jet_template_id.action_ids.state_from_id",
|
|
"jet_template_id.action_ids.state_to_id",
|
|
"jet_template_id.action_ids.priority",
|
|
)
|
|
def _compute_available_actions(self):
|
|
"""Compute available actions based on current state and template"""
|
|
for jet in self:
|
|
if not jet.jet_template_id:
|
|
jet.action_available_ids = False
|
|
continue
|
|
|
|
# Find actions in the template that start from the current state
|
|
actions = jet.jet_template_id.action_ids.filtered(
|
|
lambda a, state=jet.state_id: a.state_from_id == state
|
|
)
|
|
jet.update({"action_available_ids": actions})
|
|
|
|
@api.depends("jet_template_id", "jet_template_id.template_requires_ids")
|
|
def _compute_jet_requires_ids(self):
|
|
"""Compute the dependencies of the jets"""
|
|
for jet in self:
|
|
jet_template_dependencies = jet.jet_template_id.template_requires_ids
|
|
|
|
final_vals = []
|
|
|
|
# 1. Check removed dependencies
|
|
if jet_template_dependencies:
|
|
jet_dependencies_to_remove = jet.jet_requires_ids.filtered(
|
|
lambda d,
|
|
jtd=jet_template_dependencies: d.jet_template_dependency_id
|
|
not in jtd
|
|
)
|
|
else:
|
|
jet_dependencies_to_remove = jet.jet_requires_ids
|
|
|
|
if jet_dependencies_to_remove:
|
|
final_vals = [(3, dep.id) for dep in jet_dependencies_to_remove]
|
|
|
|
# Check new template dependencies
|
|
if jet_template_dependencies:
|
|
if jet.jet_requires_ids:
|
|
new_jet_template_dependencies = jet_template_dependencies.filtered(
|
|
lambda d, j=jet: d.id
|
|
not in j.jet_requires_ids.jet_template_dependency_id.ids
|
|
)
|
|
else:
|
|
new_jet_template_dependencies = jet_template_dependencies
|
|
for dep in new_jet_template_dependencies:
|
|
final_vals.append(
|
|
(
|
|
0,
|
|
0,
|
|
{
|
|
"jet_id": jet.id,
|
|
"jet_template_dependency_id": dep.id,
|
|
},
|
|
)
|
|
)
|
|
if final_vals:
|
|
jet.jet_requires_ids = final_vals
|
|
|
|
@api.depends_context("uid")
|
|
def _compute_show_available_states(self):
|
|
"""Compute if available states should be shown for the jet"""
|
|
# Set all records at once to avoid multiple writes
|
|
self.show_available_states = (
|
|
self.env.user.cetmix_tower_show_jet_available_states
|
|
)
|
|
|
|
def _inverse_show_available_states(self):
|
|
"""Inverse the show available states for the jet"""
|
|
for jet in self:
|
|
if jet.show_available_states is not None:
|
|
jet.env.user.cetmix_tower_show_jet_available_states = (
|
|
jet.show_available_states
|
|
)
|
|
|
|
@api.depends("jet_template_id", "jet_template_id.waypoint_template_ids")
|
|
def _compute_is_waypoints_available(self):
|
|
"""Compute if waypoints are available for the jet"""
|
|
for jet in self:
|
|
jet.is_waypoints_available = bool(jet.jet_template_id.waypoint_template_ids)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Constraints
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
@api.constrains("server_id", "jet_template_id")
|
|
def _check_jet_limit_per_server(self):
|
|
"""Check if the jet limit per server is reached"""
|
|
for jet in self:
|
|
if (
|
|
jet.jet_template_id.limit_per_server
|
|
and jet.jet_template_id.limit_per_server > 0
|
|
):
|
|
if jet.jet_template_id.limit_per_server < len(
|
|
jet.jet_template_id.jet_ids.filtered(
|
|
lambda j, s=jet.server_id: j.server_id == s
|
|
)
|
|
):
|
|
raise ValidationError(
|
|
_(
|
|
"Jet limit per server reached for"
|
|
" '%(jet)s' on server '%(server)s'!",
|
|
jet=jet.display_name,
|
|
server=jet.server_id.display_name,
|
|
)
|
|
)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# ORM methods
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
@api.model_create_multi
|
|
def create(self, vals_list):
|
|
"""
|
|
Create jets
|
|
- Generate jet reference if not provided
|
|
"""
|
|
|
|
for vals in vals_list:
|
|
if not vals.get("reference"):
|
|
vals["reference"] = generate_random_id(
|
|
sections=3, population=4, separator="_"
|
|
)
|
|
jets = super().create(vals_list)
|
|
|
|
# Create server logs and scheduled tasks
|
|
for jet in jets:
|
|
# Server logs
|
|
for server_log in jet.jet_template_id.server_log_ids:
|
|
jet_log = server_log.copy(
|
|
{
|
|
"jet_id": jet.id,
|
|
"server_id": jet.server_id.id,
|
|
"jet_template_id": False,
|
|
}
|
|
)
|
|
if server_log.log_type == "file":
|
|
jet_log.file_id = server_log.file_template_id.create_file(
|
|
server=jet.server_id, jet=jet, if_file_exists="skip"
|
|
).id
|
|
|
|
# Scheduled tasks
|
|
jet.scheduled_task_ids = jet.jet_template_id.scheduled_task_ids
|
|
|
|
return jets
|
|
|
|
def write(self, vals):
|
|
"""Handle the entry into the new state"""
|
|
# Allow modifications in install mode only to load demo data
|
|
if ("jet_template_id" in vals or "server_id" in vals) and not (
|
|
self._context.get("install_mode") and self._context.get("install_xmlid")
|
|
):
|
|
raise ValidationError(
|
|
_(
|
|
"Jet template and server cannot be changed"
|
|
" once the jet is created!"
|
|
)
|
|
)
|
|
if "state_id" in vals:
|
|
for jet in self:
|
|
jet._on_state_exit(state=jet.state_id)
|
|
res = super().write(vals)
|
|
for jet in self:
|
|
jet._on_state_enter(state=jet.state_id)
|
|
else:
|
|
res = super().write(vals)
|
|
return res
|
|
|
|
def unlink(self):
|
|
"""
|
|
Unlink all related files
|
|
"""
|
|
|
|
# Check if the jet is deletable
|
|
not_deletable_jets = self.filtered(lambda j: not j.deletable)
|
|
if not_deletable_jets:
|
|
raise ValidationError(
|
|
_(
|
|
"Following jets cannot be deleted as they are not deletable: %s",
|
|
not_deletable_jets.mapped("display_name"),
|
|
)
|
|
)
|
|
files = self.file_ids
|
|
res = super().unlink()
|
|
|
|
# Unlink files only after the records are deleted
|
|
# This is done to avoid deleting the files while
|
|
# the 'unlink' method fails due to some reason.
|
|
if files:
|
|
files.unlink()
|
|
return res
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Odoo Actions
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
def action_run_command(self):
|
|
"""
|
|
Returns wizard action to select command and run it
|
|
"""
|
|
context = self.env.context.copy()
|
|
context["default_jet_ids"] = self.ids
|
|
return {
|
|
"type": "ir.actions.act_window",
|
|
"name": _("Run Command"),
|
|
"res_model": "cx.tower.command.run.wizard",
|
|
"view_mode": "form",
|
|
"target": "new",
|
|
"context": context,
|
|
}
|
|
|
|
def action_run_flight_plan(self):
|
|
"""
|
|
Returns wizard action to select flightplan and run it
|
|
"""
|
|
context = self.env.context.copy()
|
|
context["default_jet_ids"] = self.ids
|
|
return {
|
|
"type": "ir.actions.act_window",
|
|
"name": _("Run Flight Plan"),
|
|
"res_model": "cx.tower.plan.run.wizard",
|
|
"view_mode": "form",
|
|
"target": "new",
|
|
"context": context,
|
|
}
|
|
|
|
def action_open_command_logs(self):
|
|
"""
|
|
Open current server command log records
|
|
"""
|
|
self.ensure_one()
|
|
action = self.env["ir.actions.actions"]._for_xml_id(
|
|
"cetmix_tower_server.action_cx_tower_command_log"
|
|
)
|
|
action["domain"] = [("jet_id", "=", self.id)] # pylint: disable=no-member
|
|
return action
|
|
|
|
def action_open_plan_logs(self):
|
|
"""
|
|
Open current server flightplan log records
|
|
"""
|
|
self.ensure_one()
|
|
action = self.env["ir.actions.actions"]._for_xml_id(
|
|
"cetmix_tower_server.action_cx_tower_plan_log"
|
|
)
|
|
action["domain"] = [("jet_id", "=", self.id)] # pylint: disable=no-member
|
|
return action
|
|
|
|
def action_open_state_wizard(self):
|
|
"""Open the jet state wizard"""
|
|
context = self.env.context.copy()
|
|
context["default_jet_ids"] = [(6, 0, self.ids)]
|
|
action = {
|
|
"type": "ir.actions.act_window",
|
|
"res_model": "cx.tower.jet.state.wizard",
|
|
"view_mode": "form",
|
|
"target": "new",
|
|
"context": context,
|
|
}
|
|
return action
|
|
|
|
def action_open_action_wizard(self):
|
|
"""Open the jet action wizard"""
|
|
context = self.env.context.copy()
|
|
context["default_jet_ids"] = [(6, 0, self.ids)]
|
|
action = {
|
|
"type": "ir.actions.act_window",
|
|
"res_model": "cx.tower.jet.action.wizard",
|
|
"view_mode": "form",
|
|
"target": "new",
|
|
"context": context,
|
|
}
|
|
return action
|
|
|
|
def action_open_files(self):
|
|
"""
|
|
Open files of the current server
|
|
"""
|
|
self.ensure_one()
|
|
action = self.env["ir.actions.actions"]._for_xml_id(
|
|
"cetmix_tower_server.cx_tower_file_action"
|
|
)
|
|
action["domain"] = [("jet_id", "=", self.id)] # pylint: disable=no-member
|
|
|
|
context = self._context.copy()
|
|
if "context" in action and isinstance((action["context"]), str):
|
|
context.update(ast.literal_eval(action["context"]))
|
|
else:
|
|
context.update(action.get("context", {}))
|
|
|
|
# Remove group_by from context
|
|
context.pop("group_by", None)
|
|
context.update(
|
|
{
|
|
"default_jet_id": self.id,
|
|
"default_server_id": self.server_id.id,
|
|
}
|
|
)
|
|
action["context"] = context
|
|
return action
|
|
|
|
def action_open_requires_jets(self):
|
|
"""
|
|
Open required jets of the current jet
|
|
"""
|
|
self.ensure_one()
|
|
action = self.env["ir.actions.actions"]._for_xml_id(
|
|
"cetmix_tower_server.cx_tower_jet_action"
|
|
)
|
|
action["domain"] = [("jet_required_by_ids.jet_id", "=", self.id)] # pylint: disable=no-member
|
|
return action
|
|
|
|
def action_open_required_by_jets(self):
|
|
"""
|
|
Open dependant jets of the current jet
|
|
"""
|
|
self.ensure_one()
|
|
action = self.env["ir.actions.actions"]._for_xml_id(
|
|
"cetmix_tower_server.cx_tower_jet_action"
|
|
)
|
|
action["domain"] = [("jet_requires_ids.jet_depends_on_id", "=", self.id)] # pylint: disable=no-member
|
|
return action
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# General functions
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
def _get_user_effective_access_level(self):
|
|
"""
|
|
Get the effective access level for the current user.
|
|
If user is manager but is not added as a manager to the jet,
|
|
his access level is considered as user.
|
|
Returns:
|
|
str: The effective access level for the current user.
|
|
see _selection_access_level() in cx.tower.access.mixin
|
|
"""
|
|
self.ensure_one()
|
|
user_access_level = self.env.user._cetmix_tower_access_level()
|
|
if user_access_level == "2" and self.env.user not in self.manager_ids:
|
|
return "1"
|
|
return user_access_level
|
|
|
|
def get_variable_value(self, variable_reference, no_fallback=False):
|
|
"""
|
|
Return the value of a variable for the current jet.
|
|
NB: this function follows the value application order.
|
|
Jet->Jet Template->Server->Global
|
|
|
|
Args:
|
|
variable_reference (Char): The reference of the variable
|
|
to get the value for
|
|
no_fallback (bool): If True, will return current record value
|
|
without checking fallback values.
|
|
|
|
Returns:
|
|
str: The value of the variable for the current record or None
|
|
"""
|
|
self.ensure_one()
|
|
if no_fallback:
|
|
return super().get_variable_value(variable_reference, no_fallback)
|
|
variable = self.env["cx.tower.variable"].get_by_reference(variable_reference)
|
|
if not variable:
|
|
return None
|
|
values = variable._get_variable_values_by_references(
|
|
variable_references=[variable_reference], jet=self
|
|
)
|
|
return values[variable_reference]
|
|
|
|
def run_command(
|
|
self,
|
|
command,
|
|
path=None,
|
|
sudo=None,
|
|
ssh_connection=None,
|
|
**kwargs,
|
|
):
|
|
"""Run command on selected Jet.
|
|
A helper function that calls the corresponding server function.
|
|
|
|
Important: this method raises an exception if the jet
|
|
is currently executing an action.
|
|
You should handle this exception in your code.
|
|
|
|
Args:
|
|
command (cx.tower.command()): Command record
|
|
path (Char): directory where command is run.
|
|
Provide in case you need to override default command value
|
|
sudo (Boolean): use sudo
|
|
Defaults to None
|
|
ssh_connection (SSH client instance, optional): SSH connection.
|
|
Pass to reuse existing connection.
|
|
This is useful in case you would like to speed up
|
|
the ssh command running. Returns:
|
|
|
|
Returns:
|
|
dict(): command running result if `no_command_log`
|
|
context value == True else None
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# Raise an exception if jets is currently executing an action
|
|
if self.current_action_id:
|
|
raise ValidationError(
|
|
_(
|
|
"Jet '%(jet)s' is currently executing an action",
|
|
jet=self.display_name,
|
|
)
|
|
)
|
|
|
|
return self.server_id.run_command(
|
|
command=command,
|
|
path=path,
|
|
sudo=sudo,
|
|
ssh_connection=ssh_connection,
|
|
jet=self,
|
|
**kwargs,
|
|
)
|
|
|
|
def run_flight_plan(self, flight_plan, jet_template=None, **kwargs):
|
|
"""
|
|
Runs flight plan on the current jet.
|
|
|
|
Important: this method raises an exception if the jet
|
|
is currently executing an action.
|
|
You should handle this exception in your code.
|
|
|
|
Args:
|
|
flight_plan (cx.tower.plan()): flight plan to run
|
|
jet_template (cx.tower.jet.template()): jet template
|
|
to run the flight plan on
|
|
kwargs (dict): Optional arguments
|
|
Following are supported but not limited to:
|
|
- "plan_log": {values passed to flightplan logger}
|
|
- "log": {values passed to logger}
|
|
- "key": {values passed to key parser}
|
|
- "variable_values", dict(): custom variable values
|
|
in the format of `{variable_reference: variable_value}`
|
|
eg `{'odoo_version': '16.0'}`
|
|
Will be applied only if user has write access to the server.
|
|
Raises:
|
|
ValidationError: If the jet is currently executing an action.
|
|
Returns:
|
|
log_record (cx.tower.plan.log()): plan log record
|
|
"""
|
|
|
|
self.ensure_one()
|
|
|
|
# Raise an exception if jets is currently executing an action
|
|
# TODO: keep an eye on this method in case we use it
|
|
# directly in actions.
|
|
if self.current_action_id:
|
|
raise ValidationError(
|
|
_(
|
|
"Jet '%(jet)s' is currently executing an action",
|
|
jet=self.display_name,
|
|
)
|
|
)
|
|
|
|
return self.server_id.run_flight_plan(
|
|
flight_plan=flight_plan,
|
|
jet_template=jet_template,
|
|
jet=self,
|
|
**kwargs,
|
|
)
|
|
|
|
def bring_to_state(self, state_reference):
|
|
"""
|
|
Bring the jet to a specific state.
|
|
This is a wrapper around the _bring_to_state method meant to be used
|
|
in various automatic actions.
|
|
|
|
IMPORTANT: alway prefer using this method over the _bring_to_state method
|
|
in automation (eg Python commands) because it will check the access level
|
|
of the user to the state and raise an exception if the user is not allowed
|
|
to set the state.
|
|
|
|
Use `_bring_to_state` method directly if you want to provide a state
|
|
object instead of a reference.
|
|
|
|
Args:
|
|
state_reference (Char): The reference of the state to bring the jet to.
|
|
Returns:
|
|
The jet is brought into the target state.
|
|
In case of an error, the jet is brought into the error state
|
|
if the latter is defined.
|
|
|
|
Raises:
|
|
ValidationError: If the state is not found.
|
|
AccessError: If the user is not allowed to set the state.
|
|
"""
|
|
self.ensure_one()
|
|
state = self.env["cx.tower.jet.state"].get_by_reference(state_reference)
|
|
if not state:
|
|
raise ValidationError(
|
|
_(
|
|
"State '%(state)s' not found for jet '%(jet)s'",
|
|
state=state_reference,
|
|
jet=self.display_name,
|
|
)
|
|
)
|
|
|
|
if state.access_level > self._get_user_effective_access_level():
|
|
raise AccessError(
|
|
_("You are not allowed to set the '%(state)s' state!", state=state.name)
|
|
)
|
|
|
|
self._bring_to_state(state)
|
|
|
|
def clone(self, server=None, name=None, state=None, **kwargs):
|
|
"""
|
|
Create a new jet from this template on the given server.
|
|
|
|
Following configuration variables will be available in the flight plan:
|
|
`__original_jet__`: The reference of the original jet
|
|
`__requested_state__`: The reference of the requested state
|
|
the new jet was requested to be in.
|
|
|
|
Use these variables in the flight plan to identify the original jet
|
|
and the requested state.
|
|
|
|
Args:
|
|
server (cx.tower.server()): The server to clone the jet on.
|
|
If not provided, the jet will be cloned on the same server.
|
|
name (str): The name of the new jet.
|
|
If not provided, a random name will be generated.
|
|
state (cx.tower.jet.state()): The state to bring the new jet to.
|
|
|
|
Kwargs:
|
|
field values to populate in the new jet record.
|
|
NB: configuration variables are provided as follows:
|
|
(dict): Custom configuration variables
|
|
Following format is used:
|
|
`variable_reference`: `variable_value_char`
|
|
eg:
|
|
{'branch': 'prod', 'odoo_version': '16.0'}
|
|
Returns:
|
|
cx.tower.jet(): The new jet or False if the cloning has failed
|
|
"""
|
|
self.ensure_one()
|
|
|
|
jet_template = self.jet_template_id
|
|
if not server:
|
|
server = self.server_id
|
|
same_server = True
|
|
else:
|
|
same_server = server.id == self.server_id.id
|
|
|
|
# Check if template allows cloning on the same server
|
|
if same_server and not jet_template.plan_clone_same_server_id:
|
|
raise ValidationError(
|
|
_(
|
|
"Cloning on the same server is not allowed"
|
|
" for template '%(template)s'",
|
|
template=jet_template.name,
|
|
)
|
|
)
|
|
# Check if template allows cloning to a different server
|
|
if not same_server and not jet_template.plan_clone_different_server_id:
|
|
raise ValidationError(
|
|
_(
|
|
"Cloning to a different server is not allowed"
|
|
" for template '%(template)s'",
|
|
template=jet_template.name,
|
|
)
|
|
)
|
|
# Check if the jet creation is allowed on the given server
|
|
if not jet_template._allow_jet_creation(server):
|
|
return False
|
|
|
|
# Prepare the jet custom values
|
|
kwargs.update(
|
|
{
|
|
"jet_cloned_from_id": self.id,
|
|
}
|
|
)
|
|
|
|
# Create a new jet
|
|
jet = jet_template.create_jet(
|
|
server, name=name or self._default_cloned_jet_name(), **kwargs
|
|
)
|
|
|
|
# Set scheduled tasks of the original jet to the new jet
|
|
jet.scheduled_task_ids = self.scheduled_task_ids
|
|
|
|
# Set server logs of the original jet to the new jet
|
|
# Delete the server logs of the new jet if the original jet
|
|
# has no server logs
|
|
if self.server_log_ids:
|
|
jet.server_log_ids = [
|
|
log.copy({"jet_id": False, "server_id": False}).id
|
|
for log in self.server_log_ids
|
|
]
|
|
# Create files for file-type server logs
|
|
for jet_log in jet.server_log_ids:
|
|
if jet_log.log_type == "command":
|
|
continue
|
|
if jet_log.log_type == "file":
|
|
jet_log.file_id = jet_log.file_template_id.create_file(
|
|
server=jet.server_id, jet=jet, if_file_exists="skip"
|
|
).id
|
|
else:
|
|
jet.server_log_ids.unlink()
|
|
|
|
# NB: we are not passing the state as we need to run
|
|
# the clone flight plan first.
|
|
# The plan should take care of the state transition
|
|
# using the configuration variables.
|
|
# Update the custom values in the kwargs
|
|
|
|
variable_values = {
|
|
"__original_jet__": self.reference,
|
|
"__original_server__": self.server_id.reference,
|
|
"__requested_jet_state__": state.reference if state else None,
|
|
}
|
|
|
|
if same_server and jet_template.plan_clone_same_server_id:
|
|
jet.run_flight_plan(
|
|
jet_template.plan_clone_same_server_id, variable_values=variable_values
|
|
)
|
|
elif not same_server and jet_template.plan_clone_different_server_id:
|
|
jet.run_flight_plan(
|
|
jet_template.plan_clone_different_server_id,
|
|
variable_values=variable_values,
|
|
)
|
|
|
|
return jet
|
|
|
|
def _default_cloned_jet_name(self):
|
|
"""Return default cloned jet name"""
|
|
self.ensure_one()
|
|
return f"{self.name} (clone)"
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Jet actions, state transitions, jet requests
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
def _trigger_action(
|
|
self, action, from_transition=False, raise_if_not_available=True, **kwargs
|
|
):
|
|
"""Trigger an action on the jet.
|
|
|
|
The function flow is:
|
|
|
|
1. Bring the jet into the transit state.
|
|
2. Execute the flight plan if defined.
|
|
3. Bring the jet into the target state.
|
|
|
|
Success:
|
|
The jet is brought into the target state.
|
|
|
|
Error:
|
|
The jet is brought into the error state if it is defined.
|
|
Otherwise, the jet is brought into the initial state.
|
|
|
|
|
|
Args:
|
|
action (cx.tower.jet.action()): The action to trigger
|
|
from_transition (bool): True if the action is triggered
|
|
from a transition.
|
|
This is used to distinguish between a user directly
|
|
triggering the action and a transition from one state
|
|
to another.
|
|
raise_if_not_available (bool):
|
|
True if the function should raise an exception
|
|
if the action is not available for this jet.
|
|
**kwargs: Additional arguments:
|
|
- current_command_log: Optional command log record to track execution
|
|
|
|
Returns:
|
|
dict: A dictionary with the following keys:
|
|
- status: The status of the action
|
|
- error: The error message if the action is not available
|
|
|
|
Raises:
|
|
ValidationError: If the action is not available for this jet.
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# TODO: put action in the queue if jet is busy
|
|
|
|
# Action properties must be accessible despite of the user group
|
|
action = action.sudo()
|
|
|
|
# Get current command log
|
|
current_command_log = kwargs.get("current_command_log")
|
|
|
|
# Ensure the action is available for this jet
|
|
if action.id not in self.action_available_ids.ids:
|
|
error = _(
|
|
"Action '%(action)s' is not available for jet"
|
|
" '%(jet)s' in state '%(state)s'",
|
|
action=action.name,
|
|
jet=self.name, # pylint: disable=no-member
|
|
state=self.state_id.name if self.state_id else _("Undefined"),
|
|
)
|
|
if raise_if_not_available:
|
|
raise ValidationError(error)
|
|
if current_command_log:
|
|
current_command_log.finish(
|
|
status=JET_ACTION_NOT_AVAILABLE,
|
|
error=error,
|
|
)
|
|
return {"status": JET_ACTION_NOT_AVAILABLE, "error": error}
|
|
|
|
# Update the jet state
|
|
transit_state = action.state_transit_id
|
|
target_state = action.state_to_id
|
|
|
|
# Check if the jet is already in the target state
|
|
# TODO: handle the case when destination state
|
|
# is the same as the current state.
|
|
# Eg when a jet is restarted.
|
|
if self.state_id == target_state and from_transition:
|
|
self.sudo().write({"target_state_id": None})
|
|
self._finalize_transition(failed=False)
|
|
return {"status": 0, "error": None}
|
|
|
|
# Set target state if not already set
|
|
if not self.target_state_id:
|
|
self.sudo().write({"target_state_id": target_state})
|
|
|
|
# Check if all dependencies are satisfied
|
|
# if starting from an undefined state.
|
|
# This is typical for a newly created jet.
|
|
if not self.state_id and not self._control_dependencies():
|
|
# The process will be resumed
|
|
# when the dependencies are satisfied
|
|
error = _("Jet dependencies are not satisfied")
|
|
if current_command_log:
|
|
current_command_log.finish(
|
|
status=JET_DEPENDENCIES_NOT_SATISFIED,
|
|
error=error,
|
|
)
|
|
return {"status": JET_DEPENDENCIES_NOT_SATISFIED, "error": error}
|
|
|
|
self.sudo().write(
|
|
{
|
|
"state_id": transit_state,
|
|
"current_action_id": action.id,
|
|
"current_command_log_id": current_command_log.id
|
|
if current_command_log
|
|
else False,
|
|
}
|
|
)
|
|
if action.plan_id:
|
|
# Run the flight plan
|
|
plan_kwargs = {
|
|
"plan_log": {
|
|
"jet_action_id": action.id,
|
|
},
|
|
}
|
|
# Populate custom variable values from current command log
|
|
current_command_log = self.current_command_log_id
|
|
if current_command_log and current_command_log.variable_values:
|
|
plan_kwargs["variable_values"] = current_command_log.variable_values
|
|
|
|
# Run the flight plan
|
|
with self.env.cr.savepoint():
|
|
self.server_id.sudo().run_flight_plan(
|
|
flight_plan=action.plan_id,
|
|
jet=self,
|
|
**plan_kwargs,
|
|
)
|
|
# Flight plan will trigger the `_flight_plan_finished` function again
|
|
# if the flight plan is finished successfully.
|
|
# So we don't need continue the loop in this case.
|
|
return {"status": 0, "error": None}
|
|
|
|
# Set the state to the destination state if no plan is defined
|
|
final_vals = {
|
|
"state_id": target_state,
|
|
"current_action_id": False,
|
|
}
|
|
|
|
# Reset the target state if the jet has reached the target state
|
|
if target_state == self.target_state_id:
|
|
final_vals["target_state_id"] = None
|
|
|
|
self.sudo().write(final_vals)
|
|
|
|
# Continue the chain of actions if the final state is not reached yet
|
|
if self.target_state_id:
|
|
self._bring_to_state(self.target_state_id)
|
|
|
|
# Trigger the transition finished event
|
|
self._finalize_transition(failed=False)
|
|
return {"status": 0, "error": None}
|
|
|
|
def _bring_to_state(self, state=None):
|
|
"""
|
|
Bring the jet to a specific state.
|
|
|
|
The function flow is:
|
|
|
|
1. Compute the path of actions to bring the jet
|
|
to the target state.
|
|
2. Set the target state.
|
|
3. Trigger the first action in the path.
|
|
This will trigger a chain of actions until the jet is brought
|
|
into the target state.
|
|
|
|
IMPORTANT: this method uses sudo() to bypass access rules.
|
|
This means that this method must be used with caution and only in cases
|
|
where the access level is not important.
|
|
For external automation including Python commands always prefer using
|
|
the bring_to_state() method instead.
|
|
For example:
|
|
```python
|
|
jet = self.env["cx.tower.jet"].browse(jet_id)
|
|
jet.bring_to_state(state_reference)
|
|
```
|
|
|
|
Args:
|
|
state (cx.tower.jet.state()): The state to bring the jet to
|
|
|
|
Returns:
|
|
The jet is brought into the first state of the path.
|
|
In case of an error, the jet is brought into the error state
|
|
if the latter is defined.
|
|
|
|
Raises:
|
|
ValidationError: If the path is not found.
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# Use sudo to bypass access rules
|
|
self = self.sudo()
|
|
|
|
# Exit if jet is already in the target state
|
|
if self.state_id == state:
|
|
return
|
|
|
|
# Compute the path of actions to bring the jet to the target state
|
|
path = self.jet_template_id._get_action_path(
|
|
state_from=self.state_id, state_to=state
|
|
)
|
|
if not path:
|
|
raise ValidationError(
|
|
_(
|
|
"No path found to bring the jet %(jet)s to the state '%(state)s'",
|
|
jet=self.name, # pylint: disable=no-member
|
|
state=state.name if state else _("Undefined"),
|
|
)
|
|
)
|
|
|
|
# Set the target state if not already set
|
|
if not self.target_state_id:
|
|
self.write(
|
|
{
|
|
"target_state_id": state,
|
|
}
|
|
)
|
|
|
|
# Trigger the first action in the path
|
|
self._trigger_action(path[0], from_transition=True)
|
|
|
|
def _flight_plan_finished(self, plan_status):
|
|
"""
|
|
Handle the completion of a flight plan.
|
|
|
|
Args:
|
|
plan_status (int): The status of the flight plan
|
|
(0: success, other: failure)
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# Used in case this is the last action in the chain
|
|
transition_failed = False
|
|
|
|
# Reset the current action
|
|
vals = {"current_action_id": False}
|
|
|
|
# If the flight plan is finished successfully,
|
|
# we bring the jet to the destination state
|
|
# of the current action
|
|
if plan_status == 0:
|
|
# Set the state to the destination state
|
|
vals["state_id"] = (
|
|
self.current_action_id.state_to_id
|
|
and self.current_action_id.state_to_id.id
|
|
)
|
|
|
|
# Reset the target state if the jet has reached the target state
|
|
# This will stop the chain of actions
|
|
if self.target_state_id == self.current_action_id.state_to_id:
|
|
vals["target_state_id"] = None
|
|
|
|
# If the flight plan is finished with an error,
|
|
# we bring the jet to the error state if it is defined
|
|
# or back to the initial state if not
|
|
# Reset the target state because we cannot continue the chain of actions
|
|
else:
|
|
vals.update(
|
|
{
|
|
"state_id": (
|
|
self.current_action_id.state_error_id
|
|
and self.current_action_id.state_error_id.id
|
|
)
|
|
or (
|
|
self.current_action_id.state_from_id
|
|
and self.current_action_id.state_from_id.id
|
|
),
|
|
"target_state_id": None,
|
|
}
|
|
)
|
|
transition_failed = True
|
|
|
|
self.sudo().write(vals)
|
|
|
|
# Continue the chain of actions if the final state is not reached yet
|
|
if self.target_state_id:
|
|
self._bring_to_state(self.target_state_id)
|
|
else:
|
|
# Trigger the transition finished event
|
|
self._finalize_transition(failed=transition_failed)
|
|
|
|
def _finalize_transition(self, failed=False):
|
|
"""
|
|
Handle the completion of a state transition.
|
|
|
|
Args:
|
|
failed (bool): True if the transition failed, False otherwise
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# 1. Finalize the jet request if it exists
|
|
if self.served_jet_request_id:
|
|
self.served_jet_request_id._finalize(failed=failed)
|
|
|
|
# 2. Finalize the command log if transition was
|
|
# triggered from a command
|
|
command_log = self.current_command_log_id
|
|
if command_log:
|
|
# Reset the current command log id
|
|
# Using sudo to bypass write access rules
|
|
self.sudo().write({"current_command_log_id": False})
|
|
|
|
# Prepare the command log finish values
|
|
if failed:
|
|
error = _(
|
|
"Action failed for jet %(jet)s.",
|
|
jet=self.name, # pylint: disable=no-member
|
|
)
|
|
response = None
|
|
status = JET_STATE_ERROR
|
|
else:
|
|
response = _(
|
|
"Jet %(jet)s was moved to the '%(state)s' state.",
|
|
jet=self.name, # pylint: disable=no-member
|
|
state=self.state_id.name if self.state_id else _("Undefined"),
|
|
)
|
|
status = 0
|
|
error = None
|
|
|
|
# Finish the command log
|
|
command_log.finish(
|
|
status=status,
|
|
response=response,
|
|
error=error,
|
|
)
|
|
|
|
# 3. Notify the jet that it is available
|
|
self._on_is_available()
|
|
|
|
def _serve_jet_request(self, jet_request):
|
|
"""
|
|
Serve a jet request.
|
|
|
|
Args:
|
|
jet_request (cx.tower.jet.request()): The jet request to serve
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# Save the request
|
|
# Using sudo to bypass write access rules
|
|
self.sudo().write({"served_jet_request_id": jet_request.id})
|
|
|
|
# State is reached, finalize the request
|
|
if self.state_id == jet_request.state_requested_id:
|
|
jet_request._finalize(failed=False)
|
|
else:
|
|
# Trigger the jet to bring itself to the required state
|
|
jet_request.state = "processing"
|
|
self._bring_to_state(jet_request.state_requested_id)
|
|
|
|
def _finalize_jet_request(self, jet_request):
|
|
"""
|
|
This function is called when a jet request issued by this jet is finalized.
|
|
|
|
Args:
|
|
jet_request (cx.tower.jet.request()): The jet request that was finalized
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# On success, update the dependency and
|
|
if jet_request.state == "success":
|
|
# Update the dependency if the request was for a dependency
|
|
dependency = jet_request.for_dependency_id
|
|
if dependency:
|
|
dependency.jet_depends_on_id = jet_request.jet_id
|
|
# Proceed with the state transition if all dependencies are satisfied
|
|
# and the transition is still in progress
|
|
if self._control_dependencies() and self.target_state_id:
|
|
self._bring_to_state(self.target_state_id)
|
|
else:
|
|
# Stop transition if the request failed
|
|
# Using sudo to bypass write access rules
|
|
self.sudo().write({"target_state_id": False})
|
|
# Mark served jet request as failed
|
|
if self.served_jet_request_id:
|
|
self.served_jet_request_id._finalize(failed=True)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Waypoints
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
def create_waypoint(
|
|
self,
|
|
waypoint_template,
|
|
name=None,
|
|
fly_here=False,
|
|
ignore_busy=False,
|
|
created_from_command_log=None,
|
|
**metadata,
|
|
):
|
|
"""Create a new waypoint for the jet.
|
|
|
|
The jet must not be busy unless ignore_busy is True.
|
|
When created_from_command_log is provided, the waypoint stores it so that
|
|
the waypoint callback can finish the command log when the waypoint
|
|
reaches ready/current or error.
|
|
|
|
Args:
|
|
waypoint_template (cx.tower.jet.waypoint.template or str):
|
|
The waypoint template or reference to create the waypoint from.
|
|
name (str, optional): The name of the waypoint. Defaults to None.
|
|
fly_here (bool, optional): Whether to fly to the waypoint after creation.
|
|
Defaults to False.
|
|
ignore_busy (bool, optional): Whether to ignore the busy state and create
|
|
the waypoint anyway.
|
|
Useful when creating waypoints from jet actions.
|
|
Defaults to False.
|
|
created_from_command_log (cx.tower.command.log, optional): Command log
|
|
that created this waypoint; the waypoint callback will finish it.
|
|
Defaults to None.
|
|
**metadata: Additional metadata to pass to the waypoint.
|
|
|
|
Returns:
|
|
cx.tower.jet.waypoint
|
|
|
|
Raises:
|
|
ValidationError: If the waypoint template is not found
|
|
or does not belong to the jet template, or if the jet is busy.
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# Check if the jet is busy
|
|
if self._is_busy() and not ignore_busy:
|
|
_logger.error(
|
|
"Cannot create waypoint for jet %s because it is busy", self.name
|
|
)
|
|
raise ValidationError(
|
|
_("Cannot create waypoint for jet %s because it is busy", self.name)
|
|
)
|
|
|
|
# Resolve the waypoint template
|
|
if isinstance(waypoint_template, str):
|
|
waypoint_reference = waypoint_template
|
|
waypoint_template = self.env[
|
|
"cx.tower.jet.waypoint.template"
|
|
].get_by_reference(waypoint_reference)
|
|
if not waypoint_template:
|
|
_logger.error("Waypoint template %s not found", waypoint_reference)
|
|
raise ValidationError(
|
|
_("Waypoint template %s not found", waypoint_reference)
|
|
)
|
|
|
|
# Check if the waypoint template belongs to the jet template
|
|
if waypoint_template.jet_template_id != self.jet_template_id:
|
|
_logger.error(
|
|
"Waypoint template %s does not belong to the jet template %s",
|
|
waypoint_template.name,
|
|
self.jet_template_id.name,
|
|
)
|
|
raise ValidationError(
|
|
_(
|
|
"Waypoint template %(waypoint_template)s does not belong "
|
|
"to the jet template %(jet_template)s",
|
|
waypoint_template=waypoint_template.name,
|
|
jet_template=self.jet_template_id.name,
|
|
)
|
|
)
|
|
|
|
# Prepare the waypoint values
|
|
waypoint_values = self._prepare_waypoint_values(
|
|
waypoint_template=waypoint_template,
|
|
name=name,
|
|
**metadata,
|
|
)
|
|
if created_from_command_log:
|
|
waypoint_values["created_from_command_log_id"] = created_from_command_log.id
|
|
|
|
# Create the waypoint
|
|
waypoint = self.env["cx.tower.jet.waypoint"].create(waypoint_values)
|
|
waypoint.prepare(is_destination=fly_here)
|
|
return waypoint
|
|
|
|
def _prepare_waypoint_values(self, waypoint_template, name=None, **metadata):
|
|
"""Prepare the waypoint values
|
|
|
|
Args:
|
|
waypoint_template (cx.tower.jet.waypoint.template): The waypoint template
|
|
name (Char, optional): The name of the waypoint.
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# Prepare the waypoint values
|
|
vals = {
|
|
"waypoint_template_id": waypoint_template.id,
|
|
"name": name if name else _("Auto-generated waypoint"),
|
|
"jet_id": self.id,
|
|
}
|
|
if metadata:
|
|
vals["metadata"] = metadata
|
|
|
|
return vals
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Event handling
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
def _on_state_exit(self, state=None):
|
|
"""
|
|
Handle the exit of the jet from a state.
|
|
|
|
Args:
|
|
state (cx.tower.jet.state()): The state jet is exiting
|
|
"""
|
|
self.ensure_one()
|
|
# TODO: Implement the logic to handle the exit of the jet from a state
|
|
pass
|
|
|
|
def _on_state_enter(self, state=None):
|
|
"""
|
|
Handle the entry of the jet into a state.
|
|
|
|
Args:
|
|
state (cx.tower.jet.state()): The state jet is entering
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# Refresh the frontend views
|
|
self.env.user.reload_views(model="cx.tower.jet", rec_ids=[self.id])
|
|
|
|
def _on_jet_request_completed(self, jet_request):
|
|
"""
|
|
Handle the completion of a jet request.
|
|
"""
|
|
self.ensure_one()
|
|
# TODO: Implement the logic to handle the completion of a jet request
|
|
pass
|
|
|
|
def _on_is_available(self):
|
|
"""
|
|
Handle the event when the jet is not busy anymore.
|
|
"""
|
|
|
|
# Process pending requests
|
|
jet_request_obj = self.env["cx.tower.jet.request"].sudo()
|
|
|
|
# 1. Requests where the jet is requested explicitly
|
|
explicit_requests = jet_request_obj.search(
|
|
[
|
|
("jet_id", "=", self.id), # pylint: disable=no-member
|
|
("state", "=", "new"),
|
|
]
|
|
)
|
|
if explicit_requests:
|
|
# Check which state is required by the request
|
|
# TODO: IMPORTANT: we must find a workaround to avoid infinite loops
|
|
# when different jets keep requesting the same target jet in different
|
|
# states and the target jet keeps jumping from one state to another.
|
|
|
|
# Finalize all requests that request the same state as the jet
|
|
same_state_requests = explicit_requests.filtered(
|
|
lambda r: r.state_requested_id == self.state_id
|
|
)
|
|
for request in same_state_requests:
|
|
request._finalize(failed=False)
|
|
|
|
# Pick the first request that requests a different state
|
|
remaining_requests = explicit_requests - same_state_requests
|
|
if remaining_requests:
|
|
self._serve_jet_request(remaining_requests[0])
|
|
return
|
|
|
|
# 2. Requests where the jet is requested implicitly via template
|
|
if self._accepts_new_links():
|
|
implicit_requests = jet_request_obj.search(
|
|
[
|
|
("server_id", "=", self.server_id.id), # pylint: disable=no-member
|
|
("jet_template_id", "=", self.jet_template_id.id), # pylint: disable=no-member
|
|
("jet_id", "=", False),
|
|
("state", "=", "new"),
|
|
]
|
|
)
|
|
same_state_requests = implicit_requests.filtered(
|
|
lambda r: r.state_requested_id == self.state_id
|
|
)
|
|
if same_state_requests:
|
|
# Set current jet as the target jet for the requests
|
|
same_state_requests.write({"jet_id": self.id}) # pylint: disable=no-member
|
|
for request in same_state_requests:
|
|
request._finalize(failed=False)
|
|
|
|
# Pick the first request that requests a different state
|
|
remaining_requests = implicit_requests - same_state_requests
|
|
if remaining_requests:
|
|
remaining_request = remaining_requests[0]
|
|
# Set current jet as the target jet for the request
|
|
remaining_request.write({"jet_id": self.id}) # pylint: disable=no-member
|
|
self._serve_jet_request(remaining_request)
|
|
return
|
|
|
|
# Send success notification when everything is done
|
|
# Use context timestamp to avoid timezone issues
|
|
context_timestamp = fields.Datetime.context_timestamp(
|
|
self, fields.Datetime.now()
|
|
)
|
|
|
|
# Check if notifications are enabled
|
|
ICP_sudo = self.env["ir.config_parameter"].sudo()
|
|
notification_type_success = ICP_sudo.get_param(
|
|
"cetmix_tower_server.notification_type_success"
|
|
)
|
|
if notification_type_success:
|
|
# Action for button
|
|
action = self.env["ir.actions.act_window"]._for_xml_id(
|
|
"cetmix_tower_server.cx_tower_jet_action"
|
|
)
|
|
|
|
context = self.env.context.copy()
|
|
params = dict(context.get("params") or {})
|
|
params["button_name"] = _("View Jet")
|
|
context["params"] = params
|
|
|
|
# Add record id and context to the action
|
|
action.update(
|
|
{
|
|
"context": context,
|
|
"res_id": self.id,
|
|
"views": [(False, "form")],
|
|
}
|
|
)
|
|
# Send success notification
|
|
self.env.user.notify_success(
|
|
message=_(
|
|
"%(timestamp)s<br/>" "Available in the '%(name)s' state",
|
|
name=self.state_id.name if self.state_id else _("Undefined"),
|
|
timestamp=context_timestamp,
|
|
),
|
|
title=self.name,
|
|
sticky=notification_type_success == "sticky",
|
|
action=action,
|
|
)
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Status and busyness
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
def _accepts_new_links(self):
|
|
"""
|
|
Check if the jet is available to accept new links from other jets.
|
|
|
|
Returns:
|
|
bool: True if the jet is available to accept new links from other jets,
|
|
False otherwise
|
|
"""
|
|
self.ensure_one()
|
|
# TODO: Implement the logic to check if the jet is available
|
|
# to accept new links from other jets
|
|
return True
|
|
|
|
def _is_busy(self):
|
|
"""
|
|
Check if the jet is busy with some other action.
|
|
Overwrite this function to implement custom logic.
|
|
|
|
Returns:
|
|
bool: True if the jet is busy with some other action,
|
|
False otherwise
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# Jet is considered busy if it is currently transitioning to another state
|
|
busy = bool(self.target_state_id)
|
|
return busy
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Manage dependencies
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
|
|
def _control_dependencies(self):
|
|
"""
|
|
Check if dependencies are satisfied.
|
|
If some dependencies are missing, it creates a new jet request to ensure
|
|
that a jet that is required by that dependency is available.
|
|
|
|
Returns:
|
|
bool: True if all dependencies are satisfied, False otherwise
|
|
"""
|
|
self.ensure_one()
|
|
|
|
all_dependencies_satisfied = True
|
|
|
|
jet_request_obj = self.env["cx.tower.jet.request"]
|
|
|
|
# Check if jets are present in the required state
|
|
for jet_dependency in self.jet_requires_ids:
|
|
jet_template_dependency = jet_dependency.jet_template_dependency_id
|
|
if (
|
|
jet_dependency.jet_depends_on_id
|
|
and jet_dependency.jet_depends_on_id.state_id
|
|
== jet_template_dependency.state_required_id
|
|
):
|
|
# The dependency is satisfied, continue to the next dependency
|
|
continue
|
|
|
|
# Create a new jet request to ensure we have the required jet
|
|
# in the required state
|
|
jet_request_obj._create_request(
|
|
server=self.server_id,
|
|
jet_template=jet_template_dependency.template_required_id,
|
|
state=jet_template_dependency.state_required_id,
|
|
requested_by_jet=self,
|
|
for_dependency=jet_dependency,
|
|
)
|
|
# Stop here as it will be resumed when the jet request is finalized
|
|
all_dependencies_satisfied = False
|
|
break
|
|
|
|
return all_dependencies_satisfied
|
|
|
|
def _get_dependent_jets_by_template(self, jet_template):
|
|
"""
|
|
Check all dependencies of the jet and returns all jets
|
|
of the given template.
|
|
Both dependent and this jet depends on jets are returned.
|
|
|
|
Args:
|
|
jet_template (cx.tower.jet.template()): The jet template
|
|
|
|
Returns:
|
|
cx.tower.jet(): Recordset of jets
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# Check L1 jets this jet depends on
|
|
l1_jets = self.jet_requires_ids.filtered(
|
|
lambda r: r.jet_depends_on_id.jet_template_id == jet_template
|
|
).jet_depends_on_id
|
|
# Check L1 jets that depend on this jet
|
|
l2_jets = self.jet_required_by_ids.filtered(
|
|
lambda r: r.jet_id.jet_template_id == jet_template
|
|
).jet_id
|
|
|
|
# TODO: check the entire dependency tree
|
|
return l1_jets | l2_jets
|
|
|
|
def get_dependent_jets_by_template_reference(self, jet_template_reference):
|
|
"""
|
|
A wrapper for _get_dependent_jets_by_template that allows
|
|
to use the reference of the jet template instead of the record.
|
|
Designed to be used in the Python commands.
|
|
|
|
Args:
|
|
jet_template_reference (str): The reference of the jet template
|
|
|
|
Returns:
|
|
cx.tower.jet(): Recordset of jets with the given template
|
|
that depend on the current jet.
|
|
"""
|
|
self.ensure_one()
|
|
|
|
jet_template = self.jet_template_id.get_by_reference(jet_template_reference)
|
|
if jet_template:
|
|
return self._get_dependent_jets_by_template(jet_template)
|
|
return False
|
|
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
# Access role mixin functions
|
|
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
|
|
def _get_post_create_fields(self):
|
|
"""
|
|
Add fields that should be populated after jet template creation
|
|
"""
|
|
res = super()._get_post_create_fields()
|
|
return res + ["variable_value_ids", "server_log_ids"]
|