|
首先是技术选型,我采用的是Apache Mina,Spring MVC,Redis,ActiveMQ 。
ApacheMINA是一个网络应用程序框架,用来帮助用户简单地开发高性能和高可扩展性的网络应用程序。它提供了一个通过Java NIO在不同的传输例如TCP/IP和UDP/IP上抽象的事件驱动的异步API。具体查看Mina的技术文档
架构中涉及了数据的接收,设备的远程控制,如下图。
设备的数据通过协议接口识别协议类型,并转发到各个协议解析组件,在组件中会记录会话的session,供远程控制使用。为使协议解析组件保持无状态属性,在此处使用REDIS保持会话状态。
远程控制使用ActiveMQ[或者使用Mina来传输指令]传递消息至指令转发模块,指令转发模块作为消息的消费者,收到消息后转发给相应的Session,通过此通道下发给设备,实现数据的双向通信。
下面介绍一下平台的关键技术点。
1.协议识别组件的实现;
2.协议解析组件的热插拔;
3.分布式设计;
一、协议识别组件的实现
平台支持同时解析多协议的功能,需要识别每个协议簇,发送到特定的协议解码器。这就需要用到Mina提供的自定义解码器功能,
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new MessageCodecFactory(Charset.forName("UTF-8"))));
在MessageCodecFactory中,可以对应各个协议簇注册所属的解码器,如下所示:
addMessageDecoder(tcpDecoder);
addMessageEncoder(AbstractMessage.class, tcpEncoder);
addMessageDecoder(modbusDecoder);
addMessageEncoder(AbstractMessage.class, modbusEncoder);
当服务器收到发来的消息后,将循环所有的解码器,如果某一解码器返回MessageDecoder.OK,说明本解码器是合适的解码器,消息将被发送到对应的解码器进行解码。解析出协议的包头,命令号等,以备后续处理。
二、协议解析组件的热插拔
上面协议识别组件仅仅是识别出了协议簇,以及哪个协议;具体的业务逻辑还需要交由协议解析组件来处理。在这里我引入了Spring来管理协议解析组件。系统启动后注册协议解析组件,通过getBeansOfType查找到对应的协议解析组件来处理协议逻辑。
具体代码:
协议接收转发
/**
* 数据通过此接口转发到各个解析模块
*/
@Override
public void onMsg(AbstractMessage message, IoSession session) {
logger.info("onMsg() cmd = " + message.getCmd() + ", protocolType = " + message.getProtocolType() );
ApplicationContextUtil.callMessageRequestProvider(message, session);
}
查找门面
public static voidcallMessageRequestProvider(AbstractMessage message, IoSession session){
MessageManagerLogicHandler handler = null;
Map<String, MessageManagerFacade> map = context.getBeansOfType(MessageManagerFacade.class);
for(Map.Entry<String, MessageManagerFacade> entry : map.entrySet()){
MessageManagerFacade facade = entry.getValue();
if(facade.getFacadeMap() != null){
handler = (MessageManagerLogicHandler) facade.getFacadeMap().get(message.getProtocolType());
if(handler != null){
handler.doExec(message, session);
}
}
}
}
上面代码中通过协议簇类型ProtocolType找到具体的协议簇注册类,然后分发给具体的业务逻辑处理类。如下所示:
@PostConstruct
public void registry() {
logger.info("====================TcpMessageFacade Registry=======================");
// TCP协议解析组件
this.facadeMap.put(TcpNetCmd.LOGIN_ID, loginMessageClientHandler);
this.facadeMap.put(TcpNetCmd.READ_DATA_ID, dataGetMessageHandler);
this.facadeMap.put(TcpNetCmd.SET_TIME_ID, timeSetResponseHandler);
}
三、分布式设计
目前的架构设计,在无重的IO情况下,单台服务器可以支持8000+设备并发。若需接入更多的设备,采用分布式设计,则需要使门户系统变为无状态,通常的解决方案就是Redis,ActiveMQ等。
|
|