|
5 | 5 | from datetime import timedelta |
6 | 6 | from tracking.models import Trip |
7 | 7 | from fleet.models import fleet |
8 | | -from django.db import IntegrityError |
| 8 | +from django.db import IntegrityError, OperationalError, transaction |
9 | 9 | from routes.models import routeStop |
10 | 10 | import time |
11 | 11 |
|
@@ -197,25 +197,74 @@ def handle(self, *args, **kwargs): |
197 | 197 | # --------------------------------------------------------- |
198 | 198 | if vehicles_to_update: |
199 | 199 | t3 = time.time() |
200 | | - try: |
201 | | - fleet.objects.bulk_update( |
202 | | - vehicles_to_update, |
203 | | - ["sim_lat", "sim_lon", "sim_heading", "current_trip", "updated_at"], |
204 | | - batch_size=500 |
205 | | - ) |
206 | | - self.stdout.write(f"Updated {len(vehicles_to_update)} vehicles in {time.time() - t3:.2f}s") |
207 | | - except IntegrityError as e: |
208 | | - # Bulk update failed due to FK integrity (race or missing trip). Fall back |
209 | | - # to per-vehicle save so we can skip problematic updates. |
210 | | - self.stderr.write(f"Bulk update IntegrityError: {e}. Falling back to per-vehicle updates.") |
211 | | - updated = 0 |
212 | | - for v in vehicles_to_update: |
213 | | - try: |
214 | | - v.save(update_fields=["sim_lat", "sim_lon", "sim_heading", "current_trip", "updated_at"]) |
215 | | - updated += 1 |
216 | | - except IntegrityError as e2: |
217 | | - self.stderr.write(f"Skipping vehicle {v.id} due to IntegrityError: {e2}") |
218 | | - self.stdout.write(f"Fallback updated {updated} vehicles in {time.time() - t3:.2f}s") |
| 200 | + vehicles_by_id = {v.id: v for v in vehicles_to_update} |
| 201 | + vehicle_ids = sorted(vehicles_by_id.keys()) |
| 202 | + attempts = 0 |
| 203 | + |
| 204 | + while True: |
| 205 | + try: |
| 206 | + with transaction.atomic(): |
| 207 | + # Lock rows in a consistent order and skip rows locked elsewhere. |
| 208 | + locked = list( |
| 209 | + fleet.objects.select_for_update(skip_locked=True) |
| 210 | + .filter(pk__in=vehicle_ids) |
| 211 | + .order_by("pk") |
| 212 | + ) |
| 213 | + |
| 214 | + if not locked: |
| 215 | + self.stdout.write("No vehicles available for update; skipping.") |
| 216 | + break |
| 217 | + |
| 218 | + for v in locked: |
| 219 | + src = vehicles_by_id.get(v.id) |
| 220 | + if not src: |
| 221 | + continue |
| 222 | + v.sim_lat = src.sim_lat |
| 223 | + v.sim_lon = src.sim_lon |
| 224 | + v.sim_heading = src.sim_heading |
| 225 | + v.current_trip = src.current_trip |
| 226 | + v.updated_at = src.updated_at |
| 227 | + |
| 228 | + try: |
| 229 | + fleet.objects.bulk_update( |
| 230 | + locked, |
| 231 | + ["sim_lat", "sim_lon", "sim_heading", "current_trip", "updated_at"], |
| 232 | + batch_size=500 |
| 233 | + ) |
| 234 | + self.stdout.write( |
| 235 | + f"Updated {len(locked)} vehicles in {time.time() - t3:.2f}s" |
| 236 | + ) |
| 237 | + except IntegrityError as e: |
| 238 | + # Bulk update failed due to FK integrity (race or missing trip). Fall back |
| 239 | + # to per-vehicle save so we can skip problematic updates. |
| 240 | + self.stderr.write( |
| 241 | + f"Bulk update IntegrityError: {e}. Falling back to per-vehicle updates." |
| 242 | + ) |
| 243 | + updated = 0 |
| 244 | + for v in locked: |
| 245 | + try: |
| 246 | + v.save(update_fields=[ |
| 247 | + "sim_lat", |
| 248 | + "sim_lon", |
| 249 | + "sim_heading", |
| 250 | + "current_trip", |
| 251 | + "updated_at", |
| 252 | + ]) |
| 253 | + updated += 1 |
| 254 | + except IntegrityError as e2: |
| 255 | + self.stderr.write( |
| 256 | + f"Skipping vehicle {v.id} due to IntegrityError: {e2}" |
| 257 | + ) |
| 258 | + self.stdout.write( |
| 259 | + f"Fallback updated {updated} vehicles in {time.time() - t3:.2f}s" |
| 260 | + ) |
| 261 | + break |
| 262 | + except OperationalError as e: |
| 263 | + if "deadlock detected" in str(e).lower() and attempts < 2: |
| 264 | + attempts += 1 |
| 265 | + time.sleep(0.2 * attempts) |
| 266 | + continue |
| 267 | + raise |
219 | 268 |
|
220 | 269 | self.stdout.write(f"Total time: {time.time() - t0:.2f}s") |
221 | 270 |
|
|
0 commit comments