程序员求职经验分享与学习资料整理平台

网站首页 > 文章精选 正文

Netty源码—8.编解码原理一(netty自定义编码器和解码器)

balukai 2025-04-09 14:10:51 文章精选 14 ℃

大纲

1.读数据入口

2.拆包原理

3.ByteToMessageDecoder解码步骤

4.解码器抽象的解码过程总结

5.Netty里常见的开箱即用的解码器

6.writeAndFlush()方法的大体步骤

7.MessageToByteEncoder的编码步骤

8.unsafe.write()写队列

9.unsafe.flush()刷新写队列

10.如何把对象变成字节流写到unsafe底层


1.读数据入口

当客户端Channel的Reactor线程NioEventLoop检测到有读事件时,会执行NioByteUnsafe的read()方法。该方法会调用doReadBytes()方法将TCP缓冲区的数据读到由ByteBufAllocator分配的一个ByteBuf对象中,然后通过pipeline.fireChannelRead()方法带上这个ByteBuf对象向下传播ChannelRead事件


在传播的过程中,首先会来到pipeline的head结点的channelRead()方法。该方法会继续带着那个ByteBuf对象向下传播ChannelRead事件,比如会来到ByteToMessageDecoder结点的channelRead()方法


注意:服务端Channel的unsafe变量是一个NioMessageUnsafe对象,客户端Channel的unsafe变量是一个NioByteUnsafe对象。

//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    private SelectedSelectionKeySet selectedKeys;
    private boolean needsToSelectAgain;
    private int cancelledKeys;
    ...
    @Override
    protected void run() {
        for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
            //2.处理产生IO事件的Channel
            needsToSelectAgain = false;
            processSelectedKeys();
            ...
            //3.执行外部线程放入TaskQueue的任务
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
    
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            //selectedKeys.flip()会返回一个数组
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            //1.首先取出IO事件
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            selectedKeys[i] = null;//Help GC
            //2.然后获取对应的Channel和处理该Channel
            //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                //网络事件的处理
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务
                NioTask task = (NioTask) a;
                processSelectedKey(k, task);
            }
            //3.最后判断是否应该再进行一次轮询
            if (needsToSelectAgain) {
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }
                selectAgain();
                //selectedKeys.flip()会返回一个数组
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }
    
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            int readyOps = k.readyOps();
            ...
            //新连接已准备接入或者已经存在的连接有数据可读
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //如果是新连接已准备接入,则调用NioMessageUnsafe的read()方法
                //如果是已经存在的连接有数据可读,执行的是NioByteUnsafe的read()方法
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    ...
}

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    ...
    protected class NioByteUnsafe extends AbstractNioUnsafe {
        ...
        @Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            //创建ByteBuf分配器
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            do {
                //1.分配一个ByteBuf
                byteBuf = allocHandle.allocate(allocator);
                //2.将数据读取到分配的ByteBuf中
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }
                ...
                //3.调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            //4.调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件
            pipeline.fireChannelReadComplete();
            ...
        }
    }
}

NioByteUnsafe主要会进行如下处理:

一.通过客户端Channel的ChannelConfig获取内存分配器ByteBufAllocator,然后用内存分配器来分配一个ByteBuf对象

二.将客户端Channel中的TCP缓冲区的数据读取到ByteBuf对象

三.读取完数据后便调用DefaultChannelPipeline的fireChannelReadComplete()方法,从HeadContext结点开始在整个ChannelPipeline中传播ChannelRead事件


2.拆包原理

一.不用Netty的拆包原理

不断地从TCP缓冲区中读取数据,每次读完都判断是否为一个完整的数据包。如果当前读取的数据不足以拼接成一个完整的数据包,就保留数据,继续从TCP缓冲区中读。如果当前读取的数据加上已读取的数据足够拼成完整的数据包,就将拼好的数据包往业务传递,而多余的数据则保留。


二.Netty的拆包原理

