协议开发指南(Apache Camel)
完全开放的协议接入模式
Zeus IoT Server 并没有定义任何抽象类或者接口,要求在协议实现的时候去继承或者实现。
Zeus Webapp 平台层,对于网络协议和设备接入协议也没有任何的管理和限定,不管是网络层协议还是设备侧接入协议,只是一个数据到达平台的通道而已,Zeus 更加
注重设备网络通信架构
。基于 Apache Camel 已有的组件 和 Zeus 封装的组件,Zeus IoT 完全可以实现市面上绝大部分的网络通信协议覆盖。一行代码启动一个网络通信服务。
Apache Camel 提供了非常好的消息抽象
Exchange
和Message
,基于这两个抽象,我们也可以自定义各种接入组件,具备很好的组件扩展能力。
注解
一切为了简单、方便开发调试,把最大的控制力交给开发者。通过 Apache Camel 的强大的动态路由,统一消息抽象来实现协议接入,最关键的是:每一行代码都可以被调试,没有响应式异步代码。
工程结构包地址
Http Server 协议接入示例
@Override
public void configure() throws Exception {
fromF("netty4-http:http://0.0.0.0:%d/data/receiver?sync=true", config.getPort())
.threads(10)
.choice()
.when(new HeaderPredicate()) // 条件判断
.process(new JsonToItemValueProcess()) // 格式处理
.to("Zabbix");
}
fromF
代表可以 格式化 endpoint,%d 就是端口占位符,config 变量就是 yaml 里面配置的信息。netty4-http
是组件名,必须事先引入对应的 component 组件:<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-netty4-http</artifactId> <version>2.25.4</version> </dependency>
.threads(10)
开启 10 个线程 去处理后续的逻辑。.choice().when(new HeaderPredicate())
开启一个选择路由,条件由调用 when 里面的类 实现的方法 来判断是否满足条件。.process(new JsonToItemValueProcess())
如果 when 满足条件,就触发 process 处理,调用里面的 Process 类处理逻辑。.to("Zabbix");
最后是一个封装的组件,表示 最终满足条件的 或者 协议解析后 的数据进入 Zabbix 平台。
针对 .choice().when(new HeaderPredicate())
这段逻辑,我们看下 HeaderPredicate 里面的实现:
public class HeaderPredicate implements Predicate {
@Override
public boolean matches(Exchange exchange) {
return exchange.getIn().getHeader("deviceType").equals("1010"); // just for example
}
}
类实现 Predicate 接口,重写 matches 方法。
exchange 里面包含了 入口 的所有的信息,包括 http body,http header 等等。
最终是否进入 when 下面的 process 由 matches 来决定,返回 true 就成立。
这一段处理逻辑 .process(new JsonToItemValueProcess())
,一般用来 协议 解析:
public class JsonToItemValueProcess implements Processor {
private final Gson gson = new Gson();
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
InputStream bodyStream = (InputStream) message.getBody();
String inputContext = this.analysisMessage(bodyStream);
IOTDeviceValue iotValue = gson.fromJson(inputContext, IOTDeviceValue.class);
List<ItemValue> itemValueList = new ArrayList<>();
iotValue.getAttributes().forEach((key, value) -> {
ItemValue item = new ItemValue(iotValue.getDeviceId(), iotValue.getClock());
item.setKey(key);
item.setValue(value);
itemValueList.add(item);
});
exchange.getMessage().setBody(itemValueList);
}
private String analysisMessage(InputStream bodyStream) throws IOException {
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
byte[] contextBytes = new byte[4096];
int realLen;
while ((realLen = bodyStream.read(contextBytes, 0, 4096)) != -1) {
outStream.write(contextBytes, 0, realLen);
}
// 返回从Stream中读取的字串
try {
return outStream.toString("UTF-8");
} finally {
outStream.close();
}
}
}
类 实现 Processor 接口,重写 process 方法。
process 方法入参 exchange 会贯穿 整个链路,任何一个节点都可以进行修改,也会传递到下个节点。
process 具体的处理逻辑,请大家看代码,这个就是一个 流 转 String 的过程,具体的 String 内容就是 Http Post 的 body 内容。
重点 :
exchange.getMessage().setBody(itemValueList)
; 转换完了以后,一定要重新塞回去
,to("Zabbix") 这个节点才可以解析。具体下面的介绍。Camel 可以连招:
.choice().when(new A()).process(new B()).when(new C()).process(new D()).when(new E()).process(new F())
最后记得 to("Zabbix") 就可以。
唯一必须遵守的数据格式
public class ZabbixTrapperProducer extends DefaultProducer {
private final ModuleManager moduleManager;
private final ItemDataTransferWorker itemDataTransferWorker;
private final ExecutorService itemValueThread = Executors.newFixedThreadPool(20);
public ZabbixTrapperProducer(Endpoint endpoint, ModuleManager moduleManager) {
super(endpoint);
this.moduleManager = moduleManager;
this.itemDataTransferWorker = new ItemDataTransferWorker(moduleManager);
}
/**
* Body 必须是 ItemValue
*
* @param exchange Exchange
*/
@Override
public void process(Exchange exchange) {
Message message = exchange.getIn();
List<ItemValue> values = (List<ItemValue>) message.getBody();
for (ItemValue itemValue : values) {
if (StringUtil.isEmpty(itemValue.getHost())
|| StringUtil.isEmpty(itemValue.getKey())
|| StringUtil.isEmpty(itemValue.getValue())) {
log.error(" process item data error,{}", new Gson().toJson(itemValue));
continue;
}
itemValueThread.submit(() -> {
itemDataTransferWorker.in(itemValue);
});
}
exchange.getMessage().setBody("{\"success\":\"true\"}");
}
}
DefaultProducer 是实现 Apache Camel 组件需要被继承的类,to("Zabbix") 最后一个节点会走到这里的 process。
List<ItemValue> values = (List<ItemValue>) message.getBody();
这段代码直接做 强转 的前提就是 前一个步骤塞回去
的是同类型的实例。- itemDataTransferWorker.in(itemValue); 多线程进入数据队列:DataCarrier。
感兴趣的同学可以阅读后面的代码,最终是发给了 Zabbix。(TCP 协议,更加具体的涉及到 Zabbix 协议的部分,感兴趣的同学 加群讨论)
注解
ItemValue 是最终数据发送到 Zabbix 的实体类定义,不管是任何协议任何类型的数据,都只要转换成该对象实例就可以进入 Zabbix 。
@Getter
@Setter
public class ItemValue implements Item {
private String host; // 【设备ID】
private String key; // 【属性标识】
// 【设备上报 值】,都是文本
// Zabbix 会根据配置的ITEM 类型,进行转换,如果失败就报错
private String value;
private Long clock; // 秒,如果为 Null,则 zabbix 以接收时间为准
private Long ns; // 纳秒,如果为 Null,则 zabbix 以接收时间为准
public ItemValue(String host, Long clock) {
this.host = host;
if (clock != null) {
this.clock = clock;
}
}
/**
* 设置 数据时间,单独设置 以设备推送的时间数据为准
*
* @param clock 毫秒,70年到现在
* @param ns 纳秒,0-9位数
*/
public void setTime(Long clock, Long ns) {
this.clock = clock;
this.ns = ns;
}
@Override
public String host() {
return getHost();
}
@Override
public String key() {
return getKey();
}
}
host
就是 设备ID,这个在 平台层设备管理
模块可以看到。比如:801。key
就是 属性标识, 某个属性的唯一标识,找设备管理-属性管理
可以查看。比如:room-1-temp。value
就是 设备上报的值, 类型需要和 创建属性 时填的 一致。
注解
设备上报的 原始数据包 可能和 ItemValue 完全不对应,我们在处理的过程中,只要解析成最终的 ItemValue 对象就可以。
左侧为设备ID,也就是host, 右侧为两个不同属性的key,非ID,需要和设备上报数据时对应一致。