Netty 是一個 Java NIO 客戶端服務(wù)器框架,使用它可以快速簡單地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,比如服務(wù)器和客戶端的協(xié)議。Netty 大大簡化了網(wǎng)絡(luò)程序的開發(fā)過程比如 TCP 和 UDP 的 socket 服務(wù)的開發(fā)。更多關(guān)于 Netty 的知識,可以參閱《Netty 4.x 用戶指南》 https://github.com/waylau/netty-4-user-guide
下面,就基于 Netty 快速實(shí)現(xiàn)一個聊天小程序。
讓我們從 handler (處理器)的實(shí)現(xiàn)開始,handler 是由 Netty 生成用來處理 I/O 事件的。
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
}
channels.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 離開\n");
}
channels.remove(ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
if (channel != incoming){
channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");
} else {
channel.writeAndFlush("[you]" + s + "\n");
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7)
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常");
// 當(dāng)出現(xiàn)異常就關(guān)閉連接
cause.printStackTrace();
ctx.close();
}
}
1.SimpleChatServerHandler 繼承自 SimpleChannelInboundHandler,這個類實(shí)現(xiàn)了ChannelInboundHandler接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法?,F(xiàn)在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實(shí)現(xiàn)接口方法。
2.覆蓋了 handlerAdded()
事件處理方法。每當(dāng)從服務(wù)端收到新的客戶端連接時,客戶端的 Channel 存入ChannelGroup
列表中,并通知列表中的其他客戶端 Channel
3.覆蓋了 handlerRemoved()
事件處理方法。每當(dāng)從服務(wù)端收到客戶端斷開時,客戶端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客戶端 Channel
4.覆蓋了 channelRead0()
事件處理方法。每當(dāng)從服務(wù)端讀到客戶端寫入信息時,將信息轉(zhuǎn)發(fā)給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時,需要把 channelRead0()
重命名為messageReceived()
5.覆蓋了 channelActive()
事件處理方法。服務(wù)端監(jiān)聽到客戶端活動
6.覆蓋了 channelInactive()
事件處理方法。服務(wù)端監(jiān)聽到客戶端不活動
7.exceptionCaught()
事件處理方法是當(dāng)出現(xiàn) Throwable 對象才會被調(diào)用,即當(dāng) Netty 由于 IO 錯誤或者處理器在處理事件時拋出的異常時。在大部分情況下,捕獲的異常應(yīng)該被記錄下來并且把關(guān)聯(lián)的 channel 給關(guān)閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不同的實(shí)現(xiàn),比如你可能想在關(guān)閉連接之前發(fā)送一個錯誤碼的響應(yīng)消息。
SimpleChatServerInitializer 用來增加多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleChatServerHandler 等。
public class SimpleChatServerInitializer extends
ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SimpleChatServerHandler());
System.out.println("SimpleChatClient:"+ch.remoteAddress() +"連接上");
}
}
編寫一個 main()
方法來啟動服務(wù)端。
public class SimpleChatServer {
private int port;
public SimpleChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new SimpleChatServerInitializer()) //(4)
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
System.out.println("SimpleChatServer 啟動了");
// 綁定端口,開始接收進(jìn)來的連接
ChannelFuture f = b.bind(port).sync(); // (7)
// 等待服務(wù)器 socket 關(guān)閉 。
// 在這個例子中,這不會發(fā)生,但你可以優(yōu)雅地關(guān)閉你的服務(wù)器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("SimpleChatServer 關(guān)閉了");
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new SimpleChatServer(port).run();
}
}
1.NioEventLoopGroup是用來處理I/O操作的多線程事件循環(huán)器,Netty 提供了許多不同的EventLoopGroup的實(shí)現(xiàn)用來處理不同的傳輸。在這個例子中我們實(shí)現(xiàn)了一個服務(wù)端的應(yīng)用,因此會有2個 NioEventLoopGroup 會被使用。第一個經(jīng)常被叫做‘boss’,用來接收進(jìn)來的連接。第二個經(jīng)常被叫做‘worker’,用來處理已經(jīng)被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。如何知道多少個線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的 Channel上都需要依賴于 EventLoopGroup 的實(shí)現(xiàn),并且可以通過構(gòu)造函數(shù)來配置他們的關(guān)系。
2.ServerBootstrap是一個啟動 NIO 服務(wù)的輔助啟動類。你可以在這個服務(wù)中直接使用 Channel,但是這會是一個復(fù)雜的處理過程,在很多情況下你并不需要這樣做。
3.這里我們指定使用NioServerSocketChannel類來舉例說明一個新的 Channel 如何接收進(jìn)來的連接。
4.這里的事件處理類經(jīng)常會被用來處理一個最近的已經(jīng)接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。也許你想通過增加一些處理類比如 SimpleChatServerHandler 來配置一個新的 Channel 或者其對應(yīng)的ChannelPipeline來實(shí)現(xiàn)你的網(wǎng)絡(luò)程序。當(dāng)你的程序變的復(fù)雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。
5.你可以設(shè)置這里指定的 Channel 實(shí)現(xiàn)的配置參數(shù)。我們正在寫一個TCP/IP 的服務(wù)端,因此我們被允許設(shè)置 socket 的參數(shù)選項(xiàng)比如tcpNoDelay 和 keepAlive。請參考ChannelOption和詳細(xì)的ChannelConfig實(shí)現(xiàn)的接口文檔以此可以對ChannelOption 的有一個大概的認(rèn)識。
6.option()
是提供給NioServerSocketChannel用來接收進(jìn)來的連接。childOption()
是提供給由父管道ServerChannel接收到的連接,在這個例子中也是
NioServerSocketChannel。
7.我們繼續(xù),剩下的就是綁定端口然后啟動服務(wù)。這里我們在機(jī)器上綁定了機(jī)器所有網(wǎng)卡上的 8080 端口。當(dāng)然現(xiàn)在你可以多次調(diào)用 bind()
方法(基于不同綁定地址)。
恭喜!你已經(jīng)完成了基于 Netty 聊天服務(wù)端程序。
客戶端的處理類比較簡單,只需要將讀到的信息打印出來即可
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(s);
}
}
與服務(wù)端類似
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SimpleChatClientHandler());
}
}
編寫一個 main()
方法來啟動客戶端。
public class SimpleChatClient {
public static void main(String[] args) throws Exception{
new SimpleChatClient("localhost", 8080).run();
}
private final String host;
private final int port;
public SimpleChatClient(String host, int port){
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChatClientInitializer());
Channel channel = bootstrap.connect(host, port).sync().channel();
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
while(true){
channel.writeAndFlush(in.readLine() + "\r\n");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
}
先運(yùn)行 SimpleChatServer,再可以運(yùn)行多個 SimpleChatClient,控制臺輸入文本繼續(xù)測試
見 https://github.com/waylau/netty-4-user-guide-demos中 simplechat
Netty 4.x 用戶指南 https://github.com/waylau/netty-4-user-guide
上一次我們用Netty快速實(shí)現(xiàn)了一個 Java 聊天程序(見 http://www.waylau.com/netty-chat/)。現(xiàn)在,我們要做下修改,加入 WebSocket 的支持,使它可以在瀏覽器里進(jìn)行文本聊天。
WebSocket 通過“Upgrade handshake(升級握手)”從標(biāo)準(zhǔn)的 HTTP 或HTTPS 協(xié)議轉(zhuǎn)為 WebSocket。因此,使用 WebSocket 的應(yīng)用程序?qū)⑹冀K以 HTTP/S 開始,然后進(jìn)行升級。在什么時候發(fā)生這種情況取決于具體的應(yīng)用;它可以是在啟動時,或當(dāng)一個特定的 URL 被請求時。
在我們的應(yīng)用中,當(dāng) URL 請求以“/ws”結(jié)束時,我們才升級協(xié)議為WebSocket。否則,服務(wù)器將使用基本的 HTTP/S。一旦升級連接將使用的WebSocket 傳輸所有數(shù)據(jù)。
整個服務(wù)器邏輯如下:
更多建議: