- Threading en Python
- Temario
- 1) ¿Qué es threading?
- 2) Conceptos clave
- 3) Crear y usar hilos — ejemplos básicos
- 4) Daemon threads y uso correcto
- 5) Sincronización — primitivas importantes
- 6) Comunicación segura entre hilos: queue.Queue
- 7) Manejo de excepciones en hilos
- 8) ThreadPoolExecutor (alta abstracción)
- 9) GIL — cuándo usar threading vs multiprocessing
- 10) Operaciones atómicas y seguridad
- 11) Cancelación y parada de hilos
- 12) Thread-local storage
- 13) Debugging y utilidades
- 14) Buenas prácticas
- 15) Ejemplos prácticos (útiles)
- 16) Limitaciones y alternativas
- 17) Resumen
threading es el módulo estándar de Python para concurrencia mediante hilos (threads). Permite ejecutar varias tareas aparentemente al mismo tiempo dentro del mismo proceso. Es ideal para operaciones I/O-bound (espera de red, disco, etc.).
Importarlo:
import threading- Thread (hilo): unidad de ejecución.
- Daemon thread: hilo que no impide que el proceso termine; se mata cuando el programa principal acaba.
- GIL (Global Interpreter Lock): en CPython, impide que varios hilos ejecuten bytecode Python al mismo tiempo — eso limita el paralelismo real para tareas CPU-bound.
- Race condition: condición de carrera cuando varios hilos acceden/actualizan recursos compartidos sin sincronización.
- Thread-safe: código que puede ejecutarse en varios hilos sin errores por concurrencia.
import threading
import time
def worker(n):
print(f"Worker {n} empieza")
time.sleep(1)
print(f"Worker {n} termina")
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,), name=f"worker-{i}")
t.start()
threads.append(t)
for t in threads:
t.join() # esperar que termine cada hilo
print("Todos terminados")import threading, time
class MiHilo(threading.Thread):
def __init__(self, n):
super().__init__(name=f"MiHilo-{n}")
self.n = n
def run(self):
print(f"{self.name} comenzar")
time.sleep(1)
print(f"{self.name} terminar")
h = MiHilo(1)
h.start()
h.join()t = threading.Thread(target=worker, args=(1,), daemon=True)
t.start()
# Si el main thread termina, el hilo daemon se termina abruptamente.Usa daemon para tareas de fondo opcionales (logs en tiempo real, watchers). Para trabajo crítico, no uses daemon; asegúrate de join().
lock = threading.Lock()
with lock:
# región crítica
passrlock = threading.RLock()event = threading.Event()
# hilo espera:
event.wait() # bloquea hasta que event.set()
# hilo que despierta:
event.set()cond = threading.Condition()
with cond:
cond.wait() # espera notificación
# otro hilo:
cond.notify()sem = threading.Semaphore(3) # máximo 3 entradas simultáneas
with sem:
# acceso limitado
passbar = threading.Barrier(3)
bar.wait() # todos los hilos esperan aquí hasta que lleguen los 3queue.Queue es thread-safe y la forma recomendada para pasar datos entre hilos (producer-consumer).
import threading, queue, time
q = queue.Queue()
def producer():
for i in range(5):
q.put(i)
print("produjo", i)
time.sleep(0.2)
q.put(None) # sentinel para indicar fin
def consumer():
while True:
item = q.get()
if item is None:
break
print("consumió", item)
q.task_done()
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); q.join()Las excepciones en Thread no se propagan al hilo principal. Opciones:
- Usar
concurrent.futures.ThreadPoolExecutorpara obtenerFuturey atrapar excepciones. - Capturar y guardar excepciones en el propio hilo y consultarlas después.
Ejemplo con
ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor
def trabajo(x):
if x == 3:
raise ValueError("boom")
return x*2
with ThreadPoolExecutor(max_workers=3) as ex:
futures = [ex.submit(trabajo, i) for i in range(5)]
for f in futures:
try:
print(f.result())
except Exception as e:
print("error en hilo:", e)Más cómodo que crear hilos manualmente.
from concurrent.futures import ThreadPoolExecutor, as_completed
def tarea(n):
return n*n
with ThreadPoolExecutor(max_workers=4) as pool:
futures = [pool.submit(tarea, i) for i in range(10)]
for f in as_completed(futures):
print(f.result())- I/O-bound: usa
threading(oasyncio) — hilos dan mejoras reales (espera de I/O libera GIL). - CPU-bound:
threadingno escala por GIL; usamultiprocessing(procesos) o extensiones nativas (numpy, C) que sueltan GIL. Explicación corta: el GIL permite que solo un hilo ejecute bytecode Python simultáneamente; por eso multiples hilos no aceleran cálculos puros en CPython.
- Algunos objetos y operaciones son atómicas en CPython (por ejemplo, asignación simple de variable, operaciones sobre tipos integrales?) — no confíes en ello.
- Ejemplo inseguro (race):
# NO usar sin lock
counter = 0
def incr():
global counter
for _ in range(10000):
counter += 1 # no es atómico: leer-modificar-escribirSiempre protege con Lock si hay acceso concurrente.
No existe Thread.kill() seguro. Patrones para parar:
- Usar
threading.Event()como bandera de parada:
stop_event = threading.Event()
def worker():
while not stop_event.is_set():
# trabajo
pass
# detener:
stop_event.set()- Usar sentinels en
queue(None).
Datos separados por hilo:
import threading
local = threading.local()
def worker(val):
local.x = val
print(local.x)
t1 = threading.Thread(target=worker, args=(1,))
t2 = threading.Thread(target=worker, args=(2,))
t1.start(); t2.start()Cada hilo ve su propio local.x.
threading.enumerate()→ lista hilos activos.threading.active_count()→ cuenta.threading.current_thread().name→ nombre actual.- Poner logs (no prints) con
loggingythreadNameen el formato para seguir hilos. - Para debug avanzado:
faulthandler.dump_traceback_later()othreading.settrace()(poco común).
Ejemplo de logging:
import logging, threading, time
logging.basicConfig(level=logging.INFO, format="%(threadName)s: %(message)s")
def worker():
logging.info("start")
time.sleep(1)
logging.info("end")
t = threading.Thread(target=worker, name="hilo-1")
t.start(); t.join()- Para I/O concurrency, prefiere
ThreadPoolExecutoroasynciosegún el caso. - Evita variables globales mutables; usa
queue.QueueyLocks. - No uses
daemon=Truepara tareas que deben terminar correctamente. - Mantén regiones críticas lo más cortas posible (reduce contención).
- Si necesitas paralelismo real para CPU-bound, usa
multiprocessingo librerías que suelten el GIL. - Añade timeouts a
join()y bloqueos (lock.acquire(timeout=...)) si corres riesgos de deadlock.
Ya mostrado en la sección de Queue. Es el patrón más útil y seguro para comunicación.
import threading, queue, time
def worker(q):
while True:
fn, args = q.get()
if fn is None:
break
try:
fn(*args)
finally:
q.task_done()
q = queue.Queue()
threads = []
for _ in range(4):
t = threading.Thread(target=worker, args=(q,))
t.start()
threads.append(t)
# Encolar tareas
for i in range(10):
q.put((print, (f"task {i}",)))
q.join()
# parar hilos
for _ in threads:
q.put((None, None))
for t in threads:
t.join()import requests
from concurrent.futures import ThreadPoolExecutor
urls = ["https://example.com"]*10
def fetch(url):
r = requests.get(url)
return len(r.content)
with ThreadPoolExecutor(max_workers=5) as ex:
results = list(ex.map(fetch, urls))
print(results)-
threadingno es la mejor opción para CPU-bound por el GIL. -
Alternativas:
-
multiprocessing— procesos (paralelismo real). -
asyncio— concurrencia en un solo hilo usando corutinas (muy eficiente para I/O con muchas conexiones). -
concurrent.futures.ProcessPoolExecutor— API parecida a ThreadPool pero con procesos.
-
- Usa
threadingpara I/O-bound. - Protege recursos compartidos con
Lock,RLock, o usaQueue. - Para excepciones y manejo fácil, usa
ThreadPoolExecutor. - No hay forma segura de matar hilos; implementa mecanismo cooperativo (Event/sentinels).
- Si necesitas paralelismo CPU puro, usa
multiprocessing.
Autor: Fravelz