From b70114419a1c7ed40ce4128d95a9088aa7b5d69f Mon Sep 17 00:00:00 2001 From: git_admin Date: Mon, 27 Apr 2026 10:44:16 +0000 Subject: [PATCH] Tower: upload cetmix_tower_yaml 16.0.2.0.3 (via marketplace) --- .../models/cx_tower_yaml_mixin.py | 577 ++++++++++++++++++ 1 file changed, 577 insertions(+) create mode 100644 addons/cetmix_tower_yaml/models/cx_tower_yaml_mixin.py diff --git a/addons/cetmix_tower_yaml/models/cx_tower_yaml_mixin.py b/addons/cetmix_tower_yaml/models/cx_tower_yaml_mixin.py new file mode 100644 index 0000000..c089e83 --- /dev/null +++ b/addons/cetmix_tower_yaml/models/cx_tower_yaml_mixin.py @@ -0,0 +1,577 @@ +# Copyright (C) 2024 Cetmix OÜ +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import logging + +import yaml + +from odoo import _, api, fields, models +from odoo.exceptions import AccessError, ValidationError + +_logger = logging.getLogger(__name__) + + +class CustomDumper(yaml.Dumper): + """Custom dumper to ensures code + is properly dumped in YAML + """ + + def represent_scalar(self, tag, value, style=None): + if isinstance(value, str) and "\n" in value: + style = "|" + return super().represent_scalar(tag, value, style) + + +class YamlExportCollector: + """ + Collector for YAML export. + Tracks unique records by their (model_name, reference) tuple to avoid duplicates. + """ + + def __init__(self): + """ + Initialize the collector. + """ + self.added_references = set() + + def add(self, key): + """ + Add a record to the collector if its reference is unique. + :param key: tuple, key of the record + """ + if key and key not in self.added_references: + self.added_references.add(key) + + def is_added(self, key): + """ + Check by (model, reference) tuple. + :param key: tuple, key of the record + :return: bool + """ + return key in self.added_references + + +class CxTowerYamlMixin(models.AbstractModel): + """Used to implement YAML rendering functions. + Inherit in your model in case you want to YAML instance of the records. + """ + + _name = "cx.tower.yaml.mixin" + _description = "Cetmix Tower YAML rendering mixin" + + # File format version in order to track compatibility + CETMIX_TOWER_YAML_VERSION = 1 + + # TO_YAML_* used to convert from Odoo field values to YAML + TO_YAML_ACCESS_LEVEL = {"1": "user", "2": "manager", "3": "root"} + + # TO_TOWER_* used to convert from YAML field values to Tower ones + TO_TOWER_ACCESS_LEVEL = {"user": "1", "manager": "2", "root": "3"} + + yaml_code = fields.Text( + compute="_compute_yaml_code", + inverse="_inverse_yaml_code", + groups="cetmix_tower_yaml.group_export,cetmix_tower_yaml.group_import", + ) + + def _compute_yaml_code(self): + """Compute YAML code based on model record data""" + + # This is used for the file name. + # Eg cx.tower.command record will have 'command_' prefix. + for record in self: + # We are reading field list for each record + # because list of fields can differ from record to record + record.yaml_code = self._convert_dict_to_yaml( + record._prepare_record_for_yaml() + ) + + def _inverse_yaml_code(self): + """Compose record based on provided YAML""" + for record in self: + if record.yaml_code: + record_yaml_dict = yaml.safe_load(record.yaml_code) + record_vals = record._post_process_yaml_dict_values(record_yaml_dict) + record.update(record_vals) + + @api.constrains("yaml_code") + def _check_yaml_code_write_access(self): + """ + Check if user has access to create records from YAML. + This is checked only when user already has access to export YAML. + Otherwise, the field is not accessible due to security group. + """ + if self.env.user.has_group("cetmix_tower_yaml.group_export") and ( + not self.env.user.has_group("cetmix_tower_yaml.group_import") + and not self.env.user._is_superuser() + ): + raise AccessError(_("You are not allowed to create records from YAML")) + + @api.model_create_multi + def create(self, vals_list): + # Handle validation error when field values are not valid + try: + return super().create(vals_list) + except ValueError as e: + raise ValidationError(str(e)) from e + + def write(self, vals): + # Handle validation error when field values are not valid + try: + return super().write(vals) + except ValueError as e: + raise ValidationError(str(e)) from e + + def action_open_yaml_export_wizard(self): + """Open YAML export wizard""" + + return { + "type": "ir.actions.act_window", + "res_model": "cx.tower.yaml.export.wiz", + "view_mode": "form", + "target": "new", + } + + def _convert_dict_to_yaml(self, values): + """Converts Python dictionary to YAML string. + + This is a helper function that is designed to be used + by any models that need to convert a dictionary to YAML. + + Args: + values (Dict): Dictionary containing data + to be converted to YAML format + Returns: + Text: YAML string + Raises: + ValidationError: If values is not a dictionary + or YAML conversion fails + """ + if not isinstance(values, dict): + raise ValidationError(_("Values must be a dictionary")) + try: + yaml_code = yaml.dump( + values, + Dumper=CustomDumper, + default_flow_style=False, + sort_keys=False, + ) + return yaml_code + except (yaml.YAMLError, UnicodeEncodeError) as e: + raise ValidationError( + _( + "Failed to convert dictionary" " to YAML: %(error)s", + error=str(e), + ) + ) from e + + def _prepare_record_for_yaml(self): + """Reads and processes current record before converting it to YAML + + Returns: + dict: values ready for YAML conversion + """ + self.ensure_one() + yaml_keys = self._get_fields_for_yaml() + record_dict = self.read(fields=yaml_keys)[0] + return self._post_process_record_values(record_dict) + + def _get_fields_for_yaml(self): + """Get ist of field to be present in YAML + + Set 'no_yaml_service_fields' context key to skip + service fields creation (cetmix_tower_yaml_version, cetmix_tower_model) + + Returns: + list(): list of fields to be used as YAML keys + """ + return ["reference"] + + def _get_force_x2m_resolve_models(self): + """List of models that will always try to be resolved + when referenced in x2m related fields. + + This is useful for models that should always use existing records + instead of creating new ones when referenced in x2m related fields. + Such as variables or tags. + + Returns: + List: list of models that will always try to be resolved + """ + return [ + "cx.tower.variable", + "cx.tower.variable.option", + "cx.tower.tag", + "cx.tower.os", + "cx.tower.key", + ] + + def _post_process_record_values(self, values): + """Post process record values + before converting them to YAML + + Args: + values (dict): values returned by 'read' method + + Context: + explode_related_record: if set will return entire record dictionary + not just a reference + remove_empty_values: if set will remove empty values from the record + + Returns: + dict(): processed values + """ + collector = self._context.get("yaml_collector") + ref = values.get("reference") + collector_key = (self._name, ref) if ref else None + + if collector and collector_key and collector.is_added(collector_key): + return {"reference": ref} + + # We don't need id because we are not using it + values.pop("id", None) + + # Add YAML format version and model + if not self._context.get("no_yaml_service_fields"): + model_name = self._name.replace("cx.tower.", "").replace(".", "_") + model_values = { + "cetmix_tower_model": model_name, + } + else: + model_values = {} + + # Parse access level + access_level = values.pop("access_level", None) + if access_level: + model_values.update( + {"access_level": self.TO_YAML_ACCESS_LEVEL[access_level]} + ) + + values = {**model_values, **values} + # Copy values to avoid modifying the original values + new_values = values.copy() + + # Check if we need to return a record dict or just a reference + # Use context value first, revert to the record setting if not defined + explode_related_record = self._context.get("explode_related_record") + + # Check if we need to remove empty values + # Currently only x2m fields are supported + remove_empty_values = self._context.get("remove_empty_values") + + # Post process m2o and x2m fields + for key, value in values.items(): + # IMPORTANT: Odoo naming patterns must be followed for related fields. + # This is why we are checking for the field name ending here. + # Further checks for the field type are done + # in _process_relation_field_value() + if key.endswith("_id") or key.endswith("_ids"): + if not value and remove_empty_values: + del new_values[key] + else: + processed_value = self.with_context( + explode_related_record=explode_related_record + )._process_relation_field_value(key, value, record_mode=True) + new_values.update({key: processed_value}) + + if collector and collector_key: + collector.add(collector_key) + + return new_values + + def _post_process_yaml_dict_values(self, values): + """Post process dictionary values generated from YAML code + + Args: + values (dict): Dictionary generated from YAML + + Returns: + dict(): Post-processed values + """ + + # Remove model data because it is not a field + if "cetmix_tower_model" in values: + values.pop("cetmix_tower_model") + + # Parse access level + if "access_level" in values: + values_access_level = values["access_level"] + access_level = self.TO_TOWER_ACCESS_LEVEL.get(values_access_level) + if access_level: + values.update({"access_level": access_level}) + else: + raise ValidationError( + _( + "Wrong value for 'access_level' key: %(acv)s", + acv=values_access_level, + ) + ) + + # Leave supported keys only + supported_keys = self._get_fields_for_yaml() + filtered_values = {k: v for k, v in values.items() if k in supported_keys} + + # Post process m2o fields + for key, value in filtered_values.items(): + # IMPORTANT: Odoo naming patterns must be followed for related fields. + # This is why we are checking for the field name ending here. + # Further checks for the field type are done + # in _process_relation_field_value() + if key.endswith("_id") or key.endswith("_ids"): + processed_value = self.with_context( + explode_related_record=True + )._process_relation_field_value(key, value, record_mode=False) + filtered_values.update({key: processed_value}) + + return filtered_values + + def _process_relation_field_value(self, field, value, record_mode=False): + """Post process One2many, Many2many or Many2one value + + Args: + field (Char): Field the value belongs to + value (Char): Value to process + record_mode (Bool): If True process value as a record value + else process value as a YAML value + Context: + explode_related_record: if set will return entire record dictionary + not just a reference + Returns: + dict() or Char: record dictionary if fetch_record else reference + """ + # Step 1: Return False if the value is not set or the field is not found + if not value: + return False + + field_obj = self._fields.get(field) + if not field_obj: + return False + + # Step 2: Return False if the field type doesn't match + # or comodel is not defined + field_type = field_obj.type + if ( + field_type not in ["one2many", "many2many", "many2one"] + or not field_obj.comodel_name + ): + return False + + comodel = self.env[field_obj.comodel_name] + explode_related_record = self._context.get("explode_related_record") + + # Step 3: process value based on the field type + if field_type == "many2one": + return self._process_m2o_value( + comodel, value, explode_related_record, record_mode + ) + if field_type in ["one2many", "many2many"]: + return self._process_x2m_values( + comodel, field_type, value, explode_related_record, record_mode + ) + + # Step 4: fall back if field type is not supported + return False + + def _process_m2o_value( + self, comodel, value, explode_related_record, record_mode=False + ): + """Post process many2one value + Args: + comodel (BaseClass): Model the value belongs to + value (Char): Value to process + explode_related_record (Bool): If True return entire record dict + instead of a reference + record_mode (Bool): If True process value as a record value + else process value as a YAML value + + Returns: + dict() or Char: record dictionary if fetch_record else reference + """ + + # -- (Record -> YAML) + if record_mode: + # Retrieve the record based on the ID provided in the value + record = comodel.browse(value[0]) + + # If the context specifies to explode the related record, + # return its dictionary representation + if explode_related_record: + return ( + record.with_context( + no_yaml_service_fields=True + )._prepare_record_for_yaml() + if record + else False + ) + + # Otherwise, return just the reference (or False if record does not exist) + return record.reference if record else False + + # -- (YAML -> Record) + # Step 1: Process value in normal mode + record = False + + # If the value is a string, it is treated as a reference + if isinstance(value, str): + reference = value + + # If the value is a dictionary, extract the reference from it + elif isinstance(value, dict): + reference = value.get("reference") + + record = self._update_or_create_related_record( + comodel, reference, value, create_immediately=True + ) + else: + return False + + # Step 2: Final fallback: attempt to retrieve the record by reference if set, + # return its ID or False + if not record and reference: + record = comodel.get_by_reference(reference) + return record.id if record else False + + def _process_x2m_values( + self, comodel, field_type, values, explode_related_record, record_mode=False + ): + """Post process many2many value + Args: + comodel (BaseClass): Model the value belongs to + field_type (Char): Field type + values (list()): Values to process + explode_related_record (Bool): If True return entire record dict + instead of a reference + record_mode (Bool): If True process value as a record value + else process value as a YAML value + + Returns: + dict() or Char: record dictionary if fetch_record else reference + """ + + # -- (Record -> YAML) + if record_mode: + record_list = [] + for value in values: + # Retrieve the record based on the ID provided in the value + record = comodel.browse(value) + + # If the context specifies to explode the related record, + # return its dictionary representation + if explode_related_record: + record_list.append( + record.with_context( + no_yaml_service_fields=True + )._prepare_record_for_yaml() + if record + else False + ) + + # Otherwise, return just the reference + # (or False if record does not exist) + else: + record_list.append(record.reference if record else False) + + return record_list + + # -- (YAML -> Record) + # Step 1: Process value in normal mode + record_ids = [] + + for value in values: + record = False + # If the value is a string, it is treated as a reference + if isinstance(value, str): + reference = value + + # If the value is a dictionary, extract the reference from it + elif isinstance(value, dict): + reference = value.get("reference") + record = self._update_or_create_related_record( + comodel, + reference, + value, + create_immediately=field_type == "many2many", + ) + + # Step 2: Final fallback: attempt to retrieve the record by reference + # Return record ID or False if reference is not defined + if not record and reference: + record = comodel.get_by_reference(reference) + + # Save record data + if record: + record_ids.append( + record if isinstance(record, tuple) else (4, record.id) + ) + + return record_ids + + def _update_or_create_related_record( + self, model, reference, values, create_immediately=False + ): + """Update related record with provided values or create a new one + + Args: + model (BaseModel): Related record model + values (dict()): Values to update existing/create new record + reference (Char): Record reference + create_immediately (Bool): If True create a new record immediately. + Used for Many2one fields. + + Context: + force_create_related_record (Bool): If True, create a new record + even if reference is provided. + + Returns: + record: Existing record or new record tuple + """ + + # If reference is found, retrieve the corresponding record + if reference and ( + model._name in self._get_force_x2m_resolve_models() + or not self._context.get("force_create_related_record") + ): + record = model.get_by_reference(reference) + # If the record exists, update it with the values from the dictionary + if record: + # Remove reference from values to avoid possible consequences + values.pop("reference", None) + record.with_context(from_yaml=True).write( + record._post_process_yaml_dict_values(values) + ) + + # If the record does not exist, create a new one + else: + if create_immediately: + record = model.with_context(from_yaml=True).create( + model._post_process_yaml_dict_values(values) + ) + else: + # Use "Create" service command tuple + record = (0, 0, model._post_process_yaml_dict_values(values)) + + # If there's no reference but value is a dict, create a new record + else: + if create_immediately: + # Only 'reference' provided, no other data: do not create, + # just log warning + if set(values.keys()) == {"reference"}: + _logger.warning( + "Attempted to import a record for model '%s' with reference " + "'%s', but only the 'reference' field was provided. " + "It is possible that this record has already been imported. " + "Creation will be skipped.", + model._name, + reference, + ) + return False + + record = model.with_context(from_yaml=True).create( + model._post_process_yaml_dict_values(values) + ) + else: + # Use "Create" service command tuple + record = (0, 0, model._post_process_yaml_dict_values(values)) + + # Return the record's ID if it exists, otherwise return False + return record or False