引言

关于Netty网络结构的内容,前面现已讲了两个章节,但总归来说难以真实把握,究竟只是对其间一个个组件进行解说,很难让诸位将其串起来构成一条线,所以本章中则会结合实战事例,对Netty进行更深层次的学习与把握,实战事例也并不难,一个非常朴素的IM谈天程序。

本来计划做个多人斗地主,但斗地主需求织入过多的事务逻辑,因而一方面会带来不必要的了解难度,让事例更为杂乱化,另一方面代码量也会偏多,所以终究仍旧挑选完结根本的谈天程序,既简略,又能加深对Netty的了解。

一、依据Netty规划通讯协议

协议,这玩意儿相信我们肯定不陌生了,究竟在《网络编程》系列的前两章,都在围绕着网络协议打开叙述,再来简略回忆一下协议的概念:网络协议是指一种通讯两边都必须遵守的约定,两个不同的端,依照必定的格局对数据进行“编码”,一起依照相同的规矩进行“解码”,然后完结两者之间的数据传输与通讯。

当自己想要打造一款IM通讯程序时,关于音讯的封装、拆分也相同需求规划一个协议,通讯的两端都必须遵守该协议作业,这也是完结通讯程序的条件,但为什么需求通讯协议呢?因为TCP/IP中是依据流的办法传输音讯,音讯与音讯之间没有鸿沟,而协议的意图则在于约定音讯的样式、鸿沟等。

1.1、Redis通讯的RESP协议

不知我们是否还记得之前在《Redis总述篇》中聊到的RESP客户端协议,这是Redis供给的一种客户端通讯协议,假如想要操作Redis,就必须遵守该协议的格局发送数据,但这个协议特别简略,如下:

  • 首要要求一切指令,都以*最初,后面跟着详细的子指令数量,接着用换行符切割。
  • 接着需求先用$符号声明每个子指令的长度,然后再用换行符切割。
  • 终究再拼接上详细的子指令,相同用换行符切割。

这样描绘有些令人难懂,那就直接看个事例,例如一条简略set指令,如下:

客户端指令:
    set name ZhuZi
转变为RESP指令:
    *3
    $3
    set
    $4
    name
    $5
    ZhuZi

依照Redis的规矩,但凡满意RESP协议的客户端,都能够直接衔接并操作Redis服务端,这也就意味着我们能够直接经过Netty来手写一个Redis客户端,代码如下:

// 依据Netty、RESP协议完结的Redis客户端
public class RedisClient {
    // 换行符的ASCII码
    static final byte[] LINE = {13, 10};
    public static void main(String[] args) {
        EventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(worker);
            client.channel(NioSocketChannel.class);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel)
                                                        throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(new ChannelInboundHandlerAdapter(){
                        // 通道树立成功后调用:向Redis发送一条set指令
                        @Override
                        public void channelActive(ChannelHandlerContext ctx)
                                                            throws Exception {
                            String command = "set name ZhuZi";
                            ByteBuf buffer = respCommand(command);
                            ctx.channel().writeAndFlush(buffer);
                        }
                        // Redis呼应数据时触发:打印Redis的呼应成果
                        @Override
                        public void channelRead(ChannelHandlerContext ctx,
                                                Object msg) throws Exception {
                            // 接受Redis服务端履行指令后的成果
                            ByteBuf buffer = (ByteBuf) msg;
                            System.out.println(buffer.toString(CharsetUtil.UTF_8));
                        }
                    });
                }
            });
            // 依据IP、端口衔接Redis服务端
            client.connect("192.168.12.129", 6379).sync();
        } catch (Exception e){
            e.printStackTrace();
        }
    }
    private static ByteBuf respCommand(String command){
        // 先对传入的指令以空格进行切割
        String[] commands = command.split(" ");
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        // 遵循RESP协议:先写入指令的个数
        buffer.writeBytes(("*" + commands.length).getBytes());
        buffer.writeBytes(LINE);
        // 接着别离写入每个指令的长度以及详细值
        for (String s : commands) {
            buffer.writeBytes(("$" + s.length()).getBytes());
            buffer.writeBytes(LINE);
            buffer.writeBytes(s.getBytes());
            buffer.writeBytes(LINE);
        }
        // 把转换成RESP格局的指令回来
        return buffer;
    }
}

在上述这个事例中,也只是只是经过respCommand()这个办法,对用户输入的指令进行了转换,一起在上面经过Netty,与Redis的地址、端口树立了衔接,在衔接树立成功后,就会向Redis发送一条转换成RESP指令的set指令,接着等候Redis的呼应成果并输出,如下:

+OK

因为这是一条写指令,所以当Redis收到履行完结后,终究就会回来一个OK,我们也可直接去Redis中查询,也仍旧能够查询到刚刚写入的name这个键值。

1.2、HTTP超文本传输协议

前面我们自己针关于RedisRESP协议,对用户指令进行了封装,然后发往Redis履行,但关于这些常用的协议,Netty早已供给好了现成的处理器,想要运用时无需从头开发,能够直接运用现成的处理器来完结,比方现在我们能够依据Netty供给的处理器,完结一个简略的HTTP服务器,代码如下:

// 依据Netty供给的处理器完结HTTP服务器
public class HttpServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server
            .group(boss,worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    // 增加一个Netty供给的HTTP处理器
                    pipeline.addLast(new HttpServerCodec());
                    pipeline.addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx,
                                                Object msg) throws Exception {
                            // 在这儿输出一下音讯的类型
                            System.out.println("音讯类型:" + msg.getClass());
                            super.channelRead(ctx, msg);
                        }
                    });
                    pipeline.addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx,
                                                    HttpRequest msg) throws Exception {
                            System.out.println("客户端的恳求路径:" + msg.uri());
                            // 创立一个呼应方针,版别号与客户端保持一致,状况码为OK/200
                            DefaultFullHttpResponse response =
                                    new DefaultFullHttpResponse(
                                            msg.protocolVersion(),
                                            HttpResponseStatus.OK);
                            // 结构呼应内容
                            byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();
                            // 设置呼应头:告诉客户端本次呼应的数据长度
                            response.headers().setInt(
                                HttpHeaderNames.CONTENT_LENGTH,content.length);
                            // 设置呼应主体
                            response.content().writeBytes(content);
                            // 向客户端写入呼应数据
                            ctx.writeAndFlush(response);
                        }
                    });
                }
            })
            .bind("127.0.0.1",8888)
            .sync();
    }
}

在该事例中,我们就未曾手动对HTTP的数据包进行拆包处理了,而是在服务端的pipeline上增加了一个HttpServerCodec处理器,这个处理器是Netty官方供给的,其类承继联系如下:

public final class HttpServerCodec
    extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder> 
    implements SourceCodec {
    // ......
}

调查会发现,该类承继自CombinedChannelDuplexHandler这个组合类,它组合了编码器、解码器,这也就意味着HttpServerCodec即能够对客户端的数据做解码,也能够对服务端呼应的数据做编码,一起除开增加了这个处理器外,在第二个处理器中打印了一下客户端的音讯类型,终究一个处理器中,对客户端的恳求做出了呼应,其实也便是回来了一句话算了。

此刻在浏览器输入http://127.0.0.1:8888/index.html,成果如下:

音讯类型:class io.netty.handler.codec.http.DefaultHttpRequest
音讯类型:class io.netty.handler.codec.http.LastHttpContent$1
客户端的恳求路径:/index.html

此刻来看成果,客户端的恳求会被解析成两个部分,榜首个是恳求信息,第二个是主体信息,但按理来说浏览器发出的恳求,归于GET类型的恳求,GET恳求是没有恳求体信息的,但Netty仍旧会解析成两部分~,只不过GET恳求的第二部分是空的。

在第三个处理器中,我们直接向客户端回来了一个h1标签,一起也要记得在呼应头里边,加上呼应内容的长度信息,不然浏览器的加载圈,会一向不同的转动,究竟浏览器也不知道内容有多长,就会一向反复加载,尝试等候更多的数据。

1.3、自界说音讯传输协议

Netty除开供给了HTTP协议的处理器外,还供给了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....一系列协议的完结,详细界说位于io.netty.handler.codec这个包下,当然,我们也能够自己完结自界说协议,依照自己的逻辑对数据进行编解码处理。

很多依据Netty开发的中间件/组件,其内部根本上都开发了专属的通讯协议,以此来作为不同节点间通讯的根底,所以解下来我们依据Netty也来自己规划一款通讯协议,这也会作为后续完结谈天程序时的根底。

但所谓的协议规划,其实只是只需求依照必定束缚,完结编码器与解码器即可,发送方在发出数据之前,会经过编码器对数据进行处理,而接纳方在收到数据之前,则会由解码器对数据进行处理。

1.3.1、自界说协议的要素

在自界说传输协议时,我们必定需求考虑几个因素,如下:

  • 魔数:用来榜首时间判别是否为自己需求的数据包。
  • 版别号:提高协议的拓宽性,便利后续对协议进行晋级。
  • 序列化算法:音讯正文详细该运用哪种办法进行序列化传输,例如Json、ProtoBuf、JDK...
  • 音讯类型:榜首时间判别出当时音讯的类型。
  • 音讯序号:为了完结双工通讯,客户端和服务端之间收/发音讯不会彼此阻塞。
  • 正文长度:供给给LTC解码器运用,防止解码时呈现粘包、半包的现象。
  • 音讯正文:本次音讯要传输的详细数据。

在规划协议时,一个完好的协议应该包含上述所说的几方面,这样才干供给两边通讯时的根底,依据上述几个字段,能够在榜首时间内判别出:音讯是否可用、当时协议版别、音讯的详细类型、音讯的长度等各类信息,然后给后续处理器运用(自界说的协议规矩本身便是一个编解码处理器算了)。

1.3.2、自界说协议实战

前面简略聊到过,所谓的自界说协议便是自己规矩音讯格局,以及自己完结编/解码器对音讯完结封装/拆解,所以这儿想要自界说一个音讯协议,就只需求满意前面两个条件即可,因而完结如下:

