Tower: upload cetmix_tower_server 16.0.2.2.9 (via marketplace)
This commit is contained in:
408
addons/cetmix_tower_server/models/cx_tower_vault_mixin.py
Normal file
408
addons/cetmix_tower_server/models/cx_tower_vault_mixin.py
Normal file
@@ -0,0 +1,408 @@
|
||||
from collections import defaultdict
|
||||
|
||||
from odoo import api, models
|
||||
|
||||
|
||||
class CxTowerVaultMixin(models.AbstractModel):
|
||||
"""Mixin for vault functionality.
|
||||
|
||||
This mixin provides methods to securely store and retrieve sensitive data
|
||||
in the vault. Inheriting models must define SECRET_FIELDS list with field
|
||||
names that should be stored in the vault.
|
||||
"""
|
||||
|
||||
_name = "cx.tower.vault.mixin"
|
||||
_description = "Cetmix Tower Vault Mixin"
|
||||
|
||||
SECRET_VALUE_PLACEHOLDER = "*****"
|
||||
SECRET_FIELDS = []
|
||||
|
||||
def _read(self, fields): # pylint: disable=missing-return # doesn't return anything
|
||||
"""Substitute fields based on api.
|
||||
|
||||
This method replaces values of secret fields with a placeholder value
|
||||
when they are read from the database.
|
||||
|
||||
Args:
|
||||
fields (list): List of fields to read
|
||||
"""
|
||||
super()._read(fields)
|
||||
|
||||
show_all = not fields
|
||||
secret_fields = (
|
||||
self.SECRET_FIELDS
|
||||
if show_all
|
||||
else [f for f in self.SECRET_FIELDS if f in fields]
|
||||
)
|
||||
|
||||
for record in self:
|
||||
for secret_field in secret_fields:
|
||||
try:
|
||||
record._cache[secret_field] = self.SECRET_VALUE_PLACEHOLDER
|
||||
except Exception: # pylint: disable=except-pass
|
||||
# skip SpecialValue
|
||||
# (e.g. for missing record or access right)
|
||||
pass
|
||||
|
||||
@api.model_create_multi
|
||||
def create(self, vals_list):
|
||||
"""Override create to handle secret values securely.
|
||||
|
||||
Extracts secret fields, stores them in vault, and prevents
|
||||
actual secret values from being saved in the main table.
|
||||
|
||||
Args:
|
||||
vals_list (list): List of dictionaries containing field values
|
||||
for record creation
|
||||
|
||||
Returns:
|
||||
recordset: Created records with secret values stored in vault
|
||||
|
||||
Note:
|
||||
Secret fields are automatically processed and stored securely.
|
||||
The main database table never contains actual secret values.
|
||||
"""
|
||||
|
||||
# Step 1: Extract secret fields and generate temporary IDs
|
||||
secret_vals = self._extract_and_replace_secret_fields(vals_list)
|
||||
|
||||
# Step 2: Create records with batch operation
|
||||
records = super().create(vals_list)
|
||||
|
||||
# Step 3: Update vault records with real IDs
|
||||
if secret_vals:
|
||||
self._process_secret_values_after_creation(records, secret_vals)
|
||||
|
||||
return records
|
||||
|
||||
def write(self, vals):
|
||||
"""Override write to handle secret fields.
|
||||
|
||||
Extracts secret field values from vals dictionary and stores them securely
|
||||
in the vault instead of the main database table. The remaining non-secret
|
||||
fields are processed by the standard write method.
|
||||
|
||||
Args:
|
||||
vals (dict): Dictionary of field values to write to records
|
||||
|
||||
Returns:
|
||||
bool: Result of the parent write operation
|
||||
|
||||
Note:
|
||||
Secret fields defined in SECRET_FIELDS are automatically intercepted
|
||||
and stored in vault. Cache is invalidated for all secret fields when
|
||||
any secret field is modified.
|
||||
"""
|
||||
# Extract secret fields
|
||||
secret_values = {}
|
||||
for secret_field in self.SECRET_FIELDS:
|
||||
if secret_field in vals:
|
||||
secret_values[secret_field] = vals.pop(secret_field)
|
||||
|
||||
res = super().write(vals)
|
||||
|
||||
if secret_values:
|
||||
self._set_secret_values(secret_values)
|
||||
# Invalidate cache for all secret fields
|
||||
self.invalidate_recordset(self.SECRET_FIELDS)
|
||||
|
||||
return res
|
||||
|
||||
def unlink(self):
|
||||
"""Override unlink to delete vault records.
|
||||
|
||||
Automatically removes all associated vault records after deleting
|
||||
the main records to prevent orphaned secret data in the vault.
|
||||
|
||||
Returns:
|
||||
bool: Result of the parent unlink operation
|
||||
|
||||
Note:
|
||||
Vault cleanup is performed automatically and cannot be bypassed.
|
||||
"""
|
||||
ids = self.ids
|
||||
|
||||
res = super().unlink()
|
||||
|
||||
# Find all vault records for these records
|
||||
vault_records = (
|
||||
self.env["cx.tower.vault"]
|
||||
.sudo()
|
||||
.search([("res_model", "=", self._name), ("res_id", "in", ids)])
|
||||
)
|
||||
|
||||
# Delete vault records
|
||||
if vault_records:
|
||||
vault_records.sudo().unlink()
|
||||
|
||||
return res
|
||||
|
||||
def _get_secret_value(self, field_name):
|
||||
"""Retrieves the actual secret value for a specific field for a single record.
|
||||
|
||||
This method is the only way to get the real secret field value because:
|
||||
- Direct field access (e.g., self.secret_field)
|
||||
returns placeholder due to _read() override
|
||||
- The actual field in the main table is empty/NULL
|
||||
as values are stored in vault
|
||||
|
||||
Args:
|
||||
field_name (str): Name of the secret field to retrieve
|
||||
|
||||
Returns:
|
||||
str or None: The actual secret value, or None if not found or field
|
||||
is not in SECRET_FIELDS
|
||||
|
||||
Note:
|
||||
This method bypasses Odoo's ORM field access to avoid getting
|
||||
placeholder values returned by the overridden _read() method.
|
||||
"""
|
||||
|
||||
self.ensure_one()
|
||||
|
||||
return self._get_secret_values([field_name]).get(self.id, {}).get(field_name)
|
||||
|
||||
def _get_secret_values(self, fields_list=None):
|
||||
"""Retrieve secret values from the vault for specified fields.
|
||||
|
||||
This method fetches secret values stored in the vault for all records
|
||||
in the current recordset and specified fields (or all SECRET_FIELDS).
|
||||
|
||||
Args:
|
||||
fields_list (list, optional): List of field names to retrieve.
|
||||
Defaults to all SECRET_FIELDS.
|
||||
|
||||
Returns:
|
||||
dict: Dictionary mapping record IDs to their secret field values.
|
||||
Structure: {res_id: {field_name: secret_value}}
|
||||
|
||||
Example:
|
||||
{1: {'ssh_password': 'secret123', 'host_key': 'key456'},
|
||||
2: {'ssh_password': 'secret789'}}
|
||||
|
||||
Note:
|
||||
This method searches vault records using standard domain filtering
|
||||
by res_id, and field_name for reliable record matching.
|
||||
If a record has no secret values this record is not included in the result.
|
||||
"""
|
||||
# If no records, return empty dict
|
||||
if not self:
|
||||
return {}
|
||||
|
||||
# Prepare fields to fetch
|
||||
fields_to_fetch = (
|
||||
[f for f in fields_list if f in self.SECRET_FIELDS]
|
||||
if fields_list
|
||||
else self.SECRET_FIELDS
|
||||
)
|
||||
# If no fields to fetch, return empty dict
|
||||
if not fields_to_fetch:
|
||||
return {}
|
||||
|
||||
# Search vault records for all records and all secret fields
|
||||
domain = [
|
||||
("res_model", "=", self._name),
|
||||
("res_id", "in", self.ids),
|
||||
("field_name", "in", fields_to_fetch),
|
||||
]
|
||||
vault_records = (
|
||||
self.env["cx.tower.vault"]
|
||||
.sudo()
|
||||
.search_read(
|
||||
domain,
|
||||
["res_id", "field_name", "data"],
|
||||
)
|
||||
)
|
||||
res = defaultdict(dict)
|
||||
for record in vault_records:
|
||||
res[record["res_id"]][record["field_name"]] = record["data"]
|
||||
|
||||
return dict(res)
|
||||
|
||||
def _set_secret_values(self, vals):
|
||||
"""Store secret values in the vault.
|
||||
|
||||
This method stores sensitive data in the vault for all records in the recordset.
|
||||
It either updates existing vault records or creates new ones for each
|
||||
record-field pair in the vals dictionary.
|
||||
|
||||
This method can be overridden to implement custom storage mechanisms
|
||||
for secret values, such as external key management systems or
|
||||
encryption services.
|
||||
|
||||
Args:
|
||||
vals (dict): Dictionary mapping field names to their secret values
|
||||
to be stored in the vault for all records
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
if not vals or not self:
|
||||
return
|
||||
|
||||
# Get all existing vault records in ONE SQL query
|
||||
domain = [
|
||||
("res_model", "=", self._name),
|
||||
("res_id", "in", self.ids),
|
||||
("field_name", "in", list(vals.keys())),
|
||||
]
|
||||
existing_vault_records = self.env["cx.tower.vault"].sudo().search(domain)
|
||||
|
||||
# Prepare data for batch operations
|
||||
vals_to_update_records = defaultdict(lambda: self.env["cx.tower.vault"])
|
||||
records_to_unlink = self.env["cx.tower.vault"]
|
||||
records_to_create = []
|
||||
|
||||
# Index existing records by (res_id, field_name) for O(1) lookups
|
||||
existing_map = {(v.res_id, v.field_name): v for v in existing_vault_records}
|
||||
|
||||
# Only allow known secret fields to be set
|
||||
allowed_fields = set(self.SECRET_FIELDS)
|
||||
|
||||
# Process each record and field combination
|
||||
for record in self:
|
||||
for field, value in vals.items():
|
||||
if field not in allowed_fields:
|
||||
continue
|
||||
# Fast lookup for existing record
|
||||
existing_record = existing_map.get((record.id, field))
|
||||
if existing_record:
|
||||
if value is False or value is None:
|
||||
records_to_unlink |= existing_record
|
||||
else:
|
||||
vals_to_update_records[value] |= existing_record
|
||||
|
||||
else:
|
||||
if value is False or value is None:
|
||||
continue
|
||||
|
||||
records_to_create.append(
|
||||
{
|
||||
"res_model": self._name,
|
||||
"res_id": record.id,
|
||||
"field_name": field,
|
||||
"data": value,
|
||||
}
|
||||
)
|
||||
|
||||
# Batch operations
|
||||
for value, records in vals_to_update_records.items():
|
||||
records.sudo().write({"data": value})
|
||||
|
||||
if records_to_create:
|
||||
self.env["cx.tower.vault"].sudo().create(records_to_create)
|
||||
if records_to_unlink:
|
||||
records_to_unlink.sudo().unlink()
|
||||
|
||||
def _extract_and_replace_secret_fields(self, vals_list):
|
||||
"""Extract secret fields and replace with temporary identifiers.
|
||||
|
||||
Processes value dictionaries for record creation, replacing secret field values
|
||||
with unique temporary identifiers. The actual secret values are mapped to these
|
||||
temporary identifiers for later secure storage in the vault system.
|
||||
|
||||
Args:
|
||||
vals_list (list): List of value dictionaries for record creation.
|
||||
|
||||
Returns:
|
||||
dict: Mapping of temporary identifiers to secret values.
|
||||
Note: vals_list is modified in-place to contain temp identifiers.
|
||||
|
||||
Note:
|
||||
Used during record creation as part of the secure secret storage workflow.
|
||||
"""
|
||||
temp_id_counter = 0
|
||||
secret_vals = {}
|
||||
|
||||
for vals in vals_list:
|
||||
for secret_field in self.SECRET_FIELDS:
|
||||
if (
|
||||
secret_field in vals
|
||||
and vals[secret_field] is not False
|
||||
and vals[secret_field] is not None
|
||||
):
|
||||
temp_id_counter += 1
|
||||
temp_identifier = str(temp_id_counter)
|
||||
secret_vals[temp_identifier] = vals[secret_field]
|
||||
vals[secret_field] = temp_identifier
|
||||
|
||||
return secret_vals
|
||||
|
||||
def _process_secret_values_after_creation(self, records, secret_vals):
|
||||
"""Process secret values after records creation.
|
||||
|
||||
Replaces temporary identifiers with actual secret values in the vault
|
||||
and invalidates cache for affected fields.
|
||||
|
||||
Args:
|
||||
records (recordset): Newly created records with temporary identifiers
|
||||
secret_vals (dict): Mapping of temporary identifiers to secret values
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
Note:
|
||||
Called automatically during create() process. Should not be used directly.
|
||||
"""
|
||||
fields_str = ", ".join(self.SECRET_FIELDS)
|
||||
query = f"SELECT id, {fields_str} FROM {self._table} WHERE id in %s"
|
||||
self.env.cr.execute(query, (tuple(records.ids),))
|
||||
records_dict = self.env.cr.dictfetchall()
|
||||
|
||||
for record_dict in records_dict:
|
||||
self._process_single_record_secrets(record_dict, secret_vals)
|
||||
|
||||
records._clear_temp_values()
|
||||
records.invalidate_recordset(self.SECRET_FIELDS)
|
||||
|
||||
def _process_single_record_secrets(self, record_dict, secret_vals):
|
||||
"""Process secrets for a single record.
|
||||
|
||||
Replaces temporary identifiers with actual secret values for one record,
|
||||
clears temporary values from main table and stores secrets in vault.
|
||||
|
||||
Args:
|
||||
record_dict (dict): Dictionary with record data
|
||||
including temporary identifiers
|
||||
secret_vals (dict): Mapping of temporary identifiers to actual secret values
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
Note:
|
||||
Internal method used by _process_secret_values_after_creation.
|
||||
"""
|
||||
record_id = record_dict.get("id")
|
||||
vault_vals = {}
|
||||
field_temp_id_pairs = (
|
||||
(field_name, record_dict[field_name]) for field_name in self.SECRET_FIELDS
|
||||
)
|
||||
|
||||
# Collect secret values and fields to clear
|
||||
for field_name, temp_identifier in field_temp_id_pairs:
|
||||
secret_value = secret_vals.get(temp_identifier)
|
||||
if secret_value:
|
||||
vault_vals[field_name] = secret_value
|
||||
|
||||
# Update database and vault if needed
|
||||
if vault_vals:
|
||||
record = self.browse(record_id)
|
||||
record._set_secret_values(vault_vals)
|
||||
|
||||
def _clear_temp_values(self):
|
||||
"""Clear temporary values from main table.
|
||||
|
||||
Sets all SECRET_FIELDS to NULL in the database to remove temporary
|
||||
identifiers after secret values have been stored in vault.
|
||||
Works with multiple records in the recordset.
|
||||
|
||||
Returns:
|
||||
None
|
||||
|
||||
Note:
|
||||
Internal method used during secret processing workflow.
|
||||
Clears all SECRET_FIELDS for all records in the current recordset.
|
||||
"""
|
||||
set_clause = ", ".join(f"{field} = NULL" for field in self.SECRET_FIELDS)
|
||||
query = f"UPDATE {self._table} SET {set_clause} WHERE id in %s"
|
||||
self.env.cr.execute(query, (tuple(self.ids),))
|
||||
Reference in New Issue
Block a user