diff --git a/erpnext/accounts/doctype/payment_reconciliation/test_payment_reconciliation.py b/erpnext/accounts/doctype/payment_reconciliation/test_payment_reconciliation.py index d2374b77a63..575ac74a4eb 100644 --- a/erpnext/accounts/doctype/payment_reconciliation/test_payment_reconciliation.py +++ b/erpnext/accounts/doctype/payment_reconciliation/test_payment_reconciliation.py @@ -4,93 +4,453 @@ import unittest import frappe -from frappe.utils import add_days, getdate +from frappe import qb +from frappe.tests.utils import FrappeTestCase +from frappe.utils import add_days, nowdate +from erpnext.accounts.doctype.payment_entry.test_payment_entry import create_payment_entry from erpnext.accounts.doctype.sales_invoice.test_sales_invoice import create_sales_invoice +from erpnext.accounts.party import get_party_account +from erpnext.stock.doctype.item.test_item import create_item -class TestPaymentReconciliation(unittest.TestCase): - @classmethod - def setUpClass(cls): - make_customer() - make_invoice_and_payment() +class TestPaymentReconciliation(FrappeTestCase): + def setUp(self): + self.create_company() + self.create_item() + self.create_customer() + self.clear_old_entries() - def test_payment_reconciliation(self): - payment_reco = frappe.get_doc("Payment Reconciliation") - payment_reco.company = "_Test Company" - payment_reco.party_type = "Customer" - payment_reco.party = "_Test Payment Reco Customer" - payment_reco.receivable_payable_account = "Debtors - _TC" - payment_reco.from_invoice_date = add_days(getdate(), -1) - payment_reco.to_invoice_date = getdate() - payment_reco.from_payment_date = add_days(getdate(), -1) - payment_reco.to_payment_date = getdate() - payment_reco.maximum_invoice_amount = 1000 - payment_reco.maximum_payment_amount = 1000 - payment_reco.invoice_limit = 10 - payment_reco.payment_limit = 10 - payment_reco.bank_cash_account = "_Test Bank - _TC" - payment_reco.cost_center = "_Test Cost Center - _TC" - payment_reco.get_unreconciled_entries() + def tearDown(self): + frappe.db.rollback() - self.assertEqual(len(payment_reco.get("invoices")), 1) - self.assertEqual(len(payment_reco.get("payments")), 1) + def create_company(self): + company = None + if frappe.db.exists("Company", "_Test Payment Reconciliation"): + company = frappe.get_doc("Company", "_Test Payment Reconciliation") + else: + company = frappe.get_doc( + { + "doctype": "Company", + "company_name": "_Test Payment Reconciliation", + "country": "India", + "default_currency": "INR", + "create_chart_of_accounts_based_on": "Standard Template", + "chart_of_accounts": "Standard", + } + ) + company = company.save() - payment_entry = payment_reco.get("payments")[0].reference_name - invoice = payment_reco.get("invoices")[0].invoice_number + self.company = company.name + self.cost_center = company.cost_center + self.warehouse = "All Warehouses - _PR" + self.income_account = "Sales - _PR" + self.expense_account = "Cost of Goods Sold - _PR" + self.debit_to = "Debtors - _PR" + self.creditors = "Creditors - _PR" - payment_reco.allocate_entries( - { - "payments": [payment_reco.get("payments")[0].as_dict()], - "invoices": [payment_reco.get("invoices")[0].as_dict()], - } + # create bank account + if frappe.db.exists("Account", "HDFC - _PR"): + self.bank = "HDFC - _PR" + else: + bank_acc = frappe.get_doc( + { + "doctype": "Account", + "account_name": "HDFC", + "parent_account": "Bank Accounts - _PR", + "company": self.company, + } + ) + bank_acc.save() + self.bank = bank_acc.name + + def create_item(self): + item = create_item( + item_code="_Test PR Item", is_stock_item=0, company=self.company, warehouse=self.warehouse ) - payment_reco.reconcile() + self.item = item if isinstance(item, str) else item.item_code - payment_entry_doc = frappe.get_doc("Payment Entry", payment_entry) - self.assertEqual(payment_entry_doc.get("references")[0].reference_name, invoice) + def create_customer(self): + if frappe.db.exists("Customer", "_Test PR Customer"): + self.customer = "_Test PR Customer" + else: + customer = frappe.new_doc("Customer") + customer.customer_name = "_Test PR Customer" + customer.type = "Individual" + customer.save() + self.customer = customer.name + if frappe.db.exists("Customer", "_Test PR Customer 2"): + self.customer2 = "_Test PR Customer 2" + else: + customer = frappe.new_doc("Customer") + customer.customer_name = "_Test PR Customer 2" + customer.type = "Individual" + customer.save() + self.customer2 = customer.name -def make_customer(): - if not frappe.db.get_value("Customer", "_Test Payment Reco Customer"): - frappe.get_doc( - { - "doctype": "Customer", - "customer_name": "_Test Payment Reco Customer", - "customer_type": "Individual", - "customer_group": "_Test Customer Group", - "territory": "_Test Territory", - } - ).insert() + def create_sales_invoice( + self, qty=1, rate=100, posting_date=nowdate(), do_not_save=False, do_not_submit=False + ): + """ + Helper function to populate default values in sales invoice + """ + sinv = create_sales_invoice( + qty=qty, + rate=rate, + company=self.company, + customer=self.customer, + item_code=self.item, + item_name=self.item, + cost_center=self.cost_center, + warehouse=self.warehouse, + debit_to=self.debit_to, + parent_cost_center=self.cost_center, + update_stock=0, + currency="INR", + is_pos=0, + is_return=0, + return_against=None, + income_account=self.income_account, + expense_account=self.expense_account, + do_not_save=do_not_save, + do_not_submit=do_not_submit, + ) + return sinv + def create_payment_entry(self, amount=100, posting_date=nowdate()): + """ + Helper function to populate default values in payment entry + """ + payment = create_payment_entry( + company=self.company, + payment_type="Receive", + party_type="Customer", + party=self.customer, + paid_from=self.debit_to, + paid_to=self.bank, + paid_amount=amount, + ) + payment.posting_date = posting_date + return payment -def make_invoice_and_payment(): - si = create_sales_invoice( - customer="_Test Payment Reco Customer", qty=1, rate=690, do_not_save=True - ) - si.cost_center = "_Test Cost Center - _TC" - si.save() - si.submit() + def clear_old_entries(self): + doctype_list = [ + "GL Entry", + "Payment Ledger Entry", + "Sales Invoice", + "Purchase Invoice", + "Payment Entry", + "Journal Entry", + ] + for doctype in doctype_list: + qb.from_(qb.DocType(doctype)).delete().where(qb.DocType(doctype).company == self.company).run() - pe = frappe.get_doc( - { - "doctype": "Payment Entry", - "payment_type": "Receive", - "party_type": "Customer", - "party": "_Test Payment Reco Customer", - "company": "_Test Company", - "paid_from_account_currency": "INR", - "paid_to_account_currency": "INR", - "source_exchange_rate": 1, - "target_exchange_rate": 1, - "reference_no": "1", - "reference_date": getdate(), - "received_amount": 690, - "paid_amount": 690, - "paid_from": "Debtors - _TC", - "paid_to": "_Test Bank - _TC", - "cost_center": "_Test Cost Center - _TC", - } - ) - pe.insert() - pe.submit() + def create_payment_reconciliation(self): + pr = frappe.new_doc("Payment Reconciliation") + pr.company = self.company + pr.party_type = "Customer" + pr.party = self.customer + pr.receivable_payable_account = get_party_account(pr.party_type, pr.party, pr.company) + pr.from_invoice_date = pr.to_invoice_date = pr.from_payment_date = pr.to_payment_date = nowdate() + return pr + + def create_journal_entry( + self, acc1=None, acc2=None, amount=0, posting_date=None, cost_center=None + ): + je = frappe.new_doc("Journal Entry") + je.posting_date = posting_date or nowdate() + je.company = self.company + je.user_remark = "test" + if not cost_center: + cost_center = self.cost_center + je.set( + "accounts", + [ + { + "account": acc1, + "cost_center": cost_center, + "debit_in_account_currency": amount if amount > 0 else 0, + "credit_in_account_currency": abs(amount) if amount < 0 else 0, + }, + { + "account": acc2, + "cost_center": cost_center, + "credit_in_account_currency": amount if amount > 0 else 0, + "debit_in_account_currency": abs(amount) if amount < 0 else 0, + }, + ], + ) + return je + + def test_filter_min_max(self): + # check filter condition minimum and maximum amount + self.create_sales_invoice(qty=1, rate=300) + self.create_sales_invoice(qty=1, rate=400) + self.create_sales_invoice(qty=1, rate=500) + self.create_payment_entry(amount=300).save().submit() + self.create_payment_entry(amount=400).save().submit() + self.create_payment_entry(amount=500).save().submit() + + pr = self.create_payment_reconciliation() + pr.minimum_invoice_amount = 400 + pr.maximum_invoice_amount = 500 + pr.minimum_payment_amount = 300 + pr.maximum_payment_amount = 600 + pr.get_unreconciled_entries() + self.assertEqual(len(pr.get("invoices")), 2) + self.assertEqual(len(pr.get("payments")), 3) + + pr.minimum_invoice_amount = 300 + pr.maximum_invoice_amount = 600 + pr.minimum_payment_amount = 400 + pr.maximum_payment_amount = 500 + pr.get_unreconciled_entries() + self.assertEqual(len(pr.get("invoices")), 3) + self.assertEqual(len(pr.get("payments")), 2) + + pr.minimum_invoice_amount = ( + pr.maximum_invoice_amount + ) = pr.minimum_payment_amount = pr.maximum_payment_amount = 0 + pr.get_unreconciled_entries() + self.assertEqual(len(pr.get("invoices")), 3) + self.assertEqual(len(pr.get("payments")), 3) + + def test_filter_posting_date(self): + # check filter condition using transaction date + date1 = nowdate() + date2 = add_days(nowdate(), -1) + amount = 100 + self.create_sales_invoice(qty=1, rate=amount, posting_date=date1) + si2 = self.create_sales_invoice( + qty=1, rate=amount, posting_date=date2, do_not_save=True, do_not_submit=True + ) + si2.set_posting_time = 1 + si2.posting_date = date2 + si2.save().submit() + self.create_payment_entry(amount=amount, posting_date=date1).save().submit() + self.create_payment_entry(amount=amount, posting_date=date2).save().submit() + + pr = self.create_payment_reconciliation() + pr.from_invoice_date = pr.to_invoice_date = date1 + pr.from_payment_date = pr.to_payment_date = date1 + + pr.get_unreconciled_entries() + # assert only si and pe are fetched + self.assertEqual(len(pr.get("invoices")), 1) + self.assertEqual(len(pr.get("payments")), 1) + + pr.from_invoice_date = date2 + pr.to_invoice_date = date1 + pr.from_payment_date = date2 + pr.to_payment_date = date1 + + pr.get_unreconciled_entries() + # assert only si and pe are fetched + self.assertEqual(len(pr.get("invoices")), 2) + self.assertEqual(len(pr.get("payments")), 2) + + def test_filter_invoice_limit(self): + # check filter condition - invoice limit + transaction_date = nowdate() + rate = 100 + invoices = [] + payments = [] + for i in range(5): + invoices.append(self.create_sales_invoice(qty=1, rate=rate, posting_date=transaction_date)) + pe = self.create_payment_entry(amount=rate, posting_date=transaction_date).save().submit() + payments.append(pe) + + pr = self.create_payment_reconciliation() + pr.from_invoice_date = pr.to_invoice_date = transaction_date + pr.from_payment_date = pr.to_payment_date = transaction_date + pr.invoice_limit = 2 + pr.payment_limit = 3 + pr.get_unreconciled_entries() + + self.assertEqual(len(pr.get("invoices")), 2) + self.assertEqual(len(pr.get("payments")), 3) + + def test_payment_against_invoice(self): + si = self.create_sales_invoice(qty=1, rate=200) + pe = self.create_payment_entry(amount=55).save().submit() + # second payment entry + self.create_payment_entry(amount=35).save().submit() + + pr = self.create_payment_reconciliation() + + # reconcile multiple payments against invoice + pr.get_unreconciled_entries() + invoices = [x.as_dict() for x in pr.get("invoices")] + payments = [x.as_dict() for x in pr.get("payments")] + pr.allocate_entries(frappe._dict({"invoices": invoices, "payments": payments})) + pr.reconcile() + + si.reload() + self.assertEqual(si.status, "Partly Paid") + # check PR tool output post reconciliation + self.assertEqual(len(pr.get("invoices")), 1) + self.assertEqual(pr.get("invoices")[0].get("outstanding_amount"), 110) + self.assertEqual(pr.get("payments"), []) + + # cancel one PE + pe.reload() + pe.cancel() + pr.get_unreconciled_entries() + # check PR tool output + self.assertEqual(len(pr.get("invoices")), 1) + self.assertEqual(len(pr.get("payments")), 0) + self.assertEqual(pr.get("invoices")[0].get("outstanding_amount"), 165) + + def test_payment_against_journal(self): + transaction_date = nowdate() + + sales = "Sales - _PR" + amount = 921 + # debit debtors account to record an invoice + je = self.create_journal_entry(self.debit_to, sales, amount, transaction_date) + je.accounts[0].party_type = "Customer" + je.accounts[0].party = self.customer + je.save() + je.submit() + + self.create_payment_entry(amount=amount, posting_date=transaction_date).save().submit() + + pr = self.create_payment_reconciliation() + pr.minimum_invoice_amount = pr.maximum_invoice_amount = amount + pr.from_invoice_date = pr.to_invoice_date = transaction_date + pr.from_payment_date = pr.to_payment_date = transaction_date + + pr.get_unreconciled_entries() + invoices = [x.as_dict() for x in pr.get("invoices")] + payments = [x.as_dict() for x in pr.get("payments")] + pr.allocate_entries(frappe._dict({"invoices": invoices, "payments": payments})) + pr.reconcile() + + # check PR tool output + self.assertEqual(len(pr.get("invoices")), 0) + self.assertEqual(len(pr.get("payments")), 0) + + def test_journal_against_invoice(self): + transaction_date = nowdate() + amount = 100 + si = self.create_sales_invoice(qty=1, rate=amount, posting_date=transaction_date) + + # credit debtors account to record a payment + je = self.create_journal_entry(self.bank, self.debit_to, amount, transaction_date) + je.accounts[1].party_type = "Customer" + je.accounts[1].party = self.customer + je.save() + je.submit() + + pr = self.create_payment_reconciliation() + + pr.get_unreconciled_entries() + invoices = [x.as_dict() for x in pr.get("invoices")] + payments = [x.as_dict() for x in pr.get("payments")] + pr.allocate_entries(frappe._dict({"invoices": invoices, "payments": payments})) + pr.reconcile() + + # assert outstanding + si.reload() + self.assertEqual(si.status, "Paid") + self.assertEqual(si.outstanding_amount, 0) + + # check PR tool output + self.assertEqual(len(pr.get("invoices")), 0) + self.assertEqual(len(pr.get("payments")), 0) + + def test_journal_against_journal(self): + transaction_date = nowdate() + sales = "Sales - _PR" + amount = 100 + + # debit debtors account to simulate a invoice + je1 = self.create_journal_entry(self.debit_to, sales, amount, transaction_date) + je1.accounts[0].party_type = "Customer" + je1.accounts[0].party = self.customer + je1.save() + je1.submit() + + # credit debtors account to simulate a payment + je2 = self.create_journal_entry(self.bank, self.debit_to, amount, transaction_date) + je2.accounts[1].party_type = "Customer" + je2.accounts[1].party = self.customer + je2.save() + je2.submit() + + pr = self.create_payment_reconciliation() + + pr.get_unreconciled_entries() + invoices = [x.as_dict() for x in pr.get("invoices")] + payments = [x.as_dict() for x in pr.get("payments")] + pr.allocate_entries(frappe._dict({"invoices": invoices, "payments": payments})) + pr.reconcile() + + self.assertEqual(pr.get("invoices"), []) + self.assertEqual(pr.get("payments"), []) + + def test_cr_note_against_invoice(self): + transaction_date = nowdate() + amount = 100 + + si = self.create_sales_invoice(qty=1, rate=amount, posting_date=transaction_date) + + cr_note = self.create_sales_invoice( + qty=-1, rate=amount, posting_date=transaction_date, do_not_save=True, do_not_submit=True + ) + cr_note.is_return = 1 + cr_note = cr_note.save().submit() + + pr = self.create_payment_reconciliation() + + pr.get_unreconciled_entries() + invoices = [x.as_dict() for x in pr.get("invoices")] + payments = [x.as_dict() for x in pr.get("payments")] + pr.allocate_entries(frappe._dict({"invoices": invoices, "payments": payments})) + pr.reconcile() + + pr.get_unreconciled_entries() + # check reconciliation tool output + # reconciled invoice and credit note shouldn't show up in selection + self.assertEqual(pr.get("invoices"), []) + self.assertEqual(pr.get("payments"), []) + + # assert outstanding + si.reload() + self.assertEqual(si.status, "Paid") + self.assertEqual(si.outstanding_amount, 0) + + def test_cr_note_partial_against_invoice(self): + transaction_date = nowdate() + amount = 100 + allocated_amount = 80 + + si = self.create_sales_invoice(qty=1, rate=amount, posting_date=transaction_date) + + cr_note = self.create_sales_invoice( + qty=-1, rate=amount, posting_date=transaction_date, do_not_save=True, do_not_submit=True + ) + cr_note.is_return = 1 + cr_note = cr_note.save().submit() + + pr = self.create_payment_reconciliation() + + pr.get_unreconciled_entries() + invoices = [x.as_dict() for x in pr.get("invoices")] + payments = [x.as_dict() for x in pr.get("payments")] + pr.allocate_entries(frappe._dict({"invoices": invoices, "payments": payments})) + pr.allocation[0].allocated_amount = allocated_amount + pr.reconcile() + + # assert outstanding + si.reload() + self.assertEqual(si.status, "Partly Paid") + self.assertEqual(si.outstanding_amount, 20) + + pr.get_unreconciled_entries() + # check reconciliation tool output + self.assertEqual(len(pr.get("invoices")), 1) + self.assertEqual(len(pr.get("payments")), 1) + self.assertEqual(pr.get("invoices")[0].outstanding_amount, 20) + self.assertEqual(pr.get("payments")[0].amount, 20)