@ChannelHandler.Sharable
public class ChatMessageCodec extends MessageToMessageCodec<ByteBuf, Message> {
    // 音讯出站时会经过的编码办法(将原生音讯方针封装成自界说协议的音讯格局)
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg,
                          List<Object> list) throws Exception {
        ByteBuf outMsg = ctx.alloc().buffer();
        // 前五个字节作为魔数
        byte[] magicNumber = new byte[]{'Z','h','u','Z','i'};
        outMsg.writeBytes(magicNumber);
        // 一个字节作为版别号
        outMsg.writeByte(1);
        // 一个字节表明序列化办法  0:JDK、1:Json、2:ProtoBuf.....
        outMsg.writeByte(0);
        // 一个字节用于表明音讯类型
        outMsg.writeByte(msg.getMessageType());
        // 四个字节表明音讯序号
        outMsg.writeInt(msg.getSequenceId());
        // 运用Java-Serializable的办法对音讯方针进行序列化
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] msgBytes = bos.toByteArray();
        // 运用四个字节描绘音讯正文的长度
        outMsg.writeInt(msgBytes.length);
        // 将序列化后的音讯方针作为音讯正文
        outMsg.writeBytes(msgBytes);
        // 将封装好的数据传递给下一个处理器
        list.add(outMsg);
    }
    // 音讯入站时会经过的解码办法(将自界说格局的音讯转变为详细的音讯方针)
    @Override
    protected void decode(ChannelHandlerContext ctx,
                          ByteBuf inMsg, List<Object> list) throws Exception {
        // 读取前五个字节得到魔数
        byte[] magicNumber = new byte[5];
        inMsg.readBytes(magicNumber,0,5);
        // 再读取一个字节得到版别号
        byte version = inMsg.readByte();
        // 再读取一个字节得到序列化办法
        byte serializableType = inMsg.readByte();
        // 再读取一个字节得到音讯类型
        byte messageType = inMsg.readByte();
        // 再读取四个字节得到音讯序号
        int sequenceId = inMsg.readInt();
        // 再读取四个字节得到音讯正文长度
        int messageLength = inMsg.readInt();
        // 再依据正文长度读取序列化后的字节正文数据
        byte[] msgBytes = new byte[messageLength];
        inMsg.readBytes(msgBytes,0,messageLength);
        // 关于读取到的音讯正文进行反序列化,终究得到详细的音讯方针
        ByteArrayInputStream bis = new ByteArrayInputStream(msgBytes);
        ObjectInputStream ois = new ObjectInputStream(bis);
        Message message = (Message) ois.readObject();
        // 终究把反序列化得到的音讯方针传递给后续的处理器
        list.add(message);
    }
}

上面自界说的处理器中,承继了MessageToMessageCodec类,首要担任将数据在原生ByteBufMessage之间进行彼此转换,而Message方针是自界说的音讯方针,这儿暂且无需过多关怀。其间首要完结了两个办法:

  • encode():出站时会经过的编码办法,会将原生音讯方针按自界说的协议封装成对应的字节数据。
  • decode():入站时会经过的解码办法,会将协议格局的字节数据,转变为详细的音讯方针。

上述自界说的协议,也便是必定规矩的字节数据,每条音讯数据的组成如下:

  • 魔数:运用第1~5个字节来描绘,这个魔数值能够按自己的主意自界说。
  • 版别号:运用第6个字节来描绘,不同数字表明不同版别。
  • 序列化算法:运用第7个字节来描绘,不同数字表明不同序列化办法。
  • 音讯类型:运用第8个字节来描绘,不同的音讯类型运用不同数字表明。
  • 音讯序号:运用第9~12个字节来描绘,其实便是一个四字节的整数。
  • 正文长度:运用第13~16个字节来描绘,也是一个四字节的整数。
  • 音讯正文:长度不固定,依据每次详细发送的数据来决定。

在其间,为了完结简略,这儿的序列化办法,则选用的是JDK默认的Serializable接口办法,但这种办法生成的方针字节较大,实践情况中最好仍是挑选谷歌的ProtoBuf办法,这种算法归于序列化算法中,功用最佳的一种落地完结。

当然,这个自界说的协议是供给给后续的谈天事务运用的,但这种实战型的内容分享,根本上代码量较高,所以我们看起来会有些单调,而本文所运用的谈天室事例,是依据《B站-黑马Netty视频教程》二次改良的,因而如若感觉文字描绘较为单调,可直接点击前面给出的链接,观看P101~P121视频进行学习。

终究来调查一下,我们会发现,在我们界说的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable注解,这个注解的作用是干吗的呢?其实很简略,用来标识当时处理器是否可在多线程环境下运用,假如带有该注解的处理器,则表明能够在多个通道间共用,因而只需求创立一个即可,反之同理,假如不带有该注解的处理器,则每个通道需求单独创立运用。

二、依据Netty打造IM谈天程序

前面简略过了一下自界说协议后,接着来依据Netty结构上手一个真实的实战项目,那也便是依据Netty打造一款IM即时通讯的谈天程序,这儿在完结过程中,只是只会给出中心完结,但终究会供给完好代码的Github链接,因而我们要点了解中心即可。

2.1、IM程序的用户模块

谈天、谈天,天然是得先有人,然后才干进行谈天沟通,与QQ、微信类似,假如你想要运用某款谈天程序时,条件都得是先具有一个对应的账户才行,因而在我们规划IM体系之处,那也需求对应的用户功用完结,但这儿为了简略,相同不再结合数据库完结完好的用户模块了,而是依据内存完结用户的办理,如下:

public interface UserService {
    boolean login(String username, String password);
}

这是用户模块的顶层接口,只是只供给了一个登录接口,关于注册、鉴权、等级…..等一系列功用,我们感爱好的可在后续进行拓宽完结,接着来看看该接口的完结类,如下:

public class UserServiceMemoryImpl implements UserService {
    private Map<String, String> allUserMap = new ConcurrentHashMap<>();
    {
        // 在代码块中对用户列表进行初始化,向其间增加了两个用户信息
        allUserMap.put("ZhuZi", "123");
        allUserMap.put("XiongMao", "123");
    }
    @Override
    public boolean login(String username, String password) {
        String pass = allUserMap.get(username);
        if (pass == null) {
            return false;
        }
        return pass.equals(password);
    }
}

这个完结类并未结合数据库来完结,而是只是在程序发动时,经过代码块的办法,加载了ZhuZi、XiongMao两个用户信息并放入内存的Map容器中,这儿有爱好的小伙伴,可自即将Map容器换成数据库的表即可。

其间完结的login()登录接口尤为简略,只是只是判别了一下有没有对应用户,假如有的话则看看暗码是否正确,正确回来true,暗码过错则回来false,是的,我所写的登录功用便是这么简略,走个简略的过场,哈哈哈~

2.1.1、服务端、客户端的根底架构

根本的用户模块有了,但这儿还未曾套入详细完结,因而先简略的建立出服务端、客户端的架构,然后再依据构建好的架构完结根底的用户登录功用,服务端的根底建立如下:

public class ChatServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        ChatMessageCodec MESSAGE_CODEC = new ChatMessageCodec();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(MESSAGE_CODEC);
                }
            });
            Channel channel = serverBootstrap.bind(8888).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            System.out.println("服务端呈现过错:" + e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

服务端的代码现在很简略,只是只是装载了一个自己的协议编/解码处理器,然后便是一些老过程,不再过多的重复赘述,接着再来建立一个简略的客户端,代码完结如下:

public class ChatClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChatMessageCodec MESSAGE_CODEC = new ChatMessageCodec();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(MESSAGE_CODEC);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8888).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            System.out.println("客户端呈现过错:" + e);
        } finally {
            group.shutdownGracefully();
        }
    }
}

现在只是只是与服务端树立了衔接,然后装载了一个自界说的编解码器,到这儿就建立了最根本的服务端、客户端的根底架构,接着来依据它完结简略的登录功用。

2.1.2、用户登录功用的完结

关于登录功用,因为需求在服务端与客户端之间传输数据,因而我们能够规划一个音讯方针,但因为后续单聊、群聊都需求发送不同的音讯格局,因而先规划出一个父类,如下:

public abstract class Message implements Serializable {
    private int sequenceId;
    private int messageType;
    @Override
    public String toString() {
        return "Message{" +
                "sequenceId=" + sequenceId +
                ", messageType=" + messageType +
                '}';
    }
    public int getSequenceId() {
        return sequenceId;
    }
    public void setSequenceId(int sequenceId) {
        this.sequenceId = sequenceId;
    }
    public void setMessageType(int messageType) {
        this.messageType = messageType;
    }
    public abstract int getMessageType();
    public static final int LoginRequestMessage = 0;
    public static final int LoginResponseMessage = 1;
    public static final int ChatRequestMessage = 2;
    public static final int ChatResponseMessage = 3;
    public static final int GroupCreateRequestMessage = 4;
    public static final int GroupCreateResponseMessage = 5;
    public static final int GroupJoinRequestMessage = 6;
    public static final int GroupJoinResponseMessage = 7;
    public static final int GroupQuitRequestMessage = 8;
    public static final int GroupQuitResponseMessage = 9;
    public static final int GroupChatRequestMessage = 10;
    public static final int GroupChatResponseMessage = 11;
    public static final int GroupMembersRequestMessage = 12;
    public static final int GroupMembersResponseMessage = 13;
    public static final int PingMessage = 14;
    public static final int PongMessage = 15;
}

在这个音讯父类中,界说了多种音讯类型的状况码,不同的音讯类型对应不同数字,一起其间还规划了一个笼统办法,即getMessageType(),该办法交给详细的子类完结,每个子类回来各自的音讯类型,为了便利后续拓宽,这儿又创立了一个笼统类作为中间类,如下:

public abstract class AbstractResponseMessage extends Message {
    private boolean success;
    private String reason;
    public AbstractResponseMessage() {
    }
    public AbstractResponseMessage(boolean success, String reason) {
        this.success = success;
        this.reason = reason;
    }
    @Override
    public String toString() {
        return "AbstractResponseMessage{" +
                "success=" + success +
                ", reason='" + reason + '\'' +
                '}';
    }
    public boolean isSuccess() {
        return success;
    }
    public void setSuccess(boolean success) {
        this.success = success;
    }
    public String getReason() {
        return reason;
    }
    public void setReason(String reason) {
        this.reason = reason;
    }
}

这个类首要是供给给呼应时运用的,其间包含了呼应状况以及呼应信息,接着再规划两个登录时会用到的音讯方针,如下:

public class LoginRequestMessage extends Message {
    private String username;
    private String password;
    public LoginRequestMessage() {
    }
    @Override
    public String toString() {
        return "LoginRequestMessage{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
    public String getUsername() {
        return username;
    }
    public void setUsername(String username) {
        this.username = username;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public LoginRequestMessage(String username, String password) {
        this.username = username;
        this.password = password;
    }
    @Override
    public int getMessageType() {
        return LoginRequestMessage;
    }
}

上述这个音讯类,首要是供给给客户端登录时运用,本质上也便是一个包含用户名、用户暗码的方针算了,一起还有一个用来给服务端呼应时的呼应类,如下:

public class LoginResponseMessage extends AbstractResponseMessage {
    public LoginResponseMessage(boolean success, String reason) {
        super(success, reason);
    }
    @Override
    public int getMessageType() {
        return LoginResponseMessage;
    }
}

登录呼应类的完结非常简略,由登录状况和登录音讯组成,OK,接着来看看登录的详细完结。

首要在客户端中,再经过pipeline增加一个处理器,如下:

CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
AtomicBoolean LOGIN = new AtomicBoolean(false);
AtomicBoolean EXIT = new AtomicBoolean(false);
Scanner scanner = new Scanner(System.in);
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 担任接纳用户在控制台的输入,担任向服务器发送各种音讯
        new Thread(() -> {
            System.out.println("请输入用户名:");
            String username = scanner.nextLine();
            if(EXIT.get()){
                return;
            }
            System.out.println("请输入暗码:");
            String password = scanner.nextLine();
            if(EXIT.get()){
                return;
            }
            // 结构音讯方针
            LoginRequestMessage message = new LoginRequestMessage(username, password);
            System.out.println(message);
            // 发送音讯
            ctx.writeAndFlush(message);
            System.out.println("等候后续操作...");
            try {
                WAIT_FOR_LOGIN.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 假如登录失利
            if (!LOGIN.get()) {
                ctx.channel().close();
                return;
            }
    }).start();
}

在与服务端树立衔接成功之后,就提示用户需求登录,接着接纳用户输入的用户名、暗码,然后构建出一个LoginRequestMessage音讯方针,接着将其发送给服务端,因为前面装载了自界说的协议编解码器,所以音讯在出站时,这个Message方针会被序列化成字节码,接着再服务端入站时,又会被反序列化成音讯方针,接着来看看服务端的完结,如下:

@ChannelHandler.Sharable
public class LoginRequestMessageHandler 
            extends SimpleChannelInboundHandler<LoginRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, 
                LoginRequestMessage msg) throws Exception {
        String username = msg.getUsername();
        String password = msg.getPassword();
        boolean login = UserServiceFactory.getUserService().login(username, password);
        LoginResponseMessage message;
        if (login) {
            SessionFactory.getSession().bind(ctx.channel(), username);
            message = new LoginResponseMessage(true, "登录成功");
        } else {
            message = new LoginResponseMessage(false, "用户名或暗码不正确");
        }
        ctx.writeAndFlush(message);
    }
}

