Zeus-IoT 架构一览

  • 开源版架构图

../_images/zeus_iot_arch.jpg

注解

  • 开源版本 数据采集只需要部署 Zabbix Server + IoT Server , 数据库: Postgresql 12 + , TDengine 2.2.0 + 。

  • IoT Server 需要开启 server-transfer 模块 并且 storage 模块配置 tdengine , Zabbix Server conf 文件配置 ExportPath , 实时读取文件同时写入 TDengine 。

  • 适用环境:小规模环境,几十到几百台设备不等接入,且所有设备都在一个网络可达环境内。

  • 专业版架构图

../_images/zeus_iot_arch_pro.jpg

注解

  • 专业版本 数据采集除了开源版本的类似部署外,还可以支持多个 Zabbix Proxy + IoT Server 采集部署,Zabbix Proxy 依赖 Postgresql 12 + 或者 SQLite。

  • IoT Server 作为 Zabbix Proxy 协议解析处理服务时,不需要开启 server-transfer 模块, storage 模块配置 none , 其他只需要打开对应的协议接入模块即可。

  • 适用环境:大规模环境,几百到几十万台设备不等,分布式采集,不同的设备可以分布在不同的区域,城市。

  1. 不同的 Proxy 下面的 IoT Server 可以有完全不同的协议适配,也可以定义不同的端口(因为是多机部署)。

  2. 不同的 租户 会 绑定不同的 Proxy , 租户独享 Proxy , IoT Server 源码开放给租户,自己实现协议对接就可以,权限在平台层 Zeus Webapp 控制。

  3. 一个 Zabbix Server 可以同时对接一万多个 Proxy (官方数据 15000+,Server 裸金属部署,高配)。

  1. Zabbix Webapp (UI)

  • 技术栈:SpringBoot 2 + Ebean (DAO) + Vue Element (样式重构)。

  • 主要实现了 物联网基础平台 的各类配置,包括但不限于:产品、设备、告警、场景联动、大屏报表(拖拽大屏基于 Grafana )等。

  • 数据库:和 Zabbix 共用一个 Pg 实例,database name 为 zeus-iot , 如果熟悉 TimescaleDB 的同学也可以使用,Zabbix 原生支持,具备更高的查询和吞吐性能。

  1. Zeus IoT Server

  • 技术栈:Apache Camel 2 + Apache Skywalking Modular Design

  • 结合 Apache Camel 的组件生态,实现各类协议的接入处理,包括但不限于 Http,Mqtt( 依赖 Emqx 作为 broker ),OPC UA,Coap,Tcp/Udp,Modbus 等。

  • Zabbix Server 实时数据文件 ndjson 的实时读取,可以扩展作二次分发:推送到 Kafka 等。

项目源码 Maven 结构说明:

zeus-iot
└── iot-common -- 公共库
     └── iot-datacarrier -- 数据传输队列,数据缓冲
     └── iot-eventbus -- EventBus 数据总线,设备指令下发
     └── iot-util -- 基础工具类
└── iot-server -- Zeus IoT Server
     ├── server-action -- 触发动作处理,比如:执行设备服务,指令下发
     ├── server-bootstrap -- 启动模块,集成了所有的模块
     └── server-collector-plugin -- 主动采集(专业版)
            └── modbus-collector-plugin -- 基于Zabbix Agent 协议的主动采集 Modbus 实现
     ├── server-core -- Core 模块
     ├── server-health-checker -- 健康检查,读取本地监控指标
     └── server-library -- 模块化基础库
            ├── library-client
            ├── library-module
            ├── library-server
            └── library-util
     └── server-receiver-plugin -- 设备数据接收、协议解析模块
            ├── coap-receiver-plugin -- Coap 协议
            ├── http-receiver-plugin -- Http 协议
            ├── mqtt-receiver-plugin -- Mqtt Client
            ├── opc-ua-receiver-plugin -- OPC-UA 协议
            ├── tcp-receiver-plugin -- TCP 协议
            └── udp-receiver-plugin -- UDP 协议
     ├── server-sender -- 数据发送到 Zabbix 协议模块
     ├── server-starter -- IoT Server 启动模块,main 方法
     └── server-storage-plugin -- 存储模块
            ├── server-influxdb-plugin -- InfluxDB 存储实现
            ├── server-none-storage -- None 空实现,由于模块依赖关系,storage 不能为 - ,所以定义空实现,none
            └── server-tdengine-plugin -- TDEngine 存储实现
     └── server-telemetry -- 遥测采集(自监控)
            ├── telemetry-api -- 遥测API
            └── telemetry-prometheus -- 遥测 普罗米修斯 上报
     └── server-transfer -- 本地 ndjson 文件实时读取,进 datacarrier 队列,再入 storage 模块