Netty拆包基类内部会有一个字节容器每次读取到数据就添加到字节容器中。然后尝试对累加的字节数据进行拆包,拆成一个完整的业务数据包,这个拆包基类叫ByteToMessageDecoder


3.ByteToMessageDecoder解码步骤

(1)解码的整体步骤

(2)首先累加字节流

(3)然后调用子类的decode()方法进行解析

(4)接着清理累加字节容器

(5)最后将解析到的ByteBuf向下传播


(1)解码的整体介绍

一.累加字节流

Netty会通过一个ByteBuf字节容器cumulation,把所有读取到的字节流累加到该字节容器。


二.调用子类的decode()方法进行解析

把累加字节容器里的字节流通过子类的decode()方法进行解析


三.将解析到的ByteBuf向下传播

如果调用子类的decode()方法可以解析到一个ByteBuf对象,则将这个ByteBuf对象向下传播

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    ...
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //只基于ByteBuf对象进行解码
        if (msg instanceof ByteBuf) {
            //1.累加字节流
            //2.调用子类的decode()方法进行解析
            //3.清理累加字节容器
            //4.将解析到的ByteBuf向下传播
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    ...
}

(2)首先累加字节流

如果当前字节容器中没有数据,那么就将字节容器的指针指向新读取的数据。如果当前字节容器中有数据,那么就调用累加器的cumulate()方法将数据累加到字节容器

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    ByteBuf cumulation;//字节容器
    private Cumulator cumulator = MERGE_CUMULATOR;//默认的累加器
    private boolean decodeWasNull;
    private boolean first;
    private int discardAfterReads = 16;
    private int numReads;
    ...
    //Cumulate ByteBufs by merge them into one ByteBuf's, using memory copies.
    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        //累加器的累加方法,会传入一个字节容器cumulation
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;//一个大的字节容器,用来copy传入的字节容器
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) {
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);//将当前数据累加到copy的字节容器中
            in.release();
            return buffer;//返回copy的字节容器
        }
    };

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                //1.累加字节流
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {//如果当前字节容器中没有数据
                    //就将字节容器的指针指向新读取的数据
                    cumulation = data;
                } else {//如果当前字节容器中有数据
                    //则调用累加器的cumulate()方法将数据累加到字节容器
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
             
                //2.将字节容器里的数据传递给业务拆包器进行拆包
                //调用callDecode()方法对数据进行拆包
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                //3.清理字节容器
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    numReads = 0;
                    discardSomeReadBytes();
                }
          
                //4.将解析到的ByteBuf向下传播
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    
    static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        ByteBuf oldCumulation = cumulation;
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);
        oldCumulation.release();
        return cumulation;
    }
    ...
}

(3)然后调用子类的decode()方法进行解析

将数据累加到字节容器之后,就会调用callDecode()方法,这个方法会尝试将字节容器的数据拆分成业务数据包将业务数据包放入业务数据容器out中


Netty对各种用户协议的支持就体现在ByteToMessageDecoder的抽象方法decode()中,decode()方法的入参当前读取到的未被处理的所有数据in业务数据包容器out所有拆包器都需要实现ByteToMessageDecoder的decoed()方法


