Netty4 Others 其他

2020-10-19 15:44 更新

Others 其他

Netty 實(shí)現(xiàn)聊天功能

Netty 是一個 Java NIO 客戶端服務(wù)器框架,使用它可以快速簡單地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,比如服務(wù)器和客戶端的協(xié)議。Netty 大大簡化了網(wǎng)絡(luò)程序的開發(fā)過程比如 TCP 和 UDP 的 socket 服務(wù)的開發(fā)。更多關(guān)于 Netty 的知識,可以參閱《Netty 4.x 用戶指南》 https://github.com/waylau/netty-4-user-guide

下面,就基于 Netty 快速實(shí)現(xiàn)一個聊天小程序。

準(zhǔn)備

  • JDK 7+
  • Maven 3.2.x
  • Netty 4.x
  • Eclipse 4.x

服務(wù)端

讓我們從 handler (處理器)的實(shí)現(xiàn)開始,handler 是由 Netty 生成用來處理 I/O 事件的。

SimpleChatServerHandler.java

public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)
 
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
        }
        channels.add(ctx.channel());
    }
 
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 離開\n");
        }
        channels.remove(ctx.channel());
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)
        Channel incoming = ctx.channel();
        for (Channel channel : channels) {
            if (channel != incoming){
                channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");
            } else {
                channel.writeAndFlush("[you]" + s + "\n");
            }
        }
    }
 
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線");
    }
 
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線");
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7)
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常");
        // 當(dāng)出現(xiàn)異常就關(guān)閉連接
        cause.printStackTrace();
        ctx.close();
    }
}

1.SimpleChatServerHandler 繼承自 SimpleChannelInboundHandler,這個類實(shí)現(xiàn)了ChannelInboundHandler接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法?,F(xiàn)在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實(shí)現(xiàn)接口方法。

2.覆蓋了 handlerAdded() 事件處理方法。每當(dāng)從服務(wù)端收到新的客戶端連接時,客戶端的 Channel 存入ChannelGroup列表中,并通知列表中的其他客戶端 Channel

3.覆蓋了 handlerRemoved() 事件處理方法。每當(dāng)從服務(wù)端收到客戶端斷開時,客戶端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客戶端 Channel

4.覆蓋了 channelRead0() 事件處理方法。每當(dāng)從服務(wù)端讀到客戶端寫入信息時,將信息轉(zhuǎn)發(fā)給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時,需要把 channelRead0() 重命名為messageReceived()

5.覆蓋了 channelActive() 事件處理方法。服務(wù)端監(jiān)聽到客戶端活動

6.覆蓋了 channelInactive() 事件處理方法。服務(wù)端監(jiān)聽到客戶端不活動

7.exceptionCaught() 事件處理方法是當(dāng)出現(xiàn) Throwable 對象才會被調(diào)用,即當(dāng) Netty 由于 IO 錯誤或者處理器在處理事件時拋出的異常時。在大部分情況下,捕獲的異常應(yīng)該被記錄下來并且把關(guān)聯(lián)的 channel 給關(guān)閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不同的實(shí)現(xiàn),比如你可能想在關(guān)閉連接之前發(fā)送一個錯誤碼的響應(yīng)消息。

SimpleChatServerInitializer.java

SimpleChatServerInitializer 用來增加多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleChatServerHandler 等。

public class SimpleChatServerInitializer extends
        ChannelInitializer<SocketChannel> {
 
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
 
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleChatServerHandler());
 
        System.out.println("SimpleChatClient:"+ch.remoteAddress() +"連接上");
    }
}

SimpleChatServer.java

編寫一個 main() 方法來啟動服務(wù)端。

public class SimpleChatServer {
 
    private int port;
 
    public SimpleChatServer(int port) {
        this.port = port;
    }
 
    public void run() throws Exception {
 
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new SimpleChatServerInitializer())  //(4)
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
 
            System.out.println("SimpleChatServer 啟動了");
 
            // 綁定端口,開始接收進(jìn)來的連接
            ChannelFuture f = b.bind(port).sync(); // (7)
 
            // 等待服務(wù)器  socket 關(guān)閉 。
            // 在這個例子中,這不會發(fā)生,但你可以優(yōu)雅地關(guān)閉你的服務(wù)器。
            f.channel().closeFuture().sync();
 
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
 
            System.out.println("SimpleChatServer 關(guān)閉了");
        }
    }
 
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new SimpleChatServer(port).run();
 
    }
}

1.NioEventLoopGroup是用來處理I/O操作的多線程事件循環(huán)器,Netty 提供了許多不同的EventLoopGroup的實(shí)現(xiàn)用來處理不同的傳輸。在這個例子中我們實(shí)現(xiàn)了一個服務(wù)端的應(yīng)用,因此會有2個 NioEventLoopGroup 會被使用。第一個經(jīng)常被叫做‘boss’,用來接收進(jìn)來的連接。第二個經(jīng)常被叫做‘worker’,用來處理已經(jīng)被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。如何知道多少個線程已經(jīng)被使用,如何映射到已經(jīng)創(chuàng)建的 Channel上都需要依賴于 EventLoopGroup 的實(shí)現(xiàn),并且可以通過構(gòu)造函數(shù)來配置他們的關(guān)系。

2.ServerBootstrap是一個啟動 NIO 服務(wù)的輔助啟動類。你可以在這個服務(wù)中直接使用 Channel,但是這會是一個復(fù)雜的處理過程,在很多情況下你并不需要這樣做。

3.這里我們指定使用NioServerSocketChannel類來舉例說明一個新的 Channel 如何接收進(jìn)來的連接。

4.這里的事件處理類經(jīng)常會被用來處理一個最近的已經(jīng)接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。也許你想通過增加一些處理類比如 SimpleChatServerHandler 來配置一個新的 Channel 或者其對應(yīng)的ChannelPipeline來實(shí)現(xiàn)你的網(wǎng)絡(luò)程序。當(dāng)你的程序變的復(fù)雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。

5.你可以設(shè)置這里指定的 Channel 實(shí)現(xiàn)的配置參數(shù)。我們正在寫一個TCP/IP 的服務(wù)端,因此我們被允許設(shè)置 socket 的參數(shù)選項(xiàng)比如tcpNoDelay 和 keepAlive。請參考ChannelOption和詳細(xì)的ChannelConfig實(shí)現(xiàn)的接口文檔以此可以對ChannelOption 的有一個大概的認(rèn)識。

6.option() 是提供給NioServerSocketChannel用來接收進(jìn)來的連接。childOption() 是提供給由父管道ServerChannel接收到的連接,在這個例子中也是 NioServerSocketChannel。

7.我們繼續(xù),剩下的就是綁定端口然后啟動服務(wù)。這里我們在機(jī)器上綁定了機(jī)器所有網(wǎng)卡上的 8080 端口。當(dāng)然現(xiàn)在你可以多次調(diào)用 bind() 方法(基于不同綁定地址)。

恭喜!你已經(jīng)完成了基于 Netty 聊天服務(wù)端程序。

客戶端

SimpleChatClientHandler.java

客戶端的處理類比較簡單,只需要將讀到的信息打印出來即可

public class SimpleChatClientHandler extends  SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(s);
    }
}

SimpleChatClientInitializer.java

與服務(wù)端類似

public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {
 
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
 
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleChatClientHandler());
    }
}

SimpleChatClient.java

編寫一個 main() 方法來啟動客戶端。

public class SimpleChatClient {
    public static void main(String[] args) throws Exception{
            new SimpleChatClient("localhost", 8080).run();
        }
 
        private final String host;
        private final int port;
 
        public SimpleChatClient(String host, int port){
            this.host = host;
            this.port = port;
        }
 
        public void run() throws Exception{
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap  = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new SimpleChatClientInitializer());
                Channel channel = bootstrap.connect(host, port).sync().channel();
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                while(true){
                    channel.writeAndFlush(in.readLine() + "\r\n");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
 
        }
    }
 
}

運(yùn)行效果

先運(yùn)行 SimpleChatServer,再可以運(yùn)行多個 SimpleChatClient,控制臺輸入文本繼續(xù)測試

源碼

https://github.com/waylau/netty-4-user-guide-demos中 simplechat

參考

Netty 4.x 用戶指南 https://github.com/waylau/netty-4-user-guide

Netty 實(shí)現(xiàn) WebSocket 聊天功能

上一次我們用Netty快速實(shí)現(xiàn)了一個 Java 聊天程序(見 http://www.waylau.com/netty-chat/)。現(xiàn)在,我們要做下修改,加入 WebSocket 的支持,使它可以在瀏覽器里進(jìn)行文本聊天。

準(zhǔn)備

  • JDK 7+
  • Maven 3.2.x
  • Netty 4.x
  • Eclipse 4.x

WebSocket

WebSocket 通過“Upgrade handshake(升級握手)”從標(biāo)準(zhǔn)的 HTTP 或HTTPS 協(xié)議轉(zhuǎn)為 WebSocket。因此,使用 WebSocket 的應(yīng)用程序?qū)⑹冀K以 HTTP/S 開始,然后進(jìn)行升級。在什么時候發(fā)生這種情況取決于具體的應(yīng)用;它可以是在啟動時,或當(dāng)一個特定的 URL 被請求時。

在我們的應(yīng)用中,當(dāng) URL 請求以“/ws”結(jié)束時,我們才升級協(xié)議為WebSocket。否則,服務(wù)器將使用基本的 HTTP/S。一旦升級連接將使用的WebSocket 傳輸所有數(shù)據(jù)。

整個服務(wù)器邏輯如下:

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號