最近又开始需要使用netty进行网络通信方面的编程开发了。于是遇到了一些问题通过查找好多资料记录下来。
做的内容大致是:客户端向服务端发送一条命令,服务端接收到之后,根据命令里面的一些信息去读取服务器上的一些文件并把文件内容(文件的内容类似于数据库中的一行一行的数据,是以行存储的,每个字段值以\t分割,每条数据为一行)发送给客户端处理(我这里的样例暂以获取数据之后按行保存入文件中)。
1、客户端服务端的代码
cmdLog = getSearchCmd(); ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ClientBootstrap bootstrap = new ClientBootstrap(factory); final ClientBufferHandler clientHandler = new ClientBufferHandler(cmdLog, getEncoding()); final DelimiterBasedFrameDecoder clientDecoder = new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, false, true, ChannelBuffers.copiedBuffer("\r\n", Charset.defaultCharset())); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(clientDecoder, clientHandler); } }); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); ChannelFuture future = bootstrap.connect(new InetSocketAddress(serverHost, serverPort)); future.awaitUninterruptibly(); if (!future.isSuccess()) { future.getCause().printStackTrace(); } future.getChannel().getCloseFuture().awaitUninterruptibly(); factory.releaseExternalResources();
cmdLog是客户端要发送的命令,getEncoding()是因为每个服务端要读取的文件可能是不同的编码,客户端这边传过去之后通过这个来编码。
有两个handler,下面会介绍,其他的都是很常规的这里就不多说了。
2、服务端代码
ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool() ,Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { ChannelPipeline pipeline = Channels.pipeline( new ServerDecoderHandler(), new FileSearchHandler()); return pipeline; } }); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(8027));
服务端的代码也很简单,后面会详细介绍handler。
3、先看客户端的handler,一个是ClientBufferHandler,这个是用来发送命令并接收服务端响应handler。
ClientBufferHandler extends SimpleChannelHandler
private static final String testPath = "F:/ test/test"; private String cmd; private String encoding; public ClientBufferHandler(String cmd, String encoding) { this.cmd = cmd; this.encoding = encoding; } @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { int cmdLength = cmd.getBytes().length; ChannelBuffer cmdBuffer = ChannelBuffers.buffer(cmdLength+4); cmdBuffer.writeInt(cmdLength); cmdBuffer.writeBytes(cmd.getBytes()); e.getChannel().write(cmdBuffer); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); if(!buf.readable()) { return; } FileHelper.writeFile(testPath, buf.toString(Charset.forName(encoding)));
重写channelConnected方法,发送命令。我这里在发送命令前面跟了四个字节的命令长度,保证服务端一次接收到所有的命令信息。
重写messageReceived方法,接收服务端获取的信息存入文件中。FileHelper.writeFile是把字符串追加写入文件的工具方法我就不放出了,实现方法还是很多的。
在这个之前还有一个解码器,因为服务端发送过来的数据都是按行发送的(每行结尾是\r\n),所以使用netty提供的一个解码器DelimiterBasedFrameDecoder实现按分隔符分割接收到的数据,构造方法见客户端的代码。保证获取的数据每次都是完整的一行。这里感谢一下http://blog.163.com/linfenliang@126/blog/static/12785719520121082103807/提供的netty的分包、组包、粘包处理机制。
4、然后是服务端的handler。
首先是ServerDecoderHandler解码器,保证能够读取完整的命令并把命令前的四个字节用来标识命令长度的内容丢掉。
public class ServerDecoderHandler extends FrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, Channel c, ChannelBuffer buf) throws Exception { int length = 4; if(buf.readableBytes() < length) { return null; } byte[] header = new byte[length]; buf.markReaderIndex(); buf.readBytes(header); int cmdLength = (header[0] & 0xFF) << 24 | (header[1] & 0xFF) << 16 | (header[2] & 0xFF) << 8 | (header[3] & 0xFF); if (cmdLength != 0) { if (buf.readableBytes() < cmdLength) { buf.resetReaderIndex(); return null; } length += cmdLength; } buf.resetReaderIndex(); buf.readerIndex(4); return buf.readBytes(cmdLength); } }
这部分代码内容比较简单就不多做说明了。
然后是FileSearchHandler的代码。
public class CdrFileSearchHandler extends SimpleChannelUpstreamHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer buf = (ChannelBuffer) e.getMessage(); String cmd = buf.toString(Charset.defaultCharset()); logger.info("查询命令:\r\n" + cmd); // 返回文件内容 Channel ch = e.getChannel(); ChannelFuture f = null; final BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(getPath(cmd)))); String line = ""; while (line != null) { line = reader.readLine(); if (line != null) { ChannelBuffer returnBuf = ChannelBuffers.dynamicBuffer(); returnBuf.writeBytes((line + "\r\n").getBytes()); f = ch.write(returnBuf); } } if(line == null) { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { reader.close(); Channel ch = future.getChannel(); ch.close(); } }); } }
这部分代码其实也很简单,就是获取到命令,根据命令选择文件(这部分代码省略了),按行读取文件,然后加上\r\n然后写入发送。当line不为null时则读取到了数据,如果为null则说明没有读取到数据,跳出循环,并且添加监听器,当发送完关闭各种链接。(之所以会这样判断是因为netty本身是多次调用messageReceived的,需要在发送完最后一条数据的时候关闭连接。)
这种内容发送的方式只适用与文件内容比较小可以,但是youyu 一般的handler都是同步执行的,一旦文件内容很大,就会因为文件读取耗时较长导致Worker线程不能及时返回处理其它请求,对性能影响较高,从而导致内存溢出等问题。
这个时候就需要使用netty提供的一个handler,ExecutionHandler,ExecutionHandler就是为这种情况设计了,它提供了一种异步处理任务的机制,将它之后handler处理以任务的形式投递到线程池中并直接返回。ExecutionHandler不像其它Handler都是独立的,它是所有Handler共享使用。其使用OrderedMemoryAwareThreadPoolExecutor线程池来保证同一个Channel上事件的先后顺序。
所以在服务端的代码处需要修改代码如下:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { ChannelPipeline pipeline = Channels.pipeline( new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)), new ServerDecoderHandler(), new CdrFileSearchHandler()); return pipeline; } });
增加一个ExecutionHandler的handler,即可处理。
注意ExecutionHandler一定要在不同的pipeline 之间共享。它的作用是自动从ExecutionHandler自己管理的一个线程池中拿出一个线程来处理排在它后面的业务逻辑handler。而 worker线程在经过ExecutionHandler后就结束了,它会被ChannelFactory的worker线程池所回收。
它的构造方法是ExecutionHandler(Executor executor) ,很显然executor就是ExecutionHandler内部管理的线程池了。netty额外给我们提供了两种线程池:
MemoryAwareThreadPoolExecutor和OrderedMemoryAwareThreadPoolExecutor,它们都在org.jboss.netty.handler.execution 包下。
MemoryAwareThreadPoolExecutor 确保jvm不会因为过多的线程而导致内存溢出错误,OrderedMemoryAwareThreadPoolExecutor是前一个线程池的子类,除 了保证没有内存溢出之外,还可以保证channel event的处理次序。具体可以查看API文档,上面有详细说明。
总结一下:写这些东西用了一天的时间,其实多数时候都是在测试大文件的问题,网上好多源码分析的文章,都很浅,真正说每种buffer怎么用,每种handler的用法的很少。而详细讲源码的文章多数也都是抄来抄去,但是也还是有好多不错的。所以遇到什么问题多找找资料,在自己探索一下应该都可以解决的。(其实api上面的东西还是不少的,有时间研究的话应该好好看看netty的源码有助于更好的使用netty开发)
最后说下这里使用的版本是3.6的。
相关推荐
简单但是内容不浅的netty传输文件的例子,实现客户端和服务器端。全面,5积分绝对值得。本人通过很久测试才完成该简单通俗易懂的例子。 netty版本:4.0.23
通用的netty传输协议 通过该协议进行文件传输 文件传输客户端与服务端 可以根据文件的最后更新时间来增量传输文件 源码开放,通过eclipse或者idea导入代码即可运行 协议开放,协议是自定义的协议,大家可以根据需求...
支持多文件、大文件上传,客户端长连接,只要客户端有文件就通知服务器要发送文件。
使用Netty搭建WebSocket服务器,该资源示范如何修改单包大小限制,解决不能发送大数据包的问题。
这个小程序使用netty5进行udp网络通讯,客户端有两种,1:用netty5类库发送DatagramPacket和接收 2:直接使用DatagramSocket发送接收DatagramPacket 先运行netty_server的QuoteOfTheMomentServer, 在运行netty_...
通过netty获取文件进度,上传文件的时时数据大小,剩余时间.websocket和HTML结合,并不javascript的轮询获取上传进度,而是通过NIO的推送.
java Netty实现大文件分块传输详解
封装NETTY来实现一个分布式文件传输服务,目标是提供定时的可靠安全文件传输。同时提供一种简便的方式,只需将数据配置好
使用netty进行rtsp服务端开发 一个使用netty写的rtsp服务器。 目前支持H264、H265、 AAC格式的流文件上传与存储。 H264、H265、AAC格式流文件的播放
Netty+H5实现实时进度条文件上传,支持断点续传。 1、WebSocketServer:服务启动类 2、服务启动后,浏览器访问http://localhost:9999 3、FileWebSocketFrameHandler类SERVER_SAVE_PATH常量为文件上传保存路经
描述文档请看我的个人博客:www.mesoftware.cn
netty案例,netty4.1中级拓展篇四《Netty传输文件、分片发送、断点续传》源码 ...
采用Netty5.0编的的TCP/IP上传文件的列子,实现了拆包,合并
在使用netty进行网络通信协议传输使用protobuf时protobuf编译.proto文件生成JAVA类.zip 包括测试proto3.proto文件,自动protobuf编译.proto文件生成JAVA类
采用netty与protobuf进行文件传输
使用netty进行安卓端发送接收文字,并且附带发送图片功能,折腾了几天的netty总算有点眉目了,做下记录。 详细介绍:http://lison.cc/508.html github下载地址:https://github.com/LisonLiou/netty-learning.git
netty5 对象传输demo
文件传输基于netty实现的文件上传下载(简单的个人云盘)发布: : : upload? ***下载: : : *** config.NettyServerConfig中可以设置上传下载的端口号,文件的目录,访问的密码(passwd)打成jar包执行的时候,...
包括netty的文件传输的实例,以及一些netty的简单的实例,里面包含有相对应的jar包,比起那些要5分,然后下下来P都没有的要实惠多了。自己也是新研究的,希望能和大家一起讨论分享
netty案例,netty4.1中级拓展篇三《Netty传输Java对象》源码 https://mp.weixin.qq.com/s?__biz=MzIxMDAwMDAxMw==&mid=2650724806&idx=1&sn=bb986119b9cdd950e2e6d995295e7f06&scene=19#wechat_redirect