From b55049d482e5476be76cd683a9c66e6ce7f4b84a Mon Sep 17 00:00:00 2001 From: git_admin Date: Mon, 27 Apr 2026 08:46:12 +0000 Subject: [PATCH] Tower: upload queue_job 16.0.2.12.0 (via marketplace) --- addons/queue_job/delay.py | 666 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 666 insertions(+) create mode 100644 addons/queue_job/delay.py diff --git a/addons/queue_job/delay.py b/addons/queue_job/delay.py new file mode 100644 index 0000000..726e850 --- /dev/null +++ b/addons/queue_job/delay.py @@ -0,0 +1,666 @@ +# Copyright 2019 Camptocamp +# Copyright 2019 Guewen Baconnier +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html) + +import itertools +import logging +import uuid +from collections import defaultdict, deque + +from .job import Job +from .utils import must_run_without_delay + +_logger = logging.getLogger(__name__) + + +def group(*delayables): + """Return a group of delayable to form a graph + + A group means that jobs can be executed concurrently. + A job or a group of jobs depending on a group can be executed only after + all the jobs of the group are done. + + Shortcut to :class:`~odoo.addons.queue_job.delay.DelayableGroup`. + + Example:: + + g1 = group(delayable1, delayable2) + g2 = group(delayable3, delayable4) + g1.on_done(g2) + g1.delay() + """ + return DelayableGroup(*delayables) + + +def chain(*delayables): + """Return a chain of delayable to form a graph + + A chain means that jobs must be executed sequentially. + A job or a group of jobs depending on a group can be executed only after + the last job of the chain is done. + + Shortcut to :class:`~odoo.addons.queue_job.delay.DelayableChain`. + + Example:: + + chain1 = chain(delayable1, delayable2, delayable3) + chain2 = chain(delayable4, delayable5, delayable6) + chain1.on_done(chain2) + chain1.delay() + """ + return DelayableChain(*delayables) + + +class Graph: + """Acyclic directed graph holding vertices of any hashable type + + This graph is not specifically designed to hold :class:`~Delayable` + instances, although ultimately it is used for this purpose. + """ + + __slots__ = "_graph" + + def __init__(self, graph=None): + if graph: + self._graph = graph + else: + self._graph = {} + + def add_vertex(self, vertex): + """Add a vertex + + Has no effect if called several times with the same vertex + """ + self._graph.setdefault(vertex, set()) + + def add_edge(self, parent, child): + """Add an edge between a parent and a child vertex + + Has no effect if called several times with the same pair of vertices + """ + self.add_vertex(child) + self._graph.setdefault(parent, set()).add(child) + + def vertices(self): + """Return the vertices (nodes) of the graph""" + return set(self._graph) + + def edges(self): + """Return the edges (links) of the graph""" + links = [] + for vertex, neighbours in self._graph.items(): + for neighbour in neighbours: + links.append((vertex, neighbour)) + return links + + # from + # https://codereview.stackexchange.com/questions/55767/finding-all-paths-from-a-given-graph + def paths(self, vertex): + """Generate the maximal cycle-free paths in graph starting at vertex. + + >>> g = {1: [2, 3], 2: [3, 4], 3: [1], 4: []} + >>> sorted(self.paths(1)) + [[1, 2, 3], [1, 2, 4], [1, 3]] + >>> sorted(self.paths(3)) + [[3, 1, 2, 4]] + """ + path = [vertex] # path traversed so far + seen = {vertex} # set of vertices in path + + def search(): + dead_end = True + for neighbour in self._graph[path[-1]]: + if neighbour not in seen: + dead_end = False + seen.add(neighbour) + path.append(neighbour) + yield from search() + path.pop() + seen.remove(neighbour) + if dead_end: + yield list(path) + + yield from search() + + def topological_sort(self): + """Yields a proposed order of nodes to respect dependencies + + The order is not unique, the result may vary, but it is guaranteed + that a node depending on another is not yielded before. + It assumes the graph has no cycle. + """ + depends_per_node = defaultdict(int) + for __, tail in self.edges(): + depends_per_node[tail] += 1 + + # the queue contains only elements for which all dependencies + # are resolved + queue = deque(self.root_vertices()) + while queue: + vertex = queue.popleft() + yield vertex + for node in self._graph[vertex]: + depends_per_node[node] -= 1 + if not depends_per_node[node]: + queue.append(node) + + def root_vertices(self): + """Returns the root vertices + + meaning they do not depend on any other job. + """ + dependency_vertices = set() + for dependencies in self._graph.values(): + dependency_vertices.update(dependencies) + return set(self._graph.keys()) - dependency_vertices + + def __repr__(self): + paths = [path for vertex in self.root_vertices() for path in self.paths(vertex)] + lines = [] + for path in paths: + lines.append(" → ".join(repr(vertex) for vertex in path)) + return "\n".join(lines) + + +class DelayableGraph(Graph): + """Directed Graph for :class:`~Delayable` dependencies + + It connects together the :class:`~Delayable`, :class:`~DelayableGroup` and + :class:`~DelayableChain` graphs, and creates then enqueued the jobs. + """ + + def _merge_graph(self, graph): + """Merge a graph in the current graph + + It takes each vertex, which can be :class:`~Delayable`, + :class:`~DelayableChain` or :class:`~DelayableGroup`, and updates the + current graph with the edges between Delayable objects (connecting + heads and tails of the groups and chains), so that at the end, the + graph contains only Delayable objects and their links. + """ + for vertex, neighbours in graph._graph.items(): + tails = vertex._tail() + for tail in tails: + # connect the tails with the heads of each node + heads = {head for n in neighbours for head in n._head()} + self._graph.setdefault(tail, set()).update(heads) + + def _connect_graphs(self): + """Visit the vertices' graphs and connect them, return the whole graph + + Build a new graph, walk the vertices and their related vertices, merge + their graph in the new one, until we have visited all the vertices + """ + graph = DelayableGraph() + graph._merge_graph(self) + + seen = set() + visit_stack = deque([self]) + while visit_stack: + current = visit_stack.popleft() + if current in seen: + continue + + vertices = current.vertices() + for vertex in vertices: + vertex_graph = vertex._graph + graph._merge_graph(vertex_graph) + visit_stack.append(vertex_graph) + + seen.add(current) + + return graph + + def _has_to_execute_directly(self, vertices): + """Used for tests to run tests directly instead of storing them + + In tests, prefer to use + :func:`odoo.addons.queue_job.tests.common.trap_jobs`. + """ + envs = {vertex.recordset.env for vertex in vertices} + for env in envs: + if must_run_without_delay(env): + return True + return False + + @staticmethod + def _ensure_same_graph_uuid(jobs): + """Set the same graph uuid on all jobs of the same graph""" + jobs_count = len(jobs) + if jobs_count == 0: + raise ValueError("Expecting jobs") + elif jobs_count == 1: + if jobs[0].graph_uuid: + raise ValueError( + "Job %s is a single job, it should not" + " have a graph uuid" % (jobs[0],) + ) + else: + graph_uuids = {job.graph_uuid for job in jobs if job.graph_uuid} + if len(graph_uuids) > 1: + raise ValueError("Jobs cannot have dependencies between several graphs") + elif len(graph_uuids) == 1: + graph_uuid = graph_uuids.pop() + else: + graph_uuid = str(uuid.uuid4()) + for job in jobs: + job.graph_uuid = graph_uuid + + def delay(self): + """Build the whole graph, creates jobs and delay them""" + graph = self._connect_graphs() + + vertices = graph.vertices() + + for vertex in vertices: + vertex._build_job() + + self._ensure_same_graph_uuid([vertex._generated_job for vertex in vertices]) + + if self._has_to_execute_directly(vertices): + self._execute_graph_direct(graph) + return + + for vertex, neighbour in graph.edges(): + neighbour._generated_job.add_depends({vertex._generated_job}) + + # If all the jobs of the graph have another job with the same identity, + # we do not create them. Maybe we should check that the found jobs are + # part of the same graph, but not sure it's really required... + # Also, maybe we want to check only the root jobs. + existing_mapping = {} + for vertex in vertices: + if not vertex.identity_key: + continue + generated_job = vertex._generated_job + existing = generated_job.job_record_with_same_identity_key() + if not existing: + # at least one does not exist yet, we'll delay the whole graph + existing_mapping.clear() + break + existing_mapping[vertex] = existing + + # We'll replace the generated jobs by the existing ones, so callers + # can retrieve the existing job in "_generated_job". + # existing_mapping contains something only if *all* the job with an + # identity have an existing one. + for vertex, existing in existing_mapping.items(): + vertex._generated_job = existing + return + + for vertex in vertices: + vertex._generated_job.store() + + def _execute_graph_direct(self, graph): + for delayable in graph.topological_sort(): + delayable._execute_direct() + + +class DelayableChain: + """Chain of delayables to form a graph + + Delayables can be other :class:`~Delayable`, :class:`~DelayableChain` or + :class:`~DelayableGroup` objects. + + A chain means that jobs must be executed sequentially. + A job or a group of jobs depending on a group can be executed only after + the last job of the chain is done. + + Chains can be connected to other Delayable, DelayableChain or + DelayableGroup objects by using :meth:`~done`. + + A Chain is enqueued by calling :meth:`~delay`, which delays the whole + graph. + Important: :meth:`~delay` must be called on the top-level + delayable/chain/group object of the graph. + """ + + __slots__ = ("_graph", "__head", "__tail") + + def __init__(self, *delayables): + self._graph = DelayableGraph() + iter_delayables = iter(delayables) + head = next(iter_delayables) + self.__head = head + self._graph.add_vertex(head) + for neighbour in iter_delayables: + self._graph.add_edge(head, neighbour) + head = neighbour + self.__tail = head + + def _head(self): + return self.__head._tail() + + def _tail(self): + return self.__tail._head() + + def __repr__(self): + inner_graph = "\n\t".join(repr(self._graph).split("\n")) + return "DelayableChain(\n\t{}\n)".format(inner_graph) + + def on_done(self, *delayables): + """Connects the current chain to other delayables/chains/groups + + The delayables/chains/groups passed in the parameters will be executed + when the current Chain is done. + """ + for delayable in delayables: + self._graph.add_edge(self.__tail, delayable) + return self + + def delay(self): + """Delay the whole graph""" + self._graph.delay() + + +class DelayableGroup: + """Group of delayables to form a graph + + Delayables can be other :class:`~Delayable`, :class:`~DelayableChain` or + :class:`~DelayableGroup` objects. + + A group means that jobs must be executed sequentially. + A job or a group of jobs depending on a group can be executed only after + the all the jobs of the group are done. + + Groups can be connected to other Delayable, DelayableChain or + DelayableGroup objects by using :meth:`~done`. + + A group is enqueued by calling :meth:`~delay`, which delays the whole + graph. + Important: :meth:`~delay` must be called on the top-level + delayable/chain/group object of the graph. + """ + + __slots__ = ("_graph", "_delayables") + + def __init__(self, *delayables): + self._graph = DelayableGraph() + self._delayables = set(delayables) + for delayable in delayables: + self._graph.add_vertex(delayable) + + def _head(self): + return itertools.chain.from_iterable(node._head() for node in self._delayables) + + def _tail(self): + return itertools.chain.from_iterable(node._tail() for node in self._delayables) + + def __repr__(self): + inner_graph = "\n\t".join(repr(self._graph).split("\n")) + return "DelayableGroup(\n\t{}\n)".format(inner_graph) + + def on_done(self, *delayables): + """Connects the current group to other delayables/chains/groups + + The delayables/chains/groups passed in the parameters will be executed + when the current Group is done. + """ + for parent in self._delayables: + for child in delayables: + self._graph.add_edge(parent, child) + return self + + def delay(self): + """Delay the whole graph""" + self._graph.delay() + + +class Delayable: + """Unit of a graph, one Delayable will lead to an enqueued job + + Delayables can have dependencies on each others, as well as dependencies on + :class:`~DelayableGroup` or :class:`~DelayableChain` objects. + + This class will generally not be used directly, it is used internally + by :meth:`~odoo.addons.queue_job.models.base.Base.delayable`. Look + in the base model for more details. + + Delayables can be connected to other Delayable, DelayableChain or + DelayableGroup objects by using :meth:`~done`. + + Properties of the future job can be set using the :meth:`~set` method, + which always return ``self``:: + + delayable.set(priority=15).set({"max_retries": 5, "eta": 15}).delay() + + It can be used for example to set properties dynamically. + + A Delayable is enqueued by calling :meth:`delay()`, which delays the whole + graph. + Important: :meth:`delay()` must be called on the top-level + delayable/chain/group object of the graph. + """ + + _properties = ( + "priority", + "eta", + "max_retries", + "description", + "channel", + "identity_key", + ) + __slots__ = _properties + ( + "recordset", + "_graph", + "_job_method", + "_job_args", + "_job_kwargs", + "_generated_job", + ) + + def __init__( + self, + recordset, + priority=None, + eta=None, + max_retries=None, + description=None, + channel=None, + identity_key=None, + ): + self._graph = DelayableGraph() + self._graph.add_vertex(self) + + self.recordset = recordset + + self.priority = priority + self.eta = eta + self.max_retries = max_retries + self.description = description + self.channel = channel + self.identity_key = identity_key + + self._job_method = None + self._job_args = () + self._job_kwargs = {} + + self._generated_job = None + + def _head(self): + return [self] + + def _tail(self): + return [self] + + def __repr__(self): + return "Delayable({}.{}({}, {}))".format( + self.recordset, + self._job_method.__name__ if self._job_method else "", + self._job_args, + self._job_kwargs, + ) + + def __del__(self): + if not self._generated_job: + _logger.warning("Delayable %s was prepared but never delayed", self) + + def _set_from_dict(self, properties): + for key, value in properties.items(): + if key not in self._properties: + raise ValueError("No property %s" % (key,)) + setattr(self, key, value) + + def set(self, *args, **kwargs): + """Set job properties and return self + + The values can be either a dictionary and/or keywork args + """ + if args: + # args must be a dict + self._set_from_dict(*args) + self._set_from_dict(kwargs) + return self + + def on_done(self, *delayables): + """Connects the current Delayable to other delayables/chains/groups + + The delayables/chains/groups passed in the parameters will be executed + when the current Delayable is done. + """ + for child in delayables: + self._graph.add_edge(self, child) + return self + + def delay(self): + """Delay the whole graph""" + self._graph.delay() + + def split(self, size, chain=False): + """Split the Delayables. + + Use `DelayableGroup` or `DelayableChain` + if `chain` is True containing batches of size `size` + """ + if not self._job_method: + raise ValueError("No method set on the Delayable") + + total_records = len(self.recordset) + + delayables = [] + for index in range(0, total_records, size): + recordset = self.recordset[index : index + size] + delayable = Delayable( + recordset, + priority=self.priority, + eta=self.eta, + max_retries=self.max_retries, + description=self.description, + channel=self.channel, + identity_key=self.identity_key, + ) + # Update the __self__ + delayable._job_method = getattr(recordset, self._job_method.__name__) + delayable._job_args = self._job_args + delayable._job_kwargs = self._job_kwargs + + delayables.append(delayable) + + description = self.description or ( + self._job_method.__doc__.splitlines()[0].strip() + if self._job_method.__doc__ + else "{}.{}".format(self.recordset._name, self._job_method.__name__) + ) + for index, delayable in enumerate(delayables): + delayable.set( + description="%s (split %s/%s)" + % (description, index + 1, len(delayables)) + ) + + # Prevent warning on deletion + self._generated_job = True + + return (DelayableChain if chain else DelayableGroup)(*delayables) + + def _build_job(self): + if self._generated_job: + return self._generated_job + self._generated_job = Job( + self._job_method, + args=self._job_args, + kwargs=self._job_kwargs, + priority=self.priority, + max_retries=self.max_retries, + eta=self.eta, + description=self.description, + channel=self.channel, + identity_key=self.identity_key, + ) + return self._generated_job + + def _store_args(self, *args, **kwargs): + self._job_args = args + self._job_kwargs = kwargs + return self + + def __getattr__(self, name): + if name in self.__slots__: + return super().__getattr__(name) + if name in self.recordset: + raise AttributeError( + "only methods can be delayed (%s called on %s)" % (name, self.recordset) + ) + recordset_method = getattr(self.recordset, name) + self._job_method = recordset_method + return self._store_args + + def _execute_direct(self): + assert self._generated_job + self._generated_job.perform() + + +class DelayableRecordset: + """Allow to delay a method for a recordset (shortcut way) + + Usage:: + + delayable = DelayableRecordset(recordset, priority=20) + delayable.method(args, kwargs) + + The method call will be processed asynchronously in the job queue, with + the passed arguments. + + This class will generally not be used directly, it is used internally + by :meth:`~odoo.addons.queue_job.models.base.Base.with_delay` + """ + + __slots__ = ("delayable",) + + def __init__( + self, + recordset, + priority=None, + eta=None, + max_retries=None, + description=None, + channel=None, + identity_key=None, + ): + self.delayable = Delayable( + recordset, + priority=priority, + eta=eta, + max_retries=max_retries, + description=description, + channel=channel, + identity_key=identity_key, + ) + + @property + def recordset(self): + return self.delayable.recordset + + def __getattr__(self, name): + def _delay_delayable(*args, **kwargs): + getattr(self.delayable, name)(*args, **kwargs).delay() + return self.delayable._generated_job + + return _delay_delayable + + def __str__(self): + return "DelayableRecordset(%s%s)" % ( + self.delayable.recordset._name, + getattr(self.delayable.recordset, "_ids", ""), + ) + + __repr__ = __str__