Wipe cetmix_tower_yaml (polluted by overlapping uploads)

This commit is contained in:
Tower Deploy
2026-04-27 13:43:58 +03:00
parent 18dd9c7a1f
commit 7cef9f1a32
80 changed files with 0 additions and 9275 deletions

View File

@@ -1,21 +0,0 @@
from . import cx_tower_yaml_mixin
from . import cx_tower_command
from . import cx_tower_file_template
from . import cx_tower_tag
from . import cx_tower_plan
from . import cx_tower_plan_line
from . import cx_tower_plan_line_action
from . import cx_tower_variable
from . import cx_tower_variable_option
from . import cx_tower_variable_value
from . import cx_tower_os
from . import cx_tower_server_template
from . import cx_tower_key
from . import cx_tower_key_value
from . import cx_tower_server_log
from . import cx_tower_shortcut
from . import cx_tower_scheduled_task
from . import cx_tower_file
from . import cx_tower_server
from . import cx_tower_yaml_manifest_template
from . import cx_tower_yaml_manifest_author

View File

@@ -1,31 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerCommand(models.Model):
_name = "cx.tower.command"
_inherit = ["cx.tower.command", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"access_level",
"name",
"action",
"allow_parallel_run",
"note",
"os_ids",
"tag_ids",
"path",
"file_template_id",
"flight_plan_id",
"code",
"server_status",
"variable_ids",
"secret_ids",
"no_split_for_sudo",
"if_file_exists",
"disconnect_file",
]
return res

View File

@@ -1,46 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerFile(models.Model):
_name = "cx.tower.file"
_inherit = ["cx.tower.file", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"source",
"file_type",
"server_dir",
"code",
"file",
"variable_ids",
"secret_ids",
"template_id",
"keep_when_deleted",
"auto_sync",
"auto_sync_interval",
"sync_date_next",
"sync_date_last",
"server_response",
]
return res
def _post_create_write(self, op_type="write"):
# Do not pull/push files if they are being created from YAML
if self.env.context.get("from_yaml"):
return
super()._post_create_write(op_type)
def _prepare_record_for_yaml(self):
"""
Override to drop file `code` when the source is 'server'.
"""
record_dict = super()._prepare_record_for_yaml()
if record_dict.get("source") == "server":
record_dict["code"] = False
return record_dict

View File

@@ -1,25 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerFileTemplate(models.Model):
_name = "cx.tower.file.template"
_inherit = ["cx.tower.file.template", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"source",
"file_type",
"server_dir",
"file_name",
"keep_when_deleted",
"tag_ids",
"note",
"code",
"variable_ids",
"secret_ids",
]
return res

View File

@@ -1,26 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerJetAction(models.Model):
_name = "cx.tower.jet.action"
_inherit = [
"cx.tower.jet.action",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"note",
"priority",
"access_level",
"state_from_id",
"state_transit_id",
"state_to_id",
"state_error_id",
"plan_id",
]
return res

View File

@@ -1,22 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerJetState(models.Model):
_name = "cx.tower.jet.state"
_inherit = [
"cx.tower.jet.state",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"sequence",
"access_level",
"color",
"note",
]
return res

View File

@@ -1,42 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerJetTemplate(models.Model):
_name = "cx.tower.jet.template"
_inherit = [
"cx.tower.jet.template",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"note",
"tag_ids",
"limit_per_server",
"show_in_create_wizard",
"plan_install_id",
"plan_uninstall_id",
"plan_clone_same_server_id",
"plan_clone_different_server_id",
"variable_value_ids",
"action_ids",
"template_requires_ids",
"waypoint_template_ids",
"server_log_ids",
"scheduled_task_ids",
]
return res
def _get_deferred_x2m_import_fields(self):
"""Return x2m child records resolved after the main import pass."""
return {
"template_requires_ids": {
"child_model": "cx.tower.jet.template.dependency",
"deferred_field": "template_required_id",
"target_model": "cx.tower.jet.template",
}
}

View File

@@ -1,19 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerJetTemplateDependency(models.Model):
_name = "cx.tower.jet.template.dependency"
_inherit = [
"cx.tower.jet.template.dependency",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"template_required_id",
"state_required_id",
]
return res

View File

@@ -1,32 +0,0 @@
# Copyright (C) 2025 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerJetWaypointTemplate(models.Model):
_name = "cx.tower.jet.waypoint.template"
_inherit = [
"cx.tower.jet.waypoint.template",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"sequence",
"access_level",
"jet_template_id",
"plan_create_id",
"plan_arrive_id",
"plan_leave_id",
"plan_delete_id",
"note",
]
return res
def _get_deferred_m2o_import_fields(self):
"""Return m2o waypoint-template fields resolved after import."""
return {
"jet_template_id": "cx.tower.jet.template",
}

View File

@@ -1,20 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerKey(models.Model):
_name = "cx.tower.key"
_inherit = [
"cx.tower.key",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"key_type",
"note",
]
return res

View File

@@ -1,18 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerKeyValue(models.Model):
_name = "cx.tower.key.value"
_inherit = [
"cx.tower.key.value",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"key_id",
]
return res

View File

@@ -1,20 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerOs(models.Model):
_name = "cx.tower.os"
_inherit = [
"cx.tower.os",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"color",
"parent_id",
]
return res

View File

@@ -1,23 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerPlan(models.Model):
_name = "cx.tower.plan"
_inherit = ["cx.tower.plan", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"access_level",
"allow_parallel_run",
"color",
"tag_ids",
"note",
"on_error_action",
"custom_exit_code",
"line_ids",
]
return res

View File

@@ -1,21 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerPlanLine(models.Model):
_name = "cx.tower.plan.line"
_inherit = ["cx.tower.plan.line", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"sequence",
"condition",
"use_sudo",
"path",
"command_id",
"action_ids",
"variable_ids",
]
return res

View File

@@ -1,20 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerPlanLineAction(models.Model):
_name = "cx.tower.plan.line.action"
_inherit = ["cx.tower.plan.line.action", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"sequence",
"condition",
"value_char",
"action",
"custom_exit_code",
"variable_value_ids",
]
return res

View File

@@ -1,23 +0,0 @@
# Copyright (C) 2025 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerScheduledTask(models.Model):
_name = "cx.tower.scheduled.task"
_inherit = ["cx.tower.scheduled.task", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"sequence",
"action",
"command_id",
"plan_id",
"interval_number",
"interval_type",
"next_call",
"last_call",
]
return res

View File

@@ -1,36 +0,0 @@
# Copyright (C) 2025 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerScheduledTaskCv(models.Model):
_name = "cx.tower.scheduled.task.cv"
_inherit = [
"cx.tower.scheduled.task.cv",
"cx.tower.yaml.mixin",
"cx.tower.reference.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += ["variable_value_id"]
return res
def _post_process_yaml_dict_values(self, values):
"""Populate required child fields from the linked variable value."""
res = super()._post_process_yaml_dict_values(values)
variable_value_id = res.get("variable_value_id")
if variable_value_id:
variable_value = self.env["cx.tower.variable.value"].browse(
variable_value_id
)
if variable_value.exists():
res.update(
{
"name": variable_value.name,
"variable_id": variable_value.variable_id.id,
"option_id": variable_value.option_id.id or False,
"value_char": variable_value.value_char,
}
)
return res

View File

@@ -1,52 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerServer(models.Model):
_name = "cx.tower.server"
_inherit = [
"cx.tower.server",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"ip_v4_address",
"ip_v6_address",
"skip_host_key",
"color",
"os_id",
"tag_ids",
"url",
"note",
"ssh_port",
"ssh_username",
"ssh_key_id",
"ssh_auth_mode",
"use_sudo",
"variable_value_ids",
"secret_ids",
"server_log_ids",
"shortcut_ids",
"scheduled_task_ids",
"plan_delete_id",
"file_ids",
"command_ids",
"plan_ids",
]
return res
def _get_force_x2m_resolve_models(self):
res = super()._get_force_x2m_resolve_models()
# This is useful to avoid duplicating existing plans
res += [
"cx.tower.shortcut",
"cx.tower.scheduled.task",
"cx.tower.command",
"cx.tower.plan",
]
return res

View File

@@ -1,23 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerServerLog(models.Model):
_name = "cx.tower.server.log"
_inherit = [
"cx.tower.server.log",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"log_type",
"command_id",
"use_sudo",
"file_template_id",
"file_id",
]
return res

View File

@@ -1,41 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerServerTemplate(models.Model):
_name = "cx.tower.server.template"
_inherit = [
"cx.tower.server.template",
"cx.tower.yaml.mixin",
]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"color",
"os_id",
"tag_ids",
"note",
"ssh_port",
"ssh_username",
"ssh_key_id",
"ssh_auth_mode",
"use_sudo",
"variable_value_ids",
"server_log_ids",
"shortcut_ids",
"scheduled_task_ids",
"flight_plan_id",
"plan_delete_id",
]
return res
def _get_force_x2m_resolve_models(self):
res = super()._get_force_x2m_resolve_models()
# Add Flight Plan in order to always try to use existing one
# This is useful to avoid duplicating existing plans
res += ["cx.tower.plan", "cx.tower.shortcut", "cx.tower.scheduled.task"]
return res

