feat: run parallel reposting

This commit is contained in:
Rohit Waghchaure
2025-12-10 17:46:06 +05:30
parent 8e0227ea68
commit 767e4762bb
4 changed files with 126 additions and 28 deletions

View File

@@ -411,7 +411,9 @@ scheduler_events = {
"0/15 * * * *": [
"erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs",
],
"0/30 * * * *": [],
"0/30 * * * *": [
"erpnext.stock.doctype.repost_item_valuation.repost_item_valuation.run_parallel_reposting",
],
# Hourly but offset by 30 minutes
"30 * * * *": [
"erpnext.accounts.doctype.gl_entry.gl_entry.rename_gle_sle_docs",

View File

@@ -9,7 +9,7 @@ from frappe.desk.form.load import get_attachments
from frappe.exceptions import QueryDeadlockError, QueryTimeoutError
from frappe.model.document import Document
from frappe.query_builder import DocType, Interval
from frappe.query_builder.functions import Max, Now
from frappe.query_builder.functions import CombineDatetime, Max, Now
from frappe.utils import cint, get_link_to_form, get_weekday, getdate, now, nowtime
from frappe.utils.user import get_users_with_role
from rq.timeouts import JobTimeoutException
@@ -539,41 +539,105 @@ def get_recipients():
return recipients
def run_parallel_reposting():
# This function is called every 15 minutes via hooks.py
if not frappe.db.get_single_value("Stock Reposting Settings", "enable_parallel_reposting"):
return
if not in_configured_timeslot():
return
items = set()
no_of_parallel_reposting = (
frappe.db.get_single_value("Stock Reposting Settings", "no_of_parallel_reposting") or 4
)
riv_entries = get_repost_item_valuation_entries("Item and Warehouse")
for row in riv_entries:
if row.repost_only_accounting_ledgers:
execute_reposting_entry(row.name)
continue
if frappe.db.get_value(
"Repost Item Valuation",
{
"based_on": "Item and Warehouse",
"item_code": row.item_code,
"docstatus": 1,
"status": "In Progress",
},
"name",
):
continue
if row.item_code in items:
continue
items.add(row.item_code)
if len(items) > no_of_parallel_reposting:
break
frappe.enqueue(
execute_reposting_entry,
name=row.name,
queue="long",
timeout=1800,
)
def repost_entries():
"""
Reposts 'Repost Item Valuation' entries in queue.
Called hourly via hooks.py.
"""
# This function is called every hour via hooks.py
if frappe.db.get_single_value("Stock Reposting Settings", "enable_parallel_reposting"):
return
if not in_configured_timeslot():
return
riv_entries = get_repost_item_valuation_entries()
for row in riv_entries:
doc = frappe.get_doc("Repost Item Valuation", row.name)
if (
doc.repost_only_accounting_ledgers
and doc.reposting_reference
and frappe.db.get_value("Repost Item Valuation", doc.reposting_reference, "status")
not in ["Completed", "Skipped"]
):
continue
if doc.status in ("Queued", "In Progress"):
repost(doc)
doc.deduplicate_similar_repost()
execute_reposting_entry(row.name)
def get_repost_item_valuation_entries():
return frappe.db.sql(
""" SELECT name from `tabRepost Item Valuation`
WHERE status in ('Queued', 'In Progress') and creation <= %s and docstatus = 1
ORDER BY timestamp(posting_date, posting_time) asc, creation asc, status asc
""",
now(),
as_dict=1,
def execute_reposting_entry(name):
doc = frappe.get_doc("Repost Item Valuation", name)
if (
doc.repost_only_accounting_ledgers
and doc.reposting_reference
and frappe.db.get_value("Repost Item Valuation", doc.reposting_reference, "status")
not in ["Completed", "Skipped"]
):
return
if doc.status in ("Queued", "In Progress"):
repost(doc)
doc.deduplicate_similar_repost()
def get_repost_item_valuation_entries(based_on=None):
doctype = frappe.qb.DocType("Repost Item Valuation")
query = (
frappe.qb.from_(doctype)
.select(doctype.name, doctype.based_on, doctype.item_code, doctype.repost_only_accounting_ledgers)
.where(
(doctype.status.isin(["Queued", "In Progress"]))
& (doctype.creation <= now())
& (doctype.docstatus == 1)
)
.orderby(CombineDatetime(doctype.posting_date, doctype.posting_time), order=frappe.qb.asc)
.orderby(doctype.creation, order=frappe.qb.asc)
.orderby(doctype.status, order=frappe.qb.asc)
)
if based_on:
query = query.where((doctype.based_on == based_on) | (doctype.repost_only_accounting_ledgers == 1))
return query.run(as_dict=True)
def in_configured_timeslot(repost_settings=None, current_time=None):
"""Check if current time is in configured timeslot for reposting."""
@@ -601,9 +665,14 @@ def in_configured_timeslot(repost_settings=None, current_time=None):
@frappe.whitelist()
def execute_repost_item_valuation():
"""Execute repost item valuation via scheduler."""
method = "erpnext.stock.doctype.repost_item_valuation.repost_item_valuation.repost_entries"
if frappe.db.get_single_value("Stock Reposting Settings", "enable_parallel_reposting"):
method = "erpnext.stock.doctype.repost_item_valuation.repost_item_valuation.run_parallel_reposting"
if name := frappe.db.get_value(
"Scheduled Job Type",
{"method": "erpnext.stock.doctype.repost_item_valuation.repost_item_valuation.repost_entries"},
{"method": method},
"name",
):
frappe.get_doc("Scheduled Job Type", name).enqueue(force=True)

View File

@@ -13,6 +13,8 @@
"end_time",
"limits_dont_apply_on",
"item_based_reposting",
"enable_parallel_reposting",
"no_of_parallel_reposting",
"errors_notification_section",
"notify_reposting_error_to_role"
],
@@ -65,12 +67,25 @@
"fieldname": "errors_notification_section",
"fieldtype": "Section Break",
"label": "Errors Notification"
},
{
"default": "0",
"depends_on": "eval: doc.item_based_reposting",
"fieldname": "enable_parallel_reposting",
"fieldtype": "Check",
"label": "Enable Parallel Reposting"
},
{
"default": "4",
"fieldname": "no_of_parallel_reposting",
"fieldtype": "Int",
"label": "No of Parallel Reposting (Per Item)"
}
],
"index_web_pages_for_search": 1,
"issingle": 1,
"links": [],
"modified": "2025-07-08 11:27:46.659056",
"modified": "2025-12-10 17:45:56.597514",
"modified_by": "Administrator",
"module": "Stock",
"name": "Stock Reposting Settings",

View File

@@ -16,12 +16,14 @@ class StockRepostingSettings(Document):
if TYPE_CHECKING:
from frappe.types import DF
enable_parallel_reposting: DF.Check
end_time: DF.Time | None
item_based_reposting: DF.Check
limit_reposting_timeslot: DF.Check
limits_dont_apply_on: DF.Literal[
"", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"
]
no_of_parallel_reposting: DF.Int
notify_reposting_error_to_role: DF.Link | None
start_time: DF.Time | None
# end: auto-generated types
@@ -29,6 +31,16 @@ class StockRepostingSettings(Document):
def validate(self):
self.set_minimum_reposting_time_slot()
def before_save(self):
self.reset_parallel_reposting_settings()
def reset_parallel_reposting_settings(self):
if not self.item_based_reposting and self.enable_parallel_reposting:
self.enable_parallel_reposting = 0
if self.enable_parallel_reposting and not self.no_of_parallel_reposting:
self.no_of_parallel_reposting = 4
def set_minimum_reposting_time_slot(self):
"""Ensure that timeslot for reposting is at least 12 hours."""
if not self.limit_reposting_timeslot: