Architecture Overview
System Overview
AITHA IoT Infrastructure is a full industrial platform for IoT device management, designed for the Desel AITHA ecosystem. It implements a bi-directional data pipeline that provides:
- Data normalization before reaching the control platform
- Safe and reliable translation of RPC commands to devices
- Scalable processing (real-time and batch)
- Full auditability via InfluxDB, Kafka, and Airflow
- Modularity with replaceable components using open protocols
System Architecture
Full Flow Diagram
┌─────────────────────────────────────────────────────────────────────────┐
│ FLOW 1: Device → Cloud (Telemetry Uplink) │
├─────────────────────────────────────────────────────────────────────────┤
│ 1. Device publishes: core2/<id>/<type>/<value>/get │
│ 2. Telegraf captures and processes: │
│ → Stores RAW in InfluxDB (all messages) │
│ → Sends only GET to Kafka (raw_data) for enrichment │
│ → Converts to ThingsBoard format and republishes to MQTT │
│ 3. ThingsBoard consumes the converted format │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ FLOW 2: Kafka Enrichment Loop (Stream Processing) │
├─────────────────────────────────────────────────────────────────────────┤
│ 1. Microservices consume raw_data from Kafka │
│ 2. Process/enrich data (AI, validation, fusion) │
│ 3. Publish processed_data to Kafka (enriched core2/* format) │
│ 4. Telegraf captures processed_data │
│ → Converts to ThingsBoard format │
│ → Republishes to MQTT for ThingsBoard │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ FLOW 3: Cloud → Device (RPC/Command Downlink) │
├─────────────────────────────────────────────────────────────────────────┤
│ 1. ThingsBoard sends a command (RPC) to MQTT │
│ 2. Telegraf Bridge intercepts the command │
│ 3. Translates to: core2/<id>/<type>/<value>/set │
│ 4. Publishes to MQTT │
│ 5. Device executes the command │
└─────────────────────────────────────────────────────────────────────────┘
Component Diagram (Mermaid)
Component Roles
1. IoT Devices (Edge)
Supported types: ESP32, STM32, Desel AITHA, generic sensors
Responsibilities:
- Publish telemetry via MQTT (topics core2/.../get)
- Receive and execute commands (topics core2/.../set)
- Maintain a persistent connection to Mosquitto
Message format:
Topic: core2/esp32_001/pin/gpio2/get/telemetry
Payload: "1" (or "true", "23.5", etc.)
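As an illustration of this uplink format, here is a minimal Python sketch of a device-side publisher using the paho-mqtt client. The broker host, port, and credentials are placeholders; real firmware (ESP32/STM32) would use an embedded MQTT client instead.

```python
DEVICE_ID = "esp32_001"  # hypothetical device id, matching the example above

def telemetry_topic(device_id, sensor_type, value_name):
    """Build the uplink topic in the core2/<id>/<type>/<value>/get/telemetry scheme."""
    return "core2/{}/{}/{}/get/telemetry".format(device_id, sensor_type, value_name)

if __name__ == "__main__":
    # Wiring sketch only; requires paho-mqtt and a reachable Mosquitto broker.
    import paho.mqtt.client as mqtt
    client = mqtt.Client()
    client.username_pw_set("device_user", "device_pass")  # placeholder credentials
    client.connect("mosquitto", 1883)                     # placeholder host/port
    # QoS 1 matches the delivery guarantee described for the broker below
    client.publish(telemetry_topic(DEVICE_ID, "pin", "gpio2"), "1", qos=1)
    client.disconnect()
```

The topic builder is kept separate from the network wiring so the naming convention can be unit-tested without a broker.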
2. MQTT Broker (Mosquitto)
Role: Central communication point between devices and the cloud
Key features:
- Optional TLS/SSL support (port 8883)
- Username/password authentication (note: mTLS is not enabled and there are currently no per-topic ACLs)
- QoS 1 to improve delivery guarantees
- Message persistence
Config: infrastructure/mosquitto/config/mosquitto.conf
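The features above map onto a handful of standard Mosquitto directives. The excerpt below is a hypothetical sketch; the file paths are assumptions and should be checked against the real infrastructure/mosquitto/config/mosquitto.conf.

```conf
# Plain listener with username/password auth
listener 1883
allow_anonymous false
# assumed password file location
password_file /mosquitto/config/passwd

# Optional TLS listener (port 8883, as noted above); cert paths are assumed
listener 8883
cafile /mosquitto/certs/ca.crt
certfile /mosquitto/certs/server.crt
keyfile /mosquitto/certs/server.key

# Message persistence
persistence true
persistence_location /mosquitto/data/
```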
3. Telegraf (Unified Normalizer)
The most critical component of the system - it plays multiple roles:
Input plugins
- MQTT Consumer (Device Telemetry):
  - Subscribes to: core2/+/+/+/get/telemetry and core2/+/+/+/get/attr
  - Captures device telemetry and attributes
- MQTT Consumer (Command Responses):
  - Subscribes to: core2/+/+/+/set
  - Captures responses from executed commands
- Kafka Consumer (Processed Data):
  - Topic: processed_data
  - Consumes enriched data from microservices
- Execd (ThingsBoard Bridge):
  - Runs thingsboard_bridge.py
  - Manages per-device MQTT connections to ThingsBoard
  - Translates ThingsBoard RPC commands into device format
Processors
- Starlark (Auto type conversion):
  - Converts strings to bool/int/float based on content
  - No hard failures - keeps the string if it cannot convert
- Starlark (Rename measurement):
  - Concatenates type + "_" + value → pin_gpio2
- Date (Add timestamp):
  - Ensures timestamps are in unix_ms
- Strings (device_id → device_name):
  - ThingsBoard compatibility
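The actual type conversion runs as a Starlark processor inside Telegraf; the following Python sketch mirrors the rule described above, including the fallback that keeps the original string rather than failing.

```python
def coerce(value):
    """Best-effort conversion of a string payload to bool/int/float.

    Mirrors the Starlark processor's behaviour: if no conversion
    applies, the original string is returned (no hard failure).
    """
    lowered = value.strip().lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    try:
        return int(value)
    except ValueError:
        pass
    try:
        return float(value)
    except ValueError:
        # keep the string as-is
        return value
```

For example, the payloads shown earlier ("1", "true", "23.5") become int, bool, and float respectively, while an unrecognized payload like "on" passes through unchanged.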
Output plugins
- InfluxDB v2:
  - Stores EVERYTHING: telemetry, commands, enriched data
  - Bucket: core2_dev
- Kafka:
  - Topic: raw_data
  - Only GET telemetry (for processing)
- MQTT (Internal Bridge):
  - Topic: telegraf/bridge/telemetry
  - Consumed by thingsboard_bridge.py
4. ThingsBoard Bridge (Python)
File: ingestion/telegraf/thingsboard_bridge.py
Key responsibilities:
- Connection management:
  - One MQTT connection per device to ThingsBoard
  - Authentication via access token (device_id)
  - Automatic reconnect with backoff
- Telemetry publishing:
  - Listens on internal MQTT: telegraf/bridge/telemetry
  - Publishes to ThingsBoard: v1/devices/me/telemetry
- RPC handling:
  - Subscribes to: v1/devices/me/rpc/request/+
  - Translates ThingsBoard commands to device format
  - Publishes to Mosquitto: core2/<device_id>/<key>/set
- Idle cleanup:
  - Closes idle connections after 15 minutes (configurable)
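The idle-cleanup logic can be reduced to a simple timestamp scan. This is a sketch of the idea, not the code in thingsboard_bridge.py; the function name and bookkeeping structure are hypothetical.

```python
import time

# 15 minutes, matching the configurable default described above
IDLE_TIMEOUT_S = 15 * 60

def find_idle(last_activity, now=None):
    """Return device ids whose connection exceeded the idle timeout.

    last_activity maps device_id -> unix timestamp of the device's
    last message; the bridge would close those MQTT connections.
    """
    now = time.time() if now is None else now
    return [dev for dev, ts in last_activity.items() if now - ts > IDLE_TIMEOUT_S]
```

A periodic task would call this, close the returned connections, and drop them from the bookkeeping dict so a later message triggers a fresh connect.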
RPC translation format:
// Input (ThingsBoard):
{
"method": "setValue",
"params": {
"key": "relay_relay1_start",
"value": true
}
}
// Output (MQTT to device):
Topic: core2/esp32_001/relay.relay1/start/set
Payload: "true"
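The example above suggests that the RPC key follows a "<group>_<name>_<action>" convention rendered as "<group>.<name>/<action>" on the wire (relay_relay1_start → relay.relay1/start). The sketch below assumes that convention; verify it against thingsboard_bridge.py before relying on it.

```python
import json

def translate_rpc(device_id, rpc_payload):
    """Translate a ThingsBoard RPC request into a device MQTT topic and payload.

    Assumes params["key"] is "<group>_<name>_<action>", which is an
    inference from the single example above, not a documented contract.
    """
    params = rpc_payload["params"]
    group, name, action = params["key"].split("_", 2)
    topic = "core2/{}/{}.{}/{}/set".format(device_id, group, name, action)
    value = params["value"]
    # booleans are serialized lowercase ("true"/"false"), as in the example
    payload = json.dumps(value) if isinstance(value, bool) else str(value)
    return topic, payload
```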
5. InfluxDB (Time Series Database)
Role: Storage for all telemetry
Key features:
- Bucket: core2_dev
- Retention: 30 days (configurable)
- Organization: unlix
- API token for writes from Telegraf
5.1 Centralized Data Layer (PostgreSQL & Redis)
Role: Relational storage, state, and cache for all applications (infrastructure single source of truth).
- Central PostgreSQL:
  - A single postgres-central container hosts multiple logical databases: thingsboard, zitadel, chatwoot, n8n, chat_db, etc.
  - Managed via ./infra.sh db (up|down|info|backup|import).
  - Provides atomic backups and memory optimization versus the old approach of per-service fragmented databases.
- Central Redis:
  - Handles cross-cutting sessions and cache (e.g. Chat UI, Workflows).
6. Kafka Cluster
Role: Message broker for stream processing
Components:
- Kafka Broker (port 9092)
- Zookeeper (coordination)
- Kafka UI (port 8089) - web UI
Topics:
- raw_data: Raw data from Telegraf
- processed_data: Enriched data from microservices
Configuration:
- Replication factor: 1 (development)
- Retention: 168 hours (7 days)
7. Stream Processing Microservices
Location: processing/stream-processors/data-aggregator/
Technology: Node.js
Role:
- Consume raw_data from Kafka
- Process/enrich data (validation, AI, sensor fusion)
- Publish to processed_data
Available processors:
- GeolocationProcessor: Geolocation enrichment
Scalability: Hot reload with bind mounts for development
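The actual processors are Node.js services; the following Python sketch shows the same consume-enrich-publish loop for illustration. The broker address, topic names, and the trivial enrich step are taken from this document, but the function and wiring are hypothetical.

```python
import json

def enrich(record):
    """Placeholder enrichment step: tag the record as processed.

    A real processor (e.g. GeolocationProcessor) would add derived
    fields here instead of a simple flag.
    """
    out = dict(record)
    out["enriched"] = True
    return out

if __name__ == "__main__":
    # Wiring sketch only; requires kafka-python and a reachable broker.
    from kafka import KafkaConsumer, KafkaProducer
    consumer = KafkaConsumer("raw_data", bootstrap_servers="kafka:9092",
                             value_deserializer=lambda b: json.loads(b))
    producer = KafkaProducer(bootstrap_servers="kafka:9092",
                             value_serializer=lambda v: json.dumps(v).encode())
    for msg in consumer:
        producer.send("processed_data", enrich(msg.value))
```

Keeping the enrichment pure (no I/O) makes it unit-testable and lets the Kafka wiring be swapped without touching the logic, which is also how the hot-reload development flow benefits.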
8. ThingsBoard (Platform)
Role: Central IoT management platform
Key features:
- Web UI for visualization and control
- Device and asset management
- Business rules (Rule Engine)
- Bi-directional RPC
- Customizable dashboards
Database: Centralized PostgreSQL
Kafka integration:
TB_QUEUE_TYPE: kafka
TB_KAFKA_SERVERS: kafka:9092
8.1 Business Platforms and AI Ecosystem
In addition to ThingsBoard, the current architecture runs a full hub of services that orbit the same centralized databases:
- Zitadel (Security): Identity provider and SSO via OIDC. Core system foundation; issues tokens for Portal Hub and Chat AI.
- Portal Hub: Next.js frontend that aggregates apps under a single Zitadel-authenticated UI.
- Chat Server & Chat UI (AI): FastAPI backend and dedicated frontend for industrial conversational assistance via RAG agents (using Qdrant as the vector store).
- Chatwoot: Customer support component, integrated and routed by the main orchestrator.
9. Grafana (Technical Dashboards)
Role: Advanced metrics visualization
Data Source: InfluxDB
Use cases:
- Engineering dashboards
- Device debugging
- Historical analysis
10. Airflow (Batch Processing)
Role: Batch job orchestration
Available DAGs:
- anomaly_detection.py: Anomaly detection with ML
Use cases:
- Reprocessing historical data
- Training ML models
- Batch cleanup and aggregation
Security and Identity (Zitadel)
Zitadel as the Base Layer
The entire platform uses ZITADEL as the primary identity provider (OIDC).
- Secures access in the Portal Hub.
- Validates JWT tokens at the API level in the Chat AI Server.
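API-level token validation boils down to verifying the signature against Zitadel's JWKS and then checking the standard claims. Signature verification is left to a JOSE library; this sketch covers only the claim checks, and the issuer/audience values in the usage are placeholders.

```python
import time

def check_claims(claims, issuer, audience, now=None):
    """Minimal OIDC claim checks, assumed to run AFTER signature
    verification (e.g. against Zitadel's JWKS endpoint).

    claims is the decoded JWT payload as a dict.
    """
    now = time.time() if now is None else now
    if claims.get("iss") != issuer:
        return False
    aud = claims.get("aud")
    # "aud" may be a single string or a list of audiences
    if audience not in (aud if isinstance(aud, list) else [aud]):
        return False
    # reject expired tokens
    return claims.get("exp", 0) > now
```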
MQTT authentication
- Username/password configurable in .env
- Optional: TLS/SSL on port 8883 (currently lacking per-device restrictive ACLs, a known risk that can be mitigated with mTLS).
ThingsBoard
- Per-device access tokens for publishing.
- Credentials and data live encrypted in Central PostgreSQL.
Known risks and audits
- Docker socket in Chat Server: Historically, risks were identified due to tools interacting directly with the Docker host. Any direct exposure or direct MCP deploy from chat was flagged in audits (2026) to be migrated to satellite containers without docker.sock.
- Secrets: Rotation of variables, PATs, and external API secrets must be handled strictly via .env.secrets, kept out of version control.
Scalability
Horizontal Scaling
- Telegraf: Multiple instances with different topics
- Kafka: Add more brokers (increase replication factor)
- Microservices: Scale containers with Docker Swarm/Kubernetes
- ThingsBoard: Cluster mode with multiple nodes
Vertical Scaling
- Increase InfluxDB resources for complex queries
- More memory for Kafka (longer retention)
Observability
Logs
Each service has its own logs directory:
- infrastructure/mosquitto/logs/
- ingestion/telegraf/logs/
- platform/thingsboard/data/logs/
- processing/airflow/logs/
Metrics
- InfluxDB stores all system metrics
- Grafana for visualization
- Kafka UI for topic monitoring
Health Checks
All services have health checks defined in docker-compose
Fault Tolerance
MQTT
- Persistent connections
- QoS 1 improves delivery guarantees
- Mosquitto keeps sessions
Kafka
- Automatic offset management
- Message replay is possible
- Configurable retention
ThingsBoard
- PostgreSQL with recommended backups
- Kafka queue type enables resiliency
Advanced Configuration
Telegraf Tuning
Variables in .env:
TELEGRAF_AGENT_INTERVAL=5s # Collection frequency
TELEGRAF_AGENT_METRIC_BATCH_SIZE=50 # Batch size
TELEGRAF_AGENT_FLUSH_INTERVAL=5s # Flush frequency
Kafka Tuning
KAFKA_LOG_RETENTION_HOURS=168 # Retention in hours
KAFKA_LOG_RETENTION_BYTES=1073741824 # Retention in bytes
ThingsBoard Tuning
THINGSBOARD_DEFAULT_INACTIVITY_TIMEOUT=120 # Idle timeout
THINGSBOARD_DEFAULT_STATE_CHECK_INTERVAL=60 # State check interval
THINGSBOARD_TELEMETRY_TTL=2592000 # Telemetry TTL
Next Step
See cli.md to learn how to operate the system.