Netty是由JBOSS供给的一个java开源结构。Netty供给异步的、根据事情驱动的网络使用程序结构,可以简略快速的开发高性能、高可靠性的网络IO使用, 是主流的NIO结构之一,Netty在互联网范畴、分布式核算范畴、通讯等范畴都有广泛的使用,知名的Elasticsearch、Dubbo结构等内部都采用了Netty。
Netty简化和流线化了网络使用的编程开发过程,例如本文主讲的根据Netty-UDP的socket音讯播送与监控。
本文主讲Netty UDP的音讯播送与监控的代码完成, 为了让读者更易了解,本文按如下顺序阐述
- 前置的常识与概念:主要阐述 四层和七层协议 与 TCP和UDP 的概念
- 对 Netty中UDP播送相关接口与完成类 做一个简要的阐明
- 实战代码部分:分为服务端与客户服,含代码完成功用的解析
前置常识
Netty官网: netty.io
HTTP协议-02:四层和七层协议
什么是TCP和UDP
TCP是面向衔接的传输,是指管理了两个端点之间的衔接的建立,在衔接的生命周期内的有序和可靠的音讯传输及有序的终止。
UDP属于无衔接协议,并无持久化衔接的概念,每个音讯(一个UDP数据包)都是一个独自的传输单元。UDP也无TCP的纠错机制,每个节点都将承认他们所接收到的包,而没有被承认的包将会被发送方重新传输。
UDP适用、优势与不足的分析
- 有限制,但UDP高速于TCP;适用于那些可以处理或容忍音讯丢失的使用程序(金融类的交易一定是不合适的)
- 单播: 发送音讯给一个由仅有地址所标识的单一的网络目的地,面向衔接的协议和无衔接协议都支撑
- 多播: 传输到一个预订的主机组
- 播送: 传到网络(或子网)上所有的主机
- 发布与订阅: 类似于syslog的使用程序将被归类为发布与订阅(一个生产者,多个接收者订阅音讯)
Netty中UDP播送相关接口与完成类
-
interface AddressedEnvelope<M,A extends SocketAddress>
: 定义一个音讯,其包装了另一个音讯并带有发送者和接收者地址。其间M是音讯类型, A是地址类型 -
class DefaultAddressEnvelope<M,A extends SocketAddress> implements AddressedEnvelope<M,A
: 供给了AddressedEnvelope默认完成 -
interface DatagramChannel extends Channe
: 扩展了Netty的Channel抽象类以支撑UDP的多播组管理 -
class NioDatagramChannel extends AbstractNioMessageChannel
: 定义一个能发送或接收AddressedEnvelope音讯的Channel类型 -
class DatagramPacket extends DefaultAddressEnvelope<ByteBuf,InetSocketAddress> implements ByteBufHolder
:- 扩展了DefaultAddressEnvelope以使用ByteBuf作为音讯数据容器
- DatagramPacket是一个简略的音讯容器,DatagramChannel完成用它来和长途节点通讯
实战代码
功用描绘
- 播送端(服务端):读取一个文件,将文件中的每一行当成一个音讯播送到指定端口(注:该程序无身份认证、验证或加密,请读者自行增加)
- 接收端(客户端):接收并处理音讯
ChannelPipeline事情流
- 本地: ChannelPipeline处理流程: LogEvent -> LogEventEncoder -> DataGramPacket
- 播送多个长途节点:长途节点1,长途节点2,长途节点3….
服务端代码
LogEvent — 定义音讯组件
public final class LogEvent {
public static final byte SEPARATOR=(byte)':';
private final InetSocketAddress source;
private final String logfile;
private final String msg;
private final long received;
public LogEvent(String logfile, String msg ){
this(null,logfile,msg,-1);
}
public LogEvent(InetSocketAddress source, String logfile, String msg, long received) {
this.source = source;
this.logfile = logfile;
this.msg = msg;
this.received = received;
}
public InetSocketAddress getSource() {
return source;
}
public String getLogfile() {
return logfile;
}
public String getMsg() {
return msg;
}
public long getReceivedTimestamp(){
return received;
}
}
LogEventEncoder – 音讯封装
/**
* LogEvent的编解码器
* 在将logevent转为DataGramPackage之前必须先进行编码
*/
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
private final InetSocketAddress remoteAddress;
/**
* 创建行将被发送到指定的InetSocketAddress的DatagramPacket的音讯
* @param remoteAddress
*/
public LogEventEncoder(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception {
byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
ByteBuf buf = ctx.alloc().buffer(file.length msg.length 1);
buf.writeBytes(file); //将文件写入到ByteBuf中
buf.writeByte(LogEvent.SEPARATOR); //增加一个SEPARATOR
buf.writeBytes(msg); //将日志音讯写入到ByteBuf中
//将一个拥有数据和目的地的新DatagramPacket增加到出站音讯列表中
out.add(new DatagramPacket(buf,remoteAddress));
}
}
LogEventBroadcaster – 发动类
public class LogEventBroadcaster {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private final File file;
public LogEventBroadcaster(InetSocketAddress address, File file) {
this.group = new NioEventLoopGroup();
this.file = file;
this.bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST,true) //设置SO_BROADCAST套接字选项
.handler(new LogEventEncoder(address));
}
public void run() throws Exception{
Channel ch = bootstrap.bind(0).sync().channel();
long pointer = 0;
for(;;){
//发动自动循环
long len = file.length();
if (len<pointer){
pointer = len; //将文件指针设置到该文件的最后一个字节
}else if(len>pointer){
RandomAccessFile raf = new RandomAccessFile(file,"r");
raf.seek(pointer); //设置当前的文件指针,以保证没有任何旧日志被发送
String line;
while((line=raf.readLine())!=null){
ch.writeAndFlush(new LogEvent(null,file.getAbsolutePath(),line,-1));
}
pointer = raf.getFilePointer();
raf.close();
}
try{
Thread.sleep(1000); //1秒
}catch (Exception e){
//休眠1秒被中断,则退出循环,否则重新处理它
Thread.interrupted();
break;
}
}
}
public void stop(){
group.shutdownGracefully();
}
/**
* 第1个参数为端口
* 第2个参数文件路径
* @param args
*/
public static void main(String[] args) throws Exception {
if (args.length!=2){
throw new IllegalArgumentException("请输入2个参数");
}
LogEventBroadcaster broadcaster = new LogEventBroadcaster(
new InetSocketAddress("255.255.255.255",Integer.parseInt(args[0]))
,new File(args[1]));
try{
broadcaster.run();
}finally {
broadcaster.stop();
}
}
}
客户端-监控端
ClientLogEventEncoder – LogEvent的编解码器
/**
* LogEvent的编解码器
* 在将logevent转为DataGramPackage之前必须先进行编码
*/
public class ClientLogEventEncoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket packet, List<Object> out) throws Exception {
ByteBuf data = packet.content();
int idx = data.indexOf(0,data.readableBytes(),LogEvent.SEPARATOR);
String filename = data.slice(0,idx).toString(CharsetUtil.UTF_8); //提取文件名
String logMsg = data.slice(idx 1,data.readableBytes()).toString(CharsetUtil.UTF_8);
LogEvent event = new LogEvent(packet.sender()
,filename,logMsg,System.currentTimeMillis()
);
out.add(event);
}
}
ClientLogEventHandler – 音讯处理
public class ClientLogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append(event.getReceivedTimestamp());
builder.append("[");
builder.append(event.getSource());
builder.append("][");
builder.append(event.getLogfile());
builder.append("]");
builder.append(event.getMsg());
System.out.println(builder.toString()); //打印logEvent的数据
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
LogEventMonitor — 发动程序
public class LogEventMonitor {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
public LogEventMonitor(InetSocketAddress address){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST,true) //设置套接字SO_BROADCAST
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ClientLogEventEncoder());
pipeline.addLast(new ClientLogEventHandler());
}
})
.localAddress(address);
}
public Channel bind(){
//绑定channel
//DatagramChannel无衔接
return bootstrap.bind().syncUninterruptibly().channel();
}
public void stop(){
group.shutdownGracefully();
}
public static void main(String[] args) throws InterruptedException {
if (args.length!=1){
throw new IllegalArgumentException("Usage: LogEventMonitor");
}
LogEventMonitor monitor = new LogEventMonitor(
new InetSocketAddress(Integer.parseInt(args[0]))
);
try{
Channel channel = monitor.bind();
System.out.println("LogEventMonitor running");
channel.closeFuture().sync(); //堵塞等候服务端监听端口关闭。
}finally {
monitor.stop();
}
}
}
该文章是笔者两年前学习Netty时写的笔记文章,但段前的基础常识是本次新增的。
[参考]