View File

@@ -1,22 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerShortcut(models.Model):
_name = "cx.tower.shortcut"
_inherit = ["cx.tower.shortcut", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"sequence",
"access_level",
"action",
"command_id",
"use_sudo",
"plan_id",
"note",
]
return res

View File

@@ -1,16 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerTag(models.Model):
_name = "cx.tower.tag"
_inherit = ["cx.tower.tag", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"color",
]
return res

View File

@@ -1,23 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerVariable(models.Model):
_name = "cx.tower.variable"
_inherit = ["cx.tower.variable", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"name",
"access_level",
"variable_type",
"option_ids",
"applied_expression",
"validation_pattern",
"validation_message",
"note",
"tag_ids",
]
return res

View File

@@ -1,18 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerVariableOption(models.Model):
_name = "cx.tower.variable.option"
_inherit = ["cx.tower.variable.option", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"sequence",
"access_level",
"name",
"value_char",
]
return res

View File

@@ -1,20 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import models
class CxTowerVariableValue(models.Model):
_name = "cx.tower.variable.value"
_inherit = ["cx.tower.variable.value", "cx.tower.yaml.mixin"]
def _get_fields_for_yaml(self):
res = super()._get_fields_for_yaml()
res += [
"sequence",
"access_level",
"variable_id",
"value_char",
"variable_ids",
"required",
]
return res

View File

@@ -1,23 +0,0 @@
# Copyright (C) 2025 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from odoo import fields, models
class CxTowerYamlManifestAuthor(models.Model):
"""Author of a YAML manifest (can be one or many)."""
_name = "cx.tower.yaml.manifest.author"
_sql_constraints = [
(
"yaml_manifest_author_name_uniq",
"unique(name)",
"Author name must be unique.",
)
]
_description = "YAML Manifest Author"
_order = "name"
name = fields.Char(required=True, translate=False)

View File

