Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
373 changes: 373 additions & 0 deletions scheduling/cpu_scheduling_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,373 @@
import copy
import threading
import time
import tkinter as tk
from tkinter import messagebox, ttk

import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from typing import Generator

Check failure on line 9 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP035)

scheduling/cpu_scheduling_interface.py:9:1: UP035 Import from `collections.abc` instead: `Generator`

Check failure on line 9 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (I001)

scheduling/cpu_scheduling_interface.py:1:1: I001 Import block is un-sorted or un-formatted

Check failure on line 9 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP035)

scheduling/cpu_scheduling_interface.py:9:1: UP035 Import from `collections.abc` instead: `Generator`

Check failure on line 9 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (I001)

scheduling/cpu_scheduling_interface.py:1:1: I001 Import block is un-sorted or un-formatted

Check failure on line 9 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP035)

scheduling/cpu_scheduling_interface.py:9:1: UP035 Import from `collections.abc` instead: `Generator`

Check failure on line 9 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (I001)

scheduling/cpu_scheduling_interface.py:1:1: I001 Import block is un-sorted or un-formatted

Check failure on line 9 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP035)

scheduling/cpu_scheduling_interface.py:9:1: UP035 Import from `collections.abc` instead: `Generator`

Check failure on line 9 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (I001)

scheduling/cpu_scheduling_interface.py:1:1: I001 Import block is un-sorted or un-formatted


# ===================== Scheduler Engine ===================== #
class SchedulerEngine:
def __init__(self, processes: list[dict], algorithm: str, quantum: int = 2) -> None:
"""
Initializes the scheduler engine.

>>> processes = [{"pid": "P1", "arrival": 0, "burst": 4}]
>>> engine = SchedulerEngine(processes, "FCFS")
>>> isinstance(engine, SchedulerEngine)
True
"""
self.original = copy.deepcopy(processes)
self.processes = [p.copy() for p in processes]
self.algorithm = algorithm
self.quantum = quantum
for process in self.processes:
process["remaining"] = process["burst"]
self.timeline: list[tuple[int, str]] = [] # [(time, pid)]
self.stats: list[tuple] = []

def simulate(self) -> Generator[tuple[int, str | None, list[str]], None, None]:

Check failure on line 32 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:32:27: UP043 Unnecessary default type arguments

Check failure on line 32 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:32:27: UP043 Unnecessary default type arguments

Check failure on line 32 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:32:27: UP043 Unnecessary default type arguments

Check failure on line 32 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:32:27: UP043 Unnecessary default type arguments
"""
Runs the selected CPU scheduling algorithm.

>>> processes = [{"pid": "P1", "arrival": 0, "burst": 2}]
>>> engine = SchedulerEngine(processes, "FCFS")
>>> list(engine.simulate())[:2]
[(0, 'P1', []), (1, 'P1', [])]
"""
algo = self.algorithm.lower()
if algo == "fcfs":
yield from self._simulate_fcfs()
elif algo == "sjf (non-preemptive)":
yield from self._simulate_sjf_np()
elif algo == "sjf (preemptive)":
yield from self._simulate_sjf_p()
elif algo == "priority (non-preemptive)":
yield from self._simulate_priority_np()
elif algo == "priority (preemptive)":
yield from self._simulate_priority_p()
elif algo == "round robin":
yield from self._simulate_rr()
self._calculate_stats()

# first come first serve
def _simulate_fcfs(self) -> Generator[tuple[int, str, list[str]], None, None]:

Check failure on line 57 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:57:33: UP043 Unnecessary default type arguments

Check failure on line 57 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:57:33: UP043 Unnecessary default type arguments

Check failure on line 57 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:57:33: UP043 Unnecessary default type arguments

Check failure on line 57 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:57:33: UP043 Unnecessary default type arguments
"""
Simulates First Come First Serve scheduling.

>>> processes = [{"pid": "P1", "arrival": 0, "burst": 2}]
>>> engine = SchedulerEngine(processes, "FCFS")
>>> next(engine._simulate_fcfs())
(0, 'P1', [])
"""
t = 0
processes = sorted(self.processes, key=lambda process: process["arrival"])
for process in processes:
t = max(t, process["arrival"])
for _ in range(process["burst"]):
self.timeline.append((t, process["pid"]))
yield (t, process["pid"], [])
t += 1
process["completion"] = t

# shortest job first non preemptive
def _simulate_sjf_np(self) -> Generator[tuple[int, str | None, list[str]], None, None]:

Check failure on line 77 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:77:89: E501 Line too long (91 > 88)

Check failure on line 77 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:77:35: UP043 Unnecessary default type arguments

Check failure on line 77 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:77:89: E501 Line too long (91 > 88)

Check failure on line 77 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:77:35: UP043 Unnecessary default type arguments

Check failure on line 77 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:77:89: E501 Line too long (91 > 88)

Check failure on line 77 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:77:35: UP043 Unnecessary default type arguments

Check failure on line 77 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:77:89: E501 Line too long (91 > 88)

Check failure on line 77 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:77:35: UP043 Unnecessary default type arguments
"""
Simulates Shortest Job First (Non-Preemptive).

>>> processes = [{"pid": "P1", "arrival": 0, "burst": 2}]
>>> engine = SchedulerEngine(processes, "SJF (Non-Preemptive)")
>>> isinstance(engine._simulate_sjf_np(), Generator)
True
"""
t = 0
processes = sorted(self.processes, key=lambda process: process["arrival"])
done = 0
while done < len(processes):
ready = [process for process in processes if process["arrival"] <= t and "completion" not in process]

Check failure on line 90 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:90:89: E501 Line too long (113 > 88)

Check failure on line 90 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:90:89: E501 Line too long (113 > 88)

Check failure on line 90 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:90:89: E501 Line too long (113 > 88)

Check failure on line 90 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:90:89: E501 Line too long (113 > 88)
if not ready:
t += 1
yield (t, None, [])
continue
process = min(ready, key=lambda x: x["burst"])
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

for _ in range(process["burst"]):
self.timeline.append((t, process["pid"]))
yield (t, process["pid"], [])
t += 1
process["completion"] = t
done += 1

# shortest job first preemptive
def _simulate_sjf_p(self) -> Generator[tuple[int, str | None, list[str]], None, None]:

Check failure on line 104 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:104:89: E501 Line too long (90 > 88)

Check failure on line 104 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:104:34: UP043 Unnecessary default type arguments

Check failure on line 104 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:104:89: E501 Line too long (90 > 88)

Check failure on line 104 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:104:34: UP043 Unnecessary default type arguments

Check failure on line 104 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:104:89: E501 Line too long (90 > 88)

Check failure on line 104 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:104:34: UP043 Unnecessary default type arguments

Check failure on line 104 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:104:89: E501 Line too long (90 > 88)

Check failure on line 104 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (UP043)

scheduling/cpu_scheduling_interface.py:104:34: UP043 Unnecessary default type arguments
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_sjf_p

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_sjf_p

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_sjf_p

"""Simulates SJF Preemptive scheduling."""
t = 0
processes = sorted(self.processes, key=lambda process: process["arrival"])
done = 0
while done < len(processes):
ready = [process for process in processes if process["arrival"] <= t and process["remaining"] > 0]

Check failure on line 110 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:110:89: E501 Line too long (110 > 88)

Check failure on line 110 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:110:89: E501 Line too long (110 > 88)

Check failure on line 110 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:110:89: E501 Line too long (110 > 88)

Check failure on line 110 in scheduling/cpu_scheduling_interface.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (E501)

scheduling/cpu_scheduling_interface.py:110:89: E501 Line too long (110 > 88)
if not ready:
t += 1
yield (t, None, [])
continue
process = min(ready, key=lambda x: x["remaining"])
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

self.timeline.append((t, process["pid"]))
yield (t, process["pid"], [])
process["remaining"] -= 1
if process["remaining"] == 0:
process["completion"] = t + 1
done += 1
t += 1

# priority non preemptive
def _simulate_priority_np(self) -> Generator[tuple[int, str | None, list[str]], None, None]:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_priority_np

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_priority_np

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_priority_np

"""Simulates Priority (Non-Preemptive) scheduling."""
t = 0
done = 0
while done < len(self.processes):
ready = [process for process in self.processes if process["arrival"] <= t and "completion" not in process]
if not ready:
t += 1
yield (t, None, [])
continue
process = min(ready, key=lambda x: x["priority"])
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

for _ in range(process["burst"]):
self.timeline.append((t, process["pid"]))
yield (t, process["pid"], [])
t += 1
process["completion"] = t
done += 1

# priority preemptive
def _simulate_priority_p(self) -> Generator[tuple[int, str | None, list[str]], None, None]:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_priority_p

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_priority_p

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_priority_p

