协议开发指南(Apache Camel)

  1. 完全开放的协议接入模式

  • Zeus IoT Server 并没有定义任何抽象类或者接口,要求在协议实现的时候去继承或者实现。

  • Zeus Webapp 平台层,对于网络协议和设备接入协议也没有任何的管理和限定,不管是网络层协议还是设备侧接入协议,只是一个数据到达平台的通道而已,Zeus 更加 注重设备网络通信架构

  • 基于 Apache Camel 已有的组件 和 Zeus 封装的组件,Zeus IoT 完全可以实现市面上绝大部分的网络通信协议覆盖。一行代码启动一个网络通信服务

  • Apache Camel 提供了非常好的消息抽象 ExchangeMessage ,基于这两个抽象,我们也可以自定义各种接入组件,具备很好的组件扩展能力。

注解

一切为了简单、方便开发调试,把最大的控制力交给开发者。通过 Apache Camel 的强大的动态路由,统一消息抽象来实现协议接入,最关键的是:每一行代码都可以被调试,没有响应式异步代码。

  1. 工程结构包地址

../_images/protocol_rec.png
  1. Http Server 协议接入示例

@Override
public void configure() throws Exception {
    fromF("netty4-http:http://0.0.0.0:%d/data/receiver?sync=true", config.getPort())
            .threads(10)
            .choice()
            .when(new HeaderPredicate()) // 条件判断
            .process(new JsonToItemValueProcess()) // 格式处理
            .to("Zabbix");
}
  • fromF 代表可以 格式化 endpoint,%d 就是端口占位符,config 变量就是 yaml 里面配置的信息。

  • netty4-http 是组件名,必须事先引入对应的 component 组件:

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-netty4-http</artifactId>
        <version>2.25.4</version>
    </dependency>
    
  • .threads(10) 开启 10 个线程 去处理后续的逻辑。

  • .choice().when(new HeaderPredicate()) 开启一个选择路由,条件由调用 when 里面的类 实现的方法 来判断是否满足条件。

  • .process(new JsonToItemValueProcess()) 如果 when 满足条件,就触发 process 处理,调用里面的 Process 类处理逻辑。

  • .to("Zabbix"); 最后是一个封装的组件,表示 最终满足条件的 或者 协议解析后 的数据进入 Zabbix 平台。

针对 .choice().when(new HeaderPredicate()) 这段逻辑,我们看下 HeaderPredicate 里面的实现:

 public class HeaderPredicate implements Predicate {

    @Override
    public boolean matches(Exchange exchange) {
        return exchange.getIn().getHeader("deviceType").equals("1010"); // just for example
    }
}
  1. 类实现 Predicate 接口,重写 matches 方法。

  2. exchange 里面包含了 入口 的所有的信息,包括 http body,http header 等等。

  3. 最终是否进入 when 下面的 process 由 matches 来决定,返回 true 就成立。

这一段处理逻辑 .process(new JsonToItemValueProcess()) ,一般用来 协议 解析:

public class JsonToItemValueProcess implements Processor {
private final Gson gson = new Gson();

   @Override
   public void process(Exchange exchange) throws Exception {
       Message message = exchange.getIn();
       InputStream bodyStream = (InputStream) message.getBody();
       String inputContext = this.analysisMessage(bodyStream);

       IOTDeviceValue iotValue = gson.fromJson(inputContext, IOTDeviceValue.class);
       List<ItemValue> itemValueList = new ArrayList<>();

       iotValue.getAttributes().forEach((key, value) -> {
           ItemValue item = new ItemValue(iotValue.getDeviceId(), iotValue.getClock());
           item.setKey(key);
           item.setValue(value);
           itemValueList.add(item);
       });

       exchange.getMessage().setBody(itemValueList);
   }

