diff --git a/erpnext/hr/doctype/attendance/attendance.py b/erpnext/hr/doctype/attendance/attendance.py index 98055c7955b5..a2487b31ffe7 100644 --- a/erpnext/hr/doctype/attendance/attendance.py +++ b/erpnext/hr/doctype/attendance/attendance.py @@ -14,6 +14,7 @@ class DuplicateAttendanceError(frappe.ValidationError): pass + class Attendance(Document): def validate(self): from erpnext.controllers.status_updater import validate_status @@ -39,12 +40,20 @@ def validate_attendance_date(self): frappe.throw(_("Attendance date can not be less than employee's joining date")) def validate_duplicate_record(self): - duplicate = get_duplicate_attendance_record(self.employee, self.attendance_date, self.shift, self.name) + duplicate = get_duplicate_attendance_record( + self.employee, self.attendance_date, self.shift, self.name + ) if duplicate: - frappe.throw(_("Attendance for employee {0} is already marked for the date {1}: {2}").format( - frappe.bold(self.employee), frappe.bold(self.attendance_date), get_link_to_form("Attendance", duplicate[0].name)), - title=_("Duplicate Attendance"), exc=DuplicateAttendanceError) + frappe.throw( + _("Attendance for employee {0} is already marked for the date {1}: {2}").format( + frappe.bold(self.employee), + frappe.bold(self.attendance_date), + get_link_to_form("Attendance", duplicate[0].name), + ), + title=_("Duplicate Attendance"), + exc=DuplicateAttendanceError, + ) def validate_employee_status(self): if frappe.db.get_value("Employee", self.employee, "status") == "Inactive": @@ -101,26 +110,29 @@ def get_duplicate_attendance_record(employee, attendance_date, shift, name=None) attendance = frappe.qb.DocType("Attendance") query = ( frappe.qb.from_(attendance) - .select(attendance.name) - .where( - (attendance.employee == employee) - & (attendance.docstatus < 2) - ) + .select(attendance.name) + .where((attendance.employee == employee) & (attendance.docstatus < 2)) ) if shift: query = query.where( - Criterion.any([ - Criterion.all([ - ((attendance.shift.isnull()) | (attendance.shift == "")), - (attendance.attendance_date == attendance_date) - ]), - Criterion.all([ - ((attendance.shift.isnotnull()) | (attendance.shift != "")), - (attendance.attendance_date == attendance_date), - (attendance.shift == shift) - ]) - ]) + Criterion.any( + [ + Criterion.all( + [ + ((attendance.shift.isnull()) | (attendance.shift == "")), + (attendance.attendance_date == attendance_date), + ] + ), + Criterion.all( + [ + ((attendance.shift.isnotnull()) | (attendance.shift != "")), + (attendance.attendance_date == attendance_date), + (attendance.shift == shift), + ] + ), + ] + ) ) else: query = query.where((attendance.attendance_date == attendance_date)) @@ -167,21 +179,32 @@ def add_attendance(events, start, end, conditions=None): if e not in events: events.append(e) -def mark_attendance(employee, attendance_date, status, shift=None, leave_type=None, ignore_validate=False, - late_entry=False, early_exit=False): + +def mark_attendance( + employee, + attendance_date, + status, + shift=None, + leave_type=None, + ignore_validate=False, + late_entry=False, + early_exit=False, +): if not get_duplicate_attendance_record(employee, attendance_date, shift): - company = frappe.db.get_value('Employee', employee, 'company') - attendance = frappe.get_doc({ - 'doctype': 'Attendance', - 'employee': employee, - 'attendance_date': attendance_date, - 'status': status, - 'company': company, - 'shift': shift, - 'leave_type': leave_type, - 'late_entry': late_entry, - 'early_exit': early_exit - }) + company = frappe.db.get_value("Employee", employee, "company") + attendance = frappe.get_doc( + { + "doctype": "Attendance", + "employee": employee, + "attendance_date": attendance_date, + "status": status, + "company": company, + "shift": shift, + "leave_type": leave_type, + "late_entry": late_entry, + "early_exit": early_exit, + } + ) attendance.flags.ignore_validate = ignore_validate attendance.insert() attendance.submit() diff --git a/erpnext/hr/doctype/employee_checkin/employee_checkin.py b/erpnext/hr/doctype/employee_checkin/employee_checkin.py index a9ac60d1bef9..81c9a460592b 100644 --- a/erpnext/hr/doctype/employee_checkin/employee_checkin.py +++ b/erpnext/hr/doctype/employee_checkin/employee_checkin.py @@ -31,11 +31,21 @@ def validate_duplicate_log(self): ) def fetch_shift(self): - shift_actual_timings = get_actual_start_end_datetime_of_shift(self.employee, get_datetime(self.time), True) + shift_actual_timings = get_actual_start_end_datetime_of_shift( + self.employee, get_datetime(self.time), True + ) if shift_actual_timings: - if shift_actual_timings.shift_type.determine_check_in_and_check_out == 'Strictly based on Log Type in Employee Checkin' \ - and not self.log_type and not self.skip_auto_attendance: - frappe.throw(_('Log Type is required for check-ins falling in the shift: {0}.').format(shift_actual_timings.shift_type.name)) + if ( + shift_actual_timings.shift_type.determine_check_in_and_check_out + == "Strictly based on Log Type in Employee Checkin" + and not self.log_type + and not self.skip_auto_attendance + ): + frappe.throw( + _("Log Type is required for check-ins falling in the shift: {0}.").format( + shift_actual_timings.shift_type.name + ) + ) if not self.attendance: self.shift = shift_actual_timings.shift_type.name self.shift_actual_start = shift_actual_timings.actual_start @@ -125,8 +135,8 @@ def mark_attendance_and_link_log( ("1", log_names), ) return None - elif attendance_status in ('Present', 'Absent', 'Half Day'): - employee_doc = frappe.get_doc('Employee', employee) + elif attendance_status in ("Present", "Absent", "Half Day"): + employee_doc = frappe.get_doc("Employee", employee) if not get_duplicate_attendance_record(employee, attendance_date, shift): doc_dict = { "doctype": "Attendance", diff --git a/erpnext/hr/doctype/shift_assignment/shift_assignment.py b/erpnext/hr/doctype/shift_assignment/shift_assignment.py index 768a86258efc..5f3d26afb025 100644 --- a/erpnext/hr/doctype/shift_assignment/shift_assignment.py +++ b/erpnext/hr/doctype/shift_assignment/shift_assignment.py @@ -19,6 +19,7 @@ class OverlappingShiftError(frappe.ValidationError): pass + class ShiftAssignment(Document): def validate(self): validate_active_employee(self.employee) @@ -42,27 +43,35 @@ def get_overlapping_dates(self): shift = frappe.qb.DocType("Shift Assignment") query = ( frappe.qb.from_(shift) - .select(shift.name, shift.shift_type, shift.start_date, shift.end_date, shift.docstatus, shift.status) - .where( - (shift.employee == self.employee) - & (shift.docstatus == 1) - & (shift.name != self.name) - & (shift.status == "Active") - ) + .select( + shift.name, shift.shift_type, shift.start_date, shift.end_date, shift.docstatus, shift.status + ) + .where( + (shift.employee == self.employee) + & (shift.docstatus == 1) + & (shift.name != self.name) + & (shift.status == "Active") + ) ) if self.end_date: query = query.where( - Criterion.any([ - Criterion.any([ - shift.end_date.isnull(), - ((self.start_date >= shift.start_date) & (self.start_date <= shift.end_date)) - ]), - Criterion.any([ - ((self.end_date >= shift.start_date) & (self.end_date <= shift.end_date)), - shift.start_date.between(self.start_date, self.end_date) - ]) - ]) + Criterion.any( + [ + Criterion.any( + [ + shift.end_date.isnull(), + ((self.start_date >= shift.start_date) & (self.start_date <= shift.end_date)), + ] + ), + Criterion.any( + [ + ((self.end_date >= shift.start_date) & (self.end_date <= shift.end_date)), + shift.start_date.between(self.start_date, self.end_date), + ] + ), + ] + ) ) else: query = query.where( @@ -73,12 +82,27 @@ def get_overlapping_dates(self): return query.run(as_dict=True) def has_overlapping_timings(self, overlapping_shift): - curr_shift = frappe.db.get_value("Shift Type", self.shift_type, ["start_time", "end_time"], as_dict=True) - overlapping_shift = frappe.db.get_value("Shift Type", overlapping_shift, ["start_time", "end_time"], as_dict=True) + curr_shift = frappe.db.get_value( + "Shift Type", self.shift_type, ["start_time", "end_time"], as_dict=True + ) + overlapping_shift = frappe.db.get_value( + "Shift Type", overlapping_shift, ["start_time", "end_time"], as_dict=True + ) - if ((curr_shift.start_time > overlapping_shift.start_time and curr_shift.start_time < overlapping_shift.end_time) or - (curr_shift.end_time > overlapping_shift.start_time and curr_shift.end_time < overlapping_shift.end_time) or - (curr_shift.start_time <= overlapping_shift.start_time and curr_shift.end_time >= overlapping_shift.end_time)): + if ( + ( + curr_shift.start_time > overlapping_shift.start_time + and curr_shift.start_time < overlapping_shift.end_time + ) + or ( + curr_shift.end_time > overlapping_shift.start_time + and curr_shift.end_time < overlapping_shift.end_time + ) + or ( + curr_shift.start_time <= overlapping_shift.start_time + and curr_shift.end_time >= overlapping_shift.end_time + ) + ): return True return False @@ -87,14 +111,20 @@ def throw_overlap_error(self, shift_details): msg = None if shift_details.docstatus == 1 and shift_details.status == "Active": if shift_details.start_date and shift_details.end_date: - msg = _("Employee {0} already has an active Shift {1}: {2} from {3} to {4}").format(frappe.bold(self.employee), frappe.bold(self.shift_type), + msg = _("Employee {0} already has an active Shift {1}: {2} from {3} to {4}").format( + frappe.bold(self.employee), + frappe.bold(self.shift_type), get_link_to_form("Shift Assignment", shift_details.name), getdate(self.start_date).strftime("%d-%m-%Y"), - getdate(self.end_date).strftime("%d-%m-%Y")) + getdate(self.end_date).strftime("%d-%m-%Y"), + ) else: - msg = _("Employee {0} already has an active Shift {1}: {2} from {3}").format(frappe.bold(self.employee), frappe.bold(self.shift_type), + msg = _("Employee {0} already has an active Shift {1}: {2} from {3}").format( + frappe.bold(self.employee), + frappe.bold(self.shift_type), get_link_to_form("Shift Assignment", shift_details.name), - getdate(self.start_date).strftime("%d-%m-%Y")) + getdate(self.start_date).strftime("%d-%m-%Y"), + ) if msg: frappe.throw(msg, title=_("Overlapping Shifts"), exc=OverlappingShiftError) @@ -180,10 +210,14 @@ def get_shift_for_time(shifts: List[Dict], for_timestamp: datetime) -> Dict: for entry in shifts: shift_details = get_shift_details(entry.shift_type, for_timestamp=for_timestamp) - if get_datetime(shift_details.actual_start) <= get_datetime(for_timestamp) <= get_datetime(shift_details.actual_end): + if ( + get_datetime(shift_details.actual_start) + <= get_datetime(for_timestamp) + <= get_datetime(shift_details.actual_end) + ): valid_shifts.append(shift_details) - valid_shifts.sort(key=lambda x: x['actual_start']) + valid_shifts.sort(key=lambda x: x["actual_start"]) if len(valid_shifts) > 1: for i in range(len(valid_shifts) - 1): @@ -193,8 +227,16 @@ def get_shift_for_time(shifts: List[Dict], for_timestamp: datetime) -> Dict: next_shift = valid_shifts[i + 1] if curr_shift and next_shift: - next_shift.actual_start = curr_shift.end_datetime if next_shift.actual_start < curr_shift.end_datetime else next_shift.actual_start - curr_shift.actual_end = next_shift.actual_start if curr_shift.actual_end > next_shift.actual_start else curr_shift.actual_end + next_shift.actual_start = ( + curr_shift.end_datetime + if next_shift.actual_start < curr_shift.end_datetime + else next_shift.actual_start + ) + curr_shift.actual_end = ( + next_shift.actual_start + if curr_shift.actual_end > next_shift.actual_start + else curr_shift.actual_end + ) valid_shifts[i] = curr_shift valid_shifts[i + 1] = next_shift @@ -206,23 +248,25 @@ def get_shift_for_time(shifts: List[Dict], for_timestamp: datetime) -> Dict: def get_shifts_for_date(employee: str, for_timestamp: datetime) -> List[Dict[str, str]]: """Returns list of shifts with details for given date""" - assignment = frappe.qb.DocType('Shift Assignment') + assignment = frappe.qb.DocType("Shift Assignment") return ( frappe.qb.from_(assignment) - .select(assignment.name, assignment.shift_type) - .where( - (assignment.employee == employee) - & (assignment.docstatus == 1) - & (assignment.status == 'Active') - & (assignment.start_date <= getdate(for_timestamp.date())) - & ( - Criterion.any([ + .select(assignment.name, assignment.shift_type) + .where( + (assignment.employee == employee) + & (assignment.docstatus == 1) + & (assignment.status == "Active") + & (assignment.start_date <= getdate(for_timestamp.date())) + & ( + Criterion.any( + [ assignment.end_date.isnull(), - (assignment.end_date.isnotnull() & (getdate(for_timestamp.date()) >= assignment.end_date)) - ]) + (assignment.end_date.isnotnull() & (getdate(for_timestamp.date()) >= assignment.end_date)), + ] ) ) + ) ).run(as_dict=True) @@ -233,7 +277,12 @@ def get_shift_for_timestamp(employee: str, for_timestamp: datetime) -> Dict: return {} -def get_employee_shift(employee: str, for_timestamp: datetime = None, consider_default_shift: bool = False, next_shift_direction: str = None) -> Dict: +def get_employee_shift( + employee: str, + for_timestamp: datetime = None, + consider_default_shift: bool = False, + next_shift_direction: str = None, +) -> Dict: """Returns a Shift Type for the given employee on the given date. (excluding the holidays) :param employee: Employee for which shift is required. @@ -247,7 +296,7 @@ def get_employee_shift(employee: str, for_timestamp: datetime = None, consider_d shift_details = get_shift_for_timestamp(employee, for_timestamp) # if shift assignment is not found, consider default shift - default_shift = frappe.db.get_value('Employee', employee, 'default_shift') + default_shift = frappe.db.get_value("Employee", employee, "default_shift") if not shift_details and consider_default_shift: shift_details = get_shift_details(default_shift, for_timestamp) @@ -257,38 +306,55 @@ def get_employee_shift(employee: str, for_timestamp: datetime = None, consider_d # if no shift is found, find next or prev shift assignment based on direction if not shift_details and next_shift_direction: - shift_details = get_prev_or_next_shift(employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction) + shift_details = get_prev_or_next_shift( + employee, for_timestamp, consider_default_shift, default_shift, next_shift_direction + ) return shift_details or {} -def get_prev_or_next_shift(employee: str, for_timestamp: datetime, consider_default_shift: bool, - default_shift: str, next_shift_direction: str) -> Dict: +def get_prev_or_next_shift( + employee: str, + for_timestamp: datetime, + consider_default_shift: bool, + default_shift: str, + next_shift_direction: str, +) -> Dict: """Returns a dict of shift details for the next or prev shift based on the next_shift_direction""" MAX_DAYS = 366 shift_details = {} if consider_default_shift and default_shift: - direction = -1 if next_shift_direction == 'reverse' else 1 + direction = -1 if next_shift_direction == "reverse" else 1 for i in range(MAX_DAYS): - date = for_timestamp + timedelta(days=direction*(i+1)) + date = for_timestamp + timedelta(days=direction * (i + 1)) shift_details = get_employee_shift(employee, date, consider_default_shift, None) if shift_details: break else: - direction = '<' if next_shift_direction == 'reverse' else '>' - sort_order = 'desc' if next_shift_direction == 'reverse' else 'asc' - dates = frappe.db.get_all('Shift Assignment', - ['start_date', 'end_date'], - {'employee': employee, 'start_date': (direction, for_timestamp.date()), 'docstatus': 1, 'status': 'Active'}, + direction = "<" if next_shift_direction == "reverse" else ">" + sort_order = "desc" if next_shift_direction == "reverse" else "asc" + dates = frappe.db.get_all( + "Shift Assignment", + ["start_date", "end_date"], + { + "employee": employee, + "start_date": (direction, for_timestamp.date()), + "docstatus": 1, + "status": "Active", + }, as_list=True, - limit=MAX_DAYS, order_by='start_date ' + sort_order) + limit=MAX_DAYS, + order_by="start_date " + sort_order, + ) if dates: for date in dates: if date[1] and date[1] < for_timestamp.date(): continue - shift_details = get_employee_shift(employee, datetime.combine(date[0], for_timestamp.time()), consider_default_shift, None) + shift_details = get_employee_shift( + employee, datetime.combine(date[0], for_timestamp.time()), consider_default_shift, None + ) if shift_details: break @@ -296,7 +362,9 @@ def get_prev_or_next_shift(employee: str, for_timestamp: datetime, consider_defa def is_holiday_date(employee: str, shift_details: Dict) -> bool: - holiday_list_name = frappe.db.get_value('Shift Type', shift_details.shift_type.name, 'holiday_list') + holiday_list_name = frappe.db.get_value( + "Shift Type", shift_details.shift_type.name, "holiday_list" + ) if not holiday_list_name: holiday_list_name = get_holiday_list_for_employee(employee, False) @@ -304,17 +372,23 @@ def is_holiday_date(employee: str, shift_details: Dict) -> bool: return holiday_list_name and is_holiday(holiday_list_name, shift_details.start_datetime.date()) -def get_employee_shift_timings(employee: str, for_timestamp: datetime = None, consider_default_shift: bool = False) -> List[Dict]: +def get_employee_shift_timings( + employee: str, for_timestamp: datetime = None, consider_default_shift: bool = False +) -> List[Dict]: """Returns previous shift, current/upcoming shift, next_shift for the given timestamp and employee""" if for_timestamp is None: for_timestamp = now_datetime() # write and verify a test case for midnight shift. prev_shift = curr_shift = next_shift = None - curr_shift = get_employee_shift(employee, for_timestamp, consider_default_shift, 'forward') + curr_shift = get_employee_shift(employee, for_timestamp, consider_default_shift, "forward") if curr_shift: - next_shift = get_employee_shift(employee, curr_shift.start_datetime + timedelta(days=1), consider_default_shift, 'forward') - prev_shift = get_employee_shift(employee, for_timestamp + timedelta(days=-1), consider_default_shift, 'reverse') + next_shift = get_employee_shift( + employee, curr_shift.start_datetime + timedelta(days=1), consider_default_shift, "forward" + ) + prev_shift = get_employee_shift( + employee, for_timestamp + timedelta(days=-1), consider_default_shift, "reverse" + ) if curr_shift: # adjust actual start and end times if they are overlapping with grace period (before start and after end) @@ -330,26 +404,35 @@ def get_employee_shift_timings(employee: str, for_timestamp: datetime = None, co else prev_shift.actual_end ) if next_shift: - next_shift.actual_start = curr_shift.end_datetime if next_shift.actual_start < curr_shift.end_datetime else next_shift.actual_start - curr_shift.actual_end = next_shift.actual_start if curr_shift.actual_end > next_shift.actual_start else curr_shift.actual_end + next_shift.actual_start = ( + curr_shift.end_datetime + if next_shift.actual_start < curr_shift.end_datetime + else next_shift.actual_start + ) + curr_shift.actual_end = ( + next_shift.actual_start + if curr_shift.actual_end > next_shift.actual_start + else curr_shift.actual_end + ) return prev_shift, curr_shift, next_shift -def get_actual_start_end_datetime_of_shift(employee: str, for_timestamp: datetime, consider_default_shift: bool = False) -> Dict: - """ - Params: - employee (str): Employee name - for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime - consider_default_shift (bool, optional): Flag (defaults to False) to specify whether to consider - default shift in employee master if no shift assignment is found +def get_actual_start_end_datetime_of_shift( + employee: str, for_timestamp: datetime, consider_default_shift: bool = False +) -> Dict: + """Returns a Dict containing shift details with actual_start and actual_end datetime values + Here 'actual' means taking into account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time". + Empty Dict is returned if the timestamp is outside any actual shift timings. - Returns: - dict: Dict containing shift details with actual_start and actual_end datetime values - Here 'actual' means taking into account the "begin_check_in_before_shift_start_time" and "allow_check_out_after_shift_end_time". - Empty Dict is returned if the timestamp is outside any actual shift timings. + :param employee (str): Employee name + :param for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime + :param consider_default_shift (bool, optional): Flag (defaults to False) to specify whether to consider + default shift in employee master if no shift assignment is found """ - shift_timings_as_per_timestamp = get_employee_shift_timings(employee, for_timestamp, consider_default_shift) + shift_timings_as_per_timestamp = get_employee_shift_timings( + employee, for_timestamp, consider_default_shift + ) return get_exact_shift(shift_timings_as_per_timestamp, for_timestamp) @@ -381,25 +464,22 @@ def get_exact_shift(shifts: List, for_timestamp: datetime) -> Dict: if timestamp_index: break - if timestamp_index and timestamp_index%2 == 1: - shift_details = shifts[int((timestamp_index-1)/2)] + if timestamp_index and timestamp_index % 2 == 1: + shift_details = shifts[int((timestamp_index - 1) / 2)] return shift_details def get_shift_details(shift_type_name: str, for_timestamp: datetime = None) -> Dict: - """ - Params: - shift_type_name (str): shift type name for which shift_details are required. - for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime - - Returns: - dict: Dict containing shift details with the following data: - 'shift_type' - Object of DocType Shift Type, - 'start_datetime' - datetime of shift start on given timestamp, - 'end_datetime' - datetime of shift end on given timestamp, - 'actual_start' - datetime of shift start after adding 'begin_check_in_before_shift_start_time', - 'actual_end' - datetime of shift end after adding 'allow_check_out_after_shift_end_time' (None is returned if this is zero) + """Returns a Dict containing shift details with the following data: + 'shift_type' - Object of DocType Shift Type, + 'start_datetime' - datetime of shift start on given timestamp, + 'end_datetime' - datetime of shift end on given timestamp, + 'actual_start' - datetime of shift start after adding 'begin_check_in_before_shift_start_time', + 'actual_end' - datetime of shift end after adding 'allow_check_out_after_shift_end_time' (None is returned if this is zero) + + :param shift_type_name (str): shift type name for which shift_details are required. + :param for_timestamp (datetime, optional): Datetime value of checkin, if not provided considers current datetime """ if not shift_type_name: return {} @@ -407,8 +487,10 @@ def get_shift_details(shift_type_name: str, for_timestamp: datetime = None) -> D if for_timestamp is None: for_timestamp = now_datetime() - shift_type = frappe.get_doc('Shift Type', shift_type_name) - shift_actual_start = shift_type.start_time - timedelta(minutes=shift_type.begin_check_in_before_shift_start_time) + shift_type = frappe.get_doc("Shift Type", shift_type_name) + shift_actual_start = shift_type.start_time - timedelta( + minutes=shift_type.begin_check_in_before_shift_start_time + ) if shift_type.start_time > shift_type.end_time: # shift spans accross 2 different days @@ -428,13 +510,17 @@ def get_shift_details(shift_type_name: str, for_timestamp: datetime = None) -> D start_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.start_time end_datetime = datetime.combine(for_timestamp, datetime.min.time()) + shift_type.end_time - actual_start = start_datetime - timedelta(minutes=shift_type.begin_check_in_before_shift_start_time) + actual_start = start_datetime - timedelta( + minutes=shift_type.begin_check_in_before_shift_start_time + ) actual_end = end_datetime + timedelta(minutes=shift_type.allow_check_out_after_shift_end_time) - return frappe._dict({ - 'shift_type': shift_type, - 'start_datetime': start_datetime, - 'end_datetime': end_datetime, - 'actual_start': actual_start, - 'actual_end': actual_end - }) + return frappe._dict( + { + "shift_type": shift_type, + "start_datetime": start_datetime, + "end_datetime": end_datetime, + "actual_start": actual_start, + "actual_end": actual_end, + } + ) diff --git a/erpnext/hr/doctype/shift_type/shift_type.py b/erpnext/hr/doctype/shift_type/shift_type.py index dd1dff1bc4d6..f5689d190f2b 100644 --- a/erpnext/hr/doctype/shift_type/shift_type.py +++ b/erpnext/hr/doctype/shift_type/shift_type.py @@ -34,19 +34,40 @@ def process_auto_attendance(self): return filters = { - 'skip_auto_attendance': 0, - 'attendance': ('is', 'not set'), - 'time': ('>=', self.process_attendance_after), - 'shift_actual_end': ('<', self.last_sync_of_checkin), - 'shift': self.name + "skip_auto_attendance": 0, + "attendance": ("is", "not set"), + "time": (">=", self.process_attendance_after), + "shift_actual_end": ("<", self.last_sync_of_checkin), + "shift": self.name, } - logs = frappe.db.get_list('Employee Checkin', fields="*", filters=filters, order_by="employee,time") + logs = frappe.db.get_list( + "Employee Checkin", fields="*", filters=filters, order_by="employee,time" + ) - for key, group in itertools.groupby(logs, key=lambda x: (x['employee'], x['shift_actual_start'])): + for key, group in itertools.groupby( + logs, key=lambda x: (x["employee"], x["shift_actual_start"]) + ): single_shift_logs = list(group) - attendance_status, working_hours, late_entry, early_exit, in_time, out_time = self.get_attendance(single_shift_logs) - mark_attendance_and_link_log(single_shift_logs, attendance_status, key[1].date(), - working_hours, late_entry, early_exit, in_time, out_time, self.name) + ( + attendance_status, + working_hours, + late_entry, + early_exit, + in_time, + out_time, + ) = self.get_attendance(single_shift_logs) + + mark_attendance_and_link_log( + single_shift_logs, + attendance_status, + key[1].date(), + working_hours, + late_entry, + early_exit, + in_time, + out_time, + self.name, + ) for employee in self.get_assigned_employee(self.process_attendance_after, True): self.mark_absent_for_dates_with_no_attendance(employee) @@ -54,9 +75,9 @@ def process_auto_attendance(self): def get_attendance(self, logs): """Return attendance_status, working_hours, late_entry, early_exit, in_time, out_time for a set of logs belonging to a single shift. - Assumption: - 1. These logs belongs to a single shift, single employee and it's not in a holiday date. - 2. Logs are in chronological order + Assumptions: + 1. These logs belongs to a single shift, single employee and it's not in a holiday date. + 2. Logs are in chronological order """ late_entry = early_exit = False total_working_hours, in_time, out_time = calculate_working_hours( @@ -116,8 +137,9 @@ def mark_absent_for_dates_with_no_attendance(self, employee): mark_attendance(employee, date, "Absent", self.name) def get_start_and_end_dates(self, employee): - date_of_joining, relieving_date, employee_creation = frappe.db.get_value("Employee", employee, - ["date_of_joining", "relieving_date", "creation"]) + date_of_joining, relieving_date, employee_creation = frappe.db.get_value( + "Employee", employee, ["date_of_joining", "relieving_date", "creation"] + ) if not date_of_joining: date_of_joining = employee_creation.date() @@ -126,26 +148,32 @@ def get_start_and_end_dates(self, employee): end_date = None shift_details = get_shift_details(self.name, get_datetime(self.last_sync_of_checkin)) - last_shift_time = shift_details.actual_start if shift_details else get_datetime(self.last_sync_of_checkin) + last_shift_time = ( + shift_details.actual_start if shift_details else get_datetime(self.last_sync_of_checkin) + ) - prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, 'reverse') + prev_shift = get_employee_shift(employee, last_shift_time - timedelta(days=1), True, "reverse") if prev_shift: - end_date = min(prev_shift.start_datetime.date(), relieving_date) if relieving_date else prev_shift.start_datetime.date() + end_date = ( + min(prev_shift.start_datetime.date(), relieving_date) + if relieving_date + else prev_shift.start_datetime.date() + ) return start_date, end_date def get_assigned_employee(self, from_date=None, consider_default_shift=False): - filters = {'shift_type': self.name, 'docstatus': '1'} + filters = {"shift_type": self.name, "docstatus": "1"} if from_date: - filters['start_date'] = ('>', from_date) + filters["start_date"] = (">", from_date) - assigned_employees = frappe.get_all('Shift Assignment', filters=filters, pluck='employee') + assigned_employees = frappe.get_all("Shift Assignment", filters=filters, pluck="employee") if consider_default_shift: - filters = {'default_shift': self.name, 'status': ['!=', 'Inactive']} - default_shift_employees = frappe.get_all('Employee', filters=filters, pluck='name') + filters = {"default_shift": self.name, "status": ["!=", "Inactive"]} + default_shift_employees = frappe.get_all("Employee", filters=filters, pluck="name") - return list(set(assigned_employees+default_shift_employees)) + return list(set(assigned_employees + default_shift_employees)) return assigned_employees diff --git a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py index a98afe41cc51..8cb1505f6dce 100644 --- a/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py +++ b/erpnext/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py @@ -14,44 +14,47 @@ Filters = frappe._dict status_map = { - 'Present': 'P', - 'Absent': 'A', - 'Half Day': 'HD', - 'Work From Home': 'WFH', - 'On Leave': 'L', - 'Holiday': 'H', - 'Weekly Off': 'WO' + "Present": "P", + "Absent": "A", + "Half Day": "HD", + "Work From Home": "WFH", + "On Leave": "L", + "Holiday": "H", + "Weekly Off": "WO", } -day_abbr = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] +day_abbr = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] -def execute(filters: Optional[Filters] = None) -> Tuple: + +def execute(filters: Optional[Filters] = None) -> Tuple: filters = frappe._dict(filters or {}) if not (filters.month and filters.year): - frappe.throw(_('Please select month and year.')) + frappe.throw(_("Please select month and year.")) attendance_map = get_attendance_map(filters) if not attendance_map: - frappe.msgprint(_('No attendance records found.'), alert=True, indicator='orange') + frappe.msgprint(_("No attendance records found."), alert=True, indicator="orange") return [], [], None, None columns = get_columns(filters) data = get_data(filters, attendance_map) if not data: - frappe.msgprint(_('No attendance records found for this criteria.'), alert=True, indicator='orange') + frappe.msgprint( + _("No attendance records found for this criteria."), alert=True, indicator="orange" + ) return columns, [], None, None - message = get_message() if not filters.summarized_view else '' + message = get_message() if not filters.summarized_view else "" chart = get_chart_data(attendance_map, filters) return columns, data, message, chart def get_message() -> str: - message = '' - colors = ['green', 'red', 'orange', 'green', '#318AD8', '', ''] + message = "" + colors = ["green", "red", "orange", "green", "#318AD8", "", ""] count = 0 for status, abbr in status_map.items(): @@ -70,39 +73,53 @@ def get_columns(filters: Filters) -> List[Dict]: if filters.group_by: columns.append( - {'label': _(filters.group_by), 'fieldname': frappe.scrub(filters.group_by), 'fieldtype': 'Link', 'options': 'Branch', 'width': 120} + { + "label": _(filters.group_by), + "fieldname": frappe.scrub(filters.group_by), + "fieldtype": "Link", + "options": "Branch", + "width": 120, + } ) - columns.extend([ - {'label': _('Employee'), 'fieldname': 'employee', 'fieldtype': 'Link', 'options': 'Employee', 'width': 135}, - {'label': _('Employee Name'), 'fieldname': 'employee_name', 'fieldtype': 'Data', 'width': 120} - ]) + columns.extend( + [ + {"label": _("Employee"), "fieldname": "employee", "fieldtype": "Link", "options": "Employee", "width": 135}, + {"label": _("Employee Name"), "fieldname": "employee_name", "fieldtype": "Data", "width": 120} + ] + ) if filters.summarized_view: - columns.extend([ - {'label': _('Total Present'), 'fieldname': 'total_present', 'fieldtype': 'Float', 'width': 110}, - {'label': _('Total Leaves'), 'fieldname': 'total_leaves', 'fieldtype': 'Float', 'width': 110}, - {'label': _('Total Absent'), 'fieldname': 'total_absent', 'fieldtype': 'Float', 'width': 110}, - {'label': _('Total Holidays'), 'fieldname': 'total_holidays', 'fieldtype': 'Float', 'width': 120}, - {'label': _('Unmarked Days'), 'fieldname': 'unmarked_days', 'fieldtype': 'Float', 'width': 130} - ]) + columns.extend( + [ + {"label": _("Total Present"), "fieldname": "total_present", "fieldtype": "Float", "width": 110}, + {"label": _("Total Leaves"), "fieldname": "total_leaves", "fieldtype": "Float", "width": 110}, + {"label": _("Total Absent"), "fieldname": "total_absent", "fieldtype": "Float", "width": 110}, + {"label": _("Total Holidays"), "fieldname": "total_holidays", "fieldtype": "Float", "width": 120}, + {"label": _("Unmarked Days"), "fieldname": "unmarked_days", "fieldtype": "Float", "width": 130} + ] + ) columns.extend(get_columns_for_leave_types()) - columns.extend([ - {'label': _('Total Late Entries'), 'fieldname': 'total_late_entries', 'fieldtype': 'Float', 'width': 140}, - {'label': _('Total Early Exits'), 'fieldname': 'total_early_exits', 'fieldtype': 'Float', 'width': 140} - ]) + columns.extend( + [ + {"label": _("Total Late Entries"), "fieldname": "total_late_entries", "fieldtype": "Float", "width": 140}, + {"label": _("Total Early Exits"), "fieldname": "total_early_exits", "fieldtype": "Float", "width": 140} + ] + ) else: - columns.append({'label': _('Shift'), 'fieldname': 'shift', 'fieldtype': 'Data', 'width': 120}) + columns.append({"label": _("Shift"), "fieldname": "shift", "fieldtype": "Data", "width": 120}) columns.extend(get_columns_for_days(filters)) return columns def get_columns_for_leave_types() -> List[Dict]: - leave_types = frappe.db.get_all('Leave Type', pluck='name') + leave_types = frappe.db.get_all("Leave Type", pluck="name") types = [] for entry in leave_types: - types.append({'label': entry, 'fieldname': frappe.scrub(entry), 'fieldtype': 'Float', 'width': 120}) + types.append( + {"label": entry, "fieldname": frappe.scrub(entry), "fieldtype": "Float", "width": 120} + ) return types @@ -111,23 +128,14 @@ def get_columns_for_days(filters: Filters) -> List[Dict]: total_days = get_total_days_in_month(filters) days = [] - for day in range(1, total_days+1): + for day in range(1, total_days + 1): # forms the dates from selected year and month from filters - date = '{}-{}-{}'.format( - cstr(filters.year), - cstr(filters.month), - cstr(day) - ) + date = "{}-{}-{}".format(cstr(filters.year), cstr(filters.month), cstr(day)) # gets abbr from weekday number weekday = day_abbr[getdate(date).weekday()] # sets days as 1 Mon, 2 Tue, 3 Wed - label = '{} {}'.format(cstr(day), weekday) - days.append({ - 'label': label, - 'fieldtype': 'Data', - 'fieldname': day, - 'width': 65 - }) + label = "{} {}".format(cstr(day), weekday) + days.append({"label": label, "fieldtype": "Data", "fieldname": day, "width": 65}) return days @@ -137,7 +145,9 @@ def get_total_days_in_month(filters: Filters) -> int: def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]: - employee_details, group_by_param_values = get_employee_related_details(filters.group_by, filters.company) + employee_details, group_by_param_values = get_employee_related_details( + filters.group_by, filters.company + ) holiday_map = get_holiday_map(filters) data = [] @@ -151,9 +161,7 @@ def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]: records = get_rows(employee_details[value], filters, holiday_map, attendance_map) if records: - data.append({ - group_by_column: frappe.bold(value) - }) + data.append({group_by_column: frappe.bold(value)}) data.extend(records) else: data = get_rows(employee_details, filters, holiday_map, attendance_map) @@ -163,30 +171,31 @@ def get_data(filters: Filters, attendance_map: Dict) -> List[Dict]: def get_attendance_map(filters: Filters) -> Dict: """Returns a dictionary of employee wise attendance map as per shifts for all the days of the month like - { - 'employee1': { - 'Morning Shift': {1: 'Present', 2: 'Absent', ...} - 'Evening Shift': {1: 'Absent', 2: 'Present', ...} - }, - 'employee2': { - 'Afternoon Shift': {1: 'Present', 2: 'Absent', ...} - 'Night Shift': {1: 'Absent', 2: 'Absent', ...} - } + { + 'employee1': { + 'Morning Shift': {1: 'Present', 2: 'Absent', ...} + 'Evening Shift': {1: 'Absent', 2: 'Present', ...} + }, + 'employee2': { + 'Afternoon Shift': {1: 'Present', 2: 'Absent', ...} + 'Night Shift': {1: 'Absent', 2: 'Absent', ...} } + } """ - Attendance = frappe.qb.DocType('Attendance') + Attendance = frappe.qb.DocType("Attendance") query = ( frappe.qb.from_(Attendance) .select( Attendance.employee, - Extract('day', Attendance.attendance_date).as_('day_of_month'), + Extract("day", Attendance.attendance_date).as_("day_of_month"), Attendance.status, - Attendance.shift - ).where( + Attendance.shift, + ) + .where( (Attendance.docstatus == 1) & (Attendance.company == filters.company) - & (Extract('month', Attendance.attendance_date) == filters.month) - & (Extract('year', Attendance.attendance_date) == filters.year) + & (Extract("month", Attendance.attendance_date) == filters.month) + & (Extract("year", Attendance.attendance_date) == filters.year) ) ) if filters.employee: @@ -205,18 +214,23 @@ def get_attendance_map(filters: Filters) -> Dict: def get_employee_related_details(group_by: str, company: str) -> Tuple[Dict, List]: """Returns - 1. nested dict for employee details - 2. list of values for the group by filter - eg: if group by filter is set to "Department" then returns a list like ['HR', 'Support', 'Engineering'] + 1. nested dict for employee details + 2. list of values for the group by filter """ - Employee = frappe.qb.DocType('Employee') + Employee = frappe.qb.DocType("Employee") query = ( frappe.qb.from_(Employee) .select( - Employee.name, Employee.employee_name, Employee.designation, - Employee.grade, Employee.department, Employee.branch, - Employee.company, Employee.holiday_list - ).where(Employee.company == company) + Employee.name, + Employee.employee_name, + Employee.designation, + Employee.grade, + Employee.department, + Employee.branch, + Employee.company, + Employee.holiday_list, + ) + .where(Employee.company == company) ) if group_by: @@ -258,12 +272,12 @@ def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]: } """ # add default holiday list too - holiday_lists = frappe.db.get_all('Holiday List', pluck='name') - default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list') + holiday_lists = frappe.db.get_all("Holiday List", pluck="name") + default_holiday_list = frappe.get_cached_value("Company", filters.company, "default_holiday_list") holiday_lists.append(default_holiday_list) holiday_map = frappe._dict() - Holiday = frappe.qb.DocType('Holiday') + Holiday = frappe.qb.DocType("Holiday") for d in holiday_lists: if not d: @@ -271,13 +285,11 @@ def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]: holidays = ( frappe.qb.from_(Holiday) - .select( - Extract('day', Holiday.holiday_date).as_('day_of_month'), - Holiday.weekly_off - ).where( + .select(Extract("day", Holiday.holiday_date).as_("day_of_month"), Holiday.weekly_off) + .where( (Holiday.parent == d) - & (Extract('month', Holiday.holiday_date) == filters.month) - & (Extract('year', Holiday.holiday_date) == filters.year) + & (Extract("month", Holiday.holiday_date) == filters.month) + & (Extract("year", Holiday.holiday_date) == filters.year) ) ).run(as_dict=True) @@ -286,9 +298,11 @@ def get_holiday_map(filters: Filters) -> Dict[str, List[Dict]]: return holiday_map -def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attendance_map: Dict) -> List[Dict]: +def get_rows( + employee_details: Dict, filters: Filters, holiday_map: Dict, attendance_map: Dict +) -> List[Dict]: records = [] - default_holiday_list = frappe.get_cached_value('Company', filters.company, 'default_holiday_list') + default_holiday_list = frappe.get_cached_value("Company", filters.company, "default_holiday_list") for employee, details in employee_details.items(): emp_holiday_list = details.holiday_list or default_holiday_list @@ -302,7 +316,7 @@ def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attend leave_summary = get_leave_summary(employee, filters) entry_exits_summary = get_entry_exits_summary(employee, filters) - row = {'employee': employee, 'employee_name': details.employee_name} + row = {"employee": employee, "employee_name": details.employee_name} set_defaults_for_summarized_view(filters, row) row.update(attendance) row.update(leave_summary) @@ -314,12 +328,13 @@ def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attend if not employee_attendance: continue - attendance_for_employee = get_attendance_status_for_detailed_view(employee, filters, employee_attendance, holidays) + attendance_for_employee = get_attendance_status_for_detailed_view( + employee, filters, employee_attendance, holidays + ) # set employee details in the first row - attendance_for_employee[0].update({ - 'employee': employee, - 'employee_name': details.employee_name - }) + attendance_for_employee[0].update( + {"employee": employee, "employee_name": details.employee_name} + ) records.extend(attendance_for_employee) @@ -328,13 +343,15 @@ def get_rows(employee_details: Dict, filters: Filters, holiday_map: Dict, attend def set_defaults_for_summarized_view(filters, row): for entry in get_columns(filters): - if entry.get('fieldtype') == 'Float': - row[entry.get('fieldname')] = 0.0 + if entry.get("fieldtype") == "Float": + row[entry.get("fieldname")] = 0.0 -def get_attendance_status_for_summarized_view(employee: str, filters: Filters, holidays: List) -> Dict: +def get_attendance_status_for_summarized_view( + employee: str, filters: Filters, holidays: List +) -> Dict: """Returns dict of attendance status for employee like - {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5} + {'total_present': 1.5, 'total_leaves': 0.5, 'total_absent': 13.5, 'total_holidays': 8, 'unmarked_days': 5} """ summary, attendance_days = get_attendance_summary_and_days(employee, filters) if not any(summary.values()): @@ -348,83 +365,93 @@ def get_attendance_status_for_summarized_view(employee: str, filters: Filters, h continue status = get_holiday_status(day, holidays) - if status in ['Weekly Off', 'Holiday']: + if status in ["Weekly Off", "Holiday"]: total_holidays += 1 elif not status: total_unmarked_days += 1 return { - 'total_present': summary.total_present + summary.total_half_days, - 'total_leaves': summary.total_leaves + summary.total_half_days, - 'total_absent': summary.total_absent + summary.total_half_days, - 'total_holidays': total_holidays, - 'unmarked_days': total_unmarked_days + "total_present": summary.total_present + summary.total_half_days, + "total_leaves": summary.total_leaves + summary.total_half_days, + "total_absent": summary.total_absent + summary.total_half_days, + "total_holidays": total_holidays, + "unmarked_days": total_unmarked_days, } def get_attendance_summary_and_days(employee: str, filters: Filters) -> Tuple[Dict, List]: - Attendance = frappe.qb.DocType('Attendance') + Attendance = frappe.qb.DocType("Attendance") - present_case = frappe.qb.terms.Case().when(((Attendance.status == 'Present') | (Attendance.status == 'Work From Home')), 1).else_(0) - sum_present = Sum(present_case).as_('total_present') + present_case = ( + frappe.qb.terms.Case() + .when(((Attendance.status == "Present") | (Attendance.status == "Work From Home")), 1) + .else_(0) + ) + sum_present = Sum(present_case).as_("total_present") - absent_case = frappe.qb.terms.Case().when(Attendance.status == 'Absent', 1).else_(0) - sum_absent = Sum(absent_case).as_('total_absent') + absent_case = frappe.qb.terms.Case().when(Attendance.status == "Absent", 1).else_(0) + sum_absent = Sum(absent_case).as_("total_absent") - leave_case = frappe.qb.terms.Case().when(Attendance.status == 'On Leave', 1).else_(0) - sum_leave = Sum(leave_case).as_('total_leaves') + leave_case = frappe.qb.terms.Case().when(Attendance.status == "On Leave", 1).else_(0) + sum_leave = Sum(leave_case).as_("total_leaves") - half_day_case = frappe.qb.terms.Case().when(Attendance.status == 'Half Day', 0.5).else_(0) - sum_half_day = Sum(half_day_case).as_('total_half_days') + half_day_case = frappe.qb.terms.Case().when(Attendance.status == "Half Day", 0.5).else_(0) + sum_half_day = Sum(half_day_case).as_("total_half_days") summary = ( frappe.qb.from_(Attendance) .select( - sum_present, sum_absent, sum_leave, sum_half_day, - ).where( + sum_present, + sum_absent, + sum_leave, + sum_half_day, + ) + .where( (Attendance.docstatus == 1) & (Attendance.employee == employee) & (Attendance.company == filters.company) - & (Extract('month', Attendance.attendance_date) == filters.month) - & (Extract('year', Attendance.attendance_date) == filters.year) + & (Extract("month", Attendance.attendance_date) == filters.month) + & (Extract("year", Attendance.attendance_date) == filters.year) ) ).run(as_dict=True) days = ( frappe.qb.from_(Attendance) - .select(Extract('day', Attendance.attendance_date).as_('day_of_month')) + .select(Extract("day", Attendance.attendance_date).as_("day_of_month")) .distinct() .where( (Attendance.docstatus == 1) & (Attendance.employee == employee) & (Attendance.company == filters.company) - & (Extract('month', Attendance.attendance_date) == filters.month) - & (Extract('year', Attendance.attendance_date) == filters.year) + & (Extract("month", Attendance.attendance_date) == filters.month) + & (Extract("year", Attendance.attendance_date) == filters.year) ) ).run(pluck=True) return summary[0], days -def get_attendance_status_for_detailed_view(employee: str, filters: Filters, employee_attendance: Dict, holidays: List) -> List[Dict]: +def get_attendance_status_for_detailed_view( + employee: str, filters: Filters, employee_attendance: Dict, holidays: List +) -> List[Dict]: """Returns list of shift-wise attendance status for employee - [ - {'shift': 'Morning Shift', 1: 'A', 2: 'P', 3: 'A'....}, - {'shift': 'Evening Shift', 1: 'P', 2: 'A', 3: 'P'....} - ] + [ + {'shift': 'Morning Shift', 1: 'A', 2: 'P', 3: 'A'....}, + {'shift': 'Evening Shift', 1: 'P', 2: 'A', 3: 'P'....} + ] """ total_days = get_total_days_in_month(filters) attendance_values = [] for shift, status_dict in employee_attendance.items(): - row = {'shift': shift} + row = {"shift": shift} for day in range(1, total_days + 1): status = status_dict.get(day) if status is None and holidays: status = get_holiday_status(day, holidays) - abbr = status_map.get(status, '') + abbr = status_map.get(status, "") row[day] = abbr attendance_values.append(row) @@ -435,22 +462,22 @@ def get_attendance_status_for_detailed_view(employee: str, filters: Filters, emp def get_holiday_status(day: int, holidays: List) -> str: status = None for holiday in holidays: - if day == holiday.get('day_of_month'): - if holiday.get('weekly_off'): - status = 'Weekly Off' + if day == holiday.get("day_of_month"): + if holiday.get("weekly_off"): + status = "Weekly Off" else: - status = 'Holiday' + status = "Holiday" break return status def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]: """Returns a dict of leave type and corresponding leaves taken by employee like: - {'leave_without_pay': 1.0, 'sick_leave': 2.0} + {'leave_without_pay': 1.0, 'sick_leave': 2.0} """ - Attendance = frappe.qb.DocType('Attendance') - day_case = frappe.qb.terms.Case().when(Attendance.status == 'Half Day', 0.5).else_(1) - sum_leave_days = Sum(day_case).as_('leave_days') + Attendance = frappe.qb.DocType("Attendance") + day_case = frappe.qb.terms.Case().when(Attendance.status == "Half Day", 0.5).else_(1) + sum_leave_days = Sum(day_case).as_("leave_days") leave_details = ( frappe.qb.from_(Attendance) @@ -459,10 +486,11 @@ def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]: (Attendance.employee == employee) & (Attendance.docstatus == 1) & (Attendance.company == filters.company) - & ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != '')) - & (Extract('month', Attendance.attendance_date) == filters.month) - & (Extract('year', Attendance.attendance_date) == filters.year) - ).groupby(Attendance.leave_type) + & ((Attendance.leave_type.isnotnull()) | (Attendance.leave_type != "")) + & (Extract("month", Attendance.attendance_date) == filters.month) + & (Extract("year", Attendance.attendance_date) == filters.year) + ) + .groupby(Attendance.leave_type) ).run(as_dict=True) leaves = {} @@ -475,15 +503,15 @@ def get_leave_summary(employee: str, filters: Filters) -> Dict[str, float]: def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float]: """Returns total late entries and total early exits for employee like: - {'total_late_entries': 5, 'total_early_exits': 2} + {'total_late_entries': 5, 'total_early_exits': 2} """ - Attendance = frappe.qb.DocType('Attendance') + Attendance = frappe.qb.DocType("Attendance") - late_entry_case = frappe.qb.terms.Case().when(Attendance.late_entry == '1', '1') - count_late_entries = Count(late_entry_case).as_('total_late_entries') + late_entry_case = frappe.qb.terms.Case().when(Attendance.late_entry == "1", "1") + count_late_entries = Count(late_entry_case).as_("total_late_entries") - early_exit_case = frappe.qb.terms.Case().when(Attendance.early_exit == '1', '1') - count_early_exits = Count(early_exit_case).as_('total_early_exits') + early_exit_case = frappe.qb.terms.Case().when(Attendance.early_exit == "1", "1") + count_early_exits = Count(early_exit_case).as_("total_early_exits") entry_exits = ( frappe.qb.from_(Attendance) @@ -492,8 +520,8 @@ def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float] (Attendance.docstatus == 1) & (Attendance.employee == employee) & (Attendance.company == filters.company) - & (Extract('month', Attendance.attendance_date) == filters.month) - & (Extract('year', Attendance.attendance_date) == filters.year) + & (Extract("month", Attendance.attendance_date) == filters.month) + & (Extract("year", Attendance.attendance_date) == filters.year) ) ).run(as_dict=True) @@ -503,10 +531,10 @@ def get_entry_exits_summary(employee: str, filters: Filters) -> Dict[str, float] @frappe.whitelist() def get_attendance_years() -> str: """Returns all the years for which attendance records exist""" - Attendance = frappe.qb.DocType('Attendance') + Attendance = frappe.qb.DocType("Attendance") year_list = ( frappe.qb.from_(Attendance) - .select(Extract('year', Attendance.attendance_date).as_('year')) + .select(Extract("year", Attendance.attendance_date).as_("year")) .distinct() ).run(as_dict=True) @@ -526,21 +554,21 @@ def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict: leave = [] for day in days: - labels.append(day['label']) + labels.append(day["label"]) total_absent_on_day = total_leaves_on_day = total_present_on_day = 0 for employee, attendance_dict in attendance_map.items(): for shift, attendance in attendance_dict.items(): - attendance_on_day = attendance.get(day['fieldname']) + attendance_on_day = attendance.get(day["fieldname"]) - if attendance_on_day == 'Absent': + if attendance_on_day == "Absent": total_absent_on_day += 1 - elif attendance_on_day in ['Present', 'Work From Home']: + elif attendance_on_day in ["Present", "Work From Home"]: total_present_on_day += 1 - elif attendance_on_day == 'Half Day': + elif attendance_on_day == "Half Day": total_present_on_day += 0.5 total_leaves_on_day += 0.5 - elif attendance_on_day == 'On Leave': + elif attendance_on_day == "On Leave": total_leaves_on_day += 1 absent.append(total_absent_on_day) @@ -548,14 +576,14 @@ def get_chart_data(attendance_map: Dict, filters: Filters) -> Dict: leave.append(total_leaves_on_day) return { - 'data': { - 'labels': labels, - 'datasets': [ - {'name': 'Absent', 'values': absent}, - {'name': 'Present', 'values': present}, - {'name': 'Leave', 'values': leave}, - ] + "data": { + "labels": labels, + "datasets": [ + {"name": "Absent", "values": absent}, + {"name": "Present", "values": present}, + {"name": "Leave", "values": leave}, + ], }, - 'type': 'line', - 'colors': ['red', 'green', 'blue'], - } \ No newline at end of file + "type": "line", + "colors": ["red", "green", "blue"], + } diff --git a/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py b/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py index 6b778e51e48b..331e5d255026 100644 --- a/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py +++ b/erpnext/hr/report/monthly_attendance_sheet/test_monthly_attendance_sheet.py @@ -1,7 +1,7 @@ import frappe from dateutil.relativedelta import relativedelta from frappe.tests.utils import FrappeTestCase -from frappe.utils import add_days, get_year_ending, get_year_start, getdate, now_datetime +from frappe.utils import get_year_ending, get_year_start, getdate, now_datetime from erpnext.hr.doctype.attendance.attendance import mark_attendance from erpnext.hr.doctype.employee.test_employee import make_employee @@ -12,12 +12,13 @@ ) from erpnext.payroll.doctype.salary_slip.test_salary_slip import make_leave_application -test_dependencies = ['Shift Type'] +test_dependencies = ["Shift Type"] + class TestMonthlyAttendanceSheet(FrappeTestCase): def setUp(self): self.employee = make_employee("test_employee@example.com") - frappe.db.delete('Attendance') + frappe.db.delete("Attendance") def test_monthly_attendance_sheet_report(self): now = now_datetime() @@ -41,13 +42,13 @@ def test_monthly_attendance_sheet_report(self): report = execute(filters=filters) record = report[1][0] - datasets = report[3]['data']['datasets'] - absent = datasets[0]['values'] - present = datasets[1]['values'] - leaves = datasets[2]['values'] + datasets = report[3]["data"]["datasets"] + absent = datasets[0]["values"] + present = datasets[1]["values"] + leaves = datasets[2]["values"] # ensure correct attendance is reflect on the report - self.assertEqual(self.employee, record.get('employee')) + self.assertEqual(self.employee, record.get("employee")) self.assertEqual(absent[0], 1) self.assertEqual(present[1], 1) self.assertEqual(leaves[2], 1) @@ -57,110 +58,118 @@ def test_monthly_attendance_sheet_with_detailed_view(self): previous_month = now.month - 1 previous_month_first = now.replace(day=1).replace(month=previous_month).date() - company = frappe.db.get_value('Employee', self.employee, 'company') + company = frappe.db.get_value("Employee", self.employee, "company") # attendance with shift - mark_attendance(self.employee, previous_month_first, 'Absent', 'Day Shift') - mark_attendance(self.employee, previous_month_first + relativedelta(days=1), 'Present', 'Day Shift') + mark_attendance(self.employee, previous_month_first, "Absent", "Day Shift") + mark_attendance( + self.employee, previous_month_first + relativedelta(days=1), "Present", "Day Shift" + ) # attendance without shift - mark_attendance(self.employee, previous_month_first + relativedelta(days=2), 'On Leave') - mark_attendance(self.employee, previous_month_first + relativedelta(days=3), 'Present') - - filters = frappe._dict({ - 'month': previous_month, - 'year': now.year, - 'company': company, - }) + mark_attendance(self.employee, previous_month_first + relativedelta(days=2), "On Leave") + mark_attendance(self.employee, previous_month_first + relativedelta(days=3), "Present") + + filters = frappe._dict( + { + "month": previous_month, + "year": now.year, + "company": company, + } + ) report = execute(filters=filters) day_shift_row = report[1][0] row_without_shift = report[1][1] - self.assertEqual(day_shift_row['shift'], 'Day Shift') - self.assertEqual(day_shift_row[1], 'A') # absent on the 1st day of the month - self.assertEqual(day_shift_row[2], 'P') # present on the 2nd day + self.assertEqual(day_shift_row["shift"], "Day Shift") + self.assertEqual(day_shift_row[1], "A") # absent on the 1st day of the month + self.assertEqual(day_shift_row[2], "P") # present on the 2nd day - self.assertEqual(row_without_shift['shift'], None) - self.assertEqual(row_without_shift[3], 'L') # on leave on the 3rd day - self.assertEqual(row_without_shift[4], 'P') # present on the 4th day + self.assertEqual(row_without_shift["shift"], None) + self.assertEqual(row_without_shift[3], "L") # on leave on the 3rd day + self.assertEqual(row_without_shift[4], "P") # present on the 4th day def test_monthly_attendance_sheet_with_summarized_view(self): now = now_datetime() previous_month = now.month - 1 previous_month_first = now.replace(day=1).replace(month=previous_month).date() - company = frappe.db.get_value('Employee', self.employee, 'company') + company = frappe.db.get_value("Employee", self.employee, "company") # attendance with shift - mark_attendance(self.employee, previous_month_first, 'Absent', 'Day Shift') - mark_attendance(self.employee, previous_month_first + relativedelta(days=1), 'Present', 'Day Shift') + mark_attendance(self.employee, previous_month_first, "Absent", "Day Shift") + mark_attendance( + self.employee, previous_month_first + relativedelta(days=1), "Present", "Day Shift" + ) - mark_attendance(self.employee, previous_month_first + relativedelta(days=3), 'Present') # attendance without shift - mark_attendance(self.employee, previous_month_first + relativedelta(days=4), 'Present', late_entry=1) # late entry - mark_attendance(self.employee, previous_month_first + relativedelta(days=5), 'Present', early_exit=1) # early exit + mark_attendance( + self.employee, previous_month_first + relativedelta(days=3), "Present" + ) # attendance without shift + mark_attendance( + self.employee, previous_month_first + relativedelta(days=4), "Present", late_entry=1 + ) # late entry + mark_attendance( + self.employee, previous_month_first + relativedelta(days=5), "Present", early_exit=1 + ) # early exit leave_application = get_leave_application(self.employee) - filters = frappe._dict({ - 'month': previous_month, - 'year': now.year, - 'company': company, - 'summarized_view': 1 - }) + filters = frappe._dict( + {"month": previous_month, "year": now.year, "company": company, "summarized_view": 1} + ) report = execute(filters=filters) row = report[1][0] - self.assertEqual(row['employee'], self.employee) - self.assertEqual(row['total_present'], 4) - self.assertEqual(row['total_absent'], 1) - self.assertEqual(row['total_leaves'], leave_application.total_leave_days) + self.assertEqual(row["employee"], self.employee) + self.assertEqual(row["total_present"], 4) + self.assertEqual(row["total_absent"], 1) + self.assertEqual(row["total_leaves"], leave_application.total_leave_days) # total days - (days with attendance + leave days) unmarked_days = get_total_days_in_month(filters) - (5 + leave_application.total_leave_days) - self.assertEqual(row['unmarked_days'], unmarked_days) - self.assertEqual(row['_test_leave_type'], leave_application.total_leave_days) - self.assertEqual(row['total_late_entries'], 1) - self.assertEqual(row['total_early_exits'], 1) + self.assertEqual(row["unmarked_days"], unmarked_days) + self.assertEqual(row["_test_leave_type"], leave_application.total_leave_days) + self.assertEqual(row["total_late_entries"], 1) + self.assertEqual(row["total_early_exits"], 1) def test_attendance_with_group_by_filter(self): now = now_datetime() previous_month = now.month - 1 previous_month_first = now.replace(day=1).replace(month=previous_month).date() - company = frappe.db.get_value('Employee', self.employee, 'company') + company = frappe.db.get_value("Employee", self.employee, "company") # attendance with shift - mark_attendance(self.employee, previous_month_first, 'Absent', 'Day Shift') - mark_attendance(self.employee, previous_month_first + relativedelta(days=1), 'Present', 'Day Shift') + mark_attendance(self.employee, previous_month_first, "Absent", "Day Shift") + mark_attendance( + self.employee, previous_month_first + relativedelta(days=1), "Present", "Day Shift" + ) # attendance without shift - mark_attendance(self.employee, previous_month_first + relativedelta(days=2), 'On Leave') - mark_attendance(self.employee, previous_month_first + relativedelta(days=3), 'Present') - - filters = frappe._dict({ - 'month': previous_month, - 'year': now.year, - 'company': company, - 'group_by': 'Department' - }) + mark_attendance(self.employee, previous_month_first + relativedelta(days=2), "On Leave") + mark_attendance(self.employee, previous_month_first + relativedelta(days=3), "Present") + + filters = frappe._dict( + {"month": previous_month, "year": now.year, "company": company, "group_by": "Department"} + ) report = execute(filters=filters) - department = frappe.db.get_value('Employee', self.employee, 'department') + department = frappe.db.get_value("Employee", self.employee, "department") department_row = report[1][0] - self.assertIn(department, department_row['department']) + self.assertIn(department, department_row["department"]) day_shift_row = report[1][1] row_without_shift = report[1][2] - self.assertEqual(day_shift_row['shift'], 'Day Shift') - self.assertEqual(day_shift_row[1], 'A') # absent on the 1st day of the month - self.assertEqual(day_shift_row[2], 'P') # present on the 2nd day + self.assertEqual(day_shift_row["shift"], "Day Shift") + self.assertEqual(day_shift_row[1], "A") # absent on the 1st day of the month + self.assertEqual(day_shift_row[2], "P") # present on the 2nd day - self.assertEqual(row_without_shift['shift'], None) - self.assertEqual(row_without_shift[3], 'L') # on leave on the 3rd day - self.assertEqual(row_without_shift[4], 'P') # present on the 4th day + self.assertEqual(row_without_shift["shift"], None) + self.assertEqual(row_without_shift[3], "L") # on leave on the 3rd day + self.assertEqual(row_without_shift[4], "P") # present on the 4th day def get_leave_application(employee): @@ -170,8 +179,8 @@ def get_leave_application(employee): date = getdate() year_start = getdate(get_year_start(date)) year_end = getdate(get_year_ending(date)) - allocation = make_allocation_record(employee=employee, from_date=year_start, to_date=year_end) + make_allocation_record(employee=employee, from_date=year_start, to_date=year_end) from_date = now.replace(day=7).replace(month=previous_month).date() to_date = now.replace(day=8).replace(month=previous_month).date() - return make_leave_application(employee, from_date, to_date, '_Test Leave Type') \ No newline at end of file + return make_leave_application(employee, from_date, to_date, "_Test Leave Type")