@@ -1,93 +0,0 @@
# Copyright (C) 2025 Cetmix OÜ
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
import re
from odoo import _, api, fields, models
from odoo.exceptions import ValidationError
class CxTowerYamlManifestTemplate(models.Model):
"""Pre-defined YAML manifest template storing common metadata
such as authors, website, license, and currency for reuse
during YAML exports."""
_name = "cx.tower.yaml.manifest.tmpl"
_description = "YAML Manifest Template"
_order = "name"
name = fields.Char(
required=True,
help="Name of the manifest template.",
)
website = fields.Char(help="Website URL for the manifest.")
author_ids = fields.Many2many(
"cx.tower.yaml.manifest.author",
string="Authors",
help="List of author names to include in the YAML manifest.",
)
license = fields.Selection(
selection=lambda self: self._selection_license(),
help="License used for the code snippet.",
)
license_text = fields.Text(
help="Custom license text when license type is Custom.",
)
currency = fields.Selection(
selection=lambda self: self._selection_currency(),
help="Currency for pricing information.",
)
version = fields.Char(
help="Version in Major.Minor.Patch format, e.g. 1.0.0",
default="1.0.0",
)
file_prefix = fields.Char(
string="File prefix",
help="Add prefix to the exported YAML file name when this template is selected",
)
@api.model
def _selection_license(self):
"""Return available license options for manifest."""
return [
("agpl-3", "AGPL-3"),
("lgpl-3", "LGPL-3"),
("mit", "MIT"),
("custom", _("Custom")),
]
@api.model
def _selection_currency(self):
"""Return available currency options for manifest pricing."""
return [
("EUR", _("Euro")),
("USD", _("US Dollar")),
]
@api.constrains("license", "license_text")
def _check_license_text_for_custom(self):
"""Ensure that custom license text is provided when license is 'custom'."""
for rec in self:
if rec.license == "custom" and not (rec.license_text or "").strip():
raise ValidationError(
_("Provide Custom License Text when License is set to 'Custom'.")
)
@api.constrains("version")
def _check_version_format(self):
"""Ensure the template version follows the x.y.z semantic format.
The version must consist of three non-negative integers (major, minor, patch)
separated by dots—for example, “1.2.3”. Raises a ValidationError otherwise.
"""
semver = re.compile(r"^\d+\.\d+\.\d+$")
for rec in self:
if rec.version and not semver.match(rec.version):
raise ValidationError(
_("Version must be in the Major.Minor.Patch format, e.g. 1.2.3")
)

View File

