Merge pull request #50804 from ljain112/fix-asset-repair-amount

fix: correct logic for repair cost in asset repair
This commit is contained in:
Khushi Rawat
2025-12-10 12:58:13 +05:30
committed by GitHub
3 changed files with 298 additions and 85 deletions

View File

@@ -117,25 +117,52 @@ frappe.ui.form.on("Asset Repair", {
frm.refresh_field("stock_items");
}
},
});
purchase_invoice: function (frm) {
if (frm.doc.purchase_invoice) {
frappe.call({
method: "frappe.client.get_value",
args: {
doctype: "Purchase Invoice",
fieldname: "base_net_total",
filters: { name: frm.doc.purchase_invoice },
},
callback: function (r) {
if (r.message) {
frm.set_value("repair_cost", r.message.base_net_total);
}
},
});
} else {
frm.set_value("repair_cost", 0);
frappe.ui.form.on("Asset Repair Purchase Invoice", {
purchase_invoice: function (frm, cdt, cdn) {
frappe.model.set_value(cdt, cdn, {
expense_account: "",
repair_cost: 0,
});
},
expense_account: function (frm, cdt, cdn) {
let row = locals[cdt][cdn];
if (!row.purchase_invoice || !row.expense_account) {
frappe.model.set_value(cdt, cdn, "repair_cost", 0);
return;
}
frappe.call({
method: "erpnext.assets.doctype.asset_repair.asset_repair.get_unallocated_repair_cost",
args: {
purchase_invoice: row.purchase_invoice,
expense_account: row.expense_account,
},
callback: function (r) {
if (r.message !== undefined) {
if (r.message > 0) {
frappe.model.set_value(cdt, cdn, "repair_cost", r.message);
} else {
frappe.model.set_value(cdt, cdn, "repair_cost", 0);
let pi_link = frappe.utils.get_form_link(
"Purchase Invoice",
row.purchase_invoice,
true
);
frappe.msgprint({
message: __(
"Row {0}: The entire expense amount for account {1} in {2} has already been allocated.",
[row.idx, row.expense_account.bold(), pi_link]
),
indicator: "orange",
});
}
}
},
});
},
show_general_ledger: (frm) => {

View File

@@ -4,6 +4,7 @@
import frappe
from frappe import _
from frappe.query_builder import DocType
from frappe.query_builder.functions import Sum
from frappe.utils import cint, flt, get_link_to_form, getdate, time_diff_in_hours
import erpnext
@@ -14,7 +15,6 @@ from erpnext.accounts.general_ledger import make_gl_entries
from erpnext.assets.doctype.asset.asset import get_asset_account
from erpnext.assets.doctype.asset_activity.asset_activity import add_asset_activity
from erpnext.assets.doctype.asset_depreciation_schedule.asset_depreciation_schedule import (
get_depr_schedule,
reschedule_depreciation,
)
from erpnext.controllers.accounts_controller import AccountsController
@@ -59,7 +59,7 @@ class AssetRepair(AccountsController):
# end: auto-generated types
def validate(self):
self.asset_doc = frappe.get_doc("Asset", self.asset)
self.asset_doc = frappe.get_lazy_doc("Asset", self.asset)
self.validate_asset()
self.validate_dates()
self.validate_purchase_invoices()
@@ -84,60 +84,91 @@ class AssetRepair(AccountsController):
)
def validate_purchase_invoices(self):
self.validate_duplicate_purchase_invoices()
self.validate_purchase_invoice_status()
for d in self.invoices:
self.validate_purchase_invoice_status(d.purchase_invoice)
invoice_items = self.get_invoice_items(d.purchase_invoice)
self.validate_service_purchase_invoice(d.purchase_invoice, invoice_items)
self.validate_expense_account(d, invoice_items)
self.validate_purchase_invoice_repair_cost(d, invoice_items)
self.validate_expense_account(d)
self.validate_purchase_invoice_repair_cost(d)
def validate_purchase_invoice_status(self, purchase_invoice):
docstatus = frappe.db.get_value("Purchase Invoice", purchase_invoice, "docstatus")
if docstatus == 0:
frappe.throw(
_("{0} is still in Draft. Please submit it before saving the Asset Repair.").format(
get_link_to_form("Purchase Invoice", purchase_invoice)
)
def validate_duplicate_purchase_invoices(self):
# account wise duplicate check
purchase_invoices = set()
duplicates = []
for row in self.invoices:
key = (row.purchase_invoice, row.expense_account)
if key in purchase_invoices:
duplicates.append((row.idx, row.purchase_invoice, row.expense_account))
else:
purchase_invoices.add(key)
if duplicates:
duplicate_links = "".join(
[
f"<li>{_('Row #{0}:').format(idx)} {get_link_to_form('Purchase Invoice', pi)} - {frappe.bold(account)}</li>"
for idx, pi, account in duplicates
]
)
msg = _("The following rows are duplicates:") + f"<br><ul>{duplicate_links}</ul>"
frappe.throw(msg)
def get_invoice_items(self, pi):
invoice_items = frappe.get_all(
"Purchase Invoice Item",
filters={"parent": pi},
fields=["item_code", "expense_account", "base_net_amount"],
def validate_purchase_invoice_status(self):
pi_names = [row.purchase_invoice for row in self.invoices]
docstatus = frappe._dict(
frappe.db.get_all(
"Purchase Invoice",
filters={"name": ["in", pi_names]},
fields=["name", "docstatus"],
as_list=True,
)
)
return invoice_items
invalid_invoice = []
for row in self.invoices:
if docstatus.get(row.purchase_invoice) != 1:
invalid_invoice.append((row.idx, row.purchase_invoice))
def validate_service_purchase_invoice(self, purchase_invoice, invoice_items):
service_item_exists = False
for item in invoice_items:
if frappe.db.get_value("Item", item.item_code, "is_stock_item") == 0:
service_item_exists = True
break
if invalid_invoice:
invoice_links = "".join(
[
f"<li>{_('Row #{0}:').format(idx)} {get_link_to_form('Purchase Invoice', pi)}</li>"
for idx, pi in invalid_invoice
]
)
msg = _("The following Purchase Invoices are not submitted:") + f"<br><ul>{invoice_links}</ul>"
frappe.throw(msg)
if not service_item_exists:
def validate_expense_account(self, row):
"""Validate that the expense account exists in the purchase invoice for non-stock items."""
valid_accounts = _get_expense_accounts_for_purchase_invoice(row.purchase_invoice)
if row.expense_account not in valid_accounts:
frappe.throw(
_("Service item not present in Purchase Invoice {0}").format(
get_link_to_form("Purchase Invoice", purchase_invoice)
_(
"Row #{0}: Expense account {1} is not valid for Purchase Invoice {2}. "
"Only expense accounts from non-stock items are allowed."
).format(
row.idx,
frappe.bold(row.expense_account),
get_link_to_form("Purchase Invoice", row.purchase_invoice),
)
)
def validate_expense_account(self, row, invoice_items):
pi_expense_accounts = set([item.expense_account for item in invoice_items])
if row.expense_account not in pi_expense_accounts:
frappe.throw(
_("Expense account {0} not present in Purchase Invoice {1}").format(
row.expense_account, get_link_to_form("Purchase Invoice", row.purchase_invoice)
)
)
def validate_purchase_invoice_repair_cost(self, row):
"""Validate that repair cost doesn't exceed available amount."""
available_amount = get_unallocated_repair_cost(
row.purchase_invoice, row.expense_account, exclude_asset_repair=self.name
)
def validate_purchase_invoice_repair_cost(self, row, invoice_items):
pi_net_total = sum([flt(item.base_net_amount) for item in invoice_items])
if flt(row.repair_cost) > pi_net_total:
if flt(row.repair_cost) > available_amount:
frappe.throw(
_("Repair cost cannot be greater than purchase invoice base net total {0}").format(
pi_net_total
_(
"Row #{0}: Repair cost {1} exceeds available amount {2} for Purchase Invoice {3} and Account {4}"
).format(
row.idx,
frappe.bold(frappe.format_value(row.repair_cost, {"fieldtype": "Currency"})),
frappe.bold(frappe.format_value(available_amount, {"fieldtype": "Currency"})),
get_link_to_form("Purchase Invoice", row.purchase_invoice),
frappe.bold(row.expense_account),
)
)
@@ -199,7 +230,7 @@ class AssetRepair(AccountsController):
self.cancel_sabb()
def after_delete(self):
frappe.get_doc("Asset", self.asset).set_status()
frappe.get_lazy_doc("Asset", self.asset).set_status()
def check_repair_status(self):
if self.repair_status == "Pending" and self.docstatus == 1:
@@ -341,7 +372,10 @@ class AssetRepair(AccountsController):
return
# creating GL Entries for each row in Stock Items based on the Stock Entry created for it
stock_entry = frappe.get_doc("Stock Entry", {"asset_repair": self.name})
stock_entry_name = frappe.db.get_value("Stock Entry", {"asset_repair": self.name}, "name")
stock_entry_items = frappe.get_all(
"Stock Entry Detail", filters={"parent": stock_entry_name}, fields=["expense_account", "amount"]
)
default_expense_account = None
if not erpnext.is_perpetual_inventory_enabled(self.company):
@@ -351,7 +385,7 @@ class AssetRepair(AccountsController):
if not default_expense_account:
frappe.throw(_("Please set default Expense Account in Company {0}").format(self.company))
for item in stock_entry.items:
for item in stock_entry_items:
if flt(item.amount) > 0:
gl_entries.append(
self.get_gl_dict(
@@ -382,7 +416,7 @@ class AssetRepair(AccountsController):
"cost_center": self.cost_center,
"posting_date": self.completion_date,
"against_voucher_type": "Stock Entry",
"against_voucher": stock_entry.name,
"against_voucher": stock_entry_name,
"company": self.company,
},
item=self,
@@ -420,33 +454,152 @@ def get_downtime(failure_date, completion_date):
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_purchase_invoice(doctype, txt, searchfield, start, page_len, filters):
PurchaseInvoice = DocType("Purchase Invoice")
PurchaseInvoiceItem = DocType("Purchase Invoice Item")
Item = DocType("Item")
"""
Get Purchase Invoices that have expense accounts for non-stock items.
Only returns invoices with at least one non-stock, non-fixed-asset item with an expense account.
"""
pi = DocType("Purchase Invoice")
pi_item = DocType("Purchase Invoice Item")
item = DocType("Item")
return (
frappe.qb.from_(PurchaseInvoice)
.join(PurchaseInvoiceItem)
.on(PurchaseInvoiceItem.parent == PurchaseInvoice.name)
.join(Item)
.on(Item.name == PurchaseInvoiceItem.item_code)
.select(PurchaseInvoice.name)
query = (
frappe.qb.from_(pi)
.join(pi_item)
.on(pi_item.parent == pi.name)
.left_join(item)
.on(item.name == pi_item.item_code)
.select(pi.name)
.distinct()
.where(
(Item.is_stock_item == 0)
& (Item.is_fixed_asset == 0)
& (PurchaseInvoice.company == filters.get("company"))
& (PurchaseInvoice.docstatus == 1)
(pi.company == filters.get("company"))
& (pi.docstatus == 1)
& (pi_item.is_fixed_asset == 0)
& (pi_item.expense_account.isnotnull())
& (pi_item.expense_account != "")
& ((pi_item.item_code.isnull()) | (item.is_stock_item == 0))
)
).run(as_list=1)
)
if txt:
query = query.where(pi.name.like(f"%{txt}%"))
return query.run(as_list=1)
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_expense_accounts(doctype, txt, searchfield, start, page_len, filters):
PurchaseInvoiceItem = DocType("Purchase Invoice Item")
return (
frappe.qb.from_(PurchaseInvoiceItem)
.select(PurchaseInvoiceItem.expense_account)
.distinct()
.where(PurchaseInvoiceItem.parent == filters.get("purchase_invoice"))
).run(as_list=1)
"""
Get expense accounts for non-stock (service) items from the purchase invoice.
Used as a query function for link fields.
"""
purchase_invoice = filters.get("purchase_invoice")
if not purchase_invoice:
return []
expense_accounts = _get_expense_accounts_for_purchase_invoice(purchase_invoice)
return [[account] for account in expense_accounts]
def _get_expense_accounts_for_purchase_invoice(purchase_invoice: str) -> list[str]:
"""
Get expense accounts for non-stock items from the purchase invoice.
"""
pi_items = frappe.db.get_all(
"Purchase Invoice Item",
filters={"parent": purchase_invoice},
fields=["item_code", "expense_account", "is_fixed_asset"],
)
if not pi_items:
return []
# Get list of stock item codes from the invoice
item_codes = {item.item_code for item in pi_items if item.item_code}
stock_items = set()
if item_codes:
stock_items = set(
frappe.db.get_all(
"Item", filters={"name": ["in", list(item_codes)], "is_stock_item": 1}, pluck="name"
)
)
expense_accounts = set()
for item in pi_items:
# Skip stock items - they use warehouse accounts
if item.item_code and item.item_code in stock_items:
continue
# Skip fixed assets - they use asset accounts
if item.is_fixed_asset:
continue
# Use expense account from Purchase Invoice Item
if item.expense_account:
expense_accounts.add(item.expense_account)
return list(expense_accounts)
@frappe.whitelist()
def get_unallocated_repair_cost(
purchase_invoice: str, expense_account: str, exclude_asset_repair: str | None = None
) -> float:
"""
Calculate the unused repair cost for a purchase invoice and expense account.
"""
if not purchase_invoice or not expense_account:
return 0.0
frappe.has_permission("Purchase Invoice", "read", purchase_invoice, throw=True)
used_amount = get_allocated_repair_cost(purchase_invoice, expense_account, exclude_asset_repair)
total_amount = get_total_expense_amount(purchase_invoice, expense_account)
return flt(total_amount - used_amount)
def get_allocated_repair_cost(
purchase_invoice: str, expense_account: str, exclude_asset_repair: str | None = None
) -> float:
"""Get the total repair cost already allocated from submitted Asset Repairs."""
asset_repair_pi = DocType("Asset Repair Purchase Invoice")
query = (
frappe.qb.from_(asset_repair_pi)
.select(Sum(asset_repair_pi.repair_cost).as_("total"))
.where(
(asset_repair_pi.purchase_invoice == purchase_invoice)
& (asset_repair_pi.expense_account == expense_account)
& (asset_repair_pi.docstatus == 1)
)
)
if exclude_asset_repair:
query = query.where(asset_repair_pi.parent != exclude_asset_repair)
result = query.run(as_dict=True)
return flt(result[0].total) if result else 0.0
def get_total_expense_amount(purchase_invoice: str, expense_account: str) -> float:
"""Get the total expense amount from GL entries for a purchase invoice and account."""
gl_entry = DocType("GL Entry")
result = (
frappe.qb.from_(gl_entry)
.select((Sum(gl_entry.debit) - Sum(gl_entry.credit)).as_("total"))
.where(
(gl_entry.voucher_type == "Purchase Invoice")
& (gl_entry.voucher_no == purchase_invoice)
& (gl_entry.account == expense_account)
& (gl_entry.is_cancelled == 0)
)
).run(as_dict=True)
return flt(result[0].total) if result else 0.0

View File

@@ -175,6 +175,39 @@ class TestAssetRepair(IntegrationTestCase):
)
self.assertTrue(asset_repair.invoices)
def test_repair_cost_exceeds_available_amount(self):
"""Test that repair cost cannot exceed available amount from Purchase Invoice."""
asset_repair1 = create_asset_repair(
capitalize_repair_cost=1,
item="_Test Non Stock Item",
submit=1,
)
pi_name = asset_repair1.invoices[0].purchase_invoice
expense_account = asset_repair1.invoices[0].expense_account
asset_repair2 = frappe.new_doc("Asset Repair")
asset_repair2.update(
{
"asset": asset_repair1.asset,
"asset_name": asset_repair1.asset_name,
"failure_date": nowdate(),
"description": "Second Repair",
"company": asset_repair1.company,
"capitalize_repair_cost": 1,
}
)
asset_repair2.append(
"invoices",
{
"purchase_invoice": pi_name,
"expense_account": expense_account,
"repair_cost": 10, # PI already fully used, so this should fail
},
)
self.assertRaises(frappe.ValidationError, asset_repair2.save)
def test_gl_entries_with_perpetual_inventory(self):
set_depreciation_settings_in_company(company="_Test Company with perpetual inventory")