diff --git a/addons/cetmix_tower_server/models/cx_tower_plan.py b/addons/cetmix_tower_server/models/cx_tower_plan.py new file mode 100644 index 0000000..45ae97e --- /dev/null +++ b/addons/cetmix_tower_server/models/cx_tower_plan.py @@ -0,0 +1,424 @@ +# Copyright (C) 2022 Cetmix OÜ +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). +from operator import indexOf + +from odoo import _, api, fields, models +from odoo.exceptions import ValidationError +from odoo.tools.safe_eval import expr_eval + +from .constants import ( + ANOTHER_PLAN_RUNNING, + PLAN_LINE_CONDITION_CHECK_FAILED, + PLAN_LINE_NOT_ASSIGNED, + PLAN_NOT_ASSIGNED, + PLAN_NOT_COMPATIBLE_WITH_SERVER, +) + + +class CxTowerPlan(models.Model): + """Cetmix Tower flight plan""" + + _name = "cx.tower.plan" + _description = "Cetmix Tower Flight Plan" + _inherit = [ + "cx.tower.reference.mixin", + "cx.tower.access.mixin", + "cx.tower.access.role.mixin", + "cx.tower.tag.mixin", + ] + _order = "name asc" + + active = fields.Boolean(default=True) + allow_parallel_run = fields.Boolean( + help="If enabled, multiple instances of the same flight plan " + "can be run on the same server at the same time.\n" + "Otherwise, ANOTHER_PLAN_RUNNING status will be returned if another" + " instance of the same flight plan is already running" + ) + + color = fields.Integer(help="For better visualization in views") + server_ids = fields.Many2many(string="Servers", comodel_name="cx.tower.server") + tag_ids = fields.Many2many( + relation="cx_tower_plan_tag_rel", + column1="plan_id", + column2="tag_id", + ) + line_ids = fields.One2many( + string="Lines", + comodel_name="cx.tower.plan.line", + inverse_name="plan_id", + auto_join=True, + copy=True, + ) + command_ids = fields.Many2many( + string="Commands", + comodel_name="cx.tower.command", + relation="cx_tower_command_flight_plan_used_id_rel", + column1="plan_id", + column2="command_id", + help="Commands used in this flight plan", + compute="_compute_command_ids", + store=True, + ) + note = fields.Text() + on_error_action = fields.Selection( + string="On Error", + selection=[ + ("e", "Exit with command exit code"), + ("ec", "Exit with custom exit code"), + ("n", "Run next command"), + ], + required=True, + default="e", + help="This action will be triggered on error " + "if no command action can be applied", + ) + custom_exit_code = fields.Integer( + help="Will be used instead of the command exit code" + ) + + access_level_warn_msg = fields.Text( + compute="_compute_command_access_level", + compute_sudo=True, + ) + + # ---- Access. Add relation for mixin fields + user_ids = fields.Many2many( + relation="cx_tower_plan_user_rel", + ) + manager_ids = fields.Many2many( + relation="cx_tower_plan_manager_rel", + ) + + @api.depends("line_ids.command_id.access_level", "access_level") + def _compute_command_access_level(self): + """Check if the access level of a command in the plan + is higher than the plan's access level""" + for record in self: + commands = record.mapped("line_ids").mapped("command_id") + # Retrieve all commands associated with the flight plan + commands_with_higher_access = commands.filtered( + lambda c, access_level=record.access_level: c.access_level + > access_level + ) + if commands_with_higher_access: + command_names = ", ".join(commands_with_higher_access.mapped("name")) + record.access_level_warn_msg = _( + "The access level of command(s) '%(command_names)s' included in the" + " current Flight plan is higher than the access level of the" + " Flight plan itself. Please ensure that you want to allow" + " those commands to be run anyway.", + command_names=command_names, + ) + else: + record.access_level_warn_msg = False + + @api.depends("line_ids", "line_ids.command_id") + def _compute_command_ids(self): + """Compute command ids""" + for plan in self: + plan.command_ids = [(6, 0, plan.line_ids.mapped("command_id").ids)] + + def action_open_plan_logs(self): + """ + Open current flight plan log records + """ + action = self.env["ir.actions.actions"]._for_xml_id( + "cetmix_tower_server.action_cx_tower_plan_log" + ) + action["domain"] = [("plan_id", "=", self.id)] + return action + + def _get_dependent_model_relation_fields(self): + """Check cx.tower.reference.mixin for the function documentation""" + res = super()._get_dependent_model_relation_fields() + return res + ["line_ids"] + + def _is_plan_incompatible_with_server(self, server): + """ + Check if the flight plan is compatible with the server. + Note: this function uses the inverse logic to simplify the checks. + + Args: + server (cx.tower.server()): Server object + + Returns: + Char or False: Incompatible reason or False if compatible + """ + + # Check if the flight plan is compatible with the server + if not self.server_ids: + return False + if server.id not in self.server_ids.ids: + return _("Flight plan is not compatible with the server") + + # Check if the flight plan commands are compatible with the server + for command in self.command_ids: + # Check the entire command first + if not command._check_server_compatibility(server): + return _( + "Command %(command_name)s is not compatible with the server", + command_name=command.name, + ) # pylint: disable=no-member + + # Check if the nested flight plan is compatible with the server + if command.action == "plan": + plan_check_result = ( + command.flight_plan_id._is_plan_incompatible_with_server(server) + ) + if plan_check_result: + return plan_check_result + + return False + + def _get_post_create_fields(self): + res = super()._get_post_create_fields() + return res + ["line_ids"] + + def _run_single(self, server, jet_template=None, jet=None, **kwargs): + """Run single Flight Plan on a single server + + Args: + server (cx.tower.server()): Server object + jet_template (cx.tower.jet.template()): jet template record + jet (cx.tower.jet()): jet record + kwargs (dict): Optional arguments + Following are supported but not limited to: + - "plan_log": {values passed to flightplan logger} + - "log": {values passed to logger} + - "key": {values passed to key parser} + - "variable_values", dict(): custom variable values + in the format of `{variable_reference: variable_value}` + eg `{'odoo_version': '16.0'}` + Will be applied only if user has write access to the server. + + Returns: + log_record (cx.tower.plan.log()): plan log record + """ + + self.ensure_one() + # Ensure we have a single server record + server.ensure_one() + + # Check if Jet belongs to the server + if jet and jet.server_id != server: + raise ValidationError( + _( + "Jet %(jet)s does not belong to server %(server)s", + jet=jet.name, + server=server.name, + ) + ) + + # Check plan access before running + # This is needed to avoid possible access violations + self.check_access_rights("read") + self.check_access_rule("read") + + # Save jet template and jet in kwargs + plan_log_vals = kwargs.get("plan_log", {}) + if jet_template: + plan_log_vals["jet_template_id"] = jet_template.id + if jet: + plan_log_vals["jet_id"] = jet.id + kwargs["plan_log"] = plan_log_vals + + # Access log as root to bypass access restrictions + plan_log_obj = self.env["cx.tower.plan.log"].sudo() + + # Check if flight plan and all its commands can be run on this server + # This check is skipped if 'from_command' context key is set to True + if not self.env.context.get("from_command"): + plan_is_incompatible = self._is_plan_incompatible_with_server(server) + if plan_is_incompatible: + # Create a log record with the custom message and exit + plan_log_kwargs = kwargs.get("plan_log", {}) + plan_log_kwargs["custom_message"] = plan_is_incompatible + kwargs["plan_log"] = plan_log_kwargs + plan_log = plan_log_obj.record( + server=server, + plan=self, + status=PLAN_NOT_COMPATIBLE_WITH_SERVER, + **kwargs, + ) + return plan_log + + # Check if the same plan is being run on this server right now + if not self.allow_parallel_run or self.env.context.get( + "prevent_plan_recursion" + ): + domain = [ + ("server_id", "=", server.id), + ("plan_id", "=", self.id), # type: ignore + ("is_running", "=", True), + ] + if jet_template: + domain.append(("jet_template_id", "=", jet_template.id)) + if jet: + domain.append(("jet_id", "=", jet.id)) + running_count = plan_log_obj.search_count(domain=domain) + if running_count > 0: + plan_log = plan_log_obj.record( + server=server, plan=self, status=ANOTHER_PLAN_RUNNING, **kwargs + ) + return plan_log + + # Start Flight Plan and return the log record + return plan_log_obj.start( + server=server, + plan=self, + **kwargs, + ) + + def _get_next_action_values(self, command_log): + """Get next action values based of previous command result: + + - Action to proceed + - Exit code + - Next line of the plan if next line should be run + + Args: + command_log (cx.tower.command.log()): Command log record + + Returns: + action, exit_code, next_line (Selection, Integer, cx.tower.plan.line()) + + """ + # Iterate all actions and return the first matching one. + # If no action is found return the default plan values + # If the line is the last one return last command exit code + + if not command_log.plan_log_id: # Exit with custom code "Plan not found" + return "ec", PLAN_NOT_ASSIGNED, None + + current_line = command_log.plan_log_id.plan_line_executed_id + if not current_line: + return "ec", PLAN_LINE_NOT_ASSIGNED, None + + # Default values + exit_code = command_log.command_status + server = command_log.server_id + jet_template = command_log.jet_template_id + jet = command_log.jet_id + + # Check line condition + variable_values = ( + command_log.variable_values or command_log.plan_log_id.variable_values or {} + ) + if not current_line._is_executable_line( + server=server, + jet_template=jet_template, + jet=jet, + variable_values=variable_values, + ): + # Immediately return to the next line if condition fails + return self._get_next_action_state( + "n", PLAN_LINE_CONDITION_CHECK_FAILED, current_line + ) + + # Check plan action lines + for action_line in current_line.action_ids: + conditional_expression = ( + f"{exit_code} {action_line.condition} {action_line.value_char}" + ) + # Evaluate expression using safe_eval + if expr_eval(conditional_expression): + action = action_line.action + # Use custom exit code if action requires it + if action == "ec" and action_line.custom_exit_code is not None: + exit_code = action_line.custom_exit_code + + # Apply action-defined values into the variable values context + for variable_value in action_line.variable_value_ids: + ref = variable_value.variable_id.reference + variable_values[ref] = variable_value.value_char + + # Persist the updated custom values only in logs + # so they remain available within the current flight plan context + updated_values = dict(variable_values) + command_log.variable_values = updated_values + if command_log.plan_log_id: + command_log.plan_log_id.variable_values = updated_values + + return self._get_next_action_state(action, exit_code, current_line) + + # If no action matched, fallback to default ones + return self._get_next_action_state(None, exit_code, current_line) + + def _get_next_action_state(self, action, exit_code, current_line): + """ + Determine the next action, exit code, and next line based on the current state. + + Args: + action (Selection): Action to proceed + exit_code (Integer): Exit code + current_line (cx.tower.plan.line()): Current line + + Returns: + action, exit_code, next_line (Selection, Integer, cx.tower.plan.line()) + """ + lines = current_line.plan_id.line_ids + is_last_line = current_line == lines[-1] + + # If no conditions were met fallback to default ones + if not action: + action = "n" if exit_code == 0 else current_line.plan_id.on_error_action + + # Exit with custom code + if action == "ec": + exit_code = current_line.plan_id.custom_exit_code + + # Determine the next line if current is not the last one + next_line = None + if action == "n" and not is_last_line: + next_line = lines[indexOf(lines, current_line) + 1] + + # Exit with command code if not exiting with custom code + if is_last_line and action != "ec": + action = "e" + + return action, exit_code, next_line + + def _run_next_action(self, command_log): + """Run next action based on the command result + + Args: + command_log (cx.tower.command.log()): Command log record + """ + self.ensure_one() + action, exit_code, plan_line = self._get_next_action_values(command_log) + plan_log = command_log.plan_log_id + + # Update log message + if exit_code == PLAN_LINE_CONDITION_CHECK_FAILED: + # save log exit code as success + exit_code = 0 + + # Run next line + if action == "n" and plan_line: + server = command_log.server_id + variable_values = command_log.variable_values or plan_log.variable_values + if plan_line._is_executable_line( + server=server, + jet_template=plan_log.jet_template_id, + jet=plan_log.jet_id, + variable_values=variable_values, + ): + plan_line._run( + server, + plan_log, + variable_values=variable_values, + ) + else: + plan_line._skip( + server, + plan_log, + log={"variable_values": dict(variable_values or {})}, + ) + + # Exit + if action in ["e", "ec"]: + plan_log.finish(exit_code) + + # NB: we are not putting any fallback here in case + # someone needs to inherit and extend this function