拆包器完成一次拆包后:如果没有拆到一个完整的数据包,此时若拆包器未读取任何数据则跳出循环,否则继续拆包。如果已经拆到一个完整的数据包,但此时拆包器未读取任何数据,则抛出一个异常DecodeException。

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    ...
    //Called once data should be decoded from the given ByteBuf. 
    //This method will call #decode(ChannelHandlerContext, ByteBuf, List) as long as decoding should take place.
    //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param in,the ByteBuf from which to read data
    //@param out,the List to which decoded messages should be added
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    //Check if this handler was removed before continuing with decoding.
                    //If it was removed, it is not safe to continue to operate on the buffer.
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                //调用拆包器实现的decode()方法
                decode(ctx, in, out);
               
                //拆包器完成一次拆包后:
           
                //Check if this handler was removed before continuing the loop.
                //If it was removed, it is not safe to continue to operate on the buffer.
                if (ctx.isRemoved()) {
                    break;
                }
                //outSize == out.size()表示没有拆到一个完整的数据包
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        //此时拆包器未读取任何数据则跳出循环
                        break;
                    } else {
                        //此时拆包器已读取到数据则继续拆包
                        continue;
                    }
                }
                //执行到此处表明已经拆到一个完整的数据包
                if (oldInputLength == in.readableBytes()) {
                    //此时拆包器未读取任何数据,于是抛出一个异常DecodeException
                    throw new DecoderException(StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message.");
                }
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }
    
    //Decode the from one ByteBuf to an other. 
    //This method will be called till either the input ByteBuf has nothing to read 
    //when return from this method or till nothing was read from the input ByteBuf.
    //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param in,the ByteBuf from which to read data
    //@param out,the List to which decoded messages should be added
    //@throws Exception,is thrown if an error accour
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    ...
}

(4)接着清理字节容器

拆包器完成拆包后,只是从字节容器中取走了数据,但这部分空间对字节容器来说还依然被保留。而字节容器每次累加字节数据时都是将字节数据追加到尾部,如果不对字节容器进行清理,那么时间一长可能就会OOM


正常情况下,每次读取完数据之后,ByteToMessageDecoder解码器都会在channelReadComplete()方法里清理字节容器。但是如果发送端发送数据过快,那么解码器的channelReadComplete()方法可能会很久才被调用一次


所以为了防止发送端发送数据过快,ByteToMessageDecoder会在读取完一次数据完成业务拆包后,清理字节容器。如果字节容器当前已无数据可读,则调用字节容器的release()方法释放字节容器。如果字节容器当前还有数据可读,并且已经连续读取了16次还有未拆包的数据,那么就进行压缩处理

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    private int discardAfterReads = 16;
    ...
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                //1.累加字节流
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {//如果当前字节容器中没有数据
                    //就将字节容器的指针指向新读取的数据
                    cumulation = data;
                } else {//如果当前字节容器中有数据
                    //则调用累加器的cumulate()方法将数据累加到字节容器
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
             
                //2.将字节容器里的数据传递给业务拆包器进行拆包
                //调用callDecode()方法对数据进行拆包
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                //3.清理字节容器
                if (cumulation != null && !cumulation.isReadable()) {
                    //如果字节容器当前已无数据可读,则设置numReads为0,并释放字节容器cumulation
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {//numReads >= 16
                    //如果当前还有数据可读,并且已经连续读取了16次即numReads >= 16,
                    //此时字节容器中仍有未被业务拆包器拆包的数据,那么就做一次压缩处理;
                    numReads = 0;
                    discardSomeReadBytes();
                }
          
                //4.将解析到的ByteBuf向下传播
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    
    //Get numElements out of the CodecOutputList and forward these through the pipeline.
    static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
        //遍历业务数据包容器
        for(int i = 0; i < numElements; i ++) {
            //将一个个完整的业务数据包ByteBuf传递到后续的ChannelHandler进行处理
            ctx.fireChannelRead(msgs.getUnsafe(i));
        }
    }
    ...
}

(5)最后将解析到的ByteBuf向下传播

也就是调用fireChannelRead()方法,遍历业务数据包容器,将一个个完整的业务数据包ByteBuf传递到后续的ChannelHandler中进行处理。


4.解码器抽象的解码过程总结

解码过程是通过一个叫ByteToMessageDecoder的抽象解码器来实现的,ByteToMessageDecoder实现的解码过程分为如下四步。


步骤一:累加字节流

也就是把当前读到的字节流累加到一个字节容器里。


步骤二:调用子类的decode()方法进行解析

ByteToMessageDecoder的decode()方法是一个抽象方法,不同种类的解码器会有自己的decode()方法逻辑。该decode()方法被调用时会传入两个关键参数:一个是ByteBuf对象表示当前累加的字节流,一个是List列表用来存放被成功解码的业务数据包。


步骤三:清理字节容器

