diff --git a/addons/queue_job/job.py b/addons/queue_job/job.py new file mode 100644 index 0000000..790e07d --- /dev/null +++ b/addons/queue_job/job.py @@ -0,0 +1,891 @@ +# Copyright 2013-2020 Camptocamp +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +import hashlib +import inspect +import logging +import os +import sys +import uuid +import weakref +from datetime import datetime, timedelta +from random import randint + +import odoo + +from .exception import FailedJobError, NoSuchJobError, RetryableJobError + +WAIT_DEPENDENCIES = "wait_dependencies" +PENDING = "pending" +ENQUEUED = "enqueued" +CANCELLED = "cancelled" +DONE = "done" +STARTED = "started" +FAILED = "failed" + +STATES = [ + (WAIT_DEPENDENCIES, "Wait Dependencies"), + (PENDING, "Pending"), + (ENQUEUED, "Enqueued"), + (STARTED, "Started"), + (DONE, "Done"), + (CANCELLED, "Cancelled"), + (FAILED, "Failed"), +] + +DEFAULT_PRIORITY = 10 # used by the PriorityQueue to sort the jobs +DEFAULT_MAX_RETRIES = 5 +RETRY_INTERVAL = 10 * 60 # seconds + +_logger = logging.getLogger(__name__) + + +# TODO remove in 15.0 or 16.0, used to keep compatibility as the +# class has been moved in 'delay'. +def DelayableRecordset(*args, **kwargs): + # prevent circular import + from .delay import DelayableRecordset as dr + + _logger.warning( + "DelayableRecordset moved from the queue_job.job" + " to the queue_job.delay python module" + ) + return dr(*args, **kwargs) + + +def identity_exact(job_): + """Identity function using the model, method and all arguments as key + + When used, this identity key will have the effect that when a job should be + created and a pending job with the exact same recordset and arguments, the + second will not be created. + + It should be used with the ``identity_key`` argument: + + .. python:: + + from odoo.addons.queue_job.job import identity_exact + + # [...] + delayable = self.with_delay(identity_key=identity_exact) + delayable.export_record(force=True) + + Alternative identity keys can be built using the various fields of the job. + For example, you could compute a hash using only some arguments of + the job. + + .. python:: + + def identity_example(job_): + hasher = hashlib.sha1() + hasher.update(job_.model_name) + hasher.update(job_.method_name) + hasher.update(str(sorted(job_.recordset.ids))) + hasher.update(str(job_.args[1])) + hasher.update(str(job_.kwargs.get('foo', ''))) + return hasher.hexdigest() + + Usually you will probably always want to include at least the name of the + model and method. + """ + hasher = identity_exact_hasher(job_) + return hasher.hexdigest() + + +def identity_exact_hasher(job_): + """Prepare hasher object for identity_exact.""" + hasher = hashlib.sha1() + hasher.update(job_.model_name.encode("utf-8")) + hasher.update(job_.method_name.encode("utf-8")) + hasher.update(str(sorted(job_.recordset.ids)).encode("utf-8")) + hasher.update(str(job_.args).encode("utf-8")) + hasher.update(str(sorted(job_.kwargs.items())).encode("utf-8")) + return hasher + + +class Job: + """A Job is a task to execute. It is the in-memory representation of a job. + + Jobs are stored in the ``queue.job`` Odoo Model, but they are handled + through this class. + + .. attribute:: uuid + + Id (UUID) of the job. + + .. attribute:: graph_uuid + + Shared UUID of the job's graph. Empty if the job is a single job. + + .. attribute:: state + + State of the job, can pending, enqueued, started, done or failed. + The start state is pending and the final state is done. + + .. attribute:: retry + + The current try, starts at 0 and each time the job is executed, + it increases by 1. + + .. attribute:: max_retries + + The maximum number of retries allowed before the job is + considered as failed. + + .. attribute:: args + + Arguments passed to the function when executed. + + .. attribute:: kwargs + + Keyword arguments passed to the function when executed. + + .. attribute:: description + + Human description of the job. + + .. attribute:: func + + The python function itself. + + .. attribute:: model_name + + Odoo model on which the job will run. + + .. attribute:: priority + + Priority of the job, 0 being the higher priority. + + .. attribute:: date_created + + Date and time when the job was created. + + .. attribute:: date_enqueued + + Date and time when the job was enqueued. + + .. attribute:: date_started + + Date and time when the job was started. + + .. attribute:: date_done + + Date and time when the job was done. + + .. attribute:: result + + A description of the result (for humans). + + .. attribute:: exc_name + + Exception error name when the job failed. + + .. attribute:: exc_message + + Exception error message when the job failed. + + .. attribute:: exc_info + + Exception information (traceback) when the job failed. + + .. attribute:: user_id + + Odoo user id which created the job + + .. attribute:: eta + + Estimated Time of Arrival of the job. It will not be executed + before this date/time. + + .. attribute:: recordset + + Model recordset when we are on a delayed Model method + + .. attribute::channel + + The complete name of the channel to use to process the job. If + provided it overrides the one defined on the job's function. + + .. attribute::identity_key + + A key referencing the job, multiple job with the same key will not + be added to a channel if the existing job with the same key is not yet + started or executed. + + """ + + @classmethod + def load(cls, env, job_uuid): + """Read a single job from the Database + + Raise an error if the job is not found. + """ + stored = cls.db_records_from_uuids(env, [job_uuid]) + if not stored: + raise NoSuchJobError( + "Job %s does no longer exist in the storage." % job_uuid + ) + return cls._load_from_db_record(stored) + + @classmethod + def load_many(cls, env, job_uuids): + """Read jobs in batch from the Database + + Jobs not found are ignored. + """ + recordset = cls.db_records_from_uuids(env, job_uuids) + return {cls._load_from_db_record(record) for record in recordset} + + def add_lock_record(self): + """ + Create row in db to be locked while the job is being performed. + """ + self.env.cr.execute( + """ + INSERT INTO + queue_job_lock (id, queue_job_id) + SELECT + id, id + FROM + queue_job + WHERE + uuid = %s + ON CONFLICT(id) + DO NOTHING; + """, + [self.uuid], + ) + + def lock(self): + """ + Lock row of job that is being performed + + If a job cannot be locked, + it means that the job wasn't started, + a RetryableJobError is thrown. + """ + self.env.cr.execute( + """ + SELECT + * + FROM + queue_job_lock + WHERE + queue_job_id in ( + SELECT + id + FROM + queue_job + WHERE + uuid = %s + AND state='started' + ) + FOR UPDATE; + """, + [self.uuid], + ) + + # 1 job should be locked + if 1 != len(self.env.cr.fetchall()): + raise RetryableJobError( + f"Trying to lock job that wasn't started, uuid: {self.uuid}" + ) + + @classmethod + def _load_from_db_record(cls, job_db_record): + stored = job_db_record + + args = stored.args + kwargs = stored.kwargs + method_name = stored.method_name + + recordset = stored.records + method = getattr(recordset, method_name) + + eta = None + if stored.eta: + eta = stored.eta + + job_ = cls( + method, + args=args, + kwargs=kwargs, + priority=stored.priority, + eta=eta, + job_uuid=stored.uuid, + description=stored.name, + channel=stored.channel, + identity_key=stored.identity_key, + ) + + if stored.date_created: + job_.date_created = stored.date_created + + if stored.date_enqueued: + job_.date_enqueued = stored.date_enqueued + + if stored.date_started: + job_.date_started = stored.date_started + + if stored.date_done: + job_.date_done = stored.date_done + + if stored.date_cancelled: + job_.date_cancelled = stored.date_cancelled + + job_.state = stored.state + job_.graph_uuid = stored.graph_uuid if stored.graph_uuid else None + job_.result = stored.result if stored.result else None + job_.exc_info = stored.exc_info if stored.exc_info else None + job_.retry = stored.retry + job_.max_retries = stored.max_retries + if stored.company_id: + job_.company_id = stored.company_id.id + job_.identity_key = stored.identity_key + job_.worker_pid = stored.worker_pid + + job_.__depends_on_uuids.update(stored.dependencies.get("depends_on", [])) + job_.__reverse_depends_on_uuids.update( + stored.dependencies.get("reverse_depends_on", []) + ) + return job_ + + def job_record_with_same_identity_key(self): + """Check if a job to be executed with the same key exists.""" + existing = ( + self.env["queue.job"] + .sudo() + .search( + [ + ("identity_key", "=", self.identity_key), + ("state", "in", [WAIT_DEPENDENCIES, PENDING, ENQUEUED]), + ], + limit=1, + ) + ) + return existing + + @staticmethod + def db_record_from_uuid(env, job_uuid): + # TODO remove in 15.0 or 16.0 + _logger.debug("deprecated, use 'db_records_from_uuids") + return Job.db_records_from_uuids(env, [job_uuid]) + + @staticmethod + def db_records_from_uuids(env, job_uuids): + model = env["queue.job"].sudo() + record = model.search([("uuid", "in", tuple(job_uuids))]) + return record.with_env(env).sudo() + + def __init__( + self, + func, + args=None, + kwargs=None, + priority=None, + eta=None, + job_uuid=None, + max_retries=None, + description=None, + channel=None, + identity_key=None, + ): + """Create a Job + + :param func: function to execute + :type func: function + :param args: arguments for func + :type args: tuple + :param kwargs: keyworkd arguments for func + :type kwargs: dict + :param priority: priority of the job, + the smaller is the higher priority + :type priority: int + :param eta: the job can be executed only after this datetime + (or now + timedelta) + :type eta: datetime or timedelta + :param job_uuid: UUID of the job + :param max_retries: maximum number of retries before giving up and set + the job state to 'failed'. A value of 0 means infinite retries. + :param description: human description of the job. If None, description + is computed from the function doc or name + :param channel: The complete channel name to use to process the job. + :param identity_key: A hash to uniquely identify a job, or a function + that returns this hash (the function takes the job + as argument) + """ + if args is None: + args = () + if isinstance(args, list): + args = tuple(args) + assert isinstance(args, tuple), "%s: args are not a tuple" % args + if kwargs is None: + kwargs = {} + + assert isinstance(kwargs, dict), "%s: kwargs are not a dict" % kwargs + + if not _is_model_method(func): + raise TypeError("Job accepts only methods of Models") + + recordset = func.__self__ + env = recordset.env + self.method_name = func.__name__ + self.recordset = recordset + + self.env = env + self.job_model = self.env["queue.job"] + self.job_model_name = "queue.job" + + self.job_config = ( + self.env["queue.job.function"].sudo().job_config(self.job_function_name) + ) + + self.state = PENDING + + self.retry = 0 + if max_retries is None: + self.max_retries = DEFAULT_MAX_RETRIES + else: + self.max_retries = max_retries + + self._uuid = job_uuid + self.graph_uuid = None + + self.args = args + self.kwargs = kwargs + + self.__depends_on_uuids = set() + self.__reverse_depends_on_uuids = set() + self._depends_on = set() + self._reverse_depends_on = weakref.WeakSet() + + self.priority = priority + if self.priority is None: + self.priority = DEFAULT_PRIORITY + + self.date_created = datetime.now() + self._description = description + + if isinstance(identity_key, str): + self._identity_key = identity_key + self._identity_key_func = None + else: + # we'll compute the key on the fly when called + # from the function + self._identity_key = None + self._identity_key_func = identity_key + + self.date_enqueued = None + self.date_started = None + self.date_done = None + self.date_cancelled = None + + self.result = None + self.exc_name = None + self.exc_message = None + self.exc_info = None + + if "company_id" in env.context: + company_id = env.context["company_id"] + else: + company_id = env.company.id + self.company_id = company_id + self._eta = None + self.eta = eta + self.channel = channel + self.worker_pid = None + + def add_depends(self, jobs): + if self in jobs: + raise ValueError("job cannot depend on itself") + self.__depends_on_uuids |= {j.uuid for j in jobs} + self._depends_on.update(jobs) + for parent in jobs: + parent.__reverse_depends_on_uuids.add(self.uuid) + parent._reverse_depends_on.add(self) + if any(j.state != DONE for j in jobs): + self.state = WAIT_DEPENDENCIES + + def perform(self): + """Execute the job. + + The job is executed with the user which has initiated it. + """ + self.retry += 1 + try: + self.result = self.func(*tuple(self.args), **self.kwargs) + except RetryableJobError as err: + if err.ignore_retry: + self.retry -= 1 + raise + elif not self.max_retries: # infinite retries + raise + elif self.retry >= self.max_retries: + type_, value, traceback = sys.exc_info() + # change the exception type but keep the original + # traceback and message: + # http://blog.ianbicking.org/2007/09/12/re-raising-exceptions/ + new_exc = FailedJobError( + "Max. retries (%d) reached: %s" % (self.max_retries, value or type_) + ) + raise new_exc from err + raise + + return self.result + + def _get_common_dependent_jobs_query(self): + return """ + UPDATE queue_job + SET state = %s + FROM ( + SELECT child.id, array_agg(parent.state) as parent_states + FROM queue_job job + JOIN LATERAL + json_array_elements_text( + job.dependencies::json->'reverse_depends_on' + ) child_deps ON true + JOIN queue_job child + ON child.graph_uuid = job.graph_uuid + AND child.uuid = child_deps + JOIN LATERAL + json_array_elements_text( + child.dependencies::json->'depends_on' + ) parent_deps ON true + JOIN queue_job parent + ON parent.graph_uuid = job.graph_uuid + AND parent.uuid = parent_deps + WHERE job.uuid = %s + GROUP BY child.id + ) jobs + WHERE + queue_job.id = jobs.id + AND %s = ALL(jobs.parent_states) + AND state = %s; + """ + + def enqueue_waiting(self): + sql = self._get_common_dependent_jobs_query() + self.env.cr.execute(sql, (PENDING, self.uuid, DONE, WAIT_DEPENDENCIES)) + self.env["queue.job"].invalidate_model(["state"]) + + def cancel_dependent_jobs(self): + sql = self._get_common_dependent_jobs_query() + self.env.cr.execute(sql, (CANCELLED, self.uuid, CANCELLED, WAIT_DEPENDENCIES)) + self.env["queue.job"].invalidate_model(["state"]) + + def store(self): + """Store the Job""" + job_model = self.env["queue.job"] + # The sentinel is used to prevent edition sensitive fields (such as + # method_name) from RPC methods. + edit_sentinel = job_model.EDIT_SENTINEL + + db_record = self.db_record() + if db_record: + db_record.with_context(_job_edit_sentinel=edit_sentinel).write( + self._store_values() + ) + else: + job_model.with_context(_job_edit_sentinel=edit_sentinel).sudo().create( + self._store_values(create=True) + ) + + def _store_values(self, create=False): + vals = { + "state": self.state, + "priority": self.priority, + "retry": self.retry, + "max_retries": self.max_retries, + "exc_name": self.exc_name, + "exc_message": self.exc_message, + "exc_info": self.exc_info, + "company_id": self.company_id, + "result": str(self.result) if self.result else False, + "date_enqueued": False, + "date_started": False, + "date_done": False, + "exec_time": False, + "date_cancelled": False, + "eta": False, + "identity_key": False, + "worker_pid": self.worker_pid, + "graph_uuid": self.graph_uuid, + } + + if self.date_enqueued: + vals["date_enqueued"] = self.date_enqueued + if self.date_started: + vals["date_started"] = self.date_started + if self.date_done: + vals["date_done"] = self.date_done + if self.exec_time: + vals["exec_time"] = self.exec_time + if self.date_cancelled: + vals["date_cancelled"] = self.date_cancelled + if self.eta: + vals["eta"] = self.eta + if self.identity_key: + vals["identity_key"] = self.identity_key + + dependencies = { + "depends_on": [parent.uuid for parent in self.depends_on], + "reverse_depends_on": [ + children.uuid for children in self.reverse_depends_on + ], + } + vals["dependencies"] = dependencies + + if create: + vals.update( + { + "user_id": self.env.uid, + "channel": self.channel, + # The following values must never be modified after the + # creation of the job + "uuid": self.uuid, + "name": self.description, + "func_string": self.func_string, + "date_created": self.date_created, + "model_name": self.recordset._name, + "method_name": self.method_name, + "job_function_id": self.job_config.job_function_id, + "channel_method_name": self.job_function_name, + "records": self.recordset, + "args": self.args, + "kwargs": self.kwargs, + } + ) + + vals_from_model = self._store_values_from_model() + # Sanitize values: make sure you cannot screw core values + vals_from_model = {k: v for k, v in vals_from_model.items() if k not in vals} + vals.update(vals_from_model) + return vals + + def _store_values_from_model(self): + vals = {} + value_handlers_candidates = ( + "_job_store_values_for_" + self.method_name, + "_job_store_values", + ) + for candidate in value_handlers_candidates: + handler = getattr(self.recordset, candidate, None) + if handler is not None: + vals = handler(self) + return vals + + @property + def func_string(self): + model = repr(self.recordset) + args = [repr(arg) for arg in self.args] + kwargs = ["{}={!r}".format(key, val) for key, val in self.kwargs.items()] + all_args = ", ".join(args + kwargs) + return "{}.{}({})".format(model, self.method_name, all_args) + + def __eq__(self, other): + return self.uuid == other.uuid + + def __hash__(self): + return self.uuid.__hash__() + + def db_record(self): + return self.db_records_from_uuids(self.env, [self.uuid]) + + @property + def func(self): + recordset = self.recordset.with_context(job_uuid=self.uuid) + return getattr(recordset, self.method_name) + + @property + def job_function_name(self): + func_model = self.env["queue.job.function"].sudo() + return func_model.job_function_name(self.recordset._name, self.method_name) + + @property + def identity_key(self): + if self._identity_key is None: + if self._identity_key_func: + self._identity_key = self._identity_key_func(self) + return self._identity_key + + @identity_key.setter + def identity_key(self, value): + if isinstance(value, str): + self._identity_key = value + self._identity_key_func = None + else: + # we'll compute the key on the fly when called + # from the function + self._identity_key = None + self._identity_key_func = value + + @property + def depends_on(self): + if not self._depends_on: + self._depends_on = Job.load_many(self.env, self.__depends_on_uuids) + return self._depends_on + + @property + def reverse_depends_on(self): + if not self._reverse_depends_on: + self._reverse_depends_on = Job.load_many( + self.env, self.__reverse_depends_on_uuids + ) + return set(self._reverse_depends_on) + + @property + def description(self): + if self._description: + return self._description + elif self.func.__doc__: + return self.func.__doc__.splitlines()[0].strip() + else: + return "{}.{}".format(self.model_name, self.func.__name__) + + @property + def uuid(self): + """Job ID, this is an UUID""" + if self._uuid is None: + self._uuid = str(uuid.uuid4()) + return self._uuid + + @property + def model_name(self): + return self.recordset._name + + @property + def user_id(self): + return self.recordset.env.uid + + @property + def eta(self): + return self._eta + + @eta.setter + def eta(self, value): + if not value: + self._eta = None + elif isinstance(value, timedelta): + self._eta = datetime.now() + value + elif isinstance(value, int): + self._eta = datetime.now() + timedelta(seconds=value) + else: + self._eta = value + + @property + def channel(self): + return self._channel or self.job_config.channel + + @channel.setter + def channel(self, value): + self._channel = value + + @property + def exec_time(self): + if self.date_done and self.date_started: + return (self.date_done - self.date_started).total_seconds() + return None + + def set_pending(self, result=None, reset_retry=True): + if any(j.state != DONE for j in self.depends_on): + self.state = WAIT_DEPENDENCIES + else: + self.state = PENDING + self.date_enqueued = None + self.date_started = None + self.date_done = None + self.worker_pid = None + self.date_cancelled = None + if reset_retry: + self.retry = 0 + if result is not None: + self.result = result + + def set_enqueued(self): + self.state = ENQUEUED + self.date_enqueued = datetime.now() + self.date_started = None + self.worker_pid = None + + def set_started(self): + self.state = STARTED + self.date_started = datetime.now() + self.worker_pid = os.getpid() + self.add_lock_record() + + def set_done(self, result=None): + self.state = DONE + self.exc_name = None + self.exc_info = None + self.date_done = datetime.now() + if result is not None: + self.result = result + + def set_cancelled(self, result=None): + self.state = CANCELLED + self.date_cancelled = datetime.now() + if result is not None: + self.result = result + + def set_failed(self, **kw): + self.state = FAILED + for k, v in kw.items(): + if v is not None: + setattr(self, k, v) + + def __repr__(self): + return "" % (self.uuid, self.priority) + + def _get_retry_seconds(self, seconds=None): + retry_pattern = self.job_config.retry_pattern + if not seconds and retry_pattern: + # ordered from higher to lower count of retries + patt = sorted(retry_pattern.items(), key=lambda t: t[0]) + seconds = RETRY_INTERVAL + for retry_count, postpone_seconds in patt: + if self.retry >= retry_count: + seconds = postpone_seconds + else: + break + elif not seconds: + seconds = RETRY_INTERVAL + if isinstance(seconds, (list, tuple)): + seconds = randint(seconds[0], seconds[1]) + return seconds + + def postpone(self, result=None, seconds=None): + """Postpone the job + + Write an estimated time arrival to n seconds + later than now. Used when an retryable exception + want to retry a job later. + """ + eta_seconds = self._get_retry_seconds(seconds) + self.eta = timedelta(seconds=eta_seconds) + self.exc_name = None + self.exc_info = None + if result is not None: + self.result = result + + def related_action(self): + record = self.db_record() + if not self.job_config.related_action_enable: + return None + + funcname = self.job_config.related_action_func_name + if not funcname: + funcname = record._default_related_action + if not isinstance(funcname, str): + raise ValueError( + "related_action must be the name of the " + "method on queue.job as string" + ) + action = getattr(record, funcname) + action_kwargs = self.job_config.related_action_kwargs + return action(**action_kwargs) + + +def _is_model_method(func): + return inspect.ismethod(func) and isinstance( + func.__self__.__class__, odoo.models.MetaModel + )