From 65094d2031ec6682735b916e39caaa072cf0aba3 Mon Sep 17 00:00:00 2001 From: git_admin Date: Mon, 27 Apr 2026 08:46:32 +0000 Subject: [PATCH] Tower: upload queue_job 16.0.2.12.0 (via marketplace) --- addons/queue_job/jobrunner/channels.py | 1082 ++++++++++++++++++++++++ 1 file changed, 1082 insertions(+) create mode 100644 addons/queue_job/jobrunner/channels.py diff --git a/addons/queue_job/jobrunner/channels.py b/addons/queue_job/jobrunner/channels.py new file mode 100644 index 0000000..7fcabbd --- /dev/null +++ b/addons/queue_job/jobrunner/channels.py @@ -0,0 +1,1082 @@ +# Copyright (c) 2015-2016 ACSONE SA/NV () +# Copyright 2015-2016 Camptocamp SA +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +import logging +from collections import namedtuple +from functools import total_ordering +from heapq import heappop, heappush +from weakref import WeakValueDictionary + +from ..exception import ChannelNotFound +from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES + +NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED) +JobSortingKey = namedtuple("SortingKey", "eta priority date_created seq") + +_logger = logging.getLogger(__name__) + + +class PriorityQueue: + """A priority queue that supports removing arbitrary objects. + + Adding an object already in the queue is a no op. + Popping an empty queue returns None. + + >>> q = PriorityQueue() + >>> q.add(2) + >>> q.add(3) + >>> q.add(3) + >>> q.add(1) + >>> q[0] + 1 + >>> len(q) + 3 + >>> q.pop() + 1 + >>> q.remove(2) + >>> len(q) + 1 + >>> q[0] + 3 + >>> q.pop() + 3 + >>> q.pop() + >>> q.add(2) + >>> q.remove(2) + >>> q.add(2) + >>> q.pop() + 2 + """ + + def __init__(self): + self._heap = [] + self._known = set() # all objects in the heap (including removed) + self._removed = set() # all objects that have been removed + + def __len__(self): + return len(self._known) - len(self._removed) + + def __getitem__(self, i): + if i != 0: + raise IndexError() + while True: + if not self._heap: + raise IndexError() + o = self._heap[0] + if o in self._removed: + o2 = heappop(self._heap) + assert o2 == o + self._removed.remove(o) + self._known.remove(o) + else: + return o + + def __contains__(self, o): + return o in self._known and o not in self._removed + + def add(self, o): + if o is None: + raise ValueError() + self._removed.discard(o) + if o in self._known: + return + self._known.add(o) + heappush(self._heap, o) + + def remove(self, o): + if o is None: + raise ValueError() + if o not in self._known: + return + self._removed.add(o) + + def pop(self): + while True: + try: + o = heappop(self._heap) + except IndexError: + # queue is empty + return None + self._known.remove(o) + if o in self._removed: + self._removed.remove(o) + else: + return o + + +@total_ordering +class ChannelJob: + """A channel job is attached to a channel and holds the properties of a + job that are necessary to prioritise them. + + Channel jobs are comparable according to the following rules: + * jobs with an eta cannot be compared with jobs without + * then jobs with a smaller eta come first + * then jobs with a smaller priority come first + * then jobs with a smaller creation time come first + * then jobs with a smaller sequence come first + + Here are some examples. + + j1 comes before j2 because it has an earlier date_created + + >>> j1 = ChannelJob(None, None, 1, + ... seq=0, date_created=1, priority=9, eta=None) + >>> j1 + + >>> j2 = ChannelJob(None, None, 2, + ... seq=0, date_created=2, priority=9, eta=None) + >>> j1 < j2 + True + + j3 comes first because it has lower priority, + despite having a creation date after j1 and j2 + + >>> j3 = ChannelJob(None, None, 3, + ... seq=0, date_created=3, priority=2, eta=None) + >>> j3 < j1 + True + + j4 and j5 have an eta, they cannot be compared with j3 + + >>> j4 = ChannelJob(None, None, 4, + ... seq=0, date_created=4, priority=9, eta=9) + >>> j5 = ChannelJob(None, None, 5, + ... seq=0, date_created=5, priority=9, eta=9) + >>> j4 < j5 + True + >>> j4 < j3 + Traceback (most recent call last): + ... + TypeError: '<' not supported between instances of 'int' and 'NoneType' + + j6 has same date_created and priority as j5 but a smaller eta + + >>> j6 = ChannelJob(None, None, 6, + ... seq=0, date_created=5, priority=9, eta=2) + >>> j6 < j4 < j5 + True + + Here is the complete suite: + + >>> j6 < j4 < j5 and j3 < j1 < j2 + True + + j0 has the same properties as j1 but they are not considered + equal as they are different instances + + >>> j0 = ChannelJob(None, None, 1, + ... seq=0, date_created=1, priority=9, eta=None) + >>> j0 == j1 + False + >>> j0 == j0 + True + + Comparison excluding eta: + + >>> j1.sorting_key_ignoring_eta() < j2.sorting_key_ignoring_eta() + True + + """ + + __slots__ = ("db_name", "channel", "uuid", "_sorting_key", "__weakref__") + + def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta): + self.db_name = db_name + self.channel = channel + self.uuid = uuid + self._sorting_key = JobSortingKey(eta, priority, date_created, seq) + + def __repr__(self): + return "" % self.uuid + + def __eq__(self, other): + return id(self) == id(other) + + def __hash__(self): + return id(self) + + def set_no_eta(self): + self._sorting_key = JobSortingKey(None, *self._sorting_key[1:]) + + @property + def seq(self): + return self._sorting_key.seq + + @property + def date_created(self): + return self._sorting_key.date_created + + @property + def priority(self): + return self._sorting_key.priority + + @property + def eta(self): + return self._sorting_key.eta + + def sorting_key(self): + # DEPRECATED + return self._sorting_key + + def sorting_key_ignoring_eta(self): + return self._sorting_key[1:] + + def __lt__(self, other): + # Do not compare job where ETA is set with job where it is not + # If one job 'eta' is set, and the other is None, it raises TypeError + return self._sorting_key < other._sorting_key + + +class ChannelQueue: + """A channel queue is a priority queue for jobs. + + Jobs with an eta are set aside until their eta is past due, at + which point they start competing normally with other jobs. + + >>> q = ChannelQueue() + >>> j1 = ChannelJob(None, None, 1, + ... seq=0, date_created=1, priority=1, eta=10) + >>> j2 = ChannelJob(None, None, 2, + ... seq=0, date_created=2, priority=1, eta=None) + >>> j3 = ChannelJob(None, None, 3, + ... seq=0, date_created=3, priority=1, eta=None) + >>> q.add(j1) + >>> q.add(j2) + >>> q.add(j3) + + Wakeup time is the eta of job 1. + + >>> q.get_wakeup_time() + 10 + + We have not reached the eta of job 1, so we get job 2. + + >>> q.pop(now=1) + + + Wakeup time is still the eta of job 1, and we get job 1 when we are past + it's eta. + + >>> q.get_wakeup_time() + 10 + >>> q.pop(now=11) + + + Now there is no wakeup time anymore, because no job have an eta. + + >>> q.get_wakeup_time() + 0 + >>> q.pop(now=12) + + >>> q.get_wakeup_time() + 0 + >>> q.pop(now=13) + + Observe that job with past eta still run after jobs with higher priority. + + >>> j4 = ChannelJob(None, None, 4, + ... seq=0, date_created=4, priority=10, eta=20) + >>> j5 = ChannelJob(None, None, 5, + ... seq=0, date_created=5, priority=1, eta=None) + >>> q.add(j4) + >>> q.add(j5) + >>> q.get_wakeup_time() + 20 + >>> q.pop(21) + + >>> q.get_wakeup_time() + 0 + >>> q.pop(22) + + + Test a sequential queue. + + >>> sq = ChannelQueue(sequential=True) + >>> j6 = ChannelJob(None, None, 6, + ... seq=0, date_created=6, priority=1, eta=None) + >>> j7 = ChannelJob(None, None, 7, + ... seq=0, date_created=7, priority=1, eta=20) + >>> j8 = ChannelJob(None, None, 8, + ... seq=0, date_created=8, priority=1, eta=None) + >>> sq.add(j6) + >>> sq.add(j7) + >>> sq.add(j8) + >>> sq.pop(10) + + >>> sq.pop(15) + >>> sq.pop(20) + + >>> sq.pop(30) + + """ + + def __init__(self, sequential=False): + self._queue = PriorityQueue() + self._eta_queue = PriorityQueue() + self.sequential = sequential + + def __len__(self): + return len(self._eta_queue) + len(self._queue) + + def __contains__(self, o): + return o in self._eta_queue or o in self._queue + + def add(self, job): + if job.eta: + self._eta_queue.add(job) + else: + self._queue.add(job) + + def remove(self, job): + self._eta_queue.remove(job) + self._queue.remove(job) + + def pop(self, now): + while self._eta_queue and self._eta_queue[0].eta <= now: + eta_job = self._eta_queue.pop() + eta_job.set_no_eta() + self._queue.add(eta_job) + if self.sequential and self._eta_queue and self._queue: + eta_job = self._eta_queue[0] + job = self._queue[0] + + if eta_job.sorting_key_ignoring_eta() < job.sorting_key_ignoring_eta(): + # eta ignored, the job with eta has higher priority + # than the job without eta; since it's a sequential + # queue we wait until eta + return None + return self._queue.pop() + + def get_wakeup_time(self, wakeup_time=0): + if self._eta_queue: + if not wakeup_time: + wakeup_time = self._eta_queue[0].eta + else: + wakeup_time = min(wakeup_time, self._eta_queue[0].eta) + return wakeup_time + + +class Channel: + """A channel for jobs, with a maximum capacity. + + When jobs are created by queue_job modules, they may be associated + to a job channel. Jobs with no channel are inserted into the root channel. + + Job channels are joined in a hierarchy down to the root channel. + When a job channel has available capacity, jobs are dequeued, marked + as running in the channel and are inserted into the queue of the + parent channel where they wait for available capacity and so on. + + Job channels can be visualized as water channels with a given flow + limit (= capacity). Channels are joined together in a downstream channel + and the flow limit of the downstream channel limits upstream channels.:: + + ---------------------+ + | + | + Ch. A C:4,Q:12,R:4 +----------------------- + + ---------------------+ Ch. root C:5,Q:0,R:4 + | + ---------------------+ + Ch. B C:1,Q:0,R:0 + ---------------------+----------------------- + + The above diagram illustrates two channels joining in the root channel. + The root channel has a capacity of 5, and 4 running jobs coming from + Channel A. Channel A has a capacity of 4, all in use (passed down to the + root channel), and 12 jobs enqueued. Channel B has a capacity of 1, + none in use. This means that whenever a new job comes in channel B, + there will be available room for it to run in the root channel. + + Note that from the point of view of a channel, 'running' means enqueued + in the downstream channel. Only jobs marked running in the root channel + are actually sent to Odoo for execution. + + Should a downstream channel have less capacity than its upstream channels, + jobs going downstream will be enqueued in the downstream channel, + and compete normally according to their properties (priority, etc). + + Using this technique, it is possible to enforce sequence in a channel + with a capacity of 1. It is also possible to dedicate a channel with a + limited capacity for application-autocreated subchannels + without risking to overflow the system. + """ + + def __init__(self, name, parent, capacity=None, sequential=False, throttle=0): + self.name = name + self.parent = parent + if self.parent: + self.parent.children[name] = self + self.children = {} + self._queue = ChannelQueue() + self._running = set() + self._failed = set() + self._pause_until = 0 # utc seconds since the epoch + self.capacity = capacity + self.throttle = throttle # seconds + self.sequential = sequential + + @property + def sequential(self): + return self._queue.sequential + + @sequential.setter + def sequential(self, val): + self._queue.sequential = val + + def configure(self, config): + """Configure a channel from a dictionary. + + Supported keys are: + + * capacity + * sequential + * throttle + """ + assert self.fullname.endswith(config["name"]) + self.capacity = config.get("capacity", None) + self.sequential = bool(config.get("sequential", False)) + self.throttle = int(config.get("throttle", 0)) + if self.sequential and self.capacity != 1: + raise ValueError("A sequential channel must have a capacity of 1") + + @property + def fullname(self): + """The full name of the channel, in dot separated notation.""" + if self.parent: + return self.parent.fullname + "." + self.name + else: + return self.name + + def get_subchannel_by_name(self, subchannel_name): + return self.children.get(subchannel_name) + + def __str__(self): + capacity = "∞" if self.capacity is None else str(self.capacity) + return "%s(C:%s,Q:%d,R:%d,F:%d)" % ( + self.fullname, + capacity, + len(self._queue), + len(self._running), + len(self._failed), + ) + + def remove(self, job): + """Remove a job from the channel.""" + self._queue.remove(job) + self._running.discard(job) + self._failed.discard(job) + if self.parent: + self.parent.remove(job) + + def set_done(self, job): + """Mark a job as done. + + This removes it from the channel queue. + """ + self.remove(job) + _logger.debug("job %s marked done in channel %s", job.uuid, self) + + def set_pending(self, job): + """Mark a job as pending. + + This puts the job in the channel queue and remove it + from parent channels queues. + """ + if job not in self._queue: + self._queue.add(job) + self._running.discard(job) + self._failed.discard(job) + if self.parent: + self.parent.remove(job) + _logger.debug("job %s marked pending in channel %s", job.uuid, self) + + def set_running(self, job): + """Mark a job as running. + + This also marks the job as running in parent channels. + """ + if job not in self._running: + self._queue.remove(job) + self._running.add(job) + self._failed.discard(job) + if self.parent: + self.parent.set_running(job) + _logger.debug("job %s marked running in channel %s", job.uuid, self) + + def set_failed(self, job): + """Mark the job as failed.""" + if job not in self._failed: + self._queue.remove(job) + self._running.discard(job) + self._failed.add(job) + if self.parent: + self.parent.remove(job) + _logger.debug("job %s marked failed in channel %s", job.uuid, self) + + def has_capacity(self): + if self.sequential and self._failed: + # a sequential queue blocks on failed jobs + return False + if not self.capacity: + # unlimited capacity + return True + return len(self._running) < self.capacity + + def get_jobs_to_run(self, now): + """Get jobs that are ready to run in channel. + + This works by enqueuing jobs that are ready to run in children + channels, then yielding jobs from the channel queue until + ``capacity`` jobs are marked running in the channel. + + If the ``throttle`` option is set on the channel, then it yields + no job until at least throttle seconds have elapsed since the previous + yield. + + :param now: the current datetime in seconds + + :return: iterator of + :class:`odoo.addons.queue_job.jobrunner.ChannelJob` + """ + # enqueue jobs of children channels + for child in self.children.values(): + for job in child.get_jobs_to_run(now): + self._queue.add(job) + # is this channel paused? + if self.throttle and self._pause_until: + if now < self._pause_until: + if self.has_capacity(): + _logger.debug( + "channel %s paused until %s because " + "of throttle delay between jobs", + self, + self._pause_until, + ) + return + else: + # unpause, this is important to avoid perpetual wakeup + # while the channel is at full capacity + self._pause_until = 0 + _logger.debug("channel %s unpaused at %s", self, now) + # yield jobs that are ready to run, while we have capacity + while self.has_capacity(): + job = self._queue.pop(now) + if not job: + return + self._running.add(job) + _logger.debug("job %s marked running in channel %s", job.uuid, self) + yield job + if self.throttle: + self._pause_until = now + self.throttle + _logger.debug("pausing channel %s until %s", self, self._pause_until) + return + + def get_wakeup_time(self, wakeup_time=0): + if not self.has_capacity(): + # this channel is full, do not request timed wakeup, as + # a notification will wakeup the runner when a job finishes + return wakeup_time + if self._pause_until: + # this channel is paused, request wakeup at the end of the pause + if not wakeup_time: + wakeup_time = self._pause_until + else: + wakeup_time = min(wakeup_time, self._pause_until) + # since this channel is paused, no need to look at the + # wakeup time of children nor eta jobs, as such jobs would not + # run anyway because they would end up in this paused channel + return wakeup_time + wakeup_time = self._queue.get_wakeup_time(wakeup_time) + for child in self.children.values(): + wakeup_time = child.get_wakeup_time(wakeup_time) + return wakeup_time + + +def split_strip(s, sep, maxsplit=-1): + """Split string and strip each component. + + >>> split_strip("foo: bar baz\\n: fred:", ":") + ['foo', 'bar baz', 'fred', ''] + """ + return [x.strip() for x in s.split(sep, maxsplit)] + + +class ChannelManager: + """High level interface for channels + + This class handles: + + * configuration of channels + * high level api to create and remove jobs (notify, remove_job, remove_db) + * get jobs to run + + Here is how the runner will use it. + + Let's create a channel manager and configure it. + + >>> from pprint import pprint as pp + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,A:4,B:1') + >>> db = 'db' + + Add a few jobs in channel A with priority 10 + + >>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending') + >>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending') + >>> cm.notify(db, 'A', 'A3', 3, 0, 10, None, 'pending') + >>> cm.notify(db, 'A', 'A4', 4, 0, 10, None, 'pending') + >>> cm.notify(db, 'A', 'A5', 5, 0, 10, None, 'pending') + >>> cm.notify(db, 'A', 'A6', 6, 0, 10, None, 'pending') + + Add a few jobs in channel B with priority 5 + + >>> cm.notify(db, 'B', 'B1', 1, 0, 5, None, 'pending') + >>> cm.notify(db, 'B', 'B2', 2, 0, 5, None, 'pending') + + We must now run one job from queue B which has a capacity of 1 + and 3 jobs from queue A so the root channel capacity of 4 is filled. + + >>> pp(list(cm.get_jobs_to_run(now=100))) + [, , , ] + + Job A2 is done. Next job to run is A5, even if we have + higher priority job in channel B, because channel B has a capacity of 1. + + >>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'done') + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + + Job B1 is done. Next job to run is B2 because it has higher priority. + + >>> cm.notify(db, 'B', 'B1', 1, 0, 5, None, 'done') + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + + Let's say A1 is done and A6 gets a higher priority. A6 will run next. + + >>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'done') + >>> cm.notify(db, 'A', 'A6', 6, 0, 5, None, 'pending') + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + + Let's test the throttling mechanism. Configure a 2 seconds delay + on channel A, end enqueue two jobs. + + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,A:4:throttle=2') + >>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending') + >>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending') + + We have only one job to run, because of the throttle. + + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + >>> cm.get_wakeup_time() + 102 + + We have no job to run, because of the throttle. + + >>> pp(list(cm.get_jobs_to_run(now=101))) + [] + >>> cm.get_wakeup_time() + 102 + + 2 seconds later, we can run the other job (even though the first one + is still running, because we have enough capacity). + + >>> pp(list(cm.get_jobs_to_run(now=102))) + [] + >>> cm.get_wakeup_time() + 104 + + Let's test throttling in combination with a queue reaching full capacity. + + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,T:2:throttle=2') + >>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'pending') + >>> cm.notify(db, 'T', 'T2', 2, 0, 10, None, 'pending') + >>> cm.notify(db, 'T', 'T3', 3, 0, 10, None, 'pending') + + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + >>> pp(list(cm.get_jobs_to_run(now=102))) + [] + + Channel is now full, so no job to run even though throttling + delay is over. + + >>> pp(list(cm.get_jobs_to_run(now=103))) + [] + >>> cm.get_wakeup_time() # no wakeup time, since queue is full + 0 + >>> pp(list(cm.get_jobs_to_run(now=104))) + [] + >>> cm.get_wakeup_time() # queue is still full + 0 + + >>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'done') + >>> pp(list(cm.get_jobs_to_run(now=105))) + [] + >>> cm.get_wakeup_time() # queue is full + 0 + >>> cm.notify(db, 'T', 'T2', 1, 0, 10, None, 'done') + >>> cm.get_wakeup_time() + 107 + + Test wakeup time behaviour in presence of eta. + + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,E:1') + >>> cm.notify(db, 'E', 'E1', 1, 0, 10, None, 'pending') + >>> cm.notify(db, 'E', 'E2', 2, 0, 10, None, 'pending') + >>> cm.notify(db, 'E', 'E3', 3, 0, 10, None, 'pending') + + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + >>> pp(list(cm.get_jobs_to_run(now=101))) + [] + >>> cm.notify(db, 'E', 'E1', 1, 0, 10, 105, 'pending') + >>> cm.get_wakeup_time() # wakeup at eta + 105 + >>> pp(list(cm.get_jobs_to_run(now=102))) # but there is capacity + [] + >>> pp(list(cm.get_jobs_to_run(now=106))) # no capacity anymore + [] + >>> cm.get_wakeup_time() # no timed wakeup because no capacity + 0 + >>> cm.notify(db, 'E', 'E2', 1, 0, 10, None, 'done') + >>> cm.get_wakeup_time() + 105 + >>> pp(list(cm.get_jobs_to_run(now=107))) # no capacity anymore + [] + >>> cm.get_wakeup_time() + 0 + + Test wakeup time behaviour in a sequential queue. + + >>> cm = ChannelManager() + >>> cm.simple_configure('root:4,S:1:sequential') + >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending') + >>> cm.notify(db, 'S', 'S2', 2, 0, 10, None, 'pending') + >>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'pending') + + >>> pp(list(cm.get_jobs_to_run(now=100))) + [] + >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'failed') + >>> pp(list(cm.get_jobs_to_run(now=101))) + [] + >>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'pending') + >>> pp(list(cm.get_jobs_to_run(now=102))) + [] + + No wakeup time because due to eta, because the sequential queue + is waiting for a failed job. + + >>> cm.get_wakeup_time() + 0 + >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending') + >>> cm.get_wakeup_time() + 105 + >>> pp(list(cm.get_jobs_to_run(now=102))) + [] + >>> pp(list(cm.get_jobs_to_run(now=103))) + [] + >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'done') + + At this stage, we have S2 with an eta of 105 and since the + queue is sequential, we wait for it. + + >>> pp(list(cm.get_jobs_to_run(now=103))) + [] + >>> pp(list(cm.get_jobs_to_run(now=105))) + [] + >>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'done') + >>> pp(list(cm.get_jobs_to_run(now=105))) + [] + >>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'done') + >>> pp(list(cm.get_jobs_to_run(now=105))) + [] + """ + + def __init__(self): + self._jobs_by_uuid = WeakValueDictionary() + self._root_channel = Channel(name="root", parent=None, capacity=1) + self._channels_by_name = WeakValueDictionary(root=self._root_channel) + + @classmethod + def parse_simple_config(cls, config_string): + """Parse a simple channels configuration string. + + The general form is as follow: + channel(.subchannel)*(:capacity(:key(=value)?)*)? [, ...] + + If capacity is absent, it defaults to 1. + If a key is present without value, it gets True as value. + When declaring subchannels, the root channel may be omitted + (ie sub:4 is the same as root.sub:4). + + Returns a list of channel configuration dictionaries. + + >>> from pprint import pprint as pp + >>> pp(ChannelManager.parse_simple_config('root:4')) + [{'capacity': 4, 'name': 'root'}] + >>> pp(ChannelManager.parse_simple_config('root:4,root.sub:2')) + [{'capacity': 4, 'name': 'root'}, {'capacity': 2, 'name': 'root.sub'}] + >>> pp(ChannelManager.parse_simple_config('root:4,root.sub:2:' + ... 'sequential:k=v')) + [{'capacity': 4, 'name': 'root'}, + {'capacity': 2, 'k': 'v', 'name': 'root.sub', 'sequential': True}] + >>> pp(ChannelManager.parse_simple_config('root')) + [{'capacity': 1, 'name': 'root'}] + >>> pp(ChannelManager.parse_simple_config('sub:2')) + [{'capacity': 2, 'name': 'sub'}] + + It ignores whitespace around values, and drops empty entries which + would be generated by trailing commas, or commented lines on the Odoo + config file. + + >>> pp(ChannelManager.parse_simple_config(''' + ... root : 4, + ... , + ... foo bar:1: k=va lue, + ... ''')) + [{'capacity': 4, 'name': 'root'}, + {'capacity': 1, 'k': 'va lue', 'name': 'foo bar'}] + + It's also possible to replace commas with line breaks, which is more + readable if the channel configuration comes from the odoo config file. + + >>> pp(ChannelManager.parse_simple_config(''' + ... root : 4 + ... foo bar:1: k=va lue + ... baz + ... ''')) + [{'capacity': 4, 'name': 'root'}, + {'capacity': 1, 'k': 'va lue', 'name': 'foo bar'}, + {'capacity': 1, 'name': 'baz'}] + """ + res = [] + config_string = config_string.replace("\n", ",") + for channel_config_string in split_strip(config_string, ","): + if not channel_config_string: + # ignore empty entries (commented lines, trailing commas) + continue + config = {} + config_items = split_strip(channel_config_string, ":") + name = config_items[0] + if not name: + raise ValueError( + "Invalid channel config %s: missing channel name" % config_string + ) + config["name"] = name + if len(config_items) > 1: + capacity = config_items[1] + try: + config["capacity"] = int(capacity) + except Exception as ex: + raise ValueError( + "Invalid channel config %s: " + "invalid capacity %s" % (config_string, capacity) + ) from ex + for config_item in config_items[2:]: + kv = split_strip(config_item, "=") + if len(kv) == 1: + k, v = kv[0], True + elif len(kv) == 2: + k, v = kv + else: + raise ValueError( + "Invalid channel config %s: " + "incorrect config item %s" % (config_string, config_item) + ) + if k in config: + raise ValueError( + "Invalid channel config %s: " + "duplicate key %s" % (config_string, k) + ) + config[k] = v + else: + config["capacity"] = 1 + res.append(config) + return res + + def simple_configure(self, config_string): + """Configure the channel manager from a simple configuration string + + >>> cm = ChannelManager() + >>> c = cm.get_channel_by_name('root') + >>> c.capacity + 1 + >>> cm.simple_configure('root:4,autosub.sub:2,seq:1:sequential') + >>> cm.get_channel_by_name('root').capacity + 4 + >>> cm.get_channel_by_name('root').sequential + False + >>> cm.get_channel_by_name('root.autosub').capacity + >>> cm.get_channel_by_name('root.autosub.sub').capacity + 2 + >>> cm.get_channel_by_name('root.autosub.sub').sequential + False + >>> cm.get_channel_by_name('autosub.sub').capacity + 2 + >>> cm.get_channel_by_name('seq').capacity + 1 + >>> cm.get_channel_by_name('seq').sequential + True + """ + for config in ChannelManager.parse_simple_config(config_string): + self.get_channel_from_config(config) + + def get_channel_from_config(self, config): + """Return a Channel object from a parsed configuration. + + If the channel does not exist it is created. + The configuration is applied on the channel before returning it. + If some of the parent channels are missing when creating a subchannel, + the parent channels are auto created with an infinite capacity + (except for the root channel, which defaults to a capacity of 1 + when not configured explicity). + """ + channel = self.get_channel_by_name(config["name"], autocreate=True) + channel.configure(config) + _logger.info("Configured channel: %s", channel) + return channel + + def get_channel_by_name( + self, channel_name, autocreate=False, parent_fallback=False + ): + """Return a Channel object by its name. + + If it does not exist and autocreate is True, it is created + with a default configuration and inserted in the Channels structure. + If autocreate is False and the channel does not exist, an exception + is raised. + + >>> cm = ChannelManager() + >>> c = cm.get_channel_by_name('root', autocreate=False) + >>> c.name + 'root' + >>> c.fullname + 'root' + >>> c = cm.get_channel_by_name('root.sub', autocreate=True) + >>> c.name + 'sub' + >>> c.fullname + 'root.sub' + >>> c = cm.get_channel_by_name('sub', autocreate=True) + >>> c.name + 'sub' + >>> c.fullname + 'root.sub' + >>> c = cm.get_channel_by_name('autosub.sub', autocreate=True) + >>> c.name + 'sub' + >>> c.fullname + 'root.autosub.sub' + >>> c = cm.get_channel_by_name(None) + >>> c.fullname + 'root' + >>> c = cm.get_channel_by_name('root.sub') + >>> c.fullname + 'root.sub' + >>> c = cm.get_channel_by_name('sub') + >>> c.fullname + 'root.sub' + >>> c = cm.get_channel_by_name('root.sub.not.configured', parent_fallback=True) + >>> c.fullname + 'root.sub.sub.not.configured' + """ + if not channel_name or channel_name == self._root_channel.name: + return self._root_channel + if not channel_name.startswith(self._root_channel.name + "."): + channel_name = self._root_channel.name + "." + channel_name + if channel_name in self._channels_by_name: + return self._channels_by_name[channel_name] + if not autocreate and not parent_fallback: + raise ChannelNotFound("Channel %s not found" % channel_name) + parent = self._root_channel + if parent_fallback: + # Look for first direct parent w/ config. + # Eg: `root.edi.foo.baz` will falback on `root.edi.foo` + # or `root.edi` or `root` in sequence + parent_name = channel_name + while True: + parent_name = parent_name.rsplit(".", 1)[:-1][0] + if parent_name == self._root_channel.name: + break + if parent_name in self._channels_by_name: + parent = self._channels_by_name[parent_name] + _logger.debug( + "%s has no specific configuration: using %s", + channel_name, + parent_name, + ) + break + for subchannel_name in channel_name.split(".")[1:]: + subchannel = parent.get_subchannel_by_name(subchannel_name) + if not subchannel: + subchannel = Channel(subchannel_name, parent, capacity=None) + self._channels_by_name[subchannel.fullname] = subchannel + parent = subchannel + return parent + + def notify( + self, db_name, channel_name, uuid, seq, date_created, priority, eta, state + ): + channel = self.get_channel_by_name(channel_name, parent_fallback=True) + job = self._jobs_by_uuid.get(uuid) + if job: + # db_name is invariant + assert job.db_name == db_name + # date_created is invariant + assert job.date_created == date_created + # if one of the job properties that influence + # scheduling order has changed, we remove the job + # from the queues and create a new job object + if ( + seq != job.seq + or priority != job.priority + or eta != job.eta + or channel != job.channel + ): + _logger.debug("job %s properties changed, rescheduling it", uuid) + self.remove_job(uuid) + job = None + if not job: + job = ChannelJob(db_name, channel, uuid, seq, date_created, priority, eta) + self._jobs_by_uuid[uuid] = job + # state transitions + if not state or state in (DONE, CANCELLED): + job.channel.set_done(job) + elif state == PENDING: + job.channel.set_pending(job) + elif state in (ENQUEUED, STARTED): + job.channel.set_running(job) + elif state == FAILED: + job.channel.set_failed(job) + elif state == WAIT_DEPENDENCIES: + # wait until all parent jobs are done + pass + else: + _logger.error("unexpected state %s for job %s", state, job) + + def remove_job(self, uuid): + job = self._jobs_by_uuid.get(uuid) + if job: + job.channel.remove(job) + del self._jobs_by_uuid[job.uuid] + + def remove_db(self, db_name): + for job in list(self._jobs_by_uuid.values()): + if job.db_name == db_name: + job.channel.remove(job) + del self._jobs_by_uuid[job.uuid] + + def get_jobs_to_run(self, now): + return self._root_channel.get_jobs_to_run(now) + + def get_wakeup_time(self): + return self._root_channel.get_wakeup_time()