├── iot-server-bom -- IoT Server 根pom
└── zeus-application-toolkit -- Zeus Webapp 工具类
     └── toolkit-async -- 异步任务编排执行框架
     └── toolkit-eventbus -- webapp 事件触发总线
├── zeus-common -- 公共基础类
├── zeus-core -- 核心基础类
├── zeus-driver -- Zabbix Api 驱动模块
├── zmops-zeus-iot -- 打包模块,定义了 mvn package 结构
├── zeus-iot-ui -- 项目子模块,UI 模块会被打包成 静态资源文件,打包到 SpringBoot Resource 目录下并且发布
├── zeus-message -- webapp 消息推送模块
├── zeus-rest -- webapp rest 对外接口模块
├── zeus-starter -- webapp 启动模块,main 方法
├── zeus-webapp -- webapp 业务模块,包含 所有的业务实现
└── zeus-webapp-bom -- webapp 父pom
  1. Apache Camel 协议接入示例,一行代码启动

/**
 *  Http Server 可以一个 IoT Server 启动多个,只要端口不要冲突就可以
 */
@Override
public void configure() throws Exception {
    fromF("netty4-http:http://0.0.0.0:%d/data/receiver?sync=true", config.getPort())
            .threads(10)
            .choice()
            .when(new HeaderPredicate()) // 条件判断
            .process(new JsonToItemValueProcess()) // 格式处理
            .to("Zabbix");
}
/**
 *  Mqtt Client 可以一个 IoT Server 启动多个
 */
@Override
public void configure() throws Exception {
    fromF("mqtt:zeus-iot-mqtt?host=tcp://%s:%d&connectWaitInSeconds=30&subscribeTopicNames=+/up",
            mqttReceiverConfig.getHost(),
            mqttReceiverConfig.getPort())
            .process(new MqttDemo01Process()) // 协议格式解析
            .to("Zabbix");
}
/**
 *  Tcp Server 可以一个 IoT Server 启动多个,只要端口不要冲突就可以
 */
@Override
public void configure() throws Exception {
    fromF("netty4:tcp://0.0.0.0:%d", tcpReceiverConfig.getPort())
            .process(new StringItemValueProcess())
            .to("Zabbix");
}
  1. Zabbix Server 数据文件格式

{"host":{"host":"Zabbix server3","name":"Zabbix server3"},"groups":["Zabbix servers"],"item_tags":[{"tag":"Application","value":"Zabbix server"}],"itemid":37127,"name":"Zabbix server: Number of processed not supported values per second","clock":1633695167,"ns":119435806,"value":0.932813,"type":0}

{"host":{"host":"Zabbix server","name":"Zabbix server"},"groups":["Zabbix servers"],"item_tags":[{"tag":"Application","value":"Zabbix server"}],"itemid":25667,"name":"Zabbix server: Utilization of preprocessing manager internal processes, in %","clock":1633695167,"ns":119546632,"value":0.016920,"type":0}

{"host":{"host":"Zabbix server1","name":"Zabbix server1"},"groups":["Zabbix servers"],"item_tags":[{"tag":"Application","value":"Zabbix server"}],"itemid":36887,"name":"Zabbix server: Number of processed numeric (unsigned) values per second","clock":1633695167,"ns":119733183,"value":1.865619,"type":0}

