diff --git a/api/callcentersite/callcentersite/apps/configuracion/migrations/0002_alter_configuracionsistema_modificado_por.py b/api/callcentersite/callcentersite/apps/configuracion/migrations/0002_alter_configuracionsistema_modificado_por.py new file mode 100644 index 00000000..b0482984 --- /dev/null +++ b/api/callcentersite/callcentersite/apps/configuracion/migrations/0002_alter_configuracionsistema_modificado_por.py @@ -0,0 +1,28 @@ +# Generated by Django 5.2.8 on 2025-11-18 15:17 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("configuracion", "0001_initial"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.AlterField( + model_name="configuracionsistema", + name="modificado_por", + field=models.ForeignKey( + blank=True, + help_text="Usuario que modificó este parámetro", + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="configuracion_sistema_modificaciones", + to=settings.AUTH_USER_MODEL, + ), + ), + ] diff --git a/api/callcentersite/callcentersite/apps/configuracion/models.py b/api/callcentersite/callcentersite/apps/configuracion/models.py index 85d6672d..f67df76e 100644 --- a/api/callcentersite/callcentersite/apps/configuracion/models.py +++ b/api/callcentersite/callcentersite/apps/configuracion/models.py @@ -50,7 +50,7 @@ class ConfiguracionSistema(models.Model): on_delete=models.SET_NULL, null=True, blank=True, - related_name='configuraciones_modificadas', + related_name='configuracion_sistema_modificaciones', help_text='Usuario que modificó este parámetro' ) diff --git a/api/callcentersite/callcentersite/apps/configuration/migrations/0003_rename_config_cat_act_idx_configuraci_categor_0d08c5_idx_and_more.py b/api/callcentersite/callcentersite/apps/configuration/migrations/0003_rename_config_cat_act_idx_configuraci_categor_0d08c5_idx_and_more.py new file mode 100644 index 00000000..4b8bbb32 --- /dev/null +++ b/api/callcentersite/callcentersite/apps/configuration/migrations/0003_rename_config_cat_act_idx_configuraci_categor_0d08c5_idx_and_more.py @@ -0,0 +1,53 @@ +# Generated by Django 5.2.8 on 2025-11-18 15:17 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("configuration", "0002_configuracion_historial"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.RenameIndex( + model_name="configuracion", + new_name="configuraci_categor_0d08c5_idx", + old_name="config_cat_act_idx", + ), + migrations.RenameIndex( + model_name="configuracion", + new_name="configuraci_clave_e714af_idx", + old_name="config_clave_idx", + ), + migrations.RenameIndex( + model_name="configuracionhistorial", + new_name="configuraci_clave_2abccd_idx", + old_name="config_hist_clave_ts_idx", + ), + migrations.RenameIndex( + model_name="configuracionhistorial", + new_name="configuraci_modific_de25d0_idx", + old_name="config_hist_user_ts_idx", + ), + migrations.RenameIndex( + model_name="configuracionhistorial", + new_name="configuraci_timesta_6734b2_idx", + old_name="config_hist_ts_idx", + ), + migrations.AlterField( + model_name="configuracion", + name="updated_by", + field=models.ForeignKey( + blank=True, + help_text="Usuario que realizo la ultima modificacion", + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="configuraciones_actualizadas", + to=settings.AUTH_USER_MODEL, + ), + ), + ] diff --git a/api/callcentersite/callcentersite/apps/configuration/models.py b/api/callcentersite/callcentersite/apps/configuration/models.py index 1959a77c..1f6135e1 100644 --- a/api/callcentersite/callcentersite/apps/configuration/models.py +++ b/api/callcentersite/callcentersite/apps/configuration/models.py @@ -75,7 +75,7 @@ class Configuracion(models.Model): on_delete=models.SET_NULL, null=True, blank=True, - related_name='configuraciones_modificadas', + related_name='configuraciones_actualizadas', help_text='Usuario que realizo la ultima modificacion' ) created_at = models.DateTimeField( diff --git a/api/callcentersite/callcentersite/apps/dashboard/migrations/0002_merge_dashboard_initials.py b/api/callcentersite/callcentersite/apps/dashboard/migrations/0002_merge_dashboard_initials.py new file mode 100644 index 00000000..c7f247a8 --- /dev/null +++ b/api/callcentersite/callcentersite/apps/dashboard/migrations/0002_merge_dashboard_initials.py @@ -0,0 +1,11 @@ +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("dashboard", "0001_dashboard_configuracion"), + ("dashboard", "0001_initial"), + ] + + operations = [] diff --git a/api/callcentersite/callcentersite/apps/permissions/migrations/0004_alter_permisoexcepcional_usuario.py b/api/callcentersite/callcentersite/apps/permissions/migrations/0004_alter_permisoexcepcional_usuario.py new file mode 100644 index 00000000..5196b256 --- /dev/null +++ b/api/callcentersite/callcentersite/apps/permissions/migrations/0004_alter_permisoexcepcional_usuario.py @@ -0,0 +1,25 @@ +# Generated by Django 5.2.8 on 2025-11-18 15:17 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("permissions", "0003_alter_auditoriapermiso_usuario_and_more"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.AlterField( + model_name="permisoexcepcional", + name="usuario", + field=models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="permisos_excepcionales_permissions", + to=settings.AUTH_USER_MODEL, + ), + ), + ] diff --git a/api/callcentersite/callcentersite/apps/permissions/models.py b/api/callcentersite/callcentersite/apps/permissions/models.py index d7d099b1..8ba21f5c 100644 --- a/api/callcentersite/callcentersite/apps/permissions/models.py +++ b/api/callcentersite/callcentersite/apps/permissions/models.py @@ -366,7 +366,7 @@ class PermisoExcepcional(models.Model): usuario = models.ForeignKey( User, on_delete=models.CASCADE, - related_name='permisos_excepcionales' + related_name='permisos_excepcionales_permissions' ) capacidad = models.ForeignKey( Capacidad, diff --git a/api/callcentersite/callcentersite/apps/users/migrations/0002_create_permission_views.py b/api/callcentersite/callcentersite/apps/users/migrations/0002_create_permission_views.py index b2002e6c..970b0e18 100644 --- a/api/callcentersite/callcentersite/apps/users/migrations/0002_create_permission_views.py +++ b/api/callcentersite/callcentersite/apps/users/migrations/0002_create_permission_views.py @@ -12,159 +12,166 @@ from django.db import migrations +CREATE_VIEW_CAPACIDADES = """ + CREATE OR REPLACE VIEW vista_capacidades_usuario AS + + -- CAPACIDADES DE GRUPOS + SELECT DISTINCT + u.id AS usuario_id, + u.username AS usuario_nombre, + u.email AS usuario_email, + c.id AS capacidad_id, + c.codigo AS capacidad_codigo, + f.nombre AS funcion_nombre, + f.dominio AS funcion_dominio, + c.nivel_riesgo, + 'grupo' AS origen, + gp.codigo AS grupo_origen + FROM auth_user u + INNER JOIN usuarios_grupos ug ON u.id = ug.usuario_id + INNER JOIN grupos_permisos gp ON ug.grupo_id = gp.id + INNER JOIN grupo_capacidades gc ON gp.id = gc.grupo_id + INNER JOIN capacidades c ON gc.capacidad_id = c.id + LEFT JOIN funcion_capacidades fc ON c.id = fc.capacidad_id + LEFT JOIN funciones f ON fc.funcion_id = f.id + WHERE ug.activo = TRUE + AND gp.activo = TRUE + AND c.activa = TRUE + AND (ug.fecha_expiracion IS NULL OR ug.fecha_expiracion > NOW()) + + UNION + + -- PERMISOS EXCEPCIONALES CONCEDIDOS + SELECT DISTINCT + u.id AS usuario_id, + u.username AS usuario_nombre, + u.email AS usuario_email, + c.id AS capacidad_id, + c.codigo AS capacidad_codigo, + f.nombre AS funcion_nombre, + f.dominio AS funcion_dominio, + c.nivel_riesgo, + 'excepcional_concedido' AS origen, + NULL AS grupo_origen + FROM auth_user u + INNER JOIN permisos_excepcionales pe ON u.id = pe.usuario_id + INNER JOIN capacidades c ON pe.capacidad_id = c.id + LEFT JOIN funcion_capacidades fc ON c.id = fc.capacidad_id + LEFT JOIN funciones f ON fc.funcion_id = f.id + WHERE pe.tipo = 'conceder' + AND pe.activo = TRUE + AND c.activa = TRUE + AND pe.fecha_inicio <= NOW() + AND (pe.fecha_expiracion IS NULL OR pe.fecha_expiracion > NOW()) + + EXCEPT + + -- MENOS PERMISOS EXCEPCIONALES REVOCADOS + SELECT DISTINCT + u.id AS usuario_id, + u.username AS usuario_nombre, + u.email AS usuario_email, + c.id AS capacidad_id, + c.codigo AS capacidad_codigo, + f.nombre AS funcion_nombre, + f.dominio AS funcion_dominio, + c.nivel_riesgo, + 'excepcional_revocado' AS origen, + NULL AS grupo_origen + FROM auth_user u + INNER JOIN permisos_excepcionales pe ON u.id = pe.usuario_id + INNER JOIN capacidades c ON pe.capacidad_id = c.id + LEFT JOIN funcion_capacidades fc ON c.id = fc.capacidad_id + LEFT JOIN funciones f ON fc.funcion_id = f.id + WHERE pe.tipo = 'revocar' + AND pe.activo = TRUE + AND c.activa = TRUE + AND pe.fecha_inicio <= NOW() + AND (pe.fecha_expiracion IS NULL OR pe.fecha_expiracion > NOW()); + + COMMENT ON VIEW vista_capacidades_usuario IS + 'Vista consolidada de capacidades efectivas por usuario. Incluye capacidades de grupos Y permisos excepcionales (concedidos - revocados).'; +""" + +CREATE_VIEW_GRUPOS = """ + CREATE OR REPLACE VIEW vista_grupos_usuario AS + SELECT + u.id AS usuario_id, + u.username AS usuario_nombre, + u.email AS usuario_email, + gp.id AS grupo_id, + gp.codigo AS grupo_codigo, + gp.nombre AS grupo_nombre, + gp.descripcion AS grupo_descripcion, + gp.nivel_riesgo, + ug.activo, + ug.fecha_inicio, + ug.fecha_expiracion, + CASE + WHEN ug.activo = TRUE + AND (ug.fecha_expiracion IS NULL OR ug.fecha_expiracion > NOW()) + THEN 'vigente' + ELSE 'no_vigente' + END AS estado_vigencia + FROM auth_user u + INNER JOIN usuarios_grupos ug ON u.id = ug.usuario_id + INNER JOIN grupos_permisos gp ON ug.grupo_id = gp.id; + + COMMENT ON VIEW vista_grupos_usuario IS + 'Vista de grupos asignados a usuarios con estado de vigencia'; +""" + +CREATE_INDEXES = [ + """ + CREATE INDEX IF NOT EXISTS idx_usuarios_grupos_vigencia + ON usuarios_grupos(usuario_id, activo, fecha_expiracion); + """, + """ + CREATE INDEX IF NOT EXISTS idx_permisos_exc_vigencia + ON permisos_excepcionales(usuario_id, activo, tipo, fecha_inicio, fecha_expiracion); + """, + """ + CREATE INDEX IF NOT EXISTS idx_capacidades_codigo_activa + ON capacidades(codigo, activa); + """, +] + +DROP_INDEXES = [ + "DROP INDEX IF EXISTS idx_usuarios_grupos_vigencia;", + "DROP INDEX IF EXISTS idx_permisos_exc_vigencia;", + "DROP INDEX IF EXISTS idx_capacidades_codigo_activa;", +] + +DROP_VIEW_CAPACIDADES = "DROP VIEW IF EXISTS vista_capacidades_usuario;" +DROP_VIEW_GRUPOS = "DROP VIEW IF EXISTS vista_grupos_usuario;" + + +def create_permission_views(apps, schema_editor): + if schema_editor.connection.vendor == "sqlite": + return + + schema_editor.execute(CREATE_VIEW_CAPACIDADES) + schema_editor.execute(CREATE_VIEW_GRUPOS) + for statement in CREATE_INDEXES: + schema_editor.execute(statement) + + +def drop_permission_views(apps, schema_editor): + if schema_editor.connection.vendor == "sqlite": + return + + for statement in DROP_INDEXES: + schema_editor.execute(statement) + schema_editor.execute(DROP_VIEW_GRUPOS) + schema_editor.execute(DROP_VIEW_CAPACIDADES) + class Migration(migrations.Migration): dependencies = [ - ('users', '0001_initial_permisos_granular'), + ("users", "0001_initial_permisos_granular"), ] operations = [ - # Vista 1: Capacidades por usuario - migrations.RunSQL( - sql=""" - -- Vista que consolida todas las capacidades efectivas de cada usuario - -- Incluye capacidades de grupos Y permisos excepcionales - CREATE OR REPLACE VIEW vista_capacidades_usuario AS - - -- CAPACIDADES DE GRUPOS - SELECT DISTINCT - u.id AS usuario_id, - u.username AS usuario_nombre, - u.email AS usuario_email, - c.id AS capacidad_id, - c.codigo AS capacidad_codigo, - f.nombre AS funcion_nombre, - f.dominio AS funcion_dominio, - c.nivel_riesgo, - 'grupo' AS origen, - gp.codigo AS grupo_origen - FROM auth_user u - INNER JOIN usuarios_grupos ug ON u.id = ug.usuario_id - INNER JOIN grupos_permisos gp ON ug.grupo_id = gp.id - INNER JOIN grupo_capacidades gc ON gp.id = gc.grupo_id - INNER JOIN capacidades c ON gc.capacidad_id = c.id - LEFT JOIN funcion_capacidades fc ON c.id = fc.capacidad_id - LEFT JOIN funciones f ON fc.funcion_id = f.id - WHERE ug.activo = TRUE - AND gp.activo = TRUE - AND c.activa = TRUE - AND (ug.fecha_expiracion IS NULL OR ug.fecha_expiracion > NOW()) - - UNION - - -- PERMISOS EXCEPCIONALES CONCEDIDOS - SELECT DISTINCT - u.id AS usuario_id, - u.username AS usuario_nombre, - u.email AS usuario_email, - c.id AS capacidad_id, - c.codigo AS capacidad_codigo, - f.nombre AS funcion_nombre, - f.dominio AS funcion_dominio, - c.nivel_riesgo, - 'excepcional_concedido' AS origen, - NULL AS grupo_origen - FROM auth_user u - INNER JOIN permisos_excepcionales pe ON u.id = pe.usuario_id - INNER JOIN capacidades c ON pe.capacidad_id = c.id - LEFT JOIN funcion_capacidades fc ON c.id = fc.capacidad_id - LEFT JOIN funciones f ON fc.funcion_id = f.id - WHERE pe.tipo = 'conceder' - AND pe.activo = TRUE - AND c.activa = TRUE - AND pe.fecha_inicio <= NOW() - AND (pe.fecha_expiracion IS NULL OR pe.fecha_expiracion > NOW()) - - EXCEPT - - -- MENOS PERMISOS EXCEPCIONALES REVOCADOS - SELECT DISTINCT - u.id AS usuario_id, - u.username AS usuario_nombre, - u.email AS usuario_email, - c.id AS capacidad_id, - c.codigo AS capacidad_codigo, - f.nombre AS funcion_nombre, - f.dominio AS funcion_dominio, - c.nivel_riesgo, - 'excepcional_revocado' AS origen, - NULL AS grupo_origen - FROM auth_user u - INNER JOIN permisos_excepcionales pe ON u.id = pe.usuario_id - INNER JOIN capacidades c ON pe.capacidad_id = c.id - LEFT JOIN funcion_capacidades fc ON c.id = fc.capacidad_id - LEFT JOIN funciones f ON fc.funcion_id = f.id - WHERE pe.tipo = 'revocar' - AND pe.activo = TRUE - AND c.activa = TRUE - AND pe.fecha_inicio <= NOW() - AND (pe.fecha_expiracion IS NULL OR pe.fecha_expiracion > NOW()); - - -- Comentarios - COMMENT ON VIEW vista_capacidades_usuario IS - 'Vista consolidada de capacidades efectivas por usuario. Incluye capacidades de grupos Y permisos excepcionales (concedidos - revocados).'; - """, - reverse_sql="DROP VIEW IF EXISTS vista_capacidades_usuario;", - ), - - # Vista 2: Grupos por usuario - migrations.RunSQL( - sql=""" - -- Vista de grupos asignados a usuarios con estado de vigencia - CREATE OR REPLACE VIEW vista_grupos_usuario AS - SELECT - u.id AS usuario_id, - u.username AS usuario_nombre, - u.email AS usuario_email, - gp.id AS grupo_id, - gp.codigo AS grupo_codigo, - gp.nombre_display AS grupo_nombre, - gp.tipo_acceso, - gp.color_hex, - ug.fecha_asignacion, - ug.fecha_expiracion, - ug.motivo, - ug.activo, - CASE - WHEN ug.activo = FALSE THEN FALSE - WHEN ug.fecha_expiracion IS NULL THEN TRUE - WHEN ug.fecha_expiracion > NOW() THEN TRUE - ELSE FALSE - END AS vigente, - asignado.username AS asignado_por_nombre - FROM auth_user u - INNER JOIN usuarios_grupos ug ON u.id = ug.usuario_id - INNER JOIN grupos_permisos gp ON ug.grupo_id = gp.id - LEFT JOIN auth_user asignado ON ug.asignado_por_id = asignado.id - WHERE ug.activo = TRUE; - - -- Comentarios - COMMENT ON VIEW vista_grupos_usuario IS - 'Vista de grupos asignados a usuarios con cálculo de vigencia basado en fecha_expiracion y campo activo.'; - """, - reverse_sql="DROP VIEW IF EXISTS vista_grupos_usuario;", - ), - - # Índices en tablas base para optimizar vistas - migrations.RunSQL( - sql=""" - -- Índice compuesto para query frecuente de capacidades - CREATE INDEX IF NOT EXISTS idx_usuarios_grupos_vigencia - ON usuarios_grupos(usuario_id, activo, fecha_expiracion); - - -- Índice para permisos excepcionales vigentes - CREATE INDEX IF NOT EXISTS idx_permisos_exc_vigencia - ON permisos_excepcionales(usuario_id, activo, tipo, fecha_inicio, fecha_expiracion); - - -- Índice para búsquedas por capacidad_codigo - CREATE INDEX IF NOT EXISTS idx_capacidades_codigo_activa - ON capacidades(codigo, activa); - """, - reverse_sql=""" - DROP INDEX IF EXISTS idx_usuarios_grupos_vigencia; - DROP INDEX IF EXISTS idx_permisos_exc_vigencia; - DROP INDEX IF EXISTS idx_capacidades_codigo_activa; - """, - ), + migrations.RunPython(create_permission_views, reverse_code=drop_permission_views), ] diff --git a/api/callcentersite/callcentersite/apps/users/migrations/0003_create_permission_functions.py b/api/callcentersite/callcentersite/apps/users/migrations/0003_create_permission_functions.py index ff6b2401..ac990ac8 100644 --- a/api/callcentersite/callcentersite/apps/users/migrations/0003_create_permission_functions.py +++ b/api/callcentersite/callcentersite/apps/users/migrations/0003_create_permission_functions.py @@ -5,247 +5,243 @@ 1. usuario_tiene_permiso() - Verificación rápida de permisos 2. obtener_capacidades_usuario() - Lista de capacidades de un usuario 3. obtener_grupos_usuario() - Lista de grupos de un usuario +4. verificar_permiso_y_auditar() - Verifica y audita en una sola operación +5. obtener_menu_usuario() - Genera menú dinámico basado en permisos -Estas funciones SQL nativas son alternativas de alta performance al ORM. - -Referencia: docs/backend/requisitos/prioridad_01_estructura_base_datos.md +Estas funciones se crean solo en motores compatibles (omitidas en SQLite). """ from django.db import migrations +FUNCTIONS_SQL = [ + """ + CREATE OR REPLACE FUNCTION usuario_tiene_permiso( + p_usuario_id INTEGER, + p_capacidad_codigo VARCHAR(200) + ) RETURNS BOOLEAN AS $$ + DECLARE + v_tiene_permiso BOOLEAN; + BEGIN + SELECT EXISTS ( + SELECT 1 + FROM vista_capacidades_usuario + WHERE usuario_id = p_usuario_id + AND capacidad_codigo = p_capacidad_codigo + ) INTO v_tiene_permiso; + + RETURN v_tiene_permiso; + END; + $$ LANGUAGE plpgsql + STABLE + PARALLEL SAFE; + + COMMENT ON FUNCTION usuario_tiene_permiso(INTEGER, VARCHAR) IS + 'Verifica si un usuario tiene una capacidad específica. Usa vista_capacidades_usuario para performance óptimo.'; + """, + """ + CREATE OR REPLACE FUNCTION obtener_capacidades_usuario( + p_usuario_id INTEGER + ) RETURNS TEXT[] AS $$ + DECLARE + v_capacidades TEXT[]; + BEGIN + SELECT ARRAY_AGG(DISTINCT capacidad_codigo ORDER BY capacidad_codigo) + INTO v_capacidades + FROM vista_capacidades_usuario + WHERE usuario_id = p_usuario_id; + + RETURN COALESCE(v_capacidades, ARRAY[]::TEXT[]); + END; + $$ LANGUAGE plpgsql + STABLE + PARALLEL SAFE; + + COMMENT ON FUNCTION obtener_capacidades_usuario(INTEGER) IS + 'Retorna array de códigos de capacidades que tiene un usuario. Retorna array vacío si no tiene permisos.'; + """, + """ + CREATE OR REPLACE FUNCTION obtener_grupos_usuario( + p_usuario_id INTEGER + ) RETURNS JSONB AS $$ + DECLARE + v_grupos JSONB; + BEGIN + SELECT COALESCE( + JSONB_AGG( + JSONB_BUILD_OBJECT( + 'grupo_id', grupo_id, + 'grupo_codigo', grupo_codigo, + 'grupo_nombre', grupo_nombre, + 'tipo_acceso', tipo_acceso, + 'color_hex', color_hex, + 'vigente', vigente, + 'fecha_asignacion', fecha_asignacion, + 'fecha_expiracion', fecha_expiracion + ) ORDER BY grupo_codigo + ), + '[]'::JSONB + ) + INTO v_grupos + FROM vista_grupos_usuario + WHERE usuario_id = p_usuario_id + AND vigente = TRUE; + + RETURN v_grupos; + END; + $$ LANGUAGE plpgsql + STABLE + PARALLEL SAFE; + + COMMENT ON FUNCTION obtener_grupos_usuario(INTEGER) IS + 'Retorna JSONB con información completa de grupos vigentes del usuario. Retorna array vacío si no tiene grupos.'; + """, + """ + CREATE OR REPLACE FUNCTION verificar_permiso_y_auditar( + p_usuario_id INTEGER, + p_capacidad_codigo VARCHAR(200), + p_ip_address VARCHAR(50) DEFAULT NULL, + p_user_agent TEXT DEFAULT NULL, + p_recurso_accedido VARCHAR(200) DEFAULT NULL, + p_endpoint VARCHAR(500) DEFAULT NULL + ) RETURNS BOOLEAN AS $$ + DECLARE + v_tiene_permiso BOOLEAN; + v_accion VARCHAR(100); + BEGIN + v_tiene_permiso := usuario_tiene_permiso(p_usuario_id, p_capacidad_codigo); + + IF v_tiene_permiso THEN + v_accion := 'acceso_permitido'; + ELSE + v_accion := 'acceso_denegado'; + END IF; + + INSERT INTO auditoria_permisos ( + usuario_id, + capacidad_codigo, + accion, + resultado, + recurso_accedido, + endpoint, + ip_address, + user_agent, + timestamp + ) VALUES ( + p_usuario_id, + p_capacidad_codigo, + v_accion, + 'exito', + p_recurso_accedido, + p_endpoint, + p_ip_address, + p_user_agent, + NOW() + ); + + RETURN v_tiene_permiso; + END; + $$ LANGUAGE plpgsql + VOLATILE; + + COMMENT ON FUNCTION verificar_permiso_y_auditar IS + 'Verifica permiso y registra en auditoría en una transacción atómica. Optimizado para casos donde siempre se audita.'; + """, + """ + CREATE OR REPLACE FUNCTION obtener_menu_usuario( + p_usuario_id INTEGER + ) RETURNS JSONB AS $$ + DECLARE + v_capacidades TEXT[]; + v_menu JSONB; + BEGIN + v_capacidades := obtener_capacidades_usuario(p_usuario_id); + + SELECT COALESCE( + JSONB_AGG( + JSONB_BUILD_OBJECT( + 'dominio', dominio, + 'funcion', funcion, + 'tipo_acceso', tipo_acceso, + 'icono', icono, + 'orden', orden, + 'color_hex', color_hex, + 'ruta', ruta, + 'capacidades', capacidades + ) ORDER BY orden + ), + '[]'::JSONB + ) + INTO v_menu + FROM ( + SELECT + f.dominio, + f.nombre AS funcion, + gp.tipo_acceso, + gp.icono, + gp.orden, + gp.color_hex, + gp.ruta, + ARRAY_AGG(DISTINCT c.codigo) AS capacidades + FROM grupos_permisos gp + INNER JOIN grupo_capacidades gc ON gp.id = gc.grupo_id + INNER JOIN capacidades c ON gc.capacidad_id = c.id + INNER JOIN funcion_capacidades fc ON c.id = fc.capacidad_id + INNER JOIN funciones f ON fc.funcion_id = f.id + WHERE gp.activo = TRUE + AND c.activa = TRUE + AND gp.id IN ( + SELECT grupo_id + FROM usuarios_grupos + WHERE usuario_id = p_usuario_id + AND activo = TRUE + AND (fecha_expiracion IS NULL OR fecha_expiracion > NOW()) + ) + AND c.codigo = ANY (v_capacidades) + GROUP BY f.dominio, f.nombre, gp.tipo_acceso, gp.icono, gp.orden, gp.color_hex, gp.ruta + ) menu_data; + + RETURN v_menu; + END; + $$ LANGUAGE plpgsql + STABLE + PARALLEL SAFE; + + COMMENT ON FUNCTION obtener_menu_usuario(INTEGER) IS + 'Genera menú dinámico basado en capacidades del usuario usando vistas optimizadas.'; + """, +] + +DROP_FUNCTIONS = [ + "DROP FUNCTION IF EXISTS obtener_menu_usuario(INTEGER);", + "DROP FUNCTION IF EXISTS verificar_permiso_y_auditar;", + "DROP FUNCTION IF EXISTS obtener_grupos_usuario(INTEGER);", + "DROP FUNCTION IF EXISTS obtener_capacidades_usuario(INTEGER);", + "DROP FUNCTION IF EXISTS usuario_tiene_permiso(INTEGER, VARCHAR);", +] + + +def create_permission_functions(apps, schema_editor): + if schema_editor.connection.vendor == "sqlite": + return + + for statement in FUNCTIONS_SQL: + schema_editor.execute(statement) + + +def drop_permission_functions(apps, schema_editor): + if schema_editor.connection.vendor == "sqlite": + return + + for statement in DROP_FUNCTIONS: + schema_editor.execute(statement) + class Migration(migrations.Migration): dependencies = [ - ('users', '0002_create_permission_views'), + ("users", "0002_create_permission_views"), ] operations = [ - # Función 1: usuario_tiene_permiso - migrations.RunSQL( - sql=""" - -- Función para verificar si un usuario tiene una capacidad específica - -- Retorna TRUE si el usuario tiene el permiso (directamente o via grupos) - CREATE OR REPLACE FUNCTION usuario_tiene_permiso( - p_usuario_id INTEGER, - p_capacidad_codigo VARCHAR(200) - ) RETURNS BOOLEAN AS $$ - DECLARE - v_tiene_permiso BOOLEAN; - BEGIN - -- Verificar si existe en la vista de capacidades del usuario - SELECT EXISTS ( - SELECT 1 - FROM vista_capacidades_usuario - WHERE usuario_id = p_usuario_id - AND capacidad_codigo = p_capacidad_codigo - ) INTO v_tiene_permiso; - - RETURN v_tiene_permiso; - END; - $$ LANGUAGE plpgsql - STABLE - PARALLEL SAFE; - - -- Comentarios - COMMENT ON FUNCTION usuario_tiene_permiso(INTEGER, VARCHAR) IS - 'Verifica si un usuario tiene una capacidad específica. Usa vista_capacidades_usuario para performance óptimo.'; - - -- Grant a roles necesarios (opcional, ajustar según necesidad) - -- GRANT EXECUTE ON FUNCTION usuario_tiene_permiso(INTEGER, VARCHAR) TO django_user; - """, - reverse_sql="DROP FUNCTION IF EXISTS usuario_tiene_permiso(INTEGER, VARCHAR);", - ), - - # Función 2: obtener_capacidades_usuario - migrations.RunSQL( - sql=""" - -- Función para obtener array de códigos de capacidades de un usuario - -- Útil para construir menús dinámicos o verificar múltiples permisos - CREATE OR REPLACE FUNCTION obtener_capacidades_usuario( - p_usuario_id INTEGER - ) RETURNS TEXT[] AS $$ - DECLARE - v_capacidades TEXT[]; - BEGIN - -- Obtener array de códigos de capacidades - SELECT ARRAY_AGG(DISTINCT capacidad_codigo ORDER BY capacidad_codigo) - INTO v_capacidades - FROM vista_capacidades_usuario - WHERE usuario_id = p_usuario_id; - - RETURN COALESCE(v_capacidades, ARRAY[]::TEXT[]); - END; - $$ LANGUAGE plpgsql - STABLE - PARALLEL SAFE; - - -- Comentarios - COMMENT ON FUNCTION obtener_capacidades_usuario(INTEGER) IS - 'Retorna array de códigos de capacidades que tiene un usuario. Retorna array vacío si no tiene permisos.'; - """, - reverse_sql="DROP FUNCTION IF EXISTS obtener_capacidades_usuario(INTEGER);", - ), - - # Función 3: obtener_grupos_usuario - migrations.RunSQL( - sql=""" - -- Función para obtener JSON de grupos de un usuario - -- Retorna información completa de grupos con metadata - CREATE OR REPLACE FUNCTION obtener_grupos_usuario( - p_usuario_id INTEGER - ) RETURNS JSONB AS $$ - DECLARE - v_grupos JSONB; - BEGIN - -- Construir JSON de grupos - SELECT COALESCE( - JSONB_AGG( - JSONB_BUILD_OBJECT( - 'grupo_id', grupo_id, - 'grupo_codigo', grupo_codigo, - 'grupo_nombre', grupo_nombre, - 'tipo_acceso', tipo_acceso, - 'color_hex', color_hex, - 'vigente', vigente, - 'fecha_asignacion', fecha_asignacion, - 'fecha_expiracion', fecha_expiracion - ) ORDER BY grupo_codigo - ), - '[]'::JSONB - ) - INTO v_grupos - FROM vista_grupos_usuario - WHERE usuario_id = p_usuario_id - AND vigente = TRUE; - - RETURN v_grupos; - END; - $$ LANGUAGE plpgsql - STABLE - PARALLEL SAFE; - - -- Comentarios - COMMENT ON FUNCTION obtener_grupos_usuario(INTEGER) IS - 'Retorna JSONB con información completa de grupos vigentes del usuario. Retorna array vacío si no tiene grupos.'; - """, - reverse_sql="DROP FUNCTION IF EXISTS obtener_grupos_usuario(INTEGER);", - ), - - # Función 4: verificar_permiso_y_auditar - migrations.RunSQL( - sql=""" - -- Función combinada: verifica permiso Y audita en una sola operación atómica - -- Optimiza casos donde siempre se audita el acceso - CREATE OR REPLACE FUNCTION verificar_permiso_y_auditar( - p_usuario_id INTEGER, - p_capacidad_codigo VARCHAR(200), - p_ip_address VARCHAR(50) DEFAULT NULL, - p_user_agent TEXT DEFAULT NULL, - p_recurso_accedido VARCHAR(200) DEFAULT NULL, - p_endpoint VARCHAR(500) DEFAULT NULL - ) RETURNS BOOLEAN AS $$ - DECLARE - v_tiene_permiso BOOLEAN; - v_accion VARCHAR(100); - BEGIN - -- Verificar permiso - v_tiene_permiso := usuario_tiene_permiso(p_usuario_id, p_capacidad_codigo); - - -- Determinar acción - IF v_tiene_permiso THEN - v_accion := 'acceso_permitido'; - ELSE - v_accion := 'acceso_denegado'; - END IF; - - -- Insertar auditoría - INSERT INTO auditoria_permisos ( - usuario_id, - capacidad_codigo, - accion, - resultado, - recurso_accedido, - endpoint, - ip_address, - user_agent, - timestamp - ) VALUES ( - p_usuario_id, - p_capacidad_codigo, - v_accion, - 'exito', - p_recurso_accedido, - p_endpoint, - p_ip_address, - p_user_agent, - NOW() - ); - - RETURN v_tiene_permiso; - END; - $$ LANGUAGE plpgsql - VOLATILE; -- VOLATILE porque inserta datos - - -- Comentarios - COMMENT ON FUNCTION verificar_permiso_y_auditar IS - 'Verifica permiso y registra en auditoría en una transacción atómica. Optimizado para casos donde siempre se audita.'; - """, - reverse_sql="DROP FUNCTION IF EXISTS verificar_permiso_y_auditar;", - ), - - # Función 5: obtener_menu_usuario - migrations.RunSQL( - sql=""" - -- Función para generar menú dinámico basado en permisos del usuario - -- Agrupa capacidades por función y dominio - CREATE OR REPLACE FUNCTION obtener_menu_usuario( - p_usuario_id INTEGER - ) RETURNS JSONB AS $$ - DECLARE - v_menu JSONB; - BEGIN - -- Construir menú jerárquico: dominio -> función -> capacidades - SELECT COALESCE( - JSONB_OBJECT_AGG( - funcion_dominio, - funciones - ), - '{}'::JSONB - ) - INTO v_menu - FROM ( - SELECT - funcion_dominio, - JSONB_OBJECT_AGG( - funcion_nombre, - capacidades - ) AS funciones - FROM ( - SELECT - funcion_dominio, - funcion_nombre, - JSONB_AGG( - DISTINCT capacidad_codigo ORDER BY capacidad_codigo - ) AS capacidades - FROM vista_capacidades_usuario - WHERE usuario_id = p_usuario_id - AND funcion_nombre IS NOT NULL - GROUP BY funcion_dominio, funcion_nombre - ) AS funciones_por_dominio - GROUP BY funcion_dominio - ) AS menu_por_dominio; - - RETURN v_menu; - END; - $$ LANGUAGE plpgsql - STABLE - PARALLEL SAFE; - - -- Comentarios - COMMENT ON FUNCTION obtener_menu_usuario(INTEGER) IS - 'Genera menú dinámico en formato JSONB jerárquico: dominio -> función -> capacidades. Útil para UI.'; - """, - reverse_sql="DROP FUNCTION IF EXISTS obtener_menu_usuario(INTEGER);", - ), + migrations.RunPython(create_permission_functions, reverse_code=drop_permission_functions), ] diff --git a/api/callcentersite/callcentersite/apps/users/migrations/0004_alter_permisoexcepcional_usuario.py b/api/callcentersite/callcentersite/apps/users/migrations/0004_alter_permisoexcepcional_usuario.py new file mode 100644 index 00000000..9d194292 --- /dev/null +++ b/api/callcentersite/callcentersite/apps/users/migrations/0004_alter_permisoexcepcional_usuario.py @@ -0,0 +1,22 @@ +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("users", "0001_initial"), + ("users", "0003_create_permission_functions"), + ] + + operations = [ + migrations.AlterField( + model_name="permisoexcepcional", + name="usuario", + field=models.ForeignKey( + on_delete=models.deletion.CASCADE, + related_name="permisos_excepcionales_granular", + to=settings.AUTH_USER_MODEL, + ), + ), + ] diff --git a/api/callcentersite/callcentersite/apps/users/models.py b/api/callcentersite/callcentersite/apps/users/models.py index 857cd90a..7f2f926d 100644 --- a/api/callcentersite/callcentersite/apps/users/models.py +++ b/api/callcentersite/callcentersite/apps/users/models.py @@ -10,6 +10,9 @@ from django.db import models from django.utils import timezone +# Modelos adicionales del dominio de permisos granulados +from .models_permisos_granular import PermisoExcepcional + # ============================================================================= # MODELO USER DE DJANGO (Opción C - Modelo totalmente custom) # ============================================================================= diff --git a/api/callcentersite/callcentersite/apps/users/models_permisos_granular.py b/api/callcentersite/callcentersite/apps/users/models_permisos_granular.py index 7eda0787..24991217 100644 --- a/api/callcentersite/callcentersite/apps/users/models_permisos_granular.py +++ b/api/callcentersite/callcentersite/apps/users/models_permisos_granular.py @@ -292,7 +292,7 @@ class PermisoExcepcional(models.Model): usuario = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.CASCADE, - related_name='permisos_excepcionales', + related_name='permisos_excepcionales_granular', ) capacidad = models.ForeignKey( Capacidad, diff --git a/api/callcentersite/callcentersite/docs_backend_use_cases.py b/api/callcentersite/callcentersite/docs_backend_use_cases.py new file mode 100644 index 00000000..e3ca8eac --- /dev/null +++ b/api/callcentersite/callcentersite/docs_backend_use_cases.py @@ -0,0 +1,191 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable, List, Sequence +import re + + +@dataclass(frozen=True) +class Flow: + """Represents a flow section within a use case.""" + + title: str + steps: List[str] + + +@dataclass(frozen=True) +class UseCase: + """Structured representation of a backend use case document.""" + + id: str + name: str + path: Path + main_flow: Flow + alternate_flows: List[Flow] + exception_flows: List[Flow] + + +def list_use_case_files(base_dir: Path | None = None) -> List[UseCase]: + """Return all documented use cases under ``docs/backend``. + + The function scans for Markdown files that start with ``UC-`` and parses + each one into a :class:`UseCase` instance. The DORA directory is not + traversed because only the backend documentation is considered. + """ + + docs_dir = base_dir or Path(__file__).resolve().parents[3] / "docs" / "backend" + use_case_files = sorted(docs_dir.glob("UC-*.md")) + return [parse_use_case(path) for path in use_case_files] + + +def parse_use_case(path: Path) -> UseCase: + """Parse a use case Markdown file into a structured representation.""" + + lines = path.read_text(encoding="utf-8").splitlines() + use_case_id = _extract_metadata_field(lines, "id") + name = _extract_metadata_field(lines, "nombre") or path.stem + + main_flow_lines = _slice_section(lines, "## Flujo principal") + alternate_flow_lines = _slice_section( + lines, + ( + "## Flujos alternos", + "## Flujos alternativos", + ), + ) + exception_flow_lines = _slice_section(lines, "## Flujos de excepción") + + main_flow = Flow("Flujo principal", _extract_table_steps(main_flow_lines)) + alternate_flows = _extract_named_flows(alternate_flow_lines) + exception_flows = _extract_named_flows(exception_flow_lines) + + return UseCase( + id=use_case_id, + name=name, + path=path, + main_flow=main_flow, + alternate_flows=alternate_flows, + exception_flows=exception_flows, + ) + + +def _extract_metadata_field(lines: Sequence[str], field: str) -> str: + pattern = re.compile(rf"^{re.escape(field)}:\s*(.+)", re.IGNORECASE) + for line in lines: + match = pattern.match(line.strip()) + if match: + return match.group(1).strip() + return "" + + +def _slice_section(lines: Sequence[str], header: str | Sequence[str]) -> List[str]: + header_options = [header] if isinstance(header, str) else list(header) + normalized_targets = {_normalize_header_text(option) for option in header_options} + + start = None + for idx, line in enumerate(lines): + if _normalize_header_text(line) in normalized_targets: + start = idx + 1 + break + if start is None: + return [] + + section: List[str] = [] + for line in lines[start:]: + if line.startswith("## ") and _normalize_header_text(line) not in normalized_targets: + break + section.append(line) + return section + + +def _extract_table_steps(lines: Iterable[str]) -> List[str]: + steps: List[str] = [] + for raw_line in lines: + line = raw_line.strip() + if not line.startswith("|"): + continue + columns = [col.strip() for col in line.strip("|").split("|")] + if _is_separator_row(columns): + continue + if _is_header_row(columns): + continue + + if len(columns) >= 4 and _looks_like_step_index(columns[0]): + actor = columns[1] + action = columns[2] + system = columns[3] + description_parts = [actor, action, system] + steps.append(" ".join(part for part in description_parts if part)) + continue + + if len(columns) == 2: + first, second = columns + if _looks_like_step_index(first) and second: + steps.append(second) + continue + if first and second: + steps.append(f"{first}: {second}") + continue + steps.append(first or second) + continue + + actor, system = columns[0], columns[1] + if actor and system: + steps.append(f"{actor}: {system}") + else: + steps.append(actor or system) + return steps + + +def _extract_named_flows(lines: Sequence[str]) -> List[Flow]: + flows: List[Flow] = [] + current_title = None + current_lines: List[str] = [] + + for line in lines: + if line.startswith("## "): + break + if line.startswith("### "): + if current_title is not None: + flows.append(Flow(current_title, _extract_table_steps(current_lines))) + current_title = _normalize_flow_title(line) + current_lines = [] + continue + current_lines.append(line) + + if current_title is not None: + flows.append(Flow(current_title, _extract_table_steps(current_lines))) + + return flows + + +def _normalize_header_text(raw_header: str) -> str: + header = raw_header.lstrip("#").strip() + header = re.sub(r"^\d+[\.:\-)\s]*", "", header) + return header.lower() + + +def _is_header_row(columns: List[str]) -> bool: + if columns and _looks_like_step_index(columns[0]): + return False + + normalized = [col.lower() for col in columns if col] + header_tokens = {"actor", "sistema", "paso", "acción", "accion", "descripción", "descripcion"} + return normalized and all(cell in header_tokens for cell in normalized) + + +def _is_separator_row(columns: List[str]) -> bool: + return all(not col or set(col) <= {"-"} for col in columns) + + +def _looks_like_step_index(cell: str) -> bool: + return bool(re.match(r"^[0-9]+[a-zA-Z]?\.?$", cell)) + + +def _normalize_flow_title(raw_title: str) -> str: + title = raw_title.lstrip("# ").strip() + match = re.match(r"(?P[A-Z]+-[0-9]+(?:\.[0-9]+)?)", title) + if match: + return match.group("code") + return title diff --git a/api/callcentersite/callcentersite/settings/development.py b/api/callcentersite/callcentersite/settings/development.py index c8f8e954..7c8b4774 100644 --- a/api/callcentersite/callcentersite/settings/development.py +++ b/api/callcentersite/callcentersite/settings/development.py @@ -18,16 +18,15 @@ INTERNAL_IPS = ["127.0.0.1"] -if os.getenv("USE_SQLITE_DEV", "false").lower() == "true": - sqlite_dir = BASE_DIR / "local_db" - sqlite_dir.mkdir(exist_ok=True) - DATABASES = { - "default": { - "ENGINE": "django.db.backends.sqlite3", - "NAME": str(sqlite_dir / "app.sqlite3"), - }, - "ivr_readonly": { - "ENGINE": "django.db.backends.sqlite3", - "NAME": str(sqlite_dir / "ivr_readonly.sqlite3"), - }, - } +sqlite_dir: Path = BASE_DIR / "local_db" +sqlite_dir.mkdir(exist_ok=True) +DATABASES = { + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": str(sqlite_dir / "app.sqlite3"), + }, + "ivr_readonly": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": str(sqlite_dir / "ivr_readonly.sqlite3"), + }, +} diff --git a/api/callcentersite/requirements/base.txt b/api/callcentersite/requirements/base.txt index ba53b77d..e0aefcf7 100644 --- a/api/callcentersite/requirements/base.txt +++ b/api/callcentersite/requirements/base.txt @@ -37,6 +37,7 @@ drf-spectacular>=0.27.0 django-cors-headers>=4.4.0 pytz>=2024.1 python-dateutil>=2.9.0 +bcrypt>=5.0.0 # ============================================ # SCHEDULER @@ -54,6 +55,7 @@ reportlab>=4.0.0 # ============================================ pandas>=2.1.0 numpy>=1.26.0 +scikit-learn>=1.7.0 # ============================================ # PRODUCTION diff --git a/api/callcentersite/tests/conftest.py b/api/callcentersite/tests/conftest.py index 3b67a343..902f6b74 100644 --- a/api/callcentersite/tests/conftest.py +++ b/api/callcentersite/tests/conftest.py @@ -101,6 +101,15 @@ def safe_reset_in_memory_registry() -> None: if missing.split(".")[0] != "callcentersite": raise return + except Exception as exc: # pragma: no cover - defensive guard + try: + from django.core.exceptions import ImproperlyConfigured + except Exception: # pragma: no cover - fallback when Django is absent + ImproperlyConfigured = None # type: ignore[misc,assignment] + + if ImproperlyConfigured and isinstance(exc, ImproperlyConfigured): + return + raise reset_registry = getattr(models, "reset_registry", None) if callable(reset_registry): diff --git a/api/callcentersite/tests/docs/test_backend_use_cases.py b/api/callcentersite/tests/docs/test_backend_use_cases.py new file mode 100644 index 00000000..2504ff6c --- /dev/null +++ b/api/callcentersite/tests/docs/test_backend_use_cases.py @@ -0,0 +1,111 @@ +import pytest +from pathlib import Path + +from callcentersite.docs_backend_use_cases import ( + list_use_case_files, + parse_use_case, +) + + +BACKEND_DOCS = Path(__file__).resolve().parents[4] / "docs" / "backend" + + +@pytest.mark.unit +def test_list_use_cases_returns_all_ids(): + use_cases = list_use_case_files(BACKEND_DOCS) + ids = {uc.id for uc in use_cases} + expected_ids = { + "UC-PERM-001", + "UC-PERM-002", + "UC-PERM-003", + "UC-PERM-004", + "UC-PERM-005", + "UC-PERM-006", + "UC-PERM-007", + "UC-PERM-008", + "UC-PERM-010", + } + missing = expected_ids - ids + unexpected = ids - expected_ids + assert not missing, f"Missing use cases: {missing}" + assert not unexpected, f"Unexpected use cases found: {unexpected}" + + +@pytest.mark.unit +def test_parse_use_case_extracts_main_flow_steps(): + uc_file = BACKEND_DOCS / "UC-PERM-001_asignar_grupo_a_usuario.md" + use_case = parse_use_case(uc_file) + + assert use_case.id == "UC-PERM-001" + assert "Asignar Grupo de Permisos a Usuario" in use_case.name + main_steps = " ".join(use_case.main_flow.steps) + assert "Navega al módulo de gestión de usuarios" in main_steps + assert "Valida que el administrador tiene permiso" in main_steps + assert "Registra asignación en auditoría" in main_steps + + +@pytest.mark.unit +def test_parse_use_case_collects_alternate_and_exception_flows(): + uc_file = BACKEND_DOCS / "UC-PERM-001_asignar_grupo_a_usuario.md" + use_case = parse_use_case(uc_file) + + alt_titles = {flow.title for flow in use_case.alternate_flows} + exc_titles = {flow.title for flow in use_case.exception_flows} + + assert {"FA-1", "FA-2", "FA-3"}.issubset(alt_titles) + assert {"FE-1", "FE-2", "FE-3", "FE-4"}.issubset(exc_titles) + + alt_steps = " ".join(step for flow in use_case.alternate_flows for step in flow.steps) + exc_steps = " ".join(step for flow in use_case.exception_flows for step in flow.steps) + + assert "Sistema detecta duplicado" in alt_steps + assert "Sistema valida grupos" in exc_steps + assert "Sistema detecta error de BD" in exc_steps + + +@pytest.mark.unit +@pytest.mark.parametrize( + "filename, expected_phrases", + [ + ( + "UC-PERM-002_revocar_grupo_a_usuario.md", + [ + "Accede al módulo de gestión de usuarios", + "Muestra confirmación de éxito", + ], + ), + ( + "UC-PERM-003_conceder_permiso_excepcional.md", + [ + "Accede a módulo de permisos excepcionales", + "Crea registro en permisos_excepcionales", + ], + ), + ], +) +def test_parse_use_case_supports_numbered_sections(filename, expected_phrases): + uc_file = BACKEND_DOCS / filename + use_case = parse_use_case(uc_file) + + main_steps_text = " ".join(use_case.main_flow.steps) + + for phrase in expected_phrases: + assert phrase in main_steps_text, f"Phrase '{phrase}' missing in main flow" + + +@pytest.mark.unit +def test_numbered_sections_keep_alternate_and_exception_flows(): + uc_file = BACKEND_DOCS / "UC-PERM-002_revocar_grupo_a_usuario.md" + use_case = parse_use_case(uc_file) + + alt_titles = {flow.title for flow in use_case.alternate_flows} + exc_titles = {flow.title for flow in use_case.exception_flows} + + assert {"FA-002.1", "FA-002.2", "FA-002.3"}.issubset(alt_titles) + assert {"FE-002.1", "FE-002.2"}.issubset(exc_titles) + + alt_steps = " ".join(step for flow in use_case.alternate_flows for step in flow.steps) + exc_steps = " ".join(step for flow in use_case.exception_flows for step in flow.steps) + + assert "Sistema muestra error" in alt_steps + assert "HTTP 403 Forbidden" in exc_steps diff --git a/api/callcentersite/tests/settings/test_sqlite_fallback.py b/api/callcentersite/tests/settings/test_sqlite_fallback.py index c0f817b8..bf7d3450 100644 --- a/api/callcentersite/tests/settings/test_sqlite_fallback.py +++ b/api/callcentersite/tests/settings/test_sqlite_fallback.py @@ -1,22 +1,47 @@ -"""Verifica el fallback a SQLite documentado para entornos locales.""" +"""Verifica la configuración de SQLite para entornos locales.""" +import os import importlib import sys +from pathlib import Path +import django -def test_development_settings_activan_sqlite_con_bandera(monkeypatch) -> None: - """Cuando USE_SQLITE_DEV=true ambas bases usan SQLite en desarrollo.""" +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "callcentersite.settings.development") +django.setup() - monkeypatch.setenv("USE_SQLITE_DEV", "true") - if "callcentersite.settings.development" in list(sys.modules): - sys.modules.pop("callcentersite.settings.development") +def _reload_dev_settings(monkeypatch) -> object: + """Recarga el módulo de settings de desarrollo para aplicar cambios de entorno.""" - settings_dev = importlib.import_module("callcentersite.settings.development") + monkeypatch.setenv("DJANGO_SETTINGS_MODULE", "callcentersite.settings.development") + for module in ["callcentersite.settings.development", "callcentersite.settings.base"]: + sys.modules.pop(module, None) + return importlib.import_module("callcentersite.settings.development") + + +def test_development_settings_usar_sqlite_por_defecto(monkeypatch) -> None: + """El ambiente de desarrollo usa SQLite sin depender de variables de entorno.""" + + monkeypatch.delenv("USE_SQLITE_DEV", raising=False) + + settings_dev = _reload_dev_settings(monkeypatch) assert settings_dev.DATABASES["default"]["ENGINE"] == "django.db.backends.sqlite3" assert settings_dev.DATABASES["ivr_readonly"]["ENGINE"] == "django.db.backends.sqlite3" - assert settings_dev.DATABASES["default"]["NAME"].endswith("app.sqlite3") - assert settings_dev.DATABASES["ivr_readonly"]["NAME"].endswith("ivr_readonly.sqlite3") + + +def test_bases_sqlite_mantienen_nombres_diferenciados(monkeypatch) -> None: + """Los archivos de base de datos mantienen nombres claros para cada conexión.""" monkeypatch.delenv("USE_SQLITE_DEV", raising=False) + + settings_dev = _reload_dev_settings(monkeypatch) + + default_path = Path(settings_dev.DATABASES["default"]["NAME"]) + ivr_path = Path(settings_dev.DATABASES["ivr_readonly"]["NAME"]) + + assert default_path.name == "app.sqlite3" + assert ivr_path.name == "ivr_readonly.sqlite3" + assert default_path.parent == ivr_path.parent + assert default_path.parent.name == "local_db" diff --git a/api/callcentersite/tests/system/test_management_health.py b/api/callcentersite/tests/system/test_management_health.py new file mode 100644 index 00000000..d1905375 --- /dev/null +++ b/api/callcentersite/tests/system/test_management_health.py @@ -0,0 +1,12 @@ +"""Smoke tests for Django management health commands.""" + +from django.core.management import call_command +from django.test import SimpleTestCase + + +class ManagementHealthTests(SimpleTestCase): + """Ensure critical management commands run without raising errors.""" + + def test_system_check_succeeds(self) -> None: + """Running ``manage.py check`` should complete without issues.""" + call_command("check") diff --git a/api/callcentersite/tests/unit/test_related_names_conflicts.py b/api/callcentersite/tests/unit/test_related_names_conflicts.py new file mode 100644 index 00000000..7466703e --- /dev/null +++ b/api/callcentersite/tests/unit/test_related_names_conflicts.py @@ -0,0 +1,40 @@ +from django.apps import apps +from django.test import SimpleTestCase + + +class RelatedNamesConflictTest(SimpleTestCase): + def test_configuration_related_names_are_unique(self): + configuracion_field = apps.get_model( + "configuracion", "ConfiguracionSistema" + )._meta.get_field("modificado_por") + configuration_field = apps.get_model( + "configuration", "Configuracion" + )._meta.get_field("updated_by") + + related_names = { + configuracion_field.remote_field.related_name, + configuration_field.remote_field.related_name, + } + + self.assertEqual( + {"configuracion_sistema_modificaciones", "configuraciones_actualizadas"}, + related_names, + ) + + def test_permission_related_names_are_unique(self): + permissions_field = apps.get_model( + "permissions", "PermisoExcepcional" + )._meta.get_field("usuario") + users_field = apps.get_model("users", "PermisoExcepcional")._meta.get_field( + "usuario" + ) + + related_names = { + permissions_field.remote_field.related_name, + users_field.remote_field.related_name, + } + + self.assertEqual( + {"permisos_excepcionales_permissions", "permisos_excepcionales_granular"}, + related_names, + )