From 90c893ae66c670c502736cc1a27697f6af4ceca0 Mon Sep 17 00:00:00 2001 From: git_admin Date: Mon, 27 Apr 2026 08:16:04 +0000 Subject: [PATCH] Tower: upload cetmix_tower_server 16.0.3.0.1 (via marketplace) --- .../models/cx_tower_scheduled_task.py | 442 ++++++++++++++++++ 1 file changed, 442 insertions(+) create mode 100644 addons/cetmix_tower_server/models/cx_tower_scheduled_task.py diff --git a/addons/cetmix_tower_server/models/cx_tower_scheduled_task.py b/addons/cetmix_tower_server/models/cx_tower_scheduled_task.py new file mode 100644 index 0000000..f809739 --- /dev/null +++ b/addons/cetmix_tower_server/models/cx_tower_scheduled_task.py @@ -0,0 +1,442 @@ +import logging +from datetime import timedelta + +from dateutil.relativedelta import relativedelta + +from odoo import _, api, fields, models +from odoo.exceptions import ValidationError + +_logger = logging.getLogger(__name__) + + +class CxTowerScheduledTask(models.Model): + """ + Scheduled Tasks. + Used to schedule commands and flight plans to run on servers and jets. + """ + + _name = "cx.tower.scheduled.task" + _description = "Scheduled Task" + _inherit = ["cx.tower.access.role.mixin", "cx.tower.reference.mixin"] + _order = "sequence, next_call" + + active = fields.Boolean(default=True) + sequence = fields.Integer(default=10) + server_ids = fields.Many2many( + "cx.tower.server", + "cx_tower_scheduled_task_server_rel", + "scheduled_task_id", + "server_id", + string="Servers", + ) + server_template_ids = fields.Many2many( + string="Server Templates", + comodel_name="cx.tower.server.template", + relation="cx_tower_server_template_scheduled_task_rel", + column1="scheduled_task_id", + column2="server_template_id", + ) + jet_ids = fields.Many2many( + "cx.tower.jet", + "cx_tower_scheduled_task_jet_rel", + "scheduled_task_id", + "jet_id", + string="Jets", + ) + jet_template_ids = fields.Many2many( + string="Jet Templates", + comodel_name="cx.tower.jet.template", + relation="cx_tower_jet_template_scheduled_task_rel", + column1="scheduled_task_id", + column2="jet_template_id", + ) + action = fields.Selection( + [("command", "Command"), ("plan", "Flight Plan")], required=True + ) + command_id = fields.Many2one("cx.tower.command", string="Command") + plan_id = fields.Many2one(string="Flight Plan", comodel_name="cx.tower.plan") + is_running = fields.Boolean(default=False, readonly=True) + interval_number = fields.Integer(default=1, help="Repeat every x.") + interval_type = fields.Selection( + [ + ("minutes", "Minutes"), + ("hours", "Hours"), + ("days", "Days"), + ("dow", "Days of Week"), + ("weeks", "Weeks"), + ("months", "Months"), + ], + string="Interval Unit", + default="months", + ) + next_call = fields.Datetime( + string="Next Execution Date", + required=True, + default=fields.Datetime.now, + help="Next planned execution date for this task.", + ) + last_call = fields.Datetime( + string="Last Execution Date", help="Previous time the task ran successfully." + ) + # Days of week + monday = fields.Boolean(default=False) + tuesday = fields.Boolean(default=False) + wednesday = fields.Boolean(default=False) + thursday = fields.Boolean(default=False) + friday = fields.Boolean(default=False) + saturday = fields.Boolean(default=False) + sunday = fields.Boolean(default=False) + + custom_variable_value_ids = fields.One2many( + "cx.tower.scheduled.task.cv", + "scheduled_task_id", + string="Custom Variable Values", + ) + warning_message = fields.Text( + compute="_compute_warning_message", + ) + + # ---- Access. Add relation for mixin fields + user_ids = fields.Many2many( + relation="cx_tower_scheduled_task_user_rel", + ) + manager_ids = fields.Many2many( + relation="cx_tower_scheduled_task_manager_rel", + ) + + _sql_constraints = [ + ( + "interval_positive", + "CHECK (interval_number > 0)", + "Interval number must be greater than zero.", + ), + ] + + @api.constrains( + "interval_type", + "monday", + "tuesday", + "wednesday", + "thursday", + "friday", + "saturday", + "sunday", + ) + def _check_days_of_week(self): + """ + Check if at least one day of week is selected + """ + for task in self: + if task.interval_type == "dow" and not any( + [ + task.monday, + task.tuesday, + task.wednesday, + task.thursday, + task.friday, + task.saturday, + task.sunday, + ] + ): + raise ValidationError( + _( + "At least one day of week must be selected for the task '%s'.", + task.display_name, + ) + ) + + @api.depends("interval_number", "interval_type") + def _compute_warning_message(self): + """ + Show warning on the task form if interval in the scheduled task + is less than interval in the underlaying cron job. + """ + cron = self.env.ref( + "cetmix_tower_server.ir_cron_run_scheduled_tasks", raise_if_not_found=False + ) + if not cron: + self.warning_message = False + return + + # Using now's date as the base point ensures a consistent and comparable + # reference when calculating the next scheduled execution for both the cron + # and the tasks. + now = fields.Datetime.now() + # _get_next_call is designed for tasks, but can also be used for the + # cron record, as both share the same interval fields. This keeps interval + # comparison logic consistent. + cron_next = self._get_next_call(cron, now) + + for task in self: + if task.interval_type == "dow": + task.warning_message = False + continue + task_next = self._get_next_call(task, now) + if task_next < cron_next: + task.warning_message = _( + "The selected task interval is too low in relation to the general " + "system settings. This may lead to task execution delays." + ) + else: + task.warning_message = False + + def action_run(self): + """ + Run scheduled action and reschedule next call. + """ + return self._run() + + def action_open_command_logs(self): + """ + Open current scheduled task command log records + """ + action = self.env["ir.actions.actions"]._for_xml_id( + "cetmix_tower_server.action_cx_tower_command_log" + ) + action["domain"] = [("scheduled_task_id", "=", self.id)] # pylint: disable=no-member + return action + + def action_open_plan_logs(self): + """ + Open current scheduled task flightplan log records + """ + action = self.env["ir.actions.actions"]._for_xml_id( + "cetmix_tower_server.action_cx_tower_plan_log" + ) + action["domain"] = [("scheduled_task_id", "=", self.id)] # pylint: disable=no-member + return action + + @api.model + def _run_scheduled_tasks(self): + """ + Cron: finds due tasks and runs their actions (command/plan). + Handles errors per-task and reserves tasks atomically to avoid double execution. + """ + now = fields.Datetime.now() + due_tasks = self.search( + [ + ("next_call", "<=", now), + ("active", "=", True), + ("is_running", "=", False), + ] + ) + if not due_tasks: + return + + due_tasks.with_context(from_cron=True)._run() + + def _run(self): + """ + Run scheduled action and reschedule next call. + """ + tasks = self._reserve_tasks() + if not tasks: + return + + if self.env.context.get("from_cron"): + # WARNING: Explicit commit! + # This commit is made **only** when called from cron (context["from_cron"]). + # Reason: To atomically reserve scheduled tasks by setting is_running=True, + # so that only one cron worker processes each task, even if multiple workers + # pick up the cron job at the same time. Without this commit, the change + # would not be visible to other transactions until the end of the cron + # transaction, leading to a race condition and possible double execution. + # Explicit commits are strongly discouraged in Odoo business logic and + # should be used only with clear justification and in strictly controlled + # contexts (like this cron scenario). Never add this commit for general + # business flows! + self.env.cr.commit() # pylint: disable=invalid-commit + + errors = [] + for task in tasks: + try: + with self.env.cr.savepoint(): + if task.action == "command" and task.command_id: + task._run_command() + elif task.action == "plan" and task.plan_id: + task._run_plan() + except Exception as e: + _logger.exception(f"Scheduled task {task.id} failed: {e}") + + task_error = _( + "Unable to run scheduled task '%(f)s'. Error: %(e)s", + f=task.display_name, + e=e, + ) + errors.append(task_error) + + finally: + finished_at = fields.Datetime.now() + # Always update the scheduling, even if the task failed + task.write( + { + "last_call": finished_at, + "next_call": self._get_next_call(task, task.next_call), + "is_running": False, + } + ) + + if errors: + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Failure"), + "message": "\n".join(errors), + }, + } + + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Success"), + "message": _("Scheduled tasks run successfully."), + }, + } + + def _get_next_call(self, task, from_date): + """ + Calculate next_call datetime + + task: cx.tower.scheduled.task + from_date: datetime + """ + if task.interval_type == "dow": + return self._get_next_call_dow(task, from_date) + + num = task.interval_number or 1 + intervals = { + "minutes": timedelta(minutes=num), + "hours": timedelta(hours=num), + "days": timedelta(days=num), + "weeks": timedelta(weeks=num), + "months": relativedelta(months=num), + } + return from_date + intervals.get(task.interval_type, timedelta()) + + def _get_task_selected_days(self, task): + """ + Get list of selected weekday numbers for a task + + task: cx.tower.scheduled.task + Returns: list of weekday numbers (0=Monday, 6=Sunday) + """ + selected_days = [] + if task.monday: + selected_days.append(0) + if task.tuesday: + selected_days.append(1) + if task.wednesday: + selected_days.append(2) + if task.thursday: + selected_days.append(3) + if task.friday: + selected_days.append(4) + if task.saturday: + selected_days.append(5) + if task.sunday: + selected_days.append(6) + return selected_days + + def _get_next_call_dow(self, task, from_date): + """ + Calculate next_call datetime for days of week interval type + + task: cx.tower.scheduled.task + from_date: datetime + """ + # Days of week: find next selected day at the same time + # weekday() returns 0=Monday, 6=Sunday + selected_days = self._get_task_selected_days(task) + if not selected_days: + raise ValidationError( + _( + "At least one day of week must be selected for the task '%s'.", + task.display_name, + ) + ) + current_weekday = from_date.weekday() + + # Find next selected day (starting from tomorrow to get next occurrence) + # Check days in current week first (after today) + next_day = None + for day in selected_days: + if day > current_weekday: + next_day = day + break + + # If no day found in current week, take first day of next week + if next_day is None: + next_day = min(selected_days) + days_ahead = (7 - current_weekday) + next_day + else: + days_ahead = next_day - current_weekday + + # Create new datetime with same time, on the next selected day + next_date = from_date + timedelta(days=days_ahead) + return next_date.replace( + hour=from_date.hour, + minute=from_date.minute, + second=from_date.second, + microsecond=from_date.microsecond, + ) + + def _run_command(self): + """Run command on selected servers.""" + variable_values = { + value.variable_id.reference: value.value_char + for value in self.custom_variable_value_ids + } + kwargs = { + "log": {"scheduled_task_id": self.id}, + "variable_values": variable_values, + } + # Run for servers + for server in self.server_ids: + server.run_command(self.command_id, **kwargs) + # Run for jets + for jet in self.jet_ids: + jet.run_command(self.command_id, **kwargs) + + def _run_plan(self): + """Run flight plan on selected servers.""" + variable_values = { + value.variable_id.reference: value.value_char + for value in self.custom_variable_value_ids + } + kwargs = { + "plan_log": {"scheduled_task_id": self.id}, + "variable_values": variable_values, + } + # Run for servers + for server in self.server_ids: + server.run_flight_plan(self.plan_id, **kwargs) + # Run for jets + for jet in self.jet_ids: + jet.run_flight_plan(self.plan_id, **kwargs) + + def _reserve_tasks(self, limit=None): + """ + Atomically select and lock free tasks for processing. + """ + sql = """ + SELECT id + FROM cx_tower_scheduled_task + WHERE is_running = FALSE AND id IN %s + ORDER BY id + """ + params = [tuple(self.ids)] + if limit: + sql += " LIMIT %s" + params.append(limit) + sql += " FOR UPDATE SKIP LOCKED" + self.env.cr.execute(sql, tuple(params)) + + task_ids = [row[0] for row in self.env.cr.fetchall()] + if not task_ids: + return self.browse() + + tasks = self.browse(task_ids) + tasks.write({"is_running": True}) + return tasks