# Copyright (C) 2024 Cetmix OÜ # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). import copy import logging import yaml from odoo import _, api, fields, models from odoo.exceptions import AccessError, ValidationError _logger = logging.getLogger(__name__) DEFERRED_M2O_IMPORT = object() class CustomDumper(yaml.Dumper): """Custom dumper to ensures code is properly dumped in YAML """ def represent_scalar(self, tag, value, style=None): if isinstance(value, str) and "\n" in value: style = "|" return super().represent_scalar(tag, value, style) class YamlExportCollector: """ Collector for YAML export. Tracks unique records by their (model_name, reference) tuple to avoid duplicates. """ def __init__(self): """ Initialize the collector. """ self.added_references = set() def add(self, key): """ Add a record to the collector if its reference is unique. :param key: tuple, key of the record """ if key and key not in self.added_references: self.added_references.add(key) def is_added(self, key): """ Check by (model, reference) tuple. :param key: tuple, key of the record :return: bool """ return key in self.added_references class CxTowerYamlMixin(models.AbstractModel): """Used to implement YAML rendering functions. Inherit in your model in case you want to YAML instance of the records. """ _name = "cx.tower.yaml.mixin" _description = "Cetmix Tower YAML rendering mixin" # File format version in order to track compatibility CETMIX_TOWER_YAML_VERSION = 1 # TO_YAML_* used to convert from Odoo field values to YAML TO_YAML_ACCESS_LEVEL = {"1": "user", "2": "manager", "3": "root"} # TO_TOWER_* used to convert from YAML field values to Tower ones TO_TOWER_ACCESS_LEVEL = {"user": "1", "manager": "2", "root": "3"} yaml_code = fields.Text( compute="_compute_yaml_code", inverse="_inverse_yaml_code", groups="cetmix_tower_yaml.group_export,cetmix_tower_yaml.group_import", ) def _compute_yaml_code(self): """Compute YAML code based on model record data""" # This is used for the file name. # Eg cx.tower.command record will have 'command_' prefix. for record in self: # Use a shared collector from context when one is provided (e.g. by # the export wizard for cross-record deduplication); otherwise use a # fresh per-record collector so that each record's yaml_code is # deterministic regardless of which sibling records are batched. collector = record._context.get("yaml_collector") or YamlExportCollector() # We are reading field list for each record # because list of fields can differ from record to record record.yaml_code = self._convert_dict_to_yaml( record.with_context(yaml_collector=collector)._prepare_record_for_yaml() ) def _inverse_yaml_code(self): """Compose record based on provided YAML""" for record in self: if record.yaml_code: record_yaml_dict = yaml.safe_load(record.yaml_code) record_vals = record._post_process_yaml_dict_values(record_yaml_dict) record.update(record_vals) @api.constrains("yaml_code") def _check_yaml_code_write_access(self): """ Check if user has access to create records from YAML. This is checked only when user already has access to export YAML. Otherwise, the field is not accessible due to security group. """ if self.env.user.has_group("cetmix_tower_yaml.group_export") and ( not self.env.user.has_group("cetmix_tower_yaml.group_import") and not self.env.user._is_superuser() ): raise AccessError(_("You are not allowed to create records from YAML")) def create(self, vals_list): # Handle validation error when field values are not valid try: return super().create(vals_list) except ValueError as e: raise ValidationError(str(e)) from e def write(self, vals): # Handle validation error when field values are not valid try: return super().write(vals) except ValueError as e: raise ValidationError(str(e)) from e def action_open_yaml_export_wizard(self): """Open YAML export wizard""" return { "type": "ir.actions.act_window", "res_model": "cx.tower.yaml.export.wiz", "view_mode": "form", "target": "new", } def _convert_dict_to_yaml(self, values): """Converts Python dictionary to YAML string. This is a helper function that is designed to be used by any models that need to convert a dictionary to YAML. Args: values (Dict): Dictionary containing data to be converted to YAML format Returns: Text: YAML string Raises: ValidationError: If values is not a dictionary or YAML conversion fails """ if not isinstance(values, dict): raise ValidationError(_("Values must be a dictionary")) try: yaml_code = yaml.dump( values, Dumper=CustomDumper, default_flow_style=False, sort_keys=False, ) return yaml_code except (yaml.YAMLError, UnicodeEncodeError) as e: raise ValidationError( _( "Failed to convert dictionary" " to YAML: %(error)s", error=str(e), ) ) from e def _prepare_record_for_yaml(self): """Reads and processes current record before converting it to YAML Returns: dict: values ready for YAML conversion """ self.ensure_one() yaml_keys = self._get_fields_for_yaml() record_dict = self.read(fields=yaml_keys)[0] return self._post_process_record_values(record_dict) def _get_fields_for_yaml(self): """Get ist of field to be present in YAML Set 'no_yaml_service_fields' context key to skip service fields creation (cetmix_tower_yaml_version, cetmix_tower_model) Returns: list(): list of fields to be used as YAML keys """ return ["reference"] def _get_force_x2m_resolve_models(self): """List of models that will always try to be resolved when referenced in x2m related fields. This is useful for models that should always use existing records instead of creating new ones when referenced in x2m related fields. Such as variables or tags. Returns: List: list of models that will always try to be resolved """ return [ "cx.tower.variable", "cx.tower.variable.option", "cx.tower.tag", "cx.tower.os", "cx.tower.key", ] def _get_deferred_m2o_import_fields(self): """Map m2o fields that should be resolved after the main import pass. Returns: dict: Field name to expected target model mapping. """ return {} def _get_deferred_x2m_import_fields(self): """Map x2m child records that should be created after the main import pass. Returns: dict: Parent field name to deferred child spec mapping. """ return {} def _has_meaningful_yaml_value(self, value): """Return whether a YAML value contains meaningful payload.""" if value is False or value is None or value == "": return False if isinstance(value, dict): if set(value.keys()) == {"reference"}: return bool(value.get("reference")) return any( self._has_meaningful_yaml_value(item) for key, item in value.items() if key != "reference" ) if isinstance(value, list): return any(self._has_meaningful_yaml_value(item) for item in value) return True def _get_reference_only_yaml_relation_reference(self, value): """Return reference for reference-only YAML relation values. Args: value (str | dict): YAML relation value. Returns: str | bool: Reference if the value is reference-only, otherwise False. """ if isinstance(value, str): return value if isinstance(value, dict) and set(value.keys()) == {"reference"}: return value.get("reference") or False return False def _queue_deferred_m2o_import(self, field, comodel, value): """Queue unresolved m2o relation for the deferred import pass. Args: field (str): Owner field name. comodel (BaseModel): Related model. value (str | dict): YAML relation value. Returns: bool: True when the relation was queued for deferred resolution. """ queue = self._context.get("yaml_deferred_m2o_queue") if queue is None: return False deferred_fields = self._get_deferred_m2o_import_fields() expected_model = deferred_fields.get(field) if not expected_model or expected_model != comodel._name: return False target_reference = self._get_reference_only_yaml_relation_reference(value) if not target_reference or comodel.get_by_reference(target_reference): return False record_reference = self._context.get("yaml_import_record_reference") if not record_reference: return False queue.append( { "record_model": self._name, "record_reference": record_reference, "field_name": field, "target_model": comodel._name, "target_reference": target_reference, } ) return True def _queue_deferred_x2m_import(self, field, comodel, value): """Queue unresolved x2m child record for the deferred import pass. Args: field (str): Owner x2m field name. comodel (BaseModel): Related child model. value (dict): YAML child record value. Returns: bool: True when the child was queued for deferred creation or should be skipped (e.g., empty value with skip_empty=True). """ queue = self._context.get("yaml_deferred_x2m_queue") if queue is None or not isinstance(value, dict): return False deferred_fields = self._get_deferred_x2m_import_fields() spec = deferred_fields.get(field) or {} if spec.get("child_model") != comodel._name: return False if spec.get("skip_empty") and not self._has_meaningful_yaml_value(value): return True deferred_field = spec.get("deferred_field") if not deferred_field: return False target_model = spec.get("target_model") target_value = value.get(deferred_field) target_reference = self._get_reference_only_yaml_relation_reference( target_value ) if not target_model or not target_reference: return False target_record = self.env[target_model].get_by_reference(target_reference) if target_record: return False record_reference = self._context.get("yaml_import_record_reference") if not record_reference: return False queue.append( { "record_model": self._name, "record_reference": record_reference, "field_name": field, "child_model": comodel._name, "deferred_field": deferred_field, "target_model": target_model, "target_reference": target_reference, "values": copy.deepcopy(value), } ) return True def _get_yaml_duplicate_reference_dict(self, ref, values): """Return the stub emitted when a record has already been serialized. The collector deduplicates by (model, reference); subsequent occurrences are collapsed to a reference-only dict. Import must never attempt to create from this stub — it must resolve the record by reference instead. Args: ref (str): Record reference. values (dict): Raw values (unused; kept for signature compatibility in case subclasses need them). Returns: dict: ``{"reference": ref}`` only. """ return {"reference": ref} def _post_process_record_values(self, values): """Post process record values before converting them to YAML Args: values (dict): values returned by 'read' method Context: explode_related_record: if set will return entire record dictionary not just a reference remove_empty_values: if set will remove empty values from the record Returns: dict(): processed values """ collector = self._context.get("yaml_collector") ref = values.get("reference") collector_key = (self._name, ref) if ref else None if collector and collector_key and collector.is_added(collector_key): return self._get_yaml_duplicate_reference_dict(ref, values) if collector and collector_key: collector.add(collector_key) if collector and collector_key: collector.add(collector_key) # We don't need id because we are not using it values.pop("id", None) # Add YAML format version and model if not self._context.get("no_yaml_service_fields"): model_name = self._name.replace("cx.tower.", "").replace(".", "_") model_values = { "cetmix_tower_model": model_name, } else: model_values = {} # Parse access level access_level = values.pop("access_level", None) if access_level: model_values.update( {"access_level": self.TO_YAML_ACCESS_LEVEL[access_level]} ) values = {**model_values, **values} # Copy values to avoid modifying the original values new_values = values.copy() # Check if we need to return a record dict or just a reference # Use context value first, revert to the record setting if not defined explode_related_record = self._context.get("explode_related_record") # Check if we need to remove empty values # Currently only x2m fields are supported remove_empty_values = self._context.get("remove_empty_values") # Post process m2o and x2m fields for key, value in values.items(): # IMPORTANT: Odoo naming patterns must be followed for related fields. # This is why we are checking for the field name ending here. # Further checks for the field type are done # in _process_relation_field_value() if key.endswith("_id") or key.endswith("_ids"): if not value and remove_empty_values: del new_values[key] else: processed_value = self.with_context( explode_related_record=explode_related_record )._process_relation_field_value(key, value, record_mode=True) new_values.update({key: processed_value}) return new_values def _post_process_yaml_dict_values(self, values): """Post process dictionary values generated from YAML code Args: values (dict): Dictionary generated from YAML Returns: dict(): Post-processed values """ # Remove model data because it is not a field if "cetmix_tower_model" in values: values.pop("cetmix_tower_model") # Parse access level if "access_level" in values: values_access_level = values["access_level"] access_level = self.TO_TOWER_ACCESS_LEVEL.get(values_access_level) if access_level: values.update({"access_level": access_level}) else: raise ValidationError( _( "Wrong value for 'access_level' key: %(acv)s", acv=values_access_level, ) ) # Leave supported keys only supported_keys = self._get_fields_for_yaml() filtered_values = {k: v for k, v in values.items() if k in supported_keys} # Post process m2o fields for key, value in list(filtered_values.items()): # IMPORTANT: Odoo naming patterns must be followed for related fields. # This is why we are checking for the field name ending here. # Further checks for the field type are done # in _process_relation_field_value() if key.endswith("_id") or key.endswith("_ids"): processed_value = self.with_context( explode_related_record=True, yaml_import_record_reference=filtered_values.get("reference"), )._process_relation_field_value(key, value, record_mode=False) if processed_value is DEFERRED_M2O_IMPORT: filtered_values.pop(key, None) else: filtered_values.update({key: processed_value}) return filtered_values def _process_relation_field_value(self, field, value, record_mode=False): """Post process One2many, Many2many or Many2one value Args: field (Char): Field the value belongs to value (Char): Value to process record_mode (Bool): If True process value as a record value else process value as a YAML value Context: explode_related_record: if set will return entire record dictionary not just a reference Returns: dict() or Char: record dictionary if fetch_record else reference """ # Step 1: Return False if the value is not set or the field is not found if not value: return False field_obj = self._fields.get(field) if not field_obj: return False # Step 2: Return False if the field type doesn't match # or comodel is not defined field_type = field_obj.type if ( field_type not in ["one2many", "many2many", "many2one"] or not field_obj.comodel_name ): return False comodel = self.env[field_obj.comodel_name] explode_related_record = self._context.get("explode_related_record") # Step 3: process value based on the field type if field_type == "many2one": return self._process_m2o_value( field, comodel, value, explode_related_record, record_mode ) if field_type in ["one2many", "many2many"]: return self._process_x2m_values( field, comodel, field_type, value, explode_related_record, record_mode ) # Step 4: fall back if field type is not supported return False def _process_m2o_value( self, field, comodel, value, explode_related_record, record_mode=False ): """Post process many2one value Args: field (Char): Field the value belongs to comodel (BaseClass): Model the value belongs to value (Char): Value to process explode_related_record (Bool): If True return entire record dict instead of a reference record_mode (Bool): If True process value as a record value else process value as a YAML value Returns: dict() or Char: record dictionary if fetch_record else reference """ # -- (Record -> YAML) if record_mode: # Retrieve the record based on the ID provided in the value record = comodel.browse(value[0]) # If the context specifies to explode the related record, # return its dictionary representation if explode_related_record: return ( record.with_context( no_yaml_service_fields=True )._prepare_record_for_yaml() if record else False ) # Otherwise, return just the reference (or False if record does not exist) return record.reference if record else False # -- (YAML -> Record) # Step 1: Process value in normal mode record = False if self._queue_deferred_m2o_import(field, comodel, value): return DEFERRED_M2O_IMPORT # If the value is a string, it is treated as a reference if isinstance(value, str): reference = value # If the value is a dictionary, extract the reference from it elif isinstance(value, dict): reference = value.get("reference") if self._get_reference_only_yaml_relation_reference(value): record = False else: record = self._update_or_create_related_record( comodel, reference, value, create_immediately=True ) else: return False # Step 2: Final fallback: attempt to retrieve the record by reference if set, # return its ID or False if not record and reference: record = comodel.get_by_reference(reference) return record.id if record else False def _process_x2m_values( self, field, comodel, field_type, values, explode_related_record, record_mode=False, ): """Post process many2many value Args: field (Char): Field the value belongs to comodel (BaseClass): Model the value belongs to field_type (Char): Field type values (list()): Values to process explode_related_record (Bool): If True return entire record dict instead of a reference record_mode (Bool): If True process value as a record value else process value as a YAML value Returns: dict() or Char: record dictionary if fetch_record else reference """ # -- (Record -> YAML) if record_mode: record_list = [] for value in values: # Retrieve the record based on the ID provided in the value record = comodel.browse(value) # If the context specifies to explode the related record, # return its dictionary representation if explode_related_record: record_list.append( record.with_context( no_yaml_service_fields=True )._prepare_record_for_yaml() if record else False ) # Otherwise, return just the reference # (or False if record does not exist) else: record_list.append(record.reference if record else False) return record_list # -- (YAML -> Record) # Step 1: Process value in normal mode record_ids = [] for value in values: record = False # If the value is a string, it is treated as a reference if isinstance(value, str): reference = value # If the value is a dictionary, extract the reference from it elif isinstance(value, dict): if self._queue_deferred_x2m_import(field, comodel, value): continue reference = value.get("reference") record = self._update_or_create_related_record( comodel, reference, value, create_immediately=field_type == "many2many", ) # Step 2: Final fallback: attempt to retrieve the record by reference # Return record ID or False if reference is not defined if not record and reference: record = comodel.get_by_reference(reference) # Save record data if record: record_ids.append( record if isinstance(record, tuple) else (4, record.id) ) return record_ids def _update_or_create_related_record( self, model, reference, values, create_immediately=False ): """Update related record with provided values or create a new one Args: model (BaseModel): Related record model values (dict()): Values to update existing/create new record reference (Char): Record reference create_immediately (Bool): If True create a new record immediately. Used for Many2one fields. Context: force_create_related_record (Bool): If True, create a new record even if reference is provided. Returns: record: Existing record or new record tuple """ # If reference is found, retrieve the corresponding record if reference and ( model._name in self._get_force_x2m_resolve_models() or not self._context.get("force_create_related_record") ): record = model.get_by_reference(reference) # If the record exists, update it with the values from the dictionary if record: # Remove reference from values to avoid possible consequences values.pop("reference", None) record.with_context(from_yaml=True).write( record._post_process_yaml_dict_values(values) ) # If the record does not exist, create a new one else: if set(values.keys()) == {"reference"}: _logger.warning( "Attempted to import a record for model '%s' " "with reference " "'%s', but only the 'reference' field was provided. " "Creation will be skipped until the target record " "exists.", model._name, reference, ) return False if create_immediately: record = model.with_context(from_yaml=True).create( model._post_process_yaml_dict_values(values) ) else: # Use "Create" service command tuple record = (0, 0, model._post_process_yaml_dict_values(values)) # If there's no reference but value is a dict, create a new record else: # Only 'reference' provided, no other data: do not create, # just log warning if set(values.keys()) == {"reference"}: _logger.warning( "Attempted to import a record for model '%s' with reference " "'%s', but only the 'reference' field was provided. " "It is possible that this record has already been imported. " "Creation will be skipped.", model._name, reference, ) return False if create_immediately: record = model.with_context(from_yaml=True).create( model._post_process_yaml_dict_values(values) ) else: # Use "Create" service command tuple record = (0, 0, model._post_process_yaml_dict_values(values)) # Return the record's ID if it exists, otherwise return False return record or False