Skip to content

Commit

Permalink
fix: don't allow BOM's item code at any level of child items (frappe#…
Browse files Browse the repository at this point in the history
…27157)

* refactor: bom recursion checking

* fix: dont allow bom recursion

if same item_code is added in child items at any level, it shouldn't be allowed.

* test: add test for bom recursion

* test: fix broken prodplan test using recursive bom

* test: fix recursive bom in tests

(cherry picked from commit c07dce9)

# Conflicts:
#	erpnext/manufacturing/doctype/bom/test_bom.py
#	erpnext/manufacturing/doctype/production_plan/test_production_plan.py
#	erpnext/manufacturing/doctype/work_order/test_work_order.py
  • Loading branch information
ankush authored and frappe-bot committed Aug 26, 2021
1 parent 710c1c1 commit e250c07
Show file tree
Hide file tree
Showing 4 changed files with 415 additions and 13 deletions.
30 changes: 17 additions & 13 deletions erpnext/manufacturing/doctype/bom/bom.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,25 +414,29 @@ def validate_materials(self):
frappe.throw(_("Quantity required for Item {0} in row {1}").format(m.item_code, m.idx))
check_list.append(m)

def check_recursion(self, bom_list=[]):
def check_recursion(self, bom_list=None):
""" Check whether recursion occurs in any bom"""
def _throw_error(bom_name):
frappe.throw(_("BOM recursion: {0} cannot be parent or child of {0}").format(bom_name))

bom_list = self.traverse_tree()
bom_nos = frappe.get_all('BOM Item', fields=["bom_no"],
filters={'parent': ('in', bom_list), 'parenttype': 'BOM'})
child_items = frappe.get_all('BOM Item', fields=["bom_no", "item_code"],
filters={'parent': ('in', bom_list), 'parenttype': 'BOM'}) or []

child_bom = {d.bom_no for d in child_items}
child_items_codes = {d.item_code for d in child_items}

raise_exception = False
if bom_nos and self.name in [d.bom_no for d in bom_nos]:
raise_exception = True
if self.name in child_bom:
_throw_error(self.name)

if not raise_exception:
bom_nos = frappe.get_all('BOM Item', fields=["parent"],
filters={'bom_no': self.name, 'parenttype': 'BOM'})
if self.item in child_items_codes:
_throw_error(self.item)

if self.name in [d.parent for d in bom_nos]:
raise_exception = True
bom_nos = frappe.get_all('BOM Item', fields=["parent"],
filters={'bom_no': self.name, 'parenttype': 'BOM'}) or []

if raise_exception:
frappe.throw(_("BOM recursion: {0} cannot be parent or child of {1}").format(self.name, self.name))
if self.name in {d.parent for d in bom_nos}:
_throw_error(self.name)

def update_cost_and_exploded_items(self, bom_list=[]):
bom_list = self.traverse_tree(bom_list)
Expand Down
197 changes: 197 additions & 0 deletions erpnext/manufacturing/doctype/bom/test_bom.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,203 @@ def test_bom_cost_multi_uom_based_on_valuation_rate(self):

self.assertEqual(bom.items[0].rate, 20)

<<<<<<< HEAD
=======
def test_subcontractor_sourced_item(self):
item_code = "_Test Subcontracted FG Item 1"
set_backflush_based_on('Material Transferred for Subcontract')

if not frappe.db.exists('Item', item_code):
make_item(item_code, {
'is_stock_item': 1,
'is_sub_contracted_item': 1,
'stock_uom': 'Nos'
})

if not frappe.db.exists('Item', "Test Extra Item 1"):
make_item("Test Extra Item 1", {
'is_stock_item': 1,
'stock_uom': 'Nos'
})

if not frappe.db.exists('Item', "Test Extra Item 2"):
make_item("Test Extra Item 2", {
'is_stock_item': 1,
'stock_uom': 'Nos'
})

if not frappe.db.exists('Item', "Test Extra Item 3"):
make_item("Test Extra Item 3", {
'is_stock_item': 1,
'stock_uom': 'Nos'
})
bom = frappe.get_doc({
'doctype': 'BOM',
'is_default': 1,
'item': item_code,
'currency': 'USD',
'quantity': 1,
'company': '_Test Company'
})

for item in ["Test Extra Item 1", "Test Extra Item 2"]:
item_doc = frappe.get_doc('Item', item)

bom.append('items', {
'item_code': item,
'qty': 1,
'uom': item_doc.stock_uom,
'stock_uom': item_doc.stock_uom,
'rate': item_doc.valuation_rate
})

bom.append('items', {
'item_code': "Test Extra Item 3",
'qty': 1,
'uom': item_doc.stock_uom,
'stock_uom': item_doc.stock_uom,
'rate': 0,
'sourced_by_supplier': 1
})
bom.insert(ignore_permissions=True)
bom.update_cost()
bom.submit()
# test that sourced_by_supplier rate is zero even after updating cost
self.assertEqual(bom.items[2].rate, 0)
# test in Purchase Order sourced_by_supplier is not added to Supplied Item
po = create_purchase_order(item_code=item_code, qty=1,
is_subcontracted="Yes", supplier_warehouse="_Test Warehouse 1 - _TC")
bom_items = sorted([d.item_code for d in bom.items if d.sourced_by_supplier != 1])
supplied_items = sorted([d.rm_item_code for d in po.supplied_items])
self.assertEqual(bom_items, supplied_items)

def test_bom_tree_representation(self):
bom_tree = {
"Assembly": {
"SubAssembly1": {"ChildPart1": {}, "ChildPart2": {},},
"SubAssembly2": {"ChildPart3": {}},
"SubAssembly3": {"SubSubAssy1": {"ChildPart4": {}}},
"ChildPart5": {},
"ChildPart6": {},
"SubAssembly4": {"SubSubAssy2": {"ChildPart7": {}}},
}
}
parent_bom = create_nested_bom(bom_tree, prefix="")
created_tree = parent_bom.get_tree_representation()

reqd_order = level_order_traversal(bom_tree)[1:] # skip first item
created_order = created_tree.level_order_traversal()

self.assertEqual(len(reqd_order), len(created_order))

for reqd_item, created_item in zip(reqd_order, created_order):
self.assertEqual(reqd_item, created_item.item_code)

def test_generated_variant_bom(self):
from erpnext.controllers.item_variant import create_variant

template_item = make_item(
"_TestTemplateItem", {"has_variants": 1, "attributes": [{"attribute": "Test Size"},]}
)
variant = create_variant(template_item.item_code, {"Test Size": "Large"})
variant.insert(ignore_if_duplicate=True)

bom_tree = {
template_item.item_code: {
"SubAssembly1": {"ChildPart1": {}, "ChildPart2": {},},
"ChildPart5": {},
}
}
template_bom = create_nested_bom(bom_tree, prefix="")
variant_bom = make_variant_bom(
template_bom.name, template_bom.name, variant.item_code, variant_items=[]
)
variant_bom.save()

reqd_order = template_bom.get_tree_representation().level_order_traversal()
created_order = variant_bom.get_tree_representation().level_order_traversal()

self.assertEqual(len(reqd_order), len(created_order))

for reqd_item, created_item in zip(reqd_order, created_order):
self.assertEqual(reqd_item.item_code, created_item.item_code)
self.assertEqual(reqd_item.qty, created_item.qty)
self.assertEqual(reqd_item.exploded_qty, created_item.exploded_qty)

def test_bom_recursion_1st_level(self):
"""BOM should not allow BOM item again in child"""
item_code = "_Test BOM Recursion"
make_item(item_code, {'is_stock_item': 1})

bom = frappe.new_doc("BOM")
bom.item = item_code
bom.append("items", frappe._dict(item_code=item_code))
with self.assertRaises(frappe.ValidationError) as err:
bom.save()

self.assertTrue("recursion" in str(err.exception).lower())
frappe.delete_doc("BOM", bom.name, ignore_missing=True)

def test_bom_recursion_transitive(self):
item1 = "_Test BOM Recursion"
item2 = "_Test BOM Recursion 2"
make_item(item1, {'is_stock_item': 1})
make_item(item2, {'is_stock_item': 1})

bom1 = frappe.new_doc("BOM")
bom1.item = item1
bom1.append("items", frappe._dict(item_code=item2))
bom1.save()
bom1.submit()

bom2 = frappe.new_doc("BOM")
bom2.item = item2
bom2.append("items", frappe._dict(item_code=item1))

with self.assertRaises(frappe.ValidationError) as err:
bom2.save()
bom2.submit()

self.assertTrue("recursion" in str(err.exception).lower())

bom1.cancel()
frappe.delete_doc("BOM", bom1.name, ignore_missing=True, force=True)
frappe.delete_doc("BOM", bom2.name, ignore_missing=True, force=True)

def test_bom_with_process_loss_item(self):
fg_item_non_whole, fg_item_whole, bom_item = create_process_loss_bom_items()

if not frappe.db.exists("BOM", f"BOM-{fg_item_non_whole.item_code}-001"):
bom_doc = create_bom_with_process_loss_item(
fg_item_non_whole, bom_item, scrap_qty=0.25, scrap_rate=0, fg_qty=1
)
bom_doc.submit()

bom_doc = create_bom_with_process_loss_item(
fg_item_non_whole, bom_item, scrap_qty=2, scrap_rate=0
)
# PL Item qty can't be >= FG Item qty
self.assertRaises(frappe.ValidationError, bom_doc.submit)

