| import pandas as pd |
| import src.preprocess.transform as transformed_data |
| import datetime |
| from datetime import timedelta |
| import src.preprocess.extract as extract |
| from src.config.constants import ShiftType, LineType, KitLevel, DefaultConfig |
|
|
| |
| import importlib |
|
|
| |
| |
| |
|
|
|
|
| def get_date_span(): |
| """Get date span from streamlit session state, or return default""" |
| try: |
| import streamlit as st |
| if hasattr(st, 'session_state'): |
| |
| if 'start_date' in st.session_state and 'planning_days' in st.session_state: |
| from datetime import datetime, timedelta |
| start_date = datetime.combine(st.session_state.start_date, datetime.min.time()) |
| planning_days = st.session_state.planning_days |
| end_date = start_date + timedelta(days=planning_days - 1) |
| date_span = list(range(1, planning_days + 1)) |
| return date_span, start_date, end_date |
| except: |
| pass |
| |
| |
| from datetime import datetime |
| return list(range(1, 6)), datetime(2025, 7, 7), datetime(2025, 7, 11) |
|
|
|
|
| |
| |
| DATE_SPAN = None |
| start_date = None |
| end_date = None |
|
|
| def get_product_list(): |
| """Get filtered product list without printing spam""" |
| try: |
| from src.demand_filtering import DemandFilter |
| filter_instance = DemandFilter() |
| filter_instance.load_data(force_reload=True) |
| return filter_instance.get_filtered_product_list() |
| except: |
| |
| date_span, start_date, end_date = get_date_span() |
| return transformed_data.get_released_product_list(start_date) |
|
|
|
|
| def get_employee_type_list(): |
| """Get employee type list from session state or default""" |
| try: |
| import streamlit as st |
| if hasattr(st, 'session_state') and 'selected_employee_types' in st.session_state: |
| return st.session_state.selected_employee_types |
| except: |
| pass |
| |
| |
| employee_type_list = extract.read_employee_data() |
| return employee_type_list["employment_type"].unique().tolist() |
|
|
| |
| def get_shift_list(): |
| """Get shift list from session state or default""" |
| try: |
| import streamlit as st |
| if hasattr(st, 'session_state') and 'selected_shifts' in st.session_state: |
| return st.session_state.selected_shifts |
| except: |
| pass |
| |
| |
| shift_list = extract.get_shift_info() |
| return shift_list["id"].unique().tolist() |
|
|
| |
| |
| |
| |
| |
| EVENING_SHIFT_MODE = "normal" |
|
|
| |
| |
| EVENING_SHIFT_DEMAND_THRESHOLD = 0.9 |
|
|
| |
| def get_active_shift_list(): |
| """ |
| Get the list of active shifts based on EVENING_SHIFT_MODE setting. |
| """ |
| all_shifts = get_shift_list() |
| |
| if EVENING_SHIFT_MODE == "normal": |
| |
| active_shifts = [s for s in all_shifts if s in ShiftType.REGULAR_AND_OVERTIME] |
| print(f"[SHIFT MODE] Normal mode: Using shifts {active_shifts} (Regular + Overtime only, NO evening)") |
| |
| elif EVENING_SHIFT_MODE == "activate_evening": |
| |
| active_shifts = list(all_shifts) |
| print(f"[SHIFT MODE] Evening activated: Using all shifts {active_shifts}") |
| |
| elif EVENING_SHIFT_MODE == "always_available": |
| |
| active_shifts = list(all_shifts) |
| print(f"[SHIFT MODE] Always available: Using all shifts {active_shifts}") |
| |
| else: |
| |
| active_shifts = [s for s in all_shifts if s in ShiftType.REGULAR_AND_OVERTIME] |
| print(f"[SHIFT MODE] Unknown mode '{EVENING_SHIFT_MODE}', defaulting to normal: {active_shifts}") |
| |
| return active_shifts |
|
|
| |
| |
|
|
| |
| def get_line_list(): |
| """Get line list - try from streamlit session state first, then from data files""" |
| try: |
| |
| import streamlit as st |
| if hasattr(st, 'session_state') and 'selected_lines' in st.session_state: |
| print(f"Using lines from Dataset Metadata page: {st.session_state.selected_lines}") |
| return st.session_state.selected_lines |
| except Exception as e: |
| print(f"Could not get lines from streamlit session: {e}") |
| |
| |
| print(f"Loading line list from data files") |
| line_df = extract.read_packaging_line_data() |
| line_list = line_df["id"].unique().tolist() |
| return line_list |
|
|
| |
| |
|
|
| |
| def get_kit_line_match(): |
| kit_line_match = extract.read_kit_line_match_data() |
| kit_line_match_dict = kit_line_match.set_index("kit_name")["line_type"].to_dict() |
| |
| |
| line_name_to_id = { |
| "long line": LineType.LONG_LINE, |
| "mini load": LineType.MINI_LOAD, |
| "miniload": LineType.MINI_LOAD, |
| "Long_line": LineType.LONG_LINE, |
| "Mini_load": LineType.MINI_LOAD, |
| } |
| |
| |
| converted_dict = {} |
| for kit, line_name in kit_line_match_dict.items(): |
| if isinstance(line_name, str) and line_name.strip(): |
| |
| line_id = line_name_to_id.get(line_name.strip(), None) |
| if line_id is not None: |
| converted_dict[kit] = line_id |
| else: |
| print(f"Warning: Unknown line type '{line_name}' for kit {kit}") |
| |
| converted_dict[kit] = LineType.LONG_LINE |
| elif isinstance(line_name, (int, float)) and not pd.isna(line_name): |
| |
| converted_dict[kit] = int(line_name) |
| else: |
| |
| pass |
| |
| return converted_dict |
|
|
| KIT_LINE_MATCH_DICT = get_kit_line_match() |
|
|
|
|
| def get_line_cnt_per_type(): |
| try: |
| |
| import streamlit as st |
| if hasattr(st, 'session_state') and 'line_counts' in st.session_state: |
| print(f"Using line counts from config page: {st.session_state.line_counts}") |
| return st.session_state.line_counts |
| except Exception as e: |
| print(f"Could not get line counts from streamlit session: {e}") |
| |
| print(f"Loading default line count values from data files") |
| line_df = extract.read_packaging_line_data() |
| line_cnt_per_type = line_df.set_index("id")["line_count"].to_dict() |
| print("line cnt per type", line_cnt_per_type) |
| return line_cnt_per_type |
|
|
| |
| |
|
|
| |
| def get_demand_dictionary(force_reload=False): |
| """ |
| Get filtered demand dictionary. |
| IMPORTANT: This dynamically loads data to reflect current Streamlit configs/dates. |
| """ |
| try: |
| |
| from src.demand_filtering import DemandFilter |
| filter_instance = DemandFilter() |
| |
| |
| filter_instance.load_data(force_reload=True) |
| |
| demand_dictionary = filter_instance.get_filtered_demand_dictionary() |
| print(f"📈 FRESH FILTERED DEMAND: {len(demand_dictionary)} products with total demand {sum(demand_dictionary.values())}") |
| print(f"🔄 LOADED DYNAMICALLY: Reflects current Streamlit configs") |
| return demand_dictionary |
| except Exception as e: |
| print(f"Error loading dynamic demand dictionary: {e}") |
| raise Exception("Demand dictionary not found with error:"+str(e)) |
|
|
| |
| |
|
|
| |
| def get_cost_list_per_emp_shift(): |
| try: |
| |
| import streamlit as st |
| if hasattr(st, 'session_state') and 'cost_list_per_emp_shift' in st.session_state: |
| print(f"Using cost list from config page: {st.session_state.cost_list_per_emp_shift}") |
| return st.session_state.cost_list_per_emp_shift |
| except Exception as e: |
| print(f"Could not get cost list from streamlit session: {e}") |
| |
| print(f"Loading default cost values") |
| |
| return DefaultConfig.DEFAULT_COST_RATES |
|
|
| def shift_code_to_name(): |
| return ShiftType.get_all_names() |
|
|
| def line_code_to_name(): |
| """Convert line type IDs to readable names""" |
| return LineType.get_all_names() |
|
|
| |
| |
| |
|
|
|
|
| |
| |
| |
| |
|
|
|
|
|
|
|
|
|
|
|
|
| |
| def get_team_requirements(product_list=None): |
| """ |
| Extract team requirements from Kits Calculation CSV. |
| Returns dictionary with employee type as key and product requirements as nested dict. |
| """ |
| if product_list is None: |
| product_list = get_product_list() |
| |
|
|
| kits_df = extract.read_personnel_requirement_data() |
|
|
| team_req_dict = { |
| "UNICEF Fixed term": {}, |
| "Humanizer": {} |
| } |
| |
| |
| for product in product_list: |
| print("product",product) |
| print(f"Processing team requirements for product: {product}") |
| product_data = kits_df[kits_df['Kit'] == product] |
| print("product_data",product_data) |
| if not product_data.empty: |
| |
| humanizer_req = product_data["Humanizer"].iloc[0] |
| unicef_req = product_data["UNICEF staff"].iloc[0] |
| |
| |
| team_req_dict["Humanizer"][product] = int(humanizer_req) |
| team_req_dict["UNICEF Fixed term"][product] = int(unicef_req) |
| else: |
| print(f"Warning: Product {product} not found in Kits Calculation data, setting requirements to 0") |
| |
| |
| return team_req_dict |
|
|
|
|
|
|
| def get_max_employee_per_type_on_day(): |
| try: |
| |
| import streamlit as st |
| if hasattr(st, 'session_state') and 'max_employee_per_type_on_day' in st.session_state: |
| print(f"Using max employee counts from config page: {st.session_state.max_employee_per_type_on_day}") |
| return st.session_state.max_employee_per_type_on_day |
| except Exception as e: |
| print(f"Could not get max employee counts from streamlit session: {e}") |
| |
| print(f"Loading default max employee values") |
| |
| if DATE_SPAN is None: |
| date_span, _, _ = get_date_span() |
| else: |
| date_span = DATE_SPAN |
| |
| max_employee_per_type_on_day = { |
| "UNICEF Fixed term": { |
| t: 8 for t in date_span |
| }, |
| "Humanizer": { |
| t: 10 for t in date_span |
| } |
| } |
| return max_employee_per_type_on_day |
|
|
|
|
| |
| MAX_HOUR_PER_PERSON_PER_DAY = 14 |
| def get_max_hour_per_shift_per_person(): |
| """Get max hours per shift per person from session state or default""" |
| try: |
| import streamlit as st |
| if hasattr(st, 'session_state'): |
| |
| max_hours = { |
| ShiftType.REGULAR: st.session_state.get('max_hours_shift_1', DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON[ShiftType.REGULAR]), |
| ShiftType.EVENING: st.session_state.get('max_hours_shift_2', DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON[ShiftType.EVENING]), |
| ShiftType.OVERTIME: st.session_state.get('max_hours_shift_3', DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON[ShiftType.OVERTIME]) |
| } |
| return max_hours |
| except Exception as e: |
| print(f"Could not get max hours per shift from session: {e}") |
| |
| |
| return DefaultConfig.MAX_HOUR_PER_SHIFT_PER_PERSON |
|
|
|
|
|
|
| |
| def get_evening_shift_demand_threshold(): |
| """Get evening shift demand threshold from session state or default""" |
| try: |
| import streamlit as st |
| if hasattr(st, 'session_state'): |
| return st.session_state.get('evening_shift_threshold', DefaultConfig.EVENING_SHIFT_DEMAND_THRESHOLD) |
| except Exception as e: |
| print(f"Could not get evening shift threshold from session: {e}") |
| |
| |
| return DefaultConfig.EVENING_SHIFT_DEMAND_THRESHOLD |
|
|
|
|
| |
| def get_kit_hierarchy_data(): |
| kit_levels, dependencies, priority_order = extract.get_production_order_data() |
| |
| return kit_levels, dependencies, priority_order |
|
|
| KIT_LEVELS, KIT_DEPENDENCIES, PRODUCTION_PRIORITY_ORDER = get_kit_hierarchy_data() |
| print(f"Kit Hierarchy loaded: {len(KIT_LEVELS)} kits, Priority order: {len(PRODUCTION_PRIORITY_ORDER)} items") |
|
|
| def get_kit_levels(): |
| """Get kit levels lazily - returns {kit_id: level} where 0=prepack, 1=subkit, 2=master""" |
| kit_levels, _, _ = get_kit_hierarchy_data() |
| return kit_levels |
|
|
| def get_kit_dependencies(): |
| """Get kit dependencies lazily - returns {kit_id: [dependency_list]}""" |
| _, dependencies, _ = get_kit_hierarchy_data() |
| return dependencies |
|
|
| def get_max_parallel_workers(): |
| """Get max parallel workers from session state or default""" |
| try: |
| import streamlit as st |
| if hasattr(st, 'session_state'): |
| |
| max_parallel_workers = { |
| LineType.LONG_LINE: st.session_state.get('max_parallel_workers_long_line', DefaultConfig.MAX_PARALLEL_WORKERS_LONG_LINE), |
| LineType.MINI_LOAD: st.session_state.get('max_parallel_workers_mini_load', DefaultConfig.MAX_PARALLEL_WORKERS_MINI_LOAD) |
| } |
| return max_parallel_workers |
| except Exception as e: |
| print(f"Could not get max parallel workers from session: {e}") |
| |
| |
| return { |
| LineType.LONG_LINE: DefaultConfig.MAX_PARALLEL_WORKERS_LONG_LINE, |
| LineType.MINI_LOAD: DefaultConfig.MAX_PARALLEL_WORKERS_MINI_LOAD |
| } |
|
|
|
|
|
|
| def get_fixed_min_unicef_per_day(): |
| """ |
| Get fixed minimum UNICEF employees per day - try from streamlit session state first, then default |
| This ensures a minimum number of UNICEF fixed-term staff are present every working day |
| """ |
| try: |
| import streamlit as st |
| if hasattr(st, 'session_state') and 'fixed_min_unicef_per_day' in st.session_state: |
| print(f"Using fixed minimum UNICEF per day from config page: {st.session_state.fixed_min_unicef_per_day}") |
| return st.session_state.fixed_min_unicef_per_day |
| except ImportError: |
| pass |
| |
| |
| return DefaultConfig.FIXED_MIN_UNICEF_PER_DAY |
|
|
|
|
| def get_payment_mode_config(): |
| """ |
| Get payment mode configuration - try from streamlit session state first, then default values |
| Payment modes: |
| - "bulk": If employee works any hours in shift, pay for full shift hours |
| - "partial": Pay only for actual hours worked |
| """ |
| try: |
| |
| import streamlit as st |
| if hasattr(st, 'session_state') and 'payment_mode_config' in st.session_state: |
| print(f"Using payment mode config from streamlit session: {st.session_state.payment_mode_config}") |
| return st.session_state.payment_mode_config |
| except Exception as e: |
| print(f"Could not get payment mode config from streamlit session: {e}") |
| |
| |
| print(f"Loading default payment mode configuration") |
| payment_mode_config = DefaultConfig.PAYMENT_MODE_CONFIG |
| |
| return payment_mode_config |
|
|
|
|
| print("✅ Module-level configuration functions defined (variables initialized dynamically)") |
|
|