diff --git a/addons/cetmix_tower_server/models/cx_tower_variable.py b/addons/cetmix_tower_server/models/cx_tower_variable.py new file mode 100644 index 0000000..f83f1a8 --- /dev/null +++ b/addons/cetmix_tower_server/models/cx_tower_variable.py @@ -0,0 +1,900 @@ +# Copyright (C) 2022 Cetmix OÜ +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). +import logging +import uuid +from urllib.parse import urlparse + +from odoo import _, api, fields, models +from odoo.tools.safe_eval import safe_eval, wrap_module + +_logger = logging.getLogger(__name__) + +re = wrap_module( + __import__("re"), + [ + "match", + "fullmatch", + "search", + "sub", + "subn", + "split", + "findall", + "finditer", + "compile", + "template", + "escape", + "error", + ], +) + +# Maximum recursion depth for variable value rendering +# to prevent infinite loops +MAX_DEPTH = 10 + + +class TowerVariable(models.Model): + """Variables""" + + _name = "cx.tower.variable" + _description = "Cetmix Tower Variable" + _inherit = [ + "cx.tower.reference.mixin", + "cx.tower.access.mixin", + "cx.tower.tag.mixin", + ] + + _order = "name" + + DEFAULT_VALIDATION_MESSAGE = _("Invalid value!") + SYSTEM_VARIABLE_REFERENCE = "tower" + + value_ids = fields.One2many( + string="Values", + comodel_name="cx.tower.variable.value", + inverse_name="variable_id", + ) + value_ids_count = fields.Integer( + string="Value Count", compute="_compute_variable_counters" + ) + option_ids = fields.One2many( + comodel_name="cx.tower.variable.option", + inverse_name="variable_id", + string="Options", + auto_join=True, + ) + variable_type = fields.Selection( + selection=[("s", "String"), ("o", "Options")], + default="s", + required=True, + string="Type", + ) + applied_expression = fields.Text( + help="Python expression to apply to the variable value. \n" + "You can use general python sting functions and 're' module " + "for regex operations. " + "Use 'value' variable to refer to the variable value, use 'result'" + " to assign the final result that will be used as a variable value.\n" + "Eg 'result = value.lower().replace(' ', '_')'", + ) + validation_pattern = fields.Char( + help="Regex pattern to validate the variable values using the " + "'re.match' function. Eg. ^[a-z0-9]+$ \n" + "If empty, the variable values will not be validated.", + ) + validation_message = fields.Char( + translate=True, + help="Message to display when the variable value is invalid. \n" + "First line will be added automatically: " + "`Variable:, Value: `\n" + "Eg: `Variable: Customer Name, Value: Test\nInvalid value!`\n" + "If empty, the default message will be used.", + ) + note = fields.Text( + help="Additional notes about the variable. \n" + "This field will be displayed in the variable form.", + ) + + # --- Link to records where the variable is used + command_ids = fields.Many2many( + comodel_name="cx.tower.command", + relation="cx_tower_command_variable_rel", + column1="variable_id", + column2="command_id", + copy=False, + ) + command_ids_count = fields.Integer( + string="Command Count", compute="_compute_variable_counters" + ) + plan_line_ids = fields.Many2many( + comodel_name="cx.tower.plan.line", + relation="cx_tower_plan_line_variable_rel", + column1="variable_id", + column2="plan_line_id", + copy=False, + ) + plan_line_ids_count = fields.Integer( + string="Plan Line Count", compute="_compute_variable_counters" + ) + file_ids = fields.Many2many( + comodel_name="cx.tower.file", + relation="cx_tower_file_variable_rel", + column1="variable_id", + column2="file_id", + copy=False, + ) + file_ids_count = fields.Integer( + string="File Count", compute="_compute_variable_counters" + ) + file_template_ids = fields.Many2many( + comodel_name="cx.tower.file.template", + relation="cx_tower_file_template_variable_rel", + column1="variable_id", + column2="file_template_id", + copy=False, + ) + file_template_ids_count = fields.Integer( + string="File Template Count", compute="_compute_variable_counters" + ) + variable_value_ids = fields.Many2many( + comodel_name="cx.tower.variable.value", + relation="cx_tower_variable_value_variable_rel", + column1="variable_id", + column2="variable_value_id", + copy=False, + ) + variable_value_ids_count = fields.Integer( + string="Variable Value Count", compute="_compute_variable_counters" + ) + + _sql_constraints = [("name_uniq", "unique (name)", "Variable names must be unique")] + + def _compute_variable_counters(self): + """Count number of variable values for the variable""" + for rec in self: + rec.update( + { + "variable_value_ids_count": len(rec.variable_value_ids), + "command_ids_count": len(rec.command_ids), + "plan_line_ids_count": len(rec.plan_line_ids), + "file_ids_count": len(rec.file_ids), + "file_template_ids_count": len(rec.file_template_ids), + "value_ids_count": len(rec.value_ids), + } + ) + + def action_open_values(self): + """Open the variable values""" + self.ensure_one() + context = self.env.context.copy() + context.update( + { + "default_variable_id": self.id, + } + ) + + return { + "type": "ir.actions.act_window", + "name": _("Variable Values"), + "res_model": "cx.tower.variable.value", + "views": [[False, "tree"]], + "target": "current", + "context": context, + "domain": [("variable_id", "=", self.id)], + } + + def action_open_commands(self): + """Open the commands where the variable is used""" + + self.ensure_one() + action = self.env["ir.actions.act_window"]._for_xml_id( + "cetmix_tower_server.action_cx_tower_command" + ) + action.update( + { + "domain": [("variable_ids", "in", self.ids)], + } + ) + return action + + def action_open_plan_lines(self): + """Open the plan lines where the variable is used""" + self.ensure_one() + return { + "type": "ir.actions.act_window", + "name": _("Plan Lines"), + "res_model": "cx.tower.plan.line", + "views": [ + [False, "tree"], + [ + self.env.ref("cetmix_tower_server.cx_tower_plan_line_view_form").id, + "form", + ], + ], + "target": "current", + "domain": [("variable_ids", "in", self.ids)], + } + + def action_open_files(self): + """Open the files where the variable is used""" + self.ensure_one() + action = self.env["ir.actions.act_window"]._for_xml_id( + "cetmix_tower_server.cx_tower_file_action" + ) + action.update( + { + "domain": [("variable_ids", "in", self.ids)], + } + ) + return action + + def action_open_file_templates(self): + """Open the file templates where the variable is used""" + self.ensure_one() + action = self.env["ir.actions.act_window"]._for_xml_id( + "cetmix_tower_server.cx_tower_file_template_action" + ) + action.update( + { + "domain": [("variable_ids", "in", self.ids)], + } + ) + return action + + def action_open_variable_values(self): + """Open the variable values where the variable is used""" + self.ensure_one() + return { + "type": "ir.actions.act_window", + "name": _("Variable Values"), + "res_model": "cx.tower.variable.value", + "views": [[False, "tree"]], + "target": "current", + "domain": [("variable_ids", "in", self.ids)], + } + + @api.model + def _get_eval_context(self, value_char=None): + """ + Evaluation context to pass to safe_eval to evaluate + the Python expression used in the `applied_expression` field + + Args: + value_char (Char): variable value + + Returns: + dict: evaluation context + """ + return { + "re": re, + "value": value_char, + } + + # Reference rename propagation + + def write(self, vals): + """Override the write method to propagate variable reference updates. + + Records the old reference values, performs the write, and if the reference + field has changed, initiates propagation to update related records. + """ + old_refs = ( + {rec.id: rec.reference for rec in self} if "reference" in vals else {} + ) + res = super().write(vals) + if "reference" in vals: + for rec in self: + old_ref = old_refs.get(rec.id) + if old_ref and old_ref != rec.reference: + rec._propagate_reference_change(old_ref, rec.reference) + return res + + def _propagate_reference_change(self, old_ref, new_ref): + """Replace all occurrences of an old variable reference with a new one. + + Compiles a pattern matching the old Jinja-style reference, then searches across + configured models and fields to substitute any matches, preserving formatting. + """ + pattern = re.compile(r"(\{\{\s*)" + re.escape(old_ref) + r"(\s*\}\})") + + def _replace(text): + """Helper to replace old_ref with new_ref in the given text.""" + return pattern.sub(lambda m: f"{m.group(1)}{new_ref}{m.group(2)}", text) + + model_fields_map = self._get_propagation_field_mapping() + + for model_name, field_names in model_fields_map.items(): + Model = self.env[model_name] + + if model_name == "cx.tower.variable.value": + domain = [("variable_id", "=", self.id)] + else: + domain = [("variable_ids", "in", self.ids)] + + for record in Model.search(domain): + vals = {} + for field_name in field_names: + value = record[field_name] + if isinstance(value, str) and old_ref in value: + new_value = _replace(value) + if new_value != value: + vals[field_name] = new_value + + if vals: + record.with_context(skip_reference_propagation=True).write(vals) + _logger.debug( + "Variable reference updated in %s(%s): %s", + model_name, + record.id, + ", ".join(vals.keys()), + ) + + def _get_propagation_field_mapping(self): + """Return the mapping of models to fields for reference change propagation. + + The returned dict maps each model name to a list of field names + that may contain variable references requiring updates. + """ + return { + "cx.tower.command": ["code", "path"], + "cx.tower.file": ["code", "server_dir", "name"], + "cx.tower.file.template": ["code", "server_dir", "file_name"], + "cx.tower.variable.value": ["value_char"], + "cx.tower.plan.line": ["condition"], + } + + def _get_dependent_model_relation_fields(self): + """Check cx.tower.reference.mixin for the function documentation""" + res = super()._get_dependent_model_relation_fields() + return res + ["value_ids"] + + def _validate_value(self, value_char=None): + """ + Validate the variable value + + Args: + value_char (Char): variable value + + Returns: + (Boolean, Char): (is_valid, validation_message) + """ + self.ensure_one() + if ( + not self.validation_pattern + or not value_char + or re.match(self.validation_pattern, value_char) # pylint: disable=no-member + ): + return True, None + message = self.validation_message or self.DEFAULT_VALIDATION_MESSAGE + return ( + False, + _( + "Variable: %(var)s, Value: %(val)s\n%(msg)s", + msg=message, + var=self.name, # pylint: disable=no-member + val=value_char, + ), + ) + + # ------------------------------ + # ---- Managing variable values + # ------------------------------ + def _get_value( + self, + server=None, + server_template=None, + plan_line_action=None, + jet_template=None, + jet=None, + ): + """Get the value of the variable. + + 0. No arguments: return the global value. + 1. Server Template: return the Server Template specific value + or the global value. + 2. Server: return the Server specific value or the global value. + 3. Jet Template: return the Jet Template specific value + or the Server value + or the global value. + 4. Jet: return the Jet specific value + or the Jet Template value + or the Server value + or the global value. + 5. Plan Line Action: return the Plan Line Action specific value. + + Args: + server (cx.tower.server): Server + server_template (cx.tower.server.template): Server Template + plan_line_action (cx.tower.plan.line.action): Plan Line Action + jet_template (cx.tower.jet.template): Jet Template + jet (cx.tower.jet): Jet + + Returns: + Char: The value of the variable or None if no value is found. + """ + self.ensure_one() + values = self.value_ids + + # 0. Set server and jet template from jet + # if jet is provided + if jet: + server = jet.server_id + jet_template = jet.jet_template_id + + # 1. Prepare the values + + # Initialize all values to None + global_value_char = ( + server_value_char + ) = ( + server_template_value_char + ) = ( + plan_line_action_value_char + ) = jet_template_value_char = jet_value_char = None + + # Get origin id's in case we are dealing with onchange() + server_id = ( + server._origin.id + if server and hasattr(server, "_origin") + else server.id + if server + else None + ) + server_template_id = ( + server_template._origin.id + if server_template and hasattr(server_template, "_origin") + else server_template.id + if server_template + else None + ) + plan_line_action_id = ( + plan_line_action._origin.id + if plan_line_action and hasattr(plan_line_action, "_origin") + else plan_line_action.id + if plan_line_action + else None + ) + jet_template_id = ( + jet_template._origin.id + if jet_template and hasattr(jet_template, "_origin") + else jet_template.id + if jet_template + else None + ) + jet_id = ( + jet._origin.id + if jet and hasattr(jet, "_origin") + else jet.id + if jet + else None + ) + + # Check all values for the variable and assign them. + # Note: we are not using filtered() to avoid multiple iterations + # on the same recordset. + for variable_value in values: + # Fetch the server value + if ( + server + and server_value_char is None + and variable_value.server_id.id == server_id + ): + server_value_char = variable_value.value_char + continue + # Fetch the server template value + if ( + server_template + and server_template_value_char is None + and variable_value.server_template_id.id == server_template_id + ): + server_template_value_char = variable_value.value_char + continue + # Fetch the plan line action value + if ( + plan_line_action + and plan_line_action_value_char is None + and variable_value.plan_line_action_id.id == plan_line_action_id + ): + plan_line_action_value_char = variable_value.value_char + continue + # Fetch the jet template value + if ( + jet_template + and jet_template_value_char is None + and variable_value.jet_template_id.id == jet_template_id + ): + jet_template_value_char = variable_value.value_char + continue + # Fetch the jet value + if jet and jet_value_char is None and variable_value.jet_id.id == jet_id: + jet_value_char = variable_value.value_char + continue + # Fetch the global value + if global_value_char is None and variable_value.is_global: + global_value_char = variable_value.value_char + + # 2. Compose the response + # 2.1. Server Template + if server_template: + return server_template_value_char or global_value_char + + # 2.2. Jet + if jet: + return ( + jet_value_char + if jet_value_char is not None + else jet_template_value_char + if jet_template_value_char is not None + else server_value_char + if server_value_char is not None + else global_value_char + ) + + # 2.3. Jet Template + if jet_template: + return ( + jet_template_value_char + if jet_template_value_char is not None + else server_value_char + if server_value_char is not None + else global_value_char + ) + + # 2.4. Server + if server: + return ( + server_value_char + if server_value_char is not None + else global_value_char + ) + + # 2.5. Plan Line Action + if plan_line_action: + return plan_line_action_value_char + + # 2.6. Global + return global_value_char + + @api.model + def _get_variable_values_by_references( + self, + variable_references, + apply_modifiers=True, + **kwargs, + ): + """Get variable values for multiple references. + This method is designed to be used for template rendering. + It also includes system variable values in the result. + + Args: + variable_references (list of Char): variable names + apply_modifiers (bool): apply Python modifiers to the values + **kwargs: keyword arguments to pass to the _get_value method + - server (cx.tower.server): Server + - server_template (cx.tower.server.template): Server Template + - plan_line_action (cx.tower.plan.line.action): Plan Line Action + - jet_template (cx.tower.jet.template): Jet Template + - jet (cx.tower.jet): Jet + - _depth (int): Depth of the recursion + Returns: + dict {variable_reference: value} + """ + # 0. Get keyword arguments + server = kwargs.get("server") + server_template = kwargs.get("server_template") + plan_line_action = kwargs.get("plan_line_action") + jet_template = kwargs.get("jet_template") + jet = kwargs.get("jet") + _depth = kwargs.get("_depth", 0) + + # 0. Update server and jet template from jet + if jet: + server = jet.server_id + jet_template = jet.jet_template_id + + # 1. Get system variable values + variable_values = {} + system_vars = self._get_system_variable_values( + server=server, jet_template=jet_template, jet=jet + ) + if system_vars: + variable_values[self.SYSTEM_VARIABLE_REFERENCE] = system_vars + + # Return just system variable values if no references are provided + # or the only one is the system variable + # Need a fallback in case system variable is provides several times + if not variable_references or ( + all( + reference == self.SYSTEM_VARIABLE_REFERENCE + for reference in variable_references + ) + ): + return variable_values + + # 2. Get variable value records + for reference in variable_references: + # Do not overwrite system variable values + if reference == self.SYSTEM_VARIABLE_REFERENCE: + continue + variable = self.get_by_reference(reference) # pylint: disable=no-member + + # Assign the value to the variable values dictionary + variable_value = ( + variable._get_value( + server=server, + server_template=server_template, + plan_line_action=plan_line_action, + jet_template=jet_template, + jet=jet, + ) + if variable + else None + ) + variable_values[reference] = variable_value + + # 3. Render templates in values + self._render_variable_values( + variable_values, + server=server, + jet_template=jet_template, + jet=jet, + _depth=_depth, + ) + + # 4. Apply modifiers + if apply_modifiers: + self._apply_modifiers(variable_values) + + return variable_values + + def _render_variable_values(self, variable_values, **kwargs): + """Renders variable values using other variable values. + For example we have the following values: + "server_root": "/opt/server" + "server_assets": "{{ server_root }}/assets" + + This function will render the "server_assets" variable: + "server_assets": "/opt/server/assets" + + Args: + variable_values (dict): variable values to complete + **kwargs: keyword arguments to pass to the _get_value method + - server (cx.tower.server): Server + - server_template (cx.tower.server.template): Server Template + - plan_line_action (cx.tower.plan.line.action): Plan Line Action + - jet_template (cx.tower.jet.template): Jet Template + - jet (cx.tower.jet): Jet + - _depth (int): Depth of the recursion + """ + # 0. Get keyword arguments + server = kwargs.get("server") + jet_template = kwargs.get("jet_template") + jet = kwargs.get("jet") + _depth = kwargs.get("_depth", 0) + + # Control recursion depth + _depth += 1 + if _depth > MAX_DEPTH: + _logger.error("Max depth %d reached for variable %s", _depth, self.name) + return + + TemplateMixin = self.env["cx.tower.template.mixin"] + for key, var_value in variable_values.items(): + # Skip system variable values + if not var_value or key == self.SYSTEM_VARIABLE_REFERENCE: + continue + + # Render only if template is found + if "{{" in var_value and "}}" in var_value: + # Get variables used in value + value_vars = TemplateMixin.get_variables_from_code(var_value) + + # Render variables used in value + values_for_value = self._get_variable_values_by_references( + value_vars, + apply_modifiers=True, + server=server, + jet_template=jet_template, + jet=jet, + _depth=_depth, + ) + + # Render value using variables + variable_values[key] = TemplateMixin.render_code_custom( + var_value, **values_for_value + ) + + def _apply_modifiers(self, variable_values): + """Apply pre-defined Python expression to the dictionary + of variable values. + + Args: + variable_values (dict): variable values + {variable_reference: value} + """ + + for variable_reference, value in variable_values.items(): + if not value: + continue + + # ORM should cache resolved variables + variable = self.get_by_reference(variable_reference) + + # Should never happen.. anyway + if not variable: + continue + + # Skip if no expression to apply + if not variable.applied_expression: + continue + + # Evaluate expression + eval_context = variable._get_eval_context(value) + try: + safe_eval( + variable.applied_expression, + eval_context, + mode="exec", + nocopy=True, + ) + variable_values[variable_reference] = eval_context.get("result", value) + except Exception as e: + _logger.error( + "Error evaluating applied expression for " + "variable %s value %s: %s", + variable.name, + value, + str(e), + ) + + @api.model + def _get_system_variable_values(self, server=None, jet_template=None, jet=None): + """ + Get the values for the `tower` system variable. + This variable uses `tower..` format. + E.g. `tower.server.ipv6`, `tower.tools.uuid`, + `tower.jet_template.reference`, `tower.tools.now_underscore` etc. + + + Args: + server (cx.tower.server()): server record + jet_template (cx.tower.jet.template()): jet template record + jet (cx.tower.jet()): jet record + + Returns: + dict(): `tower` values. + { + 'tools': {..helper tools vals...} + 'server': {..server vals..}, + 'jet_template': {..jet template vals..}, + 'jet': {..jet vals..}, + } + """ + return { + "tools": self._parse_system_variable_tools(), + "server": self._parse_system_variable_server(server), + "jet_template": self._parse_system_variable_jet_template(jet_template), + "jet": self._parse_system_variable_jet(jet), + } + + def _parse_system_variable_server(self, server=None): + """Parser system variable of `server` type. + + Args: + server (cx.tower.server()): server record + + Returns: + dict(): `server` values of the `tower` variable. + """ + # Get current server + values = {} + if server: + # Using sudo() to get all fields + server = server.sudo() + values = { + "name": server.name, + "reference": server.reference, + "username": server.ssh_username, + "partner_name": server.partner_id.name if server.partner_id else False, + "ipv4": server.ip_v4_address, + "ipv6": server.ip_v6_address, + "status": server.status, + "os": server.os_id.name if server.os_id else False, + "url": server.url, + } + if server.url: + url_parts = urlparse(server.url) + values.update( + { + "hostname": url_parts.hostname, + "netloc": url_parts.netloc, + "port": url_parts.port, + } + ) + return values + + def _parse_system_variable_jet_template(self, jet_template=None): + """Parser system variable of `server` type. + + Args: + jet_template (cx.tower.jet.template()): jet template record + + Returns: + dict(): `jet_template` values of the `tower` variable. + """ + # Get current server + values = {} + if jet_template: + # Using sudo() to get all fields + jet_template = jet_template.sudo() + values = { + "name": jet_template.name, + "reference": jet_template.reference, + } + return values + + def _parse_system_variable_jet(self, jet=None): + """Parser system variable of `jet` type. + + Args: + jet (cx.tower.jet()): jet record + """ + values = {} + if jet: + # Using sudo() to get all fields + jet = jet.sudo() + values = { + "name": jet.name, + "reference": jet.reference, + "url": jet.url, + "state": jet.state, + "cloned_from": jet.jet_cloned_from_id.reference + if jet.jet_cloned_from_id + else False, + } + # Add URL parts if URL is set + if jet.url: + url_parts = urlparse(jet.url) + else: + url_parts = False + values.update( + { + "hostname": url_parts.hostname + if url_parts and url_parts.hostname + else False, + "netloc": url_parts.netloc + if url_parts and url_parts.netloc + else False, + "port": url_parts.port if url_parts and url_parts.port else False, + } + ) + # Add waypoint values if waypoint is set + waypoint_data = { + "reference": jet.waypoint_id.reference if jet.waypoint_id else False, + "type": jet.waypoint_id.waypoint_template_id.reference + if jet.waypoint_id + else False, + } + # Add each metadata key-value pair to the waypoint data + metadata = jet.waypoint_id.metadata if jet.waypoint_id else False + if metadata: + for key, value in metadata.items(): + waypoint_data[key] = value + values.update({"waypoint": waypoint_data}) + return values + + def _parse_system_variable_tools(self): + """Parser system variable of `tools` type. + + Returns: + dict(): `tools` values of the `tower` variable. + """ + today = fields.Date.to_string(fields.Date.today()) + now = fields.Datetime.to_string(fields.Datetime.now()) + values = { + "uuid": uuid.uuid4(), + "today": today, + "now": now, + "today_underscore": re.sub(r"[-: .\/]", "_", today), + "now_underscore": re.sub(r"[-: .\/]", "_", now), + } + return values