为了防止发送端发送数据过快,ByteToMessageDecoder会在读取完一次数据并完成业务拆包后,清理字节容器。


步骤四:传播已解码的业务数据包

如果List列表里有解析出来的业务数据包,那么就通过pipeline的事件传播机制往下进行传播。


5.Netty里常见的开箱即用的解码器

(1)基于固定长度解码器

(2)基于行分隔符解码器

(3)基于分隔符解码器

(4)基于长度域解码器


(1)基于固定长度解码器

判断当前字节容器可读字节是否小于固定长度。

//A decoder that splits the received ByteBufs by the fixed number of bytes. 
//For example, if you received the following four fragmented packets:
//+---+----+------+----+
//| A | BC | DEFG | HI |
//+---+----+------+----+
//A FixedLengthFrameDecoder (3) will decode them into the following three packets with the fixed length:
//+-----+-----+-----+
//| ABC | DEF | GHI |
//+-----+-----+-----+
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int frameLength;
    //Creates a new instance.
    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    //Create a frame out of the ByteBuf and return it.
    //@param   ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param   in,the ByteBuf from which to read data
    //@return  frame,the ByteBuf which represent the frame or null if no frame could be created.
    protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { 
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            return in.readRetainedSlice(frameLength);
        }
    }
}

(2)基于行分隔符解码器

基于行分隔符的拆包器可以同时处理\n和\r\n两种类型的行分隔符,其处理逻辑分为非丢弃模式丢弃模式找到行分隔符未找到行分隔符的情况。


一.非丢弃模式时找到行分隔符

首先新建一个帧,也就是ByteBuf frame。然后计算需要解码的数据包的长度和分隔符的长度。接着判断需要拆包的长度是否大于该拆包器允许的最大长度,如果大于,则丢弃这段数据,返回null。然后将一个完整的数据包取出,如果stripDelimiter在构造方法中被设置为false,则数据包含分隔符。


二.非丢弃模式时未找到行分隔符

首先取得当前字节容器的可读字节数,然后判断是否超出允许的最大长度。如果没超过最大长度,则直接返回null字节容器的数据没有改变。如果已超过最大长度,则进入丢弃模式,设置discarding为true。


三.丢弃模式下找到行分隔符

这种情况下需要将分隔符之前的数据都丢弃。在计算出分隔符的长度之后,会通过移动字节容器的readerIndex指针把分隔符之前的数据全部丢弃,当然丢弃的数据也包括分隔符。经过这么一次丢弃后,后面就有可能是正常的数据包。于是设置discarding为false进入非丢弃模式,这样下次解码数据包时就会进入正常的解码流程。


四.丢弃模式下未找到行分隔符

由于当前还处于丢弃模式,没有找到行分隔符意味着当前一个完整的数据包还没丢弃完,所以当前数据继续丢弃,移动字节容器的readerIndex指针