字段解释,以及和 物联网平台 业务定义的对应关系:

  • host:主机信息 -> 设备

  • groups:主机组 -> 设备组

  • item_tags:监控项标签 -> 属性标签

  • itemid:监控项ID -> 属性 zbxid

  • name:主机名 -> 设备ID

  • clock:时间戳,秒 -> 设备采样时间: clock + ns 前三位,精确到 毫秒

  • ns:时间戳,纳秒

  • value:实际值

  • type:value数据类型 -> 0: 浮点数,3:整形,1、2、3 分别对应:字符、日志、文本

  1. 模块化架构的设计,IoT Server 配置文件: iot-server\server-bootstrap\src\main\resources\application.yml

# Zabbix Trapper Sender
# Zabbix Trapper 模块,数据发送到 Zabbix Server
zabbix-sender:
  selector: ${ZS_RECEIVER_ZABBIX_SENDER:default}
  default:
    port: ${ZS_RECEIVER_ZABBIX_PORT:10051}
    host: ${ZS_RECEIVER_ZABBIX_HOST:127.0.0.1}

# Http Server 模块,可以添加 Http 模块,启动多个 Http Server 服务
receiver-http:
  selector: ${ZS_RECEIVER_HTTP:-}
  default:
    port: ${ZS_RECEIVER_HTTP_PORT:9010}
    host: ${ZS_RECEIVER_HTTP_HOST:0.0.0.0}

# TCP Server 模块,可以添加 TCP 模块,启动多个 TCP Server 服务
receiver-tcp:
  selector: ${ZS_RECEIVER_TCP:-}
  default:
    port: ${ZS_RECEIVER_TCP_PORT:9020}
    host: ${ZS_RECEIVER_TCP_HOST:0.0.0.0}

#  MQTT Client 模块,可以启动多个 Client
receiver-mqtt:
  selector: ${ZS_RECEIVER_MQTT:-}
  default:
    port: ${ZS_RECEIVER_MQTT_PORT:1883}
    host: ${ZS_RECEIVER_MQTT_HOST:127.0.0.1}

#  ZBX AGENT 模块,可以启动多个 Agent
collector-modbus:
  selector: ${ZS_COLLECT_MODBUS:-}
  default:
    port: ${ZS_COLLECT_MODBUS_PORT:10050}
    host: ${ZS_COLLECT_MODBUS_HOST:0.0.0.0}

# core module config 模块
core:
  selector: ${ZS_CORE:default}
  default:
    prepareThreads: ${ZS_CORE_PREPARE_THREADS:2}
    restHost: ${ZS_CORE_REST_HOST:0.0.0.0}
    restPort: ${ZS_CORE_REST_PORT:12800}
    restContextPath: ${ZS_CORE_REST_CONTEXT_PATH:/}
    restMinThreads: ${ZS_CORE_REST_JETTY_MIN_THREADS:1}
    restMaxThreads: ${ZS_CORE_REST_JETTY_MAX_THREADS:200}
    restIdleTimeOut: ${ZS_CORE_REST_JETTY_IDLE_TIMEOUT:30000}
    restAcceptorPriorityDelta: ${ZS_CORE_REST_JETTY_DELTA:0}
    restAcceptQueueSize: ${ZS_CORE_REST_JETTY_QUEUE_SIZE:0}
    httpMaxRequestHeaderSize: ${ZS_CORE_HTTP_MAX_REQUEST_HEADER_SIZE:8192}
    zabbixConfigPath: ${ZS_CORE_ZABBIX_CONFIG_PATH:/etc/zabbix/zabbix_server.conf}

# ndjson file read realtime 模块
server-transfer:
  selector: ${ZS_SERVER_TRANSFER:-}
  default:
    name: ${ZS_TRANSFER_NAME:zeus-transfer}
    pattern: ${ZS_TRANSFER_PATTERN:/home/data/history-history-syncer-[0-9]{1}.ndjson}
    fileMaxWait: ${ZS_TRANSFER_FILE_MAXWAIT:30}

