spring集成mina:
在学习mina这块时,在网上找了很多资料,只有一些,只能实现客户端向服务端发送消息、建立长连接之类。但是实际上在项目中,并不简单实现这些,还有业务逻辑之类的处理以及消息的推送之类的。于是就单独建立了一个工程项目,能够实现客户端和服务端相互之间发送消息、建立长连接、实现心跳检测等功能。
例如:可以实现客户端A向服务端发送消息,服务端将消息转发给客户端B。效果实现图:
服务端启动成功后, 客户端A绑定服务端。客户端B向服务端发送信息,请求服务端向客户端A推送消息
客户端A受到服务端转发的客户端B的消息
服务端心跳检测的实现
代码的目录结构:
那么开始实现代码的编写。(可以直接跳到底部,通过链接下载工程代码)
首先在官网上下载mina以及spring相关架包,这里相关架包已准备好:服务端:
1. 首先实现数据传输对象、消息常量的代码编写。
我使用的两个传输对象,接受和发送,代码如下。(传输对象可以自行定义)。
package com.pcm.mina.service.model;import java.io.Serializable;import java.util.HashMap;/** * @author ZERO * @Description 服务端接收消息对象 */public class SentBody implements Serializable { private static final long serialVersionUID = 1L; private String key; private HashMapdata; private long timestamp; public SentBody() { data = new HashMap (); timestamp = System.currentTimeMillis(); } public String getKey() { return key; } public String get(String k) { return data.get(k); } public void put(String k, String v) { data.put(k, v); } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } public void setKey(String key) { this.key = key; } public void remove(String k) { data.remove(k); } public HashMap getData() { return data; } @Override public String toString() { StringBuffer buffer = new StringBuffer(); buffer.append(" "); buffer.append(" "); buffer.append(" "); return buffer.toString(); } public String toXmlString() { return toString(); }}").append(key).append(" "); buffer.append("").append(timestamp).append(" "); buffer.append(" "); for (String key : data.keySet()) { buffer.append("<" + key + ">").append(data.get(key)).append( " "); } buffer.append(""); buffer.append("
package com.pcm.mina.service.model;import java.io.Serializable;import java.util.HashMap;/** * @author ZERO * @Description 服务端发送消息对象 */public class ReplyBody implements Serializable { private static final long serialVersionUID = 1L; /** * 请求key */ private String key; /** * 返回码 */ private String code; /** * 返回说明 */ private String message; /** * 返回数据集合 */ private HashMapdata; private long timestamp; public ReplyBody() { data = new HashMap (); timestamp = System.currentTimeMillis(); } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } public String getKey() { return key; } public void setKey(String key) { this.key = key; } public void put(String k, String v) { data.put(k, v); } public String get(String k) { return data.get(k); } public void remove(String k) { data.remove(k); } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } public HashMap getData() { return data; } public void setData(HashMap data) { this.data = data; } public String getCode() { return code; } public void setCode(String code) { this.code = code; } public String toString() { StringBuffer buffer = new StringBuffer(); buffer.append(" "); buffer.append(" "); buffer.append(" "); return buffer.toString(); } public String toXmlString() { return toString(); }}").append(this.getKey()).append(" "); buffer.append("").append(timestamp).append(" "); buffer.append("").append(code).append("
"); buffer.append("").append(message).append(" "); buffer.append(" "); for(String key:this.getData().keySet()) { buffer.append("<"+key+">").append(this.get(key)).append(" "); } buffer.append(""); buffer.append("
package com.pcm.mina.service.model;/** * @author ZERO * @Description 消息常量 */public class Message { public static class ReturnCode { public static String CODE_404 = "404"; public static String CODE_403 = "403"; //该账号未绑定 public static String CODE_405 = "405"; //事物未定义 public static String CODE_200 = "200"; //成功 public static String CODE_500 = "500"; //未知错误 } public static final String SESSION_KEY = "account"; /** * 服务端心跳请求命令 */ public static final String CMD_HEARTBEAT_REQUEST = "hb_request"; /** * 客户端心跳响应命令 */ public static final String CMD_HEARTBEAT_RESPONSE = "hb_response"; public static class MessageType { // 用户会 踢出下线消息类型 public static String TYPE_999 = "999"; }}
2,实现心跳检测功能。
服务端发送的是hb_request,那么客户端就应该返回hb_response,以此来实现心跳检测。
/** * @author ZERO * @Description 心跳协议的实现类 */ public class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory{ private final Logger LOG=Logger.getLogger(KeepAliveMessageFactoryImpl.class); /** * 客户端心跳响应命令 */ private static String HEARRESPONSE=Message.CMD_HEARTBEAT_RESPONSE; /** * 服务端心跳请求命令 */ private static String HEARREQUEST=Message.CMD_HEARTBEAT_REQUEST; public Object getRequest(IoSession session) { LOG.warn("请求预设信息:"+HEARREQUEST); return HEARREQUEST; } public Object getResponse(IoSession session, Object message) { LOG.warn("响应预设信息: " + message); /** 返回预设语句 */ return HEARRESPONSE; } public boolean isRequest(IoSession session, Object message) { LOG.warn("请求心跳包信息: " + message); return message.equals(HEARREQUEST); } public boolean isResponse(IoSession session, Object message) { LOG.warn("响应心跳包信息: " + message); return message.equals(HEARRESPONSE); }}
3, 实现服务端代码编写
服务端代码这块,因为注释写的已经够详细了,所以这里就不细说了。
package com.pcm.mina.service;import java.io.IOException;import java.net.InetSocketAddress;import java.util.Map;import org.apache.log4j.Logger;import org.apache.mina.core.service.IoAcceptor;import org.apache.mina.core.service.IoHandler;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;import org.apache.mina.filter.executor.ExecutorFilter;import org.apache.mina.filter.keepalive.KeepAliveFilter;import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;import org.apache.mina.filter.logging.LoggingFilter;import org.apache.mina.transport.socket.nio.NioSocketAcceptor;import com.pcm.mina.service.filter.KeepAliveMessageFactoryImpl;/** * @author ZERO * @Description mina服务端 */public class SerNioSociketAcceptor { IoAcceptor acceptor; IoHandler ioHandler; int port; //记录日志 public static Logger logger=Logger.getLogger(SerNioSociketAcceptor.class); //创建bind()方法接收连接 public void bind() throws IOException { //创建 协议编码解码过滤器ProtocolCodecFilter //设置序列化Object 可以自行设置自定义解码器 ProtocolCodecFilter pf=new ProtocolCodecFilter(new ObjectSerializationCodecFactory()); //getFilterChain() 获取 I/O 过滤器链,可以对 I/O 过滤器进行管理,包括添加和删除 I/O 过滤器。 acceptor = new NioSocketAcceptor(); //设置缓存大小 acceptor.getSessionConfig().setReadBufferSize(1024); // 设置过滤器 acceptor.getFilterChain().addLast("executor",new ExecutorFilter()); acceptor.getFilterChain().addLast("logger",new LoggingFilter()); acceptor.getFilterChain().addLast("codec",pf); KeepAliveMessageFactory kamf=new KeepAliveMessageFactoryImpl(); KeepAliveFilter kaf = new KeepAliveFilter(kamf, IdleStatus.BOTH_IDLE); kaf.setForwardEvent(true); kaf.setRequestInterval(30); //本服务器为被定型心跳 即需要每30秒接受一个心跳请求 否则该连接进入空闲状态 并且发出idled方法回调 acceptor.getFilterChain().addLast("heart", kaf); //读写通道60秒内无操作进入空闲状态 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60); //绑定逻辑处理器 acceptor.setHandler(ioHandler); //绑定端口 acceptor.bind(new InetSocketAddress(port)); logger.info("Mina服务端启动成功...端口号为:"+port); //测试使用 } //创建unbind()方法停止监听 public void unbind() { acceptor.unbind(); logger.info("服务端停止成功"); } public void setAcceptor(IoAcceptor acceptor) { this.acceptor = acceptor; } // 设置 I/O 处理器。该 I/O 处理器会负责处理该 I/O 服务所管理的所有 I/O 会话产生的 I/O 事件。 public void setIoHandler(IoHandler ioHandler) { this.ioHandler = ioHandler; } //设置端口 public void setPort(int port) { this.port = port; }// 获取该 I/O 服务所管理的 I/O 会话。 public MapgetManagedSessions() { return acceptor.getManagedSessions(); }}
4,实现session容器
如果需要保证线程安全,可以使用 ConcurrentHashMap,作为session容器。
package com.pcm.mina.service.session;import java.io.Serializable;import java.net.InetAddress;import java.net.SocketAddress;import java.net.UnknownHostException;import org.apache.mina.core.session.IoSession;/** * @author ZERO * @Description IoSession包装类 */public class PcmSession implements Serializable{ private static final long serialVersionUID = 1L; private transient IoSession session; private String gid; //session全局ID private Long nid; //session在本台服务器上的ID private String host; //session绑定的服务器IP private String account; //session绑定的账号 private String message; //session绑定账号的消息 private Long bindTime; //登录时间 private Long heartbeat; //心跳时间 public PcmSession(){} public PcmSession(IoSession session) { this.session = session; this.nid = session.getId(); } public String getGid() { return gid; } public void setGid(String gid) { this.gid = gid; } public Long getBindTime() { return bindTime; } public void setBindTime(Long bindTime) { this.bindTime = bindTime; } public Long getHeartbeat() { return heartbeat; } public void setHeartbeat(Long heartbeat) { this.heartbeat = heartbeat; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } public String getAccount() { return account; } public void setAccount(String account) { this.account = account; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public void setIoSession(IoSession session) { this.session = session; } public IoSession getIoSession() { return session; }// 将键为 key,值为 value的用户自定义的属性存储到 I/O 会话中。 public void setAttribute(String key, Object value) { if(session!=null){ session.setAttribute(key, value); } } public boolean containsAttribute(String key) { if(session!=null){ return session.containsAttribute(key); } return false; } // 从 I/O 会话中获取键为 key的用户自定义的属性。 public Object getAttribute(String key) { if(session!=null){ return session.getAttribute(key); } return null; } //从 I/O 会话中删除键为 key的用户自定义的属性。 public void removeAttribute(String key) { if(session!=null){ session.removeAttribute(key); } } public SocketAddress getRemoteAddress() { if(session!=null){ return session.getRemoteAddress(); } return null; }/* 将消息对象 message发送到当前连接的对等体。该方法是异步的,当消息被真正发送到对等体的时候, IoHandler.messageSent(IoSession,Object)会被调用。如果需要的话, 也可以等消息真正发送出去之后再继续执行后续操作。*/ public void write(Object msg) { if(session!=null) { session.write(msg).isWritten(); } } public boolean isConnected() { if(session!=null){ return session.isConnected(); } return false; } public boolean isLocalhost() { try { String ip = InetAddress.getLocalHost().getHostAddress(); return ip.equals(host); } catch (UnknownHostException e) { e.printStackTrace(); } return false; }/* 关闭当前连接。如果参数 immediately为 true的话, * 连接会等到队列中所有的数据发送请求都完成之后才关闭;否则的话就立即关闭。 */ public void close(boolean immediately) { if(session!=null){ session.close(immediately); } } public boolean equals(Object message) { if (message instanceof PcmSession) { PcmSession t = (PcmSession) message; if( t.nid!=null && nid!=null) { return t.nid.longValue()==nid.longValue() && t.host.equals(host); } } return false; } public String toString() { StringBuffer buffer = new StringBuffer(); buffer.append("{"); buffer.append("\"").append("gid").append("\":").append("\"").append(gid).append("\"").append(","); buffer.append("\"").append("nid").append("\":").append(nid).append(","); buffer.append("\"").append("host").append("\":").append("\"").append(host).append("\"").append(","); buffer.append("\"").append("account").append("\":").append("\"").append(account).append("\"").append(","); buffer.append("\"").append("bindTime").append("\":").append(bindTime).append(","); buffer.append("\"").append("heartbeat").append("\":").append(heartbeat); buffer.append("}"); return buffer.toString(); }}
package com.pcm.mina.service.session;/** * @author ZERO * @Description 客户端的session管理接口 */public interface SessionManager { /** * 添加新的session */ public void addSession(String account,PcmSession session); /** * * @param account 客户端session的 key 一般可用 用户账号来对应session * @return */ PcmSession getSession(String account); /** * 删除session * @param session */ public void removeSession(PcmSession session); /** * 删除session * @param account */ public void removeSession(String account);}
package com.pcm.mina.service.session;import java.util.HashMap;import java.util.concurrent.atomic.AtomicInteger;import com.pcm.mina.service.model.Message;/** * @author ZERO * @Description 自带默认 session管理实现 */public class DefaultSessionManager implements SessionManager{ private static HashMapsessions =new HashMap (); private static final AtomicInteger connectionsCounter = new AtomicInteger(0); public void addSession(String account, PcmSession session) { if(session !=null){ sessions.put(account, session); connectionsCounter.incrementAndGet(); } } public PcmSession getSession(String account) { return sessions.get(account); } public void removeSession(PcmSession session) { sessions.remove(session.getAttribute(Message.SESSION_KEY)); } public void removeSession(String account) { sessions.remove(account); }}
5, 实现业务逻辑处理器。
因为注释写的已经够详细了,所以这里就不细说了。
做了简单业务逻辑处理,如有需要可以自行更改。package com.pcm.mina.service.handler;import java.net.InetSocketAddress;import java.util.HashMap;import org.apache.log4j.Logger;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import org.springframework.stereotype.Component;import com.pcm.mina.service.RequestHandler;import com.pcm.mina.service.model.Message;import com.pcm.mina.service.model.ReplyBody;import com.pcm.mina.service.model.SentBody;import com.pcm.mina.service.session.PcmSession;/** * @author ZERO * @Description I/O 处理器 客户端请求的入口,所有请求都首先经过它分发处理 业务逻辑实现 */ @Component("sercixeMainHandler")public class ServiceMainHandler extends IoHandlerAdapter{ protected final Logger logger = Logger.getLogger(ServiceMainHandler.class); //本地handler请求 private HashMaphandlers = new HashMap (); //出错时 @Override public void exceptionCaught(IoSession session, Throwable cause){ logger.error("exceptionCaught()... from "+session.getRemoteAddress()); logger.error(cause); cause.printStackTrace(); } //接收到消息时 @Override public void messageReceived(IoSession iosession,Object message){ logger.info("服务端接收到的消息..."+message.toString()); if(message instanceof SentBody){ SentBody sent=(SentBody) message; ReplyBody rb=new ReplyBody(); PcmSession session=new PcmSession(iosession); String key=sent.getKey(); if("quit".equals(sent.get("message"))){ //服务器断开的条件 try { sessionClosed(iosession); } catch (Exception e) { rb.setCode(Message.ReturnCode.CODE_500); e.printStackTrace(); } }else{ //根据key的不同调用不同的handler RequestHandler rhandler=handlers.get(key); if(rhandler==null){ //如果没有这个handler rb.setCode(Message.ReturnCode.CODE_405); rb.setMessage("服务端未定义!"); }else{ //有的话 rb=rhandler.process(session, sent); } } if(rb !=null){ rb.setKey(key); session.write(rb); logger.info("服务端发送的消息: " + rb.toString()); } } } //发送消息 @Override public void messageSent(IoSession session, Object message) throws Exception { // session.close(); //发送成功后主动断开与客户端的连接 实现短连接 logger.info("服务端发送信息成功..."); } //建立连接时 @Override public void sessionCreated(IoSession session) throws Exception { InetSocketAddress sa=(InetSocketAddress)session.getRemoteAddress(); String address=sa.getAddress().getHostAddress(); //访问的ip session.setAttribute("address", address); //将连接的客户端ip保存到map集合中 SentBody body=new SentBody(); body.put("address", address); logger.info("访问的ip:"+address); } //关闭连接时 @Override public void sessionClosed(IoSession iosession) throws Exception { PcmSession session=new PcmSession(iosession); logger.debug("sessionClosed()... from "+session.getRemoteAddress()); try { RequestHandler hand=handlers.get("client_closs"); if(hand !=null && session.containsAttribute(Message.SESSION_KEY)){ hand.process(session, null); } } catch (Exception e) { e.printStackTrace(); } session.close(true); logger.info("连接关闭"); } //空闲时 @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { logger.debug("sessionIdle()... from "+session.getRemoteAddress()); } //打开连接时 @Override public void sessionOpened(IoSession session) throws Exception { logger.info("开启连接..."); } public HashMap getHandlers() { return handlers; } public void setHandlers(HashMap handlers) { this.handlers = handlers; } }
6, 实现业务逻辑代码。
目前实现了绑定,推送以及关闭逻辑代码。如有需要,可自行增加。
package com.pcm.mina.service;import com.pcm.mina.service.model.ReplyBody;import com.pcm.mina.service.model.SentBody;import com.pcm.mina.service.session.PcmSession;/** * @author ZERO * @Description 请求处理接口,所有的请求必须实现此接口 */public interface RequestHandler { public abstract ReplyBody process(PcmSession session,SentBody message);}
package com.pcm.mina.service.handler;import java.net.InetAddress;import java.util.UUID;import org.apache.log4j.Logger;import com.pcm.mina.service.RequestHandler;import com.pcm.mina.service.model.Message;import com.pcm.mina.service.model.ReplyBody;import com.pcm.mina.service.model.SentBody;import com.pcm.mina.service.session.DefaultSessionManager;import com.pcm.mina.service.session.PcmSession;import com.pcm.util.ContextHolder;/** * @author ZERO * @Description 账号绑定实现 */ public class BindHandler implements RequestHandler { protected final Logger logger = Logger.getLogger(BindHandler.class); public ReplyBody process(PcmSession newSession, SentBody message) { ReplyBody reply = new ReplyBody(); DefaultSessionManager sessionManager= ((DefaultSessionManager) ContextHolder.getBean("PcmSessionManager")); try { String account = message.get(Message.SESSION_KEY); newSession.setAccount(account); newSession.setMessage(message.get("message")); newSession.setGid(UUID.randomUUID().toString()); newSession.setHost(InetAddress.getLocalHost().getHostAddress()); //第一次设置心跳时间为登录时间 newSession.setBindTime(System.currentTimeMillis()); newSession.setHeartbeat(System.currentTimeMillis()); /** * 由于客户端断线服务端可能会无法获知的情况,客户端重连时,需要关闭旧的连接 */ PcmSession oldSession = sessionManager.getSession(account); //如果是账号已经在另一台终端登录。则让另一个终端下线 if(oldSession!=null&&!oldSession.equals(newSession)) { oldSession.removeAttribute(Message.SESSION_KEY); ReplyBody rb = new ReplyBody(); rb.setCode(Message.MessageType.TYPE_999);//强行下线消息类型 rb.put(Message.SESSION_KEY, account); if(!oldSession.isLocalhost()) { /* 判断当前session是否连接于本台服务器,如不是发往目标服务器处理 MessageDispatcher.execute(rb, oldSession.getHost()); */ }else { oldSession.write(rb); oldSession.close(true); oldSession = null; } oldSession = null; } if(oldSession==null) { sessionManager.addSession(account, newSession); } reply.setCode(Message.ReturnCode.CODE_200); } catch (Exception e) { reply.setCode(Message.ReturnCode.CODE_500); e.printStackTrace(); } logger.debug("绑定账号:" +message.get(Message.SESSION_KEY)+"-----------------------------" +reply.getCode()); return reply; }}
package com.pcm.mina.service.handler;import org.apache.log4j.Logger;import com.pcm.mina.service.RequestHandler;import com.pcm.mina.service.model.Message;import com.pcm.mina.service.model.ReplyBody;import com.pcm.mina.service.model.SentBody;import com.pcm.mina.service.session.DefaultSessionManager;import com.pcm.mina.service.session.PcmSession;import com.pcm.util.ContextHolder;/** * @author ZERO * @Description 推送消息 */ public class PushMessageHandler implements RequestHandler { protected final Logger logger = Logger.getLogger(PushMessageHandler.class); public ReplyBody process(PcmSession ios, SentBody sent) { ReplyBody reply = new ReplyBody(); String account=(String) sent.getData().get(Message.SESSION_KEY); DefaultSessionManager sessionManager=(DefaultSessionManager) ContextHolder.getBean("PcmSessionManager"); PcmSession session=sessionManager.getSession(account); if(session !=null){ sent.remove(Message.SESSION_KEY); reply.setKey(sent.getKey()); reply.setMessage("推送的消息"); reply.setData(sent.getData()); reply.setCode(Message.ReturnCode.CODE_200); session.write(reply); //转发获取的消息 logger.info("推送的消息是:"+reply.toString()); }else{ reply.setCode(Message.ReturnCode.CODE_403); reply.setMessage("推送失败"); } return reply; }}
package com.pcm.mina.service.handler;import org.apache.log4j.Logger;import com.pcm.mina.service.RequestHandler;import com.pcm.mina.service.model.Message;import com.pcm.mina.service.model.ReplyBody;import com.pcm.mina.service.model.SentBody;import com.pcm.mina.service.session.DefaultSessionManager;import com.pcm.mina.service.session.PcmSession;import com.pcm.util.ContextHolder;/** * @author ZERO * @Description 断开连接,清除session */public class SessionClosedHandler implements RequestHandler { protected final Logger logger = Logger.getLogger(SessionClosedHandler.class); public ReplyBody process(PcmSession ios, SentBody message) { DefaultSessionManager sessionManager = ((DefaultSessionManager) ContextHolder.getBean("PcmSessionManager")); if(ios.getAttribute(Message.SESSION_KEY)==null) { return null; } String account = ios.getAttribute(Message.SESSION_KEY).toString(); sessionManager.removeSession(account); return null; }}
7,spring配置
可以将过滤器添加到spring这块,包括心跳设置。
客户端
1,编写业务逻辑处理器
几乎和服务端一样,这里因为测试,所以就从简了。
package com.pcm.mina.client.MinaDemo;import org.apache.log4j.Logger;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IoSession;/** * @author ZERO * @Description 客户端handle */public class MinaClientHandler extends IoHandlerAdapter { private static Logger logger = Logger.getLogger(MinaClientHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { String msg = message.toString(); // logger.info("客户端A接收的数据:" + msg); System.out.println("客户端A接收的数据:" + msg); if(msg.equals("hb_request")){ logger.warn("客户端A成功收到心跳包:hb_request"); session.write("hb_response"); logger.warn("客户端A成功发送心跳包:hb_response"); } } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("发生错误...", cause); }}
2,编写客户端程序。
也几乎和服务端一致,为了简单使用,编写main方法。
注:客户端和服务端的过滤器要一致。package com.pcm.mina.client.MinaDemo;import java.net.InetSocketAddress;import org.apache.log4j.Logger;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.service.IoConnector;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;import org.apache.mina.transport.socket.nio.NioSocketConnector;import com.pcm.mina.service.model.SentBody;/** * @author ZERO * @Description mina 客户端 */ public class MinaClient { private static Logger logger = Logger.getLogger(MinaClient.class); private static String HOST = "127.0.0.1"; private static int PORT = 1255; private static IoConnector connector=new NioSocketConnector(); private static IoSession session; public static IoConnector getConnector() { return connector; } public static void setConnector(IoConnector connector) { MinaClient.connector = connector; } /* * 测试服务端与客户端程序! a. 启动服务端,然后再启动客户端 b. 服务端接收消息并处理成功; */ @SuppressWarnings("deprecation") public static void main(String[] args) { // 设置链接超时时间 connector.setConnectTimeout(30000); // 添加过滤器 可序列话的对象 connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); // 添加业务逻辑处理器类 connector.setHandler(new MinaClientHandler()); ConnectFuture future = connector.connect(new InetSocketAddress( HOST, PORT));// 创建连接 future.awaitUninterruptibly();// 等待连接创建完成 session = future.getSession();// 获得session bindstart(); // pushstart(); } public static void bindstart(){ logger.info("客户端A绑定服务端"); try { SentBody sy=new SentBody(); sy.put("message", "这是个测试账号"); sy.put("account", "123456"); sy.setKey("client_bind"); session.write(sy);// 发送消息 System.out.println("客户端A与服务端建立连接成功...发送的消息为:"+sy); // logger.info("客户端A与服务端建立连接成功...发送的消息为:"+sy); } catch (Exception e) { e.printStackTrace(); logger.error("客户A端链接异常...", e); } session.getCloseFuture().awaitUninterruptibly();// 等待连接断开 connector.dispose(); }}
注意事项
1,客户端和服务端的过滤器要保持一致,不然容易出现异常 java.nio.charset.MalformedInputException。
2,使用对象进行传输的时候需要实现接口java.io.Serializable接口。
3,如果使用对象传输,所在的包。类名也要一致,不然会出现 org.apache.mina.core.buffer.BufferDataException: java.io.InvalidClassException: failed to read class descriptor (Hexdump: 00 00 00 3C AC ED 00 05 73 72 01 00 1C 63 6F 6D 2E 65 78 61 6D 70 6C 65 2E 63 这种错误(被困扰过很久)。
代码就先告一段落。客户端也可以通过socket和mina进行数据传输,这里就不贴代码了。
spring整合mina,暂时就到这了。项目我放到了github上,地址: 如果感觉不错,希望可以给个star。