Architecture Overview

System Overview

AITHA IoT Infrastructure is a full industrial platform for IoT device management, designed for the Desel AITHA ecosystem. It implements a bi-directional data pipeline that provides:

  • Data normalization before reaching the control platform
  • Safe and reliable translation of RPC commands to devices
  • Scalable processing (real-time and batch)
  • Full auditability via InfluxDB, Kafka, and Airflow
  • Modularity with replaceable components using open protocols

System Architecture

Full Flow Diagram

┌─────────────────────────────────────────────────────────────────────────┐
│ FLOW 1: Device → Cloud (Telemetry Uplink) │
├─────────────────────────────────────────────────────────────────────────┤
│ 1. Device publishes: core2/<id>/<type>/<value>/get │
│ 2. Telegraf captures and processes: │
│ → Stores RAW in InfluxDB (all messages) │
│ → Sends only GET to Kafka (raw_data) for enrichment │
│ → Converts to ThingsBoard format and republishes to MQTT │
│ 3. ThingsBoard consumes the converted format │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│ FLOW 2: Kafka Enrichment Loop (Stream Processing) │
├─────────────────────────────────────────────────────────────────────────┤
│ 1. Microservices consume raw_data from Kafka │
│ 2. Process/enrich data (AI, validation, fusion) │
│ 3. Publish processed_data to Kafka (enriched core2/* format) │
│ 4. Telegraf captures processed_data │
│ → Converts to ThingsBoard format │
│ → Republishes to MQTT for ThingsBoard │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│ FLOW 3: Cloud → Device (RPC/Command Downlink) │
├─────────────────────────────────────────────────────────────────────────┤
│ 1. ThingsBoard sends a command (RPC) to MQTT │
│ 2. Telegraf Bridge intercepts the command │
│ 3. Translates to: core2/<id>/<type>/<value>/set │
│ 4. Publishes to MQTT │
│ 5. Device executes the command │
└─────────────────────────────────────────────────────────────────────────┘

Component Diagram (Mermaid)

Component Roles

1. IoT Devices (Edge)

Supported types: ESP32, STM32, Desel AITHA, generic sensors

Responsibilities:

  • Publish telemetry via MQTT (topics core2/.../get)
  • Receive and execute commands (topics core2/.../set)
  • Maintain a persistent connection to Mosquitto

Message format:

Topic: core2/esp32_001/pin/gpio2/get/telemetry
Payload: "1" (or "true", "23.5", etc.)
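
To make the uplink format concrete, here is a minimal device-side sketch. The `telemetry_topic` helper is illustrative (not part of the codebase), and the commented publish sequence assumes the paho-mqtt client library with the broker details from this document:

```python
def telemetry_topic(device_id: str, sensor_type: str, value_name: str) -> str:
    """Build an uplink topic following core2/<id>/<type>/<value>/get/telemetry."""
    return f"core2/{device_id}/{sensor_type}/{value_name}/get/telemetry"

# Publishing a reading would then look roughly like this
# (connection details are illustrative):
#   import paho.mqtt.client as mqtt
#   client = mqtt.Client()
#   client.username_pw_set("device_user", "device_password")
#   client.connect("mosquitto", 1883)
#   client.publish(telemetry_topic("esp32_001", "pin", "gpio2"), "1", qos=1)
```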

2. MQTT Broker (Mosquitto)

Role: Central communication point between devices and the cloud

Key features:

  • Optional TLS/SSL support (port 8883)
  • Username/password authentication (note: mTLS is not enabled and there are currently no per-topic ACLs)
  • QoS 1 to improve delivery guarantees
  • Message persistence

Config: infrastructure/mosquitto/config/mosquitto.conf

3. Telegraf (Unified Normalizer)

The most critical component in the system, playing several roles at once:

Input plugins

  1. MQTT Consumer (Device Telemetry):

    • Subscribes to: core2/+/+/+/get/telemetry and core2/+/+/+/get/attr
    • Captures device telemetry and attributes
  2. MQTT Consumer (Command Responses):

    • Subscribes to: core2/+/+/+/set
    • Captures responses from executed commands
  3. Kafka Consumer (Processed Data):

    • Topic: processed_data
    • Consumes enriched data from microservices
  4. Execd (ThingsBoard Bridge):

    • Runs thingsboard_bridge.py
    • Manages per-device MQTT connections to ThingsBoard
    • Translates ThingsBoard RPC commands into device format

Processors

  1. Starlark (Auto type conversion):

    • Converts strings to bool/int/float based on content
    • No hard failures - keeps the string if it cannot convert
  2. Starlark (Rename measurement):

    • Concatenates type + "_" + value (e.g. pin + gpio2 → pin_gpio2)
  3. Date (Add timestamp):

    • Ensures timestamps are in unix_ms
  4. Strings (device_id → device_name):

    • ThingsBoard compatibility
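
The auto type conversion described above can be sketched in plain Python. The function below is an illustrative stand-in for the actual Starlark script; its name and exact precedence order (bool, then int, then float) are assumptions:

```python
def coerce(value: str):
    """Best-effort type coercion: bool first, then int, then float;
    if nothing matches, the original string is kept (no hard failure)."""
    lowered = value.strip().lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    try:
        return int(value)
    except ValueError:
        pass
    try:
        return float(value)
    except ValueError:
        return value  # unconvertible payloads pass through unchanged
```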

Output plugins

  1. InfluxDB v2:

    • Stores EVERYTHING: telemetry, commands, enriched data
    • Bucket: core2_dev
  2. Kafka:

    • Topic: raw_data
    • Only GET telemetry (for processing)
  3. MQTT (Internal Bridge):

    • Topic: telegraf/bridge/telemetry
    • Consumed by thingsboard_bridge.py
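
The fan-out across these outputs can be summarized as a routing rule. This is an illustrative sketch of the behavior, not Telegraf's actual plugin configuration:

```python
def sinks_for(topic: str) -> list[str]:
    """Everything is stored raw in InfluxDB; only GET telemetry is
    additionally forwarded to Kafka's raw_data topic for enrichment."""
    sinks = ["influxdb:core2_dev"]
    if "/get/" in topic or topic.endswith("/get"):
        sinks.append("kafka:raw_data")
    return sinks
```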

4. ThingsBoard Bridge (Python)

File: ingestion/telegraf/thingsboard_bridge.py

Key responsibilities:

  1. Connection management:

    • One MQTT connection per device to ThingsBoard
    • Authentication via access token (device_id)
    • Automatic reconnect with backoff
  2. Telemetry publishing:

    • Listens on internal MQTT: telegraf/bridge/telemetry
    • Publishes to ThingsBoard: v1/devices/me/telemetry
  3. RPC handling:

    • Subscribes to: v1/devices/me/rpc/request/+
    • Translates ThingsBoard commands to device format
    • Publishes to Mosquitto: core2/<device_id>/<key>/set
  4. Idle cleanup:

    • Closes idle connections after 15 minutes (configurable)

RPC translation format:

// Input (ThingsBoard):
{
  "method": "setValue",
  "params": {
    "key": "relay_relay1_start",
    "value": true
  }
}

// Output (MQTT to device):
Topic: core2/esp32_001/relay.relay1/start/set
Payload: "true"
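
Based on the example above, the key-to-topic translation can be sketched as follows. The split rule (the last underscore-separated segment is the action, the remaining segments joined with "." form the type) is inferred from the example, not taken from the bridge source:

```python
def rpc_to_device_topic(device_id: str, key: str) -> str:
    """Translate a ThingsBoard RPC key into a core2 downlink topic,
    e.g. ("esp32_001", "relay_relay1_start")
         -> "core2/esp32_001/relay.relay1/start/set"."""
    *type_parts, action = key.split("_")
    return f"core2/{device_id}/{'.'.join(type_parts)}/{action}/set"
```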

5. InfluxDB (Time Series Database)

Role: Storage for all telemetry

Key features:

  • Bucket: core2_dev
  • Retention: 30 days (configurable)
  • Organization: unlix
  • API token for writes from Telegraf

5.1 Centralized Data Layer (PostgreSQL & Redis)

Role: Relational storage, state, and cache for all applications (the infrastructure's single source of truth).

  • Central PostgreSQL:

    • A single postgres-central container hosts multiple logical databases: thingsboard, zitadel, chatwoot, n8n, chat_db, etc.
    • Managed via ./infra.sh db (up|down|info|backup|import).
    • Provides atomic backups and better memory usage than the previous approach of fragmented per-service databases.
  • Central Redis:

    • Handles cross-cutting sessions and caching (e.g. Chat UI, workflows).

6. Kafka Cluster

Role: Message broker for stream processing

Components:

  • Kafka Broker (port 9092)
  • Zookeeper (coordination)
  • Kafka UI (port 8089): web interface for inspecting topics

Topics:

  • raw_data: Raw data from Telegraf
  • processed_data: Enriched data from microservices

Configuration:

  • Replication factor: 1 (development)
  • Retention: 168 hours (7 days)

7. Stream Processing Microservices

Location: processing/stream-processors/data-aggregator/

Technology: Node.js

Role:

  • Consume raw_data from Kafka
  • Process/enrich data (validation, AI, sensor fusion)
  • Publish to processed_data

Available processors:

  • GeolocationProcessor: Geolocation enrichment

Development workflow: hot reload via bind mounts
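
The enrichment step is conceptually simple: take one raw_data record, add derived fields, and re-emit it to processed_data. The service itself is written in Node.js; the Python sketch below is for illustration only, and the added fields (`processed`, `quality`) are hypothetical examples:

```python
import json

def enrich(raw_message: str) -> str:
    """Take one raw_data record and return the processed_data record.
    The fields added here are hypothetical examples of enrichment."""
    record = json.loads(raw_message)
    record["processed"] = True
    # Example validation: flag readings whose value parses as a number.
    try:
        float(record.get("value", ""))
        record["quality"] = "numeric"
    except (TypeError, ValueError):
        record["quality"] = "raw"
    return json.dumps(record)

# A Kafka consumer/producer pair would wrap this function:
#   consume from raw_data -> enrich(message) -> produce to processed_data
```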

8. ThingsBoard (Platform)

Role: Central IoT management platform

Key features:

  • Web UI for visualization and control
  • Device and asset management
  • Business rules (Rule Engine)
  • Bi-directional RPC
  • Customizable dashboards

Database: Centralized PostgreSQL

Kafka integration:

TB_QUEUE_TYPE: kafka
TB_KAFKA_SERVERS: kafka:9092

8.1 Business Platforms and AI Ecosystem

In addition to ThingsBoard, the current architecture runs a full hub of services that orbit the same centralized databases:

  • Zitadel (Security): Identity provider and SSO via OIDC. Core system foundation; issues tokens for Portal Hub and Chat AI.
  • Portal Hub: Next.js frontend that aggregates apps under a single Zitadel-authenticated UI.
  • Chat Server & Chat UI (AI): FastAPI backend and dedicated frontend for industrial conversational assistance via RAG agents (using Qdrant as the vector store).
  • Chatwoot: Customer support component, integrated and routed by the main orchestrator.

9. Grafana (Technical Dashboards)

Role: Advanced metrics visualization

Data Source: InfluxDB

Use cases:

  • Engineering dashboards
  • Device debugging
  • Historical analysis

10. Airflow (Batch Processing)

Role: Batch job orchestration

Available DAGs:

  • anomaly_detection.py: Anomaly detection with ML

Use cases:

  • Reprocessing historical data
  • Training ML models
  • Batch cleanup and aggregation
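
To give a flavor of what such a batch job might compute, here is a self-contained z-score check. It is a hypothetical stand-in: the real anomaly_detection.py DAG may use entirely different ML methods:

```python
from statistics import mean, stdev

def zscore_anomalies(values: list[float], threshold: float = 3.0) -> list[int]:
    """Return indices of values whose z-score exceeds the threshold.
    A minimal stand-in for the kind of check a batch DAG could run
    over historical InfluxDB data."""
    if len(values) < 2:
        return []
    m, s = mean(values), stdev(values)
    if s == 0:
        return []  # constant series: nothing can be an outlier
    return [i for i, v in enumerate(values) if abs(v - m) / s > threshold]
```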

Security and Identity (Zitadel)

Zitadel as the Base Layer

The entire platform uses ZITADEL as the primary identity provider (OIDC).

  • Secures access in the Portal Hub.
  • Validates JWT tokens at the API level in the Chat AI Server.

MQTT authentication

  • Username/password configurable in .env
  • Optional: TLS/SSL on port 8883 (there are currently no restrictive per-device ACLs; this is a known risk that mTLS would mitigate).

ThingsBoard

  • Per-device access tokens for publishing.
  • Credentials and data live encrypted in Central PostgreSQL.

Known risks and audits

  • Docker socket in the Chat Server: risks were historically identified from tools interacting directly with the Docker host. Audits (2026) flagged any direct exposure, or direct MCP deploys from chat, for migration to satellite containers that do not mount docker.sock.
  • Secrets: Rotation of variables, PATs, and external API secrets must be handled strictly via .env.secrets kept out of version control.

Scalability

Horizontal Scaling

  • Telegraf: Multiple instances with different topics
  • Kafka: Add more brokers (increase replication factor)
  • Microservices: Scale containers with Docker Swarm/Kubernetes
  • ThingsBoard: Cluster mode with multiple nodes

Vertical Scaling

  • Increase InfluxDB resources for complex queries
  • More memory for Kafka (longer retention)

Observability

Logs

Each service has its own logs directory:

  • infrastructure/mosquitto/logs/
  • ingestion/telegraf/logs/
  • platform/thingsboard/data/logs/
  • processing/airflow/logs/

Metrics

  • InfluxDB stores all system metrics
  • Grafana for visualization
  • Kafka UI for topic monitoring

Health Checks

All services have health checks defined in docker-compose.

Fault Tolerance

MQTT

  • Persistent connections
  • QoS 1 improves delivery guarantees
  • Mosquitto keeps sessions

Kafka

  • Automatic offset management
  • Message replay is possible
  • Configurable retention

ThingsBoard

  • PostgreSQL with recommended backups
  • Kafka queue type enables resiliency

Advanced Configuration

Telegraf Tuning

Variables in .env:

TELEGRAF_AGENT_INTERVAL=5s          # Collection frequency
TELEGRAF_AGENT_METRIC_BATCH_SIZE=50 # Batch size
TELEGRAF_AGENT_FLUSH_INTERVAL=5s # Flush frequency

Kafka Tuning

KAFKA_LOG_RETENTION_HOURS=168        # Retention in hours
KAFKA_LOG_RETENTION_BYTES=1073741824 # Retention in bytes

ThingsBoard Tuning

THINGSBOARD_DEFAULT_INACTIVITY_TIMEOUT=120   # Idle timeout
THINGSBOARD_DEFAULT_STATE_CHECK_INTERVAL=60 # State check interval
THINGSBOARD_TELEMETRY_TTL=2592000 # Telemetry TTL

Next Step

See cli.md to learn how to operate the system.