Tower: upload cetmix_tower_server 16.0.2.2.9 (via marketplace)
This commit is contained in:
414
addons/cetmix_tower_server/models/cx_tower_key.py
Normal file
414
addons/cetmix_tower_server/models/cx_tower_key.py
Normal file
@@ -0,0 +1,414 @@
|
|||||||
|
# Copyright (C) 2022 Cetmix OÜ
|
||||||
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||||
|
|
||||||
|
from odoo import api, fields, models
|
||||||
|
|
||||||
|
|
||||||
|
class CxTowerKey(models.Model):
|
||||||
|
"""SSH Private key and secret storage"""
|
||||||
|
|
||||||
|
_name = "cx.tower.key"
|
||||||
|
_description = "Cetmix Tower Key/Secret Storage"
|
||||||
|
_inherit = [
|
||||||
|
"cx.tower.reference.mixin",
|
||||||
|
"cx.tower.access.role.mixin",
|
||||||
|
"cx.tower.vault.mixin",
|
||||||
|
]
|
||||||
|
_order = "name"
|
||||||
|
|
||||||
|
KEY_PREFIX = "#!cxtower"
|
||||||
|
KEY_TERMINATOR = "!#"
|
||||||
|
SECRET_FIELDS = ["secret_value"]
|
||||||
|
|
||||||
|
key_type = fields.Selection(
|
||||||
|
selection=[
|
||||||
|
("k", "SSH Key"),
|
||||||
|
("s", "Secret"),
|
||||||
|
],
|
||||||
|
required=True,
|
||||||
|
)
|
||||||
|
reference_code = fields.Char(
|
||||||
|
compute="_compute_reference_code",
|
||||||
|
help="Key reference for inline usage",
|
||||||
|
)
|
||||||
|
secret_value = fields.Text(
|
||||||
|
string="SSH Private Key",
|
||||||
|
)
|
||||||
|
value_ids = fields.One2many(
|
||||||
|
string="Values",
|
||||||
|
comodel_name="cx.tower.key.value",
|
||||||
|
inverse_name="key_id",
|
||||||
|
)
|
||||||
|
server_ssh_ids = fields.One2many(
|
||||||
|
string="Used as SSH Key",
|
||||||
|
comodel_name="cx.tower.server",
|
||||||
|
inverse_name="ssh_key_id",
|
||||||
|
readonly=True,
|
||||||
|
help="Used as SSH key in the following servers",
|
||||||
|
)
|
||||||
|
note = fields.Text()
|
||||||
|
|
||||||
|
# ---- Access. Add relation for mixin fields
|
||||||
|
user_ids = fields.Many2many(
|
||||||
|
relation="cx_tower_key_user_rel",
|
||||||
|
domain=lambda self: [
|
||||||
|
("groups_id", "in", [self.env.ref("cetmix_tower_server.group_manager").id])
|
||||||
|
],
|
||||||
|
)
|
||||||
|
manager_ids = fields.Many2many(
|
||||||
|
relation="cx_tower_key_manager_rel",
|
||||||
|
)
|
||||||
|
|
||||||
|
@api.depends("reference", "key_type")
|
||||||
|
def _compute_reference_code(self):
|
||||||
|
"""Compute key reference
|
||||||
|
Eg '#!cxtower.secret.KEY!#'
|
||||||
|
"""
|
||||||
|
for rec in self:
|
||||||
|
if rec.reference:
|
||||||
|
key_prefix = self._compose_key_prefix(rec.key_type)
|
||||||
|
if key_prefix:
|
||||||
|
rec.reference_code = f"#!cxtower.{key_prefix}.{rec.reference}!#"
|
||||||
|
else:
|
||||||
|
rec.reference_code = None
|
||||||
|
else:
|
||||||
|
rec.reference_code = None
|
||||||
|
|
||||||
|
@api.returns("self", lambda value: value.id)
|
||||||
|
def copy(self, default=None):
|
||||||
|
"""Copy key. Ensure secret value is copied.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
default (dict, optional): Default values. Defaults to None.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
self: Copied key
|
||||||
|
"""
|
||||||
|
default = default or {}
|
||||||
|
default["secret_value"] = self._get_secret_value("secret_value")
|
||||||
|
result = super().copy(default=default)
|
||||||
|
|
||||||
|
# Copy key values
|
||||||
|
for value in self.value_ids:
|
||||||
|
value.copy(
|
||||||
|
{
|
||||||
|
"key_id": result.id,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _get_reference_pattern(self):
|
||||||
|
"""
|
||||||
|
Override mixin method
|
||||||
|
"""
|
||||||
|
return "[a-zA-Z0-9_]"
|
||||||
|
|
||||||
|
def _compose_key_prefix(self, key_type):
|
||||||
|
"""Compose key prefix based on key type.
|
||||||
|
Override to implement own key prefixes.
|
||||||
|
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key_type (_type_): _description_
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValidationError: _description_
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Char: key prefix
|
||||||
|
"""
|
||||||
|
if key_type == "s":
|
||||||
|
key_prefix = "secret"
|
||||||
|
else:
|
||||||
|
key_prefix = None
|
||||||
|
return key_prefix
|
||||||
|
|
||||||
|
def _parse_code_and_return_key_values(self, code, pythonic_mode=False, **kwargs):
|
||||||
|
"""Replaces key placeholders in code with the corresponding values,
|
||||||
|
returning key values.
|
||||||
|
|
||||||
|
This function is meant to be used in the flow where key values
|
||||||
|
are needed for some follow up operations such as command log clean up.
|
||||||
|
|
||||||
|
NB:
|
||||||
|
- key format must follow "#!cxtower.key.KEY_ID!#" pattern.
|
||||||
|
eg #!cxtower.secret.GITHUB_TOKEN!# for GITHUB_TOKEN key
|
||||||
|
Args:
|
||||||
|
code (Text): code to process
|
||||||
|
pythonic_mode (Bool): If True, all variables in kwargs are converted to
|
||||||
|
strings and wrapped in double quotes.
|
||||||
|
Default is False.
|
||||||
|
kwargs (dict): optional arguments
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict(): 'code': Command text, 'key_values': List of key values
|
||||||
|
"""
|
||||||
|
|
||||||
|
# No need to search if code is too short
|
||||||
|
if len(code) <= len(self.KEY_PREFIX) + 3 + len(
|
||||||
|
self.KEY_TERMINATOR
|
||||||
|
): # at least one dot separator and two symbols
|
||||||
|
return {"code": code, "key_values": None}
|
||||||
|
|
||||||
|
# Get key strings
|
||||||
|
key_strings = self._extract_key_strings(code)
|
||||||
|
|
||||||
|
# Set key values
|
||||||
|
key_values = []
|
||||||
|
# Replace keys with values
|
||||||
|
for key_string in key_strings:
|
||||||
|
# Replace key including key terminator
|
||||||
|
key_value = self._parse_key_string(key_string, **kwargs)
|
||||||
|
if pythonic_mode and key_value:
|
||||||
|
# save key value as string in pythonic mode
|
||||||
|
key_value = f'"{key_value}"'
|
||||||
|
# Escape newline characters to ensure the key value remains
|
||||||
|
# a valid single-line string. This prevents syntax errors
|
||||||
|
# when the string is used in contexts where unescaped
|
||||||
|
# newlines would break Python syntax or evaluation logic.
|
||||||
|
key_value = key_value.replace("\n", "\\n")
|
||||||
|
|
||||||
|
# Save key value if not saved yet
|
||||||
|
if key_value and key_value not in key_values:
|
||||||
|
key_values.append(key_value)
|
||||||
|
|
||||||
|
# Handle False and None values
|
||||||
|
if not key_value:
|
||||||
|
key_value = str(key_value)
|
||||||
|
|
||||||
|
# Replace key with value
|
||||||
|
code = code.replace(key_string, key_value)
|
||||||
|
|
||||||
|
return {"code": code, "key_values": key_values}
|
||||||
|
|
||||||
|
def _parse_code(self, code, **kwargs):
|
||||||
|
"""Replaces key placeholders in code with the corresponding values.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
code (Text): code to proceed
|
||||||
|
kwargs (dict): optional arguments
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Text: code with key values in place and list of key values.
|
||||||
|
Use key values
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self._parse_code_and_return_key_values(code, **kwargs)["code"]
|
||||||
|
|
||||||
|
def _extract_key_strings(self, code):
|
||||||
|
"""Extract all keys from code
|
||||||
|
Args:
|
||||||
|
code (Text): description
|
||||||
|
**kwargs (dict): optional arguments
|
||||||
|
Returns:
|
||||||
|
[str]: list of key strings
|
||||||
|
"""
|
||||||
|
key_strings = []
|
||||||
|
key_terminator_len = len(self.KEY_TERMINATOR)
|
||||||
|
index_from = 0 # initial position
|
||||||
|
|
||||||
|
while index_from >= 0:
|
||||||
|
index_from = code.find(self.KEY_PREFIX, index_from)
|
||||||
|
if index_from >= 0:
|
||||||
|
# Key end
|
||||||
|
index_to = code.find(self.KEY_TERMINATOR, index_from)
|
||||||
|
# Extract key value only if key terminator is found
|
||||||
|
if index_to > 0:
|
||||||
|
# Extract key string including key terminator
|
||||||
|
extract_to = index_to + key_terminator_len
|
||||||
|
key_string = code[index_from:extract_to]
|
||||||
|
# Add only if not added before
|
||||||
|
if key_string not in key_strings:
|
||||||
|
key_strings.append(key_string)
|
||||||
|
# Update index from
|
||||||
|
index_from = extract_to
|
||||||
|
else:
|
||||||
|
# No terminator found, move past this occurrence of prefix
|
||||||
|
index_from += len(self.KEY_PREFIX)
|
||||||
|
else:
|
||||||
|
# No more prefixes found
|
||||||
|
break
|
||||||
|
|
||||||
|
return key_strings
|
||||||
|
|
||||||
|
def _parse_key_string(self, key_string, **kwargs):
|
||||||
|
"""Parse key string and call resolver based on the key type.
|
||||||
|
Each key string consists of 3 parts:
|
||||||
|
- key marker: #!cxtower
|
||||||
|
- key type: e.g. "secret", "password", "login" etc
|
||||||
|
- key ID: e.g "qwerty123", "mystrongpassword" etc
|
||||||
|
|
||||||
|
Inherit this function to implement your own parser or resolver
|
||||||
|
Args:
|
||||||
|
key_string (str): key string
|
||||||
|
**kwargs (dict) optional values
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: key value or None if not able to parse
|
||||||
|
"""
|
||||||
|
|
||||||
|
key_parts = self._extract_key_parts(key_string)
|
||||||
|
if key_parts is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
key_type, reference = key_parts
|
||||||
|
key_value = self._resolve_key(key_type, reference, **kwargs)
|
||||||
|
|
||||||
|
return key_value
|
||||||
|
|
||||||
|
def _extract_key_parts(self, key_string):
|
||||||
|
"""Extract and validate key parts from the key string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key_string (str): key string
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple: (key_type, reference) if valid, else None
|
||||||
|
"""
|
||||||
|
key_parts = (
|
||||||
|
key_string.replace(" ", "").replace(self.KEY_TERMINATOR, "").split(".")
|
||||||
|
)
|
||||||
|
|
||||||
|
# Must be 3 parts including pre!
|
||||||
|
if len(key_parts) == 3 and key_parts[0] == self.KEY_PREFIX:
|
||||||
|
return key_parts[1], key_parts[2]
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _resolve_key(self, key_type, reference, **kwargs):
|
||||||
|
"""Resolve key
|
||||||
|
Inherit this function to implement your own resolvers
|
||||||
|
|
||||||
|
Args:
|
||||||
|
reference (str): key reference
|
||||||
|
**kwargs (dict) optional values
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: value or None if not able to parse
|
||||||
|
"""
|
||||||
|
if key_type == "secret":
|
||||||
|
return self._resolve_key_type_secret(reference, **kwargs)
|
||||||
|
|
||||||
|
def _resolve_key_type_secret(self, reference, **kwargs):
|
||||||
|
"""Resolve key of type "secret".
|
||||||
|
Use this function as a custom parser example
|
||||||
|
|
||||||
|
Args:
|
||||||
|
reference (str): key reference
|
||||||
|
**kwargs (dict) optional values
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: value or False if not able to parse
|
||||||
|
"""
|
||||||
|
if not reference:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Compose domain used to fetch keys
|
||||||
|
#
|
||||||
|
# Keys are checked in the following order:
|
||||||
|
# 1. Partner and Server specific
|
||||||
|
# 2. Server specific
|
||||||
|
# 3. Partner specific
|
||||||
|
# 4. General (no server or partner specified)
|
||||||
|
server_id = kwargs.get("server_id")
|
||||||
|
partner_id = kwargs.get("partner_id")
|
||||||
|
|
||||||
|
# Fetch key
|
||||||
|
key = self.sudo().search([("reference", "=", reference)], limit=1)
|
||||||
|
if not key:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check if key has custom values
|
||||||
|
key_values = key.value_ids
|
||||||
|
key_value = None
|
||||||
|
|
||||||
|
# 1. Server and Partner specific key first
|
||||||
|
if key_values and server_id and partner_id:
|
||||||
|
filtered_key_values = key_values.filtered(
|
||||||
|
lambda k: k.server_id.id == server_id and k.partner_id.id == partner_id
|
||||||
|
)
|
||||||
|
if filtered_key_values:
|
||||||
|
key_value = filtered_key_values[0]
|
||||||
|
|
||||||
|
# 2. Server specific key first
|
||||||
|
if not key_value and key_values and server_id:
|
||||||
|
filtered_key_values = key_values.filtered(
|
||||||
|
lambda k: k.server_id.id == server_id and not k.partner_id
|
||||||
|
)
|
||||||
|
if filtered_key_values:
|
||||||
|
key_value = filtered_key_values[0]
|
||||||
|
|
||||||
|
# 3. Partner specific key next
|
||||||
|
if not key_value and key_values and partner_id:
|
||||||
|
filtered_key_values = key_values.filtered(
|
||||||
|
lambda k: k.partner_id.id == partner_id and not k.server_id
|
||||||
|
)
|
||||||
|
if filtered_key_values:
|
||||||
|
key_value = filtered_key_values[0]
|
||||||
|
|
||||||
|
# 4. General key next
|
||||||
|
if not key_value and key_values:
|
||||||
|
filtered_key_values = key_values.filtered(
|
||||||
|
lambda k: not k.partner_id and not k.server_id
|
||||||
|
)
|
||||||
|
if filtered_key_values:
|
||||||
|
key_value = filtered_key_values[0]
|
||||||
|
|
||||||
|
if key_value:
|
||||||
|
return key_value._get_secret_value("secret_value")
|
||||||
|
|
||||||
|
def _replace_with_spoiler(self, code, key_values):
|
||||||
|
"""Helper function that replaces clean text keys in code with spoiler.
|
||||||
|
Eg
|
||||||
|
'Code with passwordX and passwordY` will look like:
|
||||||
|
'Code with *** and ***'
|
||||||
|
|
||||||
|
Important: this function doesn't parse keys by itself.
|
||||||
|
You need to get and provide key values yourself.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
code (Text): code to clean
|
||||||
|
key_values (List): secret values to be cleaned from code
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Text: cleaned code
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not key_values:
|
||||||
|
return code
|
||||||
|
|
||||||
|
# Replace keys with values
|
||||||
|
for key_value in key_values:
|
||||||
|
# If key_value includes quotes, remove them for the replacement
|
||||||
|
key_value = key_value.strip('"')
|
||||||
|
# If key_value contains an escaped line break replace then remove escaping
|
||||||
|
key_value = key_value.replace("\\n", "\n")
|
||||||
|
# Replace key including key terminator
|
||||||
|
code = code.replace(key_value, self.SECRET_VALUE_PLACEHOLDER)
|
||||||
|
|
||||||
|
return code
|
||||||
|
|
||||||
|
def _set_secret_values(self, vals):
|
||||||
|
"""Set secret value.
|
||||||
|
Override this method in case you need
|
||||||
|
to implement custom key storages.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
vals (dict): Dictionary of field names to secret values
|
||||||
|
"""
|
||||||
|
self.ensure_one()
|
||||||
|
if self.key_type == "s":
|
||||||
|
# Set general value or create new one if not exists
|
||||||
|
general_value = self.value_ids.filtered(
|
||||||
|
lambda x: not x.server_id and not x.partner_id
|
||||||
|
)
|
||||||
|
if general_value:
|
||||||
|
general_value._set_secret_values(vals)
|
||||||
|
else:
|
||||||
|
create_vals = {"key_id": self.id}
|
||||||
|
create_vals.update(vals)
|
||||||
|
self.value_ids.create(create_vals)
|
||||||
|
|
||||||
|
elif self.key_type == "k":
|
||||||
|
return super()._set_secret_values(vals)
|
||||||
Reference in New Issue
Block a user