Arquitectura¶
Capas (Clean Architecture)¶
interfaces/ <- GUI (Streamlit 10 pages), API (FastAPI 11 endpoints)
│
application/ <- Use cases (8), DTOs, ports (8 Protocols), services, events, ServiceContainer
│
core/ <- Entities (7), Value Objects, Enums (5), Business Rules
│
infrastructure/ <- Repositorios (6), logging, security, config, notifications (3)
Dependency Rule¶
- core/ NO importa de ninguna otra capa
- application/ importa SOLO de core/
- infrastructure/ importa de core/ y application/ (implementa Protocols)
- interfaces/ importa de application/ (via ServiceContainer) y infrastructure/ (bootstrap)
Arquitectura Dual-Mode¶
Streamlit (GUI :8501) ──→ ServiceContainer ──→ Use Cases ──→ Ports ──→ PostgreSQL/SQLite
FastAPI (API :8000) ──→ ServiceContainer ──→ Use Cases ──→ Ports ──→ PostgreSQL
│
IAuditLogger ──→ audit_log (append-only, PG immutable)
IDispatcher ──→ Email (SMTP) / WhatsApp (Meta API)
DispatchWorker ──→ asyncio.Queue + retry policy
EventBus ──→ NotificacionCreada / EstadoCambiado
Streamlit llama use cases directamente (sin HTTP). FastAPI usa DI con Depends (session per-request con yield).
Principios¶
Dependency Inversion (DIP)¶
Use cases dependen de Protocols (ports), no de implementaciones:
class CrearNotificacionUseCase:
def __init__(self, *, notificacion_writer: INotificacionWriter, proceso_repository: IProcesoRepository):
...
Interface Segregation (ISP)¶
Notificacion tiene Reader y Writer separados:
class INotificacionReader(Protocol):
def find_by_id(self, id: UUID) -> Notificacion | None: ...
def list_by_proceso(self, id_proceso: UUID) -> list[Notificacion]: ...
def list_all(self) -> list[Notificacion]: ...
class INotificacionWriter(Protocol):
def save(self, notificacion: Notificacion) -> Notificacion: ...
def update(self, notificacion: Notificacion) -> Notificacion: ...
Result Pattern (ROP)¶
Errores de negocio via Result[T, str], no exceptions:
def execute(self, request: CrearNotificacionRequest) -> Result[CrearNotificacionResponse, str]:
proceso = self._proceso_repo.find_by_id(request.id_proceso)
if proceso is None:
return Failure(f"Proceso {request.id_proceso} no encontrado")
return Notificacion.crear(...).map(self._writer.save).map(lambda saved: CrearNotificacionResponse(...))
ServiceContainer (Composition Root)¶
container = ServiceContainer(
notificacion_reader=notif_repo, # ISP: reader separado
notificacion_writer=notif_repo, # ISP: writer separado
proceso_repository=proceso_repo,
sujeto_repository=sujeto_repo,
juzgado_repository=juzgado_repo,
audiencia_repository=audiencia_repo,
usuario_repository=usuario_repo,
password_hasher=hasher,
secret_key=settings.secret_key,
)
Properties publicas expuestas: crear_notificacion, cambiar_estado_notificacion, registrar_proceso, gestionar_sujeto, gestionar_audiencia, vincular_sujeto_proceso, consultar_directorio, generar_reporte, auth_service, authorization_service, audit_logger, notificacion_reader, proceso_repository, juzgado_repository, sujeto_repository, audiencia_repository.
Base de Datos¶
10 modelos ORM + 2 tablas M:N junction (juzgado_proceso, sujeto_proceso). ENUMs con native_enum=False (compatible SQLite + PostgreSQL). FKs con ondelete=CASCADE (proceso→notificacion) o RESTRICT (sujeto→notificacion). AuditLog con id int autoincrement (append-only, PG rules prevent DELETE/UPDATE). LoginAttemptModel: tracking de intentos fallidos con TTL temporal. Connection pooling: pool_size=5, max_overflow=10, pool_pre_ping=True (solo PG).
Patrones¶
- Service Container: DI lazy-loading con RLock (thread-safe Streamlit), 228 lineas
- Repository: Entity ↔ Model mapping en infrastructure/ (6 repos)
- Factory Method: Entity.crear() con Result pattern
- ISP: Reader/Writer separados para Notificacion
- Strategy: INotificacionDispatcher + DispatcherRegistry (Email, WhatsApp)
- Observer: EventBus in-process (subscribe/publish, frozen events)
- Unit of Work: IUnitOfWork Protocol + SqlUnitOfWork (session-per-request)
- RBAC: AuthorizationService + check_permission guard en GUI + JWT middleware en API