//A decoder that splits the received {@link ByteBuf}s on line endings.
//Both "\n" and "\r\n" are handled.
//For a more general delimiter-based decoder, see DelimiterBasedFrameDecoder.
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
    //Maximum length of a frame we're willing to decode.
    private final int maxLength;
    //Whether or not to throw an exception as soon as we exceed maxLength.
    private final boolean failFast;
    private final boolean stripDelimiter;
    //True if we're discarding input because we're already over maxLength.
    private boolean discarding;
    private int discardedBytes;

    public LineBasedFrameDecoder(final int maxLength) {
        this(maxLength, true, false);
    }

    public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
        this.maxLength = maxLength;
        this.failFast = failFast;
        this.stripDelimiter = stripDelimiter;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    //Create a frame out of the ByteBuf and return it.
    //@param   ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param   buffer,the ByteBuf from which to read data
    //@return  frame,the ByteBuf which represent the frame or null if no frame could be created.
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        final int eol = findEndOfLine(buffer);
        if (!discarding) {//非丢弃模式
            if (eol >= 0) {//找到行分隔符
                //新建一个帧,也就是ByteBuf frame
                final ByteBuf frame;
                //计算需要解码的数据包的长度
                final int length = eol - buffer.readerIndex();
                //计算分隔符的长度
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
          
                //判断需要拆包的长度是否大于该拆包器允许的最大长度
                if (length > maxLength) {
                    //如果大于,则丢弃这段数据,返回null
                    buffer.readerIndex(eol + delimLength);
                    fail(ctx, length);
                    return null;
                }
          
                //将一个完整的数据包取出
                if (stripDelimiter) {
                    frame = buffer.readRetainedSlice(length);
                    buffer.skipBytes(delimLength);
                } else {
                    //如果stripDelimiter在构造方法中被设置为false,则数据包含分隔符
                    frame = buffer.readRetainedSlice(length + delimLength);
                }
                return frame;
            } else {//未找到行分隔符
                //首先取得当前字节容器的可读字节数
                final int length = buffer.readableBytes();
                //然后判断是否超出允许的最大长度
                if (length > maxLength) {
                    //如果已超过最大长度,则进入丢弃模式,设置discarding为true
                    discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true;
                    if (failFast) {
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                //如果没超过最大长度,则直接返回null,字节容器的数据没有改变
                return null;
            }
        } else {//丢弃模式
            if (eol >= 0) {//找到行分隔符
                final int length = discardedBytes + eol - buffer.readerIndex();
                //计算出分隔符的长度
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                //把分隔符之前的数据全部丢弃,移动字节容器的readerIndex指针
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                //经过这么一次丢弃后,后面就有可能是正常的数据包
                //于是设置discarding为false,这样下次解码数据包时就会进入正常的解码流程
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            } else {//未找到行分隔符
                //当前还处于丢弃模式,没有找到行分隔符意味着当前一个完整的数据包还没丢弃完
                //所以当前数据继续丢弃,移动字节容器的readerIndex指针
                discardedBytes += buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
            }
            return null;
        }
    }

    private void fail(final ChannelHandlerContext ctx, int length) {
        fail(ctx, String.valueOf(length));
    }

    private void fail(final ChannelHandlerContext ctx, String length) {
        ctx.fireExceptionCaught(new TooLongFrameException("frame length (" + length + ") exceeds the allowed maximum (" + maxLength + ')'));
    }

    //Returns the index in the buffer of the end of line found.
    //Returns -1 if no end of line was found in the buffer.
    private static int findEndOfLine(final ByteBuf buffer) {
        int i = buffer.forEachByte(ByteProcessor.FIND_LF);
        if (i > 0 && buffer.getByte(i - 1) == '\r') {
            i--;
        }
        return i;
    }
}

(3)基于分隔符解码器

可以向基于分隔符解码器
DelimiterBasedFrameDecoder传递一个
分隔符列表,这样该解码器就会按照分隔符列表对数据包进行拆分。基于分隔符解码器的decode()方法和基于行分隔符解码器的decode()方法基本类似。

