diff --git a/addons/base_accounting_kit/wizard/import_bank_statement.py b/addons/base_accounting_kit/wizard/import_bank_statement.py new file mode 100644 index 0000000..4b22a6c --- /dev/null +++ b/addons/base_accounting_kit/wizard/import_bank_statement.py @@ -0,0 +1,446 @@ +# -*- coding: utf-8 -*- +############################################################################### +# +# Cybrosys Technologies Pvt. Ltd. +# +# Copyright (C) 2025-TODAY Cybrosys Technologies() +# Author: Akhil Ashok (odoo@cybrosys.com) +# +# You can modify it under the terms of the GNU LESSER +# GENERAL PUBLIC LICENSE (LGPL v3), Version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU LESSER GENERAL PUBLIC LICENSE (LGPL v3) for more details. +# +# You should have received a copy of the GNU LESSER GENERAL PUBLIC LICENSE +# (LGPL v3) along with this program. +# If not, see . +# +############################################################################### +import base64 +import codecs +import csv +import openpyxl +import os +import io +from datetime import datetime +from io import BytesIO +from odoo import fields, models, _ +from odoo.exceptions import ValidationError +from ofxparse import OfxParser +from qifparse.parser import QifParser + + +class ImportBankStatement(models.TransientModel): + """ A class to import files as bank statement """ + _name = "import.bank.statement" + _description = "Import button" + _rec_name = "file_name" + + attachment = fields.Binary(string="File", required=True, + help="Choose the file to import") + file_name = fields.Char(string="File Name", help="Name of the file") + journal_id = fields.Many2one('account.journal', string="Journal ID", + help="Journal in which the file importing") + + def _parse_date(self, date_str): + """ Helper to parse date from string """ + if not date_str: + return fields.Date.today() + + if isinstance(date_str, datetime): + return date_str.date() + + # Remove potential quotes and whitespace + date_str = str(date_str).strip().strip('"').strip("'") + if not date_str: + return fields.Date.today() + + # Try common formats + for fmt in ('%Y-%m-%d', '%d/%m/%Y', '%m/%d/%Y', '%d-%m-%Y', '%Y/%m/%d'): + try: + return datetime.strptime(date_str, fmt).date() + except (ValueError, TypeError): + continue + + # Fallback to Odoo fields.Date.from_string + try: + res = fields.Date.from_string(date_str) + if res: + return res + except: + pass + + return fields.Date.today() + + def _parse_float(self, val): + """ Helper to parse float from string with currency symbols and commas """ + if not val: + return 0.0 + if isinstance(val, (int, float)): + return float(val) + + # Remove currency symbols, quotes, commas, spaces + clean_val = str(val).strip().replace('"', '').replace("'", "").replace(',', '').replace(' ', '') + for symbol in ('$', '€', '£', '¥', '₹'): + clean_val = clean_val.replace(symbol, '') + + # Handle accounting negative format (100.0) -> -100.0 + if clean_val.startswith('(') and clean_val.endswith(')'): + clean_val = '-' + clean_val[1:-1] + + try: + return float(clean_val) + except (ValueError, TypeError): + return 0.0 + + def action_statement_import(self): + """Function to import csv, xlsx, ofx and qif file format""" + split_tup = os.path.splitext(self.file_name) + if split_tup[1] == '.csv' or split_tup[1] == '.xlsx' or split_tup[ + 1] == '.ofx' or split_tup[1] == '.qif': + if split_tup[1] == '.csv': + try: + file_data = base64.b64decode(self.attachment) + file_string = file_data.decode('utf-8-sig') + f = io.StringIO(file_string) + reader = csv.DictReader(f) + fieldnames = reader.fieldnames or [] + header_map = {f.strip().lower(): f for f in fieldnames} + except Exception as e: + raise ValidationError(_("Error reading CSV file: %s") % str(e)) + + statement_id = False + for row in reader: + if not any(row.values()): + continue + + def get_field(keys): + for k in keys: + if k.lower() in header_map: + return row.get(header_map[k.lower()]) + return None + + # Mappings for common Odoo and generic bank headers + name = get_field(['name', 'label', 'description', 'reference', 'account', 'line_ids/payment_ref', 'payment_ref']) + amount = get_field(['amount', 'value', 'price', 'total', 'line_ids/amount']) + partner_name = get_field(['partner', 'partner_id/name', 'contact', 'payee', 'customer', 'supplier', 'line_ids/partner_id', 'partner_id']) + date_str = get_field(['date', 'transaction date', 'time', 'line_ids/date']) + starting_balance = get_field(['starting balance', 'start balance', 'opening balance', 'balance_start']) + ending_balance = get_field(['ending balance', 'balance', 'real balance', 'end balance', 'balance_end_real']) + + values = [v.strip() if v else '' for v in row.values()] + keys = list(row.keys()) + mapped_indices = set() + + def find_column(targets, is_numeric=False, is_date=False, exclude_indices=None): + exclude_indices = exclude_indices or set() + for k, v in header_map.items(): + if any(t in k for t in targets): + idx = keys.index(v) + if idx not in exclude_indices: + return row.get(v), idx + if is_numeric or is_date: + for i, val in enumerate(values): + if i in exclude_indices: continue + if is_numeric: + try: + temp = val.replace('$', '').replace(',', '').strip() + if temp: + float(temp) + return val, i + except: pass + if is_date: + try: + self._parse_date(val) + return val, i + except: pass + return None, -1 + + # Heuristic backups for unmapped fields + if amount is None: + amount_val, idx = find_column(['amount', 'value', 'price', 'total'], is_numeric=True) + if idx != -1: + amount = amount_val + mapped_indices.add(idx) + + if date_str is None: + date_val, idx = find_column(['date', 'time'], is_date=True, exclude_indices=mapped_indices) + if idx != -1: + date_str = date_val + mapped_indices.add(idx) + + if ending_balance is None: + bal_val, idx = find_column(['balance', 'ending', 'real', 'end balance'], is_numeric=True, exclude_indices=mapped_indices) + if idx != -1: + ending_balance = bal_val + mapped_indices.add(idx) + + if starting_balance is None: + # Attempt to find starting balance only if multiple numeric columns exist + numeric_count = len([v for v in values if v.replace('$', '').replace(',', '').strip().replace('.', '').isdigit()]) + bal_val, idx = find_column(['starting', 'start balance', 'opening'], is_numeric=(numeric_count > 2), exclude_indices=mapped_indices) + if idx != -1: + starting_balance = bal_val + mapped_indices.add(idx) + + if name is None: + for i, val in enumerate(values): + if i in mapped_indices: continue + try: + float(val.replace('$', '').replace(',', '').strip()) + continue + except: + try: + self._parse_date(val) + continue + except: + name = val + mapped_indices.add(i) + break + + if not name and values: + name = values[0] + + transaction_date = self._parse_date(date_str) + clean_amount = self._parse_float(amount) + clean_start_balance = self._parse_float(starting_balance) + clean_end_balance = self._parse_float(ending_balance) + + # Ensure statement is balanced (End = Start + Amount) to prevent red state + if starting_balance is not None and ending_balance is not None: + b_start, b_end = clean_start_balance, clean_end_balance + clean_amount = b_end - b_start + elif ending_balance is not None: + b_end = clean_end_balance + b_start = b_end - clean_amount + elif starting_balance is not None: + b_start = clean_start_balance + b_end = b_start + clean_amount + else: + b_start, b_end = 0.0, clean_amount + + partner = False + if partner_name: + partner_name = partner_name.strip() + if partner_name.lower() in ('bank', 'cash', 'main', 'demo', 'yourcompany'): + partner_name = False + if partner_name: + partner = self.env['res.partner'].search([('name', '=', partner_name)], limit=1) + if not partner and len(partner_name) > 1 and not partner_name.isdigit(): + try: + self._parse_date(partner_name) + except: + raise ValidationError(_("Partner '%s' does not exist") % partner_name) + + statement = self.env['account.bank.statement'].create({ + 'name': name, + 'journal_id': self.journal_id.id, + 'company_id': self.journal_id.company_id.id, + 'date': transaction_date, + 'balance_start': b_start, + 'balance_end_real': b_end, + 'line_ids': [(0, 0, { + 'date': transaction_date, + 'payment_ref': name or 'csv file', + 'partner_id': partner.id if partner else False, + 'journal_id': self.journal_id.id, + 'amount': clean_amount, + })], + }) + statement_id = statement.id + + return { + 'type': 'ir.actions.act_window', + 'name': 'Statements', + 'view_mode': 'list', + 'res_model': 'account.bank.statement', + 'res_id': statement.id, + } + elif split_tup[1] == '.xlsx': + # Reading xlsx file + try: + order = openpyxl.load_workbook( + filename=BytesIO(base64.b64decode(self.attachment))) + xl_order = order.active + except: + raise ValidationError(_("Choose correct file")) + for record in xl_order.iter_rows(min_row=2, max_row=None, + min_col=None, + max_col=None, + values_only=True): + line = list(record) + # Reading the content from file + if line[0] and line[1] and line[3]: + partner = self.env['res.partner'].search( + [('name', '=', line[3])]) + date_obj = self._parse_date(line[2]) + # Creating record + if partner: + statement = self.env[ + 'account.bank.statement'].create({ + 'name': line[0], + 'line_ids': [ + (0, 0, { + 'date': date_obj, + 'payment_ref': 'xlsx file', + 'partner_id': partner.id, + 'journal_id': self.journal_id.id, + 'amount': line[1], + }), + ], + }) + else: + raise ValidationError(_("Partner not exist")) + else: + if not line[0]: + raise ValidationError( + _("Account name is not set")) + elif not line[1]: + raise ValidationError( + _("Amount is not set")) + elif not line[3]: + date_obj = self._parse_date(line[2]) + # Creating record + statement = self.env[ + 'account.bank.statement'].create({ + 'name': line[0], + 'line_ids': [ + (0, 0, { + 'date': date_obj, + 'payment_ref': 'xlsx file', + 'journal_id': self.journal_id.id, + 'amount': line[1], + }), + ], + }) + return { + 'type': 'ir.actions.act_window', + 'name': 'Statements', + 'view_mode': 'list', + 'res_model': 'account.bank.statement', + 'res_id': statement.id, + } + elif split_tup[1] == '.ofx': + try: + file_data = base64.b64decode(self.attachment) + ofx_file = OfxParser.parse(io.BytesIO(file_data)) + except Exception as e: + raise ValidationError(_("Wrong file format or parsing error: %s") % str(e)) + + if not ofx_file.account or not ofx_file.account.statement: + raise ValidationError(_("No account information found in OFX file.")) + + statement_id = False + stmt = ofx_file.account.statement + + # Standardize balance extraction + ledger_bal = getattr(stmt, 'balance', getattr(stmt, 'ledger_balance', None)) + final_balance = self._parse_float(ledger_bal) if ledger_bal is not None else 0.0 + + for transaction in stmt.transactions: + amount = self._parse_float(transaction.amount) + if amount == 0: + continue + + # Clean labels and match partners + label = (transaction.memo or transaction.name or transaction.payee or 'ofx transaction').strip() + payee_name = transaction.payee.strip() if transaction.payee else False + + partner = False + if payee_name: + # Shared noise-filtering for partners + if payee_name.lower() in ('bank', 'cash', 'main', 'demo', 'yourcompany'): + payee_name = False + if payee_name: + partner = self.env['res.partner'].search([('name', '=', payee_name)], limit=1) + + date = self._parse_date(transaction.date) + b_end = final_balance + b_start = b_end - amount + + statement = self.env['account.bank.statement'].create({ + 'name': label, + 'journal_id': self.journal_id.id, + 'company_id': self.journal_id.company_id.id, + 'date': date, + 'balance_start': b_start, + 'balance_end_real': b_end, + 'line_ids': [(0, 0, { + 'date': date, + 'payment_ref': label, + 'partner_id': partner.id if partner else False, + 'journal_id': self.journal_id.id, + 'amount': amount, + })], + }) + statement_id = statement.id + + if not statement_id: + raise ValidationError(_("No valid transactions found in the OFX file.")) + + return { + 'type': 'ir.actions.act_window', + 'name': 'Statements', + 'view_mode': 'list', + 'res_model': 'account.bank.statement', + 'res_id': statement_id, + } + + elif split_tup[1] == '.qif': + try: + file_data = base64.b64decode(self.attachment) + file_string = file_data.decode('utf-8-sig') + qif = QifParser().parse(io.StringIO(file_string)) + except Exception as e: + raise ValidationError(_("Error parsing QIF file: %s") % str(e)) + + statement_id = False + for account in qif.get_accounts(): + for transaction in qif.get_transactions(account): + amount = self._parse_float(transaction.amount) + if amount == 0: + continue + + date = self._parse_date(transaction.date) + label = (transaction.payee or transaction.memo or 'qif transaction').strip() + payee_name = transaction.payee.strip() if transaction.payee else False + + partner = False + if payee_name: + if payee_name.lower() in ('bank', 'cash', 'main', 'demo', 'yourcompany'): + payee_name = False + if payee_name: + partner = self.env['res.partner'].search([('name', '=', payee_name)], limit=1) + + statement = self.env['account.bank.statement'].create({ + 'name': label, + 'journal_id': self.journal_id.id, + 'company_id': self.journal_id.company_id.id, + 'date': date, + 'balance_start': 0.0, + 'balance_end_real': amount, + 'line_ids': [(0, 0, { + 'date': date, + 'payment_ref': label, + 'partner_id': partner.id if partner else False, + 'journal_id': self.journal_id.id, + 'amount': amount, + })], + }) + statement_id = statement.id + + if not statement_id: + raise ValidationError(_("No valid transactions found in the QIF file.")) + + return { + 'type': 'ir.actions.act_window', + 'name': 'Statements', + 'view_mode': 'list', + 'res_model': 'account.bank.statement', + 'res_id': statement_id, + } + else: + raise ValidationError(_("Choose correct file"))