Merge pull request #52964 from ljain112/fix-tds-party

fix(tds-report): correct party type filtering and refactor
This commit is contained in:
NaviN
2026-03-11 11:41:44 +05:30
committed by GitHub
2 changed files with 128 additions and 99 deletions

View File

@@ -5,6 +5,7 @@
import frappe
from frappe import _
from frappe.utils import flt, getdate
from pypika import Tuple
from erpnext.accounts.utils import get_currency_precision
@@ -19,18 +20,14 @@ def execute(filters=None):
validate_filters(filters)
(
tds_docs,
tds_accounts,
tax_category_map,
journal_entry_party_map,
net_total_map,
) = get_tds_docs(filters)
columns = get_columns(filters)
res = get_result(
filters, tds_docs, tds_accounts, tax_category_map, journal_entry_party_map, net_total_map
)
res = get_result(filters, tds_accounts, tax_category_map, net_total_map)
return columns, res
@@ -41,27 +38,23 @@ def validate_filters(filters):
frappe.throw(_("From Date must be before To Date"))
def get_result(filters, tds_docs, tds_accounts, tax_category_map, journal_entry_party_map, net_total_map):
party_map = get_party_pan_map(filters.get("party_type"))
def get_result(filters, tds_accounts, tax_category_map, net_total_map):
party_names = {v.party for v in net_total_map.values() if v.party}
party_map = get_party_pan_map(filters.get("party_type"), party_names)
tax_rate_map = get_tax_rate_map(filters)
gle_map = get_gle_map(tds_docs)
gle_map = get_gle_map(net_total_map)
precision = get_currency_precision()
out = []
entries = {}
for name, details in gle_map.items():
for (voucher_type, name), details in gle_map.items():
for entry in details:
tax_amount, total_amount, grand_total, base_total, base_tax_withholding_net_total = 0, 0, 0, 0, 0
tax_withholding_category, rate = None, None
bill_no, bill_date = "", ""
party = entry.party or entry.against
posting_date = entry.posting_date
voucher_type = entry.voucher_type
if voucher_type == "Journal Entry":
party_list = journal_entry_party_map.get(name)
if party_list:
party = party_list[0]
values = net_total_map.get((voucher_type, name))
party = values.party if values else (entry.party or entry.against)
if entry.account in tds_accounts.keys():
tax_amount += entry.credit - entry.debit
@@ -76,12 +69,13 @@ def get_result(filters, tds_docs, tds_accounts, tax_category_map, journal_entry_
rate = get_tax_withholding_rates(tax_rate_map.get(tax_withholding_category, []), posting_date)
values = net_total_map.get((voucher_type, name))
if values:
if voucher_type == "Journal Entry" and tax_amount and rate:
# back calculate total amount from rate and tax_amount
base_total = min(flt(tax_amount / (rate / 100), precision=precision), values[0])
base_total = min(
flt(tax_amount / (rate / 100), precision=precision),
values.base_tax_withholding_net_total,
)
total_amount = grand_total = base_total
base_tax_withholding_net_total = total_amount
@@ -90,16 +84,16 @@ def get_result(filters, tds_docs, tds_accounts, tax_category_map, journal_entry_
# back calculate total amount from rate and tax_amount
total_amount = flt((tax_amount * 100) / rate, precision=precision)
else:
total_amount = values[0]
total_amount = values.base_tax_withholding_net_total
grand_total = values[1]
base_total = values[2]
grand_total = values.grand_total
base_total = values.base_total
base_tax_withholding_net_total = total_amount
if voucher_type == "Purchase Invoice":
base_tax_withholding_net_total = values[0]
bill_no = values[3]
bill_date = values[4]
base_tax_withholding_net_total = values.base_tax_withholding_net_total
bill_no = values.bill_no
bill_date = values.bill_date
else:
total_amount += entry.credit
@@ -147,14 +141,17 @@ def get_result(filters, tds_docs, tds_accounts, tax_category_map, journal_entry_
else:
entries[key] = row
out = list(entries.values())
out.sort(key=lambda x: (x["section_code"], x["transaction_date"]))
out.sort(key=lambda x: (x["section_code"], x["transaction_date"], x["ref_no"]))
return out
def get_party_pan_map(party_type):
def get_party_pan_map(party_type, party_names):
party_map = frappe._dict()
if not party_names:
return party_map
fields = ["name", "tax_withholding_category"]
if party_type == "Supplier":
fields += ["supplier_type", "supplier_name"]
@@ -164,7 +161,7 @@ def get_party_pan_map(party_type):
if frappe.db.has_column(party_type, "pan"):
fields.append("pan")
party_details = frappe.db.get_all(party_type, fields=fields)
party_details = frappe.db.get_all(party_type, filters={"name": ("in", list(party_names))}, fields=fields)
for party in party_details:
party.party_type = party_type
@@ -173,22 +170,33 @@ def get_party_pan_map(party_type):
return party_map
def get_gle_map(documents):
# create gle_map of the form
# {"purchase_invoice": list of dict of all gle created for this invoice}
def get_gle_map(net_total_map):
if not net_total_map:
return {}
gle = frappe.qb.DocType("GL Entry")
voucher_pairs = list(net_total_map.keys())
rows = (
frappe.qb.from_(gle)
.select(
gle.credit,
gle.debit,
gle.account,
gle.voucher_no,
gle.posting_date,
gle.voucher_type,
gle.against,
gle.party,
gle.party_type,
)
.where(gle.is_cancelled == 0)
.where(Tuple(gle.voucher_type, gle.voucher_no).isin(voucher_pairs))
).run(as_dict=True)
gle_map = {}
gle = frappe.db.get_all(
"GL Entry",
{"voucher_no": ["in", documents], "is_cancelled": 0},
["credit", "debit", "account", "voucher_no", "posting_date", "voucher_type", "against", "party"],
)
for d in gle:
if d.voucher_no not in gle_map:
gle_map[d.voucher_no] = [d]
else:
gle_map[d.voucher_no].append(d)
for d in rows:
gle_map.setdefault((d.voucher_type, d.voucher_no), []).append(d)
return gle_map
@@ -308,14 +316,9 @@ def get_columns(filters):
def get_tds_docs(filters):
tds_documents = []
purchase_invoices = []
sales_invoices = []
payment_entries = []
journal_entries = []
vouchers = frappe._dict()
tax_category_map = frappe._dict()
net_total_map = frappe._dict()
journal_entry_party_map = frappe._dict()
bank_accounts = frappe.get_all("Account", {"is_group": 0, "account_type": "Bank"}, pluck="name")
_tds_accounts = frappe.get_all(
@@ -334,35 +337,14 @@ def get_tds_docs(filters):
tds_docs = get_tds_docs_query(filters, bank_accounts, list(tds_accounts.keys())).run(as_dict=True)
for d in tds_docs:
if d.voucher_type == "Purchase Invoice":
purchase_invoices.append(d.voucher_no)
if d.voucher_type == "Sales Invoice":
sales_invoices.append(d.voucher_no)
elif d.voucher_type == "Payment Entry":
payment_entries.append(d.voucher_no)
elif d.voucher_type == "Journal Entry":
journal_entries.append(d.voucher_no)
vouchers.setdefault(d.voucher_type, set()).add(d.voucher_no)
tds_documents.append(d.voucher_no)
if purchase_invoices:
get_doc_info(purchase_invoices, "Purchase Invoice", tax_category_map, net_total_map)
if sales_invoices:
get_doc_info(sales_invoices, "Sales Invoice", tax_category_map, net_total_map)
if payment_entries:
get_doc_info(payment_entries, "Payment Entry", tax_category_map, net_total_map)
if journal_entries:
journal_entry_party_map = get_journal_entry_party_map(journal_entries)
get_doc_info(journal_entries, "Journal Entry", tax_category_map, net_total_map)
for voucher_type, docs in vouchers.items():
get_doc_info(docs, voucher_type, tax_category_map, net_total_map, filters)
return (
tds_documents,
tds_accounts,
tax_category_map,
journal_entry_party_map,
net_total_map,
)
@@ -373,11 +355,16 @@ def get_tds_docs_query(filters, bank_accounts, tds_accounts):
_("No {0} Accounts found for this company.").format(frappe.bold(_("Tax Withholding"))),
title=_("Accounts Missing Error"),
)
invoice_voucher = "Purchase Invoice" if filters.get("party_type") == "Supplier" else "Sales Invoice"
voucher_types = {"Payment Entry", "Journal Entry", invoice_voucher}
gle = frappe.qb.DocType("GL Entry")
query = (
frappe.qb.from_(gle)
.select("voucher_no", "voucher_type", "against", "party")
.where(gle.is_cancelled == 0)
.where(gle.voucher_type.isin(voucher_types))
)
if filters.get("from_date"):
@@ -403,25 +390,27 @@ def get_tds_docs_query(filters, bank_accounts, tds_accounts):
return query
def get_journal_entry_party_map(journal_entries):
def get_journal_entry_party_map(journal_entries, party_type):
journal_entry_party_map = {}
for d in frappe.db.get_all(
"Journal Entry Account",
{
"parent": ("in", journal_entries),
"party_type": ("in", ("Supplier", "Customer")),
"party_type": party_type,
"party": ("is", "set"),
},
["parent", "party"],
):
if d.parent not in journal_entry_party_map:
journal_entry_party_map[d.parent] = []
journal_entry_party_map[d.parent].append(d.party)
journal_entry_party_map.setdefault(d.parent, []).append(d.party)
return journal_entry_party_map
def get_doc_info(vouchers, doctype, tax_category_map, net_total_map=None):
def get_doc_info(vouchers, doctype, tax_category_map, net_total_map=None, filters=None):
journal_entry_party_map = {}
party_type = filters.get("party_type") if filters else None
party = filters.get("party") if filters else None
common_fields = ["name"]
fields_dict = {
"Purchase Invoice": [
@@ -431,37 +420,81 @@ def get_doc_info(vouchers, doctype, tax_category_map, net_total_map=None):
"base_total",
"bill_no",
"bill_date",
"supplier",
],
"Sales Invoice": ["base_net_total", "grand_total", "base_total"],
"Sales Invoice": ["base_net_total", "grand_total", "base_total", "customer"],
"Payment Entry": [
"tax_withholding_category",
"paid_amount",
"paid_amount_after_tax",
"base_paid_amount",
"party",
"party_type",
],
"Journal Entry": ["tax_withholding_category", "total_debit"],
}
party_field = {
"Purchase Invoice": "supplier",
"Sales Invoice": "customer",
"Payment Entry": "party",
}
entries = frappe.get_all(
doctype, filters={"name": ("in", vouchers)}, fields=common_fields + fields_dict[doctype]
)
doc_filters = {"name": ("in", vouchers)}
if party and party_field.get(doctype):
doc_filters[party_field[doctype]] = party
if doctype == "Payment Entry":
doc_filters["party_type"] = party_type
entries = frappe.get_all(doctype, filters=doc_filters, fields=common_fields + fields_dict[doctype])
if doctype == "Journal Entry":
journal_entry_party_map = get_journal_entry_party_map(vouchers, party_type=party_type)
for entry in entries:
tax_category_map[(doctype, entry.name)] = entry.tax_withholding_category
value = frappe._dict(
party=None,
party_type=party_type,
base_tax_withholding_net_total=0,
grand_total=0,
base_total=0,
bill_no="",
bill_date="",
)
if doctype == "Purchase Invoice":
value = [
entry.base_tax_withholding_net_total,
entry.grand_total,
entry.base_total,
entry.bill_no,
entry.bill_date,
]
value.party = entry.supplier
value.party_type = "Supplier"
value.base_tax_withholding_net_total = entry.base_tax_withholding_net_total
value.grand_total = entry.grand_total
value.base_total = entry.base_total
value.bill_no = entry.bill_no
value.bill_date = entry.bill_date
elif doctype == "Sales Invoice":
value = [entry.base_net_total, entry.grand_total, entry.base_total]
value.party = entry.customer
value.party_type = "Customer"
value.base_tax_withholding_net_total = entry.base_net_total
value.grand_total = entry.grand_total
value.base_total = entry.base_total
elif doctype == "Payment Entry":
value = [entry.paid_amount, entry.paid_amount_after_tax, entry.base_paid_amount]
value.party = entry.party
value.party_type = entry.party_type
value.base_tax_withholding_net_total = entry.paid_amount
value.grand_total = entry.paid_amount_after_tax
value.base_total = entry.base_paid_amount
else:
value = [entry.total_debit] * 3
party_list = journal_entry_party_map.get(entry.name, [])
if party and party in party_list:
value.party = party
elif party_list:
value.party = sorted(party_list)[0]
value.party_type = party_type
value.base_tax_withholding_net_total = entry.total_debit
value.grand_total = entry.total_debit
value.base_total = entry.total_debit
net_total_map[(doctype, entry.name)] = value

View File

@@ -20,16 +20,12 @@ def execute(filters=None):
columns = get_columns(filters)
(
tds_docs,
tds_accounts,
tax_category_map,
journal_entry_party_map,
invoice_total_map,
net_total_map,
) = get_tds_docs(filters)
res = get_result(
filters, tds_docs, tds_accounts, tax_category_map, journal_entry_party_map, invoice_total_map
)
res = get_result(filters, tds_accounts, tax_category_map, net_total_map)
final_result = group_by_party_and_category(res, filters)
return columns, final_result