fix(gross-profit): handle returns outside sale period

This commit is contained in:
Navin-S-R
2026-01-23 12:40:38 +05:30
parent 7b197cbe1c
commit 67d8223f73

View File

@@ -5,15 +5,16 @@ from collections import OrderedDict
import frappe
from frappe import _, qb, scrub
from frappe.query_builder import Order
from frappe.query_builder import Case, Order
from frappe.query_builder.functions import Coalesce
from frappe.utils import cint, flt, formatdate
from pypika.terms import ExistsCriterion
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import (
get_accounting_dimensions,
get_dimension_with_children,
)
from erpnext.accounts.report.financial_statements import get_cost_centers_with_children
from erpnext.controllers.queries import get_match_cond
from erpnext.stock.report.stock_ledger.stock_ledger import get_item_group_condition
from erpnext.stock.utils import get_incoming_rate
@@ -853,129 +854,171 @@ class GrossProfitGenerator:
return flt(last_purchase_rate[0][0]) if last_purchase_rate else 0
def load_invoice_items(self):
conditions = ""
if self.filters.company:
conditions += " and `tabSales Invoice`.company = %(company)s"
if self.filters.from_date:
conditions += " and posting_date >= %(from_date)s"
if self.filters.to_date:
conditions += " and posting_date <= %(to_date)s"
self.si_list = []
SalesInvoice = frappe.qb.DocType("Sales Invoice")
base_query = self.prepare_invoice_query()
if self.filters.include_returned_invoices:
conditions += " and (is_return = 0 or (is_return=1 and return_against is null))"
invoice_query = base_query.where(
(SalesInvoice.is_return == 0)
| ((SalesInvoice.is_return == 1) & SalesInvoice.return_against.isnull())
)
else:
conditions += " and is_return = 0"
invoice_query = base_query.where(SalesInvoice.is_return == 0)
if self.filters.item_group:
conditions += f" and {get_item_group_condition(self.filters.item_group)}"
self.si_list += invoice_query.run(as_dict=True)
self.prepare_vouchers_to_ignore()
if self.filters.sales_person:
conditions += """
and exists(select 1
from `tabSales Team` st
where st.parent = `tabSales Invoice`.name
and st.sales_person = %(sales_person)s)
"""
ret_invoice_query = base_query.where(
(SalesInvoice.is_return == 1) & SalesInvoice.return_against.isnotnull()
)
if self.vouchers_to_ignore:
ret_invoice_query = base_query.where(SalesInvoice.return_against.notin(self.vouchers_to_ignore))
self.si_list += ret_invoice_query.run(as_dict=True)
def prepare_invoice_query(self):
SalesInvoice = frappe.qb.DocType("Sales Invoice")
SalesInvoiceItem = frappe.qb.DocType("Sales Invoice Item")
Item = frappe.qb.DocType("Item")
SalesTeam = frappe.qb.DocType("Sales Team")
PaymentSchedule = frappe.qb.DocType("Payment Schedule")
query = (
frappe.qb.from_(SalesInvoice)
.join(SalesInvoiceItem)
.on(SalesInvoiceItem.parent == SalesInvoice.name)
.join(Item)
.on(Item.name == SalesInvoiceItem.item_code)
.where((SalesInvoice.docstatus == 1) & (SalesInvoice.is_opening != "Yes"))
)
query = self.apply_common_filters(query, SalesInvoice, SalesInvoiceItem, SalesTeam)
query = query.select(
SalesInvoiceItem.parenttype,
SalesInvoiceItem.parent,
SalesInvoice.posting_date,
SalesInvoice.posting_time,
SalesInvoice.project,
SalesInvoice.update_stock,
SalesInvoice.customer,
SalesInvoice.customer_group,
SalesInvoice.customer_name,
SalesInvoice.territory,
SalesInvoiceItem.item_code,
SalesInvoice.base_net_total.as_("invoice_base_net_total"),
SalesInvoiceItem.item_name,
SalesInvoiceItem.description,
SalesInvoiceItem.warehouse,
SalesInvoiceItem.item_group,
SalesInvoiceItem.brand,
SalesInvoiceItem.so_detail,
SalesInvoiceItem.sales_order,
SalesInvoiceItem.dn_detail,
SalesInvoiceItem.delivery_note,
SalesInvoiceItem.stock_qty.as_("qty"),
SalesInvoiceItem.base_net_rate,
SalesInvoiceItem.base_net_amount,
SalesInvoiceItem.name.as_("item_row"),
SalesInvoice.is_return,
SalesInvoiceItem.cost_center,
SalesInvoiceItem.serial_and_batch_bundle,
)
if self.filters.group_by == "Sales Person":
sales_person_cols = """, sales.sales_person,
sales.allocated_percentage * `tabSales Invoice Item`.base_net_amount / 100 as allocated_amount,
sales.incentives
"""
sales_team_table = "left join `tabSales Team` sales on sales.parent = `tabSales Invoice`.name"
else:
sales_person_cols = ""
sales_team_table = ""
query = query.select(
SalesTeam.sales_person,
(SalesTeam.allocated_percentage * SalesInvoiceItem.base_net_amount / 100).as_(
"allocated_amount"
),
SalesTeam.incentives,
)
query = query.left_join(SalesTeam).on(SalesTeam.parent == SalesInvoice.name)
if self.filters.group_by == "Payment Term":
payment_term_cols = """,if(`tabSales Invoice`.is_return = 1,
'{}',
coalesce(schedule.payment_term, '{}')) as payment_term,
schedule.invoice_portion,
schedule.payment_amount """.format(_("Sales Return"), _("No Terms"))
payment_term_table = """ left join `tabPayment Schedule` schedule on schedule.parent = `tabSales Invoice`.name and
`tabSales Invoice`.is_return = 0 """
else:
payment_term_cols = ""
payment_term_table = ""
query = query.select(
Case()
.when(SalesInvoice.is_return == 1, _("Sales Return"))
.else_(Coalesce(PaymentSchedule.payment_term, _("No Terms")))
.as_("payment_term"),
PaymentSchedule.invoice_portion,
PaymentSchedule.payment_amount,
)
if self.filters.get("sales_invoice"):
conditions += " and `tabSales Invoice`.name = %(sales_invoice)s"
query = query.left_join(PaymentSchedule).on(
(PaymentSchedule.parent == SalesInvoice.name) & (SalesInvoice.is_return == 0)
)
if self.filters.get("item_code"):
conditions += " and `tabSales Invoice Item`.item_code = %(item_code)s"
query = query.orderby(SalesInvoice.posting_date, order=Order.desc).orderby(
SalesInvoice.posting_time, order=Order.desc
)
if self.filters.get("cost_center"):
return query
def apply_common_filters(self, query, SalesInvoice, SalesInvoiceItem, SalesTeam):
if self.filters.company:
query = query.where(SalesInvoice.company == self.filters.company)
if self.filters.from_date:
query = query.where(SalesInvoice.posting_date >= self.filters.from_date)
if self.filters.to_date:
query = query.where(SalesInvoice.posting_date <= self.filters.to_date)
if self.filters.item_group:
query = query.where(get_item_group_condition(self.filters.item_group))
if self.filters.sales_person:
query = query.where(
ExistsCriterion(
frappe.qb.from_(SalesTeam)
.select(1)
.where(
(SalesTeam.parent == SalesInvoice.name)
& (SalesTeam.sales_person == self.filters.sales_person)
)
)
)
if self.filters.sales_invoice:
query = query.where(SalesInvoice.name == self.filters.sales_invoice)
if self.filters.item_code:
query = query.where(SalesInvoiceItem.item_code == self.filters.item_code)
if self.filters.cost_center:
self.filters.cost_center = frappe.parse_json(self.filters.get("cost_center"))
self.filters.cost_center = get_cost_centers_with_children(self.filters.cost_center)
conditions += " and `tabSales Invoice Item`.cost_center in %(cost_center)s"
query = query.where(SalesInvoiceItem.cost_center.isin(self.filters.cost_center))
if self.filters.get("project"):
if self.filters.project:
self.filters.project = frappe.parse_json(self.filters.get("project"))
conditions += " and `tabSales Invoice Item`.project in %(project)s"
query = query.where(SalesInvoiceItem.project.isin(self.filters.project))
accounting_dimensions = get_accounting_dimensions(as_list=False)
if accounting_dimensions:
for dimension in accounting_dimensions:
if self.filters.get(dimension.fieldname):
if frappe.get_cached_value("DocType", dimension.document_type, "is_tree"):
self.filters[dimension.fieldname] = get_dimension_with_children(
dimension.document_type, self.filters.get(dimension.fieldname)
)
conditions += (
f" and `tabSales Invoice Item`.{dimension.fieldname} in %({dimension.fieldname})s"
)
else:
conditions += (
f" and `tabSales Invoice Item`.{dimension.fieldname} in %({dimension.fieldname})s"
)
for dim in get_accounting_dimensions(as_list=False) or []:
if self.filters.get(dim.fieldname):
if frappe.get_cached_value("DocType", dim.document_type, "is_tree"):
self.filters[dim.fieldname] = get_dimension_with_children(
dim.document_type, self.filters.get(dim.fieldname)
)
query = query.where(SalesInvoiceItem[dim.fieldname].isin(self.filters[dim.fieldname]))
if self.filters.get("warehouse"):
warehouse_details = frappe.db.get_value(
"Warehouse", self.filters.get("warehouse"), ["lft", "rgt"], as_dict=1
if self.filters.warehouse:
lft, rgt = frappe.db.get_value("Warehouse", self.filters.warehouse, ["lft", "rgt"])
WH = frappe.qb.DocType("Warehouse")
query = query.where(
SalesInvoiceItem.warehouse.isin(
frappe.qb.from_(WH).select(WH.name).where((WH.lft >= lft) & (WH.rgt <= rgt))
)
)
if warehouse_details:
conditions += f" and `tabSales Invoice Item`.warehouse in (select name from `tabWarehouse` wh where wh.lft >= {warehouse_details.lft} and wh.rgt <= {warehouse_details.rgt} and warehouse = wh.name)"
self.si_list = frappe.db.sql(
"""
select
`tabSales Invoice Item`.parenttype, `tabSales Invoice Item`.parent,
`tabSales Invoice`.posting_date, `tabSales Invoice`.posting_time,
`tabSales Invoice`.project, `tabSales Invoice`.update_stock,
`tabSales Invoice`.customer, `tabSales Invoice`.customer_group, `tabSales Invoice`.customer_name,
`tabSales Invoice`.territory, `tabSales Invoice Item`.item_code,
`tabSales Invoice`.base_net_total as "invoice_base_net_total",
`tabSales Invoice Item`.item_name, `tabSales Invoice Item`.description,
`tabSales Invoice Item`.warehouse, `tabSales Invoice Item`.item_group,
`tabSales Invoice Item`.brand, `tabSales Invoice Item`.so_detail,
`tabSales Invoice Item`.sales_order, `tabSales Invoice Item`.dn_detail,
`tabSales Invoice Item`.delivery_note, `tabSales Invoice Item`.stock_qty as qty,
`tabSales Invoice Item`.base_net_rate, `tabSales Invoice Item`.base_net_amount,
`tabSales Invoice Item`.name as "item_row", `tabSales Invoice`.is_return,
`tabSales Invoice Item`.cost_center, `tabSales Invoice Item`.serial_and_batch_bundle
{sales_person_cols}
{payment_term_cols}
from
`tabSales Invoice` inner join `tabSales Invoice Item`
on `tabSales Invoice Item`.parent = `tabSales Invoice`.name
join `tabItem` item on item.name = `tabSales Invoice Item`.item_code
{sales_team_table}
{payment_term_table}
where
`tabSales Invoice`.docstatus=1 and `tabSales Invoice`.is_opening!='Yes' {conditions} {match_cond}
order by
`tabSales Invoice`.posting_date desc, `tabSales Invoice`.posting_time desc""".format(
conditions=conditions,
sales_person_cols=sales_person_cols,
sales_team_table=sales_team_table,
payment_term_cols=payment_term_cols,
payment_term_table=payment_term_table,
match_cond=get_match_cond("Sales Invoice"),
),
self.filters,
as_dict=1,
)
return query
def prepare_vouchers_to_ignore(self):
self.vouchers_to_ignore = tuple(row["parent"] for row in self.si_list)
def get_delivery_notes(self):
self.delivery_notes = frappe._dict({})