在服务端中,新增了一个处理器类,承继自SimpleChannelInboundHandler这个处理器,其间指定的泛型为LoginRequestMessage,这表明当时处理器只重视这个类型的音讯,当呈现登录类型的音讯时,会进入该处理器并触发内部的channelRead0()办法。

在该办法中,获取了登录音讯中的用户名、暗码,接着对其做了根本的登录效验,假如用户名存在并且暗码正确,就会回来登录成功,不然会回来登录失利,终究登录后的状况会被封装成一个LoginResponseMessage方针,然后写回客户端的通道中。

当然,为了该处理器能够成功收效,这儿需求将其装载到服务端的pipeline上,如下:

LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
ch.pipeline().addLast(LOGIN_HANDLER);

装载好登录处理器后,接着别离发动服务端、客户端,测验成果如下:

(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序

从图中的作用来看,这儿完结了最根本的登录功用,估量有些小伙伴看到这儿就有些晕了,但其实非常简略,只是只是经过Netty在做数据交互算了,客户端则供给输入用户名、暗码的功用,然后将用户输入的称号、暗码发送给服务端,服务端供给登录判别的功用,终究依据判别成果再向客户端回来数据算了。

2.2、依据Netty完结点对点单聊

有了根本的用户登录功用后,接着来看看怎样完结点对点的单聊功用呢?首要我界说了一个会话接口,如下:

public interface Session {
    void bind(Channel channel, String username);
    void unbind(Channel channel);
    Channel getChannel(String username);
}

这个接口中仍旧只要三个办法,释义如下:

  • bind():传入一个用户名和Socket通道,让两者之间的产生绑定联系。
  • unbind():撤销一个用户与某个Socket通道的绑定联系。
  • getChannel():依据一个用户名,获取与其存在绑定联系的通道。

该接口的完结类如下:

public class SessionMemoryImpl implements Session {
    private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
    private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
    @Override
    public void bind(Channel channel, String username) {
        usernameChannelMap.put(username, channel);
        channelUsernameMap.put(channel, username);
        channelAttributesMap.put(channel, new ConcurrentHashMap<>());
    }
    @Override
    public void unbind(Channel channel) {
        String username = channelUsernameMap.remove(channel);
        usernameChannelMap.remove(username);
        channelAttributesMap.remove(channel);
    }
    @Override
    public Channel getChannel(String username) {
        return usernameChannelMap.get(username);
    }
    @Override
    public String toString() {
        return usernameChannelMap.toString();
    }
}

该完结类最要害的是其间的两个Map容器,usernameChannelMap用来存储一切用户名与Socket通道的绑定联系,而channelUsernameMap则是反过来的顺序,这首要是为了便利,即能够经过用户名获得对应通道,也能够经过通道判别出用户名,实践上一个Map也能搞定,但仍是那句话,首要为了简略嘛~

有了上述这个最简略的会话办理功用后,就要着手完结详细的功用了,其实在前面完结登录功用的时候,就用过这其间的bind()办法,也便是当登录成功之后,就会将当时发送登录音讯的通道,与正在登录的用户名产生绑定联系,这样就便利后续完结单聊、群聊的功用。

2.2.1、界说单聊的音讯方针

与登录时相同,因为需求在服务端和客户端之间完结数据的转发,因而这儿也需求两个音讯方针,用来作为数据交互的音讯格局,如下:

public class ChatRequestMessage extends Message {
    private String content;
    private String to;
    private String from;
    public ChatRequestMessage() {
    }
    public ChatRequestMessage(String from, String to, String content) {
        this.from = from;
        this.to = to;
        this.content = content;
    }
    // 省掉Get/Setting、toString()办法.....
}

上述这个类,是供给给客户端用来发送音讯数据的,其间首要包含了三个值,谈天的音讯内容、发送人与接纳人,因为这儿是需求完结一个IM谈天程序,所以并不是客户端与服务端进行数据交互,而是客户端与客户端之间进行数据交互,服务端只是只供给音讯转发的功用,接着再构建一个音讯类,如下:

public class ChatResponseMessage extends AbstractResponseMessage {
    private String from;
    private String content;
    @Override
    public String toString() {
        return "ChatResponseMessage{" +
                "from='" + from + '\'' +
                ", content='" + content + '\'' +
                '}';
    }
    public ChatResponseMessage(boolean success, String reason) {
        super(success, reason);
    }
    public ChatResponseMessage(String from, String content) {
        this.from = from;
        this.content = content;
    }
    @Override
    public int getMessageType() {
        return ChatResponseMessage;
    }
    // 省掉Get/Setting、toString()办法.....
}

这个类是供给给服务端用来转发的,当服务端收到一个谈天音讯后,因为谈天音讯中包含了接纳人,所以能够先依据接纳人的用户名,找到对应的客户端通道,然后再封装成一个呼应音讯,转发给对应的客户端即可,下面来做详细完结。

2.2.2、完结点对点单聊功用

因为谈天功用是供给给客户端运用的,所以当一个客户端登录成功之后,应该露出给用户一个操作菜单,所以直接在本来客户端的channelActive()办法中,登录成功之后持续加代码即可,代码如下:

while (true) {
    System.out.println("==================================");
    System.out.println("\t1、发送单聊音讯");
    System.out.println("\t2、发送群聊音讯");
    System.out.println("\t3、创立一个群聊");
    System.out.println("\t4、获取群聊成员");
    System.out.println("\t5、参加一个群聊");
    System.out.println("\t6、退出一个群聊");
    System.out.println("\t7、退出谈天体系");
    System.out.println("==================================");
    String command = scanner.nextLine();
}

首要会开启一个死循环,然后不断接纳用户的操作,接着运用switch语法来对详细的菜单功用进行完结,先完结单聊功用,如下:

switch (command){
    case "1":
        System.out.print("请挑选你要发送音讯给谁:");
        String toUserName = scanner.nextLine();
        System.out.print("请输入你要发送的音讯内容:");
        String content = scanner.nextLine();
        ctx.writeAndFlush(new ChatRequestMessage(username, toUserName, content));
        break;
}

假如用户挑选了单聊,接着会提示用户挑选要发送音讯给谁,这儿也便是让用户输入对方的用户名,实践上假如有界面的话,这一步是并不需求用户自己输入的,而是供给窗口让用户点击,比方QQ、微信一样,想要给某个人发送音讯时,只需求点击“他”的头像私聊即可。

等用户挑选了谈天方针,并且输入了音讯内容后,接着会构建一个ChatRequestMessage音讯方针,然后会发送给服务端,但这儿先不看服务端的完结,客户端这边还需求重写一个办法,如下:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println("收到音讯:" + msg);
    if ((msg instanceof LoginResponseMessage)) {
        LoginResponseMessage response = (LoginResponseMessage) msg;
        if (response.isSuccess()) {
            // 假如登录成功
            LOGIN.set(true);
        }
        // 唤醒 system in 线程
        WAIT_FOR_LOGIN.countDown();
    }
}