//A decoder that splits the received ByteBufs by one or more delimiters.  
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
    private final ByteBuf[] delimiters;
    private final int maxFrameLength;
    private final boolean stripDelimiter;
    private final boolean failFast;
    private boolean discardingTooLongFrame;
    private int tooLongFrameLength;
    private final LineBasedFrameDecoder lineBasedDecoder;
    ...
    //Creates a new instance.
    //@param maxFrameLength,the maximum length of the decoded frame.
    //A TooLongFrameException is thrown if the length of the frame exceeds this value.
    //@param stripDelimiter,whether the decoded frame should strip out the delimiter or not
    //@param failFast,If true, a TooLongFrameException is thrown as soon as the decoder 
    //notices the length of the frame will exceed maxFrameLength regardless of 
    //whether the entire frame has been read.
    //If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read.
    //@param delimiters  the delimiters
    public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
        validateMaxFrameLength(maxFrameLength);
        if (delimiters == null) {
            throw new NullPointerException("delimiters");
        }
        if (delimiters.length == 0) {
            throw new IllegalArgumentException("empty delimiters");
        }

        if (isLineBased(delimiters) && !isSubclass()) {
            lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
            this.delimiters = null;
        } else {
            this.delimiters = new ByteBuf[delimiters.length];
            for (int i = 0; i < delimiters.length; i ++) {
                ByteBuf d = delimiters[i];
                validateDelimiter(d);
                this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
            }
            lineBasedDecoder = null;
        }
        this.maxFrameLength = maxFrameLength;
        this.stripDelimiter = stripDelimiter;
        this.failFast = failFast;
    }
    
    //Returns true if the delimiters are "\n" and "\r\n".
    private static boolean isLineBased(final ByteBuf[] delimiters) {
        if (delimiters.length != 2) {
            return false;
        }
        ByteBuf a = delimiters[0];
        ByteBuf b = delimiters[1];
        if (a.capacity() < b.capacity()) {
            a = delimiters[1];
            b = delimiters[0];
        }
        return a.capacity() == 2 && b.capacity() == 1
            && a.getByte(0) == '\r' && a.getByte(1) == '\n'
            && b.getByte(0) == '\n';
    }

    //Return true if the current instance is a subclass of DelimiterBasedFrameDecoder
    private boolean isSubclass() {
        return getClass() != DelimiterBasedFrameDecoder.class;
    }

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    //Create a frame out of the {@link ByteBuf} and return it.
    //@param   ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param   buffer,the ByteBuf from which to read data
    //@return  frame,the ByteBuf which represent the frame or null if no frame could be created.
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        if (lineBasedDecoder != null) {
            return lineBasedDecoder.decode(ctx, buffer);
        }
        //Try all delimiters and choose the delimiter which yields the shortest frame.
        int minFrameLength = Integer.MAX_VALUE;
        ByteBuf minDelim = null;
        for (ByteBuf delim: delimiters) {
            int frameLength = indexOf(buffer, delim);
            if (frameLength >= 0 && frameLength < minframelength minframelength='frameLength;' mindelim='delim;' if mindelim int mindelimlength='minDelim.capacity();' bytebuf frame if discardingtoolongframe weve just finished discarding a very large frame. go back to the initial state. discardingtoolongframe='false;' buffer.skipbytesminframelength mindelimlength int toolongframelength='this.tooLongFrameLength;' this.toolongframelength='0;' if failfast failtoolongframelength return null if minframelength> maxFrameLength) {
                //Discard read frame.
                buffer.skipBytes(minFrameLength + minDelimLength);
                fail(minFrameLength);
                return null;
            }

            if (stripDelimiter) {
                frame = buffer.readRetainedSlice(minFrameLength);
                buffer.skipBytes(minDelimLength);
            } else {
                frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
            }
            return frame;
        } else {
            if (!discardingTooLongFrame) {
                if (buffer.readableBytes() > maxFrameLength) {
                    //Discard the content of the buffer until a delimiter is found.
                    tooLongFrameLength = buffer.readableBytes();
                    buffer.skipBytes(buffer.readableBytes());
                    discardingTooLongFrame = true;
                    if (failFast) {
                        fail(tooLongFrameLength);
                    }
                }
            } else {
                //Still discarding the buffer since a delimiter is not found.
                tooLongFrameLength += buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
            }
            return null;
        }
    }
    
    private void fail(long frameLength) {
        if (frameLength > 0) {
            throw new TooLongFrameException("frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded");
        } else {
            throw new TooLongFrameException("frame length exceeds " + maxFrameLength + " - discarding");
        }
    }

    //Returns the number of bytes between the readerIndex of the haystack and the first needle found in the haystack.  
    //-1 is returned if no needle is found in the haystack.
    private static int indexOf(ByteBuf haystack, ByteBuf needle) {
        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {
            int haystackIndex = i;
            int needleIndex;
            for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {
                if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
                    break;
                } else {
                    haystackIndex ++;
                    if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) {
                        return -1;
                    }
                }
            }

            if (needleIndex == needle.capacity()) {
                //Found the needle from the haystack!
                return i - haystack.readerIndex();
            }
        }
        return -1;
    }

    private static void validateDelimiter(ByteBuf delimiter) {
        if (delimiter == null) {
            throw new NullPointerException("delimiter");
        }
        if (!delimiter.isReadable()) {
            throw new IllegalArgumentException("empty delimiter");
        }
    }

    private static void validateMaxFrameLength(int maxFrameLength) {
        if (maxFrameLength <= 0) {
            throw new IllegalArgumentException("maxFrameLength must be a positive integer: " + maxFrameLength);
        }
    }
    ...
}