"""Simulates Priority (Preemptive) scheduling."""
t = 0
done = 0
while done < len(self.processes):
ready = [process for process in self.processes if process["arrival"] <= t and process["remaining"] > 0]
if not ready:
t += 1
yield (t, None, [])
continue
process = min(ready, key=lambda x: x["priority"])
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide descriptive name for the parameter: x

self.timeline.append((t, process["pid"]))
yield (t, process["pid"], [])
process["remaining"] -= 1
if process["remaining"] == 0:
process["completion"] = t + 1
done += 1
t += 1

# round robin
def _simulate_rr(self) -> Generator[tuple[int, str | None, list[str]], None, None]:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_rr

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_rr

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_rr

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _simulate_rr

"""Simulates Round Robin scheduling."""
t = 0
q: list[dict] = []
processes = sorted(self.processes, key=lambda process: process["arrival"])
i = 0
done = 0
while done < len(processes):
while i < len(processes) and processes[i]["arrival"] <= t:
q.append(processes[i])
i += 1
if not q:
t += 1
yield (t, None, [])
continue
process = q.pop(0)
burst = min(self.quantum, process["remaining"])
for _ in range(burst):
self.timeline.append((t, process["pid"]))
yield (t, process["pid"], [x["pid"] for x in q])
t += 1
process["remaining"] -= 1
while i < len(processes) and processes[i]["arrival"] <= t:
q.append(processes[i])
i += 1
if process["remaining"] > 0:
q.append(process)
else:
process["completion"] = t
done += 1

def _calculate_stats(self) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _calculate_stats

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _calculate_stats

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _calculate_stats

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _calculate_stats

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function _calculate_stats

"""Calculates turnaround, waiting, and response times."""
for process in self.processes:
pid = process["pid"]
arrival = process["arrival"]
burst = process["burst"]
completion = process["completion"]
first_exec = next((t for t, pid2 in self.timeline if pid2 == pid), arrival)
tat = completion - arrival
wt = tat - burst
rt = first_exec - arrival
self.stats.append((pid, arrival, burst, completion, tat, wt, rt))


# ===================== Interface ===================== #
class CPUSchedulerGUI:
def __init__(self, root: tk.Tk) -> None:
"""Initializes the GUI window."""
self.root = root
self.root.title("CPU Scheduling Visualizer")
self.root.geometry("1000x700")
self.processes: list[dict] = []
self.setup_ui()

def setup_ui(self) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function setup_ui

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function setup_ui

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function setup_ui

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function setup_ui

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function setup_ui

"""Sets up GUI widgets."""
top_frame = ttk.Frame(self.root)
top_frame.pack(pady=10)

self.tree = ttk.Treeview(
top_frame,
columns=("pid", "arrival", "burst", "priority"),
show="headings",
)
for col in self.tree["columns"]:
self.tree.heading(col, text=col.capitalize())
self.tree.pack(side="left")

form = ttk.Frame(top_frame)
form.pack(side="left", padx=10)
ttk.Label(form, text="PID").grid(row=0, column=0)
ttk.Label(form, text="Arrival").grid(row=1, column=0)
ttk.Label(form, text="Burst").grid(row=2, column=0)
ttk.Label(form, text="Priority").grid(row=3, column=0)
self.pid_e = ttk.Entry(form)
self.arrival_e = ttk.Entry(form)
self.burst_e = ttk.Entry(form)
self.priority_e = ttk.Entry(form)
self.pid_e.grid(row=0, column=1)
self.arrival_e.grid(row=1, column=1)
self.burst_e.grid(row=2, column=1)
self.priority_e.grid(row=3, column=1)
ttk.Button(form, text="Add", command=self.add_process).grid(row=4, column=0, pady=5)
ttk.Button(form, text="Delete", command=self.delete_process).grid(row=4, column=1)

algo_frame = ttk.Frame(self.root)
algo_frame.pack(pady=10)
ttk.Label(algo_frame, text="Algorithm:").pack(side="left")
self.algo_cb = ttk.Combobox(
algo_frame,
values=[
"FCFS",
"SJF (Non-Preemptive)",
"SJF (Preemptive)",
"Priority (Non-Preemptive)",
"Priority (Preemptive)",
"Round Robin",
],
)
self.algo_cb.current(0)
self.algo_cb.pack(side="left", padx=5)
ttk.Label(algo_frame, text="Quantum:").pack(side="left")
self.quantum_e = ttk.Entry(algo_frame, width=5)
self.quantum_e.insert(0, "2")
self.quantum_e.pack(side="left")
ttk.Button(algo_frame, text="Run", command=self.run_scheduling).pack(side="left", padx=10)

