diff --git a/erpnext/assets/doctype/asset_repair/asset_repair.js b/erpnext/assets/doctype/asset_repair/asset_repair.js
index f09f2fe2e17..bdbf60f653e 100644
--- a/erpnext/assets/doctype/asset_repair/asset_repair.js
+++ b/erpnext/assets/doctype/asset_repair/asset_repair.js
@@ -117,25 +117,52 @@ frappe.ui.form.on("Asset Repair", {
frm.refresh_field("stock_items");
}
},
+});
- purchase_invoice: function (frm) {
- if (frm.doc.purchase_invoice) {
- frappe.call({
- method: "frappe.client.get_value",
- args: {
- doctype: "Purchase Invoice",
- fieldname: "base_net_total",
- filters: { name: frm.doc.purchase_invoice },
- },
- callback: function (r) {
- if (r.message) {
- frm.set_value("repair_cost", r.message.base_net_total);
- }
- },
- });
- } else {
- frm.set_value("repair_cost", 0);
+frappe.ui.form.on("Asset Repair Purchase Invoice", {
+ purchase_invoice: function (frm, cdt, cdn) {
+ frappe.model.set_value(cdt, cdn, {
+ expense_account: "",
+ repair_cost: 0,
+ });
+ },
+
+ expense_account: function (frm, cdt, cdn) {
+ let row = locals[cdt][cdn];
+
+ if (!row.purchase_invoice || !row.expense_account) {
+ frappe.model.set_value(cdt, cdn, "repair_cost", 0);
+ return;
}
+
+ frappe.call({
+ method: "erpnext.assets.doctype.asset_repair.asset_repair.get_unallocated_repair_cost",
+ args: {
+ purchase_invoice: row.purchase_invoice,
+ expense_account: row.expense_account,
+ },
+ callback: function (r) {
+ if (r.message !== undefined) {
+ if (r.message > 0) {
+ frappe.model.set_value(cdt, cdn, "repair_cost", r.message);
+ } else {
+ frappe.model.set_value(cdt, cdn, "repair_cost", 0);
+ let pi_link = frappe.utils.get_form_link(
+ "Purchase Invoice",
+ row.purchase_invoice,
+ true
+ );
+ frappe.msgprint({
+ message: __(
+ "Row {0}: The entire expense amount for account {1} in {2} has already been allocated.",
+ [row.idx, row.expense_account.bold(), pi_link]
+ ),
+ indicator: "orange",
+ });
+ }
+ }
+ },
+ });
},
show_general_ledger: (frm) => {
diff --git a/erpnext/assets/doctype/asset_repair/asset_repair.py b/erpnext/assets/doctype/asset_repair/asset_repair.py
index 5c280404958..476b0187bf1 100644
--- a/erpnext/assets/doctype/asset_repair/asset_repair.py
+++ b/erpnext/assets/doctype/asset_repair/asset_repair.py
@@ -4,6 +4,7 @@
import frappe
from frappe import _
from frappe.query_builder import DocType
+from frappe.query_builder.functions import Sum
from frappe.utils import cint, flt, get_link_to_form, getdate, time_diff_in_hours
import erpnext
@@ -14,7 +15,6 @@ from erpnext.accounts.general_ledger import make_gl_entries
from erpnext.assets.doctype.asset.asset import get_asset_account
from erpnext.assets.doctype.asset_activity.asset_activity import add_asset_activity
from erpnext.assets.doctype.asset_depreciation_schedule.asset_depreciation_schedule import (
- get_depr_schedule,
reschedule_depreciation,
)
from erpnext.controllers.accounts_controller import AccountsController
@@ -59,7 +59,7 @@ class AssetRepair(AccountsController):
# end: auto-generated types
def validate(self):
- self.asset_doc = frappe.get_doc("Asset", self.asset)
+ self.asset_doc = frappe.get_lazy_doc("Asset", self.asset)
self.validate_asset()
self.validate_dates()
self.validate_purchase_invoices()
@@ -84,60 +84,91 @@ class AssetRepair(AccountsController):
)
def validate_purchase_invoices(self):
+ self.validate_duplicate_purchase_invoices()
+ self.validate_purchase_invoice_status()
+
for d in self.invoices:
- self.validate_purchase_invoice_status(d.purchase_invoice)
- invoice_items = self.get_invoice_items(d.purchase_invoice)
- self.validate_service_purchase_invoice(d.purchase_invoice, invoice_items)
- self.validate_expense_account(d, invoice_items)
- self.validate_purchase_invoice_repair_cost(d, invoice_items)
+ self.validate_expense_account(d)
+ self.validate_purchase_invoice_repair_cost(d)
- def validate_purchase_invoice_status(self, purchase_invoice):
- docstatus = frappe.db.get_value("Purchase Invoice", purchase_invoice, "docstatus")
- if docstatus == 0:
- frappe.throw(
- _("{0} is still in Draft. Please submit it before saving the Asset Repair.").format(
- get_link_to_form("Purchase Invoice", purchase_invoice)
- )
+ def validate_duplicate_purchase_invoices(self):
+ # account wise duplicate check
+ purchase_invoices = set()
+ duplicates = []
+ for row in self.invoices:
+ key = (row.purchase_invoice, row.expense_account)
+ if key in purchase_invoices:
+ duplicates.append((row.idx, row.purchase_invoice, row.expense_account))
+ else:
+ purchase_invoices.add(key)
+
+ if duplicates:
+ duplicate_links = "".join(
+ [
+ f"
{_('Row #{0}:').format(idx)} {get_link_to_form('Purchase Invoice', pi)} - {frappe.bold(account)}"
+ for idx, pi, account in duplicates
+ ]
)
+ msg = _("The following rows are duplicates:") + f"
"
+ frappe.throw(msg)
- def get_invoice_items(self, pi):
- invoice_items = frappe.get_all(
- "Purchase Invoice Item",
- filters={"parent": pi},
- fields=["item_code", "expense_account", "base_net_amount"],
+ def validate_purchase_invoice_status(self):
+ pi_names = [row.purchase_invoice for row in self.invoices]
+ docstatus = frappe._dict(
+ frappe.db.get_all(
+ "Purchase Invoice",
+ filters={"name": ["in", pi_names]},
+ fields=["name", "docstatus"],
+ as_list=True,
+ )
)
- return invoice_items
+ invalid_invoice = []
+ for row in self.invoices:
+ if docstatus.get(row.purchase_invoice) != 1:
+ invalid_invoice.append((row.idx, row.purchase_invoice))
- def validate_service_purchase_invoice(self, purchase_invoice, invoice_items):
- service_item_exists = False
- for item in invoice_items:
- if frappe.db.get_value("Item", item.item_code, "is_stock_item") == 0:
- service_item_exists = True
- break
+ if invalid_invoice:
+ invoice_links = "".join(
+ [
+ f"{_('Row #{0}:').format(idx)} {get_link_to_form('Purchase Invoice', pi)}"
+ for idx, pi in invalid_invoice
+ ]
+ )
+ msg = _("The following Purchase Invoices are not submitted:") + f"
"
+ frappe.throw(msg)
- if not service_item_exists:
+ def validate_expense_account(self, row):
+ """Validate that the expense account exists in the purchase invoice for non-stock items."""
+ valid_accounts = _get_expense_accounts_for_purchase_invoice(row.purchase_invoice)
+ if row.expense_account not in valid_accounts:
frappe.throw(
- _("Service item not present in Purchase Invoice {0}").format(
- get_link_to_form("Purchase Invoice", purchase_invoice)
+ _(
+ "Row #{0}: Expense account {1} is not valid for Purchase Invoice {2}. "
+ "Only expense accounts from non-stock items are allowed."
+ ).format(
+ row.idx,
+ frappe.bold(row.expense_account),
+ get_link_to_form("Purchase Invoice", row.purchase_invoice),
)
)
- def validate_expense_account(self, row, invoice_items):
- pi_expense_accounts = set([item.expense_account for item in invoice_items])
- if row.expense_account not in pi_expense_accounts:
- frappe.throw(
- _("Expense account {0} not present in Purchase Invoice {1}").format(
- row.expense_account, get_link_to_form("Purchase Invoice", row.purchase_invoice)
- )
- )
+ def validate_purchase_invoice_repair_cost(self, row):
+ """Validate that repair cost doesn't exceed available amount."""
+ available_amount = get_unallocated_repair_cost(
+ row.purchase_invoice, row.expense_account, exclude_asset_repair=self.name
+ )
- def validate_purchase_invoice_repair_cost(self, row, invoice_items):
- pi_net_total = sum([flt(item.base_net_amount) for item in invoice_items])
- if flt(row.repair_cost) > pi_net_total:
+ if flt(row.repair_cost) > available_amount:
frappe.throw(
- _("Repair cost cannot be greater than purchase invoice base net total {0}").format(
- pi_net_total
+ _(
+ "Row #{0}: Repair cost {1} exceeds available amount {2} for Purchase Invoice {3} and Account {4}"
+ ).format(
+ row.idx,
+ frappe.bold(frappe.format_value(row.repair_cost, {"fieldtype": "Currency"})),
+ frappe.bold(frappe.format_value(available_amount, {"fieldtype": "Currency"})),
+ get_link_to_form("Purchase Invoice", row.purchase_invoice),
+ frappe.bold(row.expense_account),
)
)
@@ -199,7 +230,7 @@ class AssetRepair(AccountsController):
self.cancel_sabb()
def after_delete(self):
- frappe.get_doc("Asset", self.asset).set_status()
+ frappe.get_lazy_doc("Asset", self.asset).set_status()
def check_repair_status(self):
if self.repair_status == "Pending" and self.docstatus == 1:
@@ -341,7 +372,10 @@ class AssetRepair(AccountsController):
return
# creating GL Entries for each row in Stock Items based on the Stock Entry created for it
- stock_entry = frappe.get_doc("Stock Entry", {"asset_repair": self.name})
+ stock_entry_name = frappe.db.get_value("Stock Entry", {"asset_repair": self.name}, "name")
+ stock_entry_items = frappe.get_all(
+ "Stock Entry Detail", filters={"parent": stock_entry_name}, fields=["expense_account", "amount"]
+ )
default_expense_account = None
if not erpnext.is_perpetual_inventory_enabled(self.company):
@@ -351,7 +385,7 @@ class AssetRepair(AccountsController):
if not default_expense_account:
frappe.throw(_("Please set default Expense Account in Company {0}").format(self.company))
- for item in stock_entry.items:
+ for item in stock_entry_items:
if flt(item.amount) > 0:
gl_entries.append(
self.get_gl_dict(
@@ -382,7 +416,7 @@ class AssetRepair(AccountsController):
"cost_center": self.cost_center,
"posting_date": self.completion_date,
"against_voucher_type": "Stock Entry",
- "against_voucher": stock_entry.name,
+ "against_voucher": stock_entry_name,
"company": self.company,
},
item=self,
@@ -420,33 +454,152 @@ def get_downtime(failure_date, completion_date):
@frappe.whitelist()
+@frappe.validate_and_sanitize_search_inputs
def get_purchase_invoice(doctype, txt, searchfield, start, page_len, filters):
- PurchaseInvoice = DocType("Purchase Invoice")
- PurchaseInvoiceItem = DocType("Purchase Invoice Item")
- Item = DocType("Item")
+ """
+ Get Purchase Invoices that have expense accounts for non-stock items.
+ Only returns invoices with at least one non-stock, non-fixed-asset item with an expense account.
+ """
+ pi = DocType("Purchase Invoice")
+ pi_item = DocType("Purchase Invoice Item")
+ item = DocType("Item")
- return (
- frappe.qb.from_(PurchaseInvoice)
- .join(PurchaseInvoiceItem)
- .on(PurchaseInvoiceItem.parent == PurchaseInvoice.name)
- .join(Item)
- .on(Item.name == PurchaseInvoiceItem.item_code)
- .select(PurchaseInvoice.name)
+ query = (
+ frappe.qb.from_(pi)
+ .join(pi_item)
+ .on(pi_item.parent == pi.name)
+ .left_join(item)
+ .on(item.name == pi_item.item_code)
+ .select(pi.name)
+ .distinct()
.where(
- (Item.is_stock_item == 0)
- & (Item.is_fixed_asset == 0)
- & (PurchaseInvoice.company == filters.get("company"))
- & (PurchaseInvoice.docstatus == 1)
+ (pi.company == filters.get("company"))
+ & (pi.docstatus == 1)
+ & (pi_item.is_fixed_asset == 0)
+ & (pi_item.expense_account.isnotnull())
+ & (pi_item.expense_account != "")
+ & ((pi_item.item_code.isnull()) | (item.is_stock_item == 0))
)
- ).run(as_list=1)
+ )
+
+ if txt:
+ query = query.where(pi.name.like(f"%{txt}%"))
+
+ return query.run(as_list=1)
@frappe.whitelist()
+@frappe.validate_and_sanitize_search_inputs
def get_expense_accounts(doctype, txt, searchfield, start, page_len, filters):
- PurchaseInvoiceItem = DocType("Purchase Invoice Item")
- return (
- frappe.qb.from_(PurchaseInvoiceItem)
- .select(PurchaseInvoiceItem.expense_account)
- .distinct()
- .where(PurchaseInvoiceItem.parent == filters.get("purchase_invoice"))
- ).run(as_list=1)
+ """
+ Get expense accounts for non-stock (service) items from the purchase invoice.
+ Used as a query function for link fields.
+ """
+ purchase_invoice = filters.get("purchase_invoice")
+ if not purchase_invoice:
+ return []
+
+ expense_accounts = _get_expense_accounts_for_purchase_invoice(purchase_invoice)
+
+ return [[account] for account in expense_accounts]
+
+
+def _get_expense_accounts_for_purchase_invoice(purchase_invoice: str) -> list[str]:
+ """
+ Get expense accounts for non-stock items from the purchase invoice.
+ """
+ pi_items = frappe.db.get_all(
+ "Purchase Invoice Item",
+ filters={"parent": purchase_invoice},
+ fields=["item_code", "expense_account", "is_fixed_asset"],
+ )
+
+ if not pi_items:
+ return []
+
+ # Get list of stock item codes from the invoice
+ item_codes = {item.item_code for item in pi_items if item.item_code}
+ stock_items = set()
+ if item_codes:
+ stock_items = set(
+ frappe.db.get_all(
+ "Item", filters={"name": ["in", list(item_codes)], "is_stock_item": 1}, pluck="name"
+ )
+ )
+
+ expense_accounts = set()
+
+ for item in pi_items:
+ # Skip stock items - they use warehouse accounts
+ if item.item_code and item.item_code in stock_items:
+ continue
+
+ # Skip fixed assets - they use asset accounts
+ if item.is_fixed_asset:
+ continue
+
+ # Use expense account from Purchase Invoice Item
+ if item.expense_account:
+ expense_accounts.add(item.expense_account)
+
+ return list(expense_accounts)
+
+
+@frappe.whitelist()
+def get_unallocated_repair_cost(
+ purchase_invoice: str, expense_account: str, exclude_asset_repair: str | None = None
+) -> float:
+ """
+ Calculate the unused repair cost for a purchase invoice and expense account.
+ """
+ if not purchase_invoice or not expense_account:
+ return 0.0
+
+ frappe.has_permission("Purchase Invoice", "read", purchase_invoice, throw=True)
+
+ used_amount = get_allocated_repair_cost(purchase_invoice, expense_account, exclude_asset_repair)
+ total_amount = get_total_expense_amount(purchase_invoice, expense_account)
+
+ return flt(total_amount - used_amount)
+
+
+def get_allocated_repair_cost(
+ purchase_invoice: str, expense_account: str, exclude_asset_repair: str | None = None
+) -> float:
+ """Get the total repair cost already allocated from submitted Asset Repairs."""
+ asset_repair_pi = DocType("Asset Repair Purchase Invoice")
+
+ query = (
+ frappe.qb.from_(asset_repair_pi)
+ .select(Sum(asset_repair_pi.repair_cost).as_("total"))
+ .where(
+ (asset_repair_pi.purchase_invoice == purchase_invoice)
+ & (asset_repair_pi.expense_account == expense_account)
+ & (asset_repair_pi.docstatus == 1)
+ )
+ )
+
+ if exclude_asset_repair:
+ query = query.where(asset_repair_pi.parent != exclude_asset_repair)
+
+ result = query.run(as_dict=True)
+
+ return flt(result[0].total) if result else 0.0
+
+
+def get_total_expense_amount(purchase_invoice: str, expense_account: str) -> float:
+ """Get the total expense amount from GL entries for a purchase invoice and account."""
+ gl_entry = DocType("GL Entry")
+
+ result = (
+ frappe.qb.from_(gl_entry)
+ .select((Sum(gl_entry.debit) - Sum(gl_entry.credit)).as_("total"))
+ .where(
+ (gl_entry.voucher_type == "Purchase Invoice")
+ & (gl_entry.voucher_no == purchase_invoice)
+ & (gl_entry.account == expense_account)
+ & (gl_entry.is_cancelled == 0)
+ )
+ ).run(as_dict=True)
+
+ return flt(result[0].total) if result else 0.0
diff --git a/erpnext/assets/doctype/asset_repair/test_asset_repair.py b/erpnext/assets/doctype/asset_repair/test_asset_repair.py
index e9d603e2267..15ceb51648b 100644
--- a/erpnext/assets/doctype/asset_repair/test_asset_repair.py
+++ b/erpnext/assets/doctype/asset_repair/test_asset_repair.py
@@ -175,6 +175,39 @@ class TestAssetRepair(IntegrationTestCase):
)
self.assertTrue(asset_repair.invoices)
+ def test_repair_cost_exceeds_available_amount(self):
+ """Test that repair cost cannot exceed available amount from Purchase Invoice."""
+ asset_repair1 = create_asset_repair(
+ capitalize_repair_cost=1,
+ item="_Test Non Stock Item",
+ submit=1,
+ )
+
+ pi_name = asset_repair1.invoices[0].purchase_invoice
+ expense_account = asset_repair1.invoices[0].expense_account
+
+ asset_repair2 = frappe.new_doc("Asset Repair")
+ asset_repair2.update(
+ {
+ "asset": asset_repair1.asset,
+ "asset_name": asset_repair1.asset_name,
+ "failure_date": nowdate(),
+ "description": "Second Repair",
+ "company": asset_repair1.company,
+ "capitalize_repair_cost": 1,
+ }
+ )
+ asset_repair2.append(
+ "invoices",
+ {
+ "purchase_invoice": pi_name,
+ "expense_account": expense_account,
+ "repair_cost": 10, # PI already fully used, so this should fail
+ },
+ )
+
+ self.assertRaises(frappe.ValidationError, asset_repair2.save)
+
def test_gl_entries_with_perpetual_inventory(self):
set_depreciation_settings_in_company(company="_Test Company with perpetual inventory")