【ChannelPromise作用:可以设置success或failure 是为了通知ChannelFutureListener】
Netty的数据处理API通过两个组件暴露——abstract class ByteBuf和interface ByteBufHolder。下面是一些ByteBuf API的优点:
它可以被用户自定义的缓冲区类型扩展; 通过内置的复合缓冲区类型实现了透明的零拷贝; 容量可以按需增长(类似于JDK的StringBuilder); 在读和写这两种模式之间切换不需要调用ByteBuffer的flip()方法; 读和写使用了不同的索引; 支持方法的链式调用; 支持引用计数; 支持池化。 使用不同的读索引和写索引来控制数据访问; readerIndex达到和writerIndex使用内存的不同方式——基于字节数组和直接缓冲区;
通过CompositeByteBuf生成多个ByteBuf的聚合视图; 数据访问方法——搜索、切片以及复制; 随机访问索引 【0到capacity() - 1】 顺序访问索引 可丢弃字节 【discardReadBytes() clear()改变index值】 可读字节【readBytes(ByteBuf dest) 】 可写字节【writeBytes(ByteBuf dest);】 索引管理【markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex( readerIndex(int)或者writerIndex(int) 】 查找操作【buf.indexOf(),forEachByte(ByteBufProcessor.FIND_NUL), int nullIndex = buf.forEachByte(ByteBufProcessor.FIND_NUL);int rIndex = buf.forEachByte(ByteBufProcessor.FIND_CR);】 派生缓冲区【 返回新的buf,都具有 readIndex writeIndex markIndex buf.duplicate(); ByteBuf rep = buf.copy();//创建副本 buf.slice();//操作buf分段 buf.slice(0, 5);//操作buf分段】 读、写、获取和设置API; 读/写操作【get()和set()操作,从给定的索引开始,并且保持索引不变;read()和write()操作,从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整】 ByteBufAllocator池化 ByteBufAllocator pool = new PooledByteBufAllocator();//提高性能减少碎片,高效分配算法 ByteBufAllocator unpool = new UnpooledByteBufAllocator(true);//一直新建 引用计数 ByteBufAllocator allocator = ctx.channel().alloc(); ByteBuf directBuf = allocator.directBuffer(); if(directBuf.refCnt() == 1){//当引用技术为1时释放对象 directBuf.release(); }@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { logger.info("channelRead start"); ByteBuf buf = (ByteBuf) msg; if (buf.hasArray()) {//检查buf是否支持一个数组 byte[] array = buf.array(); //第一个偏移量 int off = buf.arrayOffset() + buf.readerIndex(); //获取可读取字节 int len = buf.readableBytes(); byte[] buffer = new byte[len]; buf.getBytes(off, buffer); CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); ByteBuf header = (ByteBuf) msg; ByteBuf body = (ByteBuf) msg; compositeByteBuf.addComponent(header); compositeByteBuf.addComponent(body); compositeByteBuf.removeComponent(0); for (ByteBuf bufer : compositeByteBuf) { System.out.println(bufer.toString()); } int comLen = compositeByteBuf.readableBytes(); for (int i = 0; i < buf.capacity(); i++) { System.out.println((char) buf.getByte(i)); //读完成后进行丢弃 buf.discardReadBytes(); //或者调用clear buf.clear(); } //标记索引 buf.readerIndex(2); buf.writeByte(2); //重置索引 buf.markReaderIndex(); buf.markWriterIndex(); buf.resetReaderIndex(); buf.resetWriterIndex(); //查找 buf.indexOf(0, 5, (byte) 0); int nullIndex = buf.forEachByte(ByteBufProcessor.FIND_NUL); int rIndex = buf.forEachByte(ByteBufProcessor.FIND_CR); //派生缓冲区 返回新的buf,都具有 readIndex writeIndex markIndex buf.duplicate(); ByteBuf rep = buf.copy();//创建副本 buf.slice();//操作buf分段 buf.slice(0, 5);//操作buf分段 //==========数据访问方法——搜索、切片以及复制; Charset charset = Charset.forName("UTF-8"); ByteBuf buf1 = Unpooled.copiedBuffer("Netty in Action rocks!", charset); //← -- 创建一个用于保存给定字符串的字节的ByteBuf ByteBuf sliced = buf1.slice(0, 15); //← -- 创建该ByteBuf 从索引0 开始到索引15结束的一个新切片 System.out.println(sliced.toString(charset)); // ← -- 将打印“Netty in Action” buf1.setByte(0, (byte) 'J'); //← -- 更新索引0 处的字节 assert buf1.getByte(0) == sliced.getByte(0); //← -- 将会成功,因为数据是共享的,对其中一个所做的更改对另外一个也是可见的 Charset utf8 = Charset.forName("UTF-8"); ByteBuf buf2 = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); // ← -- 创建ByteBuf 以保存所提供的字符串的字节 ByteBuf copy = buf2.copy(0, 15);// ← -- 创建该ByteBuf 从索引0 开始到索引15结束的分段的副本 System.out.println(copy.toString(utf8));// ← -- 将打印“Netty in Action” buf2.setByte(0, (byte) 'J');// ← -- 更新索引0 处的字节 assert buf2.getByte(0) != copy.getByte(0);// ← -- 将会成功,因为数据不是共享的 Unpooled.unmodifiableBuffer(buf); buf.order(); buf.readSlice(1); //使用不同的读索引和写索引来控制数据访问; //读写操作 get/set不改变索引位置 read/write改变索引(readIndex/writeIndex)位置 Charset u8 = Charset.forName("UTF-8"); ByteBuf getSetBuf = Unpooled.copiedBuffer("Netty in Action rocks!", u8); // 创建一个新的ByteBuf以保存给定字符串的字节 System.out.println((char) getSetBuf.getByte(0));// 打印第一个字符'N' int readerIndex = getSetBuf.readerIndex(); // 存储当前的readerIndex 和writerIndex int writerIndex = getSetBuf.writerIndex(); getSetBuf.setByte(0, (byte) 'B'); // 将索引0 处的字节更新为字符'B' System.out.println((char) getSetBuf.getByte(0)); // 打印第一个字符,现在是'B' assert readerIndex == getSetBuf.readerIndex();// 将会成功,因为这些操作并不会修改相应的索引 assert writerIndex == getSetBuf.writerIndex(); ByteBuf readWriteBuf = Unpooled.copiedBuffer("Netty in Action rocks!", u8); // 创建一个新的ByteBuf以保存给定字符串的字节 System.out.println((char) readWriteBuf.readByte());// 打印第一个字符'N' System.out.println((boolean) readWriteBuf.readBoolean());// 读取当前boolean值,并将readIndex+1 readWriteBuf.writeByte('F');// 将字符F追加到缓冲区中,并将writeIndex+1 int reIndex = readWriteBuf.readerIndex(); // 存储当前的readerIndex 和writerIndex int wrIndex = readWriteBuf.writerIndex(); readWriteBuf.setByte(0, (byte) 'B'); // 将索引0 处的字节更新为字符'B' System.out.println((char) readWriteBuf.getByte(0)); // 打印第一个字符,现在是'B' assert reIndex == readWriteBuf.readerIndex();// 将会成功,因为这些操作并不会修改相应的索引 assert wrIndex == readWriteBuf.writerIndex(); buf.isReadable();//至少有一个字符可读,返回true buf.isWritable();//至少有一个字节可被写入,返回true int readableByte = buf.readableBytes();//返回可被读取的字节数 int writableByte = buf.writableBytes();//返回可被写入的字节数 int capacity = buf.capacity();//返回可容纳的字节数 buf.maxCapacity();//返回可容纳的最大字节数 buf.hasArray();//如果buf由一个字节数组支撑,返回true buf.array();//将buf转换为字节数组 //除了数据外,还有一些其他的属性,如http的状态码,cookie等 ByteBufHolder byteBufHolder = new DefaultLastHttpContent(); ByteBuf httpContent = byteBufHolder.content();//返回一个http格式的ByteBuf ByteBufHolder copyBufHolder = byteBufHolder.copy();//深拷贝,不共享 ByteBufHolder duplicateBufHolder = byteBufHolder.duplicate();//浅拷贝,共享 //ByteBufAllocator ByteBuf分配 // buffer()基于堆或直接内存的buf //ioBuffer() 返回一个iobuf //heapBuffer 堆buf //directBuffer 直接buf //compositeBuffer compositeHeapBuffer compositeDirectBuffer 复合buf ByteBufAllocator ctxAllocator = ctx.alloc(); ByteBufAllocator channelAllocator = ctx.channel().alloc(); ctxAllocator.buffer(); ctxAllocator.ioBuffer(); ctxAllocator.compositeBuffer(); ctxAllocator.heapBuffer(); ctxAllocator.directBuffer(); ByteBufAllocator pool = new PooledByteBufAllocator();//提高性能减少碎片,高效分配算法 ByteBufAllocator unpool = new UnpooledByteBufAllocator(true);//一直新建 ctx.writeAndFlush(new byte[10]); ctx.writeAndFlush(Unpooled.copiedBuffer(new byte[10]));//writeAndFlush参数是Object,使用非池化技术转为buf提升效率 //工具类ByteBufUtil ByteBufUtil.hexDump(buf);//可对buf进行转换 ByteBufUtil.hexDump(new byte[9999]);//可对字节进行转换 //引用计数:跟踪特定对象的引用计数 ByteBufAllocator allocator = ctx.channel().alloc(); ByteBuf directBuf = allocator.directBuffer(); if(directBuf.refCnt() == 1){//当引用技术为1时释放对象 directBuf.release(); } if (buf.readableBytes() <= 0) { ReferenceCountUtil.safeRelease(msg); return; } byte[] msgContent = new byte[buf.readableBytes()]; buf.readBytes(msgContent); logger.info("recv from client:length: {},toHexString: {}\n", buf.readableBytes(), HexStringUtils.toHexString(buf.array())); if (buf.getByte(0) == 0x7e && buf.getByte(buf.readableBytes() - 1) == 0x7e) {} if(msgContent[0] == 0x7e && msgContent[msgContent.length-1]==0x7e){} }}//通常如果集成ChannelInboundHandlerAdapter时,复写channelRead(),需要进行手动的消息释放//消息消费完成后自动释放private void release(Object msg) { try { ReferenceCountUtil.release(msg); } catch (Exception e) { e.printStackTrace(); }}===================================================//在SimpleChannelInboundHandler中的channelRead0()方法中自动添加了释放消息的方法//SimpleChannelInboundHandler extends ChannelInboundHandlerAdapter@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } }}