******************** 协议开发指南(Apache Camel) ******************** 1. **完全开放的协议接入模式** - Zeus IoT Server 并没有定义任何抽象类或者接口,要求在协议实现的时候去继承或者实现。 - Zeus Webapp 平台层,对于网络协议和设备接入协议也没有任何的管理和限定,不管是网络层协议还是设备侧接入协议,只是一个数据到达平台的通道而已,Zeus 更加 ``注重设备网络通信架构``。 - 基于 Apache Camel 已有的组件 和 Zeus 封装的组件,Zeus IoT 完全可以实现市面上绝大部分的网络通信协议覆盖。`一行代码启动一个网络通信服务`。 - Apache Camel 提供了非常好的消息抽象 ``Exchange`` 和 ``Message`` ,基于这两个抽象,我们也可以自定义各种接入组件,具备很好的组件扩展能力。 .. note:: 一切为了简单、方便开发调试,把最大的控制力交给开发者。通过 Apache Camel 的强大的动态路由,统一消息抽象来实现协议接入,最关键的是:每一行代码都可以被调试,没有响应式异步代码。 2. **工程结构包地址** .. image:: /_static/images/protocol_rec.png :width: 1000 3. **Http Server 协议接入示例** .. code-block:: java @Override public void configure() throws Exception { fromF("netty4-http:http://0.0.0.0:%d/data/receiver?sync=true", config.getPort()) .threads(10) .choice() .when(new HeaderPredicate()) // 条件判断 .process(new JsonToItemValueProcess()) // 格式处理 .to("Zabbix"); } - ``fromF`` 代表可以 格式化 endpoint,%d 就是端口占位符,config 变量就是 yaml 里面配置的信息。 - ``netty4-http`` 是组件名,必须事先引入对应的 component 组件: .. code-block:: xml org.apache.camel camel-netty4-http 2.25.4 - ``.threads(10)`` 开启 10 个线程 去处理后续的逻辑。 - ``.choice().when(new HeaderPredicate())`` 开启一个选择路由,条件由调用 when 里面的类 实现的方法 来判断是否满足条件。 - ``.process(new JsonToItemValueProcess())`` 如果 when 满足条件,就触发 process 处理,调用里面的 Process 类处理逻辑。 - ``.to("Zabbix");`` 最后是一个封装的组件,表示 最终满足条件的 或者 协议解析后 的数据进入 Zabbix 平台。 针对 ``.choice().when(new HeaderPredicate())`` 这段逻辑,我们看下 HeaderPredicate 里面的实现: .. code-block:: java public class HeaderPredicate implements Predicate { @Override public boolean matches(Exchange exchange) { return exchange.getIn().getHeader("deviceType").equals("1010"); // just for example } } 1. 类实现 Predicate 接口,重写 matches 方法。 2. exchange 里面包含了 入口 的所有的信息,包括 http body,http header 等等。 3. 最终是否进入 when 下面的 process 由 matches 来决定,返回 true 就成立。 这一段处理逻辑 ``.process(new JsonToItemValueProcess())`` ,一般用来 协议 解析: .. code-block:: java public class JsonToItemValueProcess implements Processor { private final Gson gson = new Gson(); @Override public void process(Exchange exchange) throws Exception { Message message = exchange.getIn(); InputStream bodyStream = (InputStream) message.getBody(); String inputContext = this.analysisMessage(bodyStream); IOTDeviceValue iotValue = gson.fromJson(inputContext, IOTDeviceValue.class); List itemValueList = new ArrayList<>(); iotValue.getAttributes().forEach((key, value) -> { ItemValue item = new ItemValue(iotValue.getDeviceId(), iotValue.getClock()); item.setKey(key); item.setValue(value); itemValueList.add(item); }); exchange.getMessage().setBody(itemValueList); } private String analysisMessage(InputStream bodyStream) throws IOException { ByteArrayOutputStream outStream = new ByteArrayOutputStream(); byte[] contextBytes = new byte[4096]; int realLen; while ((realLen = bodyStream.read(contextBytes, 0, 4096)) != -1) { outStream.write(contextBytes, 0, realLen); } // 返回从Stream中读取的字串 try { return outStream.toString("UTF-8"); } finally { outStream.close(); } } } - 类 实现 Processor 接口,重写 process 方法。 - process 方法入参 exchange 会贯穿 整个链路,任何一个节点都可以进行修改,也会传递到下个节点。 - process 具体的处理逻辑,请大家看代码,这个就是一个 流 转 String 的过程,具体的 String 内容就是 Http Post 的 body 内容。 - **重点** : ``exchange.getMessage().setBody(itemValueList)`` ; 转换完了以后,``一定要重新塞回去`` ,to("Zabbix") 这个节点才可以解析。具体下面的介绍。 - Camel 可以连招: ``.choice().when(new A()).process(new B()).when(new C()).process(new D()).when(new E()).process(new F())`` 最后记得 to("Zabbix") 就可以。 4. **唯一必须遵守的数据格式** .. code-block:: java public class ZabbixTrapperProducer extends DefaultProducer { private final ModuleManager moduleManager; private final ItemDataTransferWorker itemDataTransferWorker; private final ExecutorService itemValueThread = Executors.newFixedThreadPool(20); public ZabbixTrapperProducer(Endpoint endpoint, ModuleManager moduleManager) { super(endpoint); this.moduleManager = moduleManager; this.itemDataTransferWorker = new ItemDataTransferWorker(moduleManager); } /** * Body 必须是 ItemValue * * @param exchange Exchange */ @Override public void process(Exchange exchange) { Message message = exchange.getIn(); List values = (List) message.getBody(); for (ItemValue itemValue : values) { if (StringUtil.isEmpty(itemValue.getHost()) || StringUtil.isEmpty(itemValue.getKey()) || StringUtil.isEmpty(itemValue.getValue())) { log.error(" process item data error,{}", new Gson().toJson(itemValue)); continue; } itemValueThread.submit(() -> { itemDataTransferWorker.in(itemValue); }); } exchange.getMessage().setBody("{\"success\":\"true\"}"); } } - DefaultProducer 是实现 Apache Camel 组件需要被继承的类,to("Zabbix") 最后一个节点会走到这里的 process。 - ``List values = (List) message.getBody();`` 这段代码直接做 **强转** 的前提就是 前一个步骤 ``塞回去`` 的是同类型的实例。 - itemDataTransferWorker.in(itemValue); 多线程进入数据队列:DataCarrier。 - 感兴趣的同学可以阅读后面的代码,最终是发给了 Zabbix。(TCP 协议,更加具体的涉及到 Zabbix 协议的部分,感兴趣的同学 加群讨论) .. note:: **ItemValue** 是最终数据发送到 Zabbix 的实体类定义,不管是任何协议任何类型的数据,都只要转换成该对象实例就可以进入 Zabbix 。 .. code-block:: java @Getter @Setter public class ItemValue implements Item { private String host; // 【设备ID】 private String key; // 【属性标识】 // 【设备上报 值】,都是文本 // Zabbix 会根据配置的ITEM 类型,进行转换,如果失败就报错 private String value; private Long clock; // 秒,如果为 Null,则 zabbix 以接收时间为准 private Long ns; // 纳秒,如果为 Null,则 zabbix 以接收时间为准 public ItemValue(String host, Long clock) { this.host = host; if (clock != null) { this.clock = clock; } } /** * 设置 数据时间,单独设置 以设备推送的时间数据为准 * * @param clock 毫秒,70年到现在 * @param ns 纳秒,0-9位数 */ public void setTime(Long clock, Long ns) { this.clock = clock; this.ns = ns; } @Override public String host() { return getHost(); } @Override public String key() { return getKey(); } } - ``host`` 就是 设备ID,这个在 平台层 ``设备管理`` 模块可以看到。比如:801。 - ``key`` 就是 属性标识, 某个属性的唯一标识,找 ``设备管理-属性管理`` 可以查看。比如:room-1-temp。 - ``value`` 就是 设备上报的值, 类型需要和 创建属性 时填的 一致。 .. note:: 设备上报的 原始数据包 可能和 ItemValue 完全不对应,我们在处理的过程中,只要解析成最终的 ItemValue 对象就可以。 .. image:: /_static/images/data_process_3.png :width: 800 左侧为设备ID,也就是host, 右侧为两个不同属性的key,非ID,需要和设备上报数据时对应一致。