import calendar from contextlib import contextmanager from dateutil.relativedelta import relativedelta import logging import math import re from odoo import fields, models, api, _, Command from odoo.exceptions import UserError from odoo.osv import expression from odoo.tools import SQL, float_compare import ast from odoo.addons.account.models.exceptions import TaxClosingNonPostedDependingMovesError from odoo.tools.misc import format_date from odoo.tools import date_utils from odoo.addons.web.controllers.utils import clean_action from markupsafe import Markup from odoo.exceptions import UserError, ValidationError from odoo.tools.misc import formatLang _logger = logging.getLogger(__name__) DEFERRED_DATE_MIN = '1900-01-01' DEFERRED_DATE_MAX = '9999-12-31' class AccountMove(models.Model): _inherit = "account.move" def _get_invoice_in_payment_state(self): return 'in_payment' payment_state_before_switch = fields.Char(string="Payment State Before Switch", copy=False) deferred_move_ids = fields.Many2many( string="Deferred Entries", comodel_name='account.move', relation='account_move_deferred_rel', column1='original_move_id', column2='deferred_move_id', help="The deferred entries created by this invoice", copy=False, ) deferred_original_move_ids = fields.Many2many( string="Original Invoices", comodel_name='account.move', relation='account_move_deferred_rel', column1='deferred_move_id', column2='original_move_id', help="The original invoices that created the deferred entries", copy=False, ) deferred_entry_type = fields.Selection( string="Deferred Entry Type", selection=[ ('expense', 'Deferred Expense'), ('revenue', 'Deferred Revenue'), ], compute='_compute_deferred_entry_type', copy=False, ) signing_user = fields.Many2one( string='Signer', comodel_name='res.users', compute='_compute_signing_user', store=True, copy=False, ) show_signature_area = fields.Boolean(compute='_compute_signature') signature = fields.Binary(compute='_compute_signature') # can't be `related`: the sign module might not be there # used for VAT closing, containing the end date of the period this entry closes tax_closing_report_id = fields.Many2one(comodel_name='account.report') # technical field used to know whether to show the tax closing alert or not tax_closing_alert = fields.Boolean(compute='_compute_tax_closing_alert') @api.depends('state', 'move_type', 'invoice_user_id') def _compute_signing_user(self): other_moves = self.filtered(lambda move: not move.is_sale_document()) other_moves.signing_user = False is_odoobot_user = self.env.user == self.env.ref('base.user_root') is_backend_user = self.env.user.has_group('base.group_user') for invoice in (self - other_moves).filtered(lambda inv: inv.state == 'posted'): # signer priority: # - res.user set in res.settings # - real backend user posting the invoice # - if odoobot: the person that initiated the invoice ie: The salesman # - if invoice initiated by a portal user -> No signature representative = invoice.company_id.signing_user # checking `has_group('base.group_user')` ensure we never keep a portal user to sign if is_odoobot_user: user_can_sign = invoice.invoice_user_id and invoice.invoice_user_id.has_group('base.group_user') invoice.signing_user = representative or invoice.invoice_user_id if user_can_sign else False else: invoice.signing_user = representative or self.env.user if is_backend_user else False @api.depends('state') def _compute_signature(self): is_portal_user = self.env.user.has_group('base.group_portal') # Checking `company_id.sign_invoice` removes the needs to check if the sign module is installed # Setting it to True through `res.settings` auto install the sign module moves_not_to_sign = self.filtered( lambda inv: not inv.company_id.sign_invoice or inv.state in {'draft', 'cancel'} or not inv.is_sale_document() # Allow signature for portal user only if the invoice already went through the send&print workflow or (is_portal_user and not inv.invoice_pdf_report_id) ) moves_not_to_sign.show_signature_area = False moves_not_to_sign.signature = None invoice_with_signature = self - moves_not_to_sign invoice_with_signature.show_signature_area = True for invoice in invoice_with_signature: invoice.signature = invoice.signing_user.sign_signature def _post(self, soft=True): # Deferred management posted = super()._post(soft) for move in self: if move._get_deferred_entries_method() == 'on_validation' and any(move.line_ids.mapped('deferred_start_date')): move._generate_deferred_entries() return posted def action_post(self): # EXTENDS 'account' to trigger the CRON auto-reconciling the statement lines. res = super().action_post() if self.statement_line_id and not self._context.get('skip_statement_line_cron_trigger'): self.env.ref('at_accounting.auto_reconcile_bank_statement_line')._trigger() return res def button_draft(self): if any(len(deferral_move.deferred_original_move_ids) > 1 for deferral_move in self.deferred_move_ids): raise UserError(_("You cannot reset to draft an invoice that is grouped in deferral entry. You can create a credit note instead.")) reversed_moves = self.deferred_move_ids._unlink_or_reverse() if reversed_moves: for move in reversed_moves: move.with_context(skip_readonly_check=True).write({ 'date': move._get_accounting_date(move.date, move._affect_tax_report()), }) self.deferred_move_ids |= reversed_moves return super().button_draft() def unlink(self): # Prevent deferred moves under audit trail restriction from being unlinked deferral_moves = self.filtered(lambda move: move.company_id.check_account_audit_trail and move.deferred_original_move_ids) deferral_moves.deferred_original_move_ids.deferred_move_ids = False deferral_moves._reverse_moves() return super(AccountMove, self - deferral_moves).unlink() # ============================= START - Deferred Management ==================================== def _get_deferred_entries_method(self): self.ensure_one() if self.is_outbound(): return self.company_id.generate_deferred_expense_entries_method return self.company_id.generate_deferred_revenue_entries_method @api.depends('deferred_original_move_ids') def _compute_deferred_entry_type(self): for move in self: if move.deferred_original_move_ids: move.deferred_entry_type = 'expense' if move.deferred_original_move_ids[0].is_outbound() else 'revenue' else: move.deferred_entry_type = False @api.model def _get_deferred_diff_dates(self, start, end): """ Returns the number of months between two dates [start, end[ The computation is done by using months of 30 days so that the deferred amount for february (28-29 days), march (31 days) and april (30 days) are all the same (in case of monthly computation). See test_deferred_management_get_diff_dates for examples. """ if start > end: start, end = end, start nb_months = end.month - start.month + 12 * (end.year - start.year) start_day, end_day = start.day, end.day if start_day == calendar.monthrange(start.year, start.month)[1]: start_day = 30 if end_day == calendar.monthrange(end.year, end.month)[1]: end_day = 30 nb_days = end_day - start_day return (nb_months * 30 + nb_days) / 30 @api.model def _get_deferred_period_amount(self, method, period_start, period_end, line_start, line_end, balance): """ Returns the amount to defer for the given period taking into account the deferred method (day/month/full_months). """ is_valid_period = period_end > line_start and period_end > period_start if method == 'day': amount_per_day = balance / (line_end - line_start).days return (period_end - period_start).days * amount_per_day if is_valid_period else 0 elif method == "month": amount_per_month = balance / self._get_deferred_diff_dates(line_end, line_start) nb_months_period = self._get_deferred_diff_dates(period_end, period_start) return nb_months_period * amount_per_month if is_valid_period else 0 elif method == "full_months": line_diff = self._get_deferred_diff_dates(line_end, line_start) period_diff = self._get_deferred_diff_dates(period_end, period_start) if line_diff < 1: amount = balance else: if line_end.day == calendar.monthrange(line_end.year, line_end.month)[1]: line_diff = math.ceil(line_diff) else: line_diff = math.floor(line_diff) if period_end.day == calendar.monthrange(period_end.year, period_end.month)[1] or line_end != period_end: period_diff = math.ceil(period_diff) else: period_diff = math.floor(period_diff) amount_per_month = balance / line_diff amount = period_diff * amount_per_month return amount if is_valid_period else 0 @api.model def _get_deferred_amounts_by_line(self, lines, periods, deferred_type): """ :return: a list of dictionaries containing the deferred amounts for each line and each period E.g. (where period1 = (date1, date2, label1), period2 = (date2, date3, label2), ...) [ {'account_id': 1, period_1: 100, period_2: 200}, {'account_id': 1, period_1: 100, period_2: 200}, {'account_id': 2, period_1: 300, period_2: 400}, ] """ values = [] for line in lines: line_start = fields.Date.to_date(line['deferred_start_date']) line_end = fields.Date.to_date(line['deferred_end_date']) if line_end < line_start: # This normally shouldn't happen, but if it does, would cause calculation errors later on. # To not make the reports crash, we just set both dates to the same day. # The user should fix the dates manually. line_end = line_start columns = {} for period in periods: if period[2] == 'not_started' and line_start <= period[0]: # The 'Not Started' column only considers lines starting the deferral after the report end date columns[period] = 0.0 continue # periods = [Total, Not Started, Before, ..., Current, ..., Later] # The dates to calculate the amount for the current period period_start = max(period[0], line_start) period_end = min(period[1], line_end) if ( period[2] in ('not_started', 'later') and period[0] < line_start or len(periods) <= 1 or period[2] not in ('not_started', 'before', 'later') ): # We are subtracting 1 day from `period_start` because the start date should be included when: # - in the 'Not Started' or 'Later' period if the deferral has not started yet (line_start, line_end) # - we only have one period # - not in the 'Not Started', 'Before' or 'Later' period period_start -= relativedelta(days=1) columns[period] = self._get_deferred_period_amount( self.env.company.deferred_expense_amount_computation_method if deferred_type == "expense" else self.env.company.deferred_revenue_amount_computation_method, period_start, period_end, line_start - relativedelta(days=1), line_end, # -1 because we want to include the start date line['balance'] ) values.append({ **self.env['account.move.line']._get_deferred_amounts_by_line_values(line), **columns, }) return values @api.model def _get_deferred_lines(self, line, deferred_account, deferred_type, period, ref, force_balance=None, grouping_field='account_id'): """ :return: a list of Command objects to create the deferred lines of a single given period """ deferred_amounts = self._get_deferred_amounts_by_line(line, [period], deferred_type)[0] balance = deferred_amounts[period] if force_balance is None else force_balance return [ Command.create({ **self.env['account.move.line']._get_deferred_lines_values(account.id, coeff * balance, ref, line.analytic_distribution, line), 'partner_id': line.partner_id.id, 'product_id': line.product_id.id, }) for (account, coeff) in [(deferred_amounts[grouping_field], 1), (deferred_account, -1)] ] def _generate_deferred_entries(self): """ Generates the deferred entries for the invoice. """ self.ensure_one() if self.state != 'posted': return if self.is_entry(): raise UserError(_("You cannot generate deferred entries for a miscellaneous journal entry.")) deferred_type = "expense" if self.is_purchase_document() else "revenue" deferred_account = self.company_id.deferred_expense_account_id if deferred_type == "expense" else self.company_id.deferred_revenue_account_id deferred_journal = self.company_id.deferred_expense_journal_id if deferred_type == "expense" else self.company_id.deferred_revenue_journal_id if not deferred_journal: raise UserError(_("Please set the deferred journal in the accounting settings.")) if not deferred_account: raise UserError(_("Please set the deferred accounts in the accounting settings.")) for line in self.line_ids.filtered(lambda l: l.deferred_start_date and l.deferred_end_date): periods = line._get_deferred_periods() if not periods: continue ref = _("Deferral of %s", line.move_id.name or '') move_vals = { 'move_type': 'entry', 'deferred_original_move_ids': [Command.set(line.move_id.ids)], 'journal_id': deferred_journal.id, 'company_id': self.company_id.id, 'partner_id': line.partner_id.id, 'auto_post': 'at_date', 'ref': ref, 'name': False, } # Defer the current invoice move_fully_deferred = self.create({ **move_vals, 'date': line.move_id.date, }) # We write the lines after creation, to make sure the `deferred_original_move_ids` is set. # This way we can avoid adding taxes for deferred moves. move_fully_deferred.write({ 'line_ids': [ Command.create( self.env['account.move.line']._get_deferred_lines_values(account.id, coeff * line.balance, ref, line.analytic_distribution, line) ) for (account, coeff) in [(line.account_id, -1), (deferred_account, 1)] ], }) # Create the deferred entries for the periods [deferred_start_date, deferred_end_date] deferral_moves = self.create([{ **move_vals, 'date': period[1], } for period in periods]) remaining_balance = line.balance for period_index, (period, deferral_move) in enumerate(zip(periods, deferral_moves)): # For the last deferral move the balance is forced to remaining balance to avoid rounding errors force_balance = remaining_balance if period_index == len(periods) - 1 else None # Same as before, to avoid adding taxes for deferred moves. deferral_move.write({ 'line_ids': self._get_deferred_lines(line, deferred_account, deferred_type, period, ref, force_balance=force_balance), }) remaining_balance -= deferral_move.line_ids[0].balance # Avoid having deferral moves with a total amount of 0 if deferral_move.currency_id.is_zero(deferral_move.amount_total): deferral_moves -= deferral_move deferral_move.unlink() deferred_moves = move_fully_deferred + deferral_moves if len(deferral_moves) == 1 and move_fully_deferred.date.month == deferral_moves.date.month: # If, after calculation, we have 2 deferral entries in the same month, it means that # they simply cancel out each other, so there is no point in creating them. deferred_moves.unlink() continue line.move_id.deferred_move_ids |= deferred_moves deferred_moves._post(soft=True) def open_deferred_entries(self): self.ensure_one() return { 'type': 'ir.actions.act_window', 'name': _("Deferred Entries"), 'res_model': 'account.move.line', 'domain': [('id', 'in', self.deferred_move_ids.line_ids.ids)], 'views': [(self.env.ref('at_accounting.view_deferred_entries_tree').id, 'list')], 'context': { 'search_default_group_by_move': True, 'expand': True, } } def open_deferred_original_entry(self): self.ensure_one() action = { 'type': 'ir.actions.act_window', 'name': _("Original Deferred Entries"), 'res_model': 'account.move.line', 'domain': [('id', 'in', self.deferred_original_move_ids.line_ids.ids)], 'views': [(False, 'list'), (False, 'form')], 'context': { 'search_default_group_by_move': True, 'expand': True, } } if len(self.deferred_original_move_ids) == 1: action.update({ 'res_model': 'account.move', 'res_id': self.deferred_original_move_ids[0].id, 'views': [(False, 'form')], }) return action # ============================= END - Deferred management ====================================== def action_open_bank_reconciliation_widget(self): return self.statement_line_id._action_open_bank_reconciliation_widget( default_context={ 'search_default_journal_id': self.statement_line_id.journal_id.id, 'search_default_statement_line_id': self.statement_line_id.id, 'default_st_line_id': self.statement_line_id.id, } ) def action_open_bank_reconciliation_widget_statement(self): return self.statement_line_id._action_open_bank_reconciliation_widget( extra_domain=[('statement_id', 'in', self.statement_id.ids)], ) def action_open_business_doc(self): if self.statement_line_id: return self.action_open_bank_reconciliation_widget() else: action = super().action_open_business_doc() # prevent propagation of the following keys action['context'] = action.get('context', {}) | { 'preferred_aml_value': None, 'preferred_aml_currency_id': None, } return action def _get_mail_thread_data_attachments(self): res = super()._get_mail_thread_data_attachments() res += self.statement_line_id.statement_id.attachment_ids return res @contextmanager def _get_edi_creation(self): with super()._get_edi_creation() as move: previous_lines = move.invoice_line_ids yield move.with_context(disable_onchange_name_predictive=True) for line in move.invoice_line_ids - previous_lines: line._onchange_name_predictive() def _post(self, soft=True): # Overridden to create carryover external values and join the pdf of the report when posting the tax closing for move in self.filtered(lambda m: m.tax_closing_report_id): report = move.tax_closing_report_id options = move._get_tax_closing_report_options(move.company_id, move.fiscal_position_id, report, move.date) move._close_tax_period(report, options) return super()._post(soft) def action_post(self): # In the case of a TaxClosingNonPostedDependingMovesError, which can occur when dealing with branches or tax # units during the closing process, the parent company may have non-posted closing entries from other companies. # If this exception occurs, we will return an action client that will display a component indicating that there # are non-posted dependent moves, along with a link to those moves. # Also, we are not using a RedirectWarning because it will force a rollback on the closing move created for # depending companies. try: res = super().action_post() except TaxClosingNonPostedDependingMovesError as exception: return { "type": "ir.actions.client", "tag": "at_accounting.redirect_action", "target": "new", "name": "Depending Action", "params": { "depending_action": exception.args[0], "message": _("It seems there is some depending closing move to be posted"), "button_text": _("Depending moves"), }, 'context': { 'dialog_size': 'medium', } } return res def button_draft(self): # Overridden in order to delete the carryover values when resetting the tax closing to draft super().button_draft() for closing_move in self.filtered(lambda m: m.tax_closing_report_id): report = closing_move.tax_closing_report_id options = closing_move._get_tax_closing_report_options(closing_move.company_id, closing_move.fiscal_position_id, report, closing_move.date) closing_months_delay = closing_move.company_id._get_tax_periodicity_months_delay(report) carryover_values = self.env['account.report.external.value'].search([ ('carryover_origin_report_line_id', 'in', report.line_ids.ids), ('date', '=', options['date']['date_to']), ]) carryover_impacted_period_end = fields.Date.from_string(options['date']['date_to']) + relativedelta(months=closing_months_delay) tax_lock_date = closing_move.company_id.tax_lock_date if carryover_values and tax_lock_date and tax_lock_date >= carryover_impacted_period_end: raise UserError(_("You cannot reset this closing entry to draft, as it would delete carryover values impacting the tax report of a " "locked period. To do this, you first need to modify you tax return lock date.")) if self._has_subsequent_posted_closing_moves(): raise UserError(_("You cannot reset this closing entry to draft, as another closing entry has been posted at a later date.")) carryover_values.unlink() def _has_subsequent_posted_closing_moves(self): self.ensure_one() closing_domains = [ ('company_id', '=', self.company_id.id), ('tax_closing_report_id', '!=', False), ('state', '=', 'posted'), ('date', '>', self.date), ('fiscal_position_id', '=', self.fiscal_position_id.id) ] return bool(self.env['account.move'].search_count(closing_domains, limit=1)) def _get_tax_to_pay_on_closing(self): self.ensure_one() tax_payable_accounts = self.env['account.tax.group'].search([ ('company_id', '=', self.company_id.id), ]).tax_payable_account_id payable_lines = self.line_ids.filtered(lambda line: line.account_id in tax_payable_accounts) return self.currency_id.round(-sum(payable_lines.mapped('balance'))) def _action_tax_to_pay_wizard(self): # hook for l10n tax payment wizard return self.action_open_tax_report() def action_open_tax_report(self): action = self.env["ir.actions.actions"]._for_xml_id("at_accounting.action_account_report_gt") if not self.tax_closing_report_id: raise UserError(_("You can't open a tax report from a move without a VAT closing date.")) options = self._get_tax_closing_report_options(self.company_id, self.fiscal_position_id, self.tax_closing_report_id, self.date) # Pass options in context and set ignore_session: true to prevent using session options action.update({'params': {'options': options, 'ignore_session': True}}) return action def _close_tax_period(self, report, options): """ Closes tax closing entries. The tax closing activities on them will be marked done, and the next tax closing entry will be generated or updated (if already existing). Also, a pdf of the tax report at the time of closing will be posted in the chatter of each move. The tax lock date of each move's company will be set to the move's date in case no other draft tax closing move exists for that company (whatever their foreign VAT fiscal position) before or at that date, meaning that all the tax closings have been performed so far. """ self.ensure_one() if not self.env.user.has_group('account.group_account_manager'): raise UserError(_('Only Billing Administrators are allowed to change lock dates!')) report = self.tax_closing_report_id options = self._get_tax_closing_report_options(self.company_id, self.fiscal_position_id, report, self.date) if not self.fiscal_position_id and (not self.company_id.tax_lock_date or self.date > self.company_id.tax_lock_date): self.company_id.sudo().tax_lock_date = self.date self.env['account.report']._generate_default_external_values(options['date']['date_from'], options['date']['date_to'], True) sender_company = report._get_sender_company_for_export(options) company_ids = report.get_report_company_ids(options) if sender_company == self.company_id: depending_closings = self.env['account.tax.report.handler']._get_tax_closing_entries_for_closed_period(report, options, self.env['res.company'].browse(company_ids), posted_only=False) - self depending_closings_to_post = depending_closings.filtered(lambda x: x.state == 'draft') if depending_closings_to_post: depending_action = self.env["ir.actions.actions"]._for_xml_id("account.action_move_journal_line") depending_action = clean_action(depending_action, env=self.env) if len(depending_closings_to_post) == 1: depending_action['views'] = [(self.env.ref('account.view_move_form').id, 'form')] depending_action['res_id'] = depending_closings_to_post.id else: depending_action['domain'] = [('id', 'in', depending_closings_to_post.ids)] depending_action['context'] = dict(ast.literal_eval(depending_action['context'])) depending_action['context'].pop('search_default_posted', None) # In case of dependent moves, we will raise an error that will be caught in the action_post method. # When the exception is caught, a component will inform the user that there are some dependent moves # to be posted and provide a link to these moves. raise TaxClosingNonPostedDependingMovesError(depending_action) # Generate the carryover values. report.with_context(allowed_company_ids=company_ids)._generate_carryover_external_values(options) # Post the message with the attachments (PDF of the report, and possibly an additional export file) attachments = self._get_vat_report_attachments(report, options) subject = _( "Vat closing from %(date_from)s to %(date_to)s", date_from=format_date(self.env, options['date']['date_from']), date_to=format_date(self.env, options['date']['date_to']), ) self.with_context(no_new_invoice=True).message_post(body=self.ref, subject=subject, attachments=attachments) # Log a note on depending closings, redirecting to the main one for closing_move in depending_closings: closing_move.message_post( body=Markup("%s") % _("The attachments of the tax report can be found on the closing entry of the representative company.", self.id), ) # End activity activity = self.company_id._get_tax_closing_reminder_activity(report.id, self.date, self.fiscal_position_id.id) if activity: activity.action_done() # Generate next activity self.company_id._generate_tax_closing_reminder_activity(self.tax_closing_report_id, self.date + relativedelta(days=1), self.fiscal_position_id if self.fiscal_position_id.foreign_vat else None) self._close_tax_period_create_activities() def _close_tax_period_create_activities(self): mat_to_send_xml_id = 'at_accounting.mail_activity_type_tax_report_to_be_sent' mat_to_send = self.env.ref(mat_to_send_xml_id, raise_if_not_found=False) if not mat_to_send: # As this is introduced in stable, we ensure data exists by creating them on the fly if needed mat_to_send = self.env['mail.activity.type'].sudo()._load_records([{ 'xml_id': mat_to_send_xml_id, 'noupdate': False, 'values': { 'name': 'Tax Report Ready', 'summary': 'Tax report is ready to be sent to the administration', 'category': 'tax_report', 'delay_count': '0', 'delay_unit': 'days', 'delay_from': 'current_date', 'res_model': 'account.move', 'chaining_type': 'suggest', } }]) mat_to_pay_xml_id = 'at_accounting.mail_activity_type_tax_report_to_pay' mat_to_pay = self.env.ref(mat_to_pay_xml_id, raise_if_not_found=False) act_user = mat_to_send.default_user_id if act_user and not (self.company_id in act_user.company_ids and act_user.has_group('account.group_account_manager')): act_user = self.env['res.users'] moves_without_send_activity = self.filtered_domain([ '|', ('activity_ids', '=', False), ('activity_ids', 'not any', [('activity_type_id.id', '=', mat_to_send.id)]), ]) for move in moves_without_send_activity: period_start, period_end = move.company_id._get_tax_closing_period_boundaries(move.date, move.tax_closing_report_id) period_desc = move.company_id._get_tax_closing_move_description(move.company_id._get_tax_periodicity(move.tax_closing_report_id), period_start, period_end, move.fiscal_position_id, move.tax_closing_report_id) move.with_context(mail_activity_quick_update=True).activity_schedule( act_type_xmlid=mat_to_send_xml_id, summary=_("Send tax report: %s", period_desc), date_deadline=fields.Date.context_today(move), user_id=act_user.id or self.env.user.id, ) if mat_to_pay and mat_to_pay not in move.activity_ids.activity_type_id and move._get_tax_to_pay_on_closing() > 0: move.with_context(mail_activity_quick_update=True).activity_schedule( act_type_xmlid=mat_to_pay_xml_id, summary=_("Pay tax: %s", period_desc), date_deadline=fields.Date.context_today(move), user_id=act_user.id or self.env.user.id, ) def refresh_tax_entry(self): for move in self.filtered(lambda m: m.tax_closing_report_id and m.state == 'draft'): report = move.tax_closing_report_id options = move._get_tax_closing_report_options(move.company_id, move.fiscal_position_id, report, move.date) self.env[report.custom_handler_model_name or 'account.generic.tax.report.handler']._generate_tax_closing_entries(report, options, closing_moves=move) @api.model def _get_tax_closing_report_options(self, company, fiscal_position, report, date_inside_period): _dummy, date_to = company._get_tax_closing_period_boundaries(date_inside_period, report) # In case the company submits its report in different regions, a closing entry # is made for each fiscal position defining a foreign VAT. # We hence need to make sure to select a tax report in the right country when opening # the report (in case there are many, we pick the first one available; it doesn't impact the closing) if fiscal_position and fiscal_position.foreign_vat: fpos_option = fiscal_position.id report_country = fiscal_position.country_id else: fpos_option = 'domestic' report_country = company.account_fiscal_country_id options = { 'date': { 'date_to': fields.Date.to_string(date_to), 'filter': 'custom_tax_period', 'mode': 'range', }, 'selected_variant_id': report.id, 'sections_source_id': report.id, 'fiscal_position': fpos_option, 'tax_unit': 'company_only', } if report.filter_multi_company == 'tax_units': # Enforce multicompany if the closing is done for a tax unit candidate_tax_unit = company.account_tax_unit_ids.filtered(lambda x: x.country_id == report_country) if candidate_tax_unit: options['tax_unit'] = candidate_tax_unit.id company_ids = candidate_tax_unit.company_ids.ids else: same_vat_branches = self.env.company._get_branches_with_same_vat() # Consider the one with the least number of parents (highest in hierarchy) as the active company, coming first company_ids = same_vat_branches.sorted(lambda x: len(x.parent_ids)).ids else: company_ids = self.env.company.ids return report.with_context(allowed_company_ids=company_ids).get_options(previous_options=options) def _get_vat_report_attachments(self, report, options): # Fetch pdf pdf_data = report.export_to_pdf(options) return [(pdf_data['file_name'], pdf_data['file_content'])] def _compute_tax_closing_alert(self): for move in self: move.tax_closing_alert = ( move.state == 'posted' and move.tax_closing_report_id and move.company_id.tax_lock_date and move.company_id.tax_lock_date < move.date ) asset_id = fields.Many2one('account.asset', string='Asset', index=True, ondelete='cascade', copy=False, domain="[('company_id', '=', company_id)]") asset_remaining_value = fields.Monetary(string='Depreciable Value', compute='_compute_depreciation_cumulative_value') asset_depreciated_value = fields.Monetary(string='Cumulative Depreciation', compute='_compute_depreciation_cumulative_value') # true when this move is the result of the changing of value of an asset asset_value_change = fields.Boolean() # how many days of depreciation this entry corresponds to asset_number_days = fields.Integer(string="Number of days", copy=False) # deprecated asset_depreciation_beginning_date = fields.Date(string="Date of the beginning of the depreciation", copy=False) # technical field stating when the depreciation associated with this entry has begun depreciation_value = fields.Monetary( string="Depreciation", compute="_compute_depreciation_value", inverse="_inverse_depreciation_value", store=True, ) asset_ids = fields.One2many('account.asset', string='Assets', compute="_compute_asset_ids") asset_id_display_name = fields.Char( compute="_compute_asset_ids") # just a button label. That's to avoid a plethora of different buttons defined in xml count_asset = fields.Integer(compute="_compute_asset_ids") draft_asset_exists = fields.Boolean(compute="_compute_asset_ids") asset_move_type = fields.Selection( selection=[ ('depreciation', 'Depreciation'), ('sale', 'Sale'), ('purchase', 'Purchase'), ('disposal', 'Disposal'), ('negative_revaluation', 'Negative revaluation'), ('positive_revaluation', 'Positive revaluation'), ], string='Asset Move Type', compute='_compute_asset_move_type', store=True, copy=False, ) # ------------------------------------------------------------------------- # COMPUTE METHODS # ------------------------------------------------------------------------- @api.depends('asset_id', 'depreciation_value', 'asset_id.total_depreciable_value', 'asset_id.already_depreciated_amount_import', 'state') def _compute_depreciation_cumulative_value(self): self.asset_depreciated_value = 0 self.asset_remaining_value = 0 # make sure to protect all the records being assigned, because the # assignments invoke method write() on non-protected records, which may # cause an infinite recursion in case method write() needs to read one # of these fields (like in case of a base automation) fields = [self._fields['asset_remaining_value'], self._fields['asset_depreciated_value']] with self.env.protecting(fields, self.asset_id.depreciation_move_ids): for asset in self.asset_id: depreciated = 0 remaining = asset.total_depreciable_value - asset.already_depreciated_amount_import for move in asset.depreciation_move_ids.sorted(lambda mv: (mv.date, mv._origin.id)): if move.state != 'cancel': remaining -= move.depreciation_value depreciated += move.depreciation_value move.asset_remaining_value = remaining move.asset_depreciated_value = depreciated @api.depends('line_ids.balance') def _compute_depreciation_value(self): for move in self: asset = move.asset_id or move.reversed_entry_id.asset_id # reversed moves are created before being assigned to the asset if asset: account_internal_group = 'expense' asset_depreciation = sum( move.line_ids.filtered(lambda l: l.account_id.internal_group == account_internal_group or l.account_id == asset.account_depreciation_expense_id).mapped( 'balance') ) # Special case of closing entry - only disposed assets of type 'purchase' should match this condition # The condition on len(move.line_ids) is to avoid the case where there is only one depreciation move, and it is not a disposal move # The condition will be matched because a disposal move from a disposal move will always have more than 2 lines, unlike a normal depreciation move if any( line.account_id == asset.account_asset_id and float_compare(-line.balance, asset.original_value, precision_rounding=asset.currency_id.rounding) == 0 for line in move.line_ids ) and len(move.line_ids) > 2: asset_depreciation = ( asset.original_value - asset.salvage_value - ( move.line_ids[1].debit if asset.original_value > 0 else move.line_ids[1].credit ) * (-1 if asset.original_value < 0 else 1) ) else: asset_depreciation = 0 move.depreciation_value = asset_depreciation @api.depends('asset_id', 'asset_ids') def _compute_asset_move_type(self): for move in self: if move.asset_ids: move.asset_move_type = 'positive_revaluation' if move.asset_ids.parent_id else 'purchase' elif not move.asset_move_type or not move.asset_id: move.asset_move_type = False # ------------------------------------------------------------------------- # INVERSE METHODS # ------------------------------------------------------------------------- def _inverse_depreciation_value(self): for move in self: asset = move.asset_id amount = abs(move.depreciation_value) account = asset.account_depreciation_expense_id move.write({'line_ids': [ Command.update(line.id, { 'balance': amount if line.account_id == account else -amount, }) for line in move.line_ids ]}) # ------------------------------------------------------------------------- # CONSTRAINT METHODS # ------------------------------------------------------------------------- @api.constrains('state', 'asset_id') def _constrains_check_asset_state(self): for move in self.filtered(lambda mv: mv.asset_id): asset_id = move.asset_id if asset_id.state == 'draft' and move.state == 'posted': raise ValidationError( _("You can't post an entry related to a draft asset. Please post the asset before.")) def _post(self, soft=True): # OVERRIDE posted = super()._post(soft) # log the post of a depreciation posted._log_depreciation_asset() # look for any asset to create, in case we just posted a bill on an account # configured to automatically create assets posted.sudo()._auto_create_asset() return posted def _reverse_moves(self, default_values_list=None, cancel=False): if default_values_list is None: default_values_list = [{} for _i in self] for move, default_values in zip(self, default_values_list): # Report the value of this move to the next draft move or create a new one if move.asset_id: # Recompute the status of the asset for all depreciations posted after the reversed entry first_draft = min(move.asset_id.depreciation_move_ids.filtered(lambda m: m.state == 'draft'), key=lambda m: m.date, default=None) if first_draft: # If there is a draft, simply move/add the depreciation amount here first_draft.depreciation_value += move.depreciation_value elif move.asset_id.state != 'close': # If there was no draft move left, create one. # Unless the asset is being closed, then the closing move # takes care of balancing the asset. last_date = max(move.asset_id.depreciation_move_ids.mapped('date')) method_period = move.asset_id.method_period self.create(self._prepare_move_for_asset_depreciation({ 'asset_id': move.asset_id, 'amount': move.depreciation_value, 'depreciation_beginning_date': last_date + ( relativedelta(months=1) if method_period == "1" else relativedelta(years=1)), 'date': last_date + ( relativedelta(months=1) if method_period == "1" else relativedelta(years=1)), 'asset_number_days': 0 })) msg = _('Depreciation entry %(name)s reversed (%(value)s)', name=move.name, value=formatLang(self.env, move.depreciation_value, currency_obj=move.company_id.currency_id)) move.asset_id.message_post(body=msg) default_values['asset_id'] = move.asset_id.id default_values['asset_number_days'] = -move.asset_number_days default_values['asset_depreciation_beginning_date'] = default_values.get('date', move.date) return super(AccountMove, self)._reverse_moves(default_values_list, cancel) def button_cancel(self): # OVERRIDE res = super(AccountMove, self).button_cancel() self.env['account.asset'].sudo().search([('original_move_line_ids.move_id', 'in', self.ids)]).write( {'active': False}) return res def button_draft(self): for move in self: if any(asset_id.state != 'draft' for asset_id in move.asset_ids): raise UserError(_('You cannot reset to draft an entry related to a posted asset')) # Remove any draft asset that could be linked to the account move being reset to draft move.asset_ids.filtered(lambda x: x.state == 'draft').unlink() return super(AccountMove, self).button_draft() def _log_depreciation_asset(self): for move in self.filtered(lambda m: m.asset_id): asset = move.asset_id msg = _('Depreciation entry %(name)s posted (%(value)s)', name=move.name, value=formatLang(self.env, move.depreciation_value, currency_obj=move.company_id.currency_id)) asset.message_post(body=msg) def _auto_create_asset(self): create_list = [] invoice_list = [] auto_validate = [] for move in self: if not move.is_invoice(): continue for move_line in move.line_ids: if ( move_line.account_id and (move_line.account_id.can_create_asset) and move_line.account_id.create_asset != "no" and not (move_line.currency_id or move.currency_id).is_zero(move_line.price_total) and not move_line.asset_ids and not move_line.tax_line_id and move_line.price_total > 0 and not (move.move_type in ('out_invoice', 'out_refund') and move_line.account_id.internal_group == 'asset') ): if not move_line.name: if move_line.product_id: move_line.name = move_line.product_id.display_name else: raise UserError( _('Journal Items of %(account)s should have a label in order to generate an asset', account=move_line.account_id.display_name)) if move_line.account_id.multiple_assets_per_line: # decimal quantities are not supported, quantities are rounded to the lower int units_quantity = max(1, int(move_line.quantity)) else: units_quantity = 1 model_ids = move_line.account_id.asset_model_ids vals = { 'name': move_line.name, 'company_id': move_line.company_id.id, 'currency_id': move_line.company_currency_id.id, 'analytic_distribution': move_line.analytic_distribution, 'original_move_line_ids': [(6, False, move_line.ids)], 'state': 'draft', 'acquisition_date': move.invoice_date if not move.reversed_entry_id else move.reversed_entry_id.invoice_date, } for model_id in model_ids or [None]: if model_id: vals['model_id'] = model_id.id auto_validate.extend([move_line.account_id.create_asset == 'validate'] * units_quantity) invoice_list.extend([move] * units_quantity) for i in range(1, units_quantity + 1): if units_quantity > 1: vals['name'] = _("%(move_line)s (%(current)s of %(total)s)", move_line=move_line.name, current=i, total=units_quantity) create_list.extend([vals.copy()]) assets = self.env['account.asset'].with_context({}).create(create_list) for asset, vals, invoice, validate in zip(assets, create_list, invoice_list, auto_validate): if 'model_id' in vals: asset._onchange_model_id() if validate: asset.validate() if invoice: asset.message_post(body=_('Asset created from invoice: %s', invoice._get_html_link())) asset._post_non_deductible_tax_value() return assets @api.model def _prepare_move_for_asset_depreciation(self, vals): missing_fields = {'asset_id', 'amount', 'depreciation_beginning_date', 'date', 'asset_number_days'} - set(vals) if missing_fields: raise UserError(_('Some fields are missing %s', ', '.join(missing_fields))) asset = vals['asset_id'] analytic_distribution = asset.analytic_distribution depreciation_date = vals.get('date', fields.Date.context_today(self)) company_currency = asset.company_id.currency_id current_currency = asset.currency_id prec = company_currency.decimal_places amount_currency = vals['amount'] amount = current_currency._convert(amount_currency, company_currency, asset.company_id, depreciation_date) # Keep the partner on the original invoice if there is only one partner = asset.original_move_line_ids.mapped('partner_id') partner = partner[:1] if len(partner) <= 1 else self.env['res.partner'] name = _("%s: Depreciation", asset.name) move_line_1 = { 'name': name, 'partner_id': partner.id, 'account_id': asset.account_depreciation_id.id, 'debit': 0.0 if float_compare(amount, 0.0, precision_digits=prec) > 0 else -amount, 'credit': amount if float_compare(amount, 0.0, precision_digits=prec) > 0 else 0.0, 'currency_id': current_currency.id, 'amount_currency': -amount_currency, } move_line_2 = { 'name': name, 'partner_id': partner.id, 'account_id': asset.account_depreciation_expense_id.id, 'credit': 0.0 if float_compare(amount, 0.0, precision_digits=prec) > 0 else -amount, 'debit': amount if float_compare(amount, 0.0, precision_digits=prec) > 0 else 0.0, 'currency_id': current_currency.id, 'amount_currency': amount_currency, } # Only set the 'analytic_distribution' key if there is an analytic distribution on the asset. # Otherwise, it prevents the computation of the analytic distribution. if analytic_distribution: move_line_1['analytic_distribution'] = analytic_distribution move_line_2['analytic_distribution'] = analytic_distribution move_vals = { 'partner_id': partner.id, 'date': depreciation_date, 'journal_id': asset.journal_id.id, 'line_ids': [(0, 0, move_line_1), (0, 0, move_line_2)], 'asset_id': asset.id, 'ref': name, 'asset_depreciation_beginning_date': vals['depreciation_beginning_date'], 'asset_number_days': vals['asset_number_days'], 'asset_value_change': vals.get('asset_value_change', False), 'move_type': 'entry', 'currency_id': current_currency.id, 'asset_move_type': vals.get('asset_move_type', 'depreciation'), 'company_id': asset.company_id.id, } return move_vals @api.depends('line_ids.asset_ids') def _compute_asset_ids(self): for record in self: record.asset_ids = record.line_ids.asset_ids record.count_asset = len(record.asset_ids) record.asset_id_display_name = _('Asset') record.draft_asset_exists = bool(record.asset_ids.filtered(lambda x: x.state == "draft")) def open_asset_view(self): return self.asset_id.open_asset(['form']) def action_open_asset_ids(self): return self.asset_ids.open_asset(['list', 'form']) class AccountMoveLine(models.Model): _name = "account.move.line" _inherit = "account.move.line" move_attachment_ids = fields.One2many('ir.attachment', compute='_compute_attachment') # Deferred management fields deferred_start_date = fields.Date( string="Start Date", compute='_compute_deferred_start_date', store=True, readonly=False, index='btree_not_null', copy=False, help="Date at which the deferred expense/revenue starts" ) deferred_end_date = fields.Date( string="End Date", index='btree_not_null', copy=False, help="Date at which the deferred expense/revenue ends" ) has_deferred_moves = fields.Boolean(compute='_compute_has_deferred_moves') has_abnormal_deferred_dates = fields.Boolean(compute='_compute_has_abnormal_deferred_dates') def _order_to_sql(self, order, query, alias=None, reverse=False): sql_order = super()._order_to_sql(order, query, alias, reverse) preferred_aml_residual_value = self._context.get('preferred_aml_value') preferred_aml_currency_id = self._context.get('preferred_aml_currency_id') if preferred_aml_residual_value and preferred_aml_currency_id and order == self._order: currency = self.env['res.currency'].browse(preferred_aml_currency_id) # using round since currency.round(55.55) = 55.550000000000004 preferred_aml_residual_value = round(preferred_aml_residual_value, currency.decimal_places) sql_residual_currency = self._field_to_sql(alias or self._table, 'amount_residual_currency', query) sql_currency = self._field_to_sql(alias or self._table, 'currency_id', query) return SQL( "ROUND(%(residual_currency)s, %(decimal_places)s) = %(value)s " "AND %(currency)s = %(currency_id)s DESC, %(order)s", residual_currency=sql_residual_currency, decimal_places=currency.decimal_places, value=preferred_aml_residual_value, currency=sql_currency, currency_id=currency.id, order=sql_order, ) return sql_order def copy_data(self, default=None): data_list = super().copy_data(default=default) for line, values in zip(self, data_list): if 'move_reverse_cancel' in self._context: values['deferred_start_date'] = line.deferred_start_date values['deferred_end_date'] = line.deferred_end_date return data_list def write(self, vals): """ Prevent changing the account of a move line when there are already deferral entries. """ if 'account_id' in vals: for line in self: if ( line.has_deferred_moves and line.deferred_start_date and line.deferred_end_date and vals['account_id'] != line.account_id.id ): raise UserError(_( "You cannot change the account for a deferred line in %(move_name)s if it has already been deferred.", move_name=line.move_id.display_name )) return super().write(vals) # ============================= START - Deferred management ==================================== def _compute_has_deferred_moves(self): for line in self: line.has_deferred_moves = line.move_id.deferred_move_ids @api.depends('deferred_start_date', 'deferred_end_date') def _compute_has_abnormal_deferred_dates(self): # In the deferred computations, we always assume that both the start and end date are inclusive # E.g: 1st January -> 31st December is *exactly* 1 year = 12 months # However, the user may instead put 1st January -> 1st January of next year which is then # 12 months + 1/30 month = 12.03 months which may result in odd amounts when deferrals are created # For this reason, we alert the user if we detect such a case # Other cases were the number of months is not round should not be handled. for line in self: line.has_abnormal_deferred_dates = ( line.deferred_start_date and line.deferred_end_date and float_compare( self.env['account.move']._get_deferred_diff_dates(line.deferred_start_date, line.deferred_end_date + relativedelta(days=1)) % 1, # end date is included 1 / 30, precision_digits=2 ) == 0 ) def _has_deferred_compatible_account(self): self.ensure_one() return ( self.move_id.is_purchase_document() and self.account_id.account_type in ('expense', 'expense_depreciation', 'expense_direct_cost') ) or ( self.move_id.is_sale_document() and self.account_id.account_type in ('income', 'income_other') ) @api.onchange('deferred_start_date') def _onchange_deferred_start_date(self): if not self._has_deferred_compatible_account(): self.deferred_start_date = False @api.onchange('deferred_end_date') def _onchange_deferred_end_date(self): if not self._has_deferred_compatible_account(): self.deferred_end_date = False @api.depends('deferred_end_date', 'move_id.invoice_date', 'move_id.state') def _compute_deferred_start_date(self): for line in self: if not line.deferred_start_date and line.move_id.invoice_date and line.deferred_end_date: line.deferred_start_date = line.move_id.invoice_date @api.constrains('deferred_start_date', 'deferred_end_date', 'account_id') def _check_deferred_dates(self): for line in self: if line.deferred_start_date and not line.deferred_end_date: raise UserError(_("You cannot create a deferred entry with a start date but no end date.")) elif line.deferred_start_date and line.deferred_end_date and line.deferred_start_date > line.deferred_end_date: raise UserError(_("You cannot create a deferred entry with a start date later than the end date.")) @api.model def _get_deferred_ends_of_month(self, start_date, end_date): """ :return: a list of dates corresponding to the end of each month between start_date and end_date. See test_get_ends_of_month for examples. """ dates = [] while start_date <= end_date: start_date = start_date + relativedelta(day=31) # Go to end of month dates.append(start_date) start_date = start_date + relativedelta(days=1) # Go to first day of next month return dates def _get_deferred_periods(self): """ :return: a list of tuples (start_date, end_date) during which the deferred expense/revenue is spread. If there is only one period containing the move date, it means that we don't need to defer the expense/revenue since the invoice deferral and its deferred entry will be created on the same day and will thus cancel each other. """ self.ensure_one() periods = [ (max(self.deferred_start_date, date.replace(day=1)), min(date, self.deferred_end_date), 'current') for date in self._get_deferred_ends_of_month(self.deferred_start_date, self.deferred_end_date) ] if not periods or len(periods) == 1 and periods[0][0].replace(day=1) == self.date.replace(day=1): return [] else: return periods @api.model def _get_deferred_amounts_by_line_values(self, line): return { 'account_id': line['account_id'], # line either be a dict with ids (coming from SQL query), or a real account.move.line object 'product_id': line['product_id'] if isinstance(line, dict) else line['product_id'].id, 'product_category_id': line['product_category_id'] if isinstance(line, dict) else line['product_category_id'].id, 'balance': line['balance'], 'move_id': line['move_id'], } @api.model def _get_deferred_lines_values(self, account_id, balance, ref, analytic_distribution, line=None): return { 'account_id': account_id, # line either be a dict with ids (coming from SQL query), or a real account.move.line object 'product_id': line['product_id'] if isinstance(line, dict) else line['product_id'].id, 'product_category_id': line['product_category_id'] if isinstance(line, dict) else line['product_category_id'].id, 'balance': balance, 'name': ref, 'analytic_distribution': analytic_distribution, } # ============================= END - Deferred management ==================================== def _get_computed_taxes(self): if self.move_id.deferred_original_move_ids: # If this line is part of a deferral move, do not (re)calculate its taxes automatically. # Doing so might unvoluntarily impact the tax report in deferral moves (if a default tax is set on the account). return self.tax_ids return super()._get_computed_taxes() def _compute_attachment(self): for record in self: record.move_attachment_ids = self.env['ir.attachment'].search(expression.OR(record._get_attachment_domains())) def action_reconcile(self): """ This function is called by the 'Reconcile' button of account.move.line's list view. It performs reconciliation between the selected lines. - If the reconciliation can be done directly we do it silently - Else, if a write-off is required we open the wizard to let the client enter required information """ wizard = self.env['account.reconcile.wizard'].with_context( active_model='account.move.line', active_ids=self.ids, ).new({}) return wizard._action_open_wizard() if (wizard.is_write_off_required or wizard.force_partials) else wizard.reconcile() def _get_predict_postgres_dictionary(self): lang = self._context.get('lang') and self._context.get('lang')[:2] return {'fr': 'french'}.get(lang, 'english') @api.model def _build_predictive_query(self, move_id, additional_domain=None): move_query = self.env['account.move']._where_calc([ ('move_type', '=', move_id.move_type), ('state', '=', 'posted'), ('partner_id', '=', move_id.partner_id.id), ('company_id', '=', move_id.journal_id.company_id.id or self.env.company.id), ]) move_query.order = 'account_move.invoice_date' move_query.limit = int(self.env["ir.config_parameter"].sudo().get_param( "account.bill.predict.history.limit", '100', )) return self.env['account.move.line']._where_calc([ ('move_id', 'in', move_query), ('display_type', '=', 'product'), ] + (additional_domain or [])) @api.model def _predicted_field(self, name, partner_id, field, query=None, additional_queries=None): r"""Predict the most likely value based on the previous history. This method uses postgres tsvector in order to try to deduce a field of an invoice line based on the text entered into the name (description) field and the partner linked. We only limit the search on the previous 100 entries, which according to our tests bore the best results. However this limit parameter is configurable by creating a config parameter with the key: account.bill.predict.history.limit For information, the tests were executed with a dataset of 40 000 bills from a live database, We split the dataset in 2, removing the 5000 most recent entries and we tried to use this method to guess the account of this validation set based on the previous entries. The result is roughly 90% of success. :param field (str): the sql column that has to be predicted. /!\ it is injected in the query without any checks. :param query (osv.Query): the query object on account.move.line that is used to do the ranking, containing the right domain, limit, etc. If it is omitted, a default query is used. :param additional_queries (list): can be used in addition to the default query on account.move.line to fetch data coming from other tables, to have starting values for instance. /!\ it is injected in the query without any checks. """ if not name or not partner_id: return False psql_lang = self._get_predict_postgres_dictionary() description = name + ' account_move_line' # give more priority to main query than additional queries parsed_description = re.sub(r"[*&()|!':<>=%/~@,.;$\[\]]+", " ", description) parsed_description = ' | '.join(parsed_description.split()) try: main_source = (query if query is not None else self._build_predictive_query(self.move_id)).select( SQL("%s AS prediction", field), SQL( "setweight(to_tsvector(%s, account_move_line.name), 'B') || setweight(to_tsvector('simple', 'account_move_line'), 'A') AS document", psql_lang ), ) if "(" in field.code: # aggregate function main_source = SQL("%s %s", main_source, SQL("GROUP BY account_move_line.id, account_move_line.name, account_move_line.partner_id")) self.env.cr.execute(SQL(""" WITH account_move_line AS MATERIALIZED (%(account_move_line)s), source AS (%(source)s), ranking AS ( SELECT prediction, ts_rank(source.document, query_plain) AS rank FROM source, to_tsquery(%(lang)s, %(description)s) query_plain WHERE source.document @@ query_plain ) SELECT prediction, MAX(rank) AS ranking, COUNT(*) FROM ranking GROUP BY prediction ORDER BY ranking DESC, count DESC LIMIT 2 """, account_move_line=self._build_predictive_query(self.move_id).select(SQL('*')), source=SQL('(%s)', SQL(') UNION ALL (').join([main_source] + (additional_queries or []))), lang=psql_lang, description=parsed_description, )) result = self.env.cr.dictfetchall() if result: # Only confirm the prediction if it's at least 10% better than the second one if len(result) > 1 and result[0]['ranking'] < 1.1 * result[1]['ranking']: return False return result[0]['prediction'] except Exception: # In case there is an error while parsing the to_tsquery (wrong character for example) # We don't want to have a blocking traceback, instead return False _logger.exception('Error while predicting invoice line fields') return False def _predict_taxes(self): field = SQL('array_agg(account_move_line__tax_rel__tax_ids.id ORDER BY account_move_line__tax_rel__tax_ids.id)') query = self._build_predictive_query(self.move_id) query.left_join('account_move_line', 'id', 'account_move_line_account_tax_rel', 'account_move_line_id', 'tax_rel') query.left_join('account_move_line__tax_rel', 'account_tax_id', 'account_tax', 'id', 'tax_ids') query.add_where('account_move_line__tax_rel__tax_ids.active IS NOT FALSE') predicted_tax_ids = self._predicted_field(self.name, self.partner_id, field, query) if predicted_tax_ids == [None]: return False if predicted_tax_ids is not False and set(predicted_tax_ids) != set(self.tax_ids.ids): return predicted_tax_ids return False @api.model def _predict_specific_tax(self, move, name, partner, amount_type, amount, type_tax_use): field = SQL('array_agg(account_move_line__tax_rel__tax_ids.id ORDER BY account_move_line__tax_rel__tax_ids.id)') query = self._build_predictive_query(move) query.left_join('account_move_line', 'id', 'account_move_line_account_tax_rel', 'account_move_line_id', 'tax_rel') query.left_join('account_move_line__tax_rel', 'account_tax_id', 'account_tax', 'id', 'tax_ids') query.add_where(""" account_move_line__tax_rel__tax_ids.active IS NOT FALSE AND account_move_line__tax_rel__tax_ids.amount_type = %s AND account_move_line__tax_rel__tax_ids.type_tax_use = %s AND account_move_line__tax_rel__tax_ids.amount = %s """, (amount_type, type_tax_use, amount)) return self._predicted_field(name, partner, field, query) def _predict_product(self): predict_product = int(self.env['ir.config_parameter'].sudo().get_param('account_predictive_bills.predict_product', '1')) if predict_product and self.company_id.predict_bill_product: query = self._build_predictive_query(self.move_id, ['|', ('product_id', '=', False), ('product_id.active', '=', True)]) predicted_product_id = self._predicted_field(self.name, self.partner_id, SQL('account_move_line.product_id'), query) if predicted_product_id and predicted_product_id != self.product_id.id: return predicted_product_id return False def _predict_account(self): field = SQL('account_move_line.account_id') if self.move_id.is_purchase_document(True): excluded_group = 'income' else: excluded_group = 'expense' account_query = self.env['account.account']._where_calc([ *self.env['account.account']._check_company_domain(self.move_id.company_id or self.env.company), ('deprecated', '=', False), ('internal_group', 'not in', (excluded_group, 'off')), ('account_type', 'not in', ('liability_payable', 'asset_receivable')), ]) account_name = self.env['account.account']._field_to_sql('account_account', 'name') psql_lang = self._get_predict_postgres_dictionary() additional_queries = [SQL(account_query.select( SQL("account_account.id AS account_id"), SQL("setweight(to_tsvector(%(psql_lang)s, %(account_name)s), 'B') AS document", psql_lang=psql_lang, account_name=account_name), ))] query = self._build_predictive_query(self.move_id, [('account_id', 'in', account_query)]) predicted_account_id = self._predicted_field(self.name, self.partner_id, field, query, additional_queries) if predicted_account_id and predicted_account_id != self.account_id.id: return predicted_account_id return False @api.onchange('name') def _onchange_name_predictive(self): if ((self.move_id.quick_edit_mode or self.move_id.move_type == 'in_invoice') and self.name and self.display_type == 'product' and not self.env.context.get('disable_onchange_name_predictive', False)): if not self.product_id: predicted_product_id = self._predict_product() if predicted_product_id: # We only update the price_unit, tax_ids and name in case they evaluate to False protected_fields = ['price_unit', 'tax_ids', 'name'] to_protect = [self._fields[fname] for fname in protected_fields if self[fname]] with self.env.protecting(to_protect, self): self.product_id = predicted_product_id # In case no product has been set, the account and taxes # will not depend on any product and can thus be predicted if not self.product_id: # Predict account. predicted_account_id = self._predict_account() if predicted_account_id: self.account_id = predicted_account_id # Predict taxes predicted_tax_ids = self._predict_taxes() if predicted_tax_ids: self.tax_ids = [Command.set(predicted_tax_ids)] def _read_group_select(self, aggregate_spec, query): # Enable to use HAVING clause that sum rounded values depending on the # currency precision settings. Limitation: we only handle a having # clause of one element with that specific method :sum_rounded. fname, __, func = models.parse_read_group_spec(aggregate_spec) if func != 'sum_rounded': return super()._read_group_select(aggregate_spec, query) currency_alias = query.make_alias(self._table, 'currency_id') query.add_join('LEFT JOIN', currency_alias, 'res_currency', SQL( "%s = %s", self._field_to_sql(self._table, 'currency_id', query), SQL.identifier(currency_alias, 'id'), )) return SQL( 'SUM(ROUND(%s, %s))', self._field_to_sql(self._table, fname, query), self.env['res.currency']._field_to_sql(currency_alias, 'decimal_places', query), ) def _read_group_groupby(self, groupby_spec, query): # enable grouping by :abs_rounded on fields, which is useful when trying # to match positive and negative amounts if ':' in groupby_spec: fname, method = groupby_spec.split(':') if method == 'abs_rounded': # rounds with the used currency settings currency_alias = query.make_alias(self._table, 'currency_id') query.add_join('LEFT JOIN', currency_alias, 'res_currency', SQL( "%s = %s", self._field_to_sql(self._table, 'currency_id', query), SQL.identifier(currency_alias, 'id'), )) return SQL( 'ROUND(ABS(%s), %s)', self._field_to_sql(self._table, fname, query), self.env['res.currency']._field_to_sql(currency_alias, 'decimal_places', query), ) return super()._read_group_groupby(groupby_spec, query) asset_ids = fields.Many2many('account.asset', 'asset_move_line_rel', 'line_id', 'asset_id', string='Related Assets', copy=False) non_deductible_tax_value = fields.Monetary(compute='_compute_non_deductible_tax_value', currency_field='company_currency_id') def _get_computed_taxes(self): if self.move_id.asset_id: return self.tax_ids return super()._get_computed_taxes() def turn_as_asset(self): if len(self.company_id) != 1: raise UserError(_("All the lines should be from the same company")) if any(line.move_id.state == 'draft' for line in self): raise UserError(_("All the lines should be posted")) if any(account != self[0].account_id for account in self.mapped('account_id')): raise UserError(_("All the lines should be from the same account")) ctx = self.env.context.copy() ctx.update({ 'default_original_move_line_ids': [(6, False, self.env.context['active_ids'])], 'default_company_id': self.company_id.id, }) return { "name": _("Turn as an asset"), "type": "ir.actions.act_window", "res_model": "account.asset", "views": [[False, "form"]], "target": "current", "context": ctx, } @api.depends('tax_ids.invoice_repartition_line_ids') def _compute_non_deductible_tax_value(self): """ Handle the specific case of non deductible taxes, such as "50% Non Déductible - Frais de voiture (Prix Excl.)" in Belgium. """ non_deductible_tax_ids = self.tax_ids.invoice_repartition_line_ids.filtered( lambda line: line.repartition_type == 'tax' and not line.use_in_tax_closing ).tax_id res = {} if non_deductible_tax_ids: domain = [('move_id', 'in', self.move_id.ids)] tax_details_query = self._get_query_tax_details_from_domain(domain) self.flush_model() self._cr.execute(SQL( ''' SELECT tdq.base_line_id, SUM(tdq.tax_amount_currency) FROM (%(tax_details_query)s) AS tdq JOIN account_move_line aml ON aml.id = tdq.tax_line_id JOIN account_tax_repartition_line trl ON trl.id = tdq.tax_repartition_line_id WHERE tdq.base_line_id IN %(base_line_ids)s AND trl.use_in_tax_closing IS FALSE GROUP BY tdq.base_line_id ''', tax_details_query=tax_details_query, base_line_ids=tuple(self.ids), )) res = {row['base_line_id']: row['sum'] for row in self._cr.dictfetchall()} for record in self: record.non_deductible_tax_value = res.get(record._origin.id, 0.0)