   private String analysisMessage(InputStream bodyStream) throws IOException {
       ByteArrayOutputStream outStream = new ByteArrayOutputStream();
       byte[] contextBytes = new byte[4096];
       int realLen;
       while ((realLen = bodyStream.read(contextBytes, 0, 4096)) != -1) {
           outStream.write(contextBytes, 0, realLen);
       }

       // 返回从Stream中读取的字串
       try {
           return outStream.toString("UTF-8");
       } finally {
           outStream.close();
       }
   }
}
  • 类 实现 Processor 接口,重写 process 方法。

  • process 方法入参 exchange 会贯穿 整个链路,任何一个节点都可以进行修改,也会传递到下个节点。

  • process 具体的处理逻辑,请大家看代码,这个就是一个 流 转 String 的过程,具体的 String 内容就是 Http Post 的 body 内容。

  • 重点exchange.getMessage().setBody(itemValueList) ; 转换完了以后,一定要重新塞回去 ,to("Zabbix") 这个节点才可以解析。具体下面的介绍。

  • Camel 可以连招: .choice().when(new A()).process(new B()).when(new C()).process(new D()).when(new E()).process(new F()) 最后记得 to("Zabbix") 就可以。

  1. 唯一必须遵守的数据格式

public class ZabbixTrapperProducer extends DefaultProducer {

    private final ModuleManager moduleManager;
    private final ItemDataTransferWorker itemDataTransferWorker;
    private final ExecutorService itemValueThread = Executors.newFixedThreadPool(20);


    public ZabbixTrapperProducer(Endpoint endpoint, ModuleManager moduleManager) {
       super(endpoint);
       this.moduleManager = moduleManager;
       this.itemDataTransferWorker = new ItemDataTransferWorker(moduleManager);
    }

    /**
    * Body 必须是 ItemValue
    *
    * @param exchange Exchange
    */
    @Override
    public void process(Exchange exchange) {
        Message message = exchange.getIn();
        List<ItemValue> values = (List<ItemValue>) message.getBody();

        for (ItemValue itemValue : values) {
            if (StringUtil.isEmpty(itemValue.getHost())
                  || StringUtil.isEmpty(itemValue.getKey())
                  || StringUtil.isEmpty(itemValue.getValue())) {
               log.error(" process item data error,{}", new Gson().toJson(itemValue));
               continue;
            }

            itemValueThread.submit(() -> {
               itemDataTransferWorker.in(itemValue);
            });
        }

        exchange.getMessage().setBody("{\"success\":\"true\"}");
    }
}
  • DefaultProducer 是实现 Apache Camel 组件需要被继承的类,to("Zabbix") 最后一个节点会走到这里的 process。

  • List<ItemValue> values = (List<ItemValue>) message.getBody(); 这段代码直接做 强转 的前提就是 前一个步骤 塞回去 的是同类型的实例。

  • itemDataTransferWorker.in(itemValue); 多线程进入数据队列:DataCarrier。
    • 感兴趣的同学可以阅读后面的代码,最终是发给了 Zabbix。(TCP 协议,更加具体的涉及到 Zabbix 协议的部分,感兴趣的同学 加群讨论)

注解

ItemValue 是最终数据发送到 Zabbix 的实体类定义,不管是任何协议任何类型的数据,都只要转换成该对象实例就可以进入 Zabbix 。

@Getter
@Setter
public class ItemValue implements Item {

    private String host;  // 【设备ID】
    private String key; // 【属性标识】

    // 【设备上报 值】,都是文本
    // Zabbix 会根据配置的ITEM 类型,进行转换,如果失败就报错
    private String value;

    private Long clock; // 秒,如果为 Null,则 zabbix 以接收时间为准
    private Long ns; // 纳秒,如果为 Null,则 zabbix 以接收时间为准


    public ItemValue(String host, Long clock) {
        this.host = host;
        if (clock != null) {
            this.clock = clock;
        }
    }

    /**
     * 设置 数据时间,单独设置 以设备推送的时间数据为准
     *
     * @param clock 毫秒,70年到现在
     * @param ns    纳秒,0-9位数
     */
    public void setTime(Long clock, Long ns) {
        this.clock = clock;
        this.ns = ns;
    }


    @Override
    public String host() {
        return getHost();
    }

    @Override
    public String key() {
        return getKey();
    }
}
  • host 就是 设备ID,这个在 平台层 设备管理 模块可以看到。比如:801。

  • key 就是 属性标识, 某个属性的唯一标识,找 设备管理-属性管理 可以查看。比如:room-1-temp。

  • value 就是 设备上报的值, 类型需要和 创建属性 时填的 一致。

注解

设备上报的 原始数据包 可能和 ItemValue 完全不对应,我们在处理的过程中,只要解析成最终的 ItemValue 对象就可以。

../_images/data_process_3.png

左侧为设备ID,也就是host, 右侧为两个不同属性的key,非ID,需要和设备上报数据时对应一致。