前面的逻辑是在channelActive()办法中完结的,也便是衔接树立成功后,就会让用户登录,接着登录成功之后会给用户一个菜单栏,供给给用户进行操作,但前面的逻辑中一向没有对服务端呼应的音讯进行处理,因而channelRead()办法中会对服务端呼应的数据进行处理。

channelRead()办法会在有数据可读时被触发,所以当服务端呼应数据时,首要会判别一下:现在服务端呼应的是不是登录音讯,假如是的话,则需求依据登录的成果来唤醒前面channelActive()办法中的线程。假如现在服务端呼应的不是登录音讯,这也就意味着客户端前面现已登录成功了,所以接着会直接打印一下收到的数据。

OK,有了上述客户端的代码完结后,接着再来服务端多创立一个处理器,如下:

@ChannelHandler.Sharable
public class ChatRequestMessageHandler 
            extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                    ChatRequestMessage msg) throws Exception {
        String to = msg.getTo();
        Channel channel = SessionFactory.getSession().getChannel(to);
        // 在线
        if (channel != null) {
            channel.writeAndFlush(new ChatResponseMessage(
                        msg.getFrom(), msg.getContent()));
        }
        // 不在线
        else {
            ctx.writeAndFlush(new ChatResponseMessage(
                        false, "对方用户不存在或许不在线"));
        }
    }
}

这儿仍旧经过承继SimpleChannelInboundHandler类的形式,来特别重视ChatRequestMessage单聊类型的音讯,假如现在服务端收到的是单聊音讯,则会进入触发该处理器的channelRead0()办法,该处理器内部的逻辑也并不杂乱,首要依据单聊音讯的接纳人,去找一下与之对应的通道:

  • 假如依据用户名查到了通道,表明接纳人现在是登录在线状况。
  • 反之,假如无法依据用户名找到通道,表明对应的用户不存在或许没有登录。

接着会依据上面的查询成果,进行对应的成果回来:

  • 假如在线:把要发送的单聊音讯,直接写入至找到的通道中。
  • 假如不在线:向发送单聊音讯的客户端,回来用户不存在或用户不在线。

有了这个处理器之后,接着还需求把该处理器装载到服务端上,如下:

ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
ch.pipeline().addLast(CHAT_HANDLER);

装载好单聊处理器后,接着别离发动一个服务端、两个客户端,测验成果如下:

(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序

从测验成果中能够显着看出作用,其间的单聊功用确完成已完结,能够完结A→B用户之间的单聊功用,两者之间借助服务器转发,能够完结两人私聊的功用。

2.3、依据Netty打造多人谈天室

前面完结了两个用户之间的私聊功用,接着再来完结一个多人谈天室的功用,究竟像QQ、微信、钉钉….等任何通讯软件,都支撑多人树立群聊的功用,但多人谈天室的功用,完结之前还需求先完结建群的功用,究竟假如群都没树立,天然无法向某个群内发送数据。

完结拉群也好,群聊也罢,其完结过程仍旧和前面相同,如下:

  • ①先界说对应的音讯方针。
  • ②完结客户端发送对应音讯数据的功用。
  • ③再写一个服务端的群聊处理器,然后装载到服务端上。

2.3.1、界说拉群的音讯体

首要来界说两个拉群时用的音讯体,如下:

public class GroupCreateRequestMessage extends Message {
    private String groupName;
    private Set<String> members;
    public GroupCreateRequestMessage(String groupName, Set<String> members) {
        this.groupName = groupName;
        this.members = members;
    }
    @Override
    public int getMessageType() {
        return GroupCreateRequestMessage;
    }
    // 省掉其他Get/Settings、toString()办法.....
}

上述这个音讯体是供给给客户端运用的,其间首要存在两个成员,也便是群称号与群成员列表,寄存一切群成员的容器选用了Set调集,因为Set调集具有不行重复性,因而能够有效的避免同一用户屡次进群,接着再来看看服务端呼应时用的音讯体,如下:

public class GroupCreateResponseMessage extends AbstractResponseMessage {
    public GroupCreateResponseMessage(boolean success, String reason) {
        super(success, reason);
    }
    @Override
    public int getMessageType() {
        return GroupCreateResponseMessage;
    }
}

这个音讯体的完结尤为简略,只是只是给客户端回来了拉群状况以及拉群的附加信息。

2.3.2、界说群聊会话办理

前面单聊有单聊的会话办理机制,而完结多人群聊时,仍旧需求有群聊的会话办理机制,首要封装了一个群聊实体类,如下:

public class Group {
    // 谈天室称号
    private String name;
    // 谈天室成员
    private Set<String> members;
    public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());
    public Group(String name, Set<String> members) {
        this.name = name;
        this.members = members;
    }
    // 省掉其他Get/Settings、toString()办法.....
}

接着界说了一个群聊会话的顶级接口,如下:

public interface GroupSession {
    // 创立一个群聊
    Group createGroup(String name, Set<String> members);
    // 参加某个群聊
    Group joinMember(String name, String member);
    // 移除群聊中的某个成员
    Group removeMember(String name, String member);
    // 闭幕一个群聊
    Group removeGroup(String name);
    // 获取一个群聊的成员列表
    Set<String> getMembers(String name);
    // 获取一个群聊一切在线用户的Channel通道
    List<Channel> getMembersChannel(String name);
}

上述接口中,供给了几个接口办法,其实也首要是群聊体系中的一些日常操作,如创群、加群、踢人、闭幕群、查看群成员….等功用,接着来看看该接口的完结者,如下:

public class GroupSessionMemoryImpl implements GroupSession {
    private final Map<String, Group> groupMap = new ConcurrentHashMap<>();
    @Override
    public Group createGroup(String name, Set<String> members) {
        Group group = new Group(name, members);
        return groupMap.putIfAbsent(name, group);
    }
    @Override
    public Group joinMember(String name, String member) {
        return groupMap.computeIfPresent(name, (key, value) -> {
            value.getMembers().add(member);
            return value;
        });
    }
    @Override
    public Group removeMember(String name, String member) {
        return groupMap.computeIfPresent(name, (key, value) -> {
            value.getMembers().remove(member);
            return value;
        });
    }
    @Override
    public Group removeGroup(String name) {
        return groupMap.remove(name);
    }
    @Override
    public Set<String> getMembers(String name) {
        return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
    }
    @Override
    public List<Channel> getMembersChannel(String name) {
        return getMembers(name).stream()
                .map(member -> SessionFactory.getSession().getChannel(member))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
}

这个完结类没啥好说的,要点记住里边有个Map容器即可,这个容器首要担任存储一切群称号与Group群聊方针的联系,后续能够经过群聊称号,在这个容器中找到一个对应群聊方针。一起为了便利后续调用这些接口,还供给了一个东西类,如下:

public abstract class GroupSessionFactory {
    private static GroupSession session = new GroupSessionMemoryImpl();
    public static GroupSession getGroupSession() {
        return session;
    }
}

很简略,只是只实例化了一个群聊会话办理的完结类,因为这儿没有结合Spring来完结,所以并不能依托IOC技术来自动办理Bean,因而我们需求手动创立出一个实例,以供于后续运用。

2.3.3、完结拉群功用

前面客户端的功用菜单中,3对应着拉群功用,所以我们需求对3做详细的功用完结,逻辑如下:

case "3":
    System.out.print("请输入你要创立的群聊昵称:");
    String newGroupName = scanner.nextLine();
    System.out.print("请挑选你要邀请的群成员(不同成员用、切割):");
    String members = scanner.nextLine();
    Set<String> memberSet = new HashSet<>(Arrays.asList(members.split("、")));
    memberSet.add(username); // 参加自己
    ctx.writeAndFlush(new GroupCreateRequestMessage(newGroupName, memberSet));
    break;

在该分支完结中,首要会要求用户输入一个群聊昵称,接着需求输入需求拉入群聊的用户称号,多个用户之间运用切割,接着会把用户输入的群成员以及自己,悉数放入到一个Set调集中,终究组装成一个拉群音讯体,发送给服务端处理,服务端的处理器如下:

@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler 
        extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, 
                GroupCreateRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        Set<String> members = msg.getMembers();
        // 群办理器
        GroupSession groupSession = GroupSessionFactory.getGroupSession();
        Group group = groupSession.createGroup(groupName, members);
        if (group == null) {
            // 发生成功音讯
            ctx.writeAndFlush(new GroupCreateResponseMessage(true, 
                                groupName + "创立成功"));
            // 发送拉群音讯
            List<Channel> channels = groupSession.getMembersChannel(groupName);
            for (Channel channel : channels) {
                channel.writeAndFlush(new GroupCreateResponseMessage(
                                    true, "您已被拉入" + groupName));
            }
        } else {
            ctx.writeAndFlush(new GroupCreateResponseMessage(
                                false, groupName + "现已存在"));
        }
    }
}

