请问 用mina 给客户端发送数据,要求分别以1秒,5秒,10秒等不同的频率发送不同的数据,给每一个连接的客户端
package com.tcp.mina.main; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.mina.core.IoUtil; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import com.safe.model.Police; import com.safe.model.User; import com.safe.service.CenterService; import com.safe.util.ConstantUtil; import com.safe.util.DateUtil; import com.tcp.mina.frame.Frame; import com.tcp.mina.model.M_p_HandingAlarm; import com.tcp.mina.model.M_p_Pant; import com.tcp.mina.model.M_p_PeopleInfo; import com.tcp.mina.model.M_q_GetPeopleInfo; import com.tcp.mina.model.M_q_HandingAlarm; import com.tcp.mina.model.M_q_Pant; import com.tcp.mina.model.PoliceTask; import com.tcp.mina.msgcoder.MsgCoder; import com.tcp.mina.thread.AllPoliceCoordsThread; import com.tcp.mina.thread.HotPoliceCoordsThread; import com.tcp.mina.thread.UserCoordsThread; import com.tcp.mina.util.TCPConstant; public class MyServerHandler extends IoHandlerAdapter { public static Map<Long, IoSession> allIoSessions; CenterService service; public MyServerHandler(CenterService service) { super(); this.service = service; } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { System.out.println("exceptionCaught"); cause.printStackTrace(); } @Override public void messageReceived(IoSession session, Object message) throws Exception { System.out.println("【server】messageReceived: " + message); Frame frame = null; if (message instanceof Frame) { frame = (Frame) message; } else { return; } int msgType = frame.getMsgType(); switch (msgType) { /** 一 收到心跳 */ case TCPConstant.MSGTYPE_Q_PANT: M_q_Pant m_q_Pant = new MsgCoder<M_q_Pant>().readFrame(frame, M_q_Pant.class); System.out.println("请求消息-心跳请求:" + m_q_Pant); // 响应心跳 M_p_Pant p_Pant = new M_p_Pant(); session.write(new MsgCoder<M_p_Pant>().readMsg(p_Pant)); break; /** 二 获取人物信息请求 */ case TCPConstant.MSGTYPE_Q_PEOPLEINFO: M_q_GetPeopleInfo q_GetPeopleMsg = new MsgCoder<M_q_GetPeopleInfo>() .readFrame(frame, M_q_GetPeopleInfo.class); int peopleType = q_GetPeopleMsg.getType(); if (TCPConstant.PEOPLETYPE_POLICE == peopleType) {// 警员 Police police = service.getPoliceByPoliceNo(q_GetPeopleMsg .getID().trim()); String birthdayStr = DateUtil.fmtDateToStr(police.getDetails() .getBirthday(), "yyyy-MM-dd"); Frame policeFrame = new MsgCoder<M_p_PeopleInfo>() .readMsg(new M_p_PeopleInfo(police.getPoliceNo(), peopleType, police.getDetails().getName(), birthdayStr, police.getDetails().getTel(), police.getDetails().getAddress(), police .getDetails().getShenFenId(), police .getDetails().getPhoto())); session.write(policeFrame); } else if (TCPConstant.PEOPLETYPE_USER == peopleType) {// 用户 User user = service.getUserByLoginName(q_GetPeopleMsg.getID() .trim()); String birthdayStr = DateUtil.fmtDateToStr(user.getDetails() .getBirthday(), "yyyy-MM-dd"); Frame userFrame = new MsgCoder<M_p_PeopleInfo>() .readMsg(new M_p_PeopleInfo(user.getLoginName(), peopleType, user.getDetails().getName(), birthdayStr, user.getDetails().getTel(), user .getDetails().getAddress(), user .getDetails().getShenFenId(), user .getDetails().getPhoto())); session.write(userFrame); } break; /** 三 处理报警请求 */ case TCPConstant.MSGTYPE_Q_HANDINGALARM: // 解析请求消息 M_q_HandingAlarm q_handingMsg = new MsgCoder<M_q_HandingAlarm>() .readFrame(frame, M_q_HandingAlarm.class); // 操作派警 int alarmId = q_handingMsg.getEventID(); int type = q_handingMsg.getType(); if (type == ConstantUtil.ALARM_TYPE_TRUE) {// 真警则派发任务 // 派发任务 List<PoliceTask> policeTasks = q_handingMsg.getTask(); List<String> policeNos = new ArrayList<String>(); String taskContent = ""; for (PoliceTask policeTask : policeTasks) { policeNos.add(policeTask.getPoliceID()); taskContent = policeTask.getTaskInfo(); } service.sendTaskAndNotice(policeNos, taskContent, alarmId); } else {// 不是真警 则修改对应类型 1假警 2重复报警 // 修改报警类别 service.updateAlarmStatus(alarmId, type); } /** 响应该报警的最新状态 */ // 发送处理报警的响应消息 1 这里直接返回给所有客户端,2警员提交任务时 发送给所有客户端 // AlarmInfo alarmInfo=service.getAlarmById(); // TODO 假数据 M_p_HandingAlarm msg = new M_p_HandingAlarm(alarmId, 2); Frame pMsgframe = new MsgCoder<M_p_HandingAlarm>().readMsg(msg); // 获取所有正在连接的IoSession Collection<IoSession> sessions = session.getService() .getManagedSessions().values(); // 将消息写到所有IoSession IoUtil.broadcast(pMsgframe, sessions); break; default: System.out.println("TCP:TCPListenRequest 未知的请求!~"); break; } // session.write(message); } @Override public void messageSent(IoSession session, Object message) throws Exception { System.out.println("【server】messageSent: " + message); } @Override public void sessionClosed(IoSession session) throws Exception { System.out.println("【server】sessionClosed"); System.out.println("有人关闭,当前客户数:" + allIoSessions.size()); } @Override public void sessionCreated(IoSession session) throws Exception { System.out.println("【server】sessionCreated"); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { System.out.println("【server】sessionIdle"); } @Override public void sessionOpened(IoSession session) throws Exception { System.out.println("【server】sessionOpened ID:" + session.getId()); if (allIoSessions == null) { allIoSessions = session.getService().getManagedSessions(); } System.out.println("有人连接,当前客户数:" + allIoSessions.size()); new Thread(new AllPoliceCoordsThread(session, service)).start(); new Thread(new HotPoliceCoordsThread(session, service)).start(); new Thread(new UserCoordsThread(session, service)).start(); } }
请看sessionOpened() 我开启了三个线程每个线程里一个while 循环 sleep1秒。 这样三个线程同时以一定频率发送数据,结果报错了。就算只开一个线程频率太快了也会报错。客户端读取的数据错乱了。
我想在每一个客户端连接以后 给他们以不同的频率发送多种数据,该怎么实现呢?
每次都是报这个错误但是 打印的字节有时候是不一样的。
org.apache.mina.filter.codec.ProtocolDecoderException: java.nio.BufferUnderflowException (Hexdump: 06 64 61 57 53 41 00 07 FE FF 00 7B 00 22 00 4E 00 75 00 6D 00 22 00 3A 00 31 00 30 00 2C 00 22 00 70 00 6F 00 6C 00 69 00 63 00 65 00 73 00 22 00 3A 00 5B 00 7B 00 22 00 50 00 6F 00 6C 00 69 00 63 00 65 00 49 00 44 00 22 00 3A 00 22 00 4F 00 4E 00 4C 00 49 00 4E 00 45 00 23 00 31 00 22 00 2C 00 22 00 45 00 76 00 65 00 6E 00 74 00 49 00 44 00 22 00 3A 00 30 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 58 00 22 00 3A 00 22 00 31 00 32 00 31 00 2E 00 30 00 22 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 59 00 22 00 3A 00 22 00 33 00 35 00 2E 00 30 00 22 00 7D 00 2C 00 7B 00 22 00 50 00 6F 00 6C 00 69 00 63 00 65 00 49 00 44 00 22 00 3A 00 22 00 4F 00 4E 00 4C 00 49 00 4E 00 45 00 23 00 32 00 22 00 2C 00 22 00 45 00 76 00 65 00 6E 00 74 00 49 00 44 00 22 00 3A 00 30 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 58 00 22 00 3A 00 22 00 31 00 32 00 31 00 2E 00 30 00 22 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 59 00 22 00 3A 00 22 00 33 00 35 00 2E 00 30 00 22 00 7D 00 2C 00 7B 00 22 00 50 00 6F 00 6C 00 69 00 63 00 65 00 49 00 44 00 22 00 3A 00 22 00 4F 00 4E 00 4C 00 49 00 4E 00 45 00 23 00 33 00 22 00 2C 00 22 00 45 00 76 00 65 00 6E 00 74 00 49 00 44 00 22 00 3A 00 30 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 58 00 22 00 3A 00 22 00 31 00 32 00 31 00 2E 00 30 00 22 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 59 00 22 00 3A 00 22 00 33 00 35 00 2E 00 30 00 22 00 7D 00 2C 00 7B 00 22 00 50 00 6F 00 6C 00 69 00 63 00 65 00 49 00 44 00 22 00 3A 00 22 00 4F 00 4E 00 4C 00 49 00 4E 00 45 00 23 00 34 00 22 00 2C 00 22 00 45 00 76 00 65 00 6E 00 74 00 49 00 44 00 22 00 3A 00 30 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 58 00 22 00 3A 00 22 00 31 00 32 00 31 00 2E 00 30 00 22 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 59 00 22 00 3A 00 22 00 33 00 35 00 2E 00 30 00 22 00 7D 00 2C 00 7B 00 22 00 50 00 6F 00 6C 00 69 00 63 00 65 00 49 00 44 00 22 00 3A 00 22 00 4F 00 4E 00 4C 00 49 00 4E 00 45 00 23 00 35 00 22 00 2C 00 22 00 45 00 76 00 65 00 6E 00 74 00 49 00 44 00 22 00 3A 00 30 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 58 00 22 00 3A 00 22 00 31 00 32 00 31 00 2E 00 30 00 22 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 59 00 22 00 3A 00 22 00 33 00 35 00 2E 00 30 00 22 00 7D 00 2C 00 7B 00 22 00 50 00 6F 00 6C 00 69 00 63 00 65 00 49 00 44 00 22 00 3A 00 22 00 4F 00 4E 00 4C 00 49 00 4E 00 45 00 23 00 36 00 22 00 2C 00 22 00 45 00 76 00 65 00 6E 00 74 00 49 00 44 00 22 00 3A 00 30 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 58 00 22 00 3A 00 22 00 31 00 32 00 31 00 2E 00 30 00 22 00 2C 00 22 00 43 00 6F 00 6F 00 72 00 64 00 69 00 6E 00 61 00 74 00 65 00 59 00 22 00 3A 00 22 00 33 00 35 00 2E 00 30 00 22 00 7D 00 2C 00 7B 00 22 00 50 00 6F 00 6C 00 69 00 63 00 65 00 49 00 44 00 22 00 3A)
at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:242)
at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:417)
at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:47)
at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:765)
at org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:109)
at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:417)
at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:410)
at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:710)
at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:664)
at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:653)
at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$600(AbstractPollingIoProcessor.java:67)
at org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:1124)
at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
Caused by: java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:127)
at org.apache.mina.core.buffer.AbstractIoBuffer.get(AbstractIoBuffer.java:615)
at org.apache.mina.core.buffer.AbstractIoBuffer.get(AbstractIoBuffer.java:1321)
at com.tcp.mina.protocalcoder.FrameCumulativeDecoder.doDecode(FrameCumulativeDecoder.java:34)
at org.apache.mina.filter.codec.CumulativeProtocolDecoder.decode(CumulativeProtocolDecoder.java:176)
at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:232)
... 15 more