***************** Zeus-IoT 架构一览 ***************** - **开源版架构图** .. image:: /_static/images/zeus_iot_arch.jpg :width: 800 .. note:: - 开源版本 数据采集只需要部署 Zabbix Server + IoT Server , 数据库: Postgresql 12 + , TDengine 2.2.0 + 。 - IoT Server 需要开启 ``server-transfer`` 模块 并且 ``storage`` 模块配置 ``tdengine`` , Zabbix Server conf 文件配置 ExportPath , 实时读取文件同时写入 TDengine 。 - 适用环境:小规模环境,几十到几百台设备不等接入,且所有设备都在一个网络可达环境内。 - **专业版架构图** .. image:: /_static/images/zeus_iot_arch_pro.jpg :width: 800 .. note:: - 专业版本 数据采集除了开源版本的类似部署外,还可以支持多个 Zabbix Proxy + IoT Server 采集部署,Zabbix Proxy 依赖 Postgresql 12 + 或者 SQLite。 - IoT Server 作为 Zabbix Proxy 协议解析处理服务时,不需要开启 ``server-transfer`` 模块, ``storage`` 模块配置 ``none`` , 其他只需要打开对应的协议接入模块即可。 - 适用环境:大规模环境,几百到几十万台设备不等,分布式采集,不同的设备可以分布在不同的区域,城市。 1. 不同的 Proxy 下面的 IoT Server 可以有完全不同的协议适配,也可以定义不同的端口(因为是多机部署)。 2. 不同的 租户 会 ``绑定不同的 Proxy`` , ``租户独享 Proxy`` , IoT Server 源码开放给租户,自己实现协议对接就可以,权限在平台层 Zeus Webapp 控制。 3. 一个 Zabbix Server 可以同时对接一万多个 Proxy (官方数据 15000+,Server 裸金属部署,高配)。 1. **Zabbix Webapp (UI)** - 技术栈:SpringBoot 2 + `Ebean `_ (DAO) + Vue Element (样式重构)。 - 主要实现了 物联网基础平台 的各类配置,包括但不限于:产品、设备、告警、场景联动、大屏报表(拖拽大屏基于 `Grafana `_ )等。 - 数据库:和 Zabbix 共用一个 Pg 实例,database name 为 ``zeus-iot`` , 如果熟悉 TimescaleDB 的同学也可以使用,Zabbix 原生支持,具备更高的查询和吞吐性能。 2. **Zeus IoT Server** - 技术栈:`Apache Camel 2 `_ + `Apache Skywalking `_ Modular Design - 结合 Apache Camel 的组件生态,实现各类协议的接入处理,包括但不限于 Http,Mqtt( ``依赖 Emqx 作为 broker`` ),OPC UA,Coap,Tcp/Udp,Modbus 等。 - Zabbix Server 实时数据文件 ndjson 的实时读取,可以扩展作二次分发:推送到 Kafka 等。 **项目源码 Maven 结构说明:** .. code-block:: lua zeus-iot └── iot-common -- 公共库 └── iot-datacarrier -- 数据传输队列,数据缓冲 └── iot-eventbus -- EventBus 数据总线,设备指令下发 └── iot-util -- 基础工具类 └── iot-server -- Zeus IoT Server ├── server-action -- 触发动作处理,比如:执行设备服务,指令下发 ├── server-bootstrap -- 启动模块,集成了所有的模块 └── server-collector-plugin -- 主动采集(专业版) └── modbus-collector-plugin -- 基于Zabbix Agent 协议的主动采集 Modbus 实现 ├── server-core -- Core 模块 ├── server-health-checker -- 健康检查,读取本地监控指标 └── server-library -- 模块化基础库 ├── library-client ├── library-module ├── library-server └── library-util └── server-receiver-plugin -- 设备数据接收、协议解析模块 ├── coap-receiver-plugin -- Coap 协议 ├── http-receiver-plugin -- Http 协议 ├── mqtt-receiver-plugin -- Mqtt Client ├── opc-ua-receiver-plugin -- OPC-UA 协议 ├── tcp-receiver-plugin -- TCP 协议 └── udp-receiver-plugin -- UDP 协议 ├── server-sender -- 数据发送到 Zabbix 协议模块 ├── server-starter -- IoT Server 启动模块,main 方法 └── server-storage-plugin -- 存储模块 ├── server-influxdb-plugin -- InfluxDB 存储实现 ├── server-none-storage -- None 空实现,由于模块依赖关系,storage 不能为 - ,所以定义空实现,none └── server-tdengine-plugin -- TDEngine 存储实现 └── server-telemetry -- 遥测采集(自监控) ├── telemetry-api -- 遥测API └── telemetry-prometheus -- 遥测 普罗米修斯 上报 └── server-transfer -- 本地 ndjson 文件实时读取,进 datacarrier 队列,再入 storage 模块 ├── iot-server-bom -- IoT Server 根pom └── zeus-application-toolkit -- Zeus Webapp 工具类 └── toolkit-async -- 异步任务编排执行框架 └── toolkit-eventbus -- webapp 事件触发总线 ├── zeus-common -- 公共基础类 ├── zeus-core -- 核心基础类 ├── zeus-driver -- Zabbix Api 驱动模块 ├── zmops-zeus-iot -- 打包模块,定义了 mvn package 结构 ├── zeus-iot-ui -- 项目子模块,UI 模块会被打包成 静态资源文件,打包到 SpringBoot Resource 目录下并且发布 ├── zeus-message -- webapp 消息推送模块 ├── zeus-rest -- webapp rest 对外接口模块 ├── zeus-starter -- webapp 启动模块,main 方法 ├── zeus-webapp -- webapp 业务模块,包含 所有的业务实现 └── zeus-webapp-bom -- webapp 父pom 3. **Apache Camel 协议接入示例,一行代码启动** .. code-block:: java /** * Http Server 可以一个 IoT Server 启动多个,只要端口不要冲突就可以 */ @Override public void configure() throws Exception { fromF("netty4-http:http://0.0.0.0:%d/data/receiver?sync=true", config.getPort()) .threads(10) .choice() .when(new HeaderPredicate()) // 条件判断 .process(new JsonToItemValueProcess()) // 格式处理 .to("Zabbix"); } .. code-block:: java /** * Mqtt Client 可以一个 IoT Server 启动多个 */ @Override public void configure() throws Exception { fromF("mqtt:zeus-iot-mqtt?host=tcp://%s:%d&connectWaitInSeconds=30&subscribeTopicNames=+/up", mqttReceiverConfig.getHost(), mqttReceiverConfig.getPort()) .process(new MqttDemo01Process()) // 协议格式解析 .to("Zabbix"); } .. code-block:: java /** * Tcp Server 可以一个 IoT Server 启动多个,只要端口不要冲突就可以 */ @Override public void configure() throws Exception { fromF("netty4:tcp://0.0.0.0:%d", tcpReceiverConfig.getPort()) .process(new StringItemValueProcess()) .to("Zabbix"); } 4. **Zabbix Server 数据文件格式** .. code-block:: json {"host":{"host":"Zabbix server3","name":"Zabbix server3"},"groups":["Zabbix servers"],"item_tags":[{"tag":"Application","value":"Zabbix server"}],"itemid":37127,"name":"Zabbix server: Number of processed not supported values per second","clock":1633695167,"ns":119435806,"value":0.932813,"type":0} {"host":{"host":"Zabbix server","name":"Zabbix server"},"groups":["Zabbix servers"],"item_tags":[{"tag":"Application","value":"Zabbix server"}],"itemid":25667,"name":"Zabbix server: Utilization of preprocessing manager internal processes, in %","clock":1633695167,"ns":119546632,"value":0.016920,"type":0} {"host":{"host":"Zabbix server1","name":"Zabbix server1"},"groups":["Zabbix servers"],"item_tags":[{"tag":"Application","value":"Zabbix server"}],"itemid":36887,"name":"Zabbix server: Number of processed numeric (unsigned) values per second","clock":1633695167,"ns":119733183,"value":1.865619,"type":0} 字段解释,以及和 物联网平台 业务定义的对应关系: - host:主机信息 -> `设备` - groups:主机组 -> `设备组` - item_tags:监控项标签 -> `属性标签` - itemid:监控项ID -> `属性 zbxid` - name:主机名 -> `设备ID` - clock:时间戳,秒 -> `设备采样时间: clock + ns 前三位,精确到 毫秒` - ns:时间戳,纳秒 - value:实际值 - type:value数据类型 -> `0: 浮点数,3:整形,1、2、3 分别对应:字符、日志、文本` 5. **模块化架构的设计,IoT Server 配置文件:** ``iot-server\server-bootstrap\src\main\resources\application.yml`` .. code-block:: yaml # Zabbix Trapper Sender # Zabbix Trapper 模块,数据发送到 Zabbix Server zabbix-sender: selector: ${ZS_RECEIVER_ZABBIX_SENDER:default} default: port: ${ZS_RECEIVER_ZABBIX_PORT:10051} host: ${ZS_RECEIVER_ZABBIX_HOST:127.0.0.1} # Http Server 模块,可以添加 Http 模块,启动多个 Http Server 服务 receiver-http: selector: ${ZS_RECEIVER_HTTP:-} default: port: ${ZS_RECEIVER_HTTP_PORT:9010} host: ${ZS_RECEIVER_HTTP_HOST:0.0.0.0} # TCP Server 模块,可以添加 TCP 模块,启动多个 TCP Server 服务 receiver-tcp: selector: ${ZS_RECEIVER_TCP:-} default: port: ${ZS_RECEIVER_TCP_PORT:9020} host: ${ZS_RECEIVER_TCP_HOST:0.0.0.0} # MQTT Client 模块,可以启动多个 Client receiver-mqtt: selector: ${ZS_RECEIVER_MQTT:-} default: port: ${ZS_RECEIVER_MQTT_PORT:1883} host: ${ZS_RECEIVER_MQTT_HOST:127.0.0.1} # ZBX AGENT 模块,可以启动多个 Agent collector-modbus: selector: ${ZS_COLLECT_MODBUS:-} default: port: ${ZS_COLLECT_MODBUS_PORT:10050} host: ${ZS_COLLECT_MODBUS_HOST:0.0.0.0} # core module config 模块 core: selector: ${ZS_CORE:default} default: prepareThreads: ${ZS_CORE_PREPARE_THREADS:2} restHost: ${ZS_CORE_REST_HOST:0.0.0.0} restPort: ${ZS_CORE_REST_PORT:12800} restContextPath: ${ZS_CORE_REST_CONTEXT_PATH:/} restMinThreads: ${ZS_CORE_REST_JETTY_MIN_THREADS:1} restMaxThreads: ${ZS_CORE_REST_JETTY_MAX_THREADS:200} restIdleTimeOut: ${ZS_CORE_REST_JETTY_IDLE_TIMEOUT:30000} restAcceptorPriorityDelta: ${ZS_CORE_REST_JETTY_DELTA:0} restAcceptQueueSize: ${ZS_CORE_REST_JETTY_QUEUE_SIZE:0} httpMaxRequestHeaderSize: ${ZS_CORE_HTTP_MAX_REQUEST_HEADER_SIZE:8192} zabbixConfigPath: ${ZS_CORE_ZABBIX_CONFIG_PATH:/etc/zabbix/zabbix_server.conf} # ndjson file read realtime 模块 server-transfer: selector: ${ZS_SERVER_TRANSFER:-} default: name: ${ZS_TRANSFER_NAME:zeus-transfer} pattern: ${ZS_TRANSFER_PATTERN:/home/data/history-history-syncer-[0-9]{1}.ndjson} fileMaxWait: ${ZS_TRANSFER_FILE_MAXWAIT:30} # tdengine storage realtime 模块 storage: selector: ${ZS_STORAGE:none} none: tdengine: url: ${ZS_STORAGE_TDENGINE_URL:jdbc:TAOS://127.0.0.1:6030/zeus_data} # TDengine jdbcUrl user: ${ZS_STORAGE_TDENGINE_USER:root} password: ${ZS_STORAGE_TDENGINE_PASSWORD:taosdata} influxdb: # 专业版 url: ${ZS_STORAGE_INFLUXDB_URL:http://127.0.0.1:8086} # influxdb jdbcUrl database: zeus_data user: ${ZS_STORAGE_INFLUXDB_USER:root} password: ${ZS_STORAGE_INFLUXDB_PASSWORD:} actions: ${ZS_STORAGE_INFLUXDB_ACTIONS:1000} duration: ${ZS_STORAGE_INFLUXDB_DURATION:1000} batchEnabled: ${ZS_STORAGE_INFLUXDB_BATCH_ENABLED:true} fetchTaskLogMaxSize: ${ZS_STORAGE_INFLUXDB_FETCH_TASK_LOG_MAX_SIZE:5000} connectionResponseFormat: ${ZS_STORAGE_INFLUXDB_CONNECTION_RESPONSE_FORMAT:MSGPACK} # health check 模块 health-checker: selector: ${ZS_HEALTH_CHECKER:default} default: checkIntervalSeconds: ${ZS_HEALTH_CHECKER_INTERVAL_SECONDS:5} # health telemetry 模块 telemetry: selector: ${ZS_TELEMETRY:prometheus} none: prometheus: host: ${ZS_TELEMETRY_PROMETHEUS_HOST:0.0.0.0} port: ${ZS_TELEMETRY_PROMETHEUS_PORT:1234} sslEnabled: ${ZS_TELEMETRY_PROMETHEUS_SSL_ENABLED:false} sslKeyPath: ${ZS_TELEMETRY_PROMETHEUS_SSL_KEY_PATH:""} sslCertChainPath: ${ZS_TELEMETRY_PROMETHEUS_SSL_CERT_CHAIN_PATH:""} .. note:: - ``模块名 selector 根据下面定义的 option 进行选择,如果不启用该模块,则为 -`` - ``zabbix-sender``:Zabbix Server 数据发送模块, TCP 协议批量发送到 Zabbix Server,必须启用:default - ``receiver-http``:如果开启,会在本地启动 Http Server 服务,端口默认 9010 - ``receiver-tcp``:如果开启,会在本地启动 Tcp Server 服务,端口默认 9020 - ``receiver-mqtt``:如果开启,会在本地启动 Mqtt Client,订阅 Emqx Mqtt Topic - ``collector-modbus``: 如果开启,会启动modbus模块,可以配合 Agent 主动采集的方式,问设备要数据(专业版功能) - ``core``: 核心模块,必须开启 - ``server-transfer``: 数据传输模块,负责实时读取 ndjson 文件,传入 datacarrier 队列,写入 storage 模块 - ``storage``:存储模块,可以有多种实现,开源版支持 TDengine。selector 设置为 ``influxdb`` 时支持实时写入 InfluxDB (专业版) - ``health-checker``: 健康检查模块,负责对本服务的自身监控 - ``telemetry``: 配合健康检查模块,指标数据可以直接发给普罗米修斯,``http://127.0.0.1:1234`` 可以看到具体指标信息 如果需要启动多个 Http Server,有两种方式: 1:再定义相同模块,2:上述的一行代码启动,多写几遍,就可以启动多个 Server : .. code-block:: java /** * Http Server 可以一个 IoT Server 启动多个,只要端口不要冲突就可以 */ @Override public void configure() throws Exception { fromF("netty4-http:http://0.0.0.0:7777/data/receiver?sync=true", config.getPort()) .threads(10) .choice() .when(new HeaderPredicate1()) // 条件判断 .process(new JsonToItemValueProcess1()) // 格式处理 .to("Zabbix"); fromF("netty4-http:http://0.0.0.0:8888/data/receiver?sync=true", config.getPort()) .threads(10) .choice() .when(new HeaderPredicate2()) // 条件判断 .process(new JsonToItemValueProcess2()) // 格式处理 .to("Zabbix"); // 如此两行代码,便可以启动 2个端口的 Http Server } 配置文件可以通过 环境变量 的方式修改,``${xxxx,123}`` xxxx 为变量name,123 为没有变量时的默认值,Linux 可以 export 设置,IDEA 开发环境可以如下配置: .. image:: /_static/images/zeus_iot_server_yaml.png :width: 800