Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ site
.env
tests/cp/constraints/output_test_all_constraints*
tests/cp/constraints/mass*
processed_solutions/*
3 changes: 3 additions & 0 deletions src/cp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
from .objectives import (
MinimizeConsecutiveNightShiftsObjective as MinimizeConsecutiveNightShiftsObjective,
)
from .objectives import (
MinimizeHiddenEmployeeCountObjective as MinimizeHiddenEmployeeCountObjective,
)
from .objectives import (
MinimizeHiddenEmployeesObjective as MinimizeHiddenEmployeesObjective,
)
Expand Down
2 changes: 1 addition & 1 deletion src/cp/constraints/planned_shifts.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def create(
# Map the shift code to ID
shift_id = Shift.SHIFT_MAPPING.get(shift_code)
if shift_id is None:
logging.warning(f"Unknown shift code: {shift_code} for {employee.name}")
# logging.warning(f"Unknown shift code: {shift_code} for {employee.name}")
# Case 77 and case 3 both have shifts that trigger this case (e.g. see "Rodriques").
# I think this may originate in the fact that all shifts are hardcoded :(
# print(f"\n\nUnknown shift code: {shift_code} for {employee.name}\n\n")
Expand Down
4 changes: 2 additions & 2 deletions src/cp/constraints/target_working_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ def create(
# target_working_time - TOLERANCE_LESS <= working_time_variable <= target_working_time + TOLERANCE_MORE
target_working_time = round(
max(
employee.target_working_time * len(available_days) / len(self._days),
-employee.actual_working_time,
employee.target_working_time * len(available_days) / len(self._days)
- employee.actual_working_time,
0,
)
)
Expand Down
3 changes: 3 additions & 0 deletions src/cp/objectives/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
from .minimize_consecutive_night_shifts import (
MinimizeConsecutiveNightShiftsObjective as MinimizeConsecutiveNightShiftsObjective,
)
from .minimize_hidden_employee_count import (
MinimizeHiddenEmployeeCountObjective as MinimizeHiddenEmployeeCountObjective,
)
from .minimize_hidden_employees import (
MinimizeHiddenEmployeesObjective as MinimizeHiddenEmployeesObjective,
)
Expand Down
49 changes: 49 additions & 0 deletions src/cp/objectives/minimize_hidden_employee_count.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import cast

from ortools.sat.python.cp_model import CpModel, IntVar, LinearExpr

from src.day import Day
from src.employee import Employee
from src.shift import Shift

from ..variables import EmployeeWorksOnDayVariables, ShiftAssignmentVariables
from .objective import Objective


class MinimizeHiddenEmployeeCountObjective(Objective):
@property
def KEY(self) -> str:
return "minimize-hidden-employees"

def __init__(
self,
weight: float,
employees: list[Employee],
days: list[Day],
shifts: list[Shift],
):
"""
Initializes the objective to minimize overtime for employees.
Overtime is calculated as the difference between the total working time and the target working time.
"""
super().__init__(weight, employees, days, shifts)

def create(
self,
model: CpModel,
shift_assignment_variables: ShiftAssignmentVariables,
employee_works_on_day_variables: EmployeeWorksOnDayVariables,
) -> LinearExpr:
hidden_employee_work_vars: list[IntVar] = []

for employee in self._employees:
if not employee.hidden:
continue

hidden_employee_is_used = model.new_bool_var(f"hidden_employee_is_used_{employee.get_key()}")
for day in self._days:
model.Add(employee_works_on_day_variables[employee][day] <= hidden_employee_is_used)

hidden_employee_work_vars.append(hidden_employee_is_used)

return cast(LinearExpr, sum(hidden_employee_work_vars)) * self._weight
23 changes: 22 additions & 1 deletion src/loader/filesystem_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,31 @@ def get_employees(self, start: int = 0) -> list[Employee]:
)
)

employees += super().get_employees(len(employees))
# employees += super().get_employees(len(employees))

# employees += self.get_hidden_employees({"Azubi":3,"Fachkraft":3,"Hilfskraft":3})

return employees

@staticmethod
def get_hidden_employees(num_hidden_employees_per_level: dict[str, int], start: int = 0):
hidden_employees: list[Employee] = []
last_id = start
for level, num in num_hidden_employees_per_level.items():
for new_id in range(last_id, last_id + num):
hidden_employees.append(
Employee(
key=new_id,
name="Hidden",
surname=f"{level}{new_id}",
level=level,
type="hidden",
)
)
last_id += num

return hidden_employees

# shouldnt this be a static function?
def get_shifts(self) -> list[Shift]:
base_shifts = [
Expand Down
17 changes: 13 additions & 4 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def solve(unit: int, start: datetime, end: datetime, timeout: int):

click.echo(f"Creating staff schedule for planning unit {unit} from {start.date()} to {end.date()}.")

solver(
employees, _, _ = solver(
unit=unit,
start_date=start.date(),
end_date=end.date(),
Expand All @@ -45,7 +45,12 @@ def solve(unit: int, start: datetime, end: datetime, timeout: int):

solution_name = f"solution_{unit}_{start.date()}-{end.date()}_wdefault"

process_solution(loader=loader, output_filename=solution_name + "_processed.json", solution_file_name=solution_name)
process_solution(
loader=loader,
employees=employees,
output_filename=solution_name + "_processed.json",
solution_file_name=solution_name,
)


@cli.command("solve-multiple")
Expand Down Expand Up @@ -99,26 +104,30 @@ def solve_multiple(unit: int, start: datetime, end: datetime, timeout: int):
},
]

employees = None
for weight_id, weights in enumerate(weight_sets):
click.echo(
"Creating staff schedule for planning unit "
f"{unit} from {start.date()} to {end.date()} "
f"with weight set {weight_id}"
)

solver(
employees, _, _ = solver(
unit=unit,
start_date=start.date(),
end_date=end.date(),
timeout=timeout,
weights=weights,
weight_id=weight_id,
employees=employees,
)
loader = FSLoader(unit, start_date=start.date(), end_date=end.date())

in_name = f"solution_{unit}_{start.date()}-{end.date()}_w{weight_id}"

process_solution(loader=loader, output_filename=in_name + "_processed.json", solution_file_name=in_name)
process_solution(
loader=loader, employees=employees, output_filename=in_name + "_processed.json", solution_file_name=in_name
)


@cli.command()
Expand Down
92 changes: 86 additions & 6 deletions src/solve.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
import time
from collections.abc import Mapping
from datetime import date

from ortools.sat.python.cp_model import CpSolver

from src.cp import (
EverySecondWeekendFreeObjective,
FreeDayAfterNightShiftPhaseConstraint,
Expand All @@ -11,6 +14,7 @@
MaximizeEmployeeWishesObjective,
MaxOneShiftPerDayConstraint,
MinimizeConsecutiveNightShiftsObjective,
# MinimizeHiddenEmployeeCountObjective,
MinimizeHiddenEmployeesObjective,
MinimizeOvertimeObjective,
MinRestTimeConstraint,
Expand All @@ -23,31 +27,106 @@
TargetWorkingTimeConstraint,
VacationDaysAndShiftsConstraint,
)
from src.day import Day
from src.employee import Employee
from src.loader import FSLoader
from src.shift import Shift

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

MAX_CONSECUTIVE_DAYS = 5


def solve_with_constraints_only(
employees: list[Employee], days: list[Day], shifts: list[Shift], min_staffing: dict[str, dict[str, dict[str, int]]]
) -> str:
constraints = [
FreeDayAfterNightShiftPhaseConstraint(employees, days, shifts),
MinRestTimeConstraint(employees, days, shifts),
MinStaffingConstraint(min_staffing, employees, days, shifts),
RoundsInEarlyShiftConstraint(employees, days, shifts),
MaxOneShiftPerDayConstraint(employees, days, shifts),
TargetWorkingTimeConstraint(employees, days, shifts),
VacationDaysAndShiftsConstraint(employees, days, shifts),
HierarchyOfIntermediateShiftsConstraint(employees, days, shifts),
PlannedShiftsConstraint(employees, days, shifts),
]

model = Model(employees, days, shifts)
for constraint in constraints:
model.add_constraint(constraint)

solver: CpSolver = CpSolver()
solver.parameters.num_workers = 8
solver.parameters.max_time_in_seconds = 5
solver.parameters.linearization_level = 0

start = time.time()
solver.solve(model.cpModel)
end = time.time()
logging.info(f"Wall time: {end - start}")

return CpSolver.StatusName(solver)


def main(
unit: int,
start_date: date,
end_date: date,
timeout: int,
weights: Mapping[str, int | float] | None = None,
weight_id: int | None = None,
employees: list[Employee] | None = None,
):
loader = FSLoader(unit, start_date=start_date, end_date=end_date)
employees = loader.get_employees()
days = loader.get_days(start_date, end_date)
shifts = loader.get_shifts()
min_staffing = loader.get_min_staffing()

if employees is None:
employees = loader.get_employees()
# minimize hidden employees "manually"

# phase 1 - find an upper bound
# increase the hidden employee count raptitly for all classes simultaniously
logging.info("Minimizing Hidden Employee Count Phase 1")
status = "INFEASIBLE"
increase = 5
num_hidden_employees_per_level = {"Azubi": -increase, "Fachkraft": -increase, "Hilfskraft": -increase}
while (status == "INFEASIBLE" or status == "UNKNOWN") and sum(num_hidden_employees_per_level.values()) <= 50:
num_hidden_employees_per_level = {x: y + increase for (x, y) in num_hidden_employees_per_level.items()}
logging.info(f"Trying to solve with {num_hidden_employees_per_level}")
status = solve_with_constraints_only(
employees + FSLoader.get_hidden_employees(num_hidden_employees_per_level), days, shifts, min_staffing
)
logging.info(f'Solver returned status = "{status}"')
logging.info(f"Hidden Employee Upper Bound: {num_hidden_employees_per_level}\n")

# phase 2 - find tight bounds
# for each employee level lower the count unti it is tight
logging.info("Minimizing Hidden Employee Count Phase 2")
for level, value in num_hidden_employees_per_level.items():
tmp = num_hidden_employees_per_level
for i in range(value - 1, -1, -1):
tmp[level] = i
logging.info(f"Trying to solve with {tmp}")
status = solve_with_constraints_only(
employees + FSLoader.get_hidden_employees(tmp), days, shifts, min_staffing
)
logging.info(f'Solver returned status = "{status}"')
if status == "INFEASIBLE" or status == "UNKNOWN":
num_hidden_employees_per_level[level] = i + 1
break
logging.info(f"Hidden Employee Tight Bound: {num_hidden_employees_per_level}\n")

employees += FSLoader.get_hidden_employees(num_hidden_employees_per_level)

if weights is None:
weights = {
"free_weekend": 2,
"consecutive_nights": 2,
"hidden": 100,
"hidden_count": 1000000,
"overtime": 4,
"consecutive_days": 1,
"rotate": 1,
Expand All @@ -64,8 +143,6 @@ def main(
logging.info(f" - number of days: {len(days)}")
logging.info(f" - number of shifts: {len(shifts)}")

min_staffing = loader.get_min_staffing()

constraints = [
FreeDayAfterNightShiftPhaseConstraint(employees, days, shifts),
MinRestTimeConstraint(employees, days, shifts),
Expand All @@ -87,17 +164,20 @@ def main(
MaximizeEmployeeWishesObjective(weights["wishes"], employees, days, shifts),
FreeDaysAfterNightShiftPhaseObjective(weights["after_night"], employees, days, shifts),
EverySecondWeekendFreeObjective(weights["second_weekend"], employees, days),
# MinimizeHiddenEmployeeCountObjective(weights["hidden_count"], employees, days, shifts),
]

model = Model(employees, days, shifts)

for objective in objectives:
model.add_objective(objective)

for constraint in constraints:
model.add_constraint(constraint)

for objective in objectives:
model.add_objective(objective)

solution = model.solve(timeout)
wid = weight_id if weight_id is not None else "default"
solution_name = f"solution_{unit}_{start_date}-{end_date}_w{wid}"
loader.write_solution(solution, solution_name)

return employees, days, shifts
7 changes: 5 additions & 2 deletions src/web/process_solution.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,12 @@ def collect_day_information(solution: Solution, employees: list[Employee], shift


def process_solution(
loader: Loader, output_filename: str = "processed_solution.json", solution_file_name: str | None = None
loader: Loader,
employees: list[Employee],
output_filename: str = "processed_solution.json",
solution_file_name: str | None = None,
):
employees = loader.get_employees()
# employees = loader.get_employees()
shifts = loader.get_shifts()

solution_files = loader.load_solution_file_names()
Expand Down
6 changes: 3 additions & 3 deletions tests/cp/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ def setup_with_minstaffing(

@pytest.fixture
def setup_case_77() -> tuple[Model, dict[str, dict[str, dict[str, int]]]]:
start_date = datetime(2025, 11, 1).date()
end_date = datetime(2025, 11, 30).date()
start_date = datetime(2024, 11, 1).date()
end_date = datetime(2024, 11, 30).date()
loader = FSLoader(case_id=77, start_date=start_date, end_date=end_date)

days = loader.get_days(start_date, end_date)
employees = loader.get_employees()
employees = loader.get_employees() + FSLoader.get_hidden_employees({"Azubi": 3, "Fachkraft": 3, "Hilfskraft": 3})
shifts = loader.get_shifts()
min_staffing = loader.get_min_staffing()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def find_hierarchy_of_intermediate_shifts_violations(
weeks: list[tuple[list[Day], list[Day]]] = [
(
[day - timedelta(i) for i in range(1, 6) if day - timedelta(i) >= days[0]],
[day, day + timedelta(1) if day - timedelta(1) <= days[-1] else day],
[day, day + timedelta(1) if day + timedelta(1) <= days[-1] else day],
)
for day in days
if day.weekday() == 5
Expand Down
Loading
Loading