bom_doc = create_bom_with_process_loss_item(
fg_item_non_whole, bom_item, scrap_qty=1, scrap_rate=100
)
# PL Item rate has to be 0
self.assertRaises(frappe.ValidationError, bom_doc.submit)

bom_doc = create_bom_with_process_loss_item(
fg_item_whole, bom_item, scrap_qty=0.25, scrap_rate=0
)
# Items with whole UOMs can't be PL Items
self.assertRaises(frappe.ValidationError, bom_doc.submit)

bom_doc = create_bom_with_process_loss_item(
fg_item_non_whole, bom_item, scrap_qty=0.25, scrap_rate=0, is_process_loss=0
)
# FG Items in Scrap/Loss Table should have Is Process Loss set
self.assertRaises(frappe.ValidationError, bom_doc.submit)

>>>>>>> c07dce940e (fix: don't allow BOM's item code at any level of child items (#27157))
def get_default_bom(item_code="_Test FG Item 2"):
return frappe.db.get_value("BOM", {"item": item_code, "is_active": 1, "is_default": 1})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,85 @@ def test_production_plan_with_multi_level_bom(self):
pln.cancel()
frappe.delete_doc("Production Plan", pln.name)

<<<<<<< HEAD
=======
def test_get_warehouse_list_group(self):
"""Check if required warehouses are returned"""
warehouse_json = '[{\"warehouse\":\"_Test Warehouse Group - _TC\"}]'

warehouses = set(get_warehouse_list(warehouse_json))
expected_warehouses = {"_Test Warehouse Group-C1 - _TC", "_Test Warehouse Group-C2 - _TC"}

missing_warehouse = expected_warehouses - warehouses

self.assertTrue(len(missing_warehouse) == 0,
msg=f"Following warehouses were expected {', '.join(missing_warehouse)}")

def test_get_warehouse_list_single(self):
warehouse_json = '[{\"warehouse\":\"_Test Scrap Warehouse - _TC\"}]'

warehouses = set(get_warehouse_list(warehouse_json))
expected_warehouses = {"_Test Scrap Warehouse - _TC", }

self.assertEqual(warehouses, expected_warehouses)

def test_get_sales_order_with_variant(self):
rm_item = create_item('PIV_RM', valuation_rate = 100)
if not frappe.db.exists('Item', {"item_code": 'PIV'}):
item = create_item('PIV', valuation_rate = 100)
variant_settings = {
"attributes": [
{
"attribute": "Colour"
},
],
"has_variants": 1
}
item.update(variant_settings)
item.save()
parent_bom = make_bom(item = 'PIV', raw_materials = [rm_item.item_code])
if not frappe.db.exists('BOM', {"item": 'PIV'}):
parent_bom = make_bom(item = 'PIV', raw_materials = [rm_item.item_code])
else:
parent_bom = frappe.get_doc('BOM', {"item": 'PIV'})

if not frappe.db.exists('Item', {"item_code": 'PIV-RED'}):
variant = create_variant("PIV", {"Colour": "Red"})
variant.save()
variant_bom = make_bom(item = variant.item_code, raw_materials = [rm_item.item_code])
else:
variant = frappe.get_doc('Item', 'PIV-RED')
if not frappe.db.exists('BOM', {"item": 'PIV-RED'}):
variant_bom = make_bom(item = variant.item_code, raw_materials = [rm_item.item_code])

"""Testing when item variant has a BOM"""
so = make_sales_order(item_code="PIV-RED", qty=5)
pln = frappe.new_doc('Production Plan')
pln.company = so.company
pln.get_items_from = 'Sales Order'
pln.item_code = 'PIV-RED'
pln.get_open_sales_orders()
self.assertEqual(pln.sales_orders[0].sales_order, so.name)
pln.get_so_items()
self.assertEqual(pln.po_items[0].item_code, 'PIV-RED')
self.assertEqual(pln.po_items[0].bom_no, variant_bom.name)
so.cancel()
frappe.delete_doc('Sales Order', so.name)
variant_bom.cancel()
frappe.delete_doc('BOM', variant_bom.name)

"""Testing when item variant doesn't have a BOM"""
so = make_sales_order(item_code="PIV-RED", qty=5)
pln.get_open_sales_orders()
self.assertEqual(pln.sales_orders[0].sales_order, so.name)
pln.po_items = []
pln.get_so_items()
self.assertEqual(pln.po_items[0].item_code, 'PIV-RED')
self.assertEqual(pln.po_items[0].bom_no, parent_bom.name)

frappe.db.rollback()

>>>>>>> c07dce940e (fix: don't allow BOM's item code at any level of child items (#27157))
def create_production_plan(**args):
args = frappe._dict(args)

Expand Down
Loading

0 comments on commit e250c07

Please sign in to comment.