diff --git a/dev-requirements.txt b/dev-requirements.txt new file mode 100644 index 00000000000..15545c0efa2 --- /dev/null +++ b/dev-requirements.txt @@ -0,0 +1 @@ +hypothesis~=6.31.0 diff --git a/erpnext/stock/stock_ledger.py b/erpnext/stock/stock_ledger.py index e95c0fcd23b..f45bee16504 100644 --- a/erpnext/stock/stock_ledger.py +++ b/erpnext/stock/stock_ledger.py @@ -16,6 +16,7 @@ from erpnext.stock.utils import ( get_or_make_bin, get_valuation_method, ) +from erpnext.stock.valuation import FIFOValuation class NegativeStockError(frappe.ValidationError): pass @@ -456,9 +457,8 @@ class update_entries_after(object): self.wh_data.qty_after_transaction += flt(sle.actual_qty) self.wh_data.stock_value = flt(self.wh_data.qty_after_transaction) * flt(self.wh_data.valuation_rate) else: - self.get_fifo_values(sle) + self.update_fifo_values(sle) self.wh_data.qty_after_transaction += flt(sle.actual_qty) - self.wh_data.stock_value = sum(flt(batch[0]) * flt(batch[1]) for batch in self.wh_data.stock_queue) # rounding as per precision self.wh_data.stock_value = flt(self.wh_data.stock_value, self.precision) @@ -696,87 +696,39 @@ class update_entries_after(object): sle.voucher_type, sle.voucher_no, self.allow_zero_rate, currency=erpnext.get_company_currency(sle.company), company=sle.company) - def get_fifo_values(self, sle): + def update_fifo_values(self, sle): incoming_rate = flt(sle.incoming_rate) actual_qty = flt(sle.actual_qty) outgoing_rate = flt(sle.outgoing_rate) + fifo_queue = FIFOValuation(self.wh_data.stock_queue) if actual_qty > 0: - if not self.wh_data.stock_queue: - self.wh_data.stock_queue.append([0, 0]) - - # last row has the same rate, just updated the qty - if self.wh_data.stock_queue[-1][1]==incoming_rate: - self.wh_data.stock_queue[-1][0] += actual_qty - else: - # Item has a positive balance qty, add new entry - if self.wh_data.stock_queue[-1][0] > 0: - self.wh_data.stock_queue.append([actual_qty, incoming_rate]) - else: # negative balance qty - qty = self.wh_data.stock_queue[-1][0] + actual_qty - if qty > 0: # new balance qty is positive - self.wh_data.stock_queue[-1] = [qty, incoming_rate] - else: # new balance qty is still negative, maintain same rate - self.wh_data.stock_queue[-1][0] = qty + fifo_queue.add_stock(qty=actual_qty, rate=incoming_rate) else: - qty_to_pop = abs(actual_qty) - while qty_to_pop: - if not self.wh_data.stock_queue: - # Get valuation rate from last sle if exists or from valuation rate field in item master - allow_zero_valuation_rate = self.check_if_allow_zero_valuation_rate(sle.voucher_type, sle.voucher_detail_no) - if not allow_zero_valuation_rate: - _rate = get_valuation_rate(sle.item_code, sle.warehouse, - sle.voucher_type, sle.voucher_no, self.allow_zero_rate, - currency=erpnext.get_company_currency(sle.company), company=sle.company) - else: - _rate = 0 - - self.wh_data.stock_queue.append([0, _rate]) - - index = None - if outgoing_rate > 0: - # Find the entry where rate matched with outgoing rate - for i, v in enumerate(self.wh_data.stock_queue): - if v[1] == outgoing_rate: - index = i - break - - # If no entry found with outgoing rate, collapse stack - if index is None: # nosemgrep - new_stock_value = sum(d[0]*d[1] for d in self.wh_data.stock_queue) - qty_to_pop*outgoing_rate - new_stock_qty = sum(d[0] for d in self.wh_data.stock_queue) - qty_to_pop - self.wh_data.stock_queue = [[new_stock_qty, new_stock_value/new_stock_qty if new_stock_qty > 0 else outgoing_rate]] - break + def rate_generator() -> float: + allow_zero_valuation_rate = self.check_if_allow_zero_valuation_rate(sle.voucher_type, sle.voucher_detail_no) + if not allow_zero_valuation_rate: + return get_valuation_rate(sle.item_code, sle.warehouse, + sle.voucher_type, sle.voucher_no, self.allow_zero_rate, + currency=erpnext.get_company_currency(sle.company), company=sle.company) else: - index = 0 + return 0.0 - # select first batch or the batch with same rate - batch = self.wh_data.stock_queue[index] - if qty_to_pop >= batch[0]: - # consume current batch - qty_to_pop = _round_off_if_near_zero(qty_to_pop - batch[0]) - self.wh_data.stock_queue.pop(index) - if not self.wh_data.stock_queue and qty_to_pop: - # stock finished, qty still remains to be withdrawn - # negative stock, keep in as a negative batch - self.wh_data.stock_queue.append([-qty_to_pop, outgoing_rate or batch[1]]) - break + fifo_queue.remove_stock(qty=abs(actual_qty), outgoing_rate=outgoing_rate, rate_generator=rate_generator) - else: - # qty found in current batch - # consume it and exit - batch[0] = batch[0] - qty_to_pop - qty_to_pop = 0 - - stock_value = _round_off_if_near_zero(sum(flt(batch[0]) * flt(batch[1]) for batch in self.wh_data.stock_queue)) - stock_qty = _round_off_if_near_zero(sum(flt(batch[0]) for batch in self.wh_data.stock_queue)) + stock_qty, stock_value = fifo_queue.get_total_stock_and_value() + self.wh_data.stock_queue = fifo_queue.get_state() + self.wh_data.stock_value = stock_value if stock_qty: - self.wh_data.valuation_rate = stock_value / flt(stock_qty) + self.wh_data.valuation_rate = stock_value / stock_qty + if not self.wh_data.stock_queue: self.wh_data.stock_queue.append([0, sle.incoming_rate or sle.outgoing_rate or self.wh_data.valuation_rate]) + + def check_if_allow_zero_valuation_rate(self, voucher_type, voucher_detail_no): ref_item_dt = "" @@ -1158,13 +1110,3 @@ def get_future_sle_with_negative_batch_qty(args): and timestamp(posting_date, posting_time) >= timestamp(%(posting_date)s, %(posting_time)s) limit 1 """, args, as_dict=1) - - -def _round_off_if_near_zero(number: float, precision: int = 6) -> float: - """ Rounds off the number to zero only if number is close to zero for decimal - specified in precision. Precision defaults to 6. - """ - if flt(number) < (1.0 / (10**precision)): - return 0 - - return flt(number) diff --git a/erpnext/stock/tests/test_valuation.py b/erpnext/stock/tests/test_valuation.py new file mode 100644 index 00000000000..85788bac7f8 --- /dev/null +++ b/erpnext/stock/tests/test_valuation.py @@ -0,0 +1,166 @@ +import unittest + +from hypothesis import given +from hypothesis import strategies as st + +from erpnext.stock.valuation import FIFOValuation, _round_off_if_near_zero + +qty_gen = st.floats(min_value=-1e6, max_value=1e6) +value_gen = st.floats(min_value=1, max_value=1e6) +stock_queue_generator = st.lists(st.tuples(qty_gen, value_gen), min_size=10) + + +class TestFifoValuation(unittest.TestCase): + + def setUp(self): + self.queue = FIFOValuation([]) + + def tearDown(self): + qty, value = self.queue.get_total_stock_and_value() + self.assertTotalQty(qty) + self.assertTotalValue(value) + + def assertTotalQty(self, qty): + self.assertAlmostEqual(sum(q for q, _ in self.queue), qty, msg=f"queue: {self.queue}", places=4) + + def assertTotalValue(self, value): + self.assertAlmostEqual(sum(q * r for q, r in self.queue), value, msg=f"queue: {self.queue}", places=2) + + def test_simple_addition(self): + self.queue.add_stock(1, 10) + self.assertTotalQty(1) + + def test_simple_removal(self): + self.queue.add_stock(1, 10) + self.queue.remove_stock(1) + self.assertTotalQty(0) + + def test_merge_new_stock(self): + self.queue.add_stock(1, 10) + self.queue.add_stock(1, 10) + self.assertEqual(self.queue, [[2, 10]]) + + def test_adding_negative_stock_keeps_rate(self): + self.queue = FIFOValuation([[-5.0, 100]]) + self.queue.add_stock(1, 10) + self.assertEqual(self.queue, [[-4, 100]]) + + def test_adding_negative_stock_updates_rate(self): + self.queue = FIFOValuation([[-5.0, 100]]) + self.queue.add_stock(6, 10) + self.assertEqual(self.queue, [[1, 10]]) + + + def test_negative_stock(self): + self.queue.remove_stock(1, 5) + self.assertEqual(self.queue, [[-1, 5]]) + + # XXX + self.queue.remove_stock(1, 10) + self.assertTotalQty(-2) + + self.queue.add_stock(2, 10) + self.assertTotalQty(0) + self.assertTotalValue(0) + + def test_removing_specified_rate(self): + self.queue.add_stock(1, 10) + self.queue.add_stock(1, 20) + + self.queue.remove_stock(1, 20) + self.assertEqual(self.queue, [[1, 10]]) + + + def test_remove_multiple_bins(self): + self.queue.add_stock(1, 10) + self.queue.add_stock(2, 20) + self.queue.add_stock(1, 20) + self.queue.add_stock(5, 20) + + self.queue.remove_stock(4) + self.assertEqual(self.queue, [[5, 20]]) + + + def test_remove_multiple_bins_with_rate(self): + self.queue.add_stock(1, 10) + self.queue.add_stock(2, 20) + self.queue.add_stock(1, 20) + self.queue.add_stock(5, 20) + + self.queue.remove_stock(3, 20) + self.assertEqual(self.queue, [[1, 10], [5, 20]]) + + def test_collapsing_of_queue(self): + self.queue.add_stock(1, 1) + self.queue.add_stock(1, 2) + self.queue.add_stock(1, 3) + self.queue.add_stock(1, 4) + + self.assertTotalValue(10) + + self.queue.remove_stock(3, 1) + # XXX + self.assertEqual(self.queue, [[1, 7]]) + + def test_rounding_off(self): + self.queue.add_stock(1.0, 1.0) + self.queue.remove_stock(1.0 - 1e-9) + self.assertTotalQty(0) + + def test_rounding_off_near_zero(self): + self.assertEqual(_round_off_if_near_zero(0), 0) + self.assertEqual(_round_off_if_near_zero(1), 1) + self.assertEqual(_round_off_if_near_zero(-1), -1) + self.assertEqual(_round_off_if_near_zero(-1e-8), 0) + self.assertEqual(_round_off_if_near_zero(1e-8), 0) + + def test_totals(self): + self.queue.add_stock(1, 10) + self.queue.add_stock(2, 13) + self.queue.add_stock(1, 17) + self.queue.remove_stock(1) + self.queue.remove_stock(1) + self.queue.remove_stock(1) + self.queue.add_stock(5, 17) + self.queue.add_stock(8, 11) + + @given(stock_queue_generator) + def test_fifo_qty_hypothesis(self, stock_queue): + self.queue = FIFOValuation([]) + total_qty = 0 + + for qty, rate in stock_queue: + if qty == 0: + continue + if qty > 0: + self.queue.add_stock(qty, rate) + total_qty += qty + else: + qty = abs(qty) + consumed = self.queue.remove_stock(qty) + self.assertAlmostEqual(qty, sum(q for q, _ in consumed), msg=f"incorrect consumption {consumed}") + total_qty -= qty + self.assertTotalQty(total_qty) + + @given(stock_queue_generator) + def test_fifo_qty_value_nonneg_hypothesis(self, stock_queue): + self.queue = FIFOValuation([]) + total_qty = 0.0 + total_value = 0.0 + + for qty, rate in stock_queue: + # don't allow negative stock + if qty == 0 or total_qty + qty < 0 or abs(qty) < 0.1: + continue + if qty > 0: + self.queue.add_stock(qty, rate) + total_qty += qty + total_value += qty * rate + else: + qty = abs(qty) + consumed = self.queue.remove_stock(qty) + self.assertAlmostEqual(qty, sum(q for q, _ in consumed), msg=f"incorrect consumption {consumed}") + total_qty -= qty + total_value -= sum(q * r for q, r in consumed) + self.assertTotalQty(total_qty) + self.assertTotalValue(total_value) diff --git a/erpnext/stock/valuation.py b/erpnext/stock/valuation.py new file mode 100644 index 00000000000..45c50830995 --- /dev/null +++ b/erpnext/stock/valuation.py @@ -0,0 +1,146 @@ +from typing import Callable, List, NewType, Optional, Tuple + +from frappe.utils import flt + +FifoBin = NewType("FifoBin", List[float]) + +# Indexes of values inside FIFO bin 2-tuple +QTY = 0 +RATE = 1 + + +class FIFOValuation: + """Valuation method where a queue of all the incoming stock is maintained. + + New stock is added at end of the queue. + Qty consumption happens on First In First Out basis. + + Queue is implemented using "bins" of [qty, rate]. + + ref: https://en.wikipedia.org/wiki/FIFO_and_LIFO_accounting + """ + + # specifying the attributes to save resources + # ref: https://docs.python.org/3/reference/datamodel.html#slots + __slots__ = ["queue",] + + def __init__(self, state: Optional[List[FifoBin]]): + self.queue: List[FifoBin] = state if state is not None else [] + + def __repr__(self): + return str(self.queue) + + def __iter__(self): + return iter(self.queue) + + def __eq__(self, other): + if isinstance(other, list): + return self.queue == other + return self.queue == other.queue + + def get_state(self) -> List[FifoBin]: + """Get current state of queue.""" + return self.queue + + def get_total_stock_and_value(self) -> Tuple[float, float]: + total_qty = 0.0 + total_value = 0.0 + + for qty, rate in self.queue: + total_qty += flt(qty) + total_value += flt(qty) * flt(rate) + + return _round_off_if_near_zero(total_qty), _round_off_if_near_zero(total_value) + + def add_stock(self, qty: float, rate: float) -> None: + """Update fifo queue with new stock. + + args: + qty: new quantity to add + rate: incoming rate of new quantity""" + + if not len(self.queue): + self.queue.append([0, 0]) + + # last row has the same rate, merge new bin. + if self.queue[-1][RATE] == rate: + self.queue[-1][QTY] += qty + else: + # Item has a positive balance qty, add new entry + if self.queue[-1][QTY] > 0: + self.queue.append([qty, rate]) + else: # negative balance qty + qty = self.queue[-1][QTY] + qty + if qty > 0: # new balance qty is positive + self.queue[-1] = [qty, rate] + else: # new balance qty is still negative, maintain same rate + self.queue[-1][QTY] = qty + + def remove_stock( + self, qty: float, outgoing_rate: float = 0.0, rate_generator: Callable[[], float] = None + ) -> List[FifoBin]: + """Remove stock from the queue and return popped bins. + + args: + qty: quantity to remove + rate: outgoing rate + rate_generator: function to be called if queue is not found and rate is required. + """ + if not rate_generator: + rate_generator = lambda : 0.0 # noqa + + consumed_bins = [] + while qty: + if not len(self.queue): + # rely on rate generator. + self.queue.append([0, rate_generator()]) + + index = None + if outgoing_rate > 0: + # Find the entry where rate matched with outgoing rate + for idx, fifo_bin in enumerate(self.queue): + if fifo_bin[RATE] == outgoing_rate: + index = idx + break + + # If no entry found with outgoing rate, collapse queue + if index is None: # nosemgrep + new_stock_value = sum(d[QTY] * d[RATE] for d in self.queue) - qty * outgoing_rate + new_stock_qty = sum(d[QTY] for d in self.queue) - qty + self.queue = [[new_stock_qty, new_stock_value / new_stock_qty if new_stock_qty > 0 else outgoing_rate]] + consumed_bins.append([qty, outgoing_rate]) + break + else: + index = 0 + + # select first bin or the bin with same rate + fifo_bin = self.queue[index] + if qty >= fifo_bin[QTY]: + # consume current bin + qty = _round_off_if_near_zero(qty - fifo_bin[QTY]) + to_consume = self.queue.pop(index) + consumed_bins.append(list(to_consume)) + + if not self.queue and qty: + # stock finished, qty still remains to be withdrawn + # negative stock, keep in as a negative bin + self.queue.append([-qty, outgoing_rate or fifo_bin[RATE]]) + consumed_bins.append([qty, outgoing_rate or fifo_bin[RATE]]) + break + else: + # qty found in current bin consume it and exit + fifo_bin[QTY] = _round_off_if_near_zero(fifo_bin[QTY] - qty) + consumed_bins.append([qty, fifo_bin[RATE]]) + qty = 0 + + return consumed_bins + + +def _round_off_if_near_zero(number: float, precision: int = 7) -> float: + """Rounds off the number to zero only if number is close to zero for decimal + specified in precision. Precision defaults to 7. + """ + if abs(0.0 - flt(number)) < (1.0 / (10 ** precision)): + return 0.0 + + return flt(number)