为了账号安全,请及时绑定邮箱和手机立即绑定

Java AIO中连续发送的多条数据接收时连在一起的问题

Java AIO中连续发送的多条数据接收时连在一起的问题

Qyouu 2019-03-01 01:25:20
刚开始学习Java网络编程,问题可能有点小白,还请见谅。 我写了一个简单的Demo,运用AIO(NIO2.0)编程模型中的AsynchronousSocketChannel来发送和接收数据,在客户端与服务端之间建立一个长连接来进行通讯,然后发现当客户端连续进行多次发送时,服务端收到的数据就会连在一起,并且是随机地连在一起,感觉像是两次read之间到达的数据都被后一次read一次性读出来了, 在一次测试中,分别进行了三轮发送(客户端运行了三次),每轮按顺序发送1-10这10个数,每次发送一个。服务端的结果如下: 服务端已启动线程pool-1-thread-7已建立来自10.1.84.54:2381的连接908324714线程pool-1-thread-8已通过来自10.1.84.54:2381的连接908324714收到信息【12345678910】来自10.1.84.54:2381的连接908324714已断开线程pool-1-thread-8已建立来自10.1.84.54:2387的连接1224441394线程pool-1-thread-8已通过来自10.1.84.54:2387的连接1224441394收到信息【1】线程pool-1-thread-8已通过来自10.1.84.54:2387的连接1224441394收到信息【2】线程pool-1-thread-7已通过来自10.1.84.54:2387的连接1224441394收到信息【3456】线程pool-1-thread-8已通过来自10.1.84.54:2387的连接1224441394收到信息【78】线程pool-1-thread-7已通过来自10.1.84.54:2387的连接1224441394收到信息【910】来自10.1.84.54:2387的连接1224441394已断开线程pool-1-thread-7已建立来自10.1.84.54:2393的连接1666378193线程pool-1-thread-7已通过来自10.1.84.54:2393的连接1666378193收到信息【1】线程pool-1-thread-8已通过来自10.1.84.54:2393的连接1666378193收到信息【2345】线程pool-1-thread-7已通过来自10.1.84.54:2393的连接1666378193收到信息【678】线程pool-1-thread-7已通过来自10.1.84.54:2393的连接1666378193收到信息【9】线程pool-1-thread-8已通过来自10.1.84.54:2393的连接1666378193收到信息【10】来自10.1.84.54:2393的连接1666378193已断开 问题是如何才能避免这种情况?使得一次read的数据正好是一次发送的数据?或者说这并不是个问题,本身就是这样的机制,也避免不了? 以下是Demo的代码 服务端 package server; import java.io.IOException; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.Executors; public class Main { private static final String SERVER_ADDRESS = "0.0.0.0"; private static final int SERVER_PORT = 8888; private static final int BUFFER_SIZE = 32 * 1024; public static void main(String[] args) { try { AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), Executors.defaultThreadFactory()); AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup) .setOption(StandardSocketOptions.SO_REUSEADDR, true) .bind(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT)); serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { @Override public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Object attachment) { serverSocketChannel.accept(null, this); try { asynchronousSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); asynchronousSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, BUFFER_SIZE); asynchronousSocketChannel.setOption(StandardSocketOptions.SO_SNDBUF, BUFFER_SIZE); ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE); asynchronousSocketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer readBuffer) { try { InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress(); if (result > 0) { readBuffer.flip(); byte[] data = new byte[readBuffer.remaining()]; readBuffer.get(data); System.out.println("线程" + Thread.currentThread().getName() + "已通过来自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的连接" + asynchronousSocketChannel.hashCode() + "收到信息【" + new String(data) + "】"); } else if (result == -1) { System.out.println("来自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的连接" + asynchronousSocketChannel.hashCode() + "已断开"); asynchronousSocketChannel.close(); return; } } catch (IOException e) { e.printStackTrace(); } readBuffer.clear(); asynchronousSocketChannel.read(readBuffer, readBuffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("读取信息失败"); try { asynchronousSocketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } }); InetSocketAddress inetSocketAddress = (InetSocketAddress) asynchronousSocketChannel.getRemoteAddress(); System.out.println("线程" + Thread.currentThread().getName() + "已建立来自" + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + "的连接" + asynchronousSocketChannel.hashCode()); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, Object attachment) { System.out.println("连接建立失败"); serverSocketChannel.accept(null, this); } }); System.out.println("服务端已启动"); } catch (IOException e) { e.printStackTrace(); } try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } } 客户端 package client; import java.io.IOException; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; public class Main { private static final String SERVER_ADDRESS = "10.1.84.7"; private static final int SERVER_PORT = 8888; private static final int WRITE_BUFFER_SIZE = 32 * 1024; private static final int WRITE_TIMES = 10; public static void main(String[] args) { try { AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open(); asynchronousSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); asynchronousSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); asynchronousSocketChannel.connect(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT), null, new CompletionHandler<Void, Object>() { @Override public void completed(Void result, Object attachment) { System.out.println("连接服务器成功"); ByteBuffer writeBuffer = ByteBuffer.allocate(WRITE_BUFFER_SIZE); System.out.println("第1次数据由线程" + Thread.currentThread().getName() + "发送"); writeBuffer.put("1".getBytes()); writeBuffer.flip(); asynchronousSocketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { int count = 1; @Override public void completed(Integer result, ByteBuffer attachment) { if (count < WRITE_TIMES) { System.out.println("第" + ++count + "次数据由线程" + Thread.currentThread().getName() + "发送"); String msg = "" + count; writeBuffer.clear(); writeBuffer.put(msg.getBytes()); writeBuffer.flip(); asynchronousSocketChannel.write(writeBuffer, writeBuffer, this); } else { System.out.println(WRITE_TIMES + "次数据已全部发送完成"); try { asynchronousSocketChannel.close(); } catch (IOException e) { e.printStackTrace(); } System.exit(0); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.out.println("第" + count + "次发送数据失败"); try { asynchronousSocketChannel.close(); } catch (IOException e) { e.printStackTrace(); } System.exit(0); } }); } @Override public void failed(Throwable exc, Object attachment) { System.out.println("连接服务器失败"); System.exit(0); } }); System.out.println("开始连接服务器"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } } }
查看完整描述

1 回答

  • 1 回答
  • 0 关注
  • 702 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信