self.ready_label = ttk.Label(self.root, text="Ready Queue:")
self.ready_list = tk.Listbox(self.root, height=3)
self.ready_label.pack_forget()
self.ready_list.pack_forget()

self.figure, self.ax = plt.subplots(figsize=(8, 3))
self.canvas = FigureCanvasTkAgg(self.figure, master=self.root)
self.canvas.get_tk_widget().pack()

self.result_box = ttk.Treeview(
self.root,
columns=("pid", "arrival", "burst", "completion", "tat", "wt", "rt"),
show="headings",
height=6,
)
for col in self.result_box["columns"]:
self.result_box.heading(col, text=col.upper())
self.result_box.pack(pady=10)

self.avg_label = ttk.Label(self.root, text="", font=("Arial", 11, "bold"))
self.avg_label.pack()

def add_process(self) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function add_process

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function add_process

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function add_process

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function add_process

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function add_process

"""Adds a new process entry to the table."""
try:
pid = self.pid_e.get()
arrival = int(self.arrival_e.get())
burst = int(self.burst_e.get())
priority = int(self.priority_e.get() or 0)
self.processes.append(
{"pid": pid, "arrival": arrival, "burst": burst, "priority": priority}
)
self.tree.insert("", "end", values=(pid, arrival, burst, priority))
except ValueError:
messagebox.showerror("Error", "Invalid input")

def delete_process(self) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function delete_process

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function delete_process

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function delete_process

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function delete_process

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function delete_process

"""Deletes a selected process."""
if sel := self.tree.selection():
pid = self.tree.item(sel[0])["values"][0]
self.processes = [p for p in self.processes if p["pid"] != pid]
self.tree.delete(sel[0])

def run_scheduling(self) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function run_scheduling

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function run_scheduling

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function run_scheduling

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function run_scheduling

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function run_scheduling

"""Runs the selected scheduling algorithm."""
algo = self.algo_cb.get()
quantum = int(self.quantum_e.get() or 2)
if algo.lower() == "round robin":
self.ready_label.pack()
self.ready_list.pack()
else:
self.ready_label.pack_forget()
self.ready_list.pack_forget()

self.engine = SchedulerEngine(self.processes, algo, quantum)
threading.Thread(target=self.animate, daemon=True).start()

def animate(self) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function animate

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function animate

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function animate

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function animate

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function animate

"""Animates the scheduling visualization."""
self.ax.clear()
x, colors, current_pid = 0, {}, None
for step in self.engine.simulate():
_, pid, rq = step
if pid:
if pid != current_pid:
self.ax.axvline(x, color="black", linewidth=0.8)
current_pid = pid
colors.setdefault(pid, plt.cm.tab20(len(colors) % 20))
self.ax.barh(0, 1, left=x, color=colors[pid])
self.ax.text(x + 0.5, 0, pid, ha="center", va="center", color="white", fontsize=9)
x += 1
self.ax.set_xticks(range(x + 1))
self.ax.set_yticks([])
self.ax.set_xlabel("Time")
self.canvas.draw()
if rq:
self.ready_list.delete(0, tk.END)
for pid_r in rq:
self.ready_list.insert(tk.END, pid_r)
time.sleep(0.3)
self.show_results()

def show_results(self) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function show_results

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function show_results

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function show_results

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function show_results

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As there is no test file in this pull request nor any test function or class in the file scheduling/cpu_scheduling_interface.py, please provide doctest for the function show_results

"""Displays scheduling results."""
for item in self.result_box.get_children():
self.result_box.delete(item)
total_wt = total_tat = total_rt = 0
for row in self.engine.stats:
self.result_box.insert("", "end", values=row)
total_wt += row[5]
total_tat += row[4]
total_rt += row[6]
n = len(self.engine.stats) or 1
self.avg_label.config(
text=f"AVG WT = {total_wt/n:.2f} | AVG TAT = {total_tat/n:.2f} | AVG RT = {total_rt/n:.2f}"
)


if __name__ == "__main__":
root = tk.Tk()
CPUSchedulerGUI(root)
root.mainloop()