这儿仍旧承继了SimpleChannelInboundHandler类,只关怀拉群的音讯,当客户端呈现拉群音讯时,首要会获取用户输入的群昵称和群成员,接着经过前面供给的创群接口,尝试创立一个群聊,假如群聊现已存在,则会创立失利,反之则会创立成功,在创立群聊成功的情况下,会给一切的群成员发送一条“你已被拉入[XXX]”的音讯。

终究,相同需求将该处理器装载到服务端上,如下:

GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER =
                    new GroupCreateRequestMessageHandler();
ch.pipeline().addLast(GROUP_CREATE_HANDLER);

终究别离发动一个服务端、两个客户端进行作用测验,如下:

(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序

从上图的测验成果来看,确实完结了我们的拉群作用,一个用户拉群之后,被邀请的成员都会收到来自于服务端的拉群提醒,这也就为后续群聊功用奠定了根底。

2.3.4、界说群聊的音讯体

这儿就不重复赘述了,仍是之前的套路,界说一个客户端用的音讯体,如下:

public class GroupChatRequestMessage extends Message {
    private String content;
    private String groupName;
    private String from;
    public GroupChatRequestMessage(String from, String groupName, String content) {
        this.content = content;
        this.groupName = groupName;
        this.from = from;
    }
    @Override
    public int getMessageType() {
        return GroupChatRequestMessage;
    }
    // 省掉其他Get/Settings、toString()办法.....
}    

这个是客户端用来发送群聊音讯的音讯体,其间存在三个成员,发送人、群聊昵称、音讯内容,经过这三个成员,能够描绘清楚任何一条群聊记载,接着来看看服务端呼应时用的音讯体,如下:

public class GroupChatResponseMessage extends AbstractResponseMessage {
    private String from;
    private String content;
    public GroupChatResponseMessage(boolean success, String reason) {
        super(success, reason);
    }
    public GroupChatResponseMessage(String from, String content) {
        this.from = from;
        this.content = content;
    }
    @Override
    public int getMessageType() {
        return GroupChatResponseMessage;
    }
    // 省掉其他Get/Settings、toString()办法.....
}

在这个音讯体中,就省去了群聊昵称这个成员,因为这个音讯体的用途,首要是给服务端转发给客户端时运用的,因而不需求群聊昵称,当然,要也能够,我这儿就直接省去了。

2.3.5、完结群聊功用

仍旧先来做客户端的完结,完结了客户端之后再去完结服务端的完结,客户端完结如下:

case "2":
    System.out.print("请挑选你要发送音讯的群聊:");
    String groupName = scanner.nextLine();
    System.out.print("请输入你要发送的音讯内容:");
    String groupContent = scanner.nextLine();
    ctx.writeAndFlush(new GroupChatRequestMessage(username, groupName, groupContent));
    break;

因为发送群聊音讯对应着之前菜单中的2,所以这儿对该分支进行完结,当用户挑选发送群聊音讯时,首要会让用户自己先挑选一个群聊,接着输入要发送的音讯内容,接着组装成一个群聊音讯方针,发送给服务端处理,服务端的完结如下:

@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler 
        extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                GroupChatRequestMessage msg) throws Exception {
        List<Channel> channels = GroupSessionFactory.getGroupSession()
                .getMembersChannel(msg.getGroupName());
        for (Channel channel : channels) {
            channel.writeAndFlush(new GroupChatResponseMessage(
                            msg.getFrom(), msg.getContent()));
        }
    }
}

这儿仍旧界说了一个处理器,关于原因就不再重复烦琐了,服务端关于群聊音讯的完结额定简略,也便是先依据用户挑选的群昵称,找到该群一切的群成员,然后依次遍历成员列表,获取对应的Socket通道,转发音讯即可。

接着将该处理器装载到服务端pipeline上,然后别离发动一个服务端、两个客户端,进行作用测验,如下:

(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序

作用如上图的注释,依据上述的代码测验,作用确实达到了我们需求的群聊作用~

2.3.6、谈天室的其他功用完结

到这儿为止,完结了最根本的建群、群聊的功用,但关于踢人、加群、闭幕群….等一系列群聊功用还未曾完结,但我这儿就不持续重复了,究竟仍是那个套路:

  • ①界说对应功用的音讯体。
  • ②客户端向服务端发送对应格局的音讯。
  • ③服务端编写处理器,对特定的音讯进行处理。

所以我们感爱好的情况下,能够依据上述过程持续进行完结,完结的过程没有任何难度,要点便是时间问题算了。

三、Netty实战篇总结

看到这儿,其实Netty实战篇的内容也就大致结束了,个人关于实战篇的内容并不怎样满意,因为与最初设想的完结存在很大偏差,这是因为近期作业、日子状况不对,所以内容输出也没那么夯实,关于这篇中的完好代码完结,也包含前面两篇中的一些代码完结,这儿给出完好的GitHub链接:>>>戳我访问<<<,我们感爱好能够自行Down下去玩玩。

在我所编撰的事例中,自界说协议能够持续优化,挑选功用更强的序列化办法,而谈天室也能够进一步拓宽,比方将用户信息、群聊信息、联系人信息都结合数据库完结,进一步完结离线音讯功用,但因为该事例的规划之初就有问题,所以是存在功用问题的,想要打造一款真实高功用的IM程序,那诸位可参阅《计算机网络总述-腾讯QQ原理》其间的内容。