Saltar a contenido

Arquitectura

Capas (Clean Architecture)

interfaces/       <- GUI (Streamlit 10 pages), API (FastAPI 11 endpoints)
application/      <- Use cases (8), DTOs, ports (8 Protocols), services, events, ServiceContainer
core/             <- Entities (7), Value Objects, Enums (5), Business Rules
infrastructure/   <- Repositorios (6), logging, security, config, notifications (3)

Dependency Rule

  • core/ NO importa de ninguna otra capa
  • application/ importa SOLO de core/
  • infrastructure/ importa de core/ y application/ (implementa Protocols)
  • interfaces/ importa de application/ (via ServiceContainer) y infrastructure/ (bootstrap)

Arquitectura Dual-Mode

Streamlit (GUI :8501) ──→ ServiceContainer ──→ Use Cases ──→ Ports ──→ PostgreSQL/SQLite
FastAPI (API :8000)   ──→ ServiceContainer ──→ Use Cases ──→ Ports ──→ PostgreSQL
                              IAuditLogger ──→ audit_log (append-only, PG immutable)
                              IDispatcher  ──→ Email (SMTP) / WhatsApp (Meta API)
                              DispatchWorker ──→ asyncio.Queue + retry policy
                              EventBus ──→ NotificacionCreada / EstadoCambiado

Streamlit llama use cases directamente (sin HTTP). FastAPI usa DI con Depends (session per-request con yield).

Principios

Dependency Inversion (DIP)

Use cases dependen de Protocols (ports), no de implementaciones:

class CrearNotificacionUseCase:
    def __init__(self, *, notificacion_writer: INotificacionWriter, proceso_repository: IProcesoRepository):
        ...

Interface Segregation (ISP)

Notificacion tiene Reader y Writer separados:

class INotificacionReader(Protocol):
    def find_by_id(self, id: UUID) -> Notificacion | None: ...
    def list_by_proceso(self, id_proceso: UUID) -> list[Notificacion]: ...
    def list_all(self) -> list[Notificacion]: ...

class INotificacionWriter(Protocol):
    def save(self, notificacion: Notificacion) -> Notificacion: ...
    def update(self, notificacion: Notificacion) -> Notificacion: ...

Result Pattern (ROP)

Errores de negocio via Result[T, str], no exceptions:

def execute(self, request: CrearNotificacionRequest) -> Result[CrearNotificacionResponse, str]:
    proceso = self._proceso_repo.find_by_id(request.id_proceso)
    if proceso is None:
        return Failure(f"Proceso {request.id_proceso} no encontrado")
    return Notificacion.crear(...).map(self._writer.save).map(lambda saved: CrearNotificacionResponse(...))

ServiceContainer (Composition Root)

container = ServiceContainer(
    notificacion_reader=notif_repo,      # ISP: reader separado
    notificacion_writer=notif_repo,      # ISP: writer separado
    proceso_repository=proceso_repo,
    sujeto_repository=sujeto_repo,
    juzgado_repository=juzgado_repo,
    audiencia_repository=audiencia_repo,
    usuario_repository=usuario_repo,
    password_hasher=hasher,
    secret_key=settings.secret_key,
)

Properties publicas expuestas: crear_notificacion, cambiar_estado_notificacion, registrar_proceso, gestionar_sujeto, gestionar_audiencia, vincular_sujeto_proceso, consultar_directorio, generar_reporte, auth_service, authorization_service, audit_logger, notificacion_reader, proceso_repository, juzgado_repository, sujeto_repository, audiencia_repository.

Base de Datos

10 modelos ORM + 2 tablas M:N junction (juzgado_proceso, sujeto_proceso). ENUMs con native_enum=False (compatible SQLite + PostgreSQL). FKs con ondelete=CASCADE (proceso→notificacion) o RESTRICT (sujeto→notificacion). AuditLog con id int autoincrement (append-only, PG rules prevent DELETE/UPDATE). LoginAttemptModel: tracking de intentos fallidos con TTL temporal. Connection pooling: pool_size=5, max_overflow=10, pool_pre_ping=True (solo PG).

Patrones

  • Service Container: DI lazy-loading con RLock (thread-safe Streamlit), 228 lineas
  • Repository: Entity ↔ Model mapping en infrastructure/ (6 repos)
  • Factory Method: Entity.crear() con Result pattern
  • ISP: Reader/Writer separados para Notificacion
  • Strategy: INotificacionDispatcher + DispatcherRegistry (Email, WhatsApp)
  • Observer: EventBus in-process (subscribe/publish, frozen events)
  • Unit of Work: IUnitOfWork Protocol + SqlUnitOfWork (session-per-request)
  • RBAC: AuthorizationService + check_permission guard en GUI + JWT middleware en API

Diagrama de Estado: Notificacion

PENDIENTE → {EN_PROCESO, CANCELADA}
EN_PROCESO → {ENVIADA, FALLIDA, CANCELADA}
ENVIADA → {RECIBIDA, FALLIDA}
RECIBIDA → {} (terminal)
FALLIDA → {EN_PROCESO, CANCELADA} (reintento o cancelar)
CANCELADA → {} (terminal)