(4)基于长度域解码器

主要的逻辑步骤如下:

一.丢弃模式的处理

二.获取待拆数据包的大小

三.对数据包进行长度校验

四.跳过指定字节长度

五.抽取数据包

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
    private final ByteOrder byteOrder;//表示字节流表示的数据是大端还是小端,用于长度域的读取
    private final int maxFrameLength;//表示数据包的最大长度
    private final int lengthFieldOffset;//表示长度域的偏移量
    private final int lengthFieldLength;//表示长度域的长度
    private final int lengthFieldEndOffset;//表示紧跟长度域字段后面的第一字节在整个数据包中的偏移量
    private final int lengthAdjustment;//表示数据包体长度调整大小,长度域只表示数据包体的长度
    private final int initialBytesToStrip;//表示拿到完整的数据包之后,向业务解码器传递之前,应该跳过多少字节
    private final boolean failFast;//默认为true,否则可能会OOM
    private boolean discardingTooLongFrame;
    private long tooLongFrameLength;
    private long bytesToDiscard;
    ...
    //Creates a new instance.
    //@param byteOrder,the ByteOrder of the length field
    //@param maxFrameLength,the maximum length of the frame.  
    //If the length of the frame is greater than this value, TooLongFrameException will be thrown.
    //@param lengthFieldOffset,the offset of the length field
    //@param lengthFieldLength,the length of the length field
    //@param lengthAdjustment,the compensation value to add to the value of the length field
    //@param initialBytesToStrip,the number of first bytes to strip out from the decoded frame
    //@param failFast,If true, a TooLongFrameException is thrown as soon as the decoder notices the length of the frame 
    //will exceed maxFrameLength regardless of whether the entire frame has been read.
    //If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read.
    public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { 
        ...
        this.byteOrder = byteOrder;
        this.maxFrameLength = maxFrameLength;
        this.lengthFieldOffset = lengthFieldOffset;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
        lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        this.initialBytesToStrip = initialBytesToStrip;
        this.failFast = failFast;
    }
    
    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    //Create a frame out of the {@link ByteBuf} and return it.
    //@param   ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to
    //@param   in,the ByteBuf from which to read data
    //@return  frame,the ByteBuf which represent the frame or null if no frame could be created.
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //步骤一开始:丢弃模式的处理
        if (discardingTooLongFrame) {
            //如果当前处于丢弃模式,则先计算需要丢弃多少字节,取当前还需可丢弃字节和可读字节的最小值
            long bytesToDiscard = this.bytesToDiscard;
            int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
            in.skipBytes(localBytesToDiscard);//进行丢弃
            bytesToDiscard -= localBytesToDiscard;
            this.bytesToDiscard = bytesToDiscard;
            failIfNecessary(false);
        }
        //步骤一结束
       
        //步骤二开始:获取待拆数据包的大小
        //如果当前可读字节还没达到长度域的偏移,说明肯定是读不到长度域的,则直接不读
        if (in.readableBytes() < lengthFieldEndOffset) {
            return null;
        }
        //计算长度域的实际字节偏移
        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        //拿到实际的未调整过的数据包长度
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
        //如果拿到的长度为负数,则直接跳过长度域并抛出异常
        if (frameLength < 0) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
        }
        //调整数据包的长度,后面统一做拆分
        frameLength += lengthAdjustment + lengthFieldEndOffset;
        //步骤二结束
      
        //步骤三开始:对数据包进行长度校验
        //整个数据包的长度还没有长度域长,则直接抛出异常
        if (frameLength < lengthfieldendoffset in.skipbyteslengthfieldendoffset throw new corruptedframeexceptionadjusted frame length framelength is less than lengthfieldendoffset: lengthfieldendoffset if framelength> maxFrameLength) {
            long discard = frameLength - in.readableBytes();
            tooLongFrameLength = frameLength;
            if (discard < 0) {
                //当前可读字节已达到frameLength,直接跳过frameLength字节
                //丢弃之后,后面又可能就是一个合法的数据包了
                in.skipBytes((int) frameLength);
            } else {
                //当前可读字节未达到frameLength,说明后面未读到的字节也需要丢弃,进入丢弃模式,先把当前累积的字节全部丢弃
                discardingTooLongFrame = true;
                //bytesToDiscard表示还需要丢弃多少字节
                bytesToDiscard = discard;
                in.skipBytes(in.readableBytes());
            }
            //调用failIfNecessary判断是否需要抛出异常
            failIfNecessary(true);
            return null;
        }
        //步骤三结束
      
        //步骤四开始:跳过指定字节长度
        //never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        if (in.readableBytes() < framelengthint return null if initialbytestostrip> frameLengthInt) {
            //如果跳过的字节大于数据包的长度,则抛异常
            in.skipBytes(frameLengthInt);
            throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + initialBytesToStrip);
        }
        in.skipBytes(initialBytesToStrip);
        //步骤四结束
      
        //步骤五开始:抽取数据包
        //拿到当前累积数据的读指针
        int readerIndex = in.readerIndex();
        //拿到待抽取数据包的实际长度进行抽取
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
        //进行抽取数据
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        //移动读指针
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }
    
    protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
        return buffer.retainedSlice(index, length);
    }
    
    //拿到实际的未调整过的数据包长度
    //如果长度域代表的值表达的含义不是正常的int、short等类型,则可以重写这个方法
    //比如有的长度域虽然是4字节,比如0x1234,但是它的含义是十进制的,即长度就是十进制的1234
    protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        buf = buf.order(order);
        long frameLength;
        switch (length) {
        case 1:
            frameLength = buf.getUnsignedByte(offset);
            break;
        case 2:
            frameLength = buf.getUnsignedShort(offset);
            break;
        case 3:
            frameLength = buf.getUnsignedMedium(offset);
            break;
        case 4:
            frameLength = buf.getUnsignedInt(offset);
            break;
        case 8:
            frameLength = buf.getLong(offset);
            break;
        default:
            throw new DecoderException("unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
        }
        return frameLength;
    }

    private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
        //不需要再丢弃后面的未读字节,就开始重置丢弃状态
        if (bytesToDiscard == 0) {
            //Reset to the initial state and tell the handlers that the frame was too large.
            long tooLongFrameLength = this.tooLongFrameLength;
            this.tooLongFrameLength = 0;
            discardingTooLongFrame = false;
            //如果没有设置快速失败,或者设置了快速失败并且是第一次检测到大包错误,则抛出异常,让Handler处理
            if (!failFast || failFast && firstDetectionOfTooLongFrame) {
                fail(tooLongFrameLength);
            }
        } else {
            //如果设置了快速失败,并且是第一次检测到打包错误,则抛出异常,让Handler处理
            if (failFast && firstDetectionOfTooLongFrame) {
                fail(tooLongFrameLength);
            }
        }
    }

    protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
        return buffer.retainedSlice(index, length);
    }

    private void fail(long frameLength) {
        if (frameLength > 0) {
            throw new TooLongFrameException("Adjusted frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded");
        } else {
            throw new TooLongFrameException("Adjusted frame length exceeds " + maxFrameLength + " - discarding");
        }
    }
    ...
}

Tags:

最近发表
标签列表