共計 2924 個字符,預(yù)計需要花費 8 分鐘才能閱讀完成。
本篇內(nèi)容主要講解“netty server 的 read 流程是怎樣的”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學(xué)習(xí)“netty server 的 read 流程是怎樣的”吧!
客戶端發(fā)送數(shù)據(jù)過來的時候,netty 具體是怎么執(zhí)行讀取任務(wù)的。
仍然是從 NioEventLoop 開始,還是 NioEventLoop.processSelectedKey 這個方法,
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed – no need to handle write.
return;
}
}
if ((readyOps SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops = ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException e) {
unsafe.close(unsafe.voidPromise());
}
}
紅色字體部分,unsafe.read(),進入到 AbstractNioByteChannel$NioByteUnsafe,要從 io 當(dāng)中讀取數(shù)據(jù),首先需要分配一個 ByteBuf,這里涉及到了 2 個問題,1、從哪里分配內(nèi)存,2、每次具體分配多大的內(nèi)存。首先是第一個問題,首先大家需要了解(allocate 和 allocateDirect2 中方式的不同),具體的實現(xiàn)在 allocator.ioBuffer 當(dāng)中,會根據(jù)平臺不同,來決定能否直接分配系統(tǒng)內(nèi)存。第二個問題,大家可以再看一下 allocHandle.record 方法,這個當(dāng)中會根據(jù)當(dāng)前從 io 當(dāng)中讀取的數(shù)據(jù)量,來決定后續(xù)分配內(nèi)存的大小。然后就是從 socket 當(dāng)中讀取數(shù)據(jù),寫入 bytebuf,并觸發(fā)一系列的 handler。
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
if (!config.isAutoRead()) {
removeReadOp();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int byteBufCapacity = allocHandle.guess();
int totalReadAmount = 0;
do {
byteBuf = allocator.ioBuffer(byteBufCapacity);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount = 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount
break;
}
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount = Integer.MAX_VALUE – localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
if (localReadAmount writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
} while (++ messages maxMessagesPerRead);
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
}
}
到此,相信大家對“netty server 的 read 流程是怎樣的”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!