Files
odoo-addons/addons/cetmix_tower_server/models/cx_tower_server.py

2465 lines
85 KiB
Python

# Copyright (C) 2022 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import ast
import io
import logging
from datetime import timedelta
from functools import wraps
from odoo import _, api, fields, models
from odoo.exceptions import UserError, ValidationError
from odoo.tools.safe_eval import safe_eval
from odoo.addons.base.models.res_users import check_identity
from ..ssh.ssh import SSHConnection, SSHManager
from .constants import (
ANOTHER_COMMAND_RUNNING,
COMMAND_NOT_COMPATIBLE_WITH_SERVER,
COMMAND_TIMED_OUT,
COMMAND_TIMED_OUT_MESSAGE,
FILE_CREATION_FAILED,
GENERAL_ERROR,
JET_NOT_FOUND,
JET_TEMPLATE_NOT_FOUND,
NO_COMMAND_RUNNER_FOUND,
PYTHON_COMMAND_ERROR,
SSH_CONNECTION_ERROR,
WAYPOINT_CREATE_FAILED,
WAYPOINT_TEMPLATE_NOT_FOUND,
)
from .tools import generate_random_id
_logger = logging.getLogger(__name__)
def ensure_ssh_disconnect(func):
"""
Decorator that ensures the SSH connection is disconnected after the transaction
completes, whether by commit or rollback.
This decorator registers hooks (postcommit and postrollback) before calling the
decorated function. Thus, even if the function raises an exception (and it's caught
at a higher level), the hooks will still be executed, ensuring that the
SSH connection is closed.
"""
@wraps(func)
def wrapped(self, *args, **kwargs):
# Try to obtain the SSH connection once
try:
connection = self._get_ssh_client(raise_on_error=True)
except Exception as e:
_logger.error(f"Error obtaining SSH connection: {e}")
connection = None
# Define a hook to disconnect the SSH connection using the obtained connection.
def disconnect_connection():
if connection:
try:
connection.disconnect()
except Exception as e:
_logger.error(f"Error disconnecting SSH connection: {e}")
# Register the disconnect hook for both commit and rollback events.
self.env.cr.postcommit.add(disconnect_connection)
self.env.cr.postrollback.add(disconnect_connection)
# Call the decorated function.
result = func(self, *args, **kwargs)
return result
return wrapped
class CxTowerServer(models.Model):
"""Represents a server entity.
Keeps information required to connect and perform routine operations
such as configuration, file management etc.
"""
_name = "cx.tower.server"
_inherit = [
"cx.tower.access.role.mixin",
"cx.tower.variable.mixin",
"cx.tower.reference.mixin",
"cx.tower.metadata.mixin",
"mail.thread",
"mail.activity.mixin",
"cx.tower.vault.mixin",
"cx.tower.tag.mixin",
]
_description = "Cetmix Tower Server"
_order = "name asc"
SECRET_FIELDS = ["ssh_password", "host_key"]
# ---- Main
active = fields.Boolean(default=True)
color = fields.Integer(help="For better visualization in views")
partner_id = fields.Many2one(comodel_name="res.partner")
status = fields.Selection(
selection=lambda self: self._selection_status(),
default=None,
required=False,
)
# ---- Connection
ip_v4_address = fields.Char(
string="IPv4 Address", groups="cetmix_tower_server.group_manager"
)
ip_v6_address = fields.Char(
string="IPv6 Address", groups="cetmix_tower_server.group_manager"
)
skip_host_key = fields.Boolean(
default=False,
help="Enable to skip host key verification",
)
host_key = fields.Char(
groups="cetmix_tower_server.group_manager",
help="Host key to verify the server",
)
ssh_port = fields.Integer(
string="SSH port",
required=True,
default=22,
groups="cetmix_tower_server.group_manager",
)
ssh_username = fields.Char(
string="SSH Username", required=True, groups="cetmix_tower_server.group_manager"
)
ssh_password = fields.Char(
string="SSH Password",
groups="cetmix_tower_server.group_manager",
)
ssh_key_id = fields.Many2one(
comodel_name="cx.tower.key",
string="SSH Private Key",
domain=[("key_type", "=", "k")],
groups="cetmix_tower_server.group_manager",
)
ssh_auth_mode = fields.Selection(
string="SSH Auth Mode",
selection=[
("p", "Password"),
("k", "Key"),
],
default="p",
required=True,
groups="cetmix_tower_server.group_manager",
)
use_sudo = fields.Selection(
string="Use sudo",
selection=[("n", "Without password"), ("p", "With password")],
help="Run commands using 'sudo'. Leave empty if 'sudo' is not needed.",
groups="cetmix_tower_server.group_manager",
)
url = fields.Char(
string="URL", help="Server web interface, eg 'https://doge.example.com'"
)
# ---- Variables
variable_value_ids = fields.One2many(
inverse_name="server_id" # Other field properties are defined in mixin
)
# ---- Keys
secret_ids = fields.One2many(
string="Secrets",
comodel_name="cx.tower.key.value",
inverse_name="server_id",
groups="cetmix_tower_server.group_manager",
)
# ---- Attributes
os_id = fields.Many2one(
string="Operating System",
comodel_name="cx.tower.os",
groups="cetmix_tower_server.group_manager",
)
tag_ids = fields.Many2many(
relation="cx_tower_server_tag_rel",
column1="server_id",
column2="tag_id",
)
note = fields.Text()
command_log_ids = fields.One2many(
comodel_name="cx.tower.command.log", inverse_name="server_id"
)
plan_log_ids = fields.One2many(
comodel_name="cx.tower.plan.log", inverse_name="server_id"
)
file_ids = fields.One2many(
"cx.tower.file",
"server_id",
string="Files",
)
file_count = fields.Integer(
"Total Files",
compute="_compute_counters",
)
# ---- Server logs
server_log_ids = fields.One2many(
string="Server Logs",
comodel_name="cx.tower.server.log",
inverse_name="server_id",
)
# ---- Related server template
server_template_id = fields.Many2one(
"cx.tower.server.template",
readonly=True,
index=True,
)
# ---- Delete plan
plan_delete_id = fields.Many2one(
"cx.tower.plan",
string="On Delete Plan",
groups="cetmix_tower_server.group_manager",
help="This Flightplan will be run when the server is deleted",
)
# ---- Jets
jet_template_ids = fields.Many2many(
comodel_name="cx.tower.jet.template",
relation="cx_tower_jet_template_server_rel",
column1="server_id",
column2="jet_template_id",
string="Installed Jet Templates",
readonly=True,
copy=False,
)
jet_template_count = fields.Integer(
compute="_compute_counters",
)
jet_ids = fields.One2many(
comodel_name="cx.tower.jet",
inverse_name="server_id",
string="Jets",
copy=False,
)
jet_count = fields.Integer(
compute="_compute_counters",
)
# ---- Access. Add relation for mixin fields
user_ids = fields.Many2many(
relation="cx_tower_server_user_rel",
)
manager_ids = fields.Many2many(
relation="cx_tower_server_manager_rel",
)
# ---- Shortcuts
shortcut_ids = fields.Many2many(
comodel_name="cx.tower.shortcut",
relation="cx_tower_server_shortcut_rel",
column1="server_id",
column2="shortcut_id",
string="Shortcuts",
)
# ---- Scheduled Tasks
scheduled_task_ids = fields.Many2many(
comodel_name="cx.tower.scheduled.task",
relation="cx_tower_scheduled_task_server_rel",
column1="server_id",
column2="scheduled_task_id",
string="Scheduled Tasks",
)
command_ids = fields.Many2many(
comodel_name="cx.tower.command",
relation="cx_tower_server_command_rel",
column1="server_id",
column2="command_id",
string="Commands",
)
plan_ids = fields.Many2many(
comodel_name="cx.tower.plan",
relation="cx_tower_plan_cx_tower_server_rel",
column1="cx_tower_server_id",
column2="cx_tower_plan_id",
string="Flight Plans",
)
def _selection_status(self):
"""
Status selection options
Returns:
list: status selection options
"""
return [
("stopped", "Stopped"),
("starting", "Starting"),
("running", "Running"),
("stopping", "Stopping"),
("restarting", "Restarting"),
("deleting", "Deleting"),
("delete_error", "Deletion Error"),
]
# ---- Computed fields
def _compute_counters(self):
"""
Compute total jet template, jets and files installed on server
Note: as numbers depend on the records user has access to,
we don't store the values.
@depends is not needed because they are displayed in the views only
or computed when accessed explicitly.
"""
for server in self:
server.update(
{
"jet_template_count": len(server.jet_template_ids),
"jet_count": len(server.jet_ids),
"file_count": len(server.file_ids),
}
)
@api.constrains("ip_v4_address", "ip_v6_address", "ssh_auth_mode")
def _constraint_ssh_settings(self):
"""Ensure SSH settings are valid.
Set 'skip_ssh_settings_check' context key to skip the checks
"""
# Skip the check if context key is set
if self._context.get("skip_ssh_settings_check"):
return
for rec in self:
# Combine all errors together
validation_errors = []
if not rec.ip_v4_address and not rec.ip_v6_address:
validation_errors.append(
_(
"Please provide IPv4 or IPv6 address for %(srv)s",
srv=rec.name,
)
)
if rec.ssh_auth_mode == "k" and not rec.ssh_key_id:
validation_errors.append(
_("Please provide SSH Key for %(srv)s", srv=rec.name)
)
# Raise errors if any
if validation_errors:
validation_error = "\n".join(validation_errors)
raise ValidationError(validation_error)
@api.model_create_multi
def create(self, vals_list):
"""Override create to validate SSH password before record creation."""
# Validate SSH password before creating records
if not self._context.get("skip_ssh_settings_check"):
validation_errors = []
for vals in vals_list:
if vals.get("ssh_auth_mode") == "p" and not vals.get("ssh_password"):
server_name = vals["name"]
validation_errors.append(
_("Please provide SSH password for %(srv)s", srv=server_name)
)
if validation_errors:
raise ValidationError("\n".join(validation_errors))
return super().create(vals_list)
def unlink(self):
"""Run post-delete flight plan"""
servers_to_delete = self.env["cx.tower.server"]
flight_plan_log_obj = self.env["cx.tower.plan.log"]
for server in self:
# If forced, no delete plan, or already in deleting state,
# skip plan running
if (
self._context.get("server_force_delete")
or not server.plan_delete_id
or server._is_being_deleted()
):
servers_to_delete |= server
continue
plan_label = generate_random_id(4)
server.plan_delete_id._run_single(
server=server, **{"plan_log": {"label": plan_label}}
)
plan_log = flight_plan_log_obj.search(
[
("server_id", "=", server.id),
("plan_id", "=", server.plan_delete_id.id),
("label", "=", plan_label),
]
)
# If plan has finished, either mark for deletion or set an error
if plan_log and plan_log.finish_date:
if plan_log.plan_status == 0:
servers_to_delete |= server
else:
server.status = "delete_error"
else:
# Plan still in progress
server.status = "deleting"
return super(CxTowerServer, servers_to_delete).unlink()
@api.returns("self", lambda value: value.id)
def copy(self, default=None):
default = default or {}
default["status"] = None
file_ids = self.env["cx.tower.file"]
for file in self.file_ids:
file_ids |= file.copy(
{
"auto_sync": False,
"keep_when_deleted": True,
}
)
default["file_ids"] = file_ids.ids
# Copy SSH password and host key
default["ssh_password"] = self._get_secret_value("ssh_password")
default["host_key"] = self._get_secret_value("host_key")
result = super().copy(default=default)
# Copy server secrets
for secret in self.secret_ids:
secret.sudo().copy({"server_id": result.id})
for var_value in self.variable_value_ids:
# Duplicating a server with variable values and then duplicating the
# duplicate causes a uniqueness constraint error for the 'reference' field
# in 'cx.tower.variable.value'. This happens because 'reference' is
# generated from the 'name' field, which is a related field fetching the
# same value across duplications. To avoid this, we pass the existing
# 'reference' as 'name' during duplication, ensuring unique 'reference'
# generation for each copy.
var_value.copy({"server_id": result.id, "name": var_value.reference})
for server_log in self.server_log_ids:
server_log.copy({"server_id": result.id})
return result
# ------------------------------
# ---- Actions
# ------------------------------
@check_identity
def action_show_host_key(self):
"""Show host key"""
self.ensure_one()
try:
host_key = self._get_host_key_from_host()
is_error = False
except Exception as error:
is_error = True
host_key = error
context = {
"default_host_key": host_key,
"default_is_error": is_error,
"default_server_id": self.id,
}
return {
"type": "ir.actions.act_window",
"name": _("Host Key"),
"res_model": "cx.tower.server.host.key.wizard",
"view_mode": "form",
"target": "new",
"context": context,
}
def action_update_server_logs(self):
"""Update selected log from its source."""
for server in self:
if server.server_log_ids:
server.server_log_ids.action_update_log()
def action_open_command_logs(self):
"""
Open current server command log records
"""
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id(
"cetmix_tower_server.action_cx_tower_command_log"
)
action["domain"] = [("server_id", "=", self.id)] # pylint: disable=no-member
return action
def action_open_plan_logs(self):
"""
Open current server flightplan log records
"""
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id(
"cetmix_tower_server.action_cx_tower_plan_log"
)
action["domain"] = [("server_id", "=", self.id)] # pylint: disable=no-member
return action
def action_run_command(self):
"""
Returns wizard action to select command and run it
"""
context = self.env.context.copy()
context.update(
{
"default_server_ids": self.ids,
}
)
return {
"type": "ir.actions.act_window",
"name": _("Run Command"),
"res_model": "cx.tower.command.run.wizard",
"view_mode": "form",
"target": "new",
"context": context,
}
def action_run_flight_plan(self):
"""
Returns wizard action to select flightplan and run it
"""
context = self.env.context.copy()
context.update(
{
"default_server_ids": self.ids,
}
)
return {
"type": "ir.actions.act_window",
"name": _("Run Flight Plan"),
"res_model": "cx.tower.plan.run.wizard",
"view_mode": "form",
"target": "new",
"context": context,
}
def action_new_jet(self):
"""
Returns wizard action to launch a jet
"""
self.ensure_one()
context = self.env.context.copy()
context.update(
{
"default_server_id": self.id,
}
)
return {
"type": "ir.actions.act_window",
"name": _("Launch New Jet"),
"res_model": "cx.tower.jet.create.wizard",
"view_mode": "form",
"target": "new",
"context": context,
}
def action_open_files(self):
"""
Open files of the current server
"""
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id(
"cetmix_tower_server.cx_tower_file_action"
)
action["domain"] = [("server_id", "=", self.id)] # pylint: disable=no-member
context = self._context.copy()
if "context" in action and isinstance((action["context"]), str):
context.update(ast.literal_eval(action["context"]))
else:
context.update(action.get("context", {}))
context.update(
{
"default_server_id": self.id, # pylint: disable=no-member
}
)
action["context"] = context
return action
def action_open_jets(self):
"""
Open jets of the current server
"""
self.ensure_one()
action = self.env["ir.actions.actions"]._for_xml_id(
"cetmix_tower_server.cx_tower_jet_action"
)
action["domain"] = [("server_id", "=", self.id)] # pylint: disable=no-member
context = self._context.copy()
if "context" in action and isinstance((action["context"]), str):
context.update(ast.literal_eval(action["context"]))
else:
context.update(action.get("context", {}))
context.update(
{
"default_server_id": self.id, # pylint: disable=no-member
}
)
action["context"] = context
return action
def action_install_jet_template(self):
"""Action to install the Jet Template on the selected servers."""
self.ensure_one()
# Open the wizard to install the template on the selected servers
return {
"type": "ir.actions.act_window",
"name": _("Install Jet Template"),
"res_model": "cx.tower.jet.template.install.wiz",
"view_mode": "form",
"target": "new",
"context": {
"default_server_ids": self.ids, # pylint: disable=no-member
},
}
def action_uninstall_jet_template(self):
"""
Uninstall jet template from the current server
"""
self.ensure_one()
jet_template_id = self.env.context.get("jet_template_id")
if jet_template_id and jet_template_id in self.jet_template_ids.ids:
jet_template = self.env["cx.tower.jet.template"].browse(jet_template_id)
if jet_template:
jet_template.uninstall_from_servers(self)
# ------------------------------
# ---- Connectivity
# ------------------------------
def _get_ssh_client(self, raise_on_error=False, timeout=5000, skip_host_key=False):
"""Create a new SSH client instance
Args:
raise_on_error (bool, optional): If true will raise exception
in case or error, otherwise False will be returned
Defaults to True.
timeout (int, optional): SSH connection timeout in seconds.
skip_host_key (bool, optional): If true will skip host key verification.
Defaults to False.
Raises:
ValidationError: If the provided server reference is invalid or
the server cannot be found.
Returns:
SSH: SSH manager instance or False and exception content
"""
self.ensure_one()
self = self.sudo()
try:
host_key = self._get_secret_value("host_key")
# Check host only if IP address is present
skip_host_key = skip_host_key or self.skip_host_key
if (
not host_key
and not skip_host_key
and (self.ip_v4_address or self.ip_v6_address)
):
raise ValidationError(
_("Host key not found for server %(server)s", server=self.name)
)
connection = SSHConnection(
host=self.ip_v4_address or self.ip_v6_address,
port=self.ssh_port,
username=self.ssh_username,
password=self._get_ssh_password(),
ssh_key=self._get_ssh_key(),
host_key=host_key if host_key and not self.skip_host_key else None,
mode=self.ssh_auth_mode,
timeout=timeout,
)
client = SSHManager(connection)
except Exception as e:
if raise_on_error:
raise ValidationError(_("SSH connection error %(err)s", err=e)) from e
return False, e
return client
def test_ssh_connection(
self,
raise_on_error=True,
return_notification=True,
try_command=True,
try_file=True,
timeout=60,
):
"""Test SSH connection.
Args:
raise_on_error (bool, optional): Raise exception in case of error.
Defaults to True.
return_notification (bool, optional): Show sticky notification
Defaults to True.
try_command (bool, optional): Try to run a command.
Defaults to True.
try_file (bool, optional): Try file operations.
Defaults to True.
timeout (int, optional): SSH connection timeout in seconds.
Defaults to 60.
Raises:
ValidationError: In case of SSH connection error.
ValidationError: In case of no output received.
ValidationError: In case of file operations error.
Returns:
dict: {
"status": int,
"response": str,
"error": str,
}
"""
self.ensure_one()
client = self._get_ssh_client(raise_on_error=raise_on_error, timeout=timeout)
if not try_command and not try_file:
try:
client.connection.connect()
return {
"status": 0,
"response": _("Connection successful."),
"error": "",
}
except Exception as e:
if raise_on_error:
raise ValidationError(
_("SSH connection error %(err)s", err=e)
) from e
else:
return {
"status": SSH_CONNECTION_ERROR,
"response": _("Connection failed."),
"error": e,
}
# Try command
if try_command:
command = self._get_connection_test_command()
test_result = self._run_command_using_ssh(
client, command_code=command, **{"raise_on_error": raise_on_error}
)
status = test_result.get("status", 0)
response = test_result.get("response", "")
error = test_result.get("error", "")
# Got an error
if raise_on_error and (status != 0 or error):
raise ValidationError(
_(
"Cannot run command\n. CODE: %(status)s. "
"RESULT: %(res)s. ERROR: %(err)s",
status=status,
res=response,
err=error,
)
)
# No output received
if raise_on_error and not response:
raise ValidationError(
_(
"No output received."
" Please log in manually and check for any issues.\n"
"===\nCODE: %(status)s",
status=status,
)
)
if try_file:
# test upload file
self.upload_file("test", "/tmp/cetmix_tower_test_connection.txt")
# test download loaded file
self.download_file("/tmp/cetmix_tower_test_connection.txt")
# remove file from server
file_test_result = self._run_command_using_ssh(
client, command_code="rm -rf /tmp/cetmix_tower_test_connection.txt"
)
file_status = file_test_result.get("status", 0)
file_error = file_test_result.get("error", "")
# In case of an error, raise or replace command result with file test result
if file_status != 0 or file_error:
if raise_on_error:
raise ValidationError(
_(
"Cannot remove test file using command.\n "
"CODE: %(status)s. ERROR: %(err)s",
err=file_error,
status=file_status,
)
)
# Replace command result with file test result
test_result = file_test_result
# Return notification
if return_notification:
response = test_result.get("response", "")
return self._get_notification_action(
_(
"Connection test passed! \n%(res)s",
res=response.rstrip(),
),
notification_type="info",
title=_("Success"),
sticky=False,
)
return test_result
def _get_connection_test_command(self):
"""Get command used to test SSH connection
Returns:
Char: SSH command
"""
command = "uname -a"
return command
def _get_ssh_password(self):
"""Get ssh password
This function prepares and returns ssh password for the ssh connection
Override this function to implement own password algorithms
Returns:
Char: password ready to be used for connection parameters
"""
self.ensure_one()
password = self._get_secret_value("ssh_password")
return password
def _get_ssh_key(self):
"""Get SSH key
Get private key for an SSH connection
Returns:
Char: SSH private key
"""
self.ensure_one()
# To ensure that key will be read
# regardless of access rights
if self.sudo().ssh_key_id:
# Use context key to read secret value
ssh_key = self.ssh_key_id._get_secret_value("secret_value")
else:
ssh_key = None
return ssh_key
@ensure_ssh_disconnect
def _get_host_key_from_host(self, raise_on_error=False, timeout=60):
"""Get host key
Args:
raise_on_error (bool, optional): If true will raise exception
in case or error, otherwise False will be returned
Defaults to True.
timeout (int, optional): SSH connection timeout in seconds.
Raises:
ValidationError: If the provided server reference is invalid or
the server cannot be found.
Returns:
Host key: Host key of the server
"""
self.ensure_one()
# Check access before getting host key
# This is needed to avoid possible access violations
self.check_access_rights("read")
self.check_access_rule("read")
try:
# Skip host key verification to obtain the server's real host key.
client = self._get_ssh_client(
raise_on_error=raise_on_error, timeout=timeout, skip_host_key=True
)
# Disable host key verification for this connection only, to obtain the
# server's real host key. If a pre-configured host key is incorrect using
# it would cause a key mismatch error. By setting host_key to False
# here, we trigger AutoAddPolicy for this connection, which automatically
# accepts the server's actual host key.
client.connection.host_key = False
ssh_client = client.connection.connect()
transport = ssh_client.get_transport()
remote_key = transport.get_remote_server_key()
host_key = remote_key.get_base64()
return host_key
except Exception as e:
if raise_on_error:
raise ValidationError(
_("Error retrieving host key: %(err)s", err=e)
) from e
else:
return None
# ------------------------------
# ---- Command execution
# ------------------------------
def run_command(
self,
command,
path=None,
sudo=None,
ssh_connection=None,
jet_template=None,
jet=None,
**kwargs,
):
"""This is the main function to use for running commands.
It renders command code, creates log record and calls command runner.
Args:
command (cx.tower.command()): Command record
path (Char): directory where command is run.
Provide in case you need to override default command value
sudo (Boolean): use sudo
Defaults to None
ssh_connection (SSH client instance, optional): SSH connection.
Pass to reuse existing connection.
This is useful in case you would like to speed up
the ssh command running.
jet_template (cx.tower.jet.template()) Jet Template record
Pass to run for specific jet template
jet (cx.tower.jet()): Jet record
Pass to run for specific jet
kwargs (dict): extra arguments. Use to pass external values.
Following keys are supported by default:
- "waypoint", cx.tower.jet.waypoint(): waypoint record
when running for a waypoint (e.g. from waypoint plan)
- "log", dict(): values passed to logger
- "key", dict(): values passed to key parser
- "variable_values", dict(): custom variable values
in the format of `{variable_reference: variable_value}`
eg `{'odoo_version': '16.0'}`
Will be applied only if user has write access to the server.
Context:
no_command_log (Bool): set this context key to `True`
to disable log creation.
Command running results will be returned instead.
If any non command related error occurs in the command running flow
an exception will be raised.
IMPORTANT: be aware when running commands with `no_command_log=True`
because no `Allow Parallel Run` check will be done!
Returns:
dict(): command running result if `no_command_log`
context value == True else None
"""
self.ensure_one()
# Check if jet belongs to the server
if jet and not jet.server_id == self:
raise ValidationError(
_(
"Jet '%(jet)s' doesn't belong to the server '%(server)s'.",
jet=jet.name,
server=self.name, # pylint: disable=no-member
)
)
# Force set jet template from jet if jet is provided
if jet:
jet_template = jet.jet_template_id
# Populate `sudo` value from the server settings if not provided explicitly
if self.sudo().ssh_username == "root":
sudo = False
elif sudo is None or sudo:
sudo = self.sudo().use_sudo
# Prepare log object
log_obj = self.env["cx.tower.command.log"]
log_vals = kwargs.get("log", {})
log_vals.update(
{
"use_sudo": sudo,
"variable_values": kwargs.get("variable_values", {}),
"jet_template_id": jet_template.id if jet_template else None,
"jet_id": jet.id if jet else None,
}
)
# Check if no log record should be created
no_command_log = self._context.get("no_command_log")
# Check if command can be run on this server:
# 1. Server is listed in command's server_ids
# 2. There are no server_ids at all (command is not server specific)
if not command._check_server_compatibility(self):
error = _("Command is not compatible with the server")
if no_command_log:
return {
"status": COMMAND_NOT_COMPATIBLE_WITH_SERVER,
"response": None,
"error": error,
}
log_obj.record(
server_id=self.id, # pylint: disable=no-member
command_id=command.id,
status=COMMAND_NOT_COMPATIBLE_WITH_SERVER,
error=error,
**log_vals,
)
return
# Check if another instance of the same command is running
another_command_running_domain = [
("server_id", "=", self.id),
("command_id", "=", command.id),
("is_running", "=", True),
]
if jet_template:
another_command_running_domain.append(
("jet_template_id", "=", jet_template.id)
)
if jet:
another_command_running_domain.append(("jet_id", "=", jet.id))
another_command_running_block = (
not command.allow_parallel_run
and log_obj.sudo().search_count(domain=another_command_running_domain)
)
# Another command is running, return error
if another_command_running_block:
if no_command_log:
return {
"status": ANOTHER_COMMAND_RUNNING,
"response": None,
"error": _("Another instance of the command is already running"),
}
log_obj.record(
server_id=self.id, # pylint: disable=no-member
command_id=command.id,
status=ANOTHER_COMMAND_RUNNING,
error=_("Another instance of the command is already running"),
**log_vals,
)
return
# Render command
custom_variable_values = kwargs.get("variable_values", {})
rendered_command = self._render_command(
command=command,
path=path,
jet_template=jet_template,
jet=jet,
custom_variable_values=custom_variable_values,
)
rendered_command_code = rendered_command["rendered_code"]
rendered_command_path = rendered_command["rendered_path"]
# Prepare key renderer values
key_vals = kwargs.get("key", {}) # Get vals from kwargs
key_vals.update({"server_id": self.id}) # pylint: disable=no-member
if self.partner_id:
key_vals.update({"partner_id": self.partner_id.id})
kwargs.update({"key": key_vals})
# Save rendered code to log
if no_command_log:
log_record = None
else:
log_vals.update(
{"code": rendered_command_code, "path": rendered_command_path}
)
# Create log record
log_record = log_obj.start(self.id, command.id, **log_vals) # pylint: disable=no-member
# If on command we have the flag
if command.no_split_for_sudo:
kwargs["no_split_for_sudo"] = True
return self._command_runner_wrapper(
command=command,
log_record=log_record,
rendered_command_code=rendered_command_code,
sudo=sudo,
rendered_command_path=rendered_command_path,
ssh_connection=ssh_connection,
**kwargs,
)
def _render_command(
self,
command,
path=None,
jet_template=None,
jet=None,
custom_variable_values=None,
):
"""Renders command code for selected command for current server
Args:
command (cx.tower.command): Command to render
path (Char): Path where to run the command.
Provide in case you need to override default command path
jet (cx.tower.jet()): Jet to render command for
custom_variable_values (dict): Custom variable values to render command
Returns:
dict: rendered values
{
"rendered_code": rendered command code,
"rendered_path": rendered command path
}
"""
self.ensure_one()
variable_references = []
# Get variables from code
if command.code:
variables_extracted = command.get_variables_from_code(command.code)
for ve in variables_extracted:
if ve not in variable_references:
variable_references.append(ve)
# Get variables from path
path = path if path else command.path
if path:
variables_extracted = command.get_variables_from_code(path)
for ve in variables_extracted:
if ve not in variable_references:
variable_references.append(ve)
# If there are variables to render, get variable values
if variable_references:
# Get the variable values
variable_values = (
self.env["cx.tower.variable"]
.sudo()
._get_variable_values_by_references(
variable_references, server=self, jet_template=jet_template, jet=jet
)
)
# Apply custom variable values only if user has write access to the server
has_write_access = self._have_access_to_server("write")
if custom_variable_values and has_write_access:
variable_values.update(custom_variable_values)
# Render command code and path using variables
if variable_values:
if command.action == "python_code":
variable_values["pythonic_mode"] = True
rendered_code = (
command.render_code_custom(command.code, **variable_values)
if command.code
else False
)
rendered_path = (
command.render_code_custom(path, **variable_values)
if path
else False
)
else:
rendered_code = command.code
rendered_path = path
return {"rendered_code": rendered_code, "rendered_path": rendered_path}
def _have_access_to_server(self, operation):
"""Check access to the server.
This is a wrapper function over the Odoo built-in ones.
It's used in order we need to implement custom access checks.
Args:
operation (Char): Operation to check access
same format as `check_access_rights`
Returns:
Bool: True if access is granted, False otherwise
"""
# Check access rights first
has_write_access = self.check_access_rights(operation, raise_exception=False)
# Check access rule to parti
if has_write_access:
try:
self.check_access_rule(operation)
except UserError:
has_write_access = False
return has_write_access
def run_flight_plan(self, flight_plan, jet_template=None, jet=None, **kwargs):
"""
Runs flight plan on the current server.
Args:
flight_plan (cx.tower.plan()): flight plan to run
jet_template (cx.tower.jet.template()): jet template
to run the flight plan on
jet (cx.tower.jet()): jet to run the flight plan on
kwargs (dict): Optional arguments
Following are supported but not limited to:
- "plan_log": {values passed to flightplan logger}
- "log": {values passed to logger}
- "key": {values passed to key parser}
- "variable_values", dict(): custom variable values
in the format of `{variable_reference: variable_value}`
eg `{'odoo_version': '16.0'}`
Will be applied only if user has write access to the server.
Returns:
log_record (cx.tower.plan.log()): plan log record
"""
self.ensure_one()
# Check if jet belongs to the server
if jet and not jet.server_id == self:
raise ValidationError(
_(
"Jet '%(jet)s' doesn't belong to the server '%(server)s'.",
jet=jet.name,
server=self.name, # pylint: disable=no-member
)
)
# Set jet template from jet if jet is provided
if jet:
jet_template = jet.jet_template_id
# Run flight plan
return flight_plan._run_single(
self, jet_template=jet_template, jet=jet, **kwargs
)
def _command_runner_wrapper(
self,
command,
log_record,
rendered_command_code,
sudo=None,
rendered_command_path=None,
ssh_connection=None,
**kwargs,
):
"""Used to implement custom runner mechanisms.
Use it in case you need to redefine the entire command running engine.
Eg it's used in `cetmix_tower_server_queue` OCA `queue_job` implementation.
Args:
command (cx.tower.command()): Command
log_record (cx.tower.command.log()): Command log record
rendered_command_code (Text): Rendered command code.
We are passing in case it differs from command code in the log record.
sudo (Selection): Command sudo mode. Defaults to None.
rendered_command_path (Char, optional): Rendered command path.
ssh_connection (SSH client instance, optional): SSH connection to reuse.
kwargs (dict): extra arguments. Use to pass external values.
Following keys are supported by default:
- "log": {values passed to logger}
- "key": {values passed to key parser}
Context:
use_sudo (Bool): use sudo for command running
Returns:
dict(): command running result if `log_record` is defined else None
"""
return self._command_runner(
command=command,
log_record=log_record,
rendered_command_code=rendered_command_code,
sudo=sudo,
rendered_command_path=rendered_command_path,
ssh_connection=ssh_connection,
**kwargs,
)
def _command_runner(
self,
command,
log_record,
rendered_command_code,
sudo=None,
rendered_command_path=None,
ssh_connection=None,
**kwargs,
):
"""Top level command runner function.
Calls command type specific runners.
Args:
command (cx.tower.command()): Command
log_record (cx.tower.command.log()): Command log record
rendered_command_code (Text): Rendered command code.
We are passing in case it differs from command code in the log record.
sudo (Selection): Command sudo mode. Defaults to None.
rendered_command_path (Char, optional): Rendered command path.
ssh_connection (SSH client instance, optional): SSH connection to reuse.
kwargs (dict): extra arguments. Use to pass external values.
Following keys are supported by default:
- "log": {values passed to logger}
- "key": {values passed to key parser}
Returns:
dict(): command running result if `log_record` is defined else None
"""
response = None
need_check_server_status = True
if command.action == "ssh_command":
response = self._command_runner_ssh(
log_record=log_record,
rendered_command_code=rendered_command_code,
sudo=sudo,
rendered_command_path=rendered_command_path,
ssh_connection=ssh_connection,
**kwargs,
)
elif command.action == "file_using_template":
response = self._command_runner_file_using_template(
log_record,
rendered_command_path,
**kwargs,
)
elif command.action == "python_code":
response = self._command_runner_python_code(
log_record,
rendered_command_code,
**kwargs,
)
elif command.action == "jet_action":
response = self._command_runner_jet_action(
log_record,
**kwargs,
)
elif command.action == "create_waypoint":
response = self._command_runner_create_waypoint(
log_record,
**kwargs,
)
elif command.action == "plan":
response = self.with_context(
prevent_plan_recursion=True
)._command_runner_flight_plan(
log_record=log_record,
flight_plan=command.flight_plan_id,
**kwargs,
)
need_check_server_status = True
else:
need_check_server_status = False
if (
need_check_server_status
and command.server_status
and (
(log_record and log_record.command_status == 0)
or (response and response["status"] == 0)
)
):
self.write({"status": command.server_status})
if need_check_server_status:
return response
error_message = _(
"No runner found for command action '%(cmd_action)s'",
cmd_action=command.action,
)
if log_record:
log_record.finish(
finish_date=fields.Datetime.now(),
status=NO_COMMAND_RUNNER_FOUND,
response=None,
error=error_message,
)
else:
raise ValidationError(error_message)
def _command_runner_file_using_template_create_file(
self, log_record, server_dir, **kwargs
):
"""
Creates a file on the server using the specified file template.
This method is intended to allow overriding the file creation logic
and provides access to the created file object.
Args:
log_record (recordset): Command log record.
server_dir (str): The directory on the server where the file should be
created.
**kwargs: Additional keyword arguments.
Returns:
record: The created file record.
"""
file_template_id = log_record.command_id.file_template_id
return file_template_id.create_file(
server=self,
server_dir=server_dir,
if_file_exists=log_record.command_id.if_file_exists,
jet_template=log_record.jet_template_id,
jet=log_record.jet_id,
)
def _command_runner_file_using_template(
self,
log_record,
server_dir,
**kwargs,
):
"""
Run the command to create a file from a template and push to server if source
is 'tower' and pull to tower if source is 'server'.
This function attempts to create a new file on the server/tower using the
specified file template. If the file creation is successful, it uploads
the file to the server/tower. The function logs the status of the operation
in the provided log record.
Args:
log_record (recordset): The log record to update with the command's
status.
server_dir (str): The directory on the server where the file should be
created.
**kwargs: Additional keyword arguments.
Returns:
None
Raises:
Exception: If any error occurs during the file creation or upload
process, it logs the error and the exception message in the
log record.
"""
try:
# Attempt to create a new file using the template for the current server
file = self._command_runner_file_using_template_create_file(
log_record=log_record,
server_dir=server_dir,
)
# If file creation failed, log the failure and exit
if not file:
command_result = {
"status": FILE_CREATION_FAILED,
"response": None,
"error": _("File already exists"),
}
if log_record:
return log_record.finish(
finish_date=fields.Datetime.now(),
status=command_result["status"],
response=command_result["response"],
error=command_result["error"],
)
else:
return command_result
# Context is used to detect a retry
# and avoid handling skip logic on first attempt
is_creation_skipped = file._context.get("file_creation_skipped")
if not is_creation_skipped:
if file.source == "server":
file.action_pull_from_server()
elif file.source == "tower":
file.action_push_to_server()
else:
raise UserError(
_(
"File source cannot be determined: '%(source)s'",
source=file.source,
)
)
if log_record.command_id.disconnect_file:
file.action_unlink_from_template()
if is_creation_skipped:
return log_record.finish(
fields.Datetime.now(),
0,
_("File already exists on server. Upload skipped"),
None,
)
# Log the successful creation and upload of the file
return log_record.finish(
finish_date=fields.Datetime.now(),
status=0,
response=_("File created and uploaded successfully"),
error=None,
)
except Exception as e:
# Log any exception that occurs during the process
log_record.finish(
finish_date=fields.Datetime.now(),
status=FILE_CREATION_FAILED,
response=None,
error=_("An error occurred: %(error)s", error=str(e)),
)
def _command_runner_ssh(
self,
log_record,
rendered_command_code,
sudo=None,
rendered_command_path=None,
ssh_connection=None,
**kwargs,
):
"""Run SSH command.
Updates the record in the Command Log (cx.tower.command.log)
Args:
log_record (cx.tower.command.log()): Command log record
rendered_command_code (Text): Rendered command code.
We are passing in case it differs from command code in the log record.
sudo (Selection): Command sudo mode. Defaults to None.
rendered_command_path (Char, optional): Rendered command path.
ssh_connection (SSH client instance, optional): SSH connection to reuse.
kwargs (dict): extra arguments. Use to pass external values.
Following keys are supported by default:
- "log": {values passed to logger}
- "key": {values passed to key parser}
- "raise_on_error": Raise exception on error.
Returns:
dict(): command running result if `log_record` is defined else None
"""
raise_on_error = kwargs.get("raise_on_error", False)
if not ssh_connection:
ssh_connection = self._get_ssh_client(raise_on_error=raise_on_error)
# Run command
command_result = self._run_command_using_ssh(
client=ssh_connection,
command_code=rendered_command_code,
command_path=rendered_command_path,
raise_on_error=raise_on_error,
sudo=sudo,
**kwargs,
)
# Log result
if log_record:
log_record.finish(
finish_date=fields.Datetime.now(),
status=command_result["status"],
response=command_result["response"],
error=command_result["error"],
)
else:
return command_result
def _command_runner_flight_plan(
self, log_record, flight_plan, raise_on_error=False, **kwargs
):
"""
Run Flight plan from command.
Updates the record in the Command Log (cx.tower.command.log)
Args:
log_record (cx.tower.command.log()): Command log record.
flight_plan (cx.tower.plan()): Flight Plan to be run.
raise_on_error (bool, optional): raise error on error.
kwargs (dict): extra arguments. Use to pass external values.
Following keys are supported by default:
- "log": {values passed to logger}
- "key": {values passed to key parser}
Returns:
dict(): flight plan running result if `log_record` is
not defined else None
"""
response = None
error = None
status = 0
plan_log_record = None
try:
# Generate custom label and add values for log
kwargs["plan_log"] = {
"label": generate_random_id(4),
"parent_flight_plan_log_id": log_record.plan_log_id.id,
}
# add executed command with action "plan" to save link to plan log
kwargs["flight_plan_command_log"] = log_record
plan_log_record = flight_plan.with_context(from_command=True)._run_single(
server=self,
jet_template=log_record.jet_template_id,
jet=log_record.jet_id,
**kwargs,
)
except Exception as e: # pylint: disable=broad-exception-caught
if raise_on_error:
raise ValidationError(
_("Flight plan running error %(err)s", err=e)
) from e
status = GENERAL_ERROR
error = e
else:
if plan_log_record.plan_status != 0:
status = plan_log_record.plan_status
error = _("Flight plan running error")
result = {"status": status, "response": response, "error": error}
if log_record:
log_record.finish(
finish_date=fields.Datetime.now(),
status=result["status"],
response=result["response"],
error=result["error"],
variable_values=plan_log_record.variable_values
if plan_log_record
else None,
)
return result
def _command_runner_python_code(
self,
log_record,
rendered_code,
**kwargs,
):
"""
Run Python code.
Updates the record in the Command Log (cx.tower.command.log)
Args:
log_record (cx.tower.command.log()): Command log record
rendered_code (Text): Rendered python code.
kwargs (dict): extra arguments. Use to pass external values.
Following keys are supported by default:
- "log": {values passed to logger}
- "key": {values passed to key parser}
Returns:
dict(): python code running result if `log_record` is
not defined else None
"""
# Run python code
result = self._run_python_code(
code=rendered_code,
raise_on_error=False,
**kwargs,
)
# Log result
if log_record:
log_record.finish(
finish_date=fields.Datetime.now(),
status=result["status"],
response=result["response"],
error=result["error"],
variable_values=result["variable_values"],
)
else:
return result
def _command_runner_jet_action(
self,
log_record,
**kwargs,
):
"""
Run Jet action.
Updates the record in the Command Log (cx.tower.command.log)
Args:
log_record (cx.tower.command.log()): Command log record
Returns:
dict(): jet action running result if `log_record` is
not defined else None
Raises:
ValidationError: if `log_record` is not defined
"""
if not log_record:
raise ValidationError(
_("Command log is required for 'Jet Action' commands!")
)
# Initialize result values
status = 0
response = None
error = None
dependent_jets = None
# Get the action from the command
action = log_record.command_id.jet_action_id
if not action:
status = GENERAL_ERROR
error = _("Jet action is not found.")
log_record.finish(
status=status,
response=response,
error=error,
)
return {"status": status, "response": response, "error": error}
jet_for_which_command_is_run = log_record.jet_id
requested_jet_template = log_record.command_id.jet_template_id
if not jet_for_which_command_is_run:
status = JET_NOT_FOUND
error = _("Jet for which command is run is not found.")
log_record.finish(
status=status,
response=response,
error=error,
)
return {"status": status, "response": response, "error": error}
if not requested_jet_template:
status = JET_TEMPLATE_NOT_FOUND
error = _("Jet template is not found.")
log_record.finish(
status=status,
response=response,
error=error,
)
return {"status": status, "response": response, "error": error}
# Trigger for the jet itself if the same jet template is used
# This is used when you want to trigger an action for
# the same jet for which the command is run.
if jet_for_which_command_is_run.jet_template_id == requested_jet_template:
dependent_jets = jet_for_which_command_is_run
else:
# Get dependent jets by template
dependent_jets = (
jet_for_which_command_is_run._get_dependent_jets_by_template(
requested_jet_template
)
)
if dependent_jets:
# Trigger the action for all dependent jets; aggregate failures as
# "ref: message, ref2: message2" for the command log.
error_parts = []
for jet in dependent_jets:
result = jet._trigger_action(
action=action,
raise_if_not_available=False,
)
if not result:
continue
jet_status = result.get("status", 0)
jet_error = result.get("error")
if jet_status == 0 and not jet_error:
continue
if jet_error:
error_parts.append(f"{jet.reference}: {jet_error}")
else:
error_parts.append(
_(
"%(jet)s: action failed (status %(status)s)",
jet=jet.reference,
status=jet_status,
)
)
# Compose the main message
jet_references = ", ".join(jet.reference for jet in dependent_jets)
main_message = _(
"Action triggered for %(jet_references)s",
jet_references=jet_references,
)
if error_parts:
status = GENERAL_ERROR
error = "\n".join(
[
main_message,
(
error_parts[0]
if len(error_parts) == 1
else ", ".join(error_parts)
),
]
)
response = None
else:
response = main_message
log_record.finish(
status=status,
response=response,
error=error,
)
# If no dependent jets, finish the command
else:
status = 0 # no dependent jets, so the command is finished with no error
response = _(
"Jet %(jet)s has no dependent jets with template %(template)s.",
jet=jet_for_which_command_is_run.name,
template=requested_jet_template.name,
)
log_record.finish(
status=status,
response=response,
error=error,
)
# Return result
return {"status": status, "response": response, "error": error}
def _command_runner_create_waypoint(self, log_record, **kwargs):
"""Run Create a Waypoint command.
Creates a waypoint for the plan's jet from the command's waypoint template.
The flight plan sets the jet as busy, so we pass ignore_busy=True so
creation is allowed. The command log is not finished here when a waypoint
is created; the waypoint callback (_finalize_create_waypoint_command_log)
finishes it when the waypoint reaches ready/current or error.
Args:
log_record (cx.tower.command.log): Command log record.
Returns:
dict: status, response (e.g. waypoint id when created), error.
When waypoint is created, status 0 and response indicate
deferred completion; the log stays running until the callback.
"""
if not log_record:
raise ValidationError(
_("Command log is required for 'Create a Waypoint' commands!")
)
command = log_record.command_id
jet = log_record.jet_id
waypoint_template = command.waypoint_template_id
status = 0
response = None
error = None
if not jet:
status = JET_NOT_FOUND
error = _("Jet for which command is run is not found.")
log_record.finish(
status=status,
response=response,
error=error,
)
return {"status": status, "response": response, "error": error}
if not waypoint_template:
status = WAYPOINT_TEMPLATE_NOT_FOUND
error = _("Waypoint template is not set.")
log_record.finish(
status=status,
response=response,
error=error,
)
return {"status": status, "response": response, "error": error}
try:
waypoint = jet.create_waypoint(
waypoint_template,
fly_here=command.fly_here,
ignore_busy=True,
created_from_command_log=log_record,
)
except ValidationError:
waypoint = False
if not waypoint:
status = WAYPOINT_CREATE_FAILED
error = _(
"Waypoint creation failed (e.g. waypoint template "
"does not match jet template)."
)
log_record.finish(
status=status,
response=response,
error=error,
)
return {"status": status, "response": response, "error": error}
# Do not finish the log; waypoint callback will finish it when
# ready/current/error
response = {"waypoint_id": waypoint.id}
return {"status": 0, "response": response, "error": None}
@ensure_ssh_disconnect
def _run_command_using_ssh(
self,
client,
command_code,
command_path=None,
raise_on_error=False,
sudo=None,
**kwargs,
):
"""This is a low level method for running an SSH command.
Use it in case you need to get direct output of an SSH command.
Otherwise call `run_command()`
Args:
client (Connection): valid server ssh connection object
command_code (Text): command text
command_path (Char, optional): directory where command should be run
raise_on_error (bool, optional): raise error on error
sudo (Selection): Command sudo mode. Defaults to None. Defaults to None.
kwargs (dict): extra arguments. Use to pass external values.
Following keys are supported by default:
- "log": {values passed to logger}
- "key": {values passed to key parser}
Raises:
ValidationError: if client is not valid
ValidationError: command run error
Returns:
dict: {
"status": <int>,
"response": Text,
"error": Text
}
"""
if not client:
if raise_on_error:
raise ValidationError(_("SSH Client is not defined."))
return {
"status": SSH_CONNECTION_ERROR,
"response": False,
"error": _("SSH Client is not defined."),
}
# Client contains a result of _get_ssh_client()
# If it's a tuple, it means there was an error getting the client
if isinstance(client, tuple):
error = client[1]
if raise_on_error:
raise ValidationError(error)
return {
"status": SSH_CONNECTION_ERROR,
"response": False,
"error": error,
}
# Parse inline secrets
code_and_secrets = self.env["cx.tower.key"]._parse_code_and_return_key_values(
command_code, **kwargs.get("key", {})
)
command_code = code_and_secrets["code"]
secrets = code_and_secrets["key_values"]
# Prepare ssh command
prepared_command_code = self._prepare_ssh_command(
command_code,
command_path,
sudo,
**kwargs,
)
try:
status = []
response = []
error = []
# Command is a single sting. No 'sudo' or 'sudo' w/o password
if isinstance(prepared_command_code, str):
status, response, error = client.command_executor.exec_command(
prepared_command_code, sudo=sudo
)
# Multiple commands: sudo with password
elif isinstance(prepared_command_code, list):
for cmd in prepared_command_code:
st, resp, err = client.command_executor.exec_command(cmd, sudo=sudo)
status.append(st)
response += resp
error += err
# Something weird ))
else:
status = GENERAL_ERROR
except Exception as e:
if raise_on_error:
_logger.error(f"SSH run command error: {e}")
raise ValidationError(_("SSH run command error %(err)s", err=e)) from e
status = GENERAL_ERROR
response = []
error = [e]
result = self._parse_command_results(status, response, error, secrets, **kwargs)
return result
def _run_python_code(
self,
code,
raise_on_error=False,
**kwargs,
):
"""
This is a low level method for Python code running.
Args:
code (Text): python code
raise_on_error (bool, optional): raise error on error
kwargs (dict): extra arguments. Use to pass external values.
Following keys are supported by default:
- "log": {values passed to logger}
- "key": {values passed to key parser}
Raises:
ValidationError: python code running error
Returns:
dict: {
"status": <int>,
"response": Text,
"error": Text
}
"""
response = None
error = None
status = 0
secrets = None
try:
# Parse inline secrets
code_and_secrets = self.env[
"cx.tower.key"
]._parse_code_and_return_key_values(
code, pythonic_mode=True, **kwargs.get("key", {})
)
secrets = code_and_secrets.get("key_values")
command_code = code_and_secrets["code"]
code = self.env["cx.tower.key"]._parse_code(
command_code, pythonic_mode=True, **kwargs.get("key", {})
)
# Check if code contains banned keywords
banned_keywords = self.env[
"cx.tower.command"
]._get_banned_python_code_keywords()
for banned_keyword in banned_keywords:
if banned_keyword in code:
raise ValidationError(
_(
"Following keyword is not allowed in Python code:"
" '%(banned_keyword)s'",
banned_keyword=banned_keyword,
)
)
# Get jet template, jet and waypoint from kwargs or log
log_vals = kwargs.get("log", {})
if log_vals:
jet_template_id = log_vals.get("jet_template_id")
jet_id = log_vals.get("jet_id")
else:
jet_template_id = kwargs.get("jet_template_id")
jet_id = kwargs.get("jet_id")
waypoint = kwargs.get("waypoint")
jet_template = (
self.env["cx.tower.jet.template"].browse(jet_template_id)
if jet_template_id
else None
)
jet = self.env["cx.tower.jet"].browse(jet_id) if jet_id else None
# Get the evaluation context for the python command
eval_context = self.env[
"cx.tower.command"
]._get_python_command_eval_context(
server=self,
jet_template=jet_template,
jet=jet,
waypoint=waypoint,
variable_values=kwargs.get("variable_values", {}),
)
safe_eval(
code,
eval_context,
mode="exec",
nocopy=True,
)
kwargs["variable_values"] = eval_context.get("custom_values", {})
result = eval_context.get("result")
if result:
status = result.get("exit_code", 0)
if status == 0:
response = [result.get("message")]
else:
error = [result.get("message")]
except Exception as e: # pylint: disable=broad-exception-caught
if raise_on_error:
raise ValidationError(
_("Python code running error: %(err)s", err=e)
) from e
status = PYTHON_COMMAND_ERROR
error = [e]
result = self._parse_command_results(status, response, error, secrets, **kwargs)
result["variable_values"] = kwargs.get("variable_values", {})
return result
def _prepare_ssh_command(self, command_code, path=None, sudo=None, **kwargs):
"""Prepare ssh command
IMPORTANT:
Commands run with sudo will be run separately one after another
even if there is a single command separated with '&&'
Examples:
# Default (sudo with splitting):
"pwd && ls -l" becomes:
sudo pwd
sudo ls -l
# With no_split_for_sudo=True:
sudo pwd && ls -l
Args:
command_code (str): initial command
path (str, optional): directory where command should be run
sudo (str, optional): sudo mode ('n' or 'p')
'n' — sudo without password
'p' — sudo with password
kwargs (dict): extra arguments. Supported keys:
- "log": values passed to logger
- "key": values passed to key parser
- "no_split_for_sudo" (bool): if True, do not split on '&&'
Returns:
list or str: if sudo='p' (with password), returns a list of commands;
if sudo='n', returns a single string (possibly joined by '&&');
without sudo, returns the raw command_code.
"""
# Prepare command for sudo if needed
if sudo:
# Add location
sudo_prefix = "sudo -S -p ''"
no_split = kwargs.get("no_split_for_sudo", False)
separator = "&&"
# split only when '&&' is present AND splitting is not disabled
if separator in command_code and not no_split:
result = (
command_code.replace("\\", "").replace("\n", "").split(separator)
)
# Sudo with password expects a list of commands
result = [f"{sudo_prefix} {cmd.strip()}" for cmd in result]
# Merge back into a single command is sudo is without password
if sudo == "n":
result = f" {separator} ".join(result)
else:
# single command or no_split requested
result = f"{sudo_prefix} {command_code}"
# Sudo with password expects a list of commands
if sudo == "p":
result = [result]
else:
# Command without sudo is always run as is
result = command_code
# Add path change command
if path:
# Add sudo prefix if needed
cd_command = f"cd {path}"
if isinstance(result, list):
result = [cd_command] + result
else:
result = f"{cd_command} && {result}"
return result
def _parse_command_results(
self, status, response, error, key_values=None, **kwargs
):
"""
Parse results of a command run with sudo (either SSH or Python).
Removes secrets and formats the response and error messages.
Paramiko returns SSH response and error as list.
When running a command with sudo with password we return status as a list too.
_
Args:
status (Int or list of int): Status or statuses
response (list): Response
error (list): Error
key_values (list): Secrets that were discovered in code.
Used to clean up command result.
kwargs (dict): extra arguments. Use to pass external values.
Following keys are supported by default:
- "log": {values passed to logger}
- "key": {values passed to key parser}
Returns:
dict: {
"status": <int>,
"response": <text>,
"error": <text>
}
"""
# In case of several statuses we return the last one that is not 0 ("ok")
# Eg for [0,1,0,4,0] result will be 4
if isinstance(status, list):
final_status = 0
for st in status:
if st != 0 and st != status:
final_status = st
status = final_status
# This is needed to remove keys
if key_values:
key_model = self.env["cx.tower.key"]
# Compose response message
if response and isinstance(response, list):
# Replace secrets with spoiler
response_vals = [
key_model._replace_with_spoiler(str(r), key_values)
if key_values
else str(r)
for r in response
]
response = "".join(response_vals)
elif not response:
# For not to save an empty list `[]` in log
response = None
# Compose error message
if error and isinstance(error, list):
# Replace secrets with spoiler
error_vals = [
key_model._replace_with_spoiler(str(e), key_values)
if key_values
else str(e)
for e in error
]
error = "".join(error_vals)
elif not error:
# For not to save an empty list `[]` in log
error = None
return {
"status": status,
"response": response,
"error": error,
}
def _check_zombie_commands(self):
"""
Check commands that are running longer than the timeout
and mark them as finished
"""
timeout = int(
self.env["ir.config_parameter"]
.sudo()
.get_param("cetmix_tower_server.command_timeout", 0)
)
if not timeout:
return
# SSH or Python command is running longer than the timeout
# We are not terminating Flight Plans and File Upload commands
domain = [
("is_running", "=", True),
("start_date", "<", fields.Datetime.now() - timedelta(seconds=timeout)),
("command_action", "in", ["ssh_command", "python_code"]),
]
zombie_command_logs = self.env["cx.tower.command.log"].search(domain)
if zombie_command_logs:
zombie_command_logs.finish(
status=COMMAND_TIMED_OUT,
response=None,
error=COMMAND_TIMED_OUT_MESSAGE,
)
# ------------------------------
# ---- File management
# ------------------------------
@ensure_ssh_disconnect
def delete_file(self, remote_path):
"""
Delete file from remote server
Args:
remote_path (Text): full path file location with file type
(e.g. /test/my_file.txt).
"""
self.ensure_one()
client = self._get_ssh_client(raise_on_error=True)
client.sftp_service.delete_file(remote_path)
@ensure_ssh_disconnect
def upload_file(self, data, remote_path, from_path=False):
"""
Upload file to remote server.
Args:
data (Text, Bytes): If the data are text, they are converted to bytes,
contains a local file path if from_path=True.
remote_path (Text): full path file location with file type
(e.g. /test/my_file.txt).
from_path (Boolean): set True if `data` is file path.
Raise:
TypeError: incorrect type of file.
Returns:
Result (class paramiko.sftp_attr.SFTPAttributes): metadata of the
uploaded file.
"""
self.ensure_one()
client = self._get_ssh_client(raise_on_error=True)
if from_path:
result = client.sftp_service.upload_file(data, remote_path)
else:
# Convert string to bytes
if isinstance(data, str):
data = data.encode()
file = io.BytesIO(data)
result = client.sftp_service.upload_file(file, remote_path)
return result
@ensure_ssh_disconnect
def download_file(self, remote_path):
"""
Download file from remote server
Args:
remote_path (Text): full path file location with file type
(e.g. /test/my_file.txt).
Raise:
ValidationError: raise if file not found.
Returns:
Result (Bytes): file content.
"""
self.ensure_one()
client = self._get_ssh_client(raise_on_error=True)
try:
result = client.sftp_service.download_file(remote_path)
except FileNotFoundError as fe:
raise ValidationError(
_("The file %(f_path)s not found.", f_path=remote_path)
) from fe
return result
# ------------------------------
# ---- Auxiliary functions
# ------------------------------
def get_variable_value(self, variable_reference, no_fallback=False):
"""
Return the value of a variable for the current server.
NB: this function follows the value application order.
So it will return the global value if server value is not set.
Returns:
str: The value of the variable for the current record or None
"""
self.ensure_one()
variable = self.env["cx.tower.variable"].get_by_reference(variable_reference)
if not variable:
return None
values = variable._get_variable_values_by_references(
variable_references=[variable_reference], server=self
)
return values[variable_reference]
def server_toggle_active(self, self_active):
"""
Change active status of related records:
- files
- commands
- plans
- variable values
Add custom logic to your model if you want to change
the active status of other records.
Args:
self_active (bool): active status of the record
"""
self.file_ids.filtered(lambda f: f.active == self_active).toggle_active()
self.command_log_ids.filtered(lambda c: c.active == self_active).toggle_active()
self.plan_log_ids.filtered(lambda p: p.active == self_active).toggle_active()
self.variable_value_ids.filtered(
lambda vv: vv.active == self_active
).toggle_active()
def toggle_active(self):
"""Archive or unarchive related server"""
res = super().toggle_active()
server_active = self.with_context(active_test=False).filtered(
lambda x: x.active
)
server_not_active = self - server_active
if server_active:
server_active.server_toggle_active(False)
if server_not_active:
server_not_active.server_toggle_active(True)
return res
def _is_being_deleted(self):
"""Check if the server is being deleted.
Returns:
bool: True if the server is being deleted, False otherwise
"""
self.ensure_one()
return self.status and self.status == "deleting"
def _get_post_create_fields(self):
"""
Add fields that should be populated after server creation
"""
res = super()._get_post_create_fields()
return res + ["variable_value_ids", "server_log_ids", "secret_ids"]
def _get_notification_action(
self, message, notification_type="info", title=None, sticky=True
):
"""Get notification action
Args:
message (str): Message
notification_type (str, optional): Notification type. Defaults to "info".
title (str, optional): Title. Defaults to None.
sticky (bool, optional): Sticky notification. Defaults to True.
Returns:
dict: Notification action
"""
return {
"type": "ir.actions.client",
"tag": "display_notification",
"params": {
"type": notification_type,
"title": title,
"message": message,
"sticky": sticky,
},
}
def _get_dependent_model_relation_fields(self):
"""Check cx.tower.reference.mixin for the function documentation"""
res = super()._get_dependent_model_relation_fields()
return res + ["variable_value_ids", "file_ids"]