@@ -1,577 +0,0 @@
# Copyright (C) 2024 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import logging
import yaml
from odoo import _, api, fields, models
from odoo.exceptions import AccessError, ValidationError
_logger = logging.getLogger(__name__)
class CustomDumper(yaml.Dumper):
"""Custom dumper to ensures code
is properly dumped in YAML
"""
def represent_scalar(self, tag, value, style=None):
if isinstance(value, str) and "\n" in value:
style = "|"
return super().represent_scalar(tag, value, style)
class YamlExportCollector:
"""
Collector for YAML export.
Tracks unique records by their (model_name, reference) tuple to avoid duplicates.
"""
def __init__(self):
"""
Initialize the collector.
"""
self.added_references = set()
def add(self, key):
"""
Add a record to the collector if its reference is unique.
:param key: tuple, key of the record
"""
if key and key not in self.added_references:
self.added_references.add(key)
def is_added(self, key):
"""
Check by (model, reference) tuple.
:param key: tuple, key of the record
:return: bool
"""
return key in self.added_references
class CxTowerYamlMixin(models.AbstractModel):
"""Used to implement YAML rendering functions.
Inherit in your model in case you want to YAML instance of the records.
"""
_name = "cx.tower.yaml.mixin"
_description = "Cetmix Tower YAML rendering mixin"
# File format version in order to track compatibility
CETMIX_TOWER_YAML_VERSION = 1
# TO_YAML_* used to convert from Odoo field values to YAML
TO_YAML_ACCESS_LEVEL = {"1": "user", "2": "manager", "3": "root"}
# TO_TOWER_* used to convert from YAML field values to Tower ones
TO_TOWER_ACCESS_LEVEL = {"user": "1", "manager": "2", "root": "3"}
yaml_code = fields.Text(
compute="_compute_yaml_code",
inverse="_inverse_yaml_code",
groups="cetmix_tower_yaml.group_export,cetmix_tower_yaml.group_import",
)
def _compute_yaml_code(self):
"""Compute YAML code based on model record data"""
# This is used for the file name.
# Eg cx.tower.command record will have 'command_' prefix.
for record in self:
# We are reading field list for each record
# because list of fields can differ from record to record
record.yaml_code = self._convert_dict_to_yaml(
record._prepare_record_for_yaml()
)
def _inverse_yaml_code(self):
"""Compose record based on provided YAML"""
for record in self:
if record.yaml_code:
record_yaml_dict = yaml.safe_load(record.yaml_code)
record_vals = record._post_process_yaml_dict_values(record_yaml_dict)
record.update(record_vals)
@api.constrains("yaml_code")
def _check_yaml_code_write_access(self):
"""
Check if user has access to create records from YAML.
This is checked only when user already has access to export YAML.
Otherwise, the field is not accessible due to security group.
"""
if self.env.user.has_group("cetmix_tower_yaml.group_export") and (
not self.env.user.has_group("cetmix_tower_yaml.group_import")
and not self.env.user._is_superuser()
):
raise AccessError(_("You are not allowed to create records from YAML"))
@api.model_create_multi
def create(self, vals_list):
# Handle validation error when field values are not valid
try:
return super().create(vals_list)
except ValueError as e:
raise ValidationError(str(e)) from e
def write(self, vals):
# Handle validation error when field values are not valid
try:
return super().write(vals)
except ValueError as e:
raise ValidationError(str(e)) from e
def action_open_yaml_export_wizard(self):
"""Open YAML export wizard"""
return {
"type": "ir.actions.act_window",
"res_model": "cx.tower.yaml.export.wiz",
"view_mode": "form",
"target": "new",
}
def _convert_dict_to_yaml(self, values):
"""Converts Python dictionary to YAML string.
This is a helper function that is designed to be used
by any models that need to convert a dictionary to YAML.
Args:
values (Dict): Dictionary containing data
to be converted to YAML format
Returns:
Text: YAML string
Raises:
ValidationError: If values is not a dictionary
or YAML conversion fails
"""
if not isinstance(values, dict):
raise ValidationError(_("Values must be a dictionary"))
try:
yaml_code = yaml.dump(
values,
Dumper=CustomDumper,
default_flow_style=False,
sort_keys=False,
)
return yaml_code
except (yaml.YAMLError, UnicodeEncodeError) as e:
raise ValidationError(
_(
"Failed to convert dictionary" " to YAML: %(error)s",
error=str(e),
)
) from e
def _prepare_record_for_yaml(self):
"""Reads and processes current record before converting it to YAML
Returns:
dict: values ready for YAML conversion
"""
self.ensure_one()
yaml_keys = self._get_fields_for_yaml()
record_dict = self.read(fields=yaml_keys)[0]
return self._post_process_record_values(record_dict)
def _get_fields_for_yaml(self):
"""Get ist of field to be present in YAML
Set 'no_yaml_service_fields' context key to skip
service fields creation (cetmix_tower_yaml_version, cetmix_tower_model)
Returns:
list(): list of fields to be used as YAML keys
"""
return ["reference"]
def _get_force_x2m_resolve_models(self):
"""List of models that will always try to be resolved
when referenced in x2m related fields.
This is useful for models that should always use existing records
instead of creating new ones when referenced in x2m related fields.
Such as variables or tags.
Returns:
List: list of models that will always try to be resolved
"""
return [
"cx.tower.variable",
"cx.tower.variable.option",
"cx.tower.tag",
"cx.tower.os",
"cx.tower.key",
]
def _post_process_record_values(self, values):
"""Post process record values
before converting them to YAML
Args:
values (dict): values returned by 'read' method
Context:
explode_related_record: if set will return entire record dictionary
not just a reference
remove_empty_values: if set will remove empty values from the record
Returns:
dict(): processed values
"""
collector = self._context.get("yaml_collector")
ref = values.get("reference")
collector_key = (self._name, ref) if ref else None
if collector and collector_key and collector.is_added(collector_key):
return {"reference": ref}
# We don't need id because we are not using it
values.pop("id", None)
# Add YAML format version and model
if not self._context.get("no_yaml_service_fields"):
model_name = self._name.replace("cx.tower.", "").replace(".", "_")
model_values = {
"cetmix_tower_model": model_name,
}
else:
model_values = {}
# Parse access level
access_level = values.pop("access_level", None)
if access_level:
model_values.update(
{"access_level": self.TO_YAML_ACCESS_LEVEL[access_level]}
)
values = {**model_values, **values}
# Copy values to avoid modifying the original values
new_values = values.copy()
# Check if we need to return a record dict or just a reference
# Use context value first, revert to the record setting if not defined
explode_related_record = self._context.get("explode_related_record")
# Check if we need to remove empty values
# Currently only x2m fields are supported
remove_empty_values = self._context.get("remove_empty_values")
# Post process m2o and x2m fields
for key, value in values.items():
# IMPORTANT: Odoo naming patterns must be followed for related fields.
# This is why we are checking for the field name ending here.
# Further checks for the field type are done
# in _process_relation_field_value()
if key.endswith("_id") or key.endswith("_ids"):
if not value and remove_empty_values:
del new_values[key]
else:
processed_value = self.with_context(
explode_related_record=explode_related_record
)._process_relation_field_value(key, value, record_mode=True)
new_values.update({key: processed_value})
if collector and collector_key:
collector.add(collector_key)
return new_values
def _post_process_yaml_dict_values(self, values):
"""Post process dictionary values generated from YAML code
Args:
values (dict): Dictionary generated from YAML
Returns:
dict(): Post-processed values
"""
# Remove model data because it is not a field
if "cetmix_tower_model" in values:
values.pop("cetmix_tower_model")
# Parse access level
if "access_level" in values:
values_access_level = values["access_level"]
access_level = self.TO_TOWER_ACCESS_LEVEL.get(values_access_level)
if access_level:
values.update({"access_level": access_level})
else:
raise ValidationError(
_(
"Wrong value for 'access_level' key: %(acv)s",
acv=values_access_level,
)
)
# Leave supported keys only
supported_keys = self._get_fields_for_yaml()
filtered_values = {k: v for k, v in values.items() if k in supported_keys}
# Post process m2o fields
for key, value in filtered_values.items():
# IMPORTANT: Odoo naming patterns must be followed for related fields.
# This is why we are checking for the field name ending here.
# Further checks for the field type are done
# in _process_relation_field_value()
if key.endswith("_id") or key.endswith("_ids"):
processed_value = self.with_context(
explode_related_record=True
)._process_relation_field_value(key, value, record_mode=False)
filtered_values.update({key: processed_value})
return filtered_values
def _process_relation_field_value(self, field, value, record_mode=False):
"""Post process One2many, Many2many or Many2one value
Args:
field (Char): Field the value belongs to
value (Char): Value to process
record_mode (Bool): If True process value as a record value
else process value as a YAML value
Context:
explode_related_record: if set will return entire record dictionary
not just a reference
Returns:
dict() or Char: record dictionary if fetch_record else reference
"""
# Step 1: Return False if the value is not set or the field is not found
if not value:
return False
field_obj = self._fields.get(field)
if not field_obj:
return False
# Step 2: Return False if the field type doesn't match
# or comodel is not defined
field_type = field_obj.type
if (
field_type not in ["one2many", "many2many", "many2one"]
or not field_obj.comodel_name
):
return False
comodel = self.env[field_obj.comodel_name]
explode_related_record = self._context.get("explode_related_record")
# Step 3: process value based on the field type
if field_type == "many2one":
return self._process_m2o_value(
comodel, value, explode_related_record, record_mode
)
if field_type in ["one2many", "many2many"]:
return self._process_x2m_values(
comodel, field_type, value, explode_related_record, record_mode
)
# Step 4: fall back if field type is not supported
return False
def _process_m2o_value(
self, comodel, value, explode_related_record, record_mode=False
):
"""Post process many2one value
Args:
comodel (BaseClass): Model the value belongs to
value (Char): Value to process
explode_related_record (Bool): If True return entire record dict
instead of a reference
record_mode (Bool): If True process value as a record value
else process value as a YAML value
Returns:
dict() or Char: record dictionary if fetch_record else reference
"""
# -- (Record -> YAML)
if record_mode:
# Retrieve the record based on the ID provided in the value
record = comodel.browse(value[0])
# If the context specifies to explode the related record,
# return its dictionary representation
if explode_related_record:
return (
record.with_context(
no_yaml_service_fields=True
)._prepare_record_for_yaml()
if record
else False
)
# Otherwise, return just the reference (or False if record does not exist)
return record.reference if record else False
# -- (YAML -> Record)
# Step 1: Process value in normal mode
record = False
# If the value is a string, it is treated as a reference
if isinstance(value, str):
reference = value
# If the value is a dictionary, extract the reference from it
elif isinstance(value, dict):
reference = value.get("reference")
record = self._update_or_create_related_record(
comodel, reference, value, create_immediately=True
)
else:
return False
# Step 2: Final fallback: attempt to retrieve the record by reference if set,
# return its ID or False
if not record and reference:
record = comodel.get_by_reference(reference)
return record.id if record else False
def _process_x2m_values(
self, comodel, field_type, values, explode_related_record, record_mode=False
):
"""Post process many2many value
Args:
comodel (BaseClass): Model the value belongs to
field_type (Char): Field type
values (list()): Values to process
explode_related_record (Bool): If True return entire record dict
instead of a reference
record_mode (Bool): If True process value as a record value
else process value as a YAML value
Returns:
dict() or Char: record dictionary if fetch_record else reference
"""
# -- (Record -> YAML)
if record_mode:
record_list = []
for value in values:
# Retrieve the record based on the ID provided in the value
record = comodel.browse(value)
# If the context specifies to explode the related record,
# return its dictionary representation
if explode_related_record:
record_list.append(
record.with_context(
no_yaml_service_fields=True
)._prepare_record_for_yaml()
if record
else False
)
# Otherwise, return just the reference
# (or False if record does not exist)
else:
record_list.append(record.reference if record else False)
return record_list
# -- (YAML -> Record)
# Step 1: Process value in normal mode
record_ids = []
for value in values:
record = False
# If the value is a string, it is treated as a reference
if isinstance(value, str):
reference = value
# If the value is a dictionary, extract the reference from it
elif isinstance(value, dict):
reference = value.get("reference")
record = self._update_or_create_related_record(
comodel,
reference,
value,
create_immediately=field_type == "many2many",
)
# Step 2: Final fallback: attempt to retrieve the record by reference
# Return record ID or False if reference is not defined
if not record and reference:
record = comodel.get_by_reference(reference)
# Save record data
if record:
record_ids.append(
record if isinstance(record, tuple) else (4, record.id)
)
return record_ids
def _update_or_create_related_record(
self, model, reference, values, create_immediately=False
):
"""Update related record with provided values or create a new one
Args:
model (BaseModel): Related record model
values (dict()): Values to update existing/create new record
reference (Char): Record reference
create_immediately (Bool): If True create a new record immediately.
Used for Many2one fields.
Context:
force_create_related_record (Bool): If True, create a new record
even if reference is provided.
Returns:
record: Existing record or new record tuple
"""
# If reference is found, retrieve the corresponding record
if reference and (
model._name in self._get_force_x2m_resolve_models()
or not self._context.get("force_create_related_record")
):
record = model.get_by_reference(reference)
# If the record exists, update it with the values from the dictionary
if record:
# Remove reference from values to avoid possible consequences
values.pop("reference", None)
record.with_context(from_yaml=True).write(
record._post_process_yaml_dict_values(values)
)
# If the record does not exist, create a new one
else:
if create_immediately:
record = model.with_context(from_yaml=True).create(
model._post_process_yaml_dict_values(values)
)
else:
# Use "Create" service command tuple
record = (0, 0, model._post_process_yaml_dict_values(values))
# If there's no reference but value is a dict, create a new record
else:
if create_immediately:
# Only 'reference' provided, no other data: do not create,
# just log warning
if set(values.keys()) == {"reference"}:
_logger.warning(
"Attempted to import a record for model '%s' with reference "
"'%s', but only the 'reference' field was provided. "
"It is possible that this record has already been imported. "
"Creation will be skipped.",
model._name,
reference,
)
return False
record = model.with_context(from_yaml=True).create(
model._post_process_yaml_dict_values(values)
)
else:
# Use "Create" service command tuple
record = (0, 0, model._post_process_yaml_dict_values(values))
# Return the record's ID if it exists, otherwise return False
return record or False