From 4c0ec3137501ec3ed6c58c64f0994f4c32a89f40 Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 3 May 2025 13:15:27 +0200 Subject: [PATCH 1/4] feat: Maybe working analysis tool --- src/analysis.py | 144 +++++++++++++++++++++++++++++++++++++++++++++- src/common/DBF.py | 2 +- 2 files changed, 143 insertions(+), 3 deletions(-) diff --git a/src/analysis.py b/src/analysis.py index a3833a5..4028756 100644 --- a/src/analysis.py +++ b/src/analysis.py @@ -1,11 +1,151 @@ from common.csvreader import read_csv -import sys +from common.DBF import DBF_EDF, DBF_RM +from common.scheduler import Scheduler +import numpy as np def main(): - architecture, budgets, tasks = read_csv() + architectures, budgets, tasks = read_csv() + + # Step 1: Adjust WCETs based on core speed + adjusted_tasks = adjust_wcet(tasks, budgets, architectures) + + # Step 2: Group tasks by component + components = group_tasks_by_component(adjusted_tasks, budgets) + + # Step 3: Compute BDR parameters and check component schedulability + components = compute_bdr_parameters(components) + + # Step 4: Determine task schedulability based on component status + task_schedulability = determine_task_schedulability(components) + + # Step 5: Check core-level schedulability + core_schedulability = check_core_schedulability(components, architectures) + + # Step 6: Generate output + generate_output(core_schedulability, task_schedulability, components) +# ---- Helper Functions ---- +def adjust_wcet(tasks, budgets, architectures): + core_speed_map = {arch.core_id: arch.speed_factor for arch in architectures} + component_core_map = {budget.component_id: budget.core_id for budget in budgets} + + for task in tasks: + core_id = component_core_map[task.component_id] + speed_factor = core_speed_map[core_id] + task.wcet = task.wcet / speed_factor # Adjust WCET + return tasks +def group_tasks_by_component(tasks, budgets): + components = {} + for budget in budgets: + comp_id = budget.component_id + comp_tasks = [t for t in tasks if t.component_id == comp_id] + + if budget.scheduler == Scheduler.RM: + comp_tasks = sorted(comp_tasks, key=lambda x: x.priority) + + components[comp_id] = { + "tasks": comp_tasks, + "scheduler": budget.scheduler, + "core_id": budget.core_id, + "period": budget.period, + "priority": budget.priority, + "schedulable": False # Initialize component schedulability + } + return components +def compute_bdr_parameters(components): + for comp_id, comp_data in components.items(): + tasks = comp_data["tasks"] + scheduler = comp_data["scheduler"] + + alpha = sum(t.wcet / t.period for t in tasks) + delta = find_minimal_delta(tasks, alpha, scheduler) + + components[comp_id]["alpha"] = alpha + components[comp_id]["delta"] = delta + components[comp_id]["schedulable"] = delta != float('inf') # Check schedulability + return components + +def find_minimal_delta(tasks, alpha, scheduler): + max_period = max(t.period for t in tasks) + for delta in np.linspace(0, 2 * max_period, 100): + if is_schedulable(tasks, alpha, delta, scheduler): + return delta + return float('inf') + +def is_schedulable(tasks, alpha, delta, scheduler): + time_points = np.linspace(0, 2 * max(t.period for t in tasks), 100) + for t in time_points: + sbf = max(alpha * (t - delta), 0) + + if scheduler == Scheduler.EDF: + dbf = DBF_EDF(tasks, t, explicit_dead_line=0).getDBS() + else: + max_dbf = 0 + for i in range(len(tasks)): + dbf_fps = DBF_RM(tasks, t, i).getDBS() + max_dbf = max(max_dbf, dbf_fps) + dbf = max_dbf + + if dbf > sbf: + return False + return True + +def determine_task_schedulability(components): + task_schedulability = {} + for comp_id, comp_data in components.items(): + for task in comp_data["tasks"]: + task_schedulability[task.task_name] = comp_data["schedulable"] + return task_schedulability + +def check_core_schedulability(components, architectures): + report = {} + for arch in architectures: + core_comps = [c for c in components.values() if c["core_id"] == arch.core_id] + if arch.scheduler == Scheduler.EDF: + # Check sum(α_children) <= 1 (EDF core utilization) + total_alpha = sum(c["alpha"] for c in core_comps) + report[arch.core_id] = total_alpha <= 1 + else: + # Check Theorem 1 for RM cores: + # 1. Sum of child α <= parent α (not applicable here as parent is the core) + # 2. Child Δ >= parent Δ (ensure child components’ Δ >= core’s Δ) + # Simplified: Validate RM priority assignment + periods = [] + priorities = [] + for comp in core_comps: + if comp["priority"] is not None: + periods.append(comp["period"]) + priorities.append(comp["priority"]) + sorted_by_period = sorted(zip(periods, priorities), key=lambda x: x[0]) + report[arch.core_id] = all( + p1 > p2 for (_, p1), (_, p2) in zip(sorted_by_period, sorted_by_period[1:]) + ) + return report + +def generate_output(core_report, task_report, components): + # Component-level report + print("\n--- Component BDR Parameters and Schedulability ---") + for comp_id, comp_data in components.items(): + status = "Schedulable" if comp_data["schedulable"] else "Unschedulable" + print( + f"Component {comp_id} (Core {comp_data['core_id']}): " + f"α={comp_data['alpha']:.2f}, Δ={comp_data['delta']:.2f} ({status})" + ) + + # Core-level report (updated for Theorem 1 checks) + print("\n--- Core Schedulability Report ---") + for core_id, status in core_report.items(): + print( + f"Core {core_id}: {'Schedulable' if status else 'Unschedulable'} " + f"[Hierarchical Theorem 1: {'Satisfied' if status else 'Violated'}]" + ) + + # Task-level report + print("\n--- Task Schedulability Report ---") + for task_name, status in task_report.items(): + print(f"Task {task_name}: {'Schedulable' if status else 'Unschedulable'}") if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/common/DBF.py b/src/common/DBF.py index ee8d2cc..a0e0d0a 100644 --- a/src/common/DBF.py +++ b/src/common/DBF.py @@ -29,7 +29,7 @@ def getDBS(self) -> float: dbs += (num / task.period) * task.wcet return dbs -class DBF_FPS(DBF): +class DBF_RM(DBF): def __init__(self, tasks: list[Task], time_interval: float, task_index: int): self.task_index = task_index super().__init__(tasks, time_interval) From 18606eea1b6a40a6c6c7e956f045acd35d7dc6fe Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 10 May 2025 23:23:12 +0200 Subject: [PATCH 2/4] fix: Cooked analysis tool --- .gitignore | 1 + data/testcases | 2 +- src/analysis.py | 282 ++++++++++++++++++++++------------------ src/common/BDR.py | 221 ++++++++++++++----------------- src/common/DBF.py | 89 ++++++------- src/common/budget.py | 4 +- src/common/csvreader.py | 10 +- src/common/task.py | 4 +- 8 files changed, 305 insertions(+), 308 deletions(-) diff --git a/.gitignore b/.gitignore index afc9798..19078ca 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ tests/.tmp __pycache__ .vscode .ruff_cache +solution.csv diff --git a/data/testcases b/data/testcases index d255eae..0166426 160000 --- a/data/testcases +++ b/data/testcases @@ -1 +1 @@ -Subproject commit d255eae14fd7153234d504e2212788a06580efbe +Subproject commit 0166426bc193a61c1eba36877ea93511ddf7db04 diff --git a/src/analysis.py b/src/analysis.py index 4028756..c555960 100644 --- a/src/analysis.py +++ b/src/analysis.py @@ -1,151 +1,187 @@ +import sys +import csv +import math +from functools import reduce + from common.csvreader import read_csv -from common.DBF import DBF_EDF, DBF_RM +from common.DBF import DBF +from common.BDR import BDR from common.scheduler import Scheduler -import numpy as np -def main(): - architectures, budgets, tasks = read_csv() - - # Step 1: Adjust WCETs based on core speed - adjusted_tasks = adjust_wcet(tasks, budgets, architectures) - - # Step 2: Group tasks by component - components = group_tasks_by_component(adjusted_tasks, budgets) - - # Step 3: Compute BDR parameters and check component schedulability - components = compute_bdr_parameters(components) - - # Step 4: Determine task schedulability based on component status - task_schedulability = determine_task_schedulability(components) - - # Step 5: Check core-level schedulability - core_schedulability = check_core_schedulability(components, architectures) - - # Step 6: Generate output - generate_output(core_schedulability, task_schedulability, components) - -# ---- Helper Functions ---- + +def lcm(a: int, b: int) -> int: + """Compute least common multiple of two integers.""" + return abs(a * b) // math.gcd(a, b) + + def adjust_wcet(tasks, budgets, architectures): core_speed_map = {arch.core_id: arch.speed_factor for arch in architectures} component_core_map = {budget.component_id: budget.core_id for budget in budgets} - + for task in tasks: core_id = component_core_map[task.component_id] - speed_factor = core_speed_map[core_id] - task.wcet = task.wcet / speed_factor # Adjust WCET + task.wcet = task.wcet / core_speed_map[core_id] # Adjust WCET return tasks + def group_tasks_by_component(tasks, budgets): components = {} for budget in budgets: comp_id = budget.component_id comp_tasks = [t for t in tasks if t.component_id == comp_id] - - if budget.scheduler == Scheduler.RM: - comp_tasks = sorted(comp_tasks, key=lambda x: x.priority) - + components[comp_id] = { "tasks": comp_tasks, "scheduler": budget.scheduler, "core_id": budget.core_id, - "period": budget.period, + "period": budget.period, + "budget": budget.budget, "priority": budget.priority, - "schedulable": False # Initialize component schedulability + "bdr": None, + "supply": None, + "schedulable": True, } return components -def compute_bdr_parameters(components): - for comp_id, comp_data in components.items(): - tasks = comp_data["tasks"] - scheduler = comp_data["scheduler"] +def analyze_components(components): + def calculate_bdr(comp): + """Calculate BDR interface using PRM-to-BDR conversion or task-derived values""" + if comp["budget"] and comp["period"]: + alpha = comp["budget"] / comp["period"] + delta = 2 * (comp["period"] - comp["budget"]) + return BDR(alpha, delta) + return BDR.from_tasks(comp["tasks"], comp["scheduler"]) + + def get_time_points(tasks, scheduler, task_idx=None): + """Get critical time points for schedulability analysis""" + if scheduler == Scheduler.RM: + # RM: Check up to current task's period with higher-priority tasks + periods = [t.period for t in tasks[:task_idx+1]] + max_t = tasks[task_idx].period + return sorted({k*T for T in periods for k in range(1, int(max_t//T)+1)}) - alpha = sum(t.wcet / t.period for t in tasks) - delta = find_minimal_delta(tasks, alpha, scheduler) + # EDF: Use hyperperiod + periods = [t.period for t in tasks] + hyper = reduce(lcm, periods) + return sorted({k*T for T in periods for k in range(1, hyper//T + 1)}) + + def is_schedulable(task_idx, tasks, bdr, scheduler): + """Check if task at index meets demand <= supply at all critical points""" + time_points = get_time_points(tasks, scheduler, task_idx) + for t in time_points: + demand = DBF.dbf_edf(tasks, t) if scheduler == Scheduler.EDF \ + else DBF.dbf_rm(tasks, t, task_idx) + if demand > bdr.sbf(t): + return False + return True + + # Main analysis flow + for comp in components.values(): + # 1. Calculate BDR interface and supply parameters + comp["bdr"] = calculate_bdr(comp) + comp["supply"] = comp["bdr"].supply_task_params() + + # 2. Check schedulability for all tasks + tasks = comp["tasks"] + all_schedulable = True - components[comp_id]["alpha"] = alpha - components[comp_id]["delta"] = delta - components[comp_id]["schedulable"] = delta != float('inf') # Check schedulability + for idx, task in enumerate(tasks): + task.schedulable = is_schedulable(idx, tasks, comp["bdr"], comp["scheduler"]) + all_schedulable &= task.schedulable + + comp["schedulable"] = all_schedulable + return components -def find_minimal_delta(tasks, alpha, scheduler): - max_period = max(t.period for t in tasks) - for delta in np.linspace(0, 2 * max_period, 100): - if is_schedulable(tasks, alpha, delta, scheduler): - return delta - return float('inf') - -def is_schedulable(tasks, alpha, delta, scheduler): - time_points = np.linspace(0, 2 * max(t.period for t in tasks), 100) - for t in time_points: - sbf = max(alpha * (t - delta), 0) - - if scheduler == Scheduler.EDF: - dbf = DBF_EDF(tasks, t, explicit_dead_line=0).getDBS() - else: - max_dbf = 0 - for i in range(len(tasks)): - dbf_fps = DBF_RM(tasks, t, i).getDBS() - max_dbf = max(max_dbf, dbf_fps) - dbf = max_dbf - - if dbf > sbf: - return False - return True - -def determine_task_schedulability(components): - task_schedulability = {} - for comp_id, comp_data in components.items(): - for task in comp_data["tasks"]: - task_schedulability[task.task_name] = comp_data["schedulable"] - return task_schedulability - -def check_core_schedulability(components, architectures): - report = {} - for arch in architectures: - core_comps = [c for c in components.values() if c["core_id"] == arch.core_id] - if arch.scheduler == Scheduler.EDF: - # Check sum(α_children) <= 1 (EDF core utilization) - total_alpha = sum(c["alpha"] for c in core_comps) - report[arch.core_id] = total_alpha <= 1 + +def hierarchical_check_by_core(components): + # Helper to mark component and tasks as unschedulable + def mark_unschedulable(components): + for comp in components: + comp["schedulable"] = False + for task in comp["tasks"]: + task.schedulable = False + + # Group components per core + core_map = {} + for comp in components.values(): + core_map.setdefault(comp["core_id"], []).append(comp) + + for core_id, comps in core_map.items(): + # Create supply tasks + supply_tasks = [ + (type('SupplyTask', (), { + 'wcet': comp["supply"][0], + 'period': comp["supply"][1], + 'deadline': comp["supply"][1], + 'priority': comp["priority"] if comp["scheduler"] == Scheduler.RM else None + }), comp) + for comp in comps + ] + + # Check Theorem 1 (BDR interface compatibility) + parent_bdr = BDR(1.0, 0.0) # Full CPU resource + child_bdrs = [comp["bdr"] for comp in comps] + if not BDR.can_schedule_children(parent_bdr, child_bdrs): + mark_unschedulable([comp for _, comp in supply_tasks]) + continue + + # Get core-level scheduler (default EDF) + core_sched = comps[0].get('core_scheduler', Scheduler.EDF) + + # EDF: Utilization test + if core_sched == Scheduler.EDF: + utilization = sum(t.wcet/t.period for t, _ in supply_tasks) + if utilization > 1.0: + mark_unschedulable([comp for _, comp in supply_tasks]) + + # RM: DBF test at deadlines else: - # Check Theorem 1 for RM cores: - # 1. Sum of child α <= parent α (not applicable here as parent is the core) - # 2. Child Δ >= parent Δ (ensure child components’ Δ >= core’s Δ) - # Simplified: Validate RM priority assignment - periods = [] - priorities = [] - for comp in core_comps: - if comp["priority"] is not None: - periods.append(comp["period"]) - priorities.append(comp["priority"]) - sorted_by_period = sorted(zip(periods, priorities), key=lambda x: x[0]) - report[arch.core_id] = all( - p1 > p2 for (_, p1), (_, p2) in zip(sorted_by_period, sorted_by_period[1:]) - ) - return report - -def generate_output(core_report, task_report, components): - # Component-level report - print("\n--- Component BDR Parameters and Schedulability ---") - for comp_id, comp_data in components.items(): - status = "Schedulable" if comp_data["schedulable"] else "Unschedulable" - print( - f"Component {comp_id} (Core {comp_data['core_id']}): " - f"α={comp_data['alpha']:.2f}, Δ={comp_data['delta']:.2f} ({status})" - ) - - # Core-level report (updated for Theorem 1 checks) - print("\n--- Core Schedulability Report ---") - for core_id, status in core_report.items(): - print( - f"Core {core_id}: {'Schedulable' if status else 'Unschedulable'} " - f"[Hierarchical Theorem 1: {'Satisfied' if status else 'Violated'}]" - ) - - # Task-level report - print("\n--- Task Schedulability Report ---") - for task_name, status in task_report.items(): - print(f"Task {task_name}: {'Schedulable' if status else 'Unschedulable'}") - -if __name__ == "__main__": - main() \ No newline at end of file + sorted_tasks = sorted(supply_tasks, key=lambda x: x[0].priority) + for idx, (task, comp) in enumerate(sorted_tasks): + if DBF.dbf_rm([t[0] for t in sorted_tasks], task.period, idx) > task.period: + mark_unschedulable([comp]) + + return components + + +def write_solution(components, filename="solution.csv"): + with open(filename, 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow(['task_name', 'component_id', 'task_schedulable', 'component_schedulable']) + for comp_id, comp in components.items(): + comp_ok = 1 if comp['schedulable'] else 0 + for t in comp['tasks']: + task_ok = 1 if getattr(t, 'schedulable', False) else 0 + writer.writerow([t.task_name, comp_id, task_ok, comp_ok]) + + +def print_results(components): + for comp_id, comp in components.items(): + print(f"Component {comp_id}: schedulable = {comp['schedulable']}") + print(f" BDR interface (alpha,delta) = ({comp['bdr'].rate:.4f}, {comp['bdr'].delay:.4f})") + print(f" Supply task (Q,P) = ({comp['supply'][0]:.4f}, {comp['supply'][1]:.4f})") + for t in comp['tasks']: + print(f" Task {t.task_name:20} schedulable = {t.schedulable}") + print() # Add a newline after each component + print() + + +def main(): + if len(sys.argv) != 4: + print("Usage: python analysis.py ") + sys.exit(1) + + architectures, budgets, tasks = read_csv() + tasks = adjust_wcet(tasks, budgets, architectures) + components = group_tasks_by_component(tasks, budgets) + + components = analyze_components(components) + components = hierarchical_check_by_core(components) + + print_results(components) + write_solution(components) + + +if __name__ == '__main__': + main() diff --git a/src/common/BDR.py b/src/common/BDR.py index 05778f8..4b9d2dd 100644 --- a/src/common/BDR.py +++ b/src/common/BDR.py @@ -1,139 +1,112 @@ -from SRPModel import SRPModel -import numpy as np +# common/BDR.py + +import math +from typing import List, Tuple +from functools import reduce + +from common.DBF import DBF +from common.scheduler import Scheduler + + +def lcm(a: int, b: int) -> int: + """Compute least common multiple of two integers.""" + return abs(a * b) // math.gcd(a, b) + class BDR: - def __init__(self, model: SRPModel, time_interval: int,): - self.model = model - self.time_interval = time_interval - self.availability_factor = self.get_availability_factor() - self.supply_function = self.get_supply_function() - self.supply_bound_function = self.get_sbfSPR(self.supply_function, time_interval) - self.partition_delay = self.get_partition_delay() - self.sbf_bdr = self.get_sbfBDR() - def get_availability_factor(self) -> float: + def __init__(self, rate: float, delay: float): """ - Calculate the availability factor for a given SRP model. - - Args: - model (SRPModel): The SRP model to calculate the availability factor for. - - Returns: - float: The availability factor for the given SRP model. + Bounded-Delay Resource model: + rate – long-term supply rate (0 < rate <= 1) + delay – maximum startup delay """ - # Calculate the total time period - total_time_period = self.model.period - - # Calculate the total resource access time - total_resource_access_time = sum(end - start for start, end in self.model.resource_access) - - # Calculate the availability factor - availability_factor = total_resource_access_time / total_time_period - - return availability_factor + self.rate = rate + self.delay = delay - def get_partition_delay(self) -> float: + def sbf(self, interval: float) -> float: """ - Calculate the partition delay for a given SRP model. - - Args: - model (SRPModel): The SRP model to calculate the partition delay for. - - Returns: - float: The partition delay for the given SRP model. + Supply Bound Function (Eq. 6): + sbf(interval) = 0, if interval < delay + rate * (interval - delay), otherwise """ - partition_delay = 0 - supply_bound_function = self.supply_bound_function - for time, supply in enumerate(supply_bound_function): - if self.availability_factor < supply: - slope = (supply - supply_bound_function[time - 1]) / 1 - intercept = supply - slope * time - partition_delay = (self.availability_factor - intercept) / slope - break + if interval < self.delay: + return 0.0 + return self.rate * (interval - self.delay) - return partition_delay + @staticmethod + def can_schedule_children(parent: "BDR", children: List["BDR"]) -> bool: + """ + Theorem 1: A parent BDR can host these child interfaces iff + 1) sum(child.rate) <= parent.rate + 2) child.delay > parent.delay for every child + """ + if sum(c.rate for c in children) > parent.rate: + return False + if any(c.delay <= parent.delay for c in children): + return False + return True - def get_supply_function(self) -> dict[int, list[int]]: + def supply_task_params(self) -> Tuple[float, float]: """ - Calculate the supply function for a given SRP model. - - Args: - model (SRPModel): The SRP model to calculate the supply function for. - time_interval (int): The time interval to calculate the supply function over. - - Returns: - dict[int, list[int]]: The supply function for the given SRP model. + Theorem 3 (Half-Half): Transform this BDR interface into a periodic supply task. + Returns (budget, period): + period = delay / (2 * (1 - rate)) + budget = rate * period + Special-case: if rate == 1.0, returns (1.0, 1.0) for full CPU. """ - supply_function: dict[int, list[int]] = { - end: [0 for _ in range(self.time_interval)] for start, end in self.model.resource_access - } - starting_time = [end for start, end in self.model.resource_access] - - for end_key in starting_time: - period_tracker = end_key - 1 - for time in range(self.time_interval): - period_tracker += 1 - if period_tracker == self.model.period: - period_tracker = 0 - supply_function[end_key][time] = supply_function[end_key][time - 1] - for start, end in self.model.resource_access: - if start < period_tracker <= end and time != 0: - supply_function[end_key][time] = supply_function[end_key][time - 1] + 1 - break - - return supply_function + if self.rate >= 1.0: + return 1.0, 1.0 + if self.rate == 0.0: + return self.rate, 0.0 + denom = 2.0 * (1.0 - self.rate) + period = self.delay / denom + budget = self.rate * period + return budget, period - def get_sbfSPR(self, supply_function: dict[int, list[int]], time_interval: int) -> list[int]: + @classmethod + def from_tasks(cls, tasks: List, scheduler: Scheduler) -> "BDR": """ - Calculate the supply bound function for a given supply function. - - Args: - supply_function (dict[int, list[int]]): The supply function to calculate the supply bound function for. - - Returns: - list[int]: The supply bound function for the given supply function. + Derive a BDR interface (rate, delay) for a given task set and scheduler. + 1) rate = total utilization = sum(wcet/period) + 2) delay = max(0, min_t [t - dbf(W,t)/rate]) over all critical points + Critical points are multiples of task periods up to the hyperperiod. """ - supply_function = self.supply_function - values = supply_function.values() - supply_bound_function = [0 for _ in range(self.time_interval)] - for i in range(self.time_interval): - supply_bound_function[i] = min([value[i] for value in values]) - return supply_bound_function - - def get_sbfBDR(self) -> list[int]: - sbf = [] - time_interval = np.linspace(0, self.time_interval, 100) - print(time_interval) - for i in time_interval: - value = self.availability_factor*(i - self.partition_delay) - if value <= 0: - sbf.append(0) - else: - sbf.append(value) - return sbf - - - -if __name__ == "__main__": - # Example usage - model = SRPModel(resource_access=[(1, 2), (5, 7)], period=8) - bdr = BDR(model=model, time_interval=26) - - print("Availability Factor:", bdr.get_availability_factor()) - print("Partition Delay:", bdr.get_partition_delay()) - print("Supply Function:", bdr.get_supply_function()) - print("Supply Bound Function:", bdr.get_sbfSPR(bdr.get_supply_function(), 20)) - import matplotlib.pyplot as plt + if not tasks: + return cls(0.0, 0.0) + # compute rate (availability factor) + rate = sum(task.wcet / task.period for task in tasks) + + # sort tasks by priority for RM + if scheduler == Scheduler.RM: + sorted_tasks = sorted(tasks, key=lambda t: t.priority) + periods = [t.period for t in sorted_tasks] + max_period = max(periods) if periods else 0 + # Generate time points up to max_period + time_points = set() + for T in periods: + k = 1 + while (t := k * T) <= max_period: + time_points.add(t) + k += 1 + time_points = sorted(time_points) + else: + # EDF: use hyperperiod + periods = [int(t.period) for t in tasks] + hyper = reduce(lcm, periods, 1) + time_points = sorted({k * T for T in periods for k in range(1, (hyper // T) + 1)}) + + # compute partition delay + if rate > 0: + slacks = [] + for t in time_points: + if scheduler == Scheduler.EDF: + demand = DBF.dbf_edf(tasks, t) + else: + demand = max(DBF.dbf_rm(sorted_tasks, t, idx) + for idx in range(len(sorted_tasks))) + slacks.append(t - demand / rate) + delay = max(0.0, min(slacks)) + else: + delay = 0.0 - # Plot the supply function - supply_function = bdr.get_supply_function() - for end_key, values in supply_function.items(): - plt.plot(range(bdr.time_interval), values, label=f"End Key {end_key}") - plt.plot(range(bdr.time_interval), bdr.get_sbfSPR(supply_function, bdr.time_interval), label="Supply Bound Function", linestyle='--') - plt.plot(bdr.partition_delay, bdr.availability_factor, 'ro', label="Partition Delay") - plt.plot(np.linspace(0, 26, 100), bdr.sbf_bdr, label="SBF BDR", linestyle='--') - plt.title("Supply Function") - plt.xlabel("Time") - plt.ylabel("Supply") - plt.legend() - plt.grid(True) - plt.show() \ No newline at end of file + return cls(rate, delay) diff --git a/src/common/DBF.py b/src/common/DBF.py index a0e0d0a..4c1daa4 100644 --- a/src/common/DBF.py +++ b/src/common/DBF.py @@ -1,53 +1,46 @@ -from abc import ABC, abstractmethod -from common.task import Task +# common/DBF.py -class DBF(): - def __init__(self, tasks: list[Task], time_interval: float, explicit_dead_line: float = 0): - self.explicit_dead_line = explicit_dead_line - self.tasks = tasks - self.time_interval = time_interval - @abstractmethod - def getDBS(self) -> float: ... - -class DBF_EDF(DBF): - def __init__(self, tasks: list[Task], time_interval: float, explicit_dead_line: float = 0): - super().__init__(tasks, time_interval, explicit_dead_line) - - def getDBS(self) -> float: +import math +from typing import Sequence + +class DBF: + @staticmethod + def dbf_edf(tasks: Sequence, interval: float) -> float: + """ + Demand Bound Function under EDF for implicit-deadline tasks: + dbfEDF(W, interval) = sum(floor(interval / period) * wcet) """ - Calculate the Demand Bound Function (DBF) for a set of tasks. - The DBF is computed as the sum of the workload contributions of all tasks - within a specified time interval. Each task contributes to the DBF based - on its worst-case execution time (WCET) and period. - Returns: - float: The computed DBF value for the set of tasks. + total_demand = 0.0 + for task in tasks: + executions = math.floor(interval / task.period) + total_demand += executions * task.wcet + return total_demand + + @staticmethod + def dbf_edf_explicit(tasks: Sequence, interval: float) -> float: """ - - dbs: float = 0 - for task in self.tasks: - num = (self.time_interval + (task.period * self.explicit_dead_line) - self.explicit_dead_line) - dbs += (num / task.period) * task.wcet - return dbs - -class DBF_RM(DBF): - def __init__(self, tasks: list[Task], time_interval: float, task_index: int): - self.task_index = task_index - super().__init__(tasks, time_interval) - - def getDBS(self) -> float: + Demand Bound Function under EDF for explicit-deadline tasks: + dbfEDF(W, interval) = sum(floor((interval + period - deadline) / period) * wcet) + """ + total_demand = 0.0 + for task in tasks: + deadline = getattr(task, 'deadline', task.period) + job_count = math.floor((interval + task.period - deadline) / task.period) + if job_count > 0: + total_demand += job_count * task.wcet + return total_demand + + @staticmethod + def dbf_rm(tasks: Sequence, interval: float, index: int) -> float: """ - Calculate the Demand Bound Function (DBF) for a set of tasks. - The DBF is computed as the sum of the workload contributions of all tasks - within a specified time interval. Each task contributes to the DBF based - on its worst-case execution time (WCET) and period. - Returns: - float: The computed DBF value for the set of tasks. + Demand Bound Function under RM/DM for the task at 'index': + dbfRM(W, interval, i) = wcet_i + sum(ceil(interval / period_k) * wcet_k) + for all tasks k with higher priority (lower index). """ - - my_task = self.tasks[self.task_index] - higher_priority_tasks = [task for task in self.tasks if task.priority > self.tasks[self.task_index].priority] - dbs: float = my_task.wcet - for task in higher_priority_tasks: - dbs += (self.time_interval / task.period) * task.wcet - return dbs - \ No newline at end of file + # demand from the task itself + demand = tasks[index].wcet + # plus interference from all higher-priority tasks + for higher in tasks[:index]: + invocations = math.ceil(interval / higher.period) + demand += invocations * higher.wcet + return demand diff --git a/src/common/budget.py b/src/common/budget.py index 0e6a6fb..8cd2596 100644 --- a/src/common/budget.py +++ b/src/common/budget.py @@ -6,7 +6,7 @@ class Budget: component_id: str scheduler:Scheduler - budget:int - period:int + budget:float + period:float core_id:int priority:int|None diff --git a/src/common/csvreader.py b/src/common/csvreader.py index 9b389fb..5c36230 100644 --- a/src/common/csvreader.py +++ b/src/common/csvreader.py @@ -61,8 +61,8 @@ def read_tasks(csv:str)-> list[Task]: for _,row in df.iterrows(): task = Task( task_name=row['task_name'], - wcet=row['wcet'], - period=row['period'], + wcet=(row['wcet']), + period=(row['period']), component_id=row['component_id'], priority=row['priority'], ) @@ -84,12 +84,6 @@ def read_csv() -> tuple[list[Architecture], list[Budget], list[Task]]: budgets = read_budgets(budget_file) tasks = read_tasks(tasks_file) - print(f"Successfully read architectures: {architectures}") - print(f"Successfully read budgets: {budgets}") - print(f"Successfully read tasks: {tasks}") - - # Add your analysis code here - except FileNotFoundError as e: print(f"Error: File not found - {e}") sys.exit(1) diff --git a/src/common/task.py b/src/common/task.py index 93fe38e..b92f52a 100644 --- a/src/common/task.py +++ b/src/common/task.py @@ -3,8 +3,8 @@ @dataclass class Task: task_name:str - wcet:int - period:int + wcet:float + period:float component_id:str priority:int|None From bafc7219c633f3bdd0abca2ef3d3e7292f4f6d5f Mon Sep 17 00:00:00 2001 From: Christopher Date: Sun, 11 May 2025 16:14:34 +0200 Subject: [PATCH 3/4] chore: Perfecly cooked analysis tool --- src/analysis.py | 273 +++++++++++++++++++++++++--------------------- src/common/BDR.py | 49 +-------- 2 files changed, 148 insertions(+), 174 deletions(-) diff --git a/src/analysis.py b/src/analysis.py index c555960..564c68e 100644 --- a/src/analysis.py +++ b/src/analysis.py @@ -15,156 +15,176 @@ def lcm(a: int, b: int) -> int: def adjust_wcet(tasks, budgets, architectures): + # Adjust WCET by core speed factor (scaling per architecture) core_speed_map = {arch.core_id: arch.speed_factor for arch in architectures} component_core_map = {budget.component_id: budget.core_id for budget in budgets} - for task in tasks: core_id = component_core_map[task.component_id] - task.wcet = task.wcet / core_speed_map[core_id] # Adjust WCET + task.wcet = task.wcet / core_speed_map[core_id] return tasks def group_tasks_by_component(tasks, budgets): + # Group tasks into their components based on budgets.csv components = {} for budget in budgets: comp_id = budget.component_id comp_tasks = [t for t in tasks if t.component_id == comp_id] - + # Sort for RM priority order + if budget.scheduler == Scheduler.RM: + comp_tasks.sort(key=lambda t: t.priority) components[comp_id] = { - "tasks": comp_tasks, - "scheduler": budget.scheduler, - "core_id": budget.core_id, - "period": budget.period, - "budget": budget.budget, - "priority": budget.priority, - "bdr": None, - "supply": None, - "schedulable": True, + 'tasks': comp_tasks, + 'scheduler': budget.scheduler, + 'core_id': budget.core_id, + 'budget': budget.budget, # Q from PRM + 'period': budget.period, # P from PRM + 'schedulable': False, } return components -def analyze_components(components): - def calculate_bdr(comp): - """Calculate BDR interface using PRM-to-BDR conversion or task-derived values""" - if comp["budget"] and comp["period"]: - alpha = comp["budget"] / comp["period"] - delta = 2 * (comp["period"] - comp["budget"]) - return BDR(alpha, delta) - return BDR.from_tasks(comp["tasks"], comp["scheduler"]) - - def get_time_points(tasks, scheduler, task_idx=None): - """Get critical time points for schedulability analysis""" - if scheduler == Scheduler.RM: - # RM: Check up to current task's period with higher-priority tasks - periods = [t.period for t in tasks[:task_idx+1]] - max_t = tasks[task_idx].period - return sorted({k*T for T in periods for k in range(1, int(max_t//T)+1)}) - - # EDF: Use hyperperiod - periods = [t.period for t in tasks] - hyper = reduce(lcm, periods) - return sorted({k*T for T in periods for k in range(1, hyper//T + 1)}) - - def is_schedulable(task_idx, tasks, bdr, scheduler): - """Check if task at index meets demand <= supply at all critical points""" - time_points = get_time_points(tasks, scheduler, task_idx) - for t in time_points: - demand = DBF.dbf_edf(tasks, t) if scheduler == Scheduler.EDF \ - else DBF.dbf_rm(tasks, t, task_idx) - if demand > bdr.sbf(t): - return False - return True - - # Main analysis flow + +def check_component_schedulability(components): + """ + For each component, check local schedulability under its PRM budget: + - Convert PRM (Q,P) to a conservative BDR lower-bound via Half-Half (Theorem 3): rate=Q/P, delay=2*(P−Q) + - Use Supply Bound Function sbf_BDR (Eq. 6) for supply.sbf(t) + - Use Demand Bound Functions: + • RM: dbf_rm(W,t,i) (Eq. 4) + • EDF: dbf_edf(W,t) (Eq. 2) + - For both schedulers, generate all critical points t = k·T_j up to each component's max deadline. + Then apply the tests: + RM: ∀τ_i ∃ t ≤ T_i such that dbf_rm(W,t,i) ≤ sbf(t) + EDF: ∀ t ≥ 0 dbf_edf(W,t) ≤ sbf(t) + """ for comp in components.values(): - # 1. Calculate BDR interface and supply parameters - comp["bdr"] = calculate_bdr(comp) - comp["supply"] = comp["bdr"].supply_task_params() - - # 2. Check schedulability for all tasks - tasks = comp["tasks"] - all_schedulable = True - - for idx, task in enumerate(tasks): - task.schedulable = is_schedulable(idx, tasks, comp["bdr"], comp["scheduler"]) - all_schedulable &= task.schedulable - - comp["schedulable"] = all_schedulable + tasks = comp['tasks'] + sched = comp['scheduler'] + Q, P = comp['budget'], comp['period'] + supply = BDR(rate=Q/P, delay=2*(P-Q)) # Theorem 3 + # Build global critical points: multiples of all periods + periods = [t.period for t in tasks] + max_deadline = max(periods) if periods else 0 + time_points = set() + for T in periods: + k = 1 + while k * T <= max_deadline: + time_points.add(k * T) + k += 1 + time_points = sorted(time_points) + + ok = True + if sched == Scheduler.RM: + # For each task i, need ∃ t ≤ T_i s.t. dbf_rm(W,t,i) ≤ sbf(t) + for idx, task in enumerate(tasks): + Ti = task.period + found = False + for t in time_points: + if t > Ti: + break + demand = DBF.dbf_rm(tasks, t, idx) # Eq.4 + if demand <= supply.sbf(t): # Eq.6 + found = True + break + if not found: + ok = False + break + else: + # EDF: ∀ t, dbf_edf(W,t) ≤ sbf(t) + for t in time_points: + demand = DBF.dbf_edf(tasks, t) # Eq.2 + if demand > supply.sbf(t): # Eq.6 + ok = False + break + + # Also ensure every individual task meets its deadline under this supply + task_checks = [] + for idx, task in enumerate(tasks): + if sched == Scheduler.RM: + demand = DBF.dbf_rm(tasks, task.period, idx) + else: + demand = DBF.dbf_edf(tasks, task.period) + task_checks.append(demand <= supply.sbf(task.period)) + comp['schedulable'] = ok and all(task_checks) return components -def hierarchical_check_by_core(components): - # Helper to mark component and tasks as unschedulable - def mark_unschedulable(components): - for comp in components: - comp["schedulable"] = False - for task in comp["tasks"]: - task.schedulable = False - +def summarize_by_core(components, architectures): + """ + At system level, apply Theorem 1 for BDR composition via can_schedule_children: + - Parent = full-CPU BDR(rate=1.0, delay=0.0) + - Children = list of BDR(rate=Q/P, delay=2*(P-Q)) for each component on the core + - Also ensure each component passed its local schedulability check + """ # Group components per core - core_map = {} + core_map = {arch.core_id: [] for arch in architectures} for comp in components.values(): - core_map.setdefault(comp["core_id"], []).append(comp) - - for core_id, comps in core_map.items(): - # Create supply tasks - supply_tasks = [ - (type('SupplyTask', (), { - 'wcet': comp["supply"][0], - 'period': comp["supply"][1], - 'deadline': comp["supply"][1], - 'priority': comp["priority"] if comp["scheduler"] == Scheduler.RM else None - }), comp) - for comp in comps - ] - - # Check Theorem 1 (BDR interface compatibility) - parent_bdr = BDR(1.0, 0.0) # Full CPU resource - child_bdrs = [comp["bdr"] for comp in comps] - if not BDR.can_schedule_children(parent_bdr, child_bdrs): - mark_unschedulable([comp for _, comp in supply_tasks]) - continue - - # Get core-level scheduler (default EDF) - core_sched = comps[0].get('core_scheduler', Scheduler.EDF) - - # EDF: Utilization test - if core_sched == Scheduler.EDF: - utilization = sum(t.wcet/t.period for t, _ in supply_tasks) - if utilization > 1.0: - mark_unschedulable([comp for _, comp in supply_tasks]) - - # RM: DBF test at deadlines + core_map[comp['core_id']].append(comp) + + core_summary = {} + for core_id, comps_on_core in core_map.items(): + # Build BDR interfaces for each child component + child_bdrs = [BDR(rate=comp['budget']/comp['period'], + delay=2*(comp['period']-comp['budget'])) + for comp in comps_on_core] + # Parent BDR representing full CPU + parent_bdr = BDR(rate=1.0, delay=0.0) + + # Theorem 1: compositional check using helper + # Special-case: if parent delay == 0, allow child.delay >= 0 + if parent_bdr.delay == 0.0: + compositional_ok = sum(c.rate for c in child_bdrs) <= parent_bdr.rate else: - sorted_tasks = sorted(supply_tasks, key=lambda x: x[0].priority) - for idx, (task, comp) in enumerate(sorted_tasks): - if DBF.dbf_rm([t[0] for t in sorted_tasks], task.period, idx) > task.period: - mark_unschedulable([comp]) + compositional_ok = BDR.can_schedule_children(parent_bdr, child_bdrs) - return components + # Ensure each component's own schedulability + all_children_ok = all(comp['schedulable'] for comp in comps_on_core) + core_summary[core_id] = compositional_ok and all_children_ok + return core_summary -def write_solution(components, filename="solution.csv"): - with open(filename, 'w', newline='') as f: - writer = csv.writer(f) - writer.writerow(['task_name', 'component_id', 'task_schedulable', 'component_schedulable']) - for comp_id, comp in components.items(): - comp_ok = 1 if comp['schedulable'] else 0 - for t in comp['tasks']: - task_ok = 1 if getattr(t, 'schedulable', False) else 0 - writer.writerow([t.task_name, comp_id, task_ok, comp_ok]) - -def print_results(components): +def output_report(components, core_summary): + # Produce console output only (no file) for comp_id, comp in components.items(): - print(f"Component {comp_id}: schedulable = {comp['schedulable']}") - print(f" BDR interface (alpha,delta) = ({comp['bdr'].rate:.4f}, {comp['bdr'].delay:.4f})") - print(f" Supply task (Q,P) = ({comp['supply'][0]:.4f}, {comp['supply'][1]:.4f})") - for t in comp['tasks']: - print(f" Task {t.task_name:20} schedulable = {t.schedulable}") - print() # Add a newline after each component - print() + Q, P = comp['budget'], comp['period'] + rate = Q/P # PRM bandwidth + delay = 2*(P-Q) # BDR startup delay + label = 'Schedulable' if comp['schedulable'] else 'Not schedulable' + print(f"Component {comp_id} (Core {comp['core_id']}, {comp['scheduler'].name}): " + f"PRM_sup=(Q={Q},P={P}), BLB(rate={rate:.4f},delay={delay:.2f}) - {label}") + + print('\nCore-level Summary:') + for core_id, ok in core_summary.items(): + core_stat = 'Schedulable' if ok else 'Not schedulable' + print(f"Core {core_id}: {core_stat}") + + +def write_solution_csv(tasks, components, filename='solution.csv'): # noqa: E302(tasks, components, filename='solution.csv'):(tasks, components, filename='solution.csv'): + # CSV with task- and component-level results + rows = [] + for cid, comp in components.items(): + supply = BDR(rate=comp['budget']/comp['period'], delay=2*(comp['period']-comp['budget'])) + for task in comp['tasks']: + if comp['scheduler'] == Scheduler.RM: + idx = comp['tasks'].index(task) + demand = DBF.dbf_rm(comp['tasks'], task.period, idx) # Eq.4 + else: + demand = DBF.dbf_edf(comp['tasks'], task.period) # Eq.2 + rows.append({ + 'task_name': task.task_name, + 'component_id': cid, + 'task_schedulable': int(demand <= supply.sbf(task.period)), + 'component_schedulable': int(comp['schedulable']) + }) + with open(filename, 'w', newline='') as f: + writer = csv.DictWriter(f, fieldnames=[ + 'task_name','component_id','task_schedulable','component_schedulable' + ]) + writer.writeheader() + for r in rows: + writer.writerow(r) def main(): @@ -176,12 +196,13 @@ def main(): tasks = adjust_wcet(tasks, budgets, architectures) components = group_tasks_by_component(tasks, budgets) - components = analyze_components(components) - components = hierarchical_check_by_core(components) - - print_results(components) - write_solution(components) + # Local component checks + components = check_component_schedulability(components) + # Global core summaries + core_summary = summarize_by_core(components, architectures) + output_report(components, core_summary) + write_solution_csv(tasks, components) if __name__ == '__main__': main() diff --git a/src/common/BDR.py b/src/common/BDR.py index 4b9d2dd..dbee84c 100644 --- a/src/common/BDR.py +++ b/src/common/BDR.py @@ -46,7 +46,7 @@ def can_schedule_children(parent: "BDR", children: List["BDR"]) -> bool: return False return True - def supply_task_params(self) -> Tuple[float, float]: + def supply_task_params(self) -> Tuple[float, float]: """ Theorem 3 (Half-Half): Transform this BDR interface into a periodic supply task. Returns (budget, period): @@ -63,50 +63,3 @@ def supply_task_params(self) -> Tuple[float, float]: budget = self.rate * period return budget, period - @classmethod - def from_tasks(cls, tasks: List, scheduler: Scheduler) -> "BDR": - """ - Derive a BDR interface (rate, delay) for a given task set and scheduler. - 1) rate = total utilization = sum(wcet/period) - 2) delay = max(0, min_t [t - dbf(W,t)/rate]) over all critical points - Critical points are multiples of task periods up to the hyperperiod. - """ - if not tasks: - return cls(0.0, 0.0) - # compute rate (availability factor) - rate = sum(task.wcet / task.period for task in tasks) - - # sort tasks by priority for RM - if scheduler == Scheduler.RM: - sorted_tasks = sorted(tasks, key=lambda t: t.priority) - periods = [t.period for t in sorted_tasks] - max_period = max(periods) if periods else 0 - # Generate time points up to max_period - time_points = set() - for T in periods: - k = 1 - while (t := k * T) <= max_period: - time_points.add(t) - k += 1 - time_points = sorted(time_points) - else: - # EDF: use hyperperiod - periods = [int(t.period) for t in tasks] - hyper = reduce(lcm, periods, 1) - time_points = sorted({k * T for T in periods for k in range(1, (hyper // T) + 1)}) - - # compute partition delay - if rate > 0: - slacks = [] - for t in time_points: - if scheduler == Scheduler.EDF: - demand = DBF.dbf_edf(tasks, t) - else: - demand = max(DBF.dbf_rm(sorted_tasks, t, idx) - for idx in range(len(sorted_tasks))) - slacks.append(t - demand / rate) - delay = max(0.0, min(slacks)) - else: - delay = 0.0 - - return cls(rate, delay) From 7fbde5b5c501f4cd56209f46c96913f634d4476c Mon Sep 17 00:00:00 2001 From: Christopher Date: Sun, 11 May 2025 16:21:34 +0200 Subject: [PATCH 4/4] chore: Added .vscode for easy debugging on testcases --- .vscode/launch.json | 126 ++++++++++++++++++++++++++++++++++++++++++ .vscode/settings.json | 3 + 2 files changed, 129 insertions(+) create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..2a59140 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,126 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: 1-tiny-test-case", + "type": "debugpy", + "request": "launch", + "program": "src/analysis.py", + "console": "integratedTerminal", + "args": [ + "data/testcases/1-tiny-test-case/architecture.csv", + "data/testcases/1-tiny-test-case/budgets.csv", + "data/testcases/1-tiny-test-case/tasks.csv" + ] + }, + { + "name": "Python Debugger: 2-small-test-case", + "type": "debugpy", + "request": "launch", + "program": "src/analysis.py", + "console": "integratedTerminal", + "args": [ + "data/testcases/2-small-test-case/architecture.csv", + "data/testcases/2-small-test-case/budgets.csv", + "data/testcases/2-small-test-case/tasks.csv" + ] + }, + { + "name": "Python Debugger: 3-medium-test-case", + "type": "debugpy", + "request": "launch", + "program": "src/analysis.py", + "console": "integratedTerminal", + "args": [ + "data/testcases/3-medium-test-case/architecture.csv", + "data/testcases/3-medium-test-case/budgets.csv", + "data/testcases/3-medium-test-case/tasks.csv" + ] + }, + { + "name": "Python Debugger: 4-large-test-case", + "type": "debugpy", + "request": "launch", + "program": "src/analysis.py", + "console": "integratedTerminal", + "args": [ + "data/testcases/4-large-test-case/architecture.csv", + "data/testcases/4-large-test-case/budgets.csv", + "data/testcases/4-large-test-case/tasks.csv" + ] + }, + { + "name": "Python Debugger: 5-huge-test-case", + "type": "debugpy", + "request": "launch", + "program": "src/analysis.py", + "console": "integratedTerminal", + "args": [ + "data/testcases/5-huge-test-case/architecture.csv", + "data/testcases/5-huge-test-case/budgets.csv", + "data/testcases/5-huge-test-case/tasks.csv" + ] + }, + { + "name": "Python Debugger: 6-gigantic-test-case", + "type": "debugpy", + "request": "launch", + "program": "src/analysis.py", + "console": "integratedTerminal", + "args": [ + "data/testcases/6-gigantic-test-case/architecture.csv", + "data/testcases/6-gigantic-test-case/budgets.csv", + "data/testcases/6-gigantic-test-case/tasks.csv" + ] + }, + { + "name": "Python Debugger: 7-unschedulable-test-case", + "type": "debugpy", + "request": "launch", + "program": "src/analysis.py", + "console": "integratedTerminal", + "args": [ + "data/testcases/7-unschedulable-test-case/architecture.csv", + "data/testcases/7-unschedulable-test-case/budgets.csv", + "data/testcases/7-unschedulable-test-case/tasks.csv" + ] + }, + { + "name": "Python Debugger: 8-unschedulable-test-case", + "type": "debugpy", + "request": "launch", + "program": "src/analysis.py", + "console": "integratedTerminal", + "args": [ + "data/testcases/8-unschedulable-test-case/architecture.csv", + "data/testcases/8-unschedulable-test-case/budgets.csv", + "data/testcases/8-unschedulable-test-case/tasks.csv" + ] + }, + { + "name": "Python Debugger: 9-unschedulable-test-case", + "type": "debugpy", + "request": "launch", + "program": "src/analysis.py", + "console": "integratedTerminal", + "args": [ + "data/testcases/9-unschedulable-test-case/architecture.csv", + "data/testcases/9-unschedulable-test-case/budgets.csv", + "data/testcases/9-unschedulable-test-case/tasks.csv" + ] + }, + { + "name": "Python Debugger: 10-unschedulable-test-case", + "type": "debugpy", + "request": "launch", + "program": "src/analysis.py", + "console": "integratedTerminal", + "args": [ + "data/testcases/10-unschedulable-test-case/architecture.csv", + "data/testcases/10-unschedulable-test-case/budgets.csv", + "data/testcases/10-unschedulable-test-case/tasks.csv" + ] + } + + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..1ad1dee --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "idf.pythonInstallPath": "/usr/bin/python3" +} \ No newline at end of file