Zeus-IoT 架构一览
开源版架构图
注解
开源版本 数据采集只需要部署 Zabbix Server + IoT Server , 数据库: Postgresql 12 + , TDengine 2.2.0 + 。
IoT Server 需要开启
server-transfer
模块 并且storage
模块配置tdengine
, Zabbix Server conf 文件配置 ExportPath , 实时读取文件同时写入 TDengine 。适用环境:小规模环境,几十到几百台设备不等接入,且所有设备都在一个网络可达环境内。
专业版架构图
注解
专业版本 数据采集除了开源版本的类似部署外,还可以支持多个 Zabbix Proxy + IoT Server 采集部署,Zabbix Proxy 依赖 Postgresql 12 + 或者 SQLite。
IoT Server 作为 Zabbix Proxy 协议解析处理服务时,不需要开启
server-transfer
模块,storage
模块配置none
, 其他只需要打开对应的协议接入模块即可。适用环境:大规模环境,几百到几十万台设备不等,分布式采集,不同的设备可以分布在不同的区域,城市。
不同的 Proxy 下面的 IoT Server 可以有完全不同的协议适配,也可以定义不同的端口(因为是多机部署)。
不同的 租户 会
绑定不同的 Proxy
,租户独享 Proxy
, IoT Server 源码开放给租户,自己实现协议对接就可以,权限在平台层 Zeus Webapp 控制。一个 Zabbix Server 可以同时对接一万多个 Proxy (官方数据 15000+,Server 裸金属部署,高配)。
Zabbix Webapp (UI)
Zeus IoT Server
技术栈:Apache Camel 2 + Apache Skywalking Modular Design
结合 Apache Camel 的组件生态,实现各类协议的接入处理,包括但不限于 Http,Mqtt(
依赖 Emqx 作为 broker
),OPC UA,Coap,Tcp/Udp,Modbus 等。Zabbix Server 实时数据文件 ndjson 的实时读取,可以扩展作二次分发:推送到 Kafka 等。
项目源码 Maven 结构说明:
zeus-iot
└── iot-common -- 公共库
└── iot-datacarrier -- 数据传输队列,数据缓冲
└── iot-eventbus -- EventBus 数据总线,设备指令下发
└── iot-util -- 基础工具类
└── iot-server -- Zeus IoT Server
├── server-action -- 触发动作处理,比如:执行设备服务,指令下发
├── server-bootstrap -- 启动模块,集成了所有的模块
└── server-collector-plugin -- 主动采集(专业版)
└── modbus-collector-plugin -- 基于Zabbix Agent 协议的主动采集 Modbus 实现
├── server-core -- Core 模块
├── server-health-checker -- 健康检查,读取本地监控指标
└── server-library -- 模块化基础库
├── library-client
├── library-module
├── library-server
└── library-util
└── server-receiver-plugin -- 设备数据接收、协议解析模块
├── coap-receiver-plugin -- Coap 协议
├── http-receiver-plugin -- Http 协议
├── mqtt-receiver-plugin -- Mqtt Client
├── opc-ua-receiver-plugin -- OPC-UA 协议
├── tcp-receiver-plugin -- TCP 协议
└── udp-receiver-plugin -- UDP 协议
├── server-sender -- 数据发送到 Zabbix 协议模块
├── server-starter -- IoT Server 启动模块,main 方法
└── server-storage-plugin -- 存储模块
├── server-influxdb-plugin -- InfluxDB 存储实现
├── server-none-storage -- None 空实现,由于模块依赖关系,storage 不能为 - ,所以定义空实现,none
└── server-tdengine-plugin -- TDEngine 存储实现
└── server-telemetry -- 遥测采集(自监控)
├── telemetry-api -- 遥测API
└── telemetry-prometheus -- 遥测 普罗米修斯 上报
└── server-transfer -- 本地 ndjson 文件实时读取,进 datacarrier 队列,再入 storage 模块
├── iot-server-bom -- IoT Server 根pom
└── zeus-application-toolkit -- Zeus Webapp 工具类
└── toolkit-async -- 异步任务编排执行框架
└── toolkit-eventbus -- webapp 事件触发总线
├── zeus-common -- 公共基础类
├── zeus-core -- 核心基础类
├── zeus-driver -- Zabbix Api 驱动模块
├── zmops-zeus-iot -- 打包模块,定义了 mvn package 结构
├── zeus-iot-ui -- 项目子模块,UI 模块会被打包成 静态资源文件,打包到 SpringBoot Resource 目录下并且发布
├── zeus-message -- webapp 消息推送模块
├── zeus-rest -- webapp rest 对外接口模块
├── zeus-starter -- webapp 启动模块,main 方法
├── zeus-webapp -- webapp 业务模块,包含 所有的业务实现
└── zeus-webapp-bom -- webapp 父pom
Apache Camel 协议接入示例,一行代码启动
/**
* Http Server 可以一个 IoT Server 启动多个,只要端口不要冲突就可以
*/
@Override
public void configure() throws Exception {
fromF("netty4-http:http://0.0.0.0:%d/data/receiver?sync=true", config.getPort())
.threads(10)
.choice()
.when(new HeaderPredicate()) // 条件判断
.process(new JsonToItemValueProcess()) // 格式处理
.to("Zabbix");
}
/**
* Mqtt Client 可以一个 IoT Server 启动多个
*/
@Override
public void configure() throws Exception {
fromF("mqtt:zeus-iot-mqtt?host=tcp://%s:%d&connectWaitInSeconds=30&subscribeTopicNames=+/up",
mqttReceiverConfig.getHost(),
mqttReceiverConfig.getPort())
.process(new MqttDemo01Process()) // 协议格式解析
.to("Zabbix");
}
/**
* Tcp Server 可以一个 IoT Server 启动多个,只要端口不要冲突就可以
*/
@Override
public void configure() throws Exception {
fromF("netty4:tcp://0.0.0.0:%d", tcpReceiverConfig.getPort())
.process(new StringItemValueProcess())
.to("Zabbix");
}
Zabbix Server 数据文件格式
{"host":{"host":"Zabbix server3","name":"Zabbix server3"},"groups":["Zabbix servers"],"item_tags":[{"tag":"Application","value":"Zabbix server"}],"itemid":37127,"name":"Zabbix server: Number of processed not supported values per second","clock":1633695167,"ns":119435806,"value":0.932813,"type":0}
{"host":{"host":"Zabbix server","name":"Zabbix server"},"groups":["Zabbix servers"],"item_tags":[{"tag":"Application","value":"Zabbix server"}],"itemid":25667,"name":"Zabbix server: Utilization of preprocessing manager internal processes, in %","clock":1633695167,"ns":119546632,"value":0.016920,"type":0}
{"host":{"host":"Zabbix server1","name":"Zabbix server1"},"groups":["Zabbix servers"],"item_tags":[{"tag":"Application","value":"Zabbix server"}],"itemid":36887,"name":"Zabbix server: Number of processed numeric (unsigned) values per second","clock":1633695167,"ns":119733183,"value":1.865619,"type":0}
字段解释,以及和 物联网平台 业务定义的对应关系:
host:主机信息 -> 设备
groups:主机组 -> 设备组
item_tags:监控项标签 -> 属性标签
itemid:监控项ID -> 属性 zbxid
name:主机名 -> 设备ID
clock:时间戳,秒 -> 设备采样时间: clock + ns 前三位,精确到 毫秒
ns:时间戳,纳秒
value:实际值
type:value数据类型 -> 0: 浮点数,3:整形,1、2、3 分别对应:字符、日志、文本
模块化架构的设计,IoT Server 配置文件:
iot-server\server-bootstrap\src\main\resources\application.yml
# Zabbix Trapper Sender
# Zabbix Trapper 模块,数据发送到 Zabbix Server
zabbix-sender:
selector: ${ZS_RECEIVER_ZABBIX_SENDER:default}
default:
port: ${ZS_RECEIVER_ZABBIX_PORT:10051}
host: ${ZS_RECEIVER_ZABBIX_HOST:127.0.0.1}
# Http Server 模块,可以添加 Http 模块,启动多个 Http Server 服务
receiver-http:
selector: ${ZS_RECEIVER_HTTP:-}
default:
port: ${ZS_RECEIVER_HTTP_PORT:9010}
host: ${ZS_RECEIVER_HTTP_HOST:0.0.0.0}
# TCP Server 模块,可以添加 TCP 模块,启动多个 TCP Server 服务
receiver-tcp:
selector: ${ZS_RECEIVER_TCP:-}
default:
port: ${ZS_RECEIVER_TCP_PORT:9020}
host: ${ZS_RECEIVER_TCP_HOST:0.0.0.0}
# MQTT Client 模块,可以启动多个 Client
receiver-mqtt:
selector: ${ZS_RECEIVER_MQTT:-}
default:
port: ${ZS_RECEIVER_MQTT_PORT:1883}
host: ${ZS_RECEIVER_MQTT_HOST:127.0.0.1}
# ZBX AGENT 模块,可以启动多个 Agent
collector-modbus:
selector: ${ZS_COLLECT_MODBUS:-}
default:
port: ${ZS_COLLECT_MODBUS_PORT:10050}
host: ${ZS_COLLECT_MODBUS_HOST:0.0.0.0}
# core module config 模块
core:
selector: ${ZS_CORE:default}
default:
prepareThreads: ${ZS_CORE_PREPARE_THREADS:2}
restHost: ${ZS_CORE_REST_HOST:0.0.0.0}
restPort: ${ZS_CORE_REST_PORT:12800}
restContextPath: ${ZS_CORE_REST_CONTEXT_PATH:/}
restMinThreads: ${ZS_CORE_REST_JETTY_MIN_THREADS:1}
restMaxThreads: ${ZS_CORE_REST_JETTY_MAX_THREADS:200}
restIdleTimeOut: ${ZS_CORE_REST_JETTY_IDLE_TIMEOUT:30000}
restAcceptorPriorityDelta: ${ZS_CORE_REST_JETTY_DELTA:0}
restAcceptQueueSize: ${ZS_CORE_REST_JETTY_QUEUE_SIZE:0}
httpMaxRequestHeaderSize: ${ZS_CORE_HTTP_MAX_REQUEST_HEADER_SIZE:8192}
zabbixConfigPath: ${ZS_CORE_ZABBIX_CONFIG_PATH:/etc/zabbix/zabbix_server.conf}
# ndjson file read realtime 模块
server-transfer:
selector: ${ZS_SERVER_TRANSFER:-}
default:
name: ${ZS_TRANSFER_NAME:zeus-transfer}
pattern: ${ZS_TRANSFER_PATTERN:/home/data/history-history-syncer-[0-9]{1}.ndjson}
fileMaxWait: ${ZS_TRANSFER_FILE_MAXWAIT:30}
# tdengine storage realtime 模块
storage:
selector: ${ZS_STORAGE:none}
none:
tdengine:
url: ${ZS_STORAGE_TDENGINE_URL:jdbc:TAOS://127.0.0.1:6030/zeus_data} # TDengine jdbcUrl
user: ${ZS_STORAGE_TDENGINE_USER:root}
password: ${ZS_STORAGE_TDENGINE_PASSWORD:taosdata}
influxdb: # 专业版
url: ${ZS_STORAGE_INFLUXDB_URL:http://127.0.0.1:8086} # influxdb jdbcUrl
database: zeus_data
user: ${ZS_STORAGE_INFLUXDB_USER:root}
password: ${ZS_STORAGE_INFLUXDB_PASSWORD:}
actions: ${ZS_STORAGE_INFLUXDB_ACTIONS:1000}
duration: ${ZS_STORAGE_INFLUXDB_DURATION:1000}
batchEnabled: ${ZS_STORAGE_INFLUXDB_BATCH_ENABLED:true}
fetchTaskLogMaxSize: ${ZS_STORAGE_INFLUXDB_FETCH_TASK_LOG_MAX_SIZE:5000}
connectionResponseFormat: ${ZS_STORAGE_INFLUXDB_CONNECTION_RESPONSE_FORMAT:MSGPACK}
# health check 模块
health-checker:
selector: ${ZS_HEALTH_CHECKER:default}
default:
checkIntervalSeconds: ${ZS_HEALTH_CHECKER_INTERVAL_SECONDS:5}
# health telemetry 模块
telemetry:
selector: ${ZS_TELEMETRY:prometheus}
none:
prometheus:
host: ${ZS_TELEMETRY_PROMETHEUS_HOST:0.0.0.0}
port: ${ZS_TELEMETRY_PROMETHEUS_PORT:1234}
sslEnabled: ${ZS_TELEMETRY_PROMETHEUS_SSL_ENABLED:false}
sslKeyPath: ${ZS_TELEMETRY_PROMETHEUS_SSL_KEY_PATH:""}
sslCertChainPath: ${ZS_TELEMETRY_PROMETHEUS_SSL_CERT_CHAIN_PATH:""}
注解
模块名 selector 根据下面定义的 option 进行选择,如果不启用该模块,则为 -
zabbix-sender
:Zabbix Server 数据发送模块, TCP 协议批量发送到 Zabbix Server,必须启用:defaultreceiver-http
:如果开启,会在本地启动 Http Server 服务,端口默认 9010receiver-tcp
:如果开启,会在本地启动 Tcp Server 服务,端口默认 9020receiver-mqtt
:如果开启,会在本地启动 Mqtt Client,订阅 Emqx Mqtt Topiccollector-modbus
: 如果开启,会启动modbus模块,可以配合 Agent 主动采集的方式,问设备要数据(专业版功能)core
: 核心模块,必须开启server-transfer
: 数据传输模块,负责实时读取 ndjson 文件,传入 datacarrier 队列,写入 storage 模块storage
:存储模块,可以有多种实现,开源版支持 TDengine。selector 设置为influxdb
时支持实时写入 InfluxDB (专业版)health-checker
: 健康检查模块,负责对本服务的自身监控telemetry
: 配合健康检查模块,指标数据可以直接发给普罗米修斯,http://127.0.0.1:1234
可以看到具体指标信息
如果需要启动多个 Http Server,有两种方式: 1:再定义相同模块,2:上述的一行代码启动,多写几遍,就可以启动多个 Server :
/**
* Http Server 可以一个 IoT Server 启动多个,只要端口不要冲突就可以
*/
@Override
public void configure() throws Exception {
fromF("netty4-http:http://0.0.0.0:7777/data/receiver?sync=true", config.getPort())
.threads(10)
.choice()
.when(new HeaderPredicate1()) // 条件判断
.process(new JsonToItemValueProcess1()) // 格式处理
.to("Zabbix");
fromF("netty4-http:http://0.0.0.0:8888/data/receiver?sync=true", config.getPort())
.threads(10)
.choice()
.when(new HeaderPredicate2()) // 条件判断
.process(new JsonToItemValueProcess2()) // 格式处理
.to("Zabbix");
// 如此两行代码,便可以启动 2个端口的 Http Server
}
配置文件可以通过 环境变量 的方式修改,${xxxx,123}
xxxx 为变量name,123 为没有变量时的默认值,Linux 可以 export 设置,IDEA 开发环境可以如下配置: