博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty之ByteBuf详解
阅读量:6302 次
发布时间:2019-06-22

本文共 9019 字,大约阅读时间需要 30 分钟。

【ChannelPromise作用:可以设置success或failure 是为了通知ChannelFutureListener】

Netty的数据处理API通过两个组件暴露——abstract class ByteBuf和interface ByteBufHolder。

下面是一些ByteBuf API的优点:

  它可以被用户自定义的缓冲区类型扩展;
  通过内置的复合缓冲区类型实现了透明的零拷贝;
  容量可以按需增长(类似于JDK的StringBuilder);
  在读和写这两种模式之间切换不需要调用ByteBuffer的flip()方法;
  读和写使用了不同的索引;
  支持方法的链式调用;
  支持引用计数;
  支持池化。
  使用不同的读索引和写索引来控制数据访问;
  readerIndex达到和writerIndex

  使用内存的不同方式——基于字节数组和直接缓冲区;

  通过CompositeByteBuf生成多个ByteBuf的聚合视图;
  数据访问方法——搜索、切片以及复制;
  随机访问索引 【0到capacity() - 1】
  顺序访问索引
  可丢弃字节 【discardReadBytes() clear()改变index值】
  可读字节【readBytes(ByteBuf dest) 】
  可写字节【writeBytes(ByteBuf dest);】
  索引管理【markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex( readerIndex(int)或者writerIndex(int) 】
  查找操作【buf.indexOf(),forEachByte(ByteBufProcessor.FIND_NUL), int nullIndex = buf.forEachByte(ByteBufProcessor.FIND_NUL);int rIndex = buf.forEachByte(ByteBufProcessor.FIND_CR);】
  派生缓冲区【 返回新的buf,都具有 readIndex writeIndex markIndex
  buf.duplicate();
  ByteBuf rep = buf.copy();//创建副本
  buf.slice();//操作buf分段
  buf.slice(0, 5);//操作buf分段】
  读、写、获取和设置API;
  读/写操作【get()和set()操作,从给定的索引开始,并且保持索引不变;read()和write()操作,从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整】
  ByteBufAllocator池化
  ByteBufAllocator pool = new PooledByteBufAllocator();//提高性能减少碎片,高效分配算法
  ByteBufAllocator unpool = new UnpooledByteBufAllocator(true);//一直新建
  引用计数
  ByteBufAllocator allocator = ctx.channel().alloc();
  ByteBuf directBuf = allocator.directBuffer();
  if(directBuf.refCnt() == 1){//当引用技术为1时释放对象
  directBuf.release();
  }

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {    logger.info("channelRead start");    ByteBuf buf = (ByteBuf) msg;    if (buf.hasArray()) {//检查buf是否支持一个数组        byte[] array = buf.array();        //第一个偏移量        int off = buf.arrayOffset() + buf.readerIndex();        //获取可读取字节        int len = buf.readableBytes();        byte[] buffer = new byte[len];        buf.getBytes(off, buffer);        CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();        ByteBuf header = (ByteBuf) msg;        ByteBuf body = (ByteBuf) msg;        compositeByteBuf.addComponent(header);        compositeByteBuf.addComponent(body);        compositeByteBuf.removeComponent(0);        for (ByteBuf bufer : compositeByteBuf) {            System.out.println(bufer.toString());        }        int comLen = compositeByteBuf.readableBytes();        for (int i = 0; i < buf.capacity(); i++) {            System.out.println((char) buf.getByte(i));            //读完成后进行丢弃            buf.discardReadBytes();            //或者调用clear            buf.clear();        }        //标记索引        buf.readerIndex(2);        buf.writeByte(2);        //重置索引        buf.markReaderIndex();        buf.markWriterIndex();        buf.resetReaderIndex();        buf.resetWriterIndex();        //查找        buf.indexOf(0, 5, (byte) 0);        int nullIndex = buf.forEachByte(ByteBufProcessor.FIND_NUL);        int rIndex = buf.forEachByte(ByteBufProcessor.FIND_CR);        //派生缓冲区  返回新的buf,都具有 readIndex  writeIndex markIndex        buf.duplicate();        ByteBuf rep = buf.copy();//创建副本        buf.slice();//操作buf分段        buf.slice(0, 5);//操作buf分段        //==========数据访问方法——搜索、切片以及复制;        Charset charset = Charset.forName("UTF-8");        ByteBuf buf1 = Unpooled.copiedBuffer("Netty in Action rocks!", charset);  //← --  创建一个用于保存给定字符串的字节的ByteBuf        ByteBuf sliced = buf1.slice(0, 15);  //← --  创建该ByteBuf 从索引0 开始到索引15结束的一个新切片        System.out.println(sliced.toString(charset));  // ← --  将打印“Netty in Action”        buf1.setByte(0, (byte) 'J');   //← --  更新索引0 处的字节        assert buf1.getByte(0) == sliced.getByte(0); //← --  将会成功,因为数据是共享的,对其中一个所做的更改对另外一个也是可见的        Charset utf8 = Charset.forName("UTF-8");        ByteBuf buf2 = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); // ← --  创建ByteBuf 以保存所提供的字符串的字节        ByteBuf copy = buf2.copy(0, 15);// ← --  创建该ByteBuf 从索引0 开始到索引15结束的分段的副本        System.out.println(copy.toString(utf8));//  ← --   将打印“Netty in Action”        buf2.setByte(0, (byte) 'J');//  ← --  更新索引0 处的字节        assert buf2.getByte(0) != copy.getByte(0);//  ← --  将会成功,因为数据不是共享的        Unpooled.unmodifiableBuffer(buf);        buf.order();        buf.readSlice(1);       //使用不同的读索引和写索引来控制数据访问;        //读写操作 get/set不改变索引位置   read/write改变索引(readIndex/writeIndex)位置        Charset u8 = Charset.forName("UTF-8");        ByteBuf getSetBuf = Unpooled.copiedBuffer("Netty in Action rocks!", u8); // 创建一个新的ByteBuf以保存给定字符串的字节        System.out.println((char) getSetBuf.getByte(0));//  打印第一个字符'N'        int readerIndex = getSetBuf.readerIndex(); //  存储当前的readerIndex 和writerIndex        int writerIndex = getSetBuf.writerIndex();        getSetBuf.setByte(0, (byte) 'B'); //  将索引0 处的字节更新为字符'B'        System.out.println((char) getSetBuf.getByte(0)); //  打印第一个字符,现在是'B'         assert readerIndex == getSetBuf.readerIndex();//  将会成功,因为这些操作并不会修改相应的索引        assert writerIndex == getSetBuf.writerIndex();        ByteBuf readWriteBuf = Unpooled.copiedBuffer("Netty in Action rocks!", u8); // 创建一个新的ByteBuf以保存给定字符串的字节        System.out.println((char) readWriteBuf.readByte());//  打印第一个字符'N'        System.out.println((boolean) readWriteBuf.readBoolean());//  读取当前boolean值,并将readIndex+1        readWriteBuf.writeByte('F');//  将字符F追加到缓冲区中,并将writeIndex+1        int reIndex = readWriteBuf.readerIndex(); //  存储当前的readerIndex 和writerIndex        int wrIndex = readWriteBuf.writerIndex();        readWriteBuf.setByte(0, (byte) 'B'); //  将索引0 处的字节更新为字符'B'        System.out.println((char) readWriteBuf.getByte(0)); //  打印第一个字符,现在是'B'         assert reIndex == readWriteBuf.readerIndex();//  将会成功,因为这些操作并不会修改相应的索引        assert wrIndex == readWriteBuf.writerIndex();        buf.isReadable();//至少有一个字符可读,返回true        buf.isWritable();//至少有一个字节可被写入,返回true        int readableByte = buf.readableBytes();//返回可被读取的字节数        int writableByte = buf.writableBytes();//返回可被写入的字节数        int capacity = buf.capacity();//返回可容纳的字节数        buf.maxCapacity();//返回可容纳的最大字节数        buf.hasArray();//如果buf由一个字节数组支撑,返回true        buf.array();//将buf转换为字节数组        //除了数据外,还有一些其他的属性,如http的状态码,cookie等        ByteBufHolder byteBufHolder = new DefaultLastHttpContent();        ByteBuf httpContent = byteBufHolder.content();//返回一个http格式的ByteBuf        ByteBufHolder copyBufHolder = byteBufHolder.copy();//深拷贝,不共享        ByteBufHolder duplicateBufHolder = byteBufHolder.duplicate();//浅拷贝,共享        //ByteBufAllocator  ByteBuf分配        // buffer()基于堆或直接内存的buf        //ioBuffer() 返回一个iobuf        //heapBuffer  堆buf        //directBuffer 直接buf        //compositeBuffer  compositeHeapBuffer   compositeDirectBuffer   复合buf        ByteBufAllocator ctxAllocator = ctx.alloc();        ByteBufAllocator channelAllocator = ctx.channel().alloc();        ctxAllocator.buffer();        ctxAllocator.ioBuffer();        ctxAllocator.compositeBuffer();        ctxAllocator.heapBuffer();        ctxAllocator.directBuffer();        ByteBufAllocator pool = new PooledByteBufAllocator();//提高性能减少碎片,高效分配算法        ByteBufAllocator unpool = new UnpooledByteBufAllocator(true);//一直新建        ctx.writeAndFlush(new byte[10]);        ctx.writeAndFlush(Unpooled.copiedBuffer(new byte[10]));//writeAndFlush参数是Object,使用非池化技术转为buf提升效率        //工具类ByteBufUtil        ByteBufUtil.hexDump(buf);//可对buf进行转换        ByteBufUtil.hexDump(new byte[9999]);//可对字节进行转换        //引用计数:跟踪特定对象的引用计数        ByteBufAllocator allocator = ctx.channel().alloc();        ByteBuf directBuf = allocator.directBuffer();        if(directBuf.refCnt() == 1){//当引用技术为1时释放对象            directBuf.release();        }        if (buf.readableBytes() <= 0) {            ReferenceCountUtil.safeRelease(msg);            return;        }        byte[] msgContent = new byte[buf.readableBytes()];        buf.readBytes(msgContent);        logger.info("recv from client:length: {},toHexString: {}\n", buf.readableBytes(), HexStringUtils.toHexString(buf.array()));        if (buf.getByte(0) == 0x7e && buf.getByte(buf.readableBytes() - 1) == 0x7e) {}        if(msgContent[0] == 0x7e && msgContent[msgContent.length-1]==0x7e){}    }}//通常如果集成ChannelInboundHandlerAdapter时,复写channelRead(),需要进行手动的消息释放//消息消费完成后自动释放private void release(Object msg) {    try {        ReferenceCountUtil.release(msg);    } catch (Exception e) {        e.printStackTrace();    }}===================================================//在SimpleChannelInboundHandler中的channelRead0()方法中自动添加了释放消息的方法//SimpleChannelInboundHandler extends ChannelInboundHandlerAdapter@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    boolean release = true;    try {        if (acceptInboundMessage(msg)) {            @SuppressWarnings("unchecked")            I imsg = (I) msg;            channelRead0(ctx, imsg);        } else {            release = false;            ctx.fireChannelRead(msg);        }    } finally {        if (autoRelease && release) {            ReferenceCountUtil.release(msg);        }    }}

  

转载于:https://www.cnblogs.com/htkj/p/10932668.html

你可能感兴趣的文章
iOS 学习资料汇总
查看>>
centos7 yum安装jdk
查看>>
Bluedroid与BluZ,蓝牙测试方法的变动(基于bludroid和BlueZ的对比)
查看>>
接口和抽象类有什么区别
查看>>
Linux 下添加用户,修改权限
查看>>
请问view controller scene,该如何删除
查看>>
bootstrap新闻模块样式模板
查看>>
zzzzw_在线考试系统①准备篇
查看>>
App Store 审核被拒的23个理由
查看>>
剑指offer第二版-1.赋值运算符函数
查看>>
javascript 对象
查看>>
Android学习笔记——文件路径(/mnt/sdcard/...)、Uri(content://media/external/...)学习
查看>>
Echart:前端很好的数据图表展现工具+demo
查看>>
CATransform3D iOS动画特效详解
查看>>
Linux VNC黑屏(转)
查看>>
Java反射简介
查看>>
react脚手架应用以及iview安装
查看>>
shell学习之用户管理和文件属性
查看>>
day8--socket网络编程进阶
查看>>
node mysql模块写入中文字符时的乱码问题
查看>>