From d783d0d08d79946b7aff1568c32a7fe398e0b013 Mon Sep 17 00:00:00 2001 From: git_admin Date: Tue, 28 Apr 2026 07:34:31 +0000 Subject: [PATCH] Tower: upload at_accounting 18.0.1.7 (via marketplace) --- addons/at_accounting/models/account_move.py | 1563 +++++++++++++++++++ 1 file changed, 1563 insertions(+) create mode 100644 addons/at_accounting/models/account_move.py diff --git a/addons/at_accounting/models/account_move.py b/addons/at_accounting/models/account_move.py new file mode 100644 index 0000000..7e4ffd7 --- /dev/null +++ b/addons/at_accounting/models/account_move.py @@ -0,0 +1,1563 @@ +import calendar +from contextlib import contextmanager +from dateutil.relativedelta import relativedelta +import logging +import math +import re +from odoo import fields, models, api, _, Command +from odoo.exceptions import UserError +from odoo.osv import expression +from odoo.tools import SQL, float_compare +import ast +from odoo.addons.account.models.exceptions import TaxClosingNonPostedDependingMovesError +from odoo.tools.misc import format_date +from odoo.tools import date_utils +from odoo.addons.web.controllers.utils import clean_action + +from markupsafe import Markup +from odoo.exceptions import UserError, ValidationError +from odoo.tools.misc import formatLang + + + +_logger = logging.getLogger(__name__) + + +DEFERRED_DATE_MIN = '1900-01-01' +DEFERRED_DATE_MAX = '9999-12-31' + + +class AccountMove(models.Model): + _inherit = "account.move" + + + def _get_invoice_in_payment_state(self): + return 'in_payment' + + payment_state_before_switch = fields.Char(string="Payment State Before Switch", copy=False) + + deferred_move_ids = fields.Many2many( + string="Deferred Entries", + comodel_name='account.move', + relation='account_move_deferred_rel', + column1='original_move_id', + column2='deferred_move_id', + help="The deferred entries created by this invoice", + copy=False, + ) + deferred_original_move_ids = fields.Many2many( + string="Original Invoices", + comodel_name='account.move', + relation='account_move_deferred_rel', + column1='deferred_move_id', + column2='original_move_id', + help="The original invoices that created the deferred entries", + copy=False, + ) + deferred_entry_type = fields.Selection( + string="Deferred Entry Type", + selection=[ + ('expense', 'Deferred Expense'), + ('revenue', 'Deferred Revenue'), + ], + compute='_compute_deferred_entry_type', + copy=False, + ) + + signing_user = fields.Many2one( + string='Signer', + comodel_name='res.users', + compute='_compute_signing_user', store=True, + copy=False, + ) + show_signature_area = fields.Boolean(compute='_compute_signature') + signature = fields.Binary(compute='_compute_signature') # can't be `related`: the sign module might not be there + # used for VAT closing, containing the end date of the period this entry closes + tax_closing_report_id = fields.Many2one(comodel_name='account.report') + # technical field used to know whether to show the tax closing alert or not + tax_closing_alert = fields.Boolean(compute='_compute_tax_closing_alert') + + @api.depends('state', 'move_type', 'invoice_user_id') + def _compute_signing_user(self): + other_moves = self.filtered(lambda move: not move.is_sale_document()) + other_moves.signing_user = False + + is_odoobot_user = self.env.user == self.env.ref('base.user_root') + is_backend_user = self.env.user.has_group('base.group_user') + + for invoice in (self - other_moves).filtered(lambda inv: inv.state == 'posted'): + # signer priority: + # - res.user set in res.settings + # - real backend user posting the invoice + # - if odoobot: the person that initiated the invoice ie: The salesman + # - if invoice initiated by a portal user -> No signature + representative = invoice.company_id.signing_user + # checking `has_group('base.group_user')` ensure we never keep a portal user to sign + if is_odoobot_user: + user_can_sign = invoice.invoice_user_id and invoice.invoice_user_id.has_group('base.group_user') + invoice.signing_user = representative or invoice.invoice_user_id if user_can_sign else False + else: + invoice.signing_user = representative or self.env.user if is_backend_user else False + + @api.depends('state') + def _compute_signature(self): + is_portal_user = self.env.user.has_group('base.group_portal') + # Checking `company_id.sign_invoice` removes the needs to check if the sign module is installed + # Setting it to True through `res.settings` auto install the sign module + moves_not_to_sign = self.filtered( + lambda inv: not inv.company_id.sign_invoice + or inv.state in {'draft', 'cancel'} + or not inv.is_sale_document() + # Allow signature for portal user only if the invoice already went through the send&print workflow + or (is_portal_user and not inv.invoice_pdf_report_id) + ) + moves_not_to_sign.show_signature_area = False + moves_not_to_sign.signature = None + + invoice_with_signature = self - moves_not_to_sign + invoice_with_signature.show_signature_area = True + for invoice in invoice_with_signature: + invoice.signature = invoice.signing_user.sign_signature + + def _post(self, soft=True): + # Deferred management + posted = super()._post(soft) + for move in self: + if move._get_deferred_entries_method() == 'on_validation' and any(move.line_ids.mapped('deferred_start_date')): + move._generate_deferred_entries() + return posted + + def action_post(self): + # EXTENDS 'account' to trigger the CRON auto-reconciling the statement lines. + res = super().action_post() + if self.statement_line_id and not self._context.get('skip_statement_line_cron_trigger'): + self.env.ref('at_accounting.auto_reconcile_bank_statement_line')._trigger() + return res + + def button_draft(self): + if any(len(deferral_move.deferred_original_move_ids) > 1 for deferral_move in self.deferred_move_ids): + raise UserError(_("You cannot reset to draft an invoice that is grouped in deferral entry. You can create a credit note instead.")) + reversed_moves = self.deferred_move_ids._unlink_or_reverse() + if reversed_moves: + for move in reversed_moves: + move.with_context(skip_readonly_check=True).write({ + 'date': move._get_accounting_date(move.date, move._affect_tax_report()), + }) + self.deferred_move_ids |= reversed_moves + return super().button_draft() + + def unlink(self): + # Prevent deferred moves under audit trail restriction from being unlinked + deferral_moves = self.filtered(lambda move: move.company_id.check_account_audit_trail and move.deferred_original_move_ids) + deferral_moves.deferred_original_move_ids.deferred_move_ids = False + deferral_moves._reverse_moves() + return super(AccountMove, self - deferral_moves).unlink() + + # ============================= START - Deferred Management ==================================== + + def _get_deferred_entries_method(self): + self.ensure_one() + if self.is_outbound(): + return self.company_id.generate_deferred_expense_entries_method + return self.company_id.generate_deferred_revenue_entries_method + + @api.depends('deferred_original_move_ids') + def _compute_deferred_entry_type(self): + for move in self: + if move.deferred_original_move_ids: + move.deferred_entry_type = 'expense' if move.deferred_original_move_ids[0].is_outbound() else 'revenue' + else: + move.deferred_entry_type = False + + @api.model + def _get_deferred_diff_dates(self, start, end): + """ + Returns the number of months between two dates [start, end[ + The computation is done by using months of 30 days so that the deferred amount for february + (28-29 days), march (31 days) and april (30 days) are all the same (in case of monthly computation). + See test_deferred_management_get_diff_dates for examples. + """ + if start > end: + start, end = end, start + nb_months = end.month - start.month + 12 * (end.year - start.year) + start_day, end_day = start.day, end.day + if start_day == calendar.monthrange(start.year, start.month)[1]: + start_day = 30 + if end_day == calendar.monthrange(end.year, end.month)[1]: + end_day = 30 + nb_days = end_day - start_day + return (nb_months * 30 + nb_days) / 30 + + @api.model + def _get_deferred_period_amount(self, method, period_start, period_end, line_start, line_end, balance): + """ + Returns the amount to defer for the given period taking into account the deferred method (day/month/full_months). + """ + is_valid_period = period_end > line_start and period_end > period_start + if method == 'day': + amount_per_day = balance / (line_end - line_start).days + return (period_end - period_start).days * amount_per_day if is_valid_period else 0 + elif method == "month": + amount_per_month = balance / self._get_deferred_diff_dates(line_end, line_start) + nb_months_period = self._get_deferred_diff_dates(period_end, period_start) + return nb_months_period * amount_per_month if is_valid_period else 0 + elif method == "full_months": + line_diff = self._get_deferred_diff_dates(line_end, line_start) + period_diff = self._get_deferred_diff_dates(period_end, period_start) + if line_diff < 1: + amount = balance + else: + if line_end.day == calendar.monthrange(line_end.year, line_end.month)[1]: + line_diff = math.ceil(line_diff) + else: + line_diff = math.floor(line_diff) + if period_end.day == calendar.monthrange(period_end.year, period_end.month)[1] or line_end != period_end: + period_diff = math.ceil(period_diff) + else: + period_diff = math.floor(period_diff) + amount_per_month = balance / line_diff + amount = period_diff * amount_per_month + return amount if is_valid_period else 0 + + @api.model + def _get_deferred_amounts_by_line(self, lines, periods, deferred_type): + """ + :return: a list of dictionaries containing the deferred amounts for each line and each period + E.g. (where period1 = (date1, date2, label1), period2 = (date2, date3, label2), ...) + [ + {'account_id': 1, period_1: 100, period_2: 200}, + {'account_id': 1, period_1: 100, period_2: 200}, + {'account_id': 2, period_1: 300, period_2: 400}, + ] + """ + values = [] + for line in lines: + line_start = fields.Date.to_date(line['deferred_start_date']) + line_end = fields.Date.to_date(line['deferred_end_date']) + if line_end < line_start: + # This normally shouldn't happen, but if it does, would cause calculation errors later on. + # To not make the reports crash, we just set both dates to the same day. + # The user should fix the dates manually. + line_end = line_start + + columns = {} + for period in periods: + if period[2] == 'not_started' and line_start <= period[0]: + # The 'Not Started' column only considers lines starting the deferral after the report end date + columns[period] = 0.0 + continue + # periods = [Total, Not Started, Before, ..., Current, ..., Later] + # The dates to calculate the amount for the current period + period_start = max(period[0], line_start) + period_end = min(period[1], line_end) + if ( + period[2] in ('not_started', 'later') and period[0] < line_start + or len(periods) <= 1 + or period[2] not in ('not_started', 'before', 'later') + ): + # We are subtracting 1 day from `period_start` because the start date should be included when: + # - in the 'Not Started' or 'Later' period if the deferral has not started yet (line_start, line_end) + # - we only have one period + # - not in the 'Not Started', 'Before' or 'Later' period + period_start -= relativedelta(days=1) + columns[period] = self._get_deferred_period_amount( + self.env.company.deferred_expense_amount_computation_method if deferred_type == "expense" else self.env.company.deferred_revenue_amount_computation_method, + period_start, period_end, + line_start - relativedelta(days=1), line_end, # -1 because we want to include the start date + line['balance'] + ) + + values.append({ + **self.env['account.move.line']._get_deferred_amounts_by_line_values(line), + **columns, + }) + return values + + @api.model + def _get_deferred_lines(self, line, deferred_account, deferred_type, period, ref, force_balance=None, grouping_field='account_id'): + """ + :return: a list of Command objects to create the deferred lines of a single given period + """ + deferred_amounts = self._get_deferred_amounts_by_line(line, [period], deferred_type)[0] + balance = deferred_amounts[period] if force_balance is None else force_balance + return [ + Command.create({ + **self.env['account.move.line']._get_deferred_lines_values(account.id, coeff * balance, ref, line.analytic_distribution, line), + 'partner_id': line.partner_id.id, + 'product_id': line.product_id.id, + }) + for (account, coeff) in [(deferred_amounts[grouping_field], 1), (deferred_account, -1)] + ] + + def _generate_deferred_entries(self): + """ + Generates the deferred entries for the invoice. + """ + self.ensure_one() + if self.state != 'posted': + return + if self.is_entry(): + raise UserError(_("You cannot generate deferred entries for a miscellaneous journal entry.")) + deferred_type = "expense" if self.is_purchase_document() else "revenue" + deferred_account = self.company_id.deferred_expense_account_id if deferred_type == "expense" else self.company_id.deferred_revenue_account_id + deferred_journal = self.company_id.deferred_expense_journal_id if deferred_type == "expense" else self.company_id.deferred_revenue_journal_id + if not deferred_journal: + raise UserError(_("Please set the deferred journal in the accounting settings.")) + if not deferred_account: + raise UserError(_("Please set the deferred accounts in the accounting settings.")) + + for line in self.line_ids.filtered(lambda l: l.deferred_start_date and l.deferred_end_date): + periods = line._get_deferred_periods() + if not periods: + continue + + ref = _("Deferral of %s", line.move_id.name or '') + + move_vals = { + 'move_type': 'entry', + 'deferred_original_move_ids': [Command.set(line.move_id.ids)], + 'journal_id': deferred_journal.id, + 'company_id': self.company_id.id, + 'partner_id': line.partner_id.id, + 'auto_post': 'at_date', + 'ref': ref, + 'name': False, + } + + # Defer the current invoice + move_fully_deferred = self.create({ + **move_vals, + 'date': line.move_id.date, + }) + # We write the lines after creation, to make sure the `deferred_original_move_ids` is set. + # This way we can avoid adding taxes for deferred moves. + move_fully_deferred.write({ + 'line_ids': [ + Command.create( + self.env['account.move.line']._get_deferred_lines_values(account.id, coeff * line.balance, ref, line.analytic_distribution, line) + ) for (account, coeff) in [(line.account_id, -1), (deferred_account, 1)] + ], + }) + + # Create the deferred entries for the periods [deferred_start_date, deferred_end_date] + deferral_moves = self.create([{ + **move_vals, + 'date': period[1], + } for period in periods]) + remaining_balance = line.balance + for period_index, (period, deferral_move) in enumerate(zip(periods, deferral_moves)): + # For the last deferral move the balance is forced to remaining balance to avoid rounding errors + force_balance = remaining_balance if period_index == len(periods) - 1 else None + # Same as before, to avoid adding taxes for deferred moves. + deferral_move.write({ + 'line_ids': self._get_deferred_lines(line, deferred_account, deferred_type, period, ref, force_balance=force_balance), + }) + remaining_balance -= deferral_move.line_ids[0].balance + # Avoid having deferral moves with a total amount of 0 + if deferral_move.currency_id.is_zero(deferral_move.amount_total): + deferral_moves -= deferral_move + deferral_move.unlink() + + deferred_moves = move_fully_deferred + deferral_moves + if len(deferral_moves) == 1 and move_fully_deferred.date.month == deferral_moves.date.month: + # If, after calculation, we have 2 deferral entries in the same month, it means that + # they simply cancel out each other, so there is no point in creating them. + deferred_moves.unlink() + continue + line.move_id.deferred_move_ids |= deferred_moves + deferred_moves._post(soft=True) + + def open_deferred_entries(self): + self.ensure_one() + return { + 'type': 'ir.actions.act_window', + 'name': _("Deferred Entries"), + 'res_model': 'account.move.line', + 'domain': [('id', 'in', self.deferred_move_ids.line_ids.ids)], + 'views': [(self.env.ref('at_accounting.view_deferred_entries_tree').id, 'list')], + 'context': { + 'search_default_group_by_move': True, + 'expand': True, + } + } + + def open_deferred_original_entry(self): + self.ensure_one() + action = { + 'type': 'ir.actions.act_window', + 'name': _("Original Deferred Entries"), + 'res_model': 'account.move.line', + 'domain': [('id', 'in', self.deferred_original_move_ids.line_ids.ids)], + 'views': [(False, 'list'), (False, 'form')], + 'context': { + 'search_default_group_by_move': True, + 'expand': True, + } + } + if len(self.deferred_original_move_ids) == 1: + action.update({ + 'res_model': 'account.move', + 'res_id': self.deferred_original_move_ids[0].id, + 'views': [(False, 'form')], + }) + return action + + # ============================= END - Deferred management ====================================== + + def action_open_bank_reconciliation_widget(self): + return self.statement_line_id._action_open_bank_reconciliation_widget( + default_context={ + 'search_default_journal_id': self.statement_line_id.journal_id.id, + 'search_default_statement_line_id': self.statement_line_id.id, + 'default_st_line_id': self.statement_line_id.id, + } + ) + + def action_open_bank_reconciliation_widget_statement(self): + return self.statement_line_id._action_open_bank_reconciliation_widget( + extra_domain=[('statement_id', 'in', self.statement_id.ids)], + ) + + def action_open_business_doc(self): + if self.statement_line_id: + return self.action_open_bank_reconciliation_widget() + else: + action = super().action_open_business_doc() + # prevent propagation of the following keys + action['context'] = action.get('context', {}) | { + 'preferred_aml_value': None, + 'preferred_aml_currency_id': None, + } + return action + + def _get_mail_thread_data_attachments(self): + res = super()._get_mail_thread_data_attachments() + res += self.statement_line_id.statement_id.attachment_ids + return res + + @contextmanager + def _get_edi_creation(self): + with super()._get_edi_creation() as move: + previous_lines = move.invoice_line_ids + yield move.with_context(disable_onchange_name_predictive=True) + for line in move.invoice_line_ids - previous_lines: + line._onchange_name_predictive() + + + def _post(self, soft=True): + # Overridden to create carryover external values and join the pdf of the report when posting the tax closing + for move in self.filtered(lambda m: m.tax_closing_report_id): + report = move.tax_closing_report_id + options = move._get_tax_closing_report_options(move.company_id, move.fiscal_position_id, report, move.date) + move._close_tax_period(report, options) + + return super()._post(soft) + + def action_post(self): + # In the case of a TaxClosingNonPostedDependingMovesError, which can occur when dealing with branches or tax + # units during the closing process, the parent company may have non-posted closing entries from other companies. + # If this exception occurs, we will return an action client that will display a component indicating that there + # are non-posted dependent moves, along with a link to those moves. + # Also, we are not using a RedirectWarning because it will force a rollback on the closing move created for + # depending companies. + try: + res = super().action_post() + except TaxClosingNonPostedDependingMovesError as exception: + return { + "type": "ir.actions.client", + "tag": "at_accounting.redirect_action", + "target": "new", + "name": "Depending Action", + "params": { + "depending_action": exception.args[0], + "message": _("It seems there is some depending closing move to be posted"), + "button_text": _("Depending moves"), + }, + 'context': { + 'dialog_size': 'medium', + } + } + return res + + def button_draft(self): + # Overridden in order to delete the carryover values when resetting the tax closing to draft + super().button_draft() + for closing_move in self.filtered(lambda m: m.tax_closing_report_id): + report = closing_move.tax_closing_report_id + options = closing_move._get_tax_closing_report_options(closing_move.company_id, closing_move.fiscal_position_id, report, closing_move.date) + closing_months_delay = closing_move.company_id._get_tax_periodicity_months_delay(report) + + carryover_values = self.env['account.report.external.value'].search([ + ('carryover_origin_report_line_id', 'in', report.line_ids.ids), + ('date', '=', options['date']['date_to']), + ]) + + carryover_impacted_period_end = fields.Date.from_string(options['date']['date_to']) + relativedelta(months=closing_months_delay) + tax_lock_date = closing_move.company_id.tax_lock_date + if carryover_values and tax_lock_date and tax_lock_date >= carryover_impacted_period_end: + raise UserError(_("You cannot reset this closing entry to draft, as it would delete carryover values impacting the tax report of a " + "locked period. To do this, you first need to modify you tax return lock date.")) + + if self._has_subsequent_posted_closing_moves(): + raise UserError(_("You cannot reset this closing entry to draft, as another closing entry has been posted at a later date.")) + + carryover_values.unlink() + + def _has_subsequent_posted_closing_moves(self): + self.ensure_one() + closing_domains = [ + ('company_id', '=', self.company_id.id), + ('tax_closing_report_id', '!=', False), + ('state', '=', 'posted'), + ('date', '>', self.date), + ('fiscal_position_id', '=', self.fiscal_position_id.id) + ] + return bool(self.env['account.move'].search_count(closing_domains, limit=1)) + + def _get_tax_to_pay_on_closing(self): + self.ensure_one() + tax_payable_accounts = self.env['account.tax.group'].search([ + ('company_id', '=', self.company_id.id), + ]).tax_payable_account_id + payable_lines = self.line_ids.filtered(lambda line: line.account_id in tax_payable_accounts) + return self.currency_id.round(-sum(payable_lines.mapped('balance'))) + + def _action_tax_to_pay_wizard(self): + # hook for l10n tax payment wizard + return self.action_open_tax_report() + + def action_open_tax_report(self): + action = self.env["ir.actions.actions"]._for_xml_id("at_accounting.action_account_report_gt") + if not self.tax_closing_report_id: + raise UserError(_("You can't open a tax report from a move without a VAT closing date.")) + options = self._get_tax_closing_report_options(self.company_id, self.fiscal_position_id, self.tax_closing_report_id, self.date) + # Pass options in context and set ignore_session: true to prevent using session options + action.update({'params': {'options': options, 'ignore_session': True}}) + return action + + def _close_tax_period(self, report, options): + """ Closes tax closing entries. The tax closing activities on them will be marked done, and the next tax closing entry + will be generated or updated (if already existing). Also, a pdf of the tax report at the time of closing + will be posted in the chatter of each move. + + The tax lock date of each move's company will be set to the move's date in case no other draft tax closing + move exists for that company (whatever their foreign VAT fiscal position) before or at that date, meaning that + all the tax closings have been performed so far. + """ + self.ensure_one() + if not self.env.user.has_group('account.group_account_manager'): + raise UserError(_('Only Billing Administrators are allowed to change lock dates!')) + report = self.tax_closing_report_id + options = self._get_tax_closing_report_options(self.company_id, self.fiscal_position_id, report, self.date) + + if not self.fiscal_position_id and (not self.company_id.tax_lock_date or self.date > self.company_id.tax_lock_date): + self.company_id.sudo().tax_lock_date = self.date + self.env['account.report']._generate_default_external_values(options['date']['date_from'], options['date']['date_to'], True) + + sender_company = report._get_sender_company_for_export(options) + company_ids = report.get_report_company_ids(options) + if sender_company == self.company_id: + depending_closings = self.env['account.tax.report.handler']._get_tax_closing_entries_for_closed_period(report, options, self.env['res.company'].browse(company_ids), posted_only=False) - self + depending_closings_to_post = depending_closings.filtered(lambda x: x.state == 'draft') + if depending_closings_to_post: + depending_action = self.env["ir.actions.actions"]._for_xml_id("account.action_move_journal_line") + depending_action = clean_action(depending_action, env=self.env) + + if len(depending_closings_to_post) == 1: + depending_action['views'] = [(self.env.ref('account.view_move_form').id, 'form')] + depending_action['res_id'] = depending_closings_to_post.id + else: + depending_action['domain'] = [('id', 'in', depending_closings_to_post.ids)] + depending_action['context'] = dict(ast.literal_eval(depending_action['context'])) + depending_action['context'].pop('search_default_posted', None) + + # In case of dependent moves, we will raise an error that will be caught in the action_post method. + # When the exception is caught, a component will inform the user that there are some dependent moves + # to be posted and provide a link to these moves. + raise TaxClosingNonPostedDependingMovesError(depending_action) + + # Generate the carryover values. + report.with_context(allowed_company_ids=company_ids)._generate_carryover_external_values(options) + + # Post the message with the attachments (PDF of the report, and possibly an additional export file) + attachments = self._get_vat_report_attachments(report, options) + subject = _( + "Vat closing from %(date_from)s to %(date_to)s", + date_from=format_date(self.env, options['date']['date_from']), + date_to=format_date(self.env, options['date']['date_to']), + ) + self.with_context(no_new_invoice=True).message_post(body=self.ref, subject=subject, attachments=attachments) + + # Log a note on depending closings, redirecting to the main one + for closing_move in depending_closings: + closing_move.message_post( + body=Markup("%s") % _("The attachments of the tax report can be found on the closing entry of the representative company.", self.id), + ) + + # End activity + activity = self.company_id._get_tax_closing_reminder_activity(report.id, self.date, self.fiscal_position_id.id) + if activity: + activity.action_done() + + # Generate next activity + self.company_id._generate_tax_closing_reminder_activity(self.tax_closing_report_id, self.date + relativedelta(days=1), self.fiscal_position_id if self.fiscal_position_id.foreign_vat else None) + + self._close_tax_period_create_activities() + + def _close_tax_period_create_activities(self): + mat_to_send_xml_id = 'at_accounting.mail_activity_type_tax_report_to_be_sent' + mat_to_send = self.env.ref(mat_to_send_xml_id, raise_if_not_found=False) + if not mat_to_send: + # As this is introduced in stable, we ensure data exists by creating them on the fly if needed + mat_to_send = self.env['mail.activity.type'].sudo()._load_records([{ + 'xml_id': mat_to_send_xml_id, + 'noupdate': False, + 'values': { + 'name': 'Tax Report Ready', + 'summary': 'Tax report is ready to be sent to the administration', + 'category': 'tax_report', + 'delay_count': '0', + 'delay_unit': 'days', + 'delay_from': 'current_date', + 'res_model': 'account.move', + 'chaining_type': 'suggest', + } + }]) + mat_to_pay_xml_id = 'at_accounting.mail_activity_type_tax_report_to_pay' + mat_to_pay = self.env.ref(mat_to_pay_xml_id, raise_if_not_found=False) + + act_user = mat_to_send.default_user_id + if act_user and not (self.company_id in act_user.company_ids and act_user.has_group('account.group_account_manager')): + act_user = self.env['res.users'] + + moves_without_send_activity = self.filtered_domain([ + '|', + ('activity_ids', '=', False), + ('activity_ids', 'not any', [('activity_type_id.id', '=', mat_to_send.id)]), + ]) + + for move in moves_without_send_activity: + period_start, period_end = move.company_id._get_tax_closing_period_boundaries(move.date, move.tax_closing_report_id) + period_desc = move.company_id._get_tax_closing_move_description(move.company_id._get_tax_periodicity(move.tax_closing_report_id), period_start, period_end, move.fiscal_position_id, move.tax_closing_report_id) + move.with_context(mail_activity_quick_update=True).activity_schedule( + act_type_xmlid=mat_to_send_xml_id, + summary=_("Send tax report: %s", period_desc), + date_deadline=fields.Date.context_today(move), + user_id=act_user.id or self.env.user.id, + ) + + if mat_to_pay and mat_to_pay not in move.activity_ids.activity_type_id and move._get_tax_to_pay_on_closing() > 0: + move.with_context(mail_activity_quick_update=True).activity_schedule( + act_type_xmlid=mat_to_pay_xml_id, + summary=_("Pay tax: %s", period_desc), + date_deadline=fields.Date.context_today(move), + user_id=act_user.id or self.env.user.id, + ) + + def refresh_tax_entry(self): + for move in self.filtered(lambda m: m.tax_closing_report_id and m.state == 'draft'): + report = move.tax_closing_report_id + options = move._get_tax_closing_report_options(move.company_id, move.fiscal_position_id, report, move.date) + self.env[report.custom_handler_model_name or 'account.generic.tax.report.handler']._generate_tax_closing_entries(report, options, closing_moves=move) + + @api.model + def _get_tax_closing_report_options(self, company, fiscal_position, report, date_inside_period): + _dummy, date_to = company._get_tax_closing_period_boundaries(date_inside_period, report) + + # In case the company submits its report in different regions, a closing entry + # is made for each fiscal position defining a foreign VAT. + # We hence need to make sure to select a tax report in the right country when opening + # the report (in case there are many, we pick the first one available; it doesn't impact the closing) + if fiscal_position and fiscal_position.foreign_vat: + fpos_option = fiscal_position.id + report_country = fiscal_position.country_id + else: + fpos_option = 'domestic' + report_country = company.account_fiscal_country_id + + options = { + 'date': { + 'date_to': fields.Date.to_string(date_to), + 'filter': 'custom_tax_period', + 'mode': 'range', + }, + 'selected_variant_id': report.id, + 'sections_source_id': report.id, + 'fiscal_position': fpos_option, + 'tax_unit': 'company_only', + } + + if report.filter_multi_company == 'tax_units': + # Enforce multicompany if the closing is done for a tax unit + candidate_tax_unit = company.account_tax_unit_ids.filtered(lambda x: x.country_id == report_country) + if candidate_tax_unit: + options['tax_unit'] = candidate_tax_unit.id + company_ids = candidate_tax_unit.company_ids.ids + else: + same_vat_branches = self.env.company._get_branches_with_same_vat() + # Consider the one with the least number of parents (highest in hierarchy) as the active company, coming first + company_ids = same_vat_branches.sorted(lambda x: len(x.parent_ids)).ids + else: + company_ids = self.env.company.ids + + return report.with_context(allowed_company_ids=company_ids).get_options(previous_options=options) + + def _get_vat_report_attachments(self, report, options): + # Fetch pdf + pdf_data = report.export_to_pdf(options) + return [(pdf_data['file_name'], pdf_data['file_content'])] + + def _compute_tax_closing_alert(self): + for move in self: + move.tax_closing_alert = ( + move.state == 'posted' + and move.tax_closing_report_id + and move.company_id.tax_lock_date + and move.company_id.tax_lock_date < move.date + ) + + asset_id = fields.Many2one('account.asset', string='Asset', index=True, ondelete='cascade', copy=False, + domain="[('company_id', '=', company_id)]") + asset_remaining_value = fields.Monetary(string='Depreciable Value', + compute='_compute_depreciation_cumulative_value') + asset_depreciated_value = fields.Monetary(string='Cumulative Depreciation', + compute='_compute_depreciation_cumulative_value') + # true when this move is the result of the changing of value of an asset + asset_value_change = fields.Boolean() + # how many days of depreciation this entry corresponds to + asset_number_days = fields.Integer(string="Number of days", copy=False) # deprecated + asset_depreciation_beginning_date = fields.Date(string="Date of the beginning of the depreciation", + copy=False) # technical field stating when the depreciation associated with this entry has begun + depreciation_value = fields.Monetary( + string="Depreciation", + compute="_compute_depreciation_value", inverse="_inverse_depreciation_value", store=True, + ) + + asset_ids = fields.One2many('account.asset', string='Assets', compute="_compute_asset_ids") + asset_id_display_name = fields.Char( + compute="_compute_asset_ids") # just a button label. That's to avoid a plethora of different buttons defined in xml + count_asset = fields.Integer(compute="_compute_asset_ids") + draft_asset_exists = fields.Boolean(compute="_compute_asset_ids") + asset_move_type = fields.Selection( + selection=[ + ('depreciation', 'Depreciation'), + ('sale', 'Sale'), + ('purchase', 'Purchase'), + ('disposal', 'Disposal'), + ('negative_revaluation', 'Negative revaluation'), + ('positive_revaluation', 'Positive revaluation'), + ], + string='Asset Move Type', + compute='_compute_asset_move_type', store=True, + copy=False, + ) + + # ------------------------------------------------------------------------- + # COMPUTE METHODS + # ------------------------------------------------------------------------- + @api.depends('asset_id', 'depreciation_value', 'asset_id.total_depreciable_value', + 'asset_id.already_depreciated_amount_import', 'state') + def _compute_depreciation_cumulative_value(self): + self.asset_depreciated_value = 0 + self.asset_remaining_value = 0 + + # make sure to protect all the records being assigned, because the + # assignments invoke method write() on non-protected records, which may + # cause an infinite recursion in case method write() needs to read one + # of these fields (like in case of a base automation) + fields = [self._fields['asset_remaining_value'], self._fields['asset_depreciated_value']] + with self.env.protecting(fields, self.asset_id.depreciation_move_ids): + for asset in self.asset_id: + depreciated = 0 + remaining = asset.total_depreciable_value - asset.already_depreciated_amount_import + for move in asset.depreciation_move_ids.sorted(lambda mv: (mv.date, mv._origin.id)): + if move.state != 'cancel': + remaining -= move.depreciation_value + depreciated += move.depreciation_value + move.asset_remaining_value = remaining + move.asset_depreciated_value = depreciated + + @api.depends('line_ids.balance') + def _compute_depreciation_value(self): + for move in self: + asset = move.asset_id or move.reversed_entry_id.asset_id # reversed moves are created before being assigned to the asset + if asset: + account_internal_group = 'expense' + asset_depreciation = sum( + move.line_ids.filtered(lambda + l: l.account_id.internal_group == account_internal_group or l.account_id == asset.account_depreciation_expense_id).mapped( + 'balance') + ) + # Special case of closing entry - only disposed assets of type 'purchase' should match this condition + # The condition on len(move.line_ids) is to avoid the case where there is only one depreciation move, and it is not a disposal move + # The condition will be matched because a disposal move from a disposal move will always have more than 2 lines, unlike a normal depreciation move + if any( + line.account_id == asset.account_asset_id + and float_compare(-line.balance, asset.original_value, + precision_rounding=asset.currency_id.rounding) == 0 + for line in move.line_ids + ) and len(move.line_ids) > 2: + asset_depreciation = ( + asset.original_value + - asset.salvage_value + - ( + move.line_ids[1].debit if asset.original_value > 0 else move.line_ids[1].credit + ) * (-1 if asset.original_value < 0 else 1) + ) + else: + asset_depreciation = 0 + move.depreciation_value = asset_depreciation + + @api.depends('asset_id', 'asset_ids') + def _compute_asset_move_type(self): + for move in self: + if move.asset_ids: + move.asset_move_type = 'positive_revaluation' if move.asset_ids.parent_id else 'purchase' + elif not move.asset_move_type or not move.asset_id: + move.asset_move_type = False + + # ------------------------------------------------------------------------- + # INVERSE METHODS + # ------------------------------------------------------------------------- + def _inverse_depreciation_value(self): + for move in self: + asset = move.asset_id + amount = abs(move.depreciation_value) + account = asset.account_depreciation_expense_id + move.write({'line_ids': [ + Command.update(line.id, { + 'balance': amount if line.account_id == account else -amount, + }) + for line in move.line_ids + ]}) + + # ------------------------------------------------------------------------- + # CONSTRAINT METHODS + # ------------------------------------------------------------------------- + @api.constrains('state', 'asset_id') + def _constrains_check_asset_state(self): + for move in self.filtered(lambda mv: mv.asset_id): + asset_id = move.asset_id + if asset_id.state == 'draft' and move.state == 'posted': + raise ValidationError( + _("You can't post an entry related to a draft asset. Please post the asset before.")) + + def _post(self, soft=True): + # OVERRIDE + posted = super()._post(soft) + + # log the post of a depreciation + posted._log_depreciation_asset() + + # look for any asset to create, in case we just posted a bill on an account + # configured to automatically create assets + posted.sudo()._auto_create_asset() + + return posted + + def _reverse_moves(self, default_values_list=None, cancel=False): + if default_values_list is None: + default_values_list = [{} for _i in self] + for move, default_values in zip(self, default_values_list): + # Report the value of this move to the next draft move or create a new one + if move.asset_id: + # Recompute the status of the asset for all depreciations posted after the reversed entry + + first_draft = min(move.asset_id.depreciation_move_ids.filtered(lambda m: m.state == 'draft'), + key=lambda m: m.date, default=None) + if first_draft: + # If there is a draft, simply move/add the depreciation amount here + first_draft.depreciation_value += move.depreciation_value + elif move.asset_id.state != 'close': + # If there was no draft move left, create one. + # Unless the asset is being closed, then the closing move + # takes care of balancing the asset. + last_date = max(move.asset_id.depreciation_move_ids.mapped('date')) + method_period = move.asset_id.method_period + + self.create(self._prepare_move_for_asset_depreciation({ + 'asset_id': move.asset_id, + 'amount': move.depreciation_value, + 'depreciation_beginning_date': last_date + ( + relativedelta(months=1) if method_period == "1" else relativedelta(years=1)), + 'date': last_date + ( + relativedelta(months=1) if method_period == "1" else relativedelta(years=1)), + 'asset_number_days': 0 + })) + + msg = _('Depreciation entry %(name)s reversed (%(value)s)', name=move.name, + value=formatLang(self.env, move.depreciation_value, currency_obj=move.company_id.currency_id)) + move.asset_id.message_post(body=msg) + default_values['asset_id'] = move.asset_id.id + default_values['asset_number_days'] = -move.asset_number_days + default_values['asset_depreciation_beginning_date'] = default_values.get('date', move.date) + + return super(AccountMove, self)._reverse_moves(default_values_list, cancel) + + def button_cancel(self): + # OVERRIDE + res = super(AccountMove, self).button_cancel() + self.env['account.asset'].sudo().search([('original_move_line_ids.move_id', 'in', self.ids)]).write( + {'active': False}) + return res + + def button_draft(self): + for move in self: + if any(asset_id.state != 'draft' for asset_id in move.asset_ids): + raise UserError(_('You cannot reset to draft an entry related to a posted asset')) + # Remove any draft asset that could be linked to the account move being reset to draft + move.asset_ids.filtered(lambda x: x.state == 'draft').unlink() + return super(AccountMove, self).button_draft() + + def _log_depreciation_asset(self): + for move in self.filtered(lambda m: m.asset_id): + asset = move.asset_id + msg = _('Depreciation entry %(name)s posted (%(value)s)', name=move.name, + value=formatLang(self.env, move.depreciation_value, currency_obj=move.company_id.currency_id)) + asset.message_post(body=msg) + + def _auto_create_asset(self): + create_list = [] + invoice_list = [] + auto_validate = [] + for move in self: + if not move.is_invoice(): + continue + + for move_line in move.line_ids: + if ( + move_line.account_id + and (move_line.account_id.can_create_asset) + and move_line.account_id.create_asset != "no" + and not (move_line.currency_id or move.currency_id).is_zero(move_line.price_total) + and not move_line.asset_ids + and not move_line.tax_line_id + and move_line.price_total > 0 + and not (move.move_type in ('out_invoice', + 'out_refund') and move_line.account_id.internal_group == 'asset') + ): + if not move_line.name: + if move_line.product_id: + move_line.name = move_line.product_id.display_name + else: + raise UserError( + _('Journal Items of %(account)s should have a label in order to generate an asset', + account=move_line.account_id.display_name)) + if move_line.account_id.multiple_assets_per_line: + # decimal quantities are not supported, quantities are rounded to the lower int + units_quantity = max(1, int(move_line.quantity)) + else: + units_quantity = 1 + + model_ids = move_line.account_id.asset_model_ids + vals = { + 'name': move_line.name, + 'company_id': move_line.company_id.id, + 'currency_id': move_line.company_currency_id.id, + 'analytic_distribution': move_line.analytic_distribution, + 'original_move_line_ids': [(6, False, move_line.ids)], + 'state': 'draft', + 'acquisition_date': move.invoice_date if not move.reversed_entry_id else move.reversed_entry_id.invoice_date, + } + for model_id in model_ids or [None]: + if model_id: + vals['model_id'] = model_id.id + + auto_validate.extend([move_line.account_id.create_asset == 'validate'] * units_quantity) + invoice_list.extend([move] * units_quantity) + for i in range(1, units_quantity + 1): + if units_quantity > 1: + vals['name'] = _("%(move_line)s (%(current)s of %(total)s)", move_line=move_line.name, + current=i, total=units_quantity) + create_list.extend([vals.copy()]) + + assets = self.env['account.asset'].with_context({}).create(create_list) + for asset, vals, invoice, validate in zip(assets, create_list, invoice_list, auto_validate): + if 'model_id' in vals: + asset._onchange_model_id() + if validate: + asset.validate() + if invoice: + asset.message_post(body=_('Asset created from invoice: %s', invoice._get_html_link())) + asset._post_non_deductible_tax_value() + return assets + + @api.model + def _prepare_move_for_asset_depreciation(self, vals): + missing_fields = {'asset_id', 'amount', 'depreciation_beginning_date', 'date', 'asset_number_days'} - set(vals) + if missing_fields: + raise UserError(_('Some fields are missing %s', ', '.join(missing_fields))) + asset = vals['asset_id'] + analytic_distribution = asset.analytic_distribution + depreciation_date = vals.get('date', fields.Date.context_today(self)) + company_currency = asset.company_id.currency_id + current_currency = asset.currency_id + prec = company_currency.decimal_places + amount_currency = vals['amount'] + amount = current_currency._convert(amount_currency, company_currency, asset.company_id, depreciation_date) + # Keep the partner on the original invoice if there is only one + partner = asset.original_move_line_ids.mapped('partner_id') + partner = partner[:1] if len(partner) <= 1 else self.env['res.partner'] + name = _("%s: Depreciation", asset.name) + move_line_1 = { + 'name': name, + 'partner_id': partner.id, + 'account_id': asset.account_depreciation_id.id, + 'debit': 0.0 if float_compare(amount, 0.0, precision_digits=prec) > 0 else -amount, + 'credit': amount if float_compare(amount, 0.0, precision_digits=prec) > 0 else 0.0, + 'currency_id': current_currency.id, + 'amount_currency': -amount_currency, + } + move_line_2 = { + 'name': name, + 'partner_id': partner.id, + 'account_id': asset.account_depreciation_expense_id.id, + 'credit': 0.0 if float_compare(amount, 0.0, precision_digits=prec) > 0 else -amount, + 'debit': amount if float_compare(amount, 0.0, precision_digits=prec) > 0 else 0.0, + 'currency_id': current_currency.id, + 'amount_currency': amount_currency, + } + # Only set the 'analytic_distribution' key if there is an analytic distribution on the asset. + # Otherwise, it prevents the computation of the analytic distribution. + if analytic_distribution: + move_line_1['analytic_distribution'] = analytic_distribution + move_line_2['analytic_distribution'] = analytic_distribution + move_vals = { + 'partner_id': partner.id, + 'date': depreciation_date, + 'journal_id': asset.journal_id.id, + 'line_ids': [(0, 0, move_line_1), (0, 0, move_line_2)], + 'asset_id': asset.id, + 'ref': name, + 'asset_depreciation_beginning_date': vals['depreciation_beginning_date'], + 'asset_number_days': vals['asset_number_days'], + 'asset_value_change': vals.get('asset_value_change', False), + 'move_type': 'entry', + 'currency_id': current_currency.id, + 'asset_move_type': vals.get('asset_move_type', 'depreciation'), + 'company_id': asset.company_id.id, + } + return move_vals + + @api.depends('line_ids.asset_ids') + def _compute_asset_ids(self): + for record in self: + record.asset_ids = record.line_ids.asset_ids + record.count_asset = len(record.asset_ids) + record.asset_id_display_name = _('Asset') + record.draft_asset_exists = bool(record.asset_ids.filtered(lambda x: x.state == "draft")) + + def open_asset_view(self): + return self.asset_id.open_asset(['form']) + + def action_open_asset_ids(self): + return self.asset_ids.open_asset(['list', 'form']) + + +class AccountMoveLine(models.Model): + _name = "account.move.line" + _inherit = "account.move.line" + + move_attachment_ids = fields.One2many('ir.attachment', compute='_compute_attachment') + + # Deferred management fields + deferred_start_date = fields.Date( + string="Start Date", + compute='_compute_deferred_start_date', store=True, readonly=False, + index='btree_not_null', + copy=False, + help="Date at which the deferred expense/revenue starts" + ) + deferred_end_date = fields.Date( + string="End Date", + index='btree_not_null', + copy=False, + help="Date at which the deferred expense/revenue ends" + ) + has_deferred_moves = fields.Boolean(compute='_compute_has_deferred_moves') + has_abnormal_deferred_dates = fields.Boolean(compute='_compute_has_abnormal_deferred_dates') + + def _order_to_sql(self, order, query, alias=None, reverse=False): + sql_order = super()._order_to_sql(order, query, alias, reverse) + preferred_aml_residual_value = self._context.get('preferred_aml_value') + preferred_aml_currency_id = self._context.get('preferred_aml_currency_id') + if preferred_aml_residual_value and preferred_aml_currency_id and order == self._order: + currency = self.env['res.currency'].browse(preferred_aml_currency_id) + # using round since currency.round(55.55) = 55.550000000000004 + preferred_aml_residual_value = round(preferred_aml_residual_value, currency.decimal_places) + sql_residual_currency = self._field_to_sql(alias or self._table, 'amount_residual_currency', query) + sql_currency = self._field_to_sql(alias or self._table, 'currency_id', query) + return SQL( + "ROUND(%(residual_currency)s, %(decimal_places)s) = %(value)s " + "AND %(currency)s = %(currency_id)s DESC, %(order)s", + residual_currency=sql_residual_currency, + decimal_places=currency.decimal_places, + value=preferred_aml_residual_value, + currency=sql_currency, + currency_id=currency.id, + order=sql_order, + ) + return sql_order + + def copy_data(self, default=None): + data_list = super().copy_data(default=default) + for line, values in zip(self, data_list): + if 'move_reverse_cancel' in self._context: + values['deferred_start_date'] = line.deferred_start_date + values['deferred_end_date'] = line.deferred_end_date + return data_list + + def write(self, vals): + """ Prevent changing the account of a move line when there are already deferral entries. + """ + if 'account_id' in vals: + for line in self: + if ( + line.has_deferred_moves + and line.deferred_start_date + and line.deferred_end_date + and vals['account_id'] != line.account_id.id + ): + raise UserError(_( + "You cannot change the account for a deferred line in %(move_name)s if it has already been deferred.", + move_name=line.move_id.display_name + )) + return super().write(vals) + + # ============================= START - Deferred management ==================================== + def _compute_has_deferred_moves(self): + for line in self: + line.has_deferred_moves = line.move_id.deferred_move_ids + + @api.depends('deferred_start_date', 'deferred_end_date') + def _compute_has_abnormal_deferred_dates(self): + # In the deferred computations, we always assume that both the start and end date are inclusive + # E.g: 1st January -> 31st December is *exactly* 1 year = 12 months + # However, the user may instead put 1st January -> 1st January of next year which is then + # 12 months + 1/30 month = 12.03 months which may result in odd amounts when deferrals are created + # For this reason, we alert the user if we detect such a case + # Other cases were the number of months is not round should not be handled. + for line in self: + line.has_abnormal_deferred_dates = ( + line.deferred_start_date + and line.deferred_end_date + and float_compare( + self.env['account.move']._get_deferred_diff_dates(line.deferred_start_date, line.deferred_end_date + relativedelta(days=1)) % 1, # end date is included + 1 / 30, + precision_digits=2 + ) == 0 + ) + + def _has_deferred_compatible_account(self): + self.ensure_one() + return ( + self.move_id.is_purchase_document() + and + self.account_id.account_type in ('expense', 'expense_depreciation', 'expense_direct_cost') + ) or ( + self.move_id.is_sale_document() + and + self.account_id.account_type in ('income', 'income_other') + ) + + @api.onchange('deferred_start_date') + def _onchange_deferred_start_date(self): + if not self._has_deferred_compatible_account(): + self.deferred_start_date = False + + @api.onchange('deferred_end_date') + def _onchange_deferred_end_date(self): + if not self._has_deferred_compatible_account(): + self.deferred_end_date = False + + @api.depends('deferred_end_date', 'move_id.invoice_date', 'move_id.state') + def _compute_deferred_start_date(self): + for line in self: + if not line.deferred_start_date and line.move_id.invoice_date and line.deferred_end_date: + line.deferred_start_date = line.move_id.invoice_date + + @api.constrains('deferred_start_date', 'deferred_end_date', 'account_id') + def _check_deferred_dates(self): + for line in self: + if line.deferred_start_date and not line.deferred_end_date: + raise UserError(_("You cannot create a deferred entry with a start date but no end date.")) + elif line.deferred_start_date and line.deferred_end_date and line.deferred_start_date > line.deferred_end_date: + raise UserError(_("You cannot create a deferred entry with a start date later than the end date.")) + + @api.model + def _get_deferred_ends_of_month(self, start_date, end_date): + """ + :return: a list of dates corresponding to the end of each month between start_date and end_date. + See test_get_ends_of_month for examples. + """ + dates = [] + while start_date <= end_date: + start_date = start_date + relativedelta(day=31) # Go to end of month + dates.append(start_date) + start_date = start_date + relativedelta(days=1) # Go to first day of next month + return dates + + def _get_deferred_periods(self): + """ + :return: a list of tuples (start_date, end_date) during which the deferred expense/revenue is spread. + If there is only one period containing the move date, it means that we don't need to defer the + expense/revenue since the invoice deferral and its deferred entry will be created on the same day and will + thus cancel each other. + """ + self.ensure_one() + periods = [ + (max(self.deferred_start_date, date.replace(day=1)), min(date, self.deferred_end_date), 'current') + for date in self._get_deferred_ends_of_month(self.deferred_start_date, self.deferred_end_date) + ] + if not periods or len(periods) == 1 and periods[0][0].replace(day=1) == self.date.replace(day=1): + return [] + else: + return periods + + @api.model + def _get_deferred_amounts_by_line_values(self, line): + return { + 'account_id': line['account_id'], + # line either be a dict with ids (coming from SQL query), or a real account.move.line object + 'product_id': line['product_id'] if isinstance(line, dict) else line['product_id'].id, + 'product_category_id': line['product_category_id'] if isinstance(line, dict) else line['product_category_id'].id, + 'balance': line['balance'], + 'move_id': line['move_id'], + } + + @api.model + def _get_deferred_lines_values(self, account_id, balance, ref, analytic_distribution, line=None): + return { + 'account_id': account_id, + # line either be a dict with ids (coming from SQL query), or a real account.move.line object + 'product_id': line['product_id'] if isinstance(line, dict) else line['product_id'].id, + 'product_category_id': line['product_category_id'] if isinstance(line, dict) else line['product_category_id'].id, + 'balance': balance, + 'name': ref, + 'analytic_distribution': analytic_distribution, + } + + # ============================= END - Deferred management ==================================== + + def _get_computed_taxes(self): + if self.move_id.deferred_original_move_ids: + # If this line is part of a deferral move, do not (re)calculate its taxes automatically. + # Doing so might unvoluntarily impact the tax report in deferral moves (if a default tax is set on the account). + return self.tax_ids + return super()._get_computed_taxes() + + def _compute_attachment(self): + for record in self: + record.move_attachment_ids = self.env['ir.attachment'].search(expression.OR(record._get_attachment_domains())) + + def action_reconcile(self): + """ This function is called by the 'Reconcile' button of account.move.line's + list view. It performs reconciliation between the selected lines. + - If the reconciliation can be done directly we do it silently + - Else, if a write-off is required we open the wizard to let the client enter required information + """ + wizard = self.env['account.reconcile.wizard'].with_context( + active_model='account.move.line', + active_ids=self.ids, + ).new({}) + return wizard._action_open_wizard() if (wizard.is_write_off_required or wizard.force_partials) else wizard.reconcile() + + def _get_predict_postgres_dictionary(self): + lang = self._context.get('lang') and self._context.get('lang')[:2] + return {'fr': 'french'}.get(lang, 'english') + + @api.model + def _build_predictive_query(self, move_id, additional_domain=None): + move_query = self.env['account.move']._where_calc([ + ('move_type', '=', move_id.move_type), + ('state', '=', 'posted'), + ('partner_id', '=', move_id.partner_id.id), + ('company_id', '=', move_id.journal_id.company_id.id or self.env.company.id), + ]) + move_query.order = 'account_move.invoice_date' + move_query.limit = int(self.env["ir.config_parameter"].sudo().get_param( + "account.bill.predict.history.limit", + '100', + )) + return self.env['account.move.line']._where_calc([ + ('move_id', 'in', move_query), + ('display_type', '=', 'product'), + ] + (additional_domain or [])) + + @api.model + def _predicted_field(self, name, partner_id, field, query=None, additional_queries=None): + r"""Predict the most likely value based on the previous history. + + This method uses postgres tsvector in order to try to deduce a field of + an invoice line based on the text entered into the name (description) + field and the partner linked. + We only limit the search on the previous 100 entries, which according + to our tests bore the best results. However this limit parameter is + configurable by creating a config parameter with the key: + account.bill.predict.history.limit + + For information, the tests were executed with a dataset of 40 000 bills + from a live database, We split the dataset in 2, removing the 5000 most + recent entries and we tried to use this method to guess the account of + this validation set based on the previous entries. + The result is roughly 90% of success. + + :param field (str): the sql column that has to be predicted. + /!\ it is injected in the query without any checks. + :param query (osv.Query): the query object on account.move.line that is + used to do the ranking, containing the right domain, limit, etc. If + it is omitted, a default query is used. + :param additional_queries (list): can be used in addition to the + default query on account.move.line to fetch data coming from other + tables, to have starting values for instance. + /!\ it is injected in the query without any checks. + """ + if not name or not partner_id: + return False + + psql_lang = self._get_predict_postgres_dictionary() + description = name + ' account_move_line' # give more priority to main query than additional queries + parsed_description = re.sub(r"[*&()|!':<>=%/~@,.;$\[\]]+", " ", description) + parsed_description = ' | '.join(parsed_description.split()) + + try: + main_source = (query if query is not None else self._build_predictive_query(self.move_id)).select( + SQL("%s AS prediction", field), + SQL( + "setweight(to_tsvector(%s, account_move_line.name), 'B') || setweight(to_tsvector('simple', 'account_move_line'), 'A') AS document", + psql_lang + ), + ) + if "(" in field.code: # aggregate function + main_source = SQL("%s %s", main_source, SQL("GROUP BY account_move_line.id, account_move_line.name, account_move_line.partner_id")) + + self.env.cr.execute(SQL(""" + WITH account_move_line AS MATERIALIZED (%(account_move_line)s), + + source AS (%(source)s), + + ranking AS ( + SELECT prediction, ts_rank(source.document, query_plain) AS rank + FROM source, to_tsquery(%(lang)s, %(description)s) query_plain + WHERE source.document @@ query_plain + ) + + SELECT prediction, MAX(rank) AS ranking, COUNT(*) + FROM ranking + GROUP BY prediction + ORDER BY ranking DESC, count DESC + LIMIT 2 + """, + account_move_line=self._build_predictive_query(self.move_id).select(SQL('*')), + source=SQL('(%s)', SQL(') UNION ALL (').join([main_source] + (additional_queries or []))), + lang=psql_lang, + description=parsed_description, + )) + result = self.env.cr.dictfetchall() + if result: + # Only confirm the prediction if it's at least 10% better than the second one + if len(result) > 1 and result[0]['ranking'] < 1.1 * result[1]['ranking']: + return False + return result[0]['prediction'] + except Exception: + # In case there is an error while parsing the to_tsquery (wrong character for example) + # We don't want to have a blocking traceback, instead return False + _logger.exception('Error while predicting invoice line fields') + return False + + def _predict_taxes(self): + field = SQL('array_agg(account_move_line__tax_rel__tax_ids.id ORDER BY account_move_line__tax_rel__tax_ids.id)') + query = self._build_predictive_query(self.move_id) + query.left_join('account_move_line', 'id', 'account_move_line_account_tax_rel', 'account_move_line_id', 'tax_rel') + query.left_join('account_move_line__tax_rel', 'account_tax_id', 'account_tax', 'id', 'tax_ids') + query.add_where('account_move_line__tax_rel__tax_ids.active IS NOT FALSE') + predicted_tax_ids = self._predicted_field(self.name, self.partner_id, field, query) + if predicted_tax_ids == [None]: + return False + if predicted_tax_ids is not False and set(predicted_tax_ids) != set(self.tax_ids.ids): + return predicted_tax_ids + return False + + @api.model + def _predict_specific_tax(self, move, name, partner, amount_type, amount, type_tax_use): + field = SQL('array_agg(account_move_line__tax_rel__tax_ids.id ORDER BY account_move_line__tax_rel__tax_ids.id)') + query = self._build_predictive_query(move) + query.left_join('account_move_line', 'id', 'account_move_line_account_tax_rel', 'account_move_line_id', 'tax_rel') + query.left_join('account_move_line__tax_rel', 'account_tax_id', 'account_tax', 'id', 'tax_ids') + query.add_where(""" + account_move_line__tax_rel__tax_ids.active IS NOT FALSE + AND account_move_line__tax_rel__tax_ids.amount_type = %s + AND account_move_line__tax_rel__tax_ids.type_tax_use = %s + AND account_move_line__tax_rel__tax_ids.amount = %s + """, (amount_type, type_tax_use, amount)) + return self._predicted_field(name, partner, field, query) + + def _predict_product(self): + predict_product = int(self.env['ir.config_parameter'].sudo().get_param('account_predictive_bills.predict_product', '1')) + if predict_product and self.company_id.predict_bill_product: + query = self._build_predictive_query(self.move_id, ['|', ('product_id', '=', False), ('product_id.active', '=', True)]) + predicted_product_id = self._predicted_field(self.name, self.partner_id, SQL('account_move_line.product_id'), query) + if predicted_product_id and predicted_product_id != self.product_id.id: + return predicted_product_id + return False + + def _predict_account(self): + field = SQL('account_move_line.account_id') + if self.move_id.is_purchase_document(True): + excluded_group = 'income' + else: + excluded_group = 'expense' + account_query = self.env['account.account']._where_calc([ + *self.env['account.account']._check_company_domain(self.move_id.company_id or self.env.company), + ('deprecated', '=', False), + ('internal_group', 'not in', (excluded_group, 'off')), + ('account_type', 'not in', ('liability_payable', 'asset_receivable')), + ]) + account_name = self.env['account.account']._field_to_sql('account_account', 'name') + psql_lang = self._get_predict_postgres_dictionary() + additional_queries = [SQL(account_query.select( + SQL("account_account.id AS account_id"), + SQL("setweight(to_tsvector(%(psql_lang)s, %(account_name)s), 'B') AS document", psql_lang=psql_lang, account_name=account_name), + ))] + query = self._build_predictive_query(self.move_id, [('account_id', 'in', account_query)]) + + predicted_account_id = self._predicted_field(self.name, self.partner_id, field, query, additional_queries) + if predicted_account_id and predicted_account_id != self.account_id.id: + return predicted_account_id + return False + + @api.onchange('name') + def _onchange_name_predictive(self): + if ((self.move_id.quick_edit_mode or self.move_id.move_type == 'in_invoice') and self.name and self.display_type == 'product' + and not self.env.context.get('disable_onchange_name_predictive', False)): + + if not self.product_id: + predicted_product_id = self._predict_product() + if predicted_product_id: + # We only update the price_unit, tax_ids and name in case they evaluate to False + protected_fields = ['price_unit', 'tax_ids', 'name'] + to_protect = [self._fields[fname] for fname in protected_fields if self[fname]] + with self.env.protecting(to_protect, self): + self.product_id = predicted_product_id + + # In case no product has been set, the account and taxes + # will not depend on any product and can thus be predicted + if not self.product_id: + # Predict account. + predicted_account_id = self._predict_account() + if predicted_account_id: + self.account_id = predicted_account_id + + # Predict taxes + predicted_tax_ids = self._predict_taxes() + if predicted_tax_ids: + self.tax_ids = [Command.set(predicted_tax_ids)] + + def _read_group_select(self, aggregate_spec, query): + # Enable to use HAVING clause that sum rounded values depending on the + # currency precision settings. Limitation: we only handle a having + # clause of one element with that specific method :sum_rounded. + fname, __, func = models.parse_read_group_spec(aggregate_spec) + if func != 'sum_rounded': + return super()._read_group_select(aggregate_spec, query) + currency_alias = query.make_alias(self._table, 'currency_id') + query.add_join('LEFT JOIN', currency_alias, 'res_currency', SQL( + "%s = %s", + self._field_to_sql(self._table, 'currency_id', query), + SQL.identifier(currency_alias, 'id'), + )) + + return SQL( + 'SUM(ROUND(%s, %s))', + self._field_to_sql(self._table, fname, query), + self.env['res.currency']._field_to_sql(currency_alias, 'decimal_places', query), + ) + + def _read_group_groupby(self, groupby_spec, query): + # enable grouping by :abs_rounded on fields, which is useful when trying + # to match positive and negative amounts + if ':' in groupby_spec: + fname, method = groupby_spec.split(':') + if method == 'abs_rounded': + # rounds with the used currency settings + currency_alias = query.make_alias(self._table, 'currency_id') + query.add_join('LEFT JOIN', currency_alias, 'res_currency', SQL( + "%s = %s", + self._field_to_sql(self._table, 'currency_id', query), + SQL.identifier(currency_alias, 'id'), + )) + + return SQL( + 'ROUND(ABS(%s), %s)', + self._field_to_sql(self._table, fname, query), + self.env['res.currency']._field_to_sql(currency_alias, 'decimal_places', query), + ) + + return super()._read_group_groupby(groupby_spec, query) + + asset_ids = fields.Many2many('account.asset', 'asset_move_line_rel', 'line_id', 'asset_id', string='Related Assets', + copy=False) + non_deductible_tax_value = fields.Monetary(compute='_compute_non_deductible_tax_value', + currency_field='company_currency_id') + + def _get_computed_taxes(self): + if self.move_id.asset_id: + return self.tax_ids + return super()._get_computed_taxes() + + def turn_as_asset(self): + if len(self.company_id) != 1: + raise UserError(_("All the lines should be from the same company")) + if any(line.move_id.state == 'draft' for line in self): + raise UserError(_("All the lines should be posted")) + if any(account != self[0].account_id for account in self.mapped('account_id')): + raise UserError(_("All the lines should be from the same account")) + ctx = self.env.context.copy() + ctx.update({ + 'default_original_move_line_ids': [(6, False, self.env.context['active_ids'])], + 'default_company_id': self.company_id.id, + }) + return { + "name": _("Turn as an asset"), + "type": "ir.actions.act_window", + "res_model": "account.asset", + "views": [[False, "form"]], + "target": "current", + "context": ctx, + } + + @api.depends('tax_ids.invoice_repartition_line_ids') + def _compute_non_deductible_tax_value(self): + """ Handle the specific case of non deductible taxes, + such as "50% Non Déductible - Frais de voiture (Prix Excl.)" in Belgium. + """ + non_deductible_tax_ids = self.tax_ids.invoice_repartition_line_ids.filtered( + lambda line: line.repartition_type == 'tax' and not line.use_in_tax_closing + ).tax_id + + res = {} + if non_deductible_tax_ids: + domain = [('move_id', 'in', self.move_id.ids)] + tax_details_query = self._get_query_tax_details_from_domain(domain) + + self.flush_model() + self._cr.execute(SQL( + ''' + SELECT + tdq.base_line_id, + SUM(tdq.tax_amount_currency) + FROM (%(tax_details_query)s) AS tdq + JOIN account_move_line aml ON aml.id = tdq.tax_line_id + JOIN account_tax_repartition_line trl ON trl.id = tdq.tax_repartition_line_id + WHERE tdq.base_line_id IN %(base_line_ids)s + AND trl.use_in_tax_closing IS FALSE + GROUP BY tdq.base_line_id + ''', + tax_details_query=tax_details_query, + base_line_ids=tuple(self.ids), + )) + + res = {row['base_line_id']: row['sum'] for row in self._cr.dictfetchall()} + + for record in self: + record.non_deductible_tax_value = res.get(record._origin.id, 0.0)