# tdengine storage realtime 模块
storage:
  selector: ${ZS_STORAGE:none}
  none:
  tdengine:
    url: ${ZS_STORAGE_TDENGINE_URL:jdbc:TAOS://127.0.0.1:6030/zeus_data} # TDengine jdbcUrl
    user: ${ZS_STORAGE_TDENGINE_USER:root}
    password: ${ZS_STORAGE_TDENGINE_PASSWORD:taosdata}
  influxdb: # 专业版
    url: ${ZS_STORAGE_INFLUXDB_URL:http://127.0.0.1:8086} # influxdb jdbcUrl
    database: zeus_data
    user: ${ZS_STORAGE_INFLUXDB_USER:root}
    password: ${ZS_STORAGE_INFLUXDB_PASSWORD:}
    actions: ${ZS_STORAGE_INFLUXDB_ACTIONS:1000}
    duration: ${ZS_STORAGE_INFLUXDB_DURATION:1000}
    batchEnabled: ${ZS_STORAGE_INFLUXDB_BATCH_ENABLED:true}
    fetchTaskLogMaxSize: ${ZS_STORAGE_INFLUXDB_FETCH_TASK_LOG_MAX_SIZE:5000}
    connectionResponseFormat: ${ZS_STORAGE_INFLUXDB_CONNECTION_RESPONSE_FORMAT:MSGPACK}

# health check 模块
health-checker:
  selector: ${ZS_HEALTH_CHECKER:default}
  default:
    checkIntervalSeconds: ${ZS_HEALTH_CHECKER_INTERVAL_SECONDS:5}

# health telemetry 模块
telemetry:
  selector: ${ZS_TELEMETRY:prometheus}
  none:
  prometheus:
    host: ${ZS_TELEMETRY_PROMETHEUS_HOST:0.0.0.0}
    port: ${ZS_TELEMETRY_PROMETHEUS_PORT:1234}
    sslEnabled: ${ZS_TELEMETRY_PROMETHEUS_SSL_ENABLED:false}
    sslKeyPath: ${ZS_TELEMETRY_PROMETHEUS_SSL_KEY_PATH:""}
    sslCertChainPath: ${ZS_TELEMETRY_PROMETHEUS_SSL_CERT_CHAIN_PATH:""}

注解

  • 模块名 selector 根据下面定义的 option 进行选择,如果不启用该模块,则为 -

  • zabbix-sender:Zabbix Server 数据发送模块, TCP 协议批量发送到 Zabbix Server,必须启用:default

  • receiver-http:如果开启,会在本地启动 Http Server 服务,端口默认 9010

  • receiver-tcp:如果开启,会在本地启动 Tcp Server 服务,端口默认 9020

  • receiver-mqtt:如果开启,会在本地启动 Mqtt Client,订阅 Emqx Mqtt Topic

  • collector-modbus: 如果开启,会启动modbus模块,可以配合 Agent 主动采集的方式,问设备要数据(专业版功能)

  • core: 核心模块,必须开启

  • server-transfer: 数据传输模块,负责实时读取 ndjson 文件,传入 datacarrier 队列,写入 storage 模块

  • storage:存储模块,可以有多种实现,开源版支持 TDengine。selector 设置为 influxdb 时支持实时写入 InfluxDB (专业版)

  • health-checker: 健康检查模块,负责对本服务的自身监控

  • telemetry: 配合健康检查模块,指标数据可以直接发给普罗米修斯,http://127.0.0.1:1234 可以看到具体指标信息

如果需要启动多个 Http Server,有两种方式: 1:再定义相同模块,2:上述的一行代码启动,多写几遍,就可以启动多个 Server :

/**
 *  Http Server 可以一个 IoT Server 启动多个,只要端口不要冲突就可以
 */
@Override
public void configure() throws Exception {

    fromF("netty4-http:http://0.0.0.0:7777/data/receiver?sync=true", config.getPort())
            .threads(10)
            .choice()
            .when(new HeaderPredicate1()) // 条件判断
            .process(new JsonToItemValueProcess1()) // 格式处理
            .to("Zabbix");

    fromF("netty4-http:http://0.0.0.0:8888/data/receiver?sync=true", config.getPort())
            .threads(10)
            .choice()
            .when(new HeaderPredicate2()) // 条件判断
            .process(new JsonToItemValueProcess2()) // 格式处理
            .to("Zabbix");

    // 如此两行代码,便可以启动 2个端口的 Http Server
}

配置文件可以通过 环境变量 的方式修改,${xxxx,123} xxxx 为变量name,123 为没有变量时的默认值,Linux 可以 export 设置,IDEA 开发环境可以如下配置:

../_images/zeus_iot_server_yaml.png