From 767e4762bb7dd7d854bcd5071e187af6efe0bcab Mon Sep 17 00:00:00 2001 From: Rohit Waghchaure Date: Wed, 10 Dec 2025 17:46:06 +0530 Subject: [PATCH] feat: run parallel reposting --- erpnext/hooks.py | 4 +- .../repost_item_valuation.py | 121 ++++++++++++++---- .../stock_reposting_settings.json | 17 ++- .../stock_reposting_settings.py | 12 ++ 4 files changed, 126 insertions(+), 28 deletions(-) diff --git a/erpnext/hooks.py b/erpnext/hooks.py index 9a65062e7c4..22a8bd0ae89 100644 --- a/erpnext/hooks.py +++ b/erpnext/hooks.py @@ -411,7 +411,9 @@ scheduler_events = { "0/15 * * * *": [ "erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs", ], - "0/30 * * * *": [], + "0/30 * * * *": [ + "erpnext.stock.doctype.repost_item_valuation.repost_item_valuation.run_parallel_reposting", + ], # Hourly but offset by 30 minutes "30 * * * *": [ "erpnext.accounts.doctype.gl_entry.gl_entry.rename_gle_sle_docs", diff --git a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py index a6899207281..e74901584e3 100644 --- a/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py +++ b/erpnext/stock/doctype/repost_item_valuation/repost_item_valuation.py @@ -9,7 +9,7 @@ from frappe.desk.form.load import get_attachments from frappe.exceptions import QueryDeadlockError, QueryTimeoutError from frappe.model.document import Document from frappe.query_builder import DocType, Interval -from frappe.query_builder.functions import Max, Now +from frappe.query_builder.functions import CombineDatetime, Max, Now from frappe.utils import cint, get_link_to_form, get_weekday, getdate, now, nowtime from frappe.utils.user import get_users_with_role from rq.timeouts import JobTimeoutException @@ -539,41 +539,105 @@ def get_recipients(): return recipients +def run_parallel_reposting(): + # This function is called every 15 minutes via hooks.py + + if not frappe.db.get_single_value("Stock Reposting Settings", "enable_parallel_reposting"): + return + + if not in_configured_timeslot(): + return + + items = set() + no_of_parallel_reposting = ( + frappe.db.get_single_value("Stock Reposting Settings", "no_of_parallel_reposting") or 4 + ) + + riv_entries = get_repost_item_valuation_entries("Item and Warehouse") + + for row in riv_entries: + if row.repost_only_accounting_ledgers: + execute_reposting_entry(row.name) + continue + + if frappe.db.get_value( + "Repost Item Valuation", + { + "based_on": "Item and Warehouse", + "item_code": row.item_code, + "docstatus": 1, + "status": "In Progress", + }, + "name", + ): + continue + + if row.item_code in items: + continue + + items.add(row.item_code) + if len(items) > no_of_parallel_reposting: + break + + frappe.enqueue( + execute_reposting_entry, + name=row.name, + queue="long", + timeout=1800, + ) + + def repost_entries(): - """ - Reposts 'Repost Item Valuation' entries in queue. - Called hourly via hooks.py. - """ + # This function is called every hour via hooks.py + + if frappe.db.get_single_value("Stock Reposting Settings", "enable_parallel_reposting"): + return + if not in_configured_timeslot(): return riv_entries = get_repost_item_valuation_entries() for row in riv_entries: - doc = frappe.get_doc("Repost Item Valuation", row.name) - if ( - doc.repost_only_accounting_ledgers - and doc.reposting_reference - and frappe.db.get_value("Repost Item Valuation", doc.reposting_reference, "status") - not in ["Completed", "Skipped"] - ): - continue - - if doc.status in ("Queued", "In Progress"): - repost(doc) - doc.deduplicate_similar_repost() + execute_reposting_entry(row.name) -def get_repost_item_valuation_entries(): - return frappe.db.sql( - """ SELECT name from `tabRepost Item Valuation` - WHERE status in ('Queued', 'In Progress') and creation <= %s and docstatus = 1 - ORDER BY timestamp(posting_date, posting_time) asc, creation asc, status asc - """, - now(), - as_dict=1, +def execute_reposting_entry(name): + doc = frappe.get_doc("Repost Item Valuation", name) + if ( + doc.repost_only_accounting_ledgers + and doc.reposting_reference + and frappe.db.get_value("Repost Item Valuation", doc.reposting_reference, "status") + not in ["Completed", "Skipped"] + ): + return + + if doc.status in ("Queued", "In Progress"): + repost(doc) + doc.deduplicate_similar_repost() + + +def get_repost_item_valuation_entries(based_on=None): + doctype = frappe.qb.DocType("Repost Item Valuation") + + query = ( + frappe.qb.from_(doctype) + .select(doctype.name, doctype.based_on, doctype.item_code, doctype.repost_only_accounting_ledgers) + .where( + (doctype.status.isin(["Queued", "In Progress"])) + & (doctype.creation <= now()) + & (doctype.docstatus == 1) + ) + .orderby(CombineDatetime(doctype.posting_date, doctype.posting_time), order=frappe.qb.asc) + .orderby(doctype.creation, order=frappe.qb.asc) + .orderby(doctype.status, order=frappe.qb.asc) ) + if based_on: + query = query.where((doctype.based_on == based_on) | (doctype.repost_only_accounting_ledgers == 1)) + + return query.run(as_dict=True) + def in_configured_timeslot(repost_settings=None, current_time=None): """Check if current time is in configured timeslot for reposting.""" @@ -601,9 +665,14 @@ def in_configured_timeslot(repost_settings=None, current_time=None): @frappe.whitelist() def execute_repost_item_valuation(): """Execute repost item valuation via scheduler.""" + + method = "erpnext.stock.doctype.repost_item_valuation.repost_item_valuation.repost_entries" + if frappe.db.get_single_value("Stock Reposting Settings", "enable_parallel_reposting"): + method = "erpnext.stock.doctype.repost_item_valuation.repost_item_valuation.run_parallel_reposting" + if name := frappe.db.get_value( "Scheduled Job Type", - {"method": "erpnext.stock.doctype.repost_item_valuation.repost_item_valuation.repost_entries"}, + {"method": method}, "name", ): frappe.get_doc("Scheduled Job Type", name).enqueue(force=True) diff --git a/erpnext/stock/doctype/stock_reposting_settings/stock_reposting_settings.json b/erpnext/stock/doctype/stock_reposting_settings/stock_reposting_settings.json index faf70b6cb0d..a06456e0c9c 100644 --- a/erpnext/stock/doctype/stock_reposting_settings/stock_reposting_settings.json +++ b/erpnext/stock/doctype/stock_reposting_settings/stock_reposting_settings.json @@ -13,6 +13,8 @@ "end_time", "limits_dont_apply_on", "item_based_reposting", + "enable_parallel_reposting", + "no_of_parallel_reposting", "errors_notification_section", "notify_reposting_error_to_role" ], @@ -65,12 +67,25 @@ "fieldname": "errors_notification_section", "fieldtype": "Section Break", "label": "Errors Notification" + }, + { + "default": "0", + "depends_on": "eval: doc.item_based_reposting", + "fieldname": "enable_parallel_reposting", + "fieldtype": "Check", + "label": "Enable Parallel Reposting" + }, + { + "default": "4", + "fieldname": "no_of_parallel_reposting", + "fieldtype": "Int", + "label": "No of Parallel Reposting (Per Item)" } ], "index_web_pages_for_search": 1, "issingle": 1, "links": [], - "modified": "2025-07-08 11:27:46.659056", + "modified": "2025-12-10 17:45:56.597514", "modified_by": "Administrator", "module": "Stock", "name": "Stock Reposting Settings", diff --git a/erpnext/stock/doctype/stock_reposting_settings/stock_reposting_settings.py b/erpnext/stock/doctype/stock_reposting_settings/stock_reposting_settings.py index 8b47cd88df6..7924c9042c0 100644 --- a/erpnext/stock/doctype/stock_reposting_settings/stock_reposting_settings.py +++ b/erpnext/stock/doctype/stock_reposting_settings/stock_reposting_settings.py @@ -16,12 +16,14 @@ class StockRepostingSettings(Document): if TYPE_CHECKING: from frappe.types import DF + enable_parallel_reposting: DF.Check end_time: DF.Time | None item_based_reposting: DF.Check limit_reposting_timeslot: DF.Check limits_dont_apply_on: DF.Literal[ "", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday" ] + no_of_parallel_reposting: DF.Int notify_reposting_error_to_role: DF.Link | None start_time: DF.Time | None # end: auto-generated types @@ -29,6 +31,16 @@ class StockRepostingSettings(Document): def validate(self): self.set_minimum_reposting_time_slot() + def before_save(self): + self.reset_parallel_reposting_settings() + + def reset_parallel_reposting_settings(self): + if not self.item_based_reposting and self.enable_parallel_reposting: + self.enable_parallel_reposting = 0 + + if self.enable_parallel_reposting and not self.no_of_parallel_reposting: + self.no_of_parallel_reposting = 4 + def set_minimum_reposting_time_slot(self): """Ensure that timeslot for reposting is at least 12 hours.""" if not self.limit_reposting_timeslot: