Files
odoo-addons/addons/cetmix_tower_server/tests/test_command.py

1912 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright (C) 2022 Cetmix OÜ
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from datetime import timedelta
from unittest.mock import patch
from odoo.exceptions import AccessError, ValidationError
from odoo.fields import Datetime
from odoo.tests.common import Form
from odoo.tools import mute_logger
from ..models.constants import (
ANOTHER_COMMAND_RUNNING,
COMMAND_TIMED_OUT,
COMMAND_TIMED_OUT_MESSAGE,
GENERAL_ERROR,
)
from .common import TestTowerCommon
class TestTowerCommand(TestTowerCommon):
"""
Test the command model.
Important!
As this model inherits from the `cx.tower.template.mixin`
we will tests template rendering methods in this class too.
"""
@classmethod
def setUpClass(cls):
super().setUpClass()
# Save variable values for Server 1
with Form(cls.server_test_1) as f:
with f.variable_value_ids.new() as line:
line.variable_id = cls.variable_dir
line.value_char = "test-odoo-1"
with f.variable_value_ids.new() as line:
line.variable_id = cls.variable_path
line.value_char = "/opt/tower"
f.save()
# Secret key
cls.secret_folder_key = cls.Key.create(
{
"name": "Folder",
"reference": "FOLDER",
"secret_value": "secretFolder",
"key_type": "s",
}
)
cls.secret_python_key = cls.Key.create(
{
"name": "python",
"reference": "PYTHON",
"secret_value": "secretPythonCode",
"key_type": "s",
}
)
# secret value as multi line string
cls.python_ssh_key = cls.Key.create(
{
"name": "Test Python SSH Key",
"reference": "test_python_ssh_key",
"key_type": "s",
"secret_value": """
Python
much
key
""",
}
)
cls.secret_test_rsa_key = cls.Key.create(
{
"name": "test rsa",
"reference": "test_rsa",
"secret_value": """-----BEGIN RSA PRIVATE KEY-----
MIIBOgIBAAJBAKj34GkxFhD90vcNLYLInFEX6Ppy1tPf9Cnzj4p4WGeKLs1Pt8Qu
KUpRKfFLfRYC9AIKjbJTWit+CqvjWYzvQwECAwEAAQJAIJLixBy2qpFoS4DSmoEm
o3qGy0t6z09AIJtH+5OeRV1be+N4cDYJKffGzDa88vQENZiRm0GRq6a+HPGQMd2k
TQIhAKMSvzIBnni7ot/OSie2TmJLY4SwTQAevXysE2RbFDYdAiEBCUEaRQnMnbp7
9mxDXDf6AU0cN/RPBjb9qSHDcWZHGzUCIG2Es59z8ugGrDY+pxLQnwfotadxd+Uy
v/Ow5T0q5gIJAiEAyS4RaI9YG8EWx/2w0T67ZUVAw8eOMB6BIUg0Xcu+3okCIBOs
/5OiPgoTdSy7bcF9IGpSE8ZgGKzgYQVZeN97YE00
-----END RSA PRIVATE KEY----- """,
"key_type": "s",
}
)
# Command
cls.command_create_new_command = cls.Command.create(
{
"name": "Create new command",
"action": "python_code",
"code": """
server_name = {{ tower.server.name }}
if server_name and #!cxtower.secret.FOLDER!# == "secretFolder":
# We don't actually create a new command because it will raise
# access error if user doesn't have access to 'create' operation.
# Instead we just return a dummy command result.
command = "new command"
result = {"exit_code": 0, "message": "New command was created"}
else:
result = {"exit_code": %s, "message": "error"}
"""
% GENERAL_ERROR,
}
)
cls.command_python_command_1 = cls.Command.create(
{
"name": "Python command with secret #1",
"action": "python_code",
"code": """
result = {
"exit_code": 0,
"message": #!cxtower.secret.PYTHON!#,
}
""",
}
)
cls.command_python_command_2 = cls.Command.create(
{
"name": "Python command with secret #2",
"action": "python_code",
"code": """
result = {
"exit_code": 0,
"message": 'We use #!cxtower.secret.PYTHON!#' ,
}
""",
}
)
cls.command_python_command_3 = cls.Command.create(
{
"name": "Python command with secret #3",
"action": "python_code",
"code": """
result = {
"exit_code": 0,
"message": ""#!cxtower.secret.test_rsa!#"" ,
}
""",
}
)
cls.command_python_command_4 = cls.Command.create(
{
"name": "Python command with secret #4",
"action": "python_code",
"code": """
top_secret = #!cxtower.secret.test_python_ssh_key!#
result = {
"exit_code": 0,
"message": top_secret ,
}
""",
}
)
cls.server = cls.Server.create(
{
"name": "Test Server",
"user_ids": [(6, 0, [cls.user.id])],
"manager_ids": [(6, 0, [cls.manager.id])],
"ssh_username": "test",
"ssh_password": "test",
"ip_v4_address": "127.0.0.1",
}
)
def _create_command(self, **kwargs):
"""Helper to create a command record with default values."""
vals = {
"name": "Test Command",
"access_level": "1", # override default
"user_ids": [(6, 0, [])],
"manager_ids": [(6, 0, [])],
"server_ids": [(6, 0, [])],
}
if kwargs:
vals.update(kwargs)
return self.Command.create(vals)
def test_user_read_access(self):
"""
For a user:
Read access is allowed if access_level == "1" and either the command's
own user_ids includes the user OR a related server (via server_ids)
includes the user in its user_ids.
"""
# Case 1: Command with access_level "1" and user in command.user_ids.
cmd1 = self._create_command(
**{
"access_level": "1",
"user_ids": [(6, 0, [self.user.id])],
}
)
recs1 = self.Command.with_user(self.user).search([("id", "=", cmd1.id)])
self.assertIn(
cmd1,
recs1,
"User should see the command if in command.user_ids"
" and access_level == '1'.",
)
# Case 2: Command with access_level "1" and user not in command.user_ids
# but in a related server.
cmd2 = self._create_command(
**{
"access_level": "1",
"user_ids": [(6, 0, [])],
"server_ids": [(6, 0, [self.server.id])],
}
)
recs2 = self.Command.with_user(self.user).search([("id", "=", cmd2.id)])
self.assertIn(
cmd2,
recs2,
"User should see the command if related server.user_ids includes the user.",
)
# Negative: If access_level is "1" but neither command.user_ids
# nor server_ids.user_ids includes the user.
cmd3 = self._create_command(
**{
"access_level": "1",
"user_ids": [(6, 0, [])],
"server_ids": [(6, 0, [])],
}
)
recs3 = self.Command.with_user(self.user).search([("id", "=", cmd3.id)])
self.assertNotIn(
cmd3,
recs3,
"User should not see the command if not granted access.",
)
def test_manager_read_access(self):
"""
For a manager:
Allowed to read a command if access_level <= "2" AND
(either the command itself grants access via user_ids or manager_ids
OR there are no related servers OR a related server grants access via
its user_ids or manager_ids).
"""
# Case 1: Command with access_level "2" and command.manager_ids
# includes the manager but the server is not related to the command.
another_server = self.Server.create(
{
"name": "Another Server",
"ip_v4_address": "127.0.0.2",
"ssh_username": "test",
"ssh_password": "test",
"user_ids": [(6, 0, [])],
"manager_ids": [(6, 0, [])],
}
)
cmd1 = self._create_command(
**{
"access_level": "2",
"manager_ids": [(6, 0, [self.manager.id])],
"server_ids": [(6, 0, [another_server.id])],
}
)
recs1 = self.Command.with_user(self.manager).search([("id", "=", cmd1.id)])
self.assertIn(
cmd1,
recs1,
"Manager should see the command if in command.manager_ids"
" and access_level <= '2'.",
)
# Case 2: Command with access_level "2" that does not grant access
# on the command itself, but a related server grants access via
# but a related server grants access via its manager_ids.
cmd2 = self._create_command(
**{
"access_level": "2",
"user_ids": [(6, 0, [])],
"manager_ids": [(6, 0, [])],
"server_ids": [(6, 0, [self.server.id])],
}
)
recs2 = self.Command.with_user(self.manager).search([("id", "=", cmd2.id)])
self.assertIn(
cmd2,
recs2,
"Manager should see the command if related server.manager_ids"
" includes the manager.",
)
# Positive: Command with access_level "2" without any granted access.
cmd3 = self._create_command(
**{
"access_level": "2",
"user_ids": [(6, 0, [])],
"manager_ids": [(6, 0, [])],
"server_ids": [(6, 0, [])],
}
)
recs3 = self.Command.with_user(self.manager).search([("id", "=", cmd3.id)])
self.assertIn(
cmd3,
recs3,
"Manager should see the command if not granted access "
"but not related to any server.",
)
# Case 3: Remove from manager in the cmd1.
# Should not see the command because it belongs to another server.
cmd1.manager_ids = [(3, self.manager.id)]
recs4 = self.Command.with_user(self.manager).search([("id", "=", cmd1.id)])
self.assertNotIn(
cmd1,
recs4,
"Manager should not see the command if "
"removed from command.manager_ids."
" and command belongs to another server.",
)
def test_manager_write_create_access(self):
"""
For a manager:
Allowed to write and create a command if access_level <= "2" AND
the command's own manager_ids includes the manager.
"""
# Case: Command with access_level "2" and manager_ids includes the manager.
cmd1 = self._create_command(
**{
"access_level": "2",
"manager_ids": [(6, 0, [self.manager.id])],
}
)
try:
cmd1.with_user(self.manager).write({"name": "Manager Updated Command"})
except AccessError:
self.fail(
"Manager should be able to update the command "
"if in command.manager_ids."
)
self.assertEqual(cmd1.with_user(self.manager).name, "Manager Updated Command")
# Attempt to create a command as manager without including their ID
# in manager_ids should fail.
cmd_invalid_vals = {
"name": "Invalid Manager Create",
"access_level": "2",
"manager_ids": [(6, 0, [])],
"action": "python_code",
"code": "print('dummy')",
}
with self.assertRaises(AccessError):
self.Command.with_user(self.manager).create(cmd_invalid_vals)
def test_manager_unlink_access(self):
"""
For a manager:
Allowed to delete a command if access_level <= "2",
the current user is the record creator,
AND the command's own manager_ids includes the manager.
"""
# Scenario 1: Command created by the manager with manager_ids
# including the manager.
cmd1 = self.Command.with_user(self.manager).create(
{
"name": "Manager Created Command",
"access_level": "2",
}
)
try:
cmd1.unlink()
except AccessError:
self.fail(
"Manager should be able to delete a command "
"they created if in command.manager_ids."
)
# Scenario 2: Command created by someone else
# even if manager_ids includes the manager.
cmd2 = self._create_command(
**{
"access_level": "2",
"manager_ids": [(6, 0, [self.manager.id])],
}
)
with self.assertRaises(AccessError):
cmd2.with_user(self.manager).unlink()
def test_root_unrestricted_access(self):
"""
For a root user:
Unlimited access: root can read, write, create, and delete commands
regardless of access_level or related servers.
"""
cmd = self._create_command(
**{
"access_level": "3", # above the threshold for managers
}
)
recs = self.Command.with_user(self.root).search([("id", "=", cmd.id)])
self.assertIn(
cmd,
recs,
"Root should see the command regardless of restrictions.",
)
try:
cmd.with_user(self.root).write({"name": "Root Updated Command"})
except AccessError:
self.fail(
"Root should be able to update the command " "without restrictions."
)
self.assertEqual(cmd.with_user(self.root).name, "Root Updated Command")
cmd2 = self.Command.with_user(self.root).create(
{
"name": "Root Created Command",
"access_level": "3",
"action": "python_code",
"code": "print('root')",
}
)
self.assertTrue(
cmd2,
"Root should be able to create a command " "without restrictions.",
)
cmd2.with_user(self.root).unlink()
recs_after = self.Command.with_user(self.root).search([("id", "=", cmd2.id)])
self.assertFalse(
recs_after,
"Root should be able to delete the command without restrictions.",
)
def test_ssh_command_prepare_method_without_path(self):
"""Test ssh command preparation in different modes without path"""
server = self.server_test_1
single_command = "ls -a /tmp"
multiple_commands = "ls -a /tmp && mkdir /tmp/test"
sudo_mode = "p"
# Prepare single command for sudo with password
cmd = server._prepare_ssh_command(single_command, path=None, sudo=sudo_mode)
self.assertEqual(
cmd,
[f"{self.sudo_prefix} {single_command}"],
msg=(
"Single command for sudo with password should be "
"equal to list with the original command"
"as an only element"
),
)
# Prepare multiple commands for sudo with password
cmd = server._prepare_ssh_command(multiple_commands, path=None, sudo=sudo_mode)
self.assertEqual(
cmd,
[
f"{self.sudo_prefix} ls -a /tmp",
f"{self.sudo_prefix} mkdir /tmp/test",
],
msg=(
"Multiple commands with sudo with password should be "
"a list of separated commands from original line"
),
)
sudo_mode = "n"
# Prepare single command for sudo without password
cmd = server._prepare_ssh_command(single_command, path=None, sudo=sudo_mode)
self.assertEqual(
cmd,
f"{self.sudo_prefix} {single_command}",
msg=(
"Single command with sudo without password should be "
f'equal to the original command prefixed with "{self.sudo_prefix}"'
),
)
# Prepare multiple commands for sudo without password
cmd = server._prepare_ssh_command(multiple_commands, path=None, sudo=sudo_mode)
self.assertEqual(
cmd,
f"{self.sudo_prefix} ls -a /tmp && {self.sudo_prefix} mkdir /tmp/test",
msg=(
"Multiple commands with sudo with password should be "
"a re-joined string from list of separated original "
f'each prefixed with "{self.sudo_prefix}"'
),
)
# Prepare single command without sudo
cmd = server._prepare_ssh_command(single_command)
self.assertEqual(
cmd,
single_command,
msg=(
"Single command without sudo should be "
"equal to the original command "
),
)
# Prepare multiple without sudo
cmd = server._prepare_ssh_command(multiple_commands)
self.assertEqual(
cmd,
multiple_commands,
msg=(
"Multiple commands without sudo should be "
"equal to the original line of commands"
),
)
def test_ssh_command_prepare_method_with_path(self):
"""Test command preparation in different modes without path"""
server = self.server_test_1
single_command = "ls -a /tmp"
multiple_commands = "ls -a /tmp && mkdir /tmp/test"
path = "/home/doge"
sudo_mode = "p"
# Prepare single command for sudo with password
cmd = server._prepare_ssh_command(single_command, path=path, sudo=sudo_mode)
self.assertEqual(
cmd,
[f"cd {path}", f"{self.sudo_prefix} {single_command}"],
msg=(
"Single command for sudo with password should be "
"equal to list of two elements:"
" change directory and original command"
),
)
# Prepare multiple commands for sudo with password
cmd = server._prepare_ssh_command(multiple_commands, path=path, sudo=sudo_mode)
self.assertEqual(
cmd,
[
f"cd {path}",
f"{self.sudo_prefix} ls -a /tmp",
f"{self.sudo_prefix} mkdir /tmp/test",
],
msg=(
"Multiple commands with sudo with password should be "
"a list of separated commands from original line"
),
)
sudo_mode = "n"
# Prepare single command for sudo without password
cmd = server._prepare_ssh_command(single_command, path=path, sudo=sudo_mode)
self.assertEqual(
cmd,
f"cd {path} && {self.sudo_prefix} {single_command}",
msg=(
"Single command with sudo without password should be "
f'equal to the original command prefixed with "{self.sudo_prefix}"'
),
)
# Prepare multiple commands for sudo without password
cmd = server._prepare_ssh_command(multiple_commands, path=path, sudo=sudo_mode)
self.assertEqual(
cmd,
f"cd {path} && {self.sudo_prefix} ls -a /tmp && {self.sudo_prefix} mkdir /tmp/test", # noqa
msg=(
"Multiple commands with sudo with password should be "
"a re-joined string from list of separated original "
f'each prefixed with "{self.sudo_prefix}"'
),
)
# Prepare single command without sudo
cmd = server._prepare_ssh_command(single_command, path=path)
self.assertEqual(
cmd,
f"cd {path} && {single_command}",
msg=(
"Single command for without sudo should be "
"equal to the the original command"
"with 'cd {{ path }} && ' prefix"
),
)
# Prepare multiple commands without sudo
cmd = server._prepare_ssh_command(multiple_commands, path=path)
self.assertEqual(
cmd,
f"cd {path} && {multiple_commands}", # noqa
msg=(
"Multiple commands without sudo should be "
"original command with 'change directory' command prepended"
),
)
def test_ssh_command_no_split_for_sudo_without_path(self):
"""If no_split_for_sudo=True, even '&&' shouldnt split into a list."""
server = self.server_test_1
cmd_line = "echo a && echo b"
sudo_mode = "p"
result = server._prepare_ssh_command(
cmd_line, sudo=sudo_mode, no_split_for_sudo=True
)
expected = [f"{self.sudo_prefix} {cmd_line}"]
self.assertEqual(
result, expected, "With no_split_for_sudo, '&&' must not produce a list"
)
def test_ssh_command_no_split_for_sudo_with_path(self):
"""Same, but with a custom cwd prefix."""
server = self.server_test_1
cmd_line = "echo a && echo b"
path = "/tmp"
sudo_mode = "p"
result = server._prepare_ssh_command(
cmd_line, path=path, sudo=sudo_mode, no_split_for_sudo=True
)
expected = [f"cd {path}", f"{self.sudo_prefix} {cmd_line}"]
self.assertEqual(
result,
expected,
"With no_split_for_sudo and path, the entire '&&' string remains un-split",
)
def test_server_render_command(self):
"""Test rendering command using `_render_command` method
of cx.tower.server
"""
# -- 1 --
# Test with default path
rendered_command = self.server_test_1._render_command(self.command_create_dir)
rendered_code_expected = "cd /opt/tower && mkdir test-odoo-1"
rendered_path_expected = f"/home/{self.server_test_1.ssh_username}"
self.assertEqual(
rendered_command["rendered_code"],
rendered_code_expected,
"Rendered code doesn't match",
)
self.assertEqual(
rendered_command["rendered_path"],
rendered_path_expected,
"Rendered path doesn't match",
)
# -- 2 --
# Test with custom path
rendered_command = self.server_test_1._render_command(
self.command_create_dir, path="/such/much/path"
)
rendered_code_expected = "cd /opt/tower && mkdir test-odoo-1"
rendered_path_expected = "/such/much/path"
self.assertEqual(
rendered_command["rendered_code"],
rendered_code_expected,
"Rendered code doesn't match",
)
self.assertEqual(
rendered_command["rendered_path"],
rendered_path_expected,
"Rendered path doesn't match",
)
# -- 3 --
# Set variable_path to None and check again
variable_value_path = self.server_test_1.variable_value_ids.filtered(
lambda var_val: var_val.variable_id.id == self.variable_path.id
)
variable_value_path.value_char = None
rendered_command = self.server_test_1._render_command(self.command_create_dir)
rendered_code_expected = "cd False && mkdir test-odoo-1"
rendered_path_expected = f"/home/{self.server_test_1.ssh_username}"
self.assertEqual(
rendered_command["rendered_code"],
rendered_code_expected,
"Rendered code doesn't match",
)
self.assertEqual(
rendered_command["rendered_path"],
rendered_path_expected,
"Rendered path doesn't match",
)
# -- 4 --
# Set both path and code to None
self.write_and_invalidate(
self.command_create_dir, **{"code": None, "path": None}
)
rendered_command = self.server_test_1._render_command(self.command_create_dir)
self.assertFalse(
rendered_command["rendered_code"], "Rendered code doesn't match"
)
self.assertFalse(
rendered_command["rendered_path"], "Rendered path doesn't match"
)
def test_server_render_command_with_custom_variable_values(self):
"""Test rendering command using `_render_command` method
of cx.tower.server with custom variable values
"""
self.write_and_invalidate(
self.server_test_1,
**{"user_ids": [(4, self.user.id)], "manager_ids": [(4, self.manager.id)]},
)
# -- 1 --
# Set custom variable values
custom_variable_values = {
"test_path_": "/pepe/memes",
"other_path": "/etc/chad",
}
# Modify command path
self.write_and_invalidate(
self.command_create_dir,
**{"path": "{{ other_path }}/{{ tower.server.username }}"},
)
# Render command
rendered_command = self.server_test_1.with_user(self.manager)._render_command(
self.command_create_dir, custom_variable_values=custom_variable_values
)
rendered_code_expected = "cd /pepe/memes && mkdir test-odoo-1"
rendered_path_expected = f"/etc/chad/{self.server_test_1.ssh_username}"
self.assertEqual(
rendered_command["rendered_code"],
rendered_code_expected,
"Rendered code doesn't match",
)
self.assertEqual(
rendered_command["rendered_path"],
rendered_path_expected,
"Rendered path doesn't match",
)
# -- 2 --
# Test with user who doesn't have access to the server
rendered_command = self.server_test_1.with_user(self.user)._render_command(
self.command_create_dir, custom_variable_values=custom_variable_values
)
rendered_code_expected = "cd /opt/tower && mkdir test-odoo-1"
rendered_path_expected = f"None/{self.server_test_1.ssh_username}"
self.assertEqual(
rendered_command["rendered_code"],
rendered_code_expected,
"Rendered code doesn't match",
)
self.assertEqual(
rendered_command["rendered_path"],
rendered_path_expected,
"Rendered path doesn't match",
)
def test_server_render_command_variable_with_value_modifier(self):
"""Test rendering command using `_render_command` method
of cx.tower.server.
Use variable with value modifier for testing.
"""
# -- 1 --
# Set modifiers for variables
modifier_for_path = """
if 'opt' in value:
result = value.replace('opt', 'home')
else:
result = value
"""
self.variable_path.applied_expression = modifier_for_path
modifier_for_dir = """
pattern = r'(?i)odoo'
replacement = 'sap'
result = re.sub(pattern, replacement, value)
"""
self.variable_dir.applied_expression = modifier_for_dir
# -- 1 --
# Test with default path
rendered_command = self.server_test_1._render_command(self.command_create_dir)
rendered_code_expected = "cd /home/tower && mkdir test-sap-1"
rendered_path_expected = f"/home/{self.server_test_1.ssh_username}"
self.assertEqual(
rendered_command["rendered_code"],
rendered_code_expected,
"Rendered code doesn't match",
)
self.assertEqual(
rendered_command["rendered_path"],
rendered_path_expected,
"Rendered path doesn't match",
)
# -- 2 --
# Set invalid expression modifier
self.variable_path.applied_expression = "invalid"
with mute_logger(
"odoo.addons.cetmix_tower_server.models.cx_tower_variable_mixin"
):
rendered_command = self.server_test_1._render_command(
self.command_create_dir
)
rendered_code_expected = "cd /opt/tower && mkdir test-sap-1"
rendered_path_expected = f"/home/{self.server_test_1.ssh_username}"
self.assertEqual(
rendered_command["rendered_code"],
rendered_code_expected,
"Rendered code doesn't match",
)
self.assertEqual(
rendered_command["rendered_path"],
rendered_path_expected,
"Rendered path doesn't match",
)
# -- 3 --
# Test with variable in variable value
complex_variable = self.Variable.create(
{
"name": "Complex Variable",
"applied_expression": "result = value.replace('opt', 'meme')",
}
)
# Create a complex variable value
self.VariableValue.create(
{
"variable_id": complex_variable.id,
"value_char": "{{ test_path_ }}/{{ test_dir }}",
}
)
command_with_complex_variable = self.Command.create(
{
"name": "Command with complex variable",
"code": "cd {{ complex_variable }}",
"action": "ssh_command",
}
)
with mute_logger(
"odoo.addons.cetmix_tower_server.models.cx_tower_variable_mixin"
):
rendered_command = self.server_test_1._render_command(
command_with_complex_variable
)
rendered_code_expected = "cd /meme/tower/test-sap-1"
self.assertEqual(
rendered_command["rendered_code"],
rendered_code_expected,
"Rendered code doesn't match",
)
# -- 4 --
# Remove modifier from variable "Path" and check again
self.variable_dir.applied_expression = None
with mute_logger(
"odoo.addons.cetmix_tower_server.models.cx_tower_variable_mixin"
):
rendered_command = self.server_test_1._render_command(
command_with_complex_variable
)
rendered_code_expected = "cd /meme/tower/test-odoo-1"
self.assertEqual(
rendered_command["rendered_code"],
rendered_code_expected,
"Rendered code doesn't match",
)
def test_render_code_generic(self):
"""Test generic (aka ssh) code template direct rendering"""
# Only 'test_path_' must be rendered
args = {"test_path_": "/tmp", "test_os": "debian"}
res = self.command_create_dir.render_code(**args)
rendered_code = res.get(self.command_create_dir.id)
rendered_code_expected = "cd /tmp && mkdir "
self.assertEqual(
rendered_code,
rendered_code_expected,
msg=f"Must be rendered as '{rendered_code_expected}'",
)
# 'test_path_' and 'dir' must be rendered
args = {"test_path_": "/tmp", "os": "debian", "test_dir": "odoo"}
res = self.command_create_dir.render_code(**args)
rendered_code = res.get(self.command_create_dir.id)
self.assertEqual(
rendered_code,
"cd /tmp && mkdir odoo",
msg="Must be rendered as 'cd /tmp && mkdir odoo'",
)
def test_run_command_with_variables(self):
"""Test code execution using command log records"""
x = 1 # Used to distinguish labels
# Check with all available "sudo" option
for sudo in [False, "n", "p"]:
# Add label to track command log
self.server_test_1.use_sudo = sudo
command_label = f"Test Command {x}"
custom_values = {"log": {"label": command_label}}
# Run command for Server 1
self.server_test_1.run_command(
self.command_create_dir, sudo=sudo, **custom_values
)
# Expected rendered command code
rendered_code_expected = "cd /opt/tower && mkdir test-odoo-1"
# Get command log
log_record = self.CommandLog.search([("label", "=", command_label)])
# Check log values
self.assertEqual(len(log_record), 1, msg="Must be a single log record")
self.assertEqual(
log_record.server_id.id,
self.server_test_1.id,
msg="Record must belong to Test 1",
)
self.assertEqual(
log_record.command_id.id,
self.command_create_dir.id,
msg="Record must belong to command 'Create dir'",
)
self.assertEqual(
log_record.code,
rendered_code_expected,
msg=f"Rendered code must be '{rendered_code_expected}'",
)
self.assertEqual(
log_record.command_status, 0, msg="Command status must be equal to 0"
)
self.assertEqual(
log_record.use_sudo,
sudo,
msg="'sudo' param in log doesn't match the command one",
)
# Increment label counter
x += 1
def test_run_command_with_keys(self):
"""Test command with keys in code"""
# Command
code = "cd {{ test_path_ }} && mkdir #!cxtower.secret.FOLDER!#"
command_with_keys = self.Command.create(
{"name": "Command with keys", "code": code}
)
# Parse command with key parser to ensure key is parsed correctly
code_parsed_expected = "cd {{ test_path_ }} && mkdir secretFolder"
code_parsed = self.Key._parse_code(code)
self.assertEqual(
code_parsed,
code_parsed_expected,
msg="Parsed code doesn't match expected one",
)
# Add label to track command log
command_label = "Test Command with keys"
custom_values = {"log": {"label": command_label}}
# Run command for Server 1
self.server_test_1.run_command(command_with_keys, **custom_values)
# Expected rendered command code
rendered_code_expected = "cd /opt/tower && mkdir #!cxtower.secret.FOLDER!#"
# Get command log
log_record = self.CommandLog.search([("label", "=", command_label)])
# Check log values
self.assertEqual(len(log_record), 1, msg="Must be a single log record")
self.assertEqual(
log_record.server_id.id,
self.server_test_1.id,
msg=("Record must belong %s", self.server_test_1.name),
)
self.assertEqual(
log_record.command_id.id,
command_with_keys.id,
msg=("Record must belong to command %s", command_with_keys.name),
)
self.assertEqual(
log_record.code,
rendered_code_expected,
msg=f"Rendered code must be '{rendered_code_expected}'",
)
self.assertEqual(
log_record.command_status, 0, msg="Command status must be equal to 0"
)
def test_parse_ssh_command_result(self):
"""Test ssh command result parsing"""
placeholder = self.Key.SECRET_VALUE_PLACEHOLDER
# -------------------------------------------------------
# Case 1: regular command execution result with no error
# We are testing secret value placeholder here
# -------------------------------------------------------
status = 0
response = ["Such much", f"Doge like SSH {placeholder}"]
error = []
ssh_command_result = self.Server._parse_command_results(
status, response, error, key_values=[f"{self.secret_2.secret_value}"]
)
# Get result
result_status = ssh_command_result["status"]
result_response = ssh_command_result["response"]
result_error = ssh_command_result["error"]
self.assertEqual(
result_status,
result_status,
"Status in result must be the same as the initial one",
)
self.assertEqual(
result_response,
f"Such muchDoge like SSH {placeholder}",
"Response in result doesn't match expected",
)
self.assertIsNone(result_error, "Error in response must be set to None")
# -------------------------------------------------------
# Case 2: no response but an error
# -------------------------------------------------------
status = 1
response = []
error = ["Ooops", "I did", "it again"]
ssh_command_result = self.Server._parse_command_results(status, response, error)
# Get result
result_status = ssh_command_result["status"]
result_response = ssh_command_result["response"]
result_error = ssh_command_result["error"]
self.assertEqual(
result_status,
result_status,
"Status in result must be the same as the initial one",
)
self.assertIsNone(result_response, "Response in response must be set to None")
self.assertEqual(
result_error, "OoopsI didit again", "Error in result doesn't match expected"
)
# -------------------------------------------------------
# Case 3: several codes all 0, no response but an error
# -------------------------------------------------------
status = [0, 0, 0]
response = []
error = ["Ooops", "I did", "it again"]
ssh_command_result = self.Server._parse_command_results(status, response, error)
# Get result
result_status = ssh_command_result["status"]
result_response = ssh_command_result["response"]
result_error = ssh_command_result["error"]
self.assertEqual(
result_status, 0, "Status in result doesn't match expected one"
)
self.assertIsNone(result_response, "Response in response must be set to None")
self.assertEqual(
result_error, "OoopsI didit again", "Error in result doesn't match expected"
)
# -------------------------------------------------------
# Case 4: codes [0,1,0,4,0], no response but an error
# -------------------------------------------------------
status = [0, 1, 0, 4, 0]
response = []
error = ["Ooops", "I did", "it again"]
ssh_command_result = self.Server._parse_command_results(status, response, error)
# Get result
result_status = ssh_command_result["status"]
result_response = ssh_command_result["response"]
result_error = ssh_command_result["error"]
self.assertEqual(
result_status, 4, "Status in result doesn't match expected one"
)
self.assertIsNone(result_response, "Response in response must be set to None")
self.assertEqual(
result_error, "OoopsI didit again", "Error in result doesn't match expected"
)
# -------------------------------------------------------
# Case 5: regular command execution result with no error
# However the command result is saved in the "error" value.
# For example this happens in 'docker build'.
# -------------------------------------------------------
status = 0
error = ["Such much", f"Doge like SSH {placeholder}"]
response = []
ssh_command_result = self.Server._parse_command_results(
status, response, error, key_values=[f"{self.secret_2.secret_value}"]
)
# Get result
result_status = ssh_command_result["status"]
result_response = ssh_command_result["response"]
result_error = ssh_command_result["error"]
self.assertEqual(
result_status,
result_status,
"Status in result must be the same as the initial one",
)
self.assertEqual(
result_error,
f"Such muchDoge like SSH {placeholder}",
"Response in result doesn't match expected",
)
self.assertIsNone(result_response, "Error in response must be set to None")
def test_tower_command_action_file_using_template(self):
"""
Test action file using template for tower source
"""
with patch(
"odoo.addons.cetmix_tower_server.models.cx_tower_server.CxTowerServer.upload_file",
return_value="ok",
):
self.server_test_1.run_command(
self.command_create_file_with_template_tower_source
)
log_text_create_success = "File created and uploaded successfully"
log_text_file_exists = "An error occurred: File already exists on server."
# Get command log
log_record = self.CommandLog.search(
[
("server_id", "=", self.server_test_1.id),
(
"command_id",
"=",
self.command_create_file_with_template_tower_source.id,
),
("command_response", "=", log_text_create_success),
]
)
self.assertEqual(len(log_record), 1, msg="Must be a single log record")
with patch(
"odoo.addons.cetmix_tower_server.models.cx_tower_server.CxTowerServer.upload_file",
return_value="ok",
):
self.server_test_1.run_command(
self.command_create_file_with_template_tower_source
)
log_record_2 = self.CommandLog.search(
[
("server_id", "=", self.server_test_1.id),
(
"command_id",
"=",
self.command_create_file_with_template_tower_source.id,
),
("command_error", "=", log_text_file_exists),
]
)
self.assertEqual(len(log_record_2), 1, msg="Must be a single log record")
def test_server_command_action_file_using_template(self):
"""
Test action file using template for server source
"""
self.assertFalse(self.template_file_server.file_ids)
def download_file(this, remote_path):
return b"Hello, world!"
cx_tower_server_obj = self.registry["cx.tower.server"]
with patch.object(cx_tower_server_obj, "download_file", download_file):
self.server_test_1.run_command(
self.command_create_file_with_template_server_source
)
log_text_create_success = "File created and uploaded successfully"
log_text_file_exists = "An error occurred: File already exists on server."
# Get command log
log_record = self.CommandLog.search(
[
("server_id", "=", self.server_test_1.id),
(
"command_id",
"=",
self.command_create_file_with_template_server_source.id,
),
("command_response", "=", log_text_create_success),
]
)
self.assertEqual(len(log_record), 1, msg="Must be a single log record")
self.assertEqual(
len(self.template_file_server.file_ids), 1, msg="Must be one file!"
)
self.assertEqual(
self.template_file_server.file_ids.source,
"server",
msg="The File source must be 'server'",
)
with patch.object(cx_tower_server_obj, "download_file", download_file):
self.server_test_1.run_command(
self.command_create_file_with_template_server_source
)
log_record_2 = self.CommandLog.search(
[
("server_id", "=", self.server_test_1.id),
(
"command_id",
"=",
self.command_create_file_with_template_server_source.id,
),
("command_error", "=", log_text_file_exists),
]
)
self.assertEqual(len(log_record_2), 1, msg="Must be a single log record")
def test_run_command_no_command_log(self):
"""Run command without creating a log record.
Such commands return execution result directly.
"""
# Add label to track command log
command_label = "Test Command with keys"
custom_values = {"log": {"label": command_label}}
# Run command for Server 1
command_result = self.server_test_1.with_context(
no_command_log=True
).run_command(self.command_create_dir, **custom_values)
self.assertEqual(
command_result["status"], 0, "Command status doesn't match expected one"
)
self.assertEqual(
command_result["response"],
"ok",
"Command response doesn't match expected one",
)
self.assertIsNone(
command_result["error"], "Command error doesn't match expected one"
)
def test_another_command_is_running(self):
"""Test a case when another command is running on the same server"""
# Remove all existing command logs
self.CommandLog.search([]).unlink()
# Create a new command log
initial_command_log = self.CommandLog.create(
{
"server_id": self.server_test_1.id,
"command_id": self.command_create_new_command.id,
"start_date": Datetime.now(),
}
)
# Run the command without creating a log record
command_result = self.server_test_1.with_context(
no_command_log=True
).run_command(self.command_create_new_command)
self.assertEqual(command_result["status"], ANOTHER_COMMAND_RUNNING)
# Run the command with creating a log record
command_result = self.server_test_1.run_command(self.command_create_new_command)
# Get the command log
command_log = self.CommandLog.search(
[
("server_id", "=", self.server_test_1.id),
("command_id", "=", self.command_create_new_command.id),
("id", "!=", initial_command_log.id),
]
)
self.assertEqual(len(command_log), 1, "Must be a single log record")
self.assertEqual(command_log.command_status, ANOTHER_COMMAND_RUNNING)
def test_file_using_template_create_if_exists(self):
"""Test uploading file using template if it exists on server."""
command = self.command_create_file_with_template_server_source
command.write({"if_file_exists": "skip"})
# Create file to make sure that it exists on the server
file_template = command.file_template_id
orig_file = file_template.create_file(
server=self.server_test_1,
server_dir=file_template.server_dir,
if_file_exists=command.if_file_exists,
)
self.assertTrue(orig_file, "File must be created on the server")
# Test if file exists and command is set to "skip"
skipped_file = file_template.create_file(
server=self.server_test_1,
server_dir=file_template.server_dir,
if_file_exists=command.if_file_exists,
)
self.assertEqual(
orig_file,
skipped_file,
"Skip should return the existing file, not create a new one",
)
self.assertEqual(
self.env["cx.tower.file"].search_count(
[
("template_id", "=", file_template.id),
("server_id", "=", self.server_test_1.id),
]
),
1,
"There must be exactly one physical file record after skip",
)
# Change command to raise an error if file exists
command.write({"if_file_exists": "raise"})
with self.assertRaisesRegex(
ValidationError,
"File already exists on server.",
):
file_template.create_file(
server=self.server_test_1,
server_dir=file_template.server_dir,
if_file_exists=command.if_file_exists,
)
# Change command to "overwrite" file if it exists
command.write({"if_file_exists": "overwrite"})
# Run command again, it should overwrite the file
file_template.create_file(
server=self.server_test_1,
server_dir=file_template.server_dir,
if_file_exists=command.if_file_exists,
)
self.assertEqual(
self.env["cx.tower.file"].search_count(
[
("template_id", "=", file_template.id),
("server_id", "=", self.server_test_1.id),
("server_dir", "=", file_template.server_dir),
]
),
1,
"There must be exactly one physical file record after overwrite",
)
self.assertEqual(
orig_file.code,
file_template.code,
"File code must match template after overwrite",
)
self.assertEqual(
orig_file.name,
file_template.file_name,
"File name must match template after overwrite",
)
self.assertEqual(
orig_file.source,
file_template.source,
"File source must match template after overwrite",
)
def test_is_file_disconnected_from_template(self):
"""Test if file is disconnected from template after being created."""
initial_files = self.server_test_1.file_ids
command = self.command_create_file_with_template_server_source
command.disconnect_file = True
self.server_test_1.run_command(command=command)
new_files = self.server_test_1.file_ids - initial_files
self.assertEqual(len(new_files), 1, "Must be one new file created")
self.assertEqual(
new_files.code_on_server,
command.file_template_id.code,
"File code must match template",
)
self.assertFalse(
new_files.template_id, "File must be disconnected from template"
)
# ---------------------
# *********************
# Python commands
# *********************
# ---------------------
def test_render_code_python(self):
"""Test Python code template direct rendering"""
rendered_command = self.server_test_1._render_command(
self.command_create_new_command
)
# Note: this is rendered as for Server Test 1
rendered_code_pythonic = (
f"""
server_name = "{self.server_test_1.name}"
if server_name and #!cxtower.secret.FOLDER!# == "secretFolder":
# We don't actually create a new command because it will raise
# access error if user doesn't have access to 'create' operation.
# Instead we just return a dummy command result.
command = "new command"
result = {{"exit_code": 0, "message": "New command was created"}}
else:
result = {{"exit_code": %s, "message": "error"}}
"""
% GENERAL_ERROR
)
self.assertEqual(
rendered_command["rendered_code"],
rendered_code_pythonic,
"Rendered code doesn't match",
)
def test_execute_python_command(self):
"""
Run command with python action.
"""
command_result = self.server_test_1.with_context(
no_command_log=True
).run_command(self.command_create_new_command)
self.assertEqual(
command_result["status"], 0, "The command result status must be 0"
)
self.assertEqual(
command_result["response"],
"New command was created",
"The response must be text",
)
# Check error is raises
self.secret_folder_key.secret_value = "not_a_secretFolder"
command_result = self.server_test_1.with_context(
no_command_log=True
).run_command(self.command_create_new_command)
self.assertEqual(
command_result["status"],
GENERAL_ERROR,
"The command result status must be GENERAL_ERROR",
)
self.assertEqual(
command_result["error"],
"error",
"The error response must be contain text - error",
)
def test_run_python_code(self):
"""
Test python execution code
"""
rendered_command = self.server_test_1._render_command(
self.command_create_new_command
)
command_result = self.server_test_1._run_python_code(
rendered_command["rendered_code"]
)
self.assertEqual(
command_result["status"], 0, "The command result status must be 0"
)
self.assertEqual(
command_result["response"],
"New command was created",
"The response must be text",
)
self.assertIsNone(
command_result["error"],
"Error in command result must be set to None",
)
def test_run_command_without_set_server_status(self):
"""
Test command execution without setting server status
"""
# Set command access level to "user"
self.command_create_new_command.write({"access_level": "1"})
# Add user to command
self.write_and_invalidate(
self.server_test_1, **{"user_ids": [(4, self.user.id)]}
)
# Reset access rule cache
self.env["ir.rule"].invalidate_recordset()
# Run command
server_status = self.server_test_1.status
result = (
self.server_test_1.with_context(no_command_log=True)
.with_user(self.user)
.run_command(self.command_create_new_command)
)
# Check command result
self.assertEqual(result["status"], 0, "Command status must be 0")
self.assertEqual(
self.server_test_1.status, server_status, "Server status must be 'running'"
)
def test_run_command_with_set_server_status(self):
"""
Test command execution with setting server status
"""
# Set server status to "down"
self.command_create_new_command.write({"server_status": "stopping"})
# Run command
self.server_test_1.with_context(no_command_log=True).run_command(
self.command_create_new_command
)
# Check command result
self.assertEqual(
self.server_test_1.status, "stopping", "Server status must be 'stopping'"
)
def test_run_python_code_with_secret(self):
"""
Test execution of Python code with a secret value.
This test ensures that a command is rendered and executed correctly,
and that the secret value is correctly handled and replaced in the output.
"""
placeholder = self.Key.SECRET_VALUE_PLACEHOLDER
# Case 1
# Render the command using server_test_1
rendered_command = self.server_test_1._render_command(
self.command_python_command_1
)
# Run the rendered Python code
command_result = self.server_test_1._run_python_code(
rendered_command["rendered_code"]
)
# Assert that the command execution status is 0 (indicating success)
self.assertEqual(
command_result["status"], 0, "The command result status must be 0"
)
# Assert that the response contains the secret spoiler text
self.assertEqual(
command_result["response"],
placeholder,
"The response must correctly include the secret value placeholder",
)
# Assert that no error occurred during execution (error should be None)
self.assertIsNone(
command_result["error"],
"The error in command result must be None",
)
# Case 2
# Render the command using server_test_1
rendered_command = self.server_test_1._render_command(
self.command_python_command_2
)
# Run the rendered Python code
command_result = self.server_test_1._run_python_code(
rendered_command["rendered_code"]
)
# Assert that the command execution status is 0 (indicating success)
self.assertEqual(
command_result["status"], 0, "The command result status must be 0"
)
# Assert that the response contains the secret spoiler text
self.assertEqual(
command_result["response"],
f'We use "{placeholder}"',
"The response must correctly include the secret value placeholder",
)
# Assert that no error occurred during execution (error should be None)
self.assertIsNone(
command_result["error"],
"The error in command result must be None",
)
# Case 3
# Render the command using server_test_1
rendered_command = self.server_test_1._render_command(
self.command_python_command_3
)
# Run the rendered Python code
command_result = self.server_test_1._run_python_code(
rendered_command["rendered_code"]
)
# Assert that the command execution status is 0 (indicating success)
self.assertEqual(
command_result["status"], 0, "The command result status must be 0"
)
# Assert that the response contains the secret spoiler text
self.assertEqual(
command_result["response"],
placeholder,
"The response must correctly include the secret value placeholder",
)
# Assert that no error occurred during execution (error should be None)
self.assertIsNone(
command_result["error"],
"The error in command result must be None",
)
# Case 4
# Render the command using server_test_1
rendered_command = self.server_test_1._render_command(
self.command_python_command_4
)
# Run the rendered Python code
# SSH keys are not parsed inline, so we should raise a validation error
command_result = self.server_test_1._run_python_code(
rendered_command["rendered_code"]
)
# Assert that the command execution status is 0 (indicating success)
self.assertEqual(
command_result["status"], 0, "The command result status must be 0"
)
# Assert that the response contains the secret spoiler text
self.assertEqual(
command_result["response"],
placeholder,
"The response must correctly include the secret value placeholder",
)
# Assert that no error occurred during execution (error should be None)
self.assertIsNone(
command_result["error"],
"The error in command result must be None",
)
def test_command_with_secret(self):
"""
Test case to verify that when a command includes a secret reference,
the secret key is automatically linked with the command.
"""
# Command with a secret reference
code = "cd {{ test_path_ }} && mkdir #!cxtower.secret.FOLDER!#"
secrets = self.Command._extract_secret_ids(code)
secret_folder_key = self.secret_folder_key
self.assertIn(
secret_folder_key,
secrets,
msg=(
f"The expected secret ID #{secret_folder_key.id} "
"was not found in the provided code."
),
)
command_with_keys = self.Command.create(
{"name": "Command with keys", "code": code}
)
# -- 1 --
# Assert that the secret key is linked with the command
self.assertIn(
secret_folder_key,
command_with_keys.secret_ids,
msg="The secret key is not linked with the command.",
)
# -- 2 --
# Update the command's code to remove the secret reference
updated_code = "cd {{ test_path_ }} && mkdir new_folder"
command_with_keys.code = updated_code
self.assertFalse(
command_with_keys.secret_ids,
msg=(
"The secret_ids field should be empty after "
"removing the secret reference from command."
),
)
# -- 3 --
# Create a secret with the same reference but connected to another server
another_server = self.server_test_1.copy({"name": "another server"})
another_secret = self.Key.create(
{
"name": "another secret",
"reference": secret_folder_key.reference,
"key_type": "s",
}
)
another_secret_value = self.KeyValue.create(
{
"key_id": another_secret.id,
"server_id": another_server.id,
"secret_value": "another secret value",
}
)
# Set original code again
command_with_keys.code = code
self.assertEqual(
len(command_with_keys.secret_ids),
1,
msg="Must be only one secret",
)
self.assertIn(
secret_folder_key,
command_with_keys.secret_ids,
msg="The secret key is not linked with the command.",
)
self.assertNotIn(
another_secret,
command_with_keys.secret_ids,
msg="The another secret is linked with the command.",
)
# -- 4 --
# Connect command to server and secret to another server
# and ensure it's unlinked from the command.
yet_one_more_server = self.server_test_1.copy({"name": "yet one more server"})
self.write_and_invalidate(
another_secret_value, **{"server_id": yet_one_more_server.id}
)
self.write_and_invalidate(
command_with_keys, **{"server_ids": self.server_test_1}
)
self.assertEqual(
len(command_with_keys.secret_ids),
1,
msg="Must be one secret",
)
def test_check_zombie_commands(self):
"""Test checking and marking zombie commands"""
# Create test commands
ssh_command = self.Command.create(
{
"name": "Test SSH Command",
"code": "ls -la",
"action": "ssh_command",
}
)
python_command = self.Command.create(
{
"name": "Test Python Command",
"code": "print('test')",
"action": "python_code",
}
)
plan_command = self.Command.create(
{
"name": "Test Plan Command",
"code": "test plan",
"action": "plan",
}
)
# Set command timeout to 10 seconds
self.env["ir.config_parameter"].sudo().set_param(
"cetmix_tower_server.command_timeout", "10"
)
# Create command logs with different start times
now = Datetime.now()
old_time = now - timedelta(seconds=20) # Older than timeout
recent_time = now - timedelta(seconds=5) # Within timeout
# Create zombie SSH command log
zombie_ssh_log = self.CommandLog.create(
{
"command_id": ssh_command.id,
"server_id": self.server_test_1.id,
"start_date": old_time,
}
)
# Create zombie Python command log
zombie_python_log = self.CommandLog.create(
{
"command_id": python_command.id,
"server_id": self.server_test_1.id,
"start_date": old_time,
}
)
# Create non-zombie command logs
active_ssh_log = self.CommandLog.create(
{
"command_id": ssh_command.id,
"server_id": self.server_test_1.id,
"start_date": recent_time,
}
)
plan_log = self.CommandLog.create(
{
"command_id": plan_command.id,
"server_id": self.server_test_1.id,
"start_date": old_time,
}
)
# Test with timeout set
self.server_test_1._check_zombie_commands()
# Check zombie commands are marked as finished
self.assertFalse(
zombie_ssh_log.is_running, "Zombie SSH command should be marked as finished"
)
self.assertFalse(
zombie_python_log.is_running,
"Zombie Python command should be marked as finished",
)
self.assertEqual(
zombie_ssh_log.command_status,
COMMAND_TIMED_OUT,
"Zombie SSH command should have timed out status",
)
self.assertEqual(
zombie_python_log.command_error,
COMMAND_TIMED_OUT_MESSAGE,
"Zombie Python command should have timeout error message",
)
# Check non-zombie commands are still running
self.assertTrue(
active_ssh_log.is_running, "Recent command should still be running"
)
self.assertTrue(
plan_log.is_running, "Plan command should not be affected by timeout"
)
# Test with timeout disabled
self.env["ir.config_parameter"].sudo().set_param(
"cetmix_tower_server.command_timeout", "0"
)
# Create new zombie command log
new_zombie_log = self.CommandLog.create(
{
"command_id": ssh_command.id,
"server_id": self.server_test_1.id,
"start_date": old_time,
}
)
self.server_test_1._check_zombie_commands()
self.assertNotEqual(
new_zombie_log.command_status,
COMMAND_TIMED_OUT,
"Commands should not be marked as timed out when timeout is disabled",
)
def test_command_with_malformed_code(self):
"""Test rendering command using `_render_command` method
of cx.tower.server with malformed code
"""
with self.assertRaises(ValidationError):
self.Command.create(
{
"name": "Test Malformed Command",
"code": "cd {{ !@238203 }} && mkdir #!cxtower.secret.FOLDER!#",
"action": "ssh_command",
}
)