refactor(test): make pos closing deterministic

This commit is contained in:
ruthra kumar
2026-02-13 12:34:45 +05:30
parent ec9b2f0567
commit 0474d0c4e8
2 changed files with 48 additions and 39 deletions

View File

@@ -24,20 +24,10 @@ from erpnext.tests.utils import ERPNextTestSuite
class TestPOSClosingEntry(ERPNextTestSuite):
@classmethod
def setUpClass(cls):
super().setUpClass()
frappe.db.sql("delete from `tabPOS Opening Entry`")
cls.enterClassContext(cls.change_settings("POS Settings", {"invoice_type": "POS Invoice"}))
@classmethod
def tearDownClass(cls):
frappe.db.sql("delete from `tabPOS Opening Entry`")
def setUp(self):
# Make stock available for POS Sales
frappe.db.sql("delete from `tabPOS Opening Entry`")
init_user_and_profile()
make_stock_entry(target="_Test Warehouse - _TC", qty=2, basic_rate=100)
frappe.db.set_single_value("POS Settings", "invoice_type", "POS Invoice")
def test_pos_closing_entry(self):
test_user, pos_profile = init_user_and_profile()
@@ -54,6 +44,7 @@ class TestPOSClosingEntry(ERPNextTestSuite):
pos_inv2.submit()
pcv_doc = make_closing_entry_from_opening(opening_entry)
pcv_doc.flags.in_test = True
payment = pcv_doc.payment_reconciliation[0]
self.assertEqual(payment.mode_of_payment, "Cash")
@@ -62,6 +53,7 @@ class TestPOSClosingEntry(ERPNextTestSuite):
if d.mode_of_payment == "Cash":
d.closing_amount = 6700
pcv_doc.flags.in_test = True
pcv_doc.submit()
self.assertEqual(pcv_doc.total_quantity, 2)
@@ -80,6 +72,7 @@ class TestPOSClosingEntry(ERPNextTestSuite):
pos_inv.submit()
pcv_doc = make_closing_entry_from_opening(opening_entry)
pcv_doc.flags.in_test = True
pcv_doc.submit()
self.assertTrue(pcv_doc.name)
@@ -112,6 +105,7 @@ class TestPOSClosingEntry(ERPNextTestSuite):
pos_return.submit()
pcv_doc = make_closing_entry_from_opening(opening_entry)
pcv_doc.flags.in_test = True
pcv_doc.submit()
opening_entry = create_opening_entry(pos_profile, test_user.name)
@@ -141,6 +135,7 @@ class TestPOSClosingEntry(ERPNextTestSuite):
if d.mode_of_payment == "Cash":
d.closing_amount = 6700
pcv_doc.flags.in_test = True
pcv_doc.submit()
pos_inv1.load_from_db()
@@ -194,6 +189,7 @@ class TestPOSClosingEntry(ERPNextTestSuite):
pcv_doc = make_closing_entry_from_opening(opening_entry)
# will assert coz the new mandatory accounting dimension bank is not set in POS Profile
pcv_doc.flags.in_test = True
self.assertRaises(frappe.ValidationError, pcv_doc.submit)
accounting_dimension_department = frappe.get_doc(
@@ -261,6 +257,7 @@ class TestPOSClosingEntry(ERPNextTestSuite):
self.assertEqual(batch_qty_with_pos, 0.0)
pcv_doc = make_closing_entry_from_opening(opening_entry)
pcv_doc.flags.in_test = True
pcv_doc.submit()
piv_merge = frappe.db.get_value("POS Invoice Merge Log", {"pos_closing_entry": pcv_doc.name}, "name")
@@ -284,6 +281,7 @@ class TestPOSClosingEntry(ERPNextTestSuite):
frappe.flags.print_message = True
pcv_doc.reload()
pcv_doc.flags.in_test = True
pcv_doc.cancel()
batch_qty_with_pos = get_batch_qty(batch_no, "_Test Warehouse - _TC", item_code)
@@ -326,6 +324,7 @@ class TestPOSClosingEntry(ERPNextTestSuite):
if d.mode_of_payment == "Cash":
d.closing_amount = 1500
pcv_doc.flags.in_test = True
pcv_doc.submit()
self.assertEqual(pcv_doc.total_quantity, 15)
@@ -469,7 +468,7 @@ def init_user_and_profile(**args):
user = "test@example.com"
test_user = frappe.get_doc("User", user)
roles = ("Accounts Manager", "Accounts User", "Sales Manager")
roles = ("Accounts Manager", "Accounts User", "Sales Manager", "Stock User", "Item Manager")
test_user.add_roles(*roles)
frappe.set_user(user)

View File

@@ -572,7 +572,7 @@ def split_invoices(invoices):
def create_merge_logs(invoice_by_customer, closing_entry=None):
try:
def merge_and_close():
for customer, invoices_acc_dim in invoice_by_customer.items():
for invoices in invoices_acc_dim.values():
for _invoices in split_invoices(invoices):
@@ -594,25 +594,30 @@ def create_merge_logs(invoice_by_customer, closing_entry=None):
closing_entry.db_set("error_message", "")
closing_entry.update_opening_entry()
except Exception as e:
frappe.db.rollback()
message_log = frappe.message_log.pop() if frappe.message_log else str(e)
error_message = get_error_message(message_log)
if frappe.in_test:
merge_and_close()
else:
try:
merge_and_close()
except Exception as e:
frappe.db.rollback()
message_log = frappe.message_log.pop() if frappe.message_log else str(e)
error_message = get_error_message(message_log)
if closing_entry:
closing_entry.set_status(update=True, status="Failed")
if isinstance(error_message, list):
error_message = json.dumps(error_message)
closing_entry.db_set("error_message", error_message)
raise
if closing_entry:
closing_entry.set_status(update=True, status="Failed")
if isinstance(error_message, list):
error_message = json.dumps(error_message)
closing_entry.db_set("error_message", error_message)
raise
finally:
frappe.db.commit()
frappe.publish_realtime("closing_process_complete", user=frappe.session.user)
finally:
frappe.db.commit()
frappe.publish_realtime("closing_process_complete", user=frappe.session.user)
def cancel_merge_logs(merge_logs, closing_entry=None):
try:
def merge_cancel_and_close():
for log in merge_logs:
merge_log = frappe.get_doc("POS Invoice Merge Log", log)
if merge_log.docstatus == 2:
@@ -626,19 +631,24 @@ def cancel_merge_logs(merge_logs, closing_entry=None):
closing_entry.db_set("error_message", "")
closing_entry.update_opening_entry(for_cancel=True)
except Exception as e:
frappe.db.rollback()
message_log = frappe.message_log.pop() if frappe.message_log else str(e)
error_message = get_error_message(message_log)
if frappe.flags.in_test:
merge_cancel_and_close()
else:
try:
merge_cancel_and_close()
except Exception as e:
frappe.db.rollback()
message_log = frappe.message_log.pop() if frappe.message_log else str(e)
error_message = get_error_message(message_log)
if closing_entry:
closing_entry.set_status(update=True, status="Submitted")
closing_entry.db_set("error_message", error_message)
raise
if closing_entry:
closing_entry.set_status(update=True, status="Submitted")
closing_entry.db_set("error_message", error_message)
raise
finally:
frappe.db.commit()
frappe.publish_realtime("closing_process_complete", user=frappe.session.user)
finally:
frappe.db.commit()
frappe.publish_realtime("closing_process_complete", user=frappe.session.user)
def enqueue_job(job, **kwargs):