Files
odoo-addons/addons/at_accounting/models/account_move.py

1564 lines
78 KiB
Python

import calendar
from contextlib import contextmanager
from dateutil.relativedelta import relativedelta
import logging
import math
import re
from odoo import fields, models, api, _, Command
from odoo.exceptions import UserError
from odoo.osv import expression
from odoo.tools import SQL, float_compare
import ast
from odoo.addons.account.models.exceptions import TaxClosingNonPostedDependingMovesError
from odoo.tools.misc import format_date
from odoo.tools import date_utils
from odoo.addons.web.controllers.utils import clean_action
from markupsafe import Markup
from odoo.exceptions import UserError, ValidationError
from odoo.tools.misc import formatLang
_logger = logging.getLogger(__name__)
DEFERRED_DATE_MIN = '1900-01-01'
DEFERRED_DATE_MAX = '9999-12-31'
class AccountMove(models.Model):
_inherit = "account.move"
def _get_invoice_in_payment_state(self):
return 'in_payment'
payment_state_before_switch = fields.Char(string="Payment State Before Switch", copy=False)
deferred_move_ids = fields.Many2many(
string="Deferred Entries",
comodel_name='account.move',
relation='account_move_deferred_rel',
column1='original_move_id',
column2='deferred_move_id',
help="The deferred entries created by this invoice",
copy=False,
)
deferred_original_move_ids = fields.Many2many(
string="Original Invoices",
comodel_name='account.move',
relation='account_move_deferred_rel',
column1='deferred_move_id',
column2='original_move_id',
help="The original invoices that created the deferred entries",
copy=False,
)
deferred_entry_type = fields.Selection(
string="Deferred Entry Type",
selection=[
('expense', 'Deferred Expense'),
('revenue', 'Deferred Revenue'),
],
compute='_compute_deferred_entry_type',
copy=False,
)
signing_user = fields.Many2one(
string='Signer',
comodel_name='res.users',
compute='_compute_signing_user', store=True,
copy=False,
)
show_signature_area = fields.Boolean(compute='_compute_signature')
signature = fields.Binary(compute='_compute_signature') # can't be `related`: the sign module might not be there
# used for VAT closing, containing the end date of the period this entry closes
tax_closing_report_id = fields.Many2one(comodel_name='account.report')
# technical field used to know whether to show the tax closing alert or not
tax_closing_alert = fields.Boolean(compute='_compute_tax_closing_alert')
@api.depends('state', 'move_type', 'invoice_user_id')
def _compute_signing_user(self):
other_moves = self.filtered(lambda move: not move.is_sale_document())
other_moves.signing_user = False
is_odoobot_user = self.env.user == self.env.ref('base.user_root')
is_backend_user = self.env.user.has_group('base.group_user')
for invoice in (self - other_moves).filtered(lambda inv: inv.state == 'posted'):
# signer priority:
# - res.user set in res.settings
# - real backend user posting the invoice
# - if odoobot: the person that initiated the invoice ie: The salesman
# - if invoice initiated by a portal user -> No signature
representative = invoice.company_id.signing_user
# checking `has_group('base.group_user')` ensure we never keep a portal user to sign
if is_odoobot_user:
user_can_sign = invoice.invoice_user_id and invoice.invoice_user_id.has_group('base.group_user')
invoice.signing_user = representative or invoice.invoice_user_id if user_can_sign else False
else:
invoice.signing_user = representative or self.env.user if is_backend_user else False
@api.depends('state')
def _compute_signature(self):
is_portal_user = self.env.user.has_group('base.group_portal')
# Checking `company_id.sign_invoice` removes the needs to check if the sign module is installed
# Setting it to True through `res.settings` auto install the sign module
moves_not_to_sign = self.filtered(
lambda inv: not inv.company_id.sign_invoice
or inv.state in {'draft', 'cancel'}
or not inv.is_sale_document()
# Allow signature for portal user only if the invoice already went through the send&print workflow
or (is_portal_user and not inv.invoice_pdf_report_id)
)
moves_not_to_sign.show_signature_area = False
moves_not_to_sign.signature = None
invoice_with_signature = self - moves_not_to_sign
invoice_with_signature.show_signature_area = True
for invoice in invoice_with_signature:
invoice.signature = invoice.signing_user.sign_signature
def _post(self, soft=True):
# Deferred management
posted = super()._post(soft)
for move in self:
if move._get_deferred_entries_method() == 'on_validation' and any(move.line_ids.mapped('deferred_start_date')):
move._generate_deferred_entries()
return posted
def action_post(self):
# EXTENDS 'account' to trigger the CRON auto-reconciling the statement lines.
res = super().action_post()
if self.statement_line_id and not self._context.get('skip_statement_line_cron_trigger'):
self.env.ref('at_accounting.auto_reconcile_bank_statement_line')._trigger()
return res
def button_draft(self):
if any(len(deferral_move.deferred_original_move_ids) > 1 for deferral_move in self.deferred_move_ids):
raise UserError(_("You cannot reset to draft an invoice that is grouped in deferral entry. You can create a credit note instead."))
reversed_moves = self.deferred_move_ids._unlink_or_reverse()
if reversed_moves:
for move in reversed_moves:
move.with_context(skip_readonly_check=True).write({
'date': move._get_accounting_date(move.date, move._affect_tax_report()),
})
self.deferred_move_ids |= reversed_moves
return super().button_draft()
def unlink(self):
# Prevent deferred moves under audit trail restriction from being unlinked
deferral_moves = self.filtered(lambda move: move.company_id.check_account_audit_trail and move.deferred_original_move_ids)
deferral_moves.deferred_original_move_ids.deferred_move_ids = False
deferral_moves._reverse_moves()
return super(AccountMove, self - deferral_moves).unlink()
# ============================= START - Deferred Management ====================================
def _get_deferred_entries_method(self):
self.ensure_one()
if self.is_outbound():
return self.company_id.generate_deferred_expense_entries_method
return self.company_id.generate_deferred_revenue_entries_method
@api.depends('deferred_original_move_ids')
def _compute_deferred_entry_type(self):
for move in self:
if move.deferred_original_move_ids:
move.deferred_entry_type = 'expense' if move.deferred_original_move_ids[0].is_outbound() else 'revenue'
else:
move.deferred_entry_type = False
@api.model
def _get_deferred_diff_dates(self, start, end):
"""
Returns the number of months between two dates [start, end[
The computation is done by using months of 30 days so that the deferred amount for february
(28-29 days), march (31 days) and april (30 days) are all the same (in case of monthly computation).
See test_deferred_management_get_diff_dates for examples.
"""
if start > end:
start, end = end, start
nb_months = end.month - start.month + 12 * (end.year - start.year)
start_day, end_day = start.day, end.day
if start_day == calendar.monthrange(start.year, start.month)[1]:
start_day = 30
if end_day == calendar.monthrange(end.year, end.month)[1]:
end_day = 30
nb_days = end_day - start_day
return (nb_months * 30 + nb_days) / 30
@api.model
def _get_deferred_period_amount(self, method, period_start, period_end, line_start, line_end, balance):
"""
Returns the amount to defer for the given period taking into account the deferred method (day/month/full_months).
"""
is_valid_period = period_end > line_start and period_end > period_start
if method == 'day':
amount_per_day = balance / (line_end - line_start).days
return (period_end - period_start).days * amount_per_day if is_valid_period else 0
elif method == "month":
amount_per_month = balance / self._get_deferred_diff_dates(line_end, line_start)
nb_months_period = self._get_deferred_diff_dates(period_end, period_start)
return nb_months_period * amount_per_month if is_valid_period else 0
elif method == "full_months":
line_diff = self._get_deferred_diff_dates(line_end, line_start)
period_diff = self._get_deferred_diff_dates(period_end, period_start)
if line_diff < 1:
amount = balance
else:
if line_end.day == calendar.monthrange(line_end.year, line_end.month)[1]:
line_diff = math.ceil(line_diff)
else:
line_diff = math.floor(line_diff)
if period_end.day == calendar.monthrange(period_end.year, period_end.month)[1] or line_end != period_end:
period_diff = math.ceil(period_diff)
else:
period_diff = math.floor(period_diff)
amount_per_month = balance / line_diff
amount = period_diff * amount_per_month
return amount if is_valid_period else 0
@api.model
def _get_deferred_amounts_by_line(self, lines, periods, deferred_type):
"""
:return: a list of dictionaries containing the deferred amounts for each line and each period
E.g. (where period1 = (date1, date2, label1), period2 = (date2, date3, label2), ...)
[
{'account_id': 1, period_1: 100, period_2: 200},
{'account_id': 1, period_1: 100, period_2: 200},
{'account_id': 2, period_1: 300, period_2: 400},
]
"""
values = []
for line in lines:
line_start = fields.Date.to_date(line['deferred_start_date'])
line_end = fields.Date.to_date(line['deferred_end_date'])
if line_end < line_start:
# This normally shouldn't happen, but if it does, would cause calculation errors later on.
# To not make the reports crash, we just set both dates to the same day.
# The user should fix the dates manually.
line_end = line_start
columns = {}
for period in periods:
if period[2] == 'not_started' and line_start <= period[0]:
# The 'Not Started' column only considers lines starting the deferral after the report end date
columns[period] = 0.0
continue
# periods = [Total, Not Started, Before, ..., Current, ..., Later]
# The dates to calculate the amount for the current period
period_start = max(period[0], line_start)
period_end = min(period[1], line_end)
if (
period[2] in ('not_started', 'later') and period[0] < line_start
or len(periods) <= 1
or period[2] not in ('not_started', 'before', 'later')
):
# We are subtracting 1 day from `period_start` because the start date should be included when:
# - in the 'Not Started' or 'Later' period if the deferral has not started yet (line_start, line_end)
# - we only have one period
# - not in the 'Not Started', 'Before' or 'Later' period
period_start -= relativedelta(days=1)
columns[period] = self._get_deferred_period_amount(
self.env.company.deferred_expense_amount_computation_method if deferred_type == "expense" else self.env.company.deferred_revenue_amount_computation_method,
period_start, period_end,
line_start - relativedelta(days=1), line_end, # -1 because we want to include the start date
line['balance']
)
values.append({
**self.env['account.move.line']._get_deferred_amounts_by_line_values(line),
**columns,
})
return values
@api.model
def _get_deferred_lines(self, line, deferred_account, deferred_type, period, ref, force_balance=None, grouping_field='account_id'):
"""
:return: a list of Command objects to create the deferred lines of a single given period
"""
deferred_amounts = self._get_deferred_amounts_by_line(line, [period], deferred_type)[0]
balance = deferred_amounts[period] if force_balance is None else force_balance
return [
Command.create({
**self.env['account.move.line']._get_deferred_lines_values(account.id, coeff * balance, ref, line.analytic_distribution, line),
'partner_id': line.partner_id.id,
'product_id': line.product_id.id,
})
for (account, coeff) in [(deferred_amounts[grouping_field], 1), (deferred_account, -1)]
]
def _generate_deferred_entries(self):
"""
Generates the deferred entries for the invoice.
"""
self.ensure_one()
if self.state != 'posted':
return
if self.is_entry():
raise UserError(_("You cannot generate deferred entries for a miscellaneous journal entry."))
deferred_type = "expense" if self.is_purchase_document() else "revenue"
deferred_account = self.company_id.deferred_expense_account_id if deferred_type == "expense" else self.company_id.deferred_revenue_account_id
deferred_journal = self.company_id.deferred_expense_journal_id if deferred_type == "expense" else self.company_id.deferred_revenue_journal_id
if not deferred_journal:
raise UserError(_("Please set the deferred journal in the accounting settings."))
if not deferred_account:
raise UserError(_("Please set the deferred accounts in the accounting settings."))
for line in self.line_ids.filtered(lambda l: l.deferred_start_date and l.deferred_end_date):
periods = line._get_deferred_periods()
if not periods:
continue
ref = _("Deferral of %s", line.move_id.name or '')
move_vals = {
'move_type': 'entry',
'deferred_original_move_ids': [Command.set(line.move_id.ids)],
'journal_id': deferred_journal.id,
'company_id': self.company_id.id,
'partner_id': line.partner_id.id,
'auto_post': 'at_date',
'ref': ref,
'name': False,
}
# Defer the current invoice
move_fully_deferred = self.create({
**move_vals,
'date': line.move_id.date,
})
# We write the lines after creation, to make sure the `deferred_original_move_ids` is set.
# This way we can avoid adding taxes for deferred moves.
move_fully_deferred.write({
'line_ids': [
Command.create(
self.env['account.move.line']._get_deferred_lines_values(account.id, coeff * line.balance, ref, line.analytic_distribution, line)
) for (account, coeff) in [(line.account_id, -1), (deferred_account, 1)]
],
})
# Create the deferred entries for the periods [deferred_start_date, deferred_end_date]
deferral_moves = self.create([{
**move_vals,
'date': period[1],
} for period in periods])
remaining_balance = line.balance
for period_index, (period, deferral_move) in enumerate(zip(periods, deferral_moves)):
# For the last deferral move the balance is forced to remaining balance to avoid rounding errors
force_balance = remaining_balance if period_index == len(periods) - 1 else None
# Same as before, to avoid adding taxes for deferred moves.
deferral_move.write({
'line_ids': self._get_deferred_lines(line, deferred_account, deferred_type, period, ref, force_balance=force_balance),
})
remaining_balance -= deferral_move.line_ids[0].balance
# Avoid having deferral moves with a total amount of 0
if deferral_move.currency_id.is_zero(deferral_move.amount_total):
deferral_moves -= deferral_move
deferral_move.unlink()
deferred_moves = move_fully_deferred + deferral_moves
if len(deferral_moves) == 1 and move_fully_deferred.date.month == deferral_moves.date.month:
# If, after calculation, we have 2 deferral entries in the same month, it means that
# they simply cancel out each other, so there is no point in creating them.
deferred_moves.unlink()
continue
line.move_id.deferred_move_ids |= deferred_moves
deferred_moves._post(soft=True)
def open_deferred_entries(self):
self.ensure_one()
return {
'type': 'ir.actions.act_window',
'name': _("Deferred Entries"),
'res_model': 'account.move.line',
'domain': [('id', 'in', self.deferred_move_ids.line_ids.ids)],
'views': [(self.env.ref('at_accounting.view_deferred_entries_tree').id, 'list')],
'context': {
'search_default_group_by_move': True,
'expand': True,
}
}
def open_deferred_original_entry(self):
self.ensure_one()
action = {
'type': 'ir.actions.act_window',
'name': _("Original Deferred Entries"),
'res_model': 'account.move.line',
'domain': [('id', 'in', self.deferred_original_move_ids.line_ids.ids)],
'views': [(False, 'list'), (False, 'form')],
'context': {
'search_default_group_by_move': True,
'expand': True,
}
}
if len(self.deferred_original_move_ids) == 1:
action.update({
'res_model': 'account.move',
'res_id': self.deferred_original_move_ids[0].id,
'views': [(False, 'form')],
})
return action
# ============================= END - Deferred management ======================================
def action_open_bank_reconciliation_widget(self):
return self.statement_line_id._action_open_bank_reconciliation_widget(
default_context={
'search_default_journal_id': self.statement_line_id.journal_id.id,
'search_default_statement_line_id': self.statement_line_id.id,
'default_st_line_id': self.statement_line_id.id,
}
)
def action_open_bank_reconciliation_widget_statement(self):
return self.statement_line_id._action_open_bank_reconciliation_widget(
extra_domain=[('statement_id', 'in', self.statement_id.ids)],
)
def action_open_business_doc(self):
if self.statement_line_id:
return self.action_open_bank_reconciliation_widget()
else:
action = super().action_open_business_doc()
# prevent propagation of the following keys
action['context'] = action.get('context', {}) | {
'preferred_aml_value': None,
'preferred_aml_currency_id': None,
}
return action
def _get_mail_thread_data_attachments(self):
res = super()._get_mail_thread_data_attachments()
res += self.statement_line_id.statement_id.attachment_ids
return res
@contextmanager
def _get_edi_creation(self):
with super()._get_edi_creation() as move:
previous_lines = move.invoice_line_ids
yield move.with_context(disable_onchange_name_predictive=True)
for line in move.invoice_line_ids - previous_lines:
line._onchange_name_predictive()
def _post(self, soft=True):
# Overridden to create carryover external values and join the pdf of the report when posting the tax closing
for move in self.filtered(lambda m: m.tax_closing_report_id):
report = move.tax_closing_report_id
options = move._get_tax_closing_report_options(move.company_id, move.fiscal_position_id, report, move.date)
move._close_tax_period(report, options)
return super()._post(soft)
def action_post(self):
# In the case of a TaxClosingNonPostedDependingMovesError, which can occur when dealing with branches or tax
# units during the closing process, the parent company may have non-posted closing entries from other companies.
# If this exception occurs, we will return an action client that will display a component indicating that there
# are non-posted dependent moves, along with a link to those moves.
# Also, we are not using a RedirectWarning because it will force a rollback on the closing move created for
# depending companies.
try:
res = super().action_post()
except TaxClosingNonPostedDependingMovesError as exception:
return {
"type": "ir.actions.client",
"tag": "at_accounting.redirect_action",
"target": "new",
"name": "Depending Action",
"params": {
"depending_action": exception.args[0],
"message": _("It seems there is some depending closing move to be posted"),
"button_text": _("Depending moves"),
},
'context': {
'dialog_size': 'medium',
}
}
return res
def button_draft(self):
# Overridden in order to delete the carryover values when resetting the tax closing to draft
super().button_draft()
for closing_move in self.filtered(lambda m: m.tax_closing_report_id):
report = closing_move.tax_closing_report_id
options = closing_move._get_tax_closing_report_options(closing_move.company_id, closing_move.fiscal_position_id, report, closing_move.date)
closing_months_delay = closing_move.company_id._get_tax_periodicity_months_delay(report)
carryover_values = self.env['account.report.external.value'].search([
('carryover_origin_report_line_id', 'in', report.line_ids.ids),
('date', '=', options['date']['date_to']),
])
carryover_impacted_period_end = fields.Date.from_string(options['date']['date_to']) + relativedelta(months=closing_months_delay)
tax_lock_date = closing_move.company_id.tax_lock_date
if carryover_values and tax_lock_date and tax_lock_date >= carryover_impacted_period_end:
raise UserError(_("You cannot reset this closing entry to draft, as it would delete carryover values impacting the tax report of a "
"locked period. To do this, you first need to modify you tax return lock date."))
if self._has_subsequent_posted_closing_moves():
raise UserError(_("You cannot reset this closing entry to draft, as another closing entry has been posted at a later date."))
carryover_values.unlink()
def _has_subsequent_posted_closing_moves(self):
self.ensure_one()
closing_domains = [
('company_id', '=', self.company_id.id),
('tax_closing_report_id', '!=', False),
('state', '=', 'posted'),
('date', '>', self.date),
('fiscal_position_id', '=', self.fiscal_position_id.id)
]
return bool(self.env['account.move'].search_count(closing_domains, limit=1))
def _get_tax_to_pay_on_closing(self):
self.ensure_one()
tax_payable_accounts = self.env['account.tax.group'].search([
('company_id', '=', self.company_id.id),
]).tax_payable_account_id
payable_lines = self.line_ids.filtered(lambda line: line.account_id in tax_payable_accounts)
return self.currency_id.round(-sum(payable_lines.mapped('balance')))
def _action_tax_to_pay_wizard(self):
# hook for l10n tax payment wizard
return self.action_open_tax_report()
def action_open_tax_report(self):
action = self.env["ir.actions.actions"]._for_xml_id("at_accounting.action_account_report_gt")
if not self.tax_closing_report_id:
raise UserError(_("You can't open a tax report from a move without a VAT closing date."))
options = self._get_tax_closing_report_options(self.company_id, self.fiscal_position_id, self.tax_closing_report_id, self.date)
# Pass options in context and set ignore_session: true to prevent using session options
action.update({'params': {'options': options, 'ignore_session': True}})
return action
def _close_tax_period(self, report, options):
""" Closes tax closing entries. The tax closing activities on them will be marked done, and the next tax closing entry
will be generated or updated (if already existing). Also, a pdf of the tax report at the time of closing
will be posted in the chatter of each move.
The tax lock date of each move's company will be set to the move's date in case no other draft tax closing
move exists for that company (whatever their foreign VAT fiscal position) before or at that date, meaning that
all the tax closings have been performed so far.
"""
self.ensure_one()
if not self.env.user.has_group('account.group_account_manager'):
raise UserError(_('Only Billing Administrators are allowed to change lock dates!'))
report = self.tax_closing_report_id
options = self._get_tax_closing_report_options(self.company_id, self.fiscal_position_id, report, self.date)
if not self.fiscal_position_id and (not self.company_id.tax_lock_date or self.date > self.company_id.tax_lock_date):
self.company_id.sudo().tax_lock_date = self.date
self.env['account.report']._generate_default_external_values(options['date']['date_from'], options['date']['date_to'], True)
sender_company = report._get_sender_company_for_export(options)
company_ids = report.get_report_company_ids(options)
if sender_company == self.company_id:
depending_closings = self.env['account.tax.report.handler']._get_tax_closing_entries_for_closed_period(report, options, self.env['res.company'].browse(company_ids), posted_only=False) - self
depending_closings_to_post = depending_closings.filtered(lambda x: x.state == 'draft')
if depending_closings_to_post:
depending_action = self.env["ir.actions.actions"]._for_xml_id("account.action_move_journal_line")
depending_action = clean_action(depending_action, env=self.env)
if len(depending_closings_to_post) == 1:
depending_action['views'] = [(self.env.ref('account.view_move_form').id, 'form')]
depending_action['res_id'] = depending_closings_to_post.id
else:
depending_action['domain'] = [('id', 'in', depending_closings_to_post.ids)]
depending_action['context'] = dict(ast.literal_eval(depending_action['context']))
depending_action['context'].pop('search_default_posted', None)
# In case of dependent moves, we will raise an error that will be caught in the action_post method.
# When the exception is caught, a component will inform the user that there are some dependent moves
# to be posted and provide a link to these moves.
raise TaxClosingNonPostedDependingMovesError(depending_action)
# Generate the carryover values.
report.with_context(allowed_company_ids=company_ids)._generate_carryover_external_values(options)
# Post the message with the attachments (PDF of the report, and possibly an additional export file)
attachments = self._get_vat_report_attachments(report, options)
subject = _(
"Vat closing from %(date_from)s to %(date_to)s",
date_from=format_date(self.env, options['date']['date_from']),
date_to=format_date(self.env, options['date']['date_to']),
)
self.with_context(no_new_invoice=True).message_post(body=self.ref, subject=subject, attachments=attachments)
# Log a note on depending closings, redirecting to the main one
for closing_move in depending_closings:
closing_move.message_post(
body=Markup("%s") % _("The attachments of the tax report can be found on the <a href='#' data-oe-model='account.move' data-oe-id='%s'>closing entry</a> of the representative company.", self.id),
)
# End activity
activity = self.company_id._get_tax_closing_reminder_activity(report.id, self.date, self.fiscal_position_id.id)
if activity:
activity.action_done()
# Generate next activity
self.company_id._generate_tax_closing_reminder_activity(self.tax_closing_report_id, self.date + relativedelta(days=1), self.fiscal_position_id if self.fiscal_position_id.foreign_vat else None)
self._close_tax_period_create_activities()
def _close_tax_period_create_activities(self):
mat_to_send_xml_id = 'at_accounting.mail_activity_type_tax_report_to_be_sent'
mat_to_send = self.env.ref(mat_to_send_xml_id, raise_if_not_found=False)
if not mat_to_send:
# As this is introduced in stable, we ensure data exists by creating them on the fly if needed
mat_to_send = self.env['mail.activity.type'].sudo()._load_records([{
'xml_id': mat_to_send_xml_id,
'noupdate': False,
'values': {
'name': 'Tax Report Ready',
'summary': 'Tax report is ready to be sent to the administration',
'category': 'tax_report',
'delay_count': '0',
'delay_unit': 'days',
'delay_from': 'current_date',
'res_model': 'account.move',
'chaining_type': 'suggest',
}
}])
mat_to_pay_xml_id = 'at_accounting.mail_activity_type_tax_report_to_pay'
mat_to_pay = self.env.ref(mat_to_pay_xml_id, raise_if_not_found=False)
act_user = mat_to_send.default_user_id
if act_user and not (self.company_id in act_user.company_ids and act_user.has_group('account.group_account_manager')):
act_user = self.env['res.users']
moves_without_send_activity = self.filtered_domain([
'|',
('activity_ids', '=', False),
('activity_ids', 'not any', [('activity_type_id.id', '=', mat_to_send.id)]),
])
for move in moves_without_send_activity:
period_start, period_end = move.company_id._get_tax_closing_period_boundaries(move.date, move.tax_closing_report_id)
period_desc = move.company_id._get_tax_closing_move_description(move.company_id._get_tax_periodicity(move.tax_closing_report_id), period_start, period_end, move.fiscal_position_id, move.tax_closing_report_id)
move.with_context(mail_activity_quick_update=True).activity_schedule(
act_type_xmlid=mat_to_send_xml_id,
summary=_("Send tax report: %s", period_desc),
date_deadline=fields.Date.context_today(move),
user_id=act_user.id or self.env.user.id,
)
if mat_to_pay and mat_to_pay not in move.activity_ids.activity_type_id and move._get_tax_to_pay_on_closing() > 0:
move.with_context(mail_activity_quick_update=True).activity_schedule(
act_type_xmlid=mat_to_pay_xml_id,
summary=_("Pay tax: %s", period_desc),
date_deadline=fields.Date.context_today(move),
user_id=act_user.id or self.env.user.id,
)
def refresh_tax_entry(self):
for move in self.filtered(lambda m: m.tax_closing_report_id and m.state == 'draft'):
report = move.tax_closing_report_id
options = move._get_tax_closing_report_options(move.company_id, move.fiscal_position_id, report, move.date)
self.env[report.custom_handler_model_name or 'account.generic.tax.report.handler']._generate_tax_closing_entries(report, options, closing_moves=move)
@api.model
def _get_tax_closing_report_options(self, company, fiscal_position, report, date_inside_period):
_dummy, date_to = company._get_tax_closing_period_boundaries(date_inside_period, report)
# In case the company submits its report in different regions, a closing entry
# is made for each fiscal position defining a foreign VAT.
# We hence need to make sure to select a tax report in the right country when opening
# the report (in case there are many, we pick the first one available; it doesn't impact the closing)
if fiscal_position and fiscal_position.foreign_vat:
fpos_option = fiscal_position.id
report_country = fiscal_position.country_id
else:
fpos_option = 'domestic'
report_country = company.account_fiscal_country_id
options = {
'date': {
'date_to': fields.Date.to_string(date_to),
'filter': 'custom_tax_period',
'mode': 'range',
},
'selected_variant_id': report.id,
'sections_source_id': report.id,
'fiscal_position': fpos_option,
'tax_unit': 'company_only',
}
if report.filter_multi_company == 'tax_units':
# Enforce multicompany if the closing is done for a tax unit
candidate_tax_unit = company.account_tax_unit_ids.filtered(lambda x: x.country_id == report_country)
if candidate_tax_unit:
options['tax_unit'] = candidate_tax_unit.id
company_ids = candidate_tax_unit.company_ids.ids
else:
same_vat_branches = self.env.company._get_branches_with_same_vat()
# Consider the one with the least number of parents (highest in hierarchy) as the active company, coming first
company_ids = same_vat_branches.sorted(lambda x: len(x.parent_ids)).ids
else:
company_ids = self.env.company.ids
return report.with_context(allowed_company_ids=company_ids).get_options(previous_options=options)
def _get_vat_report_attachments(self, report, options):
# Fetch pdf
pdf_data = report.export_to_pdf(options)
return [(pdf_data['file_name'], pdf_data['file_content'])]
def _compute_tax_closing_alert(self):
for move in self:
move.tax_closing_alert = (
move.state == 'posted'
and move.tax_closing_report_id
and move.company_id.tax_lock_date
and move.company_id.tax_lock_date < move.date
)
asset_id = fields.Many2one('account.asset', string='Asset', index=True, ondelete='cascade', copy=False,
domain="[('company_id', '=', company_id)]")
asset_remaining_value = fields.Monetary(string='Depreciable Value',
compute='_compute_depreciation_cumulative_value')
asset_depreciated_value = fields.Monetary(string='Cumulative Depreciation',
compute='_compute_depreciation_cumulative_value')
# true when this move is the result of the changing of value of an asset
asset_value_change = fields.Boolean()
# how many days of depreciation this entry corresponds to
asset_number_days = fields.Integer(string="Number of days", copy=False) # deprecated
asset_depreciation_beginning_date = fields.Date(string="Date of the beginning of the depreciation",
copy=False) # technical field stating when the depreciation associated with this entry has begun
depreciation_value = fields.Monetary(
string="Depreciation",
compute="_compute_depreciation_value", inverse="_inverse_depreciation_value", store=True,
)
asset_ids = fields.One2many('account.asset', string='Assets', compute="_compute_asset_ids")
asset_id_display_name = fields.Char(
compute="_compute_asset_ids") # just a button label. That's to avoid a plethora of different buttons defined in xml
count_asset = fields.Integer(compute="_compute_asset_ids")
draft_asset_exists = fields.Boolean(compute="_compute_asset_ids")
asset_move_type = fields.Selection(
selection=[
('depreciation', 'Depreciation'),
('sale', 'Sale'),
('purchase', 'Purchase'),
('disposal', 'Disposal'),
('negative_revaluation', 'Negative revaluation'),
('positive_revaluation', 'Positive revaluation'),
],
string='Asset Move Type',
compute='_compute_asset_move_type', store=True,
copy=False,
)
# -------------------------------------------------------------------------
# COMPUTE METHODS
# -------------------------------------------------------------------------
@api.depends('asset_id', 'depreciation_value', 'asset_id.total_depreciable_value',
'asset_id.already_depreciated_amount_import', 'state')
def _compute_depreciation_cumulative_value(self):
self.asset_depreciated_value = 0
self.asset_remaining_value = 0
# make sure to protect all the records being assigned, because the
# assignments invoke method write() on non-protected records, which may
# cause an infinite recursion in case method write() needs to read one
# of these fields (like in case of a base automation)
fields = [self._fields['asset_remaining_value'], self._fields['asset_depreciated_value']]
with self.env.protecting(fields, self.asset_id.depreciation_move_ids):
for asset in self.asset_id:
depreciated = 0
remaining = asset.total_depreciable_value - asset.already_depreciated_amount_import
for move in asset.depreciation_move_ids.sorted(lambda mv: (mv.date, mv._origin.id)):
if move.state != 'cancel':
remaining -= move.depreciation_value
depreciated += move.depreciation_value
move.asset_remaining_value = remaining
move.asset_depreciated_value = depreciated
@api.depends('line_ids.balance')
def _compute_depreciation_value(self):
for move in self:
asset = move.asset_id or move.reversed_entry_id.asset_id # reversed moves are created before being assigned to the asset
if asset:
account_internal_group = 'expense'
asset_depreciation = sum(
move.line_ids.filtered(lambda
l: l.account_id.internal_group == account_internal_group or l.account_id == asset.account_depreciation_expense_id).mapped(
'balance')
)
# Special case of closing entry - only disposed assets of type 'purchase' should match this condition
# The condition on len(move.line_ids) is to avoid the case where there is only one depreciation move, and it is not a disposal move
# The condition will be matched because a disposal move from a disposal move will always have more than 2 lines, unlike a normal depreciation move
if any(
line.account_id == asset.account_asset_id
and float_compare(-line.balance, asset.original_value,
precision_rounding=asset.currency_id.rounding) == 0
for line in move.line_ids
) and len(move.line_ids) > 2:
asset_depreciation = (
asset.original_value
- asset.salvage_value
- (
move.line_ids[1].debit if asset.original_value > 0 else move.line_ids[1].credit
) * (-1 if asset.original_value < 0 else 1)
)
else:
asset_depreciation = 0
move.depreciation_value = asset_depreciation
@api.depends('asset_id', 'asset_ids')
def _compute_asset_move_type(self):
for move in self:
if move.asset_ids:
move.asset_move_type = 'positive_revaluation' if move.asset_ids.parent_id else 'purchase'
elif not move.asset_move_type or not move.asset_id:
move.asset_move_type = False
# -------------------------------------------------------------------------
# INVERSE METHODS
# -------------------------------------------------------------------------
def _inverse_depreciation_value(self):
for move in self:
asset = move.asset_id
amount = abs(move.depreciation_value)
account = asset.account_depreciation_expense_id
move.write({'line_ids': [
Command.update(line.id, {
'balance': amount if line.account_id == account else -amount,
})
for line in move.line_ids
]})
# -------------------------------------------------------------------------
# CONSTRAINT METHODS
# -------------------------------------------------------------------------
@api.constrains('state', 'asset_id')
def _constrains_check_asset_state(self):
for move in self.filtered(lambda mv: mv.asset_id):
asset_id = move.asset_id
if asset_id.state == 'draft' and move.state == 'posted':
raise ValidationError(
_("You can't post an entry related to a draft asset. Please post the asset before."))
def _post(self, soft=True):
# OVERRIDE
posted = super()._post(soft)
# log the post of a depreciation
posted._log_depreciation_asset()
# look for any asset to create, in case we just posted a bill on an account
# configured to automatically create assets
posted.sudo()._auto_create_asset()
return posted
def _reverse_moves(self, default_values_list=None, cancel=False):
if default_values_list is None:
default_values_list = [{} for _i in self]
for move, default_values in zip(self, default_values_list):
# Report the value of this move to the next draft move or create a new one
if move.asset_id:
# Recompute the status of the asset for all depreciations posted after the reversed entry
first_draft = min(move.asset_id.depreciation_move_ids.filtered(lambda m: m.state == 'draft'),
key=lambda m: m.date, default=None)
if first_draft:
# If there is a draft, simply move/add the depreciation amount here
first_draft.depreciation_value += move.depreciation_value
elif move.asset_id.state != 'close':
# If there was no draft move left, create one.
# Unless the asset is being closed, then the closing move
# takes care of balancing the asset.
last_date = max(move.asset_id.depreciation_move_ids.mapped('date'))
method_period = move.asset_id.method_period
self.create(self._prepare_move_for_asset_depreciation({
'asset_id': move.asset_id,
'amount': move.depreciation_value,
'depreciation_beginning_date': last_date + (
relativedelta(months=1) if method_period == "1" else relativedelta(years=1)),
'date': last_date + (
relativedelta(months=1) if method_period == "1" else relativedelta(years=1)),
'asset_number_days': 0
}))
msg = _('Depreciation entry %(name)s reversed (%(value)s)', name=move.name,
value=formatLang(self.env, move.depreciation_value, currency_obj=move.company_id.currency_id))
move.asset_id.message_post(body=msg)
default_values['asset_id'] = move.asset_id.id
default_values['asset_number_days'] = -move.asset_number_days
default_values['asset_depreciation_beginning_date'] = default_values.get('date', move.date)
return super(AccountMove, self)._reverse_moves(default_values_list, cancel)
def button_cancel(self):
# OVERRIDE
res = super(AccountMove, self).button_cancel()
self.env['account.asset'].sudo().search([('original_move_line_ids.move_id', 'in', self.ids)]).write(
{'active': False})
return res
def button_draft(self):
for move in self:
if any(asset_id.state != 'draft' for asset_id in move.asset_ids):
raise UserError(_('You cannot reset to draft an entry related to a posted asset'))
# Remove any draft asset that could be linked to the account move being reset to draft
move.asset_ids.filtered(lambda x: x.state == 'draft').unlink()
return super(AccountMove, self).button_draft()
def _log_depreciation_asset(self):
for move in self.filtered(lambda m: m.asset_id):
asset = move.asset_id
msg = _('Depreciation entry %(name)s posted (%(value)s)', name=move.name,
value=formatLang(self.env, move.depreciation_value, currency_obj=move.company_id.currency_id))
asset.message_post(body=msg)
def _auto_create_asset(self):
create_list = []
invoice_list = []
auto_validate = []
for move in self:
if not move.is_invoice():
continue
for move_line in move.line_ids:
if (
move_line.account_id
and (move_line.account_id.can_create_asset)
and move_line.account_id.create_asset != "no"
and not (move_line.currency_id or move.currency_id).is_zero(move_line.price_total)
and not move_line.asset_ids
and not move_line.tax_line_id
and move_line.price_total > 0
and not (move.move_type in ('out_invoice',
'out_refund') and move_line.account_id.internal_group == 'asset')
):
if not move_line.name:
if move_line.product_id:
move_line.name = move_line.product_id.display_name
else:
raise UserError(
_('Journal Items of %(account)s should have a label in order to generate an asset',
account=move_line.account_id.display_name))
if move_line.account_id.multiple_assets_per_line:
# decimal quantities are not supported, quantities are rounded to the lower int
units_quantity = max(1, int(move_line.quantity))
else:
units_quantity = 1
model_ids = move_line.account_id.asset_model_ids
vals = {
'name': move_line.name,
'company_id': move_line.company_id.id,
'currency_id': move_line.company_currency_id.id,
'analytic_distribution': move_line.analytic_distribution,
'original_move_line_ids': [(6, False, move_line.ids)],
'state': 'draft',
'acquisition_date': move.invoice_date if not move.reversed_entry_id else move.reversed_entry_id.invoice_date,
}
for model_id in model_ids or [None]:
if model_id:
vals['model_id'] = model_id.id
auto_validate.extend([move_line.account_id.create_asset == 'validate'] * units_quantity)
invoice_list.extend([move] * units_quantity)
for i in range(1, units_quantity + 1):
if units_quantity > 1:
vals['name'] = _("%(move_line)s (%(current)s of %(total)s)", move_line=move_line.name,
current=i, total=units_quantity)
create_list.extend([vals.copy()])
assets = self.env['account.asset'].with_context({}).create(create_list)
for asset, vals, invoice, validate in zip(assets, create_list, invoice_list, auto_validate):
if 'model_id' in vals:
asset._onchange_model_id()
if validate:
asset.validate()
if invoice:
asset.message_post(body=_('Asset created from invoice: %s', invoice._get_html_link()))
asset._post_non_deductible_tax_value()
return assets
@api.model
def _prepare_move_for_asset_depreciation(self, vals):
missing_fields = {'asset_id', 'amount', 'depreciation_beginning_date', 'date', 'asset_number_days'} - set(vals)
if missing_fields:
raise UserError(_('Some fields are missing %s', ', '.join(missing_fields)))
asset = vals['asset_id']
analytic_distribution = asset.analytic_distribution
depreciation_date = vals.get('date', fields.Date.context_today(self))
company_currency = asset.company_id.currency_id
current_currency = asset.currency_id
prec = company_currency.decimal_places
amount_currency = vals['amount']
amount = current_currency._convert(amount_currency, company_currency, asset.company_id, depreciation_date)
# Keep the partner on the original invoice if there is only one
partner = asset.original_move_line_ids.mapped('partner_id')
partner = partner[:1] if len(partner) <= 1 else self.env['res.partner']
name = _("%s: Depreciation", asset.name)
move_line_1 = {
'name': name,
'partner_id': partner.id,
'account_id': asset.account_depreciation_id.id,
'debit': 0.0 if float_compare(amount, 0.0, precision_digits=prec) > 0 else -amount,
'credit': amount if float_compare(amount, 0.0, precision_digits=prec) > 0 else 0.0,
'currency_id': current_currency.id,
'amount_currency': -amount_currency,
}
move_line_2 = {
'name': name,
'partner_id': partner.id,
'account_id': asset.account_depreciation_expense_id.id,
'credit': 0.0 if float_compare(amount, 0.0, precision_digits=prec) > 0 else -amount,
'debit': amount if float_compare(amount, 0.0, precision_digits=prec) > 0 else 0.0,
'currency_id': current_currency.id,
'amount_currency': amount_currency,
}
# Only set the 'analytic_distribution' key if there is an analytic distribution on the asset.
# Otherwise, it prevents the computation of the analytic distribution.
if analytic_distribution:
move_line_1['analytic_distribution'] = analytic_distribution
move_line_2['analytic_distribution'] = analytic_distribution
move_vals = {
'partner_id': partner.id,
'date': depreciation_date,
'journal_id': asset.journal_id.id,
'line_ids': [(0, 0, move_line_1), (0, 0, move_line_2)],
'asset_id': asset.id,
'ref': name,
'asset_depreciation_beginning_date': vals['depreciation_beginning_date'],
'asset_number_days': vals['asset_number_days'],
'asset_value_change': vals.get('asset_value_change', False),
'move_type': 'entry',
'currency_id': current_currency.id,
'asset_move_type': vals.get('asset_move_type', 'depreciation'),
'company_id': asset.company_id.id,
}
return move_vals
@api.depends('line_ids.asset_ids')
def _compute_asset_ids(self):
for record in self:
record.asset_ids = record.line_ids.asset_ids
record.count_asset = len(record.asset_ids)
record.asset_id_display_name = _('Asset')
record.draft_asset_exists = bool(record.asset_ids.filtered(lambda x: x.state == "draft"))
def open_asset_view(self):
return self.asset_id.open_asset(['form'])
def action_open_asset_ids(self):
return self.asset_ids.open_asset(['list', 'form'])
class AccountMoveLine(models.Model):
_name = "account.move.line"
_inherit = "account.move.line"
move_attachment_ids = fields.One2many('ir.attachment', compute='_compute_attachment')
# Deferred management fields
deferred_start_date = fields.Date(
string="Start Date",
compute='_compute_deferred_start_date', store=True, readonly=False,
index='btree_not_null',
copy=False,
help="Date at which the deferred expense/revenue starts"
)
deferred_end_date = fields.Date(
string="End Date",
index='btree_not_null',
copy=False,
help="Date at which the deferred expense/revenue ends"
)
has_deferred_moves = fields.Boolean(compute='_compute_has_deferred_moves')
has_abnormal_deferred_dates = fields.Boolean(compute='_compute_has_abnormal_deferred_dates')
def _order_to_sql(self, order, query, alias=None, reverse=False):
sql_order = super()._order_to_sql(order, query, alias, reverse)
preferred_aml_residual_value = self._context.get('preferred_aml_value')
preferred_aml_currency_id = self._context.get('preferred_aml_currency_id')
if preferred_aml_residual_value and preferred_aml_currency_id and order == self._order:
currency = self.env['res.currency'].browse(preferred_aml_currency_id)
# using round since currency.round(55.55) = 55.550000000000004
preferred_aml_residual_value = round(preferred_aml_residual_value, currency.decimal_places)
sql_residual_currency = self._field_to_sql(alias or self._table, 'amount_residual_currency', query)
sql_currency = self._field_to_sql(alias or self._table, 'currency_id', query)
return SQL(
"ROUND(%(residual_currency)s, %(decimal_places)s) = %(value)s "
"AND %(currency)s = %(currency_id)s DESC, %(order)s",
residual_currency=sql_residual_currency,
decimal_places=currency.decimal_places,
value=preferred_aml_residual_value,
currency=sql_currency,
currency_id=currency.id,
order=sql_order,
)
return sql_order
def copy_data(self, default=None):
data_list = super().copy_data(default=default)
for line, values in zip(self, data_list):
if 'move_reverse_cancel' in self._context:
values['deferred_start_date'] = line.deferred_start_date
values['deferred_end_date'] = line.deferred_end_date
return data_list
def write(self, vals):
""" Prevent changing the account of a move line when there are already deferral entries.
"""
if 'account_id' in vals:
for line in self:
if (
line.has_deferred_moves
and line.deferred_start_date
and line.deferred_end_date
and vals['account_id'] != line.account_id.id
):
raise UserError(_(
"You cannot change the account for a deferred line in %(move_name)s if it has already been deferred.",
move_name=line.move_id.display_name
))
return super().write(vals)
# ============================= START - Deferred management ====================================
def _compute_has_deferred_moves(self):
for line in self:
line.has_deferred_moves = line.move_id.deferred_move_ids
@api.depends('deferred_start_date', 'deferred_end_date')
def _compute_has_abnormal_deferred_dates(self):
# In the deferred computations, we always assume that both the start and end date are inclusive
# E.g: 1st January -> 31st December is *exactly* 1 year = 12 months
# However, the user may instead put 1st January -> 1st January of next year which is then
# 12 months + 1/30 month = 12.03 months which may result in odd amounts when deferrals are created
# For this reason, we alert the user if we detect such a case
# Other cases were the number of months is not round should not be handled.
for line in self:
line.has_abnormal_deferred_dates = (
line.deferred_start_date
and line.deferred_end_date
and float_compare(
self.env['account.move']._get_deferred_diff_dates(line.deferred_start_date, line.deferred_end_date + relativedelta(days=1)) % 1, # end date is included
1 / 30,
precision_digits=2
) == 0
)
def _has_deferred_compatible_account(self):
self.ensure_one()
return (
self.move_id.is_purchase_document()
and
self.account_id.account_type in ('expense', 'expense_depreciation', 'expense_direct_cost')
) or (
self.move_id.is_sale_document()
and
self.account_id.account_type in ('income', 'income_other')
)
@api.onchange('deferred_start_date')
def _onchange_deferred_start_date(self):
if not self._has_deferred_compatible_account():
self.deferred_start_date = False
@api.onchange('deferred_end_date')
def _onchange_deferred_end_date(self):
if not self._has_deferred_compatible_account():
self.deferred_end_date = False
@api.depends('deferred_end_date', 'move_id.invoice_date', 'move_id.state')
def _compute_deferred_start_date(self):
for line in self:
if not line.deferred_start_date and line.move_id.invoice_date and line.deferred_end_date:
line.deferred_start_date = line.move_id.invoice_date
@api.constrains('deferred_start_date', 'deferred_end_date', 'account_id')
def _check_deferred_dates(self):
for line in self:
if line.deferred_start_date and not line.deferred_end_date:
raise UserError(_("You cannot create a deferred entry with a start date but no end date."))
elif line.deferred_start_date and line.deferred_end_date and line.deferred_start_date > line.deferred_end_date:
raise UserError(_("You cannot create a deferred entry with a start date later than the end date."))
@api.model
def _get_deferred_ends_of_month(self, start_date, end_date):
"""
:return: a list of dates corresponding to the end of each month between start_date and end_date.
See test_get_ends_of_month for examples.
"""
dates = []
while start_date <= end_date:
start_date = start_date + relativedelta(day=31) # Go to end of month
dates.append(start_date)
start_date = start_date + relativedelta(days=1) # Go to first day of next month
return dates
def _get_deferred_periods(self):
"""
:return: a list of tuples (start_date, end_date) during which the deferred expense/revenue is spread.
If there is only one period containing the move date, it means that we don't need to defer the
expense/revenue since the invoice deferral and its deferred entry will be created on the same day and will
thus cancel each other.
"""
self.ensure_one()
periods = [
(max(self.deferred_start_date, date.replace(day=1)), min(date, self.deferred_end_date), 'current')
for date in self._get_deferred_ends_of_month(self.deferred_start_date, self.deferred_end_date)
]
if not periods or len(periods) == 1 and periods[0][0].replace(day=1) == self.date.replace(day=1):
return []
else:
return periods
@api.model
def _get_deferred_amounts_by_line_values(self, line):
return {
'account_id': line['account_id'],
# line either be a dict with ids (coming from SQL query), or a real account.move.line object
'product_id': line['product_id'] if isinstance(line, dict) else line['product_id'].id,
'product_category_id': line['product_category_id'] if isinstance(line, dict) else line['product_category_id'].id,
'balance': line['balance'],
'move_id': line['move_id'],
}
@api.model
def _get_deferred_lines_values(self, account_id, balance, ref, analytic_distribution, line=None):
return {
'account_id': account_id,
# line either be a dict with ids (coming from SQL query), or a real account.move.line object
'product_id': line['product_id'] if isinstance(line, dict) else line['product_id'].id,
'product_category_id': line['product_category_id'] if isinstance(line, dict) else line['product_category_id'].id,
'balance': balance,
'name': ref,
'analytic_distribution': analytic_distribution,
}
# ============================= END - Deferred management ====================================
def _get_computed_taxes(self):
if self.move_id.deferred_original_move_ids:
# If this line is part of a deferral move, do not (re)calculate its taxes automatically.
# Doing so might unvoluntarily impact the tax report in deferral moves (if a default tax is set on the account).
return self.tax_ids
return super()._get_computed_taxes()
def _compute_attachment(self):
for record in self:
record.move_attachment_ids = self.env['ir.attachment'].search(expression.OR(record._get_attachment_domains()))
def action_reconcile(self):
""" This function is called by the 'Reconcile' button of account.move.line's
list view. It performs reconciliation between the selected lines.
- If the reconciliation can be done directly we do it silently
- Else, if a write-off is required we open the wizard to let the client enter required information
"""
wizard = self.env['account.reconcile.wizard'].with_context(
active_model='account.move.line',
active_ids=self.ids,
).new({})
return wizard._action_open_wizard() if (wizard.is_write_off_required or wizard.force_partials) else wizard.reconcile()
def _get_predict_postgres_dictionary(self):
lang = self._context.get('lang') and self._context.get('lang')[:2]
return {'fr': 'french'}.get(lang, 'english')
@api.model
def _build_predictive_query(self, move_id, additional_domain=None):
move_query = self.env['account.move']._where_calc([
('move_type', '=', move_id.move_type),
('state', '=', 'posted'),
('partner_id', '=', move_id.partner_id.id),
('company_id', '=', move_id.journal_id.company_id.id or self.env.company.id),
])
move_query.order = 'account_move.invoice_date'
move_query.limit = int(self.env["ir.config_parameter"].sudo().get_param(
"account.bill.predict.history.limit",
'100',
))
return self.env['account.move.line']._where_calc([
('move_id', 'in', move_query),
('display_type', '=', 'product'),
] + (additional_domain or []))
@api.model
def _predicted_field(self, name, partner_id, field, query=None, additional_queries=None):
r"""Predict the most likely value based on the previous history.
This method uses postgres tsvector in order to try to deduce a field of
an invoice line based on the text entered into the name (description)
field and the partner linked.
We only limit the search on the previous 100 entries, which according
to our tests bore the best results. However this limit parameter is
configurable by creating a config parameter with the key:
account.bill.predict.history.limit
For information, the tests were executed with a dataset of 40 000 bills
from a live database, We split the dataset in 2, removing the 5000 most
recent entries and we tried to use this method to guess the account of
this validation set based on the previous entries.
The result is roughly 90% of success.
:param field (str): the sql column that has to be predicted.
/!\ it is injected in the query without any checks.
:param query (osv.Query): the query object on account.move.line that is
used to do the ranking, containing the right domain, limit, etc. If
it is omitted, a default query is used.
:param additional_queries (list<str>): can be used in addition to the
default query on account.move.line to fetch data coming from other
tables, to have starting values for instance.
/!\ it is injected in the query without any checks.
"""
if not name or not partner_id:
return False
psql_lang = self._get_predict_postgres_dictionary()
description = name + ' account_move_line' # give more priority to main query than additional queries
parsed_description = re.sub(r"[*&()|!':<>=%/~@,.;$\[\]]+", " ", description)
parsed_description = ' | '.join(parsed_description.split())
try:
main_source = (query if query is not None else self._build_predictive_query(self.move_id)).select(
SQL("%s AS prediction", field),
SQL(
"setweight(to_tsvector(%s, account_move_line.name), 'B') || setweight(to_tsvector('simple', 'account_move_line'), 'A') AS document",
psql_lang
),
)
if "(" in field.code: # aggregate function
main_source = SQL("%s %s", main_source, SQL("GROUP BY account_move_line.id, account_move_line.name, account_move_line.partner_id"))
self.env.cr.execute(SQL("""
WITH account_move_line AS MATERIALIZED (%(account_move_line)s),
source AS (%(source)s),
ranking AS (
SELECT prediction, ts_rank(source.document, query_plain) AS rank
FROM source, to_tsquery(%(lang)s, %(description)s) query_plain
WHERE source.document @@ query_plain
)
SELECT prediction, MAX(rank) AS ranking, COUNT(*)
FROM ranking
GROUP BY prediction
ORDER BY ranking DESC, count DESC
LIMIT 2
""",
account_move_line=self._build_predictive_query(self.move_id).select(SQL('*')),
source=SQL('(%s)', SQL(') UNION ALL (').join([main_source] + (additional_queries or []))),
lang=psql_lang,
description=parsed_description,
))
result = self.env.cr.dictfetchall()
if result:
# Only confirm the prediction if it's at least 10% better than the second one
if len(result) > 1 and result[0]['ranking'] < 1.1 * result[1]['ranking']:
return False
return result[0]['prediction']
except Exception:
# In case there is an error while parsing the to_tsquery (wrong character for example)
# We don't want to have a blocking traceback, instead return False
_logger.exception('Error while predicting invoice line fields')
return False
def _predict_taxes(self):
field = SQL('array_agg(account_move_line__tax_rel__tax_ids.id ORDER BY account_move_line__tax_rel__tax_ids.id)')
query = self._build_predictive_query(self.move_id)
query.left_join('account_move_line', 'id', 'account_move_line_account_tax_rel', 'account_move_line_id', 'tax_rel')
query.left_join('account_move_line__tax_rel', 'account_tax_id', 'account_tax', 'id', 'tax_ids')
query.add_where('account_move_line__tax_rel__tax_ids.active IS NOT FALSE')
predicted_tax_ids = self._predicted_field(self.name, self.partner_id, field, query)
if predicted_tax_ids == [None]:
return False
if predicted_tax_ids is not False and set(predicted_tax_ids) != set(self.tax_ids.ids):
return predicted_tax_ids
return False
@api.model
def _predict_specific_tax(self, move, name, partner, amount_type, amount, type_tax_use):
field = SQL('array_agg(account_move_line__tax_rel__tax_ids.id ORDER BY account_move_line__tax_rel__tax_ids.id)')
query = self._build_predictive_query(move)
query.left_join('account_move_line', 'id', 'account_move_line_account_tax_rel', 'account_move_line_id', 'tax_rel')
query.left_join('account_move_line__tax_rel', 'account_tax_id', 'account_tax', 'id', 'tax_ids')
query.add_where("""
account_move_line__tax_rel__tax_ids.active IS NOT FALSE
AND account_move_line__tax_rel__tax_ids.amount_type = %s
AND account_move_line__tax_rel__tax_ids.type_tax_use = %s
AND account_move_line__tax_rel__tax_ids.amount = %s
""", (amount_type, type_tax_use, amount))
return self._predicted_field(name, partner, field, query)
def _predict_product(self):
predict_product = int(self.env['ir.config_parameter'].sudo().get_param('account_predictive_bills.predict_product', '1'))
if predict_product and self.company_id.predict_bill_product:
query = self._build_predictive_query(self.move_id, ['|', ('product_id', '=', False), ('product_id.active', '=', True)])
predicted_product_id = self._predicted_field(self.name, self.partner_id, SQL('account_move_line.product_id'), query)
if predicted_product_id and predicted_product_id != self.product_id.id:
return predicted_product_id
return False
def _predict_account(self):
field = SQL('account_move_line.account_id')
if self.move_id.is_purchase_document(True):
excluded_group = 'income'
else:
excluded_group = 'expense'
account_query = self.env['account.account']._where_calc([
*self.env['account.account']._check_company_domain(self.move_id.company_id or self.env.company),
('deprecated', '=', False),
('internal_group', 'not in', (excluded_group, 'off')),
('account_type', 'not in', ('liability_payable', 'asset_receivable')),
])
account_name = self.env['account.account']._field_to_sql('account_account', 'name')
psql_lang = self._get_predict_postgres_dictionary()
additional_queries = [SQL(account_query.select(
SQL("account_account.id AS account_id"),
SQL("setweight(to_tsvector(%(psql_lang)s, %(account_name)s), 'B') AS document", psql_lang=psql_lang, account_name=account_name),
))]
query = self._build_predictive_query(self.move_id, [('account_id', 'in', account_query)])
predicted_account_id = self._predicted_field(self.name, self.partner_id, field, query, additional_queries)
if predicted_account_id and predicted_account_id != self.account_id.id:
return predicted_account_id
return False
@api.onchange('name')
def _onchange_name_predictive(self):
if ((self.move_id.quick_edit_mode or self.move_id.move_type == 'in_invoice') and self.name and self.display_type == 'product'
and not self.env.context.get('disable_onchange_name_predictive', False)):
if not self.product_id:
predicted_product_id = self._predict_product()
if predicted_product_id:
# We only update the price_unit, tax_ids and name in case they evaluate to False
protected_fields = ['price_unit', 'tax_ids', 'name']
to_protect = [self._fields[fname] for fname in protected_fields if self[fname]]
with self.env.protecting(to_protect, self):
self.product_id = predicted_product_id
# In case no product has been set, the account and taxes
# will not depend on any product and can thus be predicted
if not self.product_id:
# Predict account.
predicted_account_id = self._predict_account()
if predicted_account_id:
self.account_id = predicted_account_id
# Predict taxes
predicted_tax_ids = self._predict_taxes()
if predicted_tax_ids:
self.tax_ids = [Command.set(predicted_tax_ids)]
def _read_group_select(self, aggregate_spec, query):
# Enable to use HAVING clause that sum rounded values depending on the
# currency precision settings. Limitation: we only handle a having
# clause of one element with that specific method :sum_rounded.
fname, __, func = models.parse_read_group_spec(aggregate_spec)
if func != 'sum_rounded':
return super()._read_group_select(aggregate_spec, query)
currency_alias = query.make_alias(self._table, 'currency_id')
query.add_join('LEFT JOIN', currency_alias, 'res_currency', SQL(
"%s = %s",
self._field_to_sql(self._table, 'currency_id', query),
SQL.identifier(currency_alias, 'id'),
))
return SQL(
'SUM(ROUND(%s, %s))',
self._field_to_sql(self._table, fname, query),
self.env['res.currency']._field_to_sql(currency_alias, 'decimal_places', query),
)
def _read_group_groupby(self, groupby_spec, query):
# enable grouping by :abs_rounded on fields, which is useful when trying
# to match positive and negative amounts
if ':' in groupby_spec:
fname, method = groupby_spec.split(':')
if method == 'abs_rounded':
# rounds with the used currency settings
currency_alias = query.make_alias(self._table, 'currency_id')
query.add_join('LEFT JOIN', currency_alias, 'res_currency', SQL(
"%s = %s",
self._field_to_sql(self._table, 'currency_id', query),
SQL.identifier(currency_alias, 'id'),
))
return SQL(
'ROUND(ABS(%s), %s)',
self._field_to_sql(self._table, fname, query),
self.env['res.currency']._field_to_sql(currency_alias, 'decimal_places', query),
)
return super()._read_group_groupby(groupby_spec, query)
asset_ids = fields.Many2many('account.asset', 'asset_move_line_rel', 'line_id', 'asset_id', string='Related Assets',
copy=False)
non_deductible_tax_value = fields.Monetary(compute='_compute_non_deductible_tax_value',
currency_field='company_currency_id')
def _get_computed_taxes(self):
if self.move_id.asset_id:
return self.tax_ids
return super()._get_computed_taxes()
def turn_as_asset(self):
if len(self.company_id) != 1:
raise UserError(_("All the lines should be from the same company"))
if any(line.move_id.state == 'draft' for line in self):
raise UserError(_("All the lines should be posted"))
if any(account != self[0].account_id for account in self.mapped('account_id')):
raise UserError(_("All the lines should be from the same account"))
ctx = self.env.context.copy()
ctx.update({
'default_original_move_line_ids': [(6, False, self.env.context['active_ids'])],
'default_company_id': self.company_id.id,
})
return {
"name": _("Turn as an asset"),
"type": "ir.actions.act_window",
"res_model": "account.asset",
"views": [[False, "form"]],
"target": "current",
"context": ctx,
}
@api.depends('tax_ids.invoice_repartition_line_ids')
def _compute_non_deductible_tax_value(self):
""" Handle the specific case of non deductible taxes,
such as "50% Non Déductible - Frais de voiture (Prix Excl.)" in Belgium.
"""
non_deductible_tax_ids = self.tax_ids.invoice_repartition_line_ids.filtered(
lambda line: line.repartition_type == 'tax' and not line.use_in_tax_closing
).tax_id
res = {}
if non_deductible_tax_ids:
domain = [('move_id', 'in', self.move_id.ids)]
tax_details_query = self._get_query_tax_details_from_domain(domain)
self.flush_model()
self._cr.execute(SQL(
'''
SELECT
tdq.base_line_id,
SUM(tdq.tax_amount_currency)
FROM (%(tax_details_query)s) AS tdq
JOIN account_move_line aml ON aml.id = tdq.tax_line_id
JOIN account_tax_repartition_line trl ON trl.id = tdq.tax_repartition_line_id
WHERE tdq.base_line_id IN %(base_line_ids)s
AND trl.use_in_tax_closing IS FALSE
GROUP BY tdq.base_line_id
''',
tax_details_query=tax_details_query,
base_line_ids=tuple(self.ids),
))
res = {row['base_line_id']: row['sum'] for row in self._cr.dictfetchall()}
for record in self:
record.non_deductible_tax_value = res.get(record._origin.id, 0.0)