Files
odoo-addons/addons/at_accounting/models/account_generic_tax_report.py

1222 lines
62 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import ast
from collections import defaultdict
from odoo import models, api, fields, Command, _
from odoo.addons.web.controllers.utils import clean_action
from odoo.exceptions import UserError, RedirectWarning
from odoo.osv import expression
from odoo.tools import SQL
class AccountTaxReportHandler(models.AbstractModel):
_name = 'account.tax.report.handler'
_inherit = 'account.report.custom.handler'
_description = 'Account Report Handler for Tax Reports'
# This model is needed for the Closing Entry button to be available for all reports, including the generic one
# With this, custom tax reports don't need to inherit from the generic tax report
def _custom_options_initializer(self, report, options, previous_options):
optional_periods = {
'monthly': 'month',
'trimester': 'quarter',
'year': 'year',
}
options['buttons'].append({'name': _('Closing Entry'), 'action': 'action_periodic_vat_entries', 'sequence': 110, 'always_show': True})
self._enable_export_buttons_for_common_vat_groups_in_branches(options)
day, month = self.env.company._get_tax_closing_start_date_attributes(report)
periodicity = self.env.company._get_tax_periodicity(report)
options['tax_periodicity'] = {
'periodicity': periodicity,
'months_per_period': self.env.company._get_tax_periodicity_months_delay(report),
'start_day': day,
'start_month': month,
}
options['show_tax_period_filter'] = periodicity not in optional_periods or day != 1 or month != 1
if not options['show_tax_period_filter']:
period_type = optional_periods[periodicity]
options['date']['filter'] = options['date']['filter'].replace('tax_period', period_type)
options['date']['period_type'] = options['date']['period_type'].replace('tax_period', period_type)
def _get_custom_display_config(self):
display_config = defaultdict(dict)
display_config['templates']['AccountReportFilters'] = 'at_accounting.GenericTaxReportFiltersCustomizable'
return display_config
def _customize_warnings(self, report, options, all_column_groups_expression_totals, warnings):
if 'at_accounting.common_warning_draft_in_period' in warnings:
# Recompute the warning 'common_warning_draft_in_period' to not include tax closing entries in the banner of unposted moves
if not self.env['account.move'].search_count(
[('state', '=', 'draft'), ('date', '<=', options['date']['date_to']),
('tax_closing_report_id', '=', False)],
limit=1,
):
warnings.pop('at_accounting.common_warning_draft_in_period')
# Chek the use of inactive tags in the period
query = report._get_report_query(options, 'strict_range')
rows = self.env.execute_query(SQL("""
SELECT 1
FROM %s
JOIN account_account_tag_account_move_line_rel aml_tag
ON account_move_line.id = aml_tag.account_move_line_id
JOIN account_account_tag tag
ON aml_tag.account_account_tag_id = tag.id
WHERE %s
AND NOT tag.active
LIMIT 1
""", query.from_clause, query.where_clause))
if rows:
warnings['at_accounting.tax_report_warning_inactive_tags'] = {}
# -------------------------------------------------------------------------
# TAX CLOSING
# -------------------------------------------------------------------------
def _is_period_equal_to_options(self, report, options):
options_date_to = fields.Date.from_string(options['date']['date_to'])
options_date_from = fields.Date.from_string(options['date']['date_from'])
date_from, date_to = self.env.company._get_tax_closing_period_boundaries(options_date_to, report)
return date_from == options_date_from and date_to == options_date_to
def action_periodic_vat_entries(self, options, from_post=False):
report = self.env['account.report'].browse(options['report_id'])
if (options['date']['period_type'] != 'tax_period' and not self._is_period_equal_to_options(report,
options)) and not self.env.context.get(
'override_tax_closing_warning'):
if len(options['companies']) > 1 and (report.filter_multi_company != 'tax_units' or not (
report.country_id and options['available_tax_units'])):
message = _(
"You're about the generate the closing entries of multiple companies at once. Each of them will be created in accordance with its company tax periodicity.")
else:
message = _(
"The currently selected dates don't match a tax period. The closing entry will be created for the closest-matching period according to your periodicity setup.")
return {
'type': 'ir.actions.client',
'tag': 'at_accounting.redirect_action',
'target': 'new',
'params': {
'depending_action': self.with_context(
{'override_tax_closing_warning': True}).action_periodic_vat_entries(options),
'message': message,
'button_text': _("Proceed"),
},
'context': {
'dialog_size': 'medium',
'override_tax_closing_warning': True,
},
}
moves = self._get_periodic_vat_entries(options, from_post=from_post)
# Make the action for the retrieved move and return it.
action = self.env["ir.actions.actions"]._for_xml_id("account.action_move_journal_line")
action = clean_action(action, env=self.env)
action.pop('domain', None)
if len(moves) == 1:
action['views'] = [(self.env.ref('account.view_move_form').id, 'form')]
action['res_id'] = moves.id
else:
action['domain'] = [('id', 'in', moves.ids)]
action['context'] = dict(ast.literal_eval(action['context']))
action['context'].pop('search_default_posted', None)
return action
def _get_periodic_vat_entries(self, options, from_post=False):
report = self.env['account.report'].browse(options['report_id'])
# When integer_rounding is available, we always want it for tax closing (as it means it's a legal requirement)
if options.get('integer_rounding'):
options['integer_rounding_enabled'] = True
# Return action to open form view of newly created entry
moves = self.env['account.move']
# Get all companies impacting the report.
companies = self.env['res.company'].browse(report.get_report_company_ids(options))
companies_moves = self._get_tax_closing_entries_for_closed_period(report, options, companies, posted_only=False)
moves += companies_moves
moves += self._generate_tax_closing_entries(report, options, companies=companies - companies_moves.company_id, from_post=from_post)
return moves
def _generate_tax_closing_entries(self, report, options, closing_moves=None, companies=None, from_post=False):
"""Generates and/or updates VAT closing entries.
This method computes the content of the tax closing in the following way:
- Search on all tax lines in the given period, group them by tax_group (each tax group might have its own
tax receivable/payable account).
- Create a move line that balances each tax account and add the difference in the correct receivable/payable
account. Also take into account amounts already paid via advance tax payment account.
The tax closing is done so that an individual move is created per available VAT number: so, one for each
foreign vat fiscal position (each with fiscal_position_id set to this fiscal position), and one for the domestic
position (with fiscal_position_id = None). The moves created by this function hence depends on the content of the
options dictionary, and what fiscal positions are accepted by it.
:param options: the tax report options dict to use to make the closing.
:param closing_moves: If provided, closing moves to update the content from.
They need to be compatible with the provided options (if they have a fiscal_position_id, for example).
:param companies: optional params, the companies given will be used instead of taking all the companies impacting
the report.
:return: The closing moves.
"""
if companies is None:
companies = self.env['res.company'].browse(report.get_report_company_ids(options))
if closing_moves is None:
closing_moves = self.env['account.move']
end_date = fields.Date.from_string(options['date']['date_to'])
closing_moves_by_company = defaultdict(lambda: self.env['account.move'])
companies_without_closing = companies.filtered(lambda company: company not in closing_moves.company_id)
if closing_moves:
for move in closing_moves.filtered(lambda x: x.state == 'draft'):
closing_moves_by_company[move.company_id] |= move
for company in companies_without_closing:
include_domestic, fiscal_positions = self._get_fpos_info_for_tax_closing(company, report, options)
company_closing_moves = company._get_and_update_tax_closing_moves(end_date, report, fiscal_positions=fiscal_positions, include_domestic=include_domestic)
closing_moves_by_company[company] = company_closing_moves
closing_moves += company_closing_moves
for company, company_closing_moves in closing_moves_by_company.items():
# First gather the countries for which the closing is being done
countries = self.env['res.country']
for move in company_closing_moves:
if move.fiscal_position_id.foreign_vat:
countries |= move.fiscal_position_id.country_id
else:
countries |= company.account_fiscal_country_id
# Check the tax groups from the company for any misconfiguration in these countries
if self.env['account.tax.group']._check_misconfigured_tax_groups(company, countries):
self._redirect_to_misconfigured_tax_groups(company, countries)
for move in company_closing_moves:
# When coming from post and that the current move is the closing of the current company we don't want to
# write on it again
if from_post and move == closing_moves_by_company.get(self.env.company):
continue
# get tax entries by tax_group for the period defined in options
move_options = {**options, 'fiscal_position': move.fiscal_position_id.id if move.fiscal_position_id else 'domestic'}
line_ids_vals, tax_group_subtotal = self._compute_vat_closing_entry(company, move_options)
line_ids_vals += self._add_tax_group_closing_items(tax_group_subtotal, move)
if move.line_ids:
line_ids_vals += [Command.delete(aml.id) for aml in move.line_ids]
move_vals = {}
if line_ids_vals:
move_vals['line_ids'] = line_ids_vals
move.write(move_vals)
return closing_moves
def _get_tax_closing_entries_for_closed_period(self, report, options, companies, posted_only=True):
""" Fetch the closing entries related to the given companies for the currently selected tax report period.
Only used when the selected period already has a tax lock date impacting it, and assuming that these periods
all have a tax closing entry.
:param report: The tax report for which we are getting the closing entries.
:param options: the tax report options dict needed to get the period end date and fiscal position info.
:param companies: a recordset of companies for which the period has already been closed.
:return: The closing moves.
"""
closing_moves = self.env['account.move']
for company in companies:
_dummy, period_end = company._get_tax_closing_period_boundaries(fields.Date.from_string(options['date']['date_to']), report)
include_domestic, fiscal_positions = self._get_fpos_info_for_tax_closing(company, report, options)
fiscal_position_ids = fiscal_positions.ids + ([False] if include_domestic else [])
state_domain = ('state', '=', 'posted') if posted_only else ('state', '!=', 'cancel')
closing_moves += self.env['account.move'].search([
('company_id', '=', company.id),
('fiscal_position_id', 'in', fiscal_position_ids),
('date', '=', period_end),
('tax_closing_report_id', '=', options['report_id']),
state_domain,
], limit=1)
return closing_moves
@api.model
def _compute_vat_closing_entry(self, company, options):
"""Compute the VAT closing entry.
This method returns the one2many commands to balance the tax accounts for the selected period, and
a dictionnary that will help balance the different accounts set per tax group.
"""
self = self.with_company(company) # Needed to handle access to property fields correctly
# first, for each tax group, gather the tax entries per tax and account
self.env['account.tax'].flush_model(['name', 'tax_group_id'])
self.env['account.tax.repartition.line'].flush_model(['use_in_tax_closing'])
self.env['account.move.line'].flush_model(['account_id', 'debit', 'credit', 'move_id', 'tax_line_id', 'date', 'company_id', 'display_type', 'parent_state'])
self.env['account.move'].flush_model(['state'])
new_options = {
**options,
'all_entries': False,
'date': dict(options['date']),
}
report = self.env['account.report'].browse(options['report_id'])
period_start, period_end = company._get_tax_closing_period_boundaries(fields.Date.from_string(options['date']['date_to']), report)
new_options['date']['date_from'] = fields.Date.to_string(period_start)
new_options['date']['date_to'] = fields.Date.to_string(period_end)
new_options['date']['period_type'] = 'custom'
new_options['date']['filter'] = 'custom'
new_options = report.with_context(allowed_company_ids=company.ids).get_options(previous_options=new_options)
# Force the use of the fiscal position from the original options (_get_options sets the fiscal
# position to 'all' when the report is the generic tax report)
new_options['fiscal_position'] = options['fiscal_position']
query = self.env.ref('account.generic_tax_report')._get_report_query(
new_options,
'strict_range',
domain=self._get_vat_closing_entry_additional_domain()
)
# Check whether it is multilingual, in order to get the translation from the JSON value if present
tax_name = self.env['account.tax']._field_to_sql('tax', 'name')
query = SQL(
"""
SELECT "account_move_line".tax_line_id as tax_id,
tax.tax_group_id as tax_group_id,
%(tax_name)s as tax_name,
"account_move_line".account_id,
COALESCE(SUM("account_move_line".balance), 0) as amount
FROM account_tax tax, account_tax_repartition_line repartition, %(table_references)s
WHERE %(search_condition)s
AND tax.id = "account_move_line".tax_line_id
AND repartition.id = "account_move_line".tax_repartition_line_id
AND repartition.use_in_tax_closing
GROUP BY tax.tax_group_id, "account_move_line".tax_line_id, tax.name, "account_move_line".account_id
""",
tax_name=tax_name,
table_references=query.from_clause,
search_condition=query.where_clause,
)
self.env.cr.execute(query)
results = self.env.cr.dictfetchall()
results = self._postprocess_vat_closing_entry_results(company, new_options, results)
tax_group_ids = [r['tax_group_id'] for r in results]
tax_groups = {}
for tg, result in zip(self.env['account.tax.group'].browse(tax_group_ids), results):
if tg not in tax_groups:
tax_groups[tg] = {}
if result.get('tax_id') not in tax_groups[tg]:
tax_groups[tg][result.get('tax_id')] = []
tax_groups[tg][result.get('tax_id')].append((result.get('tax_name'), result.get('account_id'), result.get('amount')))
# then loop on previous results to
# * add the lines that will balance their sum per account
# * make the total per tax group's account triplet
# (if 2 tax groups share the same 3 accounts, they should consolidate in the vat closing entry)
move_vals_lines = []
tax_group_subtotal = {}
currency = self.env.company.currency_id
for tg, values in tax_groups.items():
total = 0
# ignore line that have no property defined on tax group
if not tg.tax_receivable_account_id or not tg.tax_payable_account_id:
continue
for dummy, value in values.items():
for v in value:
tax_name, account_id, amt = v
# Line to balance
move_vals_lines.append((0, 0, {'name': tax_name, 'debit': abs(amt) if amt < 0 else 0, 'credit': amt if amt > 0 else 0, 'account_id': account_id}))
total += amt
if not currency.is_zero(total):
# Add total to correct group
key = (tg.advance_tax_payment_account_id.id or False, tg.tax_receivable_account_id.id, tg.tax_payable_account_id.id)
if tax_group_subtotal.get(key):
tax_group_subtotal[key] += total
else:
tax_group_subtotal[key] = total
# If the tax report is completely empty, we add two 0-valued lines, using the first in in and out
# account id we find on the taxes.
if len(move_vals_lines) == 0:
rep_ln_in = self.env['account.tax.repartition.line'].search([
*self.env['account.tax.repartition.line']._check_company_domain(company),
('account_id.deprecated', '=', False),
('repartition_type', '=', 'tax'),
('document_type', '=', 'invoice'),
('tax_id.type_tax_use', '=', 'purchase')
], limit=1)
rep_ln_out = self.env['account.tax.repartition.line'].search([
*self.env['account.tax.repartition.line']._check_company_domain(company),
('account_id.deprecated', '=', False),
('repartition_type', '=', 'tax'),
('document_type', '=', 'invoice'),
('tax_id.type_tax_use', '=', 'sale')
], limit=1)
if rep_ln_out.account_id and rep_ln_in.account_id:
move_vals_lines = [
Command.create({
'name': _('Tax Received Adjustment'),
'debit': 0,
'credit': 0.0,
'account_id': rep_ln_out.account_id.id
}),
Command.create({
'name': _('Tax Paid Adjustment'),
'debit': 0.0,
'credit': 0,
'account_id': rep_ln_in.account_id.id
})
]
return move_vals_lines, tax_group_subtotal
def _get_vat_closing_entry_additional_domain(self):
return []
def _postprocess_vat_closing_entry_results(self, company, options, results):
# Override this to, for example, apply a rounding to the lines of the closing entry
return results
def _vat_closing_entry_results_rounding(self, company, options, results, rounding_accounts, vat_results_summary):
"""
Apply the rounding from the tax report by adding a line to the end of the query results
representing the sum of the roundings on each line of the tax report.
"""
# Ignore if the rounding accounts cannot be found
if not rounding_accounts.get('profit') or not rounding_accounts.get('loss'):
return results
total_amount = 0.0
tax_group_id = None
for line in results:
total_amount += line['amount']
# The accounts on the tax group ids from the results should be uniform,
# but we choose the greatest id so that the line appears last on the entry.
tax_group_id = line['tax_group_id']
report = self.env['account.report'].browse(options['report_id'])
for line in report._get_lines(options):
model, record_id = report._get_model_info_from_id(line['id'])
if model != 'account.report.line':
continue
for (operation_type, report_line_id, column_expression_label) in vat_results_summary:
for column in line['columns']:
if record_id != report_line_id or column['expression_label'] != column_expression_label:
continue
# We accept 3 types of operations:
# 1) due and 2) deductible - This is used for reports that have lines for the payable vat and
# lines for the reclaimable vat.
# 3) total - This is used for reports that have a single line with the payable/reclaimable vat.
if operation_type in {'due', 'total'}:
total_amount += column['no_format']
elif operation_type == 'deductible':
total_amount -= column['no_format']
currency = company.currency_id
total_difference = currency.round(total_amount)
if not currency.is_zero(total_difference):
results.append({
'tax_name': _('Difference from rounding taxes'),
'amount': total_difference * -1,
'tax_group_id': tax_group_id,
'account_id': rounding_accounts['profit'].id if total_difference < 0 else rounding_accounts['loss'].id
})
return results
@api.model
def _add_tax_group_closing_items(self, tax_group_subtotal, closing_move):
"""Transform the parameter tax_group_subtotal dictionnary into one2many commands.
Used to balance the tax group accounts for the creation of the vat closing entry.
"""
def _add_line(account, name, company_currency):
self.env.cr.execute(sql_account, (
account,
closing_move.date,
closing_move.company_id.id,
))
result = self.env.cr.dictfetchone()
advance_balance = result.get('balance') or 0
# Deduct/Add advance payment
if not company_currency.is_zero(advance_balance):
line_ids_vals.append((0, 0, {
'name': name,
'debit': abs(advance_balance) if advance_balance < 0 else 0,
'credit': abs(advance_balance) if advance_balance > 0 else 0,
'account_id': account
}))
return advance_balance
currency = closing_move.company_id.currency_id
sql_account = '''
SELECT SUM(aml.balance) AS balance
FROM account_move_line aml
LEFT JOIN account_move move ON move.id = aml.move_id
WHERE aml.account_id = %s
AND aml.date <= %s
AND move.state = 'posted'
AND aml.company_id = %s
'''
line_ids_vals = []
# keep track of already balanced account, as one can be used in several tax group
account_already_balanced = []
for key, value in tax_group_subtotal.items():
total = value
# Search if any advance payment done for that configuration
if key[0] and key[0] not in account_already_balanced:
total += _add_line(key[0], _('Balance tax advance payment account'), currency)
account_already_balanced.append(key[0])
if key[1] and key[1] not in account_already_balanced:
total += _add_line(key[1], _('Balance tax current account (receivable)'), currency)
account_already_balanced.append(key[1])
if key[2] and key[2] not in account_already_balanced:
total += _add_line(key[2], _('Balance tax current account (payable)'), currency)
account_already_balanced.append(key[2])
# Balance on the receivable/payable tax account
if not currency.is_zero(total):
line_ids_vals.append(Command.create({
'name': _('Payable tax amount') if total < 0 else _('Receivable tax amount'),
'debit': total if total > 0 else 0,
'credit': abs(total) if total < 0 else 0,
'account_id': key[2] if total < 0 else key[1]
}))
return line_ids_vals
@api.model
def _redirect_to_misconfigured_tax_groups(self, company, countries):
""" Raises a RedirectWarning informing the user his tax groups are missing configuration
for a given company, redirecting him to the list view of account.tax.group, filtered
accordingly to the provided countries.
"""
need_config_action = {
'type': 'ir.actions.act_window',
'name': 'Tax groups',
'res_model': 'account.tax.group',
'view_mode': 'list',
'views': [[False, 'list']],
'domain': ['|', ('country_id', 'in', countries.ids), ('country_id', '=', False)]
}
raise RedirectWarning(
_('Please specify the accounts necessary for the Tax Closing Entry.'),
need_config_action,
_('Configure your TAX accounts - %s', company.display_name),
)
def _get_fpos_info_for_tax_closing(self, company, report, options):
""" Returns the fiscal positions information to use to generate the tax closing
for this company, with the provided options.
:return: (include_domestic, fiscal_positions), where fiscal positions is a recordset
and include_domestic is a boolean telling whether or not the domestic closing
(i.e. the one without any fiscal position) must also be performed
"""
if options['fiscal_position'] == 'domestic':
fiscal_positions = self.env['account.fiscal.position']
elif options['fiscal_position'] == 'all':
fiscal_positions = self.env['account.fiscal.position'].search([
*self.env['account.fiscal.position']._check_company_domain(company),
('foreign_vat', '!=', False),
])
else:
fpos_ids = [options['fiscal_position']]
fiscal_positions = self.env['account.fiscal.position'].browse(fpos_ids)
if options['fiscal_position'] == 'all':
fiscal_country = company.account_fiscal_country_id
include_domestic = not fiscal_positions \
or not report.country_id \
or fiscal_country == fiscal_positions[0].country_id
else:
include_domestic = options['fiscal_position'] == 'domestic'
return include_domestic, fiscal_positions
def _get_amls_with_archived_tags_domain(self, options):
domain = [
('tax_tag_ids.active', '=', False),
('parent_state', '=', 'posted'),
('date', '>=', options['date']['date_from']),
]
if options['date']['mode'] == 'single':
domain.append(('date', '<=', options['date']['date_to']))
return domain
def action_open_amls_with_archived_tags(self, options, params=None):
return {
'name': _("Journal items with archived tax tags"),
'type': 'ir.actions.act_window',
'res_model': 'account.move.line',
'domain': self._get_amls_with_archived_tags_domain(options),
'context': {'active_test': False},
'views': [(self.env.ref('at_accounting.view_archived_tag_move_tree').id, 'list')],
}
class GenericTaxReportCustomHandler(models.AbstractModel):
_name = 'account.generic.tax.report.handler'
_inherit = 'account.tax.report.handler'
_description = 'Generic Tax Report Custom Handler'
def _get_custom_display_config(self):
parent_config = super()._get_custom_display_config()
parent_config['css_custom_class'] = 'generic_tax_report'
parent_config['templates']['AccountReportLineName'] = 'at_accounting.TaxReportLineName'
return parent_config
def _custom_options_initializer(self, report, options, previous_options=None):
super()._custom_options_initializer(report, options, previous_options=previous_options)
# We are on the generic tax report (no country) and the user can not change the fiscal position so we show them all.
if not report.country_id and len(options['available_vat_fiscal_positions']) <= (0 if options['allow_domestic'] else 1) and len(options['companies']) <= 1:
options['allow_domestic'] = False
options['fiscal_position'] = 'all'
def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
return self._get_dynamic_lines(report, options, 'default', warnings)
def _caret_options_initializer(self):
return {
'generic_tax_report': [
{'name': _("Audit"), 'action': 'caret_option_audit_tax'},
]
}
def _get_dynamic_lines(self, report, options, grouping, warnings=None):
""" Compute the report lines for the generic tax report.
:param options: The report options.
:return: A list of lines, each one being a python dictionary.
"""
options_by_column_group = report._split_options_per_column_group(options)
# Compute tax_base_amount / tax_amount for each selected groupby.
if grouping == 'tax_account':
groupby_fields = [('src_tax', 'type_tax_use'), ('src_tax', 'id'), ('account', 'id')]
comodels = [None, 'account.tax', 'account.account']
elif grouping == 'account_tax':
groupby_fields = [('src_tax', 'type_tax_use'), ('account', 'id'), ('src_tax', 'id')]
comodels = [None, 'account.account', 'account.tax']
else:
groupby_fields = [('src_tax', 'type_tax_use'), ('src_tax', 'id')]
comodels = [None, 'account.tax']
if grouping in ('tax_account', 'account_tax'):
tax_amount_hierarchy = self._read_generic_tax_report_amounts(report, options_by_column_group, groupby_fields)
else:
tax_amount_hierarchy = self._read_generic_tax_report_amounts_no_tax_details(report, options, options_by_column_group)
# Fetch involved records in order to ensure all lines are sorted according the comodel order.
# To do so, we compute 'sorting_map_list' allowing to retrieve each record by id and the order
# to be used.
record_ids_gb = [set() for dummy in groupby_fields]
def populate_record_ids_gb_recursively(node, level=0):
for k, v in node.items():
if k:
record_ids_gb[level].add(k)
if v.get('children'):
populate_record_ids_gb_recursively(v['children'], level=level + 1)
populate_record_ids_gb_recursively(tax_amount_hierarchy)
sorting_map_list = []
for i, comodel in enumerate(comodels):
if comodel:
# Relational records.
records = self.env[comodel].with_context(active_test=False).search([('id', 'in', tuple(record_ids_gb[i]))])
sorting_map = {r.id: (r, j) for j, r in enumerate(records)}
sorting_map_list.append(sorting_map)
else:
# src_tax_type_tax_use.
selection = self.env['account.tax']._fields['type_tax_use'].selection
sorting_map_list.append({v[0]: (v, j) for j, v in enumerate(selection) if v[0] in record_ids_gb[i]})
# Compute report lines.
lines = []
self._populate_lines_recursively(
report,
options,
lines,
sorting_map_list,
groupby_fields,
tax_amount_hierarchy,
warnings=warnings,
)
return lines
# -------------------------------------------------------------------------
# GENERIC TAX REPORT COMPUTATION (DYNAMIC LINES)
# -------------------------------------------------------------------------
@api.model
def _read_generic_tax_report_amounts_no_tax_details(self, report, options, options_by_column_group):
# Fetch the group of taxes.
# If all child taxes have a 'none' type_tax_use, all amounts are aggregated and only the group appears on the report.
company_ids = report.get_report_company_ids(options)
company_domain = self.env['account.tax']._check_company_domain(company_ids)
company_where_query = self.env['account.tax'].with_context(active_test=False)._where_calc(company_domain)
self._cr.execute(SQL(
'''
SELECT
account_tax.id,
account_tax.type_tax_use,
ARRAY_AGG(child_tax.id) AS child_tax_ids,
ARRAY_AGG(DISTINCT child_tax.type_tax_use) AS child_types
FROM account_tax_filiation_rel account_tax_rel
JOIN account_tax ON account_tax.id = account_tax_rel.parent_tax
JOIN account_tax child_tax ON child_tax.id = account_tax_rel.child_tax
WHERE account_tax.amount_type = 'group'
AND %s
GROUP BY account_tax.id
''', company_where_query.where_clause or SQL("TRUE")
))
group_of_taxes_info = {}
child_to_group_of_taxes = {}
for row in self._cr.dictfetchall():
row['to_expand'] = row['child_types'] != ['none']
group_of_taxes_info[row['id']] = row
for child_id in row['child_tax_ids']:
child_to_group_of_taxes[child_id] = row['id']
results = defaultdict(lambda: { # key: type_tax_use
'base_amount': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'tax_amount': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'tax_non_deductible': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'tax_deductible': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'tax_due': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'children': defaultdict(lambda: { # key: tax_id
'base_amount': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'tax_amount': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'tax_non_deductible': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'tax_deductible': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'tax_due': {column_group_key: 0.0 for column_group_key in options['column_groups']},
}),
})
for column_group_key, options in options_by_column_group.items():
query = report._get_report_query(options, 'strict_range')
# Fetch the base amounts.
self._cr.execute(SQL(
'''
SELECT
tax.id AS tax_id,
tax.type_tax_use AS tax_type_tax_use,
src_group_tax.id AS src_group_tax_id,
src_group_tax.type_tax_use AS src_group_tax_type_tax_use,
src_tax.id AS src_tax_id,
src_tax.type_tax_use AS src_tax_type_tax_use,
SUM(account_move_line.balance) AS base_amount
FROM %(table_references)s
JOIN account_move_line_account_tax_rel tax_rel ON account_move_line.id = tax_rel.account_move_line_id
JOIN account_tax tax ON tax.id = tax_rel.account_tax_id
LEFT JOIN account_tax src_tax ON src_tax.id = account_move_line.tax_line_id
LEFT JOIN account_tax src_group_tax ON src_group_tax.id = account_move_line.group_tax_id
WHERE %(search_condition)s
AND (
/* CABA */
account_move_line__move_id.always_tax_exigible
OR account_move_line__move_id.tax_cash_basis_rec_id IS NOT NULL
OR tax.tax_exigibility != 'on_payment'
)
AND (
(
/* Tax lines affecting the base of others. */
account_move_line.tax_line_id IS NOT NULL
AND (
src_tax.type_tax_use IN ('sale', 'purchase')
OR src_group_tax.type_tax_use IN ('sale', 'purchase')
)
)
OR
(
/* For regular base lines. */
account_move_line.tax_line_id IS NULL
AND tax.type_tax_use IN ('sale', 'purchase')
)
)
GROUP BY tax.id, src_group_tax.id, src_tax.id
ORDER BY src_group_tax.sequence, src_group_tax.id, src_tax.sequence, src_tax.id, tax.sequence, tax.id
''',
table_references=query.from_clause,
search_condition=query.where_clause,
))
group_of_taxes_with_extra_base_amount = set()
for row in self._cr.dictfetchall():
is_tax_line = bool(row['src_tax_id'])
if is_tax_line:
if row['src_group_tax_id'] \
and not group_of_taxes_info[row['src_group_tax_id']]['to_expand'] \
and row['tax_id'] in group_of_taxes_info[row['src_group_tax_id']]['child_tax_ids']:
# Suppose a base of 1000 with a group of taxes 20% affect + 10%.
# The base of the group of taxes must be 1000, not 1200 because the group of taxes is not
# expanded. So the tax lines affecting the base of its own group of taxes are ignored.
pass
elif row['tax_type_tax_use'] == 'none' and child_to_group_of_taxes.get(row['tax_id']):
# The tax line is affecting the base of a 'none' tax belonging to a group of taxes.
# In that case, the amount is accounted as an extra base for that group. However, we need to
# account it only once.
# For example, suppose a tax 10% affect base of subsequent followed by a group of taxes
# 20% + 30%. On a base of 1000.0, the tax line for 10% will affect the base of 20% + 30%.
# However, this extra base must be accounted only once since the base of the group of taxes
# must be 1100.0 and not 1200.0.
group_tax_id = child_to_group_of_taxes[row['tax_id']]
if group_tax_id not in group_of_taxes_with_extra_base_amount:
group_tax_info = group_of_taxes_info[group_tax_id]
results[group_tax_info['type_tax_use']]['children'][group_tax_id]['base_amount'][column_group_key] += row['base_amount']
group_of_taxes_with_extra_base_amount.add(group_tax_id)
else:
tax_type_tax_use = row['src_group_tax_type_tax_use'] or row['src_tax_type_tax_use']
results[tax_type_tax_use]['children'][row['tax_id']]['base_amount'][column_group_key] += row['base_amount']
else:
if row['tax_id'] in group_of_taxes_info and group_of_taxes_info[row['tax_id']]['to_expand']:
# Expand the group of taxes since it contains at least one tax with a type != 'none'.
group_info = group_of_taxes_info[row['tax_id']]
for child_tax_id in group_info['child_tax_ids']:
results[group_info['type_tax_use']]['children'][child_tax_id]['base_amount'][column_group_key] += row['base_amount']
else:
results[row['tax_type_tax_use']]['children'][row['tax_id']]['base_amount'][column_group_key] += row['base_amount']
# Fetch the tax amounts.
select_deductible = join_deductible = group_by_deductible = SQL()
if options.get('account_journal_report_tax_deductibility_columns'):
select_deductible = SQL(""", repartition.use_in_tax_closing AS trl_tax_closing
, SIGN(repartition.factor_percent) AS trl_factor""")
join_deductible = SQL("""JOIN account_tax_repartition_line repartition
ON account_move_line.tax_repartition_line_id = repartition.id""")
group_by_deductible = SQL(', repartition.use_in_tax_closing, SIGN(repartition.factor_percent)')
self._cr.execute(SQL(
'''
SELECT
tax.id AS tax_id,
tax.type_tax_use AS tax_type_tax_use,
group_tax.id AS group_tax_id,
group_tax.type_tax_use AS group_tax_type_tax_use,
SUM(account_move_line.balance) AS tax_amount
%(select_deductible)s
FROM %(table_references)s
JOIN account_tax tax ON tax.id = account_move_line.tax_line_id
%(join_deductible)s
LEFT JOIN account_tax group_tax ON group_tax.id = account_move_line.group_tax_id
WHERE %(search_condition)s
AND (
/* CABA */
account_move_line__move_id.always_tax_exigible
OR account_move_line__move_id.tax_cash_basis_rec_id IS NOT NULL
OR tax.tax_exigibility != 'on_payment'
)
AND (
(group_tax.id IS NULL AND tax.type_tax_use IN ('sale', 'purchase'))
OR
(group_tax.id IS NOT NULL AND group_tax.type_tax_use IN ('sale', 'purchase'))
)
GROUP BY tax.id, group_tax.id %(group_by_deductible)s
''',
select_deductible=select_deductible,
table_references=query.from_clause,
join_deductible=join_deductible,
search_condition=query.where_clause,
group_by_deductible=group_by_deductible,
))
for row in self._cr.dictfetchall():
# Manage group of taxes.
# In case the group of taxes is mixing multiple taxes having a type_tax_use != 'none', consider
# them instead of the group.
tax_id = row['tax_id']
if row['group_tax_id']:
tax_type_tax_use = row['group_tax_type_tax_use']
if not group_of_taxes_info[row['group_tax_id']]['to_expand']:
tax_id = row['group_tax_id']
else:
tax_type_tax_use = row['group_tax_type_tax_use'] or row['tax_type_tax_use']
results[tax_type_tax_use]['tax_amount'][column_group_key] += row['tax_amount']
results[tax_type_tax_use]['children'][tax_id]['tax_amount'][column_group_key] += row['tax_amount']
if options.get('account_journal_report_tax_deductibility_columns'):
tax_detail_label = False
if row['trl_factor'] > 0 and tax_type_tax_use == 'purchase':
tax_detail_label = 'tax_deductible' if row['trl_tax_closing'] else 'tax_non_deductible'
elif row['trl_tax_closing'] and (row['trl_factor'] > 0, tax_type_tax_use) in ((False, 'purchase'), (True, 'sale')):
tax_detail_label = 'tax_due'
if tax_detail_label:
results[tax_type_tax_use][tax_detail_label][column_group_key] += row['tax_amount'] * row['trl_factor']
results[tax_type_tax_use]['children'][tax_id][tax_detail_label][column_group_key] += row['tax_amount'] * row['trl_factor']
return results
def _read_generic_tax_report_amounts(self, report, options_by_column_group, groupby_fields):
""" Read the tax details to compute the tax amounts.
:param options_list: The list of report options, one for each period.
:param groupby_fields: A list of tuple (alias, field) representing the way the amounts must be grouped.
:return: A dictionary mapping each groupby key (e.g. a tax_id) to a sub dictionary containing:
base_amount: The tax base amount expressed in company's currency.
tax_amount The tax amount expressed in company's currency.
children: The children nodes following the same pattern as the current dictionary.
"""
fetch_group_of_taxes = False
select_clause_list = []
groupby_query_list = []
for alias, field in groupby_fields:
select_clause_list.append(SQL("%s AS %s", SQL.identifier(alias, field), SQL.identifier(f'{alias}_{field}')))
groupby_query_list.append(SQL.identifier(alias, field))
# Fetch both info from the originator tax and the child tax to manage the group of taxes.
if alias == 'src_tax':
select_clause_list.append(SQL("%s AS %s", SQL.identifier('tax', field), SQL.identifier(f'tax_{field}')))
groupby_query_list.append(SQL.identifier('tax', field))
fetch_group_of_taxes = True
# Fetch the group of taxes.
# If all children taxes are 'none', all amounts are aggregated and only the group will appear on the report.
# If some children taxes are not 'none', the children are displayed.
group_of_taxes_to_expand = set()
if fetch_group_of_taxes:
group_of_taxes = self.env['account.tax'].with_context(active_test=False).search([('amount_type', '=', 'group')])
for group in group_of_taxes:
if set(group.children_tax_ids.mapped('type_tax_use')) != {'none'}:
group_of_taxes_to_expand.add(group.id)
res = {}
for column_group_key, options in options_by_column_group.items():
query = report._get_report_query(options, 'strict_range')
tax_details_query = self.env['account.move.line']._get_query_tax_details(query.from_clause, query.where_clause)
# Avoid adding multiple times the same base amount sharing the same grouping_key.
# It could happen when dealing with group of taxes for example.
row_keys = set()
self._cr.execute(SQL(
'''
SELECT
%(select_clause)s,
trl.document_type = 'refund' AS is_refund,
SUM(CASE WHEN tdr.display_type = 'rounding' THEN 0 ELSE tdr.base_amount END) AS base_amount,
SUM(tdr.tax_amount) AS tax_amount
FROM (%(tax_details_query)s) AS tdr
JOIN account_tax_repartition_line trl ON trl.id = tdr.tax_repartition_line_id
JOIN account_tax tax ON tax.id = tdr.tax_id
JOIN account_tax src_tax ON
src_tax.id = COALESCE(tdr.group_tax_id, tdr.tax_id)
AND src_tax.type_tax_use IN ('sale', 'purchase')
JOIN account_account account ON account.id = tdr.base_account_id
WHERE tdr.tax_exigible
GROUP BY tdr.tax_repartition_line_id, trl.document_type, %(groupby_query)s
ORDER BY src_tax.sequence, src_tax.id, tax.sequence, tax.id
''',
select_clause=SQL(',').join(select_clause_list),
tax_details_query=tax_details_query,
groupby_query=SQL(',').join(groupby_query_list),
))
for row in self._cr.dictfetchall():
node = res
# tuple of values used to prevent adding multiple times the same base amount.
cumulated_row_key = [row['is_refund']]
for alias, field in groupby_fields:
grouping_key = f'{alias}_{field}'
# Manage group of taxes.
# In case the group of taxes is mixing multiple taxes having a type_tax_use != 'none', consider
# them instead of the group.
if grouping_key == 'src_tax_id' and row['src_tax_id'] in group_of_taxes_to_expand:
# Add the originator group to the grouping key, to make sure that its base amount is not
# treated twice, for hybrid cases where a tax is both used in a group and independently.
cumulated_row_key.append(row[grouping_key])
# Ensure the child tax is used instead of the group.
grouping_key = 'tax_id'
row_key = row[grouping_key]
cumulated_row_key.append(row_key)
cumulated_row_key_tuple = tuple(cumulated_row_key)
node.setdefault(row_key, {
'base_amount': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'tax_amount': {column_group_key: 0.0 for column_group_key in options['column_groups']},
'children': {},
})
sub_node = node[row_key]
# Add amounts.
if cumulated_row_key_tuple not in row_keys:
sub_node['base_amount'][column_group_key] += row['base_amount']
sub_node['tax_amount'][column_group_key] += row['tax_amount']
node = sub_node['children']
row_keys.add(cumulated_row_key_tuple)
return res
def _populate_lines_recursively(self, report, options, lines, sorting_map_list, groupby_fields, values_node, index=0, type_tax_use=None, parent_line_id=None, warnings=None):
''' Populate the list of report lines passed as parameter recursively. At this point, every amounts is already
fetched for every periods and every groupby.
:param options: The report options.
:param lines: The list of report lines to populate.
:param sorting_map_list: A list of dictionary mapping each encountered key with a weight to sort the results.
:param index: The index of the current element to process (also equals to the level into the hierarchy).
:param groupby_fields: A list of tuple <alias, field> defining in which way tax amounts should be grouped.
:param values_node: The node containing the amounts and children into the hierarchy.
:param type_tax_use: The type_tax_use of the tax.
:param parent_line_id: The line id of the parent line (if any)
:param warnings The warnings dictionnary.
'''
if index == len(groupby_fields):
return
alias, field = groupby_fields[index]
groupby_key = f'{alias}_{field}'
# Sort the keys in order to add the lines in the same order as the records.
sorting_map = sorting_map_list[index]
sorted_keys = sorted(list(values_node.keys()), key=lambda x: sorting_map[x][1])
for key in sorted_keys:
# Compute 'type_tax_use' with the first grouping since 'src_tax_type_tax_use' is always
# the first one.
if groupby_key == 'src_tax_type_tax_use':
type_tax_use = key
sign = -1 if type_tax_use == 'sale' else 1
# Prepare columns.
tax_amount_dict = values_node[key]
columns = []
tax_base_amounts = tax_amount_dict['base_amount']
tax_amounts = tax_amount_dict['tax_amount']
for column in options['columns']:
tax_base_amount = tax_base_amounts[column['column_group_key']]
tax_amount = tax_amounts[column['column_group_key']]
expr_label = column.get('expression_label')
if expr_label == 'net':
col_value = sign * tax_base_amount if index == len(groupby_fields) - 1 else ''
if expr_label == 'tax':
col_value = sign * tax_amount
columns.append(report._build_column_dict(col_value, column, options=options))
# Add the non-deductible, deductible and due tax amounts.
if expr_label == 'tax' and options.get('account_journal_report_tax_deductibility_columns'):
for deduct_type in ('tax_non_deductible', 'tax_deductible', 'tax_due'):
columns.append(report._build_column_dict(
col_value=sign * tax_amount_dict[deduct_type][column['column_group_key']],
col_data={
'figure_type': 'monetary',
'column_group_key': column['column_group_key'],
'expression_label': deduct_type,
},
options=options,
))
# Prepare line.
default_vals = {
'columns': columns,
'level': index if index == 0 else index + 1,
'unfoldable': False,
}
report_line = self._build_report_line(report, options, default_vals, groupby_key, sorting_map[key][0], parent_line_id, warnings)
if groupby_key == 'src_tax_id':
report_line['caret_options'] = 'generic_tax_report'
lines.append((0, report_line))
# Process children recursively.
self._populate_lines_recursively(
report,
options,
lines,
sorting_map_list,
groupby_fields,
tax_amount_dict.get('children'),
index=index + 1,
type_tax_use=type_tax_use,
parent_line_id=report_line['id'],
warnings=warnings,
)
def _build_report_line(self, report, options, default_vals, groupby_key, value, parent_line_id, warnings=None):
""" Build the report line accordingly to its type.
:param options: The report options.
:param default_vals: The pre-computed report line values.
:param groupby_key: The grouping_key record.
:param value: The value that could be a record.
:param parent_line_id The line id of the parent line (if any, can be None otherwise)
:param warnings: The warnings dictionary.
:return: A python dictionary.
"""
report_line = dict(default_vals)
if parent_line_id is not None:
report_line['parent_id'] = parent_line_id
if groupby_key == 'src_tax_type_tax_use':
type_tax_use_option = value
report_line['id'] = report._get_generic_line_id(None, None, markup=type_tax_use_option[0], parent_line_id=parent_line_id)
report_line['name'] = type_tax_use_option[1]
elif groupby_key == 'src_tax_id':
tax = value
report_line['id'] = report._get_generic_line_id(tax._name, tax.id, parent_line_id=parent_line_id)
if tax.amount_type == 'percent':
report_line['name'] = f"{tax.name} ({tax.amount}%)"
if warnings is not None:
self._check_line_consistency(report, options, report_line, tax, warnings)
elif tax.amount_type == 'fixed':
report_line['name'] = f"{tax.name} ({tax.amount})"
else:
report_line['name'] = tax.name
if options.get('multi-company'):
report_line['name'] = f"{report_line['name']} - {tax.company_id.display_name}"
elif groupby_key == 'account_id':
account = value
report_line['id'] = report._get_generic_line_id(account._name, account.id, parent_line_id=parent_line_id)
if options.get('multi-company'):
report_line['name'] = f"{account.display_name} - {account.company_id.display_name}"
else:
report_line['name'] = account.display_name
return report_line
def _check_line_consistency(self, report, options, report_line, tax, warnings=None):
tax_applied = tax.amount * sum(tax.invoice_repartition_line_ids.filtered(lambda tax_rep: tax_rep.repartition_type == 'tax').mapped('factor')) / 100
for column_group_key, column_group_options in report._split_options_per_column_group(options).items():
net_value = next((col['no_format'] for col in report_line['columns'] if col['column_group_key'] == column_group_key and col['expression_label'] == 'net'), 0)
tax_value = next((col['no_format'] for col in report_line['columns'] if col['column_group_key'] == column_group_key and col['expression_label'] == 'tax'), 0)
if net_value == '': # noqa: PLC1901
continue
currency = self.env.company.currency_id
computed_tax_amount = float(net_value or 0) * tax_applied
is_inconsistent = currency.compare_amounts(computed_tax_amount, tax_value)
if is_inconsistent:
error = abs(abs(tax_value) - abs(computed_tax_amount)) / float(net_value or 1)
# Error is bigger than 0.1%. We can not ignore it.
if error > 0.001:
report_line['alert'] = True
warnings['at_accounting.tax_report_warning_lines_consistency'] = {'alert_type': 'danger'}
return
# -------------------------------------------------------------------------
# BUTTONS & CARET OPTIONS
# -------------------------------------------------------------------------
def caret_option_audit_tax(self, options, params):
report = self.env['account.report'].browse(options['report_id'])
model, tax_id = report._get_model_info_from_id(params['line_id'])
if model != 'account.tax':
raise UserError(_("Cannot audit tax from another model than account.tax."))
tax = self.env['account.tax'].browse(tax_id)
if tax.amount_type == 'group':
tax_affecting_base_domain = [
('tax_ids', 'in', tax.children_tax_ids.ids),
('tax_repartition_line_id', '!=', False),
]
else:
tax_affecting_base_domain = [
('tax_ids', '=', tax.id),
('tax_ids.type_tax_use', '=', tax.type_tax_use),
('tax_repartition_line_id', '!=', False),
]
domain = report._get_options_domain(options, 'strict_range') + expression.OR((
# Base lines
[
('tax_ids', 'in', tax.ids),
('tax_ids.type_tax_use', '=', tax.type_tax_use),
('tax_repartition_line_id', '=', False),
],
# Tax lines
[
('group_tax_id', '=', tax.id) if tax.amount_type == 'group' else ('tax_line_id', '=', tax.id),
],
# Tax lines acting as base lines
tax_affecting_base_domain,
))
ctx = self._context.copy()
ctx.update({'search_default_group_by_account': 2, 'expand': 1})
return {
'type': 'ir.actions.act_window',
'name': _('Journal Items for Tax Audit'),
'res_model': 'account.move.line',
'views': [[self.env.ref('account.view_move_line_tax_audit_tree').id, 'list']],
'domain': domain,
'context': ctx,
}
class GenericTaxReportCustomHandlerAT(models.AbstractModel):
_name = 'account.generic.tax.report.handler.account.tax'
_inherit = 'account.generic.tax.report.handler'
_description = 'Generic Tax Report Custom Handler (Account -> Tax)'
def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
return super()._get_dynamic_lines(report, options, 'account_tax', warnings)
class GenericTaxReportCustomHandlerTA(models.AbstractModel):
_name = 'account.generic.tax.report.handler.tax.account'
_inherit = 'account.generic.tax.report.handler'
_description = 'Generic Tax Report Custom Handler (Tax -> Account)'
def _dynamic_lines_generator(self, report, options, all_column_groups_expression_totals, warnings=None):
return super()._get_dynamic_lines(report, options, 'tax_account', warnings)