简介

当咱们构建大规模的分布式应用集群时,咱们运用一切的尽力将单体分解成小的容器化作业负载,这些容器之间彼此通讯并同享信息以履行各种操作。

咱们没有花太多时刻去规划一个 音讯传递体系.

音讯传递一般被视为任何_大规模分布式体系_的_中枢神经体系_。一般状况下,单体内部的内存通讯被转化为线上通讯。

假如咱们对_集群_内的一切通讯进行布线,就会构成相似网状的模块,每个服务以同步的办法调用另一个服务,由于在恳求-呼应的生命周期中有大量的等候时刻,这并不抱负。

这一点 紊乱的网状结构能够经过引进一个 异步音讯传递集群而不是同步的。

在两个微服务之间没有_点对点的通讯_,咱们能够把它们的音讯托付给一种 纽带和辐条拓扑结构.因而,音讯传递是衔接整个体系的_胶水_。

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

在本指南中,咱们将运用 NATS JetStream来履行异步音讯传递,经过 发布/订阅方式.

那么,咱们怎么为咱们的应用程序挑选一个音讯代理或音讯传递架构呢?

挑选一个音讯传递体系或许会让人感觉很吃力,现已有大量的挑选,并且每天都有新的挑选呈现,每个挑选都有不同的优势。

挑选一个分布式的音讯传递体系

最值得留意的是,咱们现已有了广泛流行和适当频频运用的Apache Kafka,它一般被称为_分布式日志存储_。

在Kafka中发布到主题的音讯会持续一段时刻,而.NET的概念答应音讯在多个实例中均匀分布。 顾客群体答应音讯在同一服务的多个实例中均匀分布。它的功用十分强壮,但伴随着强壮的功用而来的是巨大的职责和保护。Kafka显着难以保护,关于任何期望掌握该技术的团队来说,都有一个陡峭的学习曲线。

另一个独特的挑选是RabbitMQ。RabbitMQ运用高档音讯行列协议进行音讯传递。它也显着是轻量级的。

RabbitMQ 没有运用独特的顾客组的概念,而是采取了更简略的办法,即让客户端消费_行列_。假如一个客户端不承认一个音讯,它将回到行列中,由另一个客户端处理。

一切这些产品都有一些甜头,并在它们的用例中大放异彩。

那么,假如有人想实在承受具有一个简略而又超高功能的体系的主意,而不需求保护它的额定开支呢?假如有人想做传统的pub/sub,但也想做request/reply,甚至想做scatter-gather,一起又想坚持简略和简便呢?

这就是 NATS信息传递体系或许是最适合你的解决方案的地方。

介绍一下NATS

NATS是一个经过出产验证的、云原生的音讯传递体系,是为那些想花更多时刻完成事务逻辑,而少花时刻忧虑_怎么做音讯传递的_开发者或运营商而规划的。

它是一个令人难以置信的快速、开源的音讯传递体系,树立在一个简略而强壮的中心之上。服务器运用依据文本的协议,所以尽管有一些特定言语的客户端库,但你彻底能够经过_telnet_进入NATS服务器来发送和接纳信息。

NATS被规划成永久在线,衔接,并预备承受命令。假如你满足老,知道什么是_拨号音_,那么值得一提的是,NATS团队喜爱用这个比方来规划。

NATS的一些突出特点包括。

  • _超高_的功能
  • 低装备
    • 客户端只需求一个URL和凭证
    • 服务器自动发现自己
  • 能够在不影响运转服务的状况下扩展架构
  • 自愈且始终可用
  • 支撑多种交给方式。
    • 最多一次(NATS中心)
    • 至少一次(NATS流或JetStream)
  • 将音讯存储到持久性存储,并按时刻或顺序重放
  • 支撑通配符
  • 在REST加密的数据
  • 净化特定的音讯(GDPR
  • 横向可扩展性
  • 完好的TLS支撑。CA证书,双向支撑
  • 支撑标准的用户/暗码认证/JWT的运用
  • 权限约束
  • 具有数据隔离的安全多租户
  • 账户之间同享数据
  • 具有30多个用不同言语编写的客户端库

信息传递方式

NATS支撑4种首要的通讯方式。它们是

  • 依据主题
  • 发布-订阅
  • 恳求-回复/涣散-收集
  • 行列组

每一个都是不同的方式,都有其运用案例,有一些重叠。答应一切这四种方式给了NATS极大的灵活性和功用,以应对多个应用程序之间的各种不同状况,或一个大型单体。

依据主题的音讯传递

A 主题在NATS中是一个简略的字符串,代表对数据的爱好。它被分_层符号_以支撑_通配符订阅_。

  • foo.* 匹配 _foo.bar_和 foo.baz
  • foo.*.bar匹配 _foo.a.bar_和 foo.b.bar
  • _foo.>_匹配上述任何一个
  • _>_匹配NATS中的一切内容

这种音讯传递方式答应发布者运用一个_Subject_来同享数据,而顾客能够经过运用通配符来监听这些Subject来接纳这些音讯。

从某种意义上说,这种方式是依据观察者规划方式的,它一般有一个_主题_和_观察者_。

例如,假如有人向_’audit.us.east’_发送音讯,那么一切监听该切当主题或通配符主题的订阅者都会收到这个音讯。

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

发布-订阅音讯

这是传统的音讯传递方式之一,其间 _发布者_将一个音讯发布到一个 _订阅者_列表,其间每个订阅者都独自订阅了它。

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

这相似于通讯,这种方式被_广泛_用于各种体系中。从告诉/警报体系到VoD平台,如YouTube。

这就是咱们在本指南中要运用的方式。

恳求-回复音讯/涣散-收集方式

当咱们进行REST API调用时,咱们发出一个HTTP恳求并收到一个呼应,咱们运用的是传统的同步恳求-呼应方式。_恳求-回应方式一般很困难,或许有时需求杂乱的解决方案或妥协。这种方式在运用NATS完成时适当简略,由于它只需求你在发布音讯时供给一个“回复到 “_主题。

这种方式也能够被称为 _涣散-集合_方式,发布者向未知数量的订阅者一起发布一个主题的音讯。然后一切监听这个主题的听众都会活泼起来并开始处理。然后,发布者将等候堆集来自部分或悉数订阅者的一切回复。

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

行列组

有时在一个分布式集群中,你必须对多个应用程序或同一应用程序的多个实例进行_负载平衡_。这种方式将是一个完美的解决方案,能够在订阅了同一主题的多个订阅者之间完成音讯的_负载平衡_。

这个解决方案最好的部分是,与其他音讯体系不同,它不需求在NATS服务器进行任何装备。行列组是由应用程序和他们的行列订阅者界说的,并在他们之间进行办理。

为了创立一个行列订阅,一切的订阅者都注册一个行列称号。当注册的主题上的音讯被发布时,组中的一个成员被随机挑选来接纳音讯。尽管行列组有多个订阅者,但每个音讯只被一个人消费。

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

一切这些方式在NATS服务器上需求零装备。

它彻底由应用程序或客户端库驱动。因而,让咱们研究一下jnatsJava客户端库,看看咱们怎么界说其间的一些方式并履行异步音讯传递。

基本的NATS服务器、NATS流和NATS JetStream

第一个 _NATS云原生_信息传递生态体系是以 NATS服务器依据 _‘最多一次’(At-most once_交给方式–音讯最多交给一次。它曾经以难以置信的速度将发布的音讯转发给顾客,为该行业设定了新的功能界限。关于一些应用来说,基本的NATS供给的功能超过了丢失音讯的潜在丢失。

但是,在’最多一次’的交给方式下,假如任何一个订户呈现故障,发送到的音讯将永久不会抵达,因而,没有确保数据的交给。

这相似于大多数流媒体服务所运用的超高速UDP协议,数据的速度比数据的完好性更重要。你甘愿在视频中丢失几个像素或分辨率较低,也不愿意长时刻等候听到或人的声音。

但这并不是你想在金融交易中产生的事情。在这里和那里丢失一点或许会改动或人的账单或收件人的地址。

作为对这个问题的回应 NATS流媒体引进了NATS,它用一些功能来交换音讯的持久性。没有牺牲太多的功能,NATS流是一个轻量级和高功能的平台,在引擎盖下运用基本的NATS。它的构建办法是 _‘至少一次_交给模型,具有为发布者和订阅者发送ACK 音讯的才能。

这相似于TCP,它确保数据的完好性,假如没有收到ACK ,就会从头发送包,表明客户端或许没有收到包。

当音讯被发布后,它们会被持久化一段时刻(可定制),这样假如顾客没有收到它,它就能够被从头播放给顾客。尽管这个组件的功能十分好,并且是轻量级的,但在才能和成熟度方面,它和Kafka等分布式流媒体体系相同强壮。

开发人员提出了一些要求,如分布式安全、涣散式办理、多租户、超级集群的全球扩展以及数据的安全同享,这些要求在NATS 2.0年代催生了下一代NATS流,被称为 NATS JetStream.

关于具有分布式集群的现代流媒体体系,建议运用最新的 NATS JetStream供给。JetStream_的创立是为了解决当今流媒体技术所发现的问题–杂乱性、脆弱性和缺少扩展性。咱们将在本文中进一步讨论_JetStream

用NATS JetStream完成Java中的异步Pub/Sub音讯传递

项目设置

运转或安装一个_NATS JetStream_服务器是十分简略的。无论你想在Windows、Mac或Linux机器上保管这个集群,Docker引擎都能使设置变得十分简略。

咱们将运用一个Docker容器来保管JetStream服务器。为了运转Docker镜像,咱们能够简略地运转。

$ docker run -ti -p 4222:4222 --name jetstream synadia/jsm:latest server

一旦你运转该程序,你会看到一些相似的内容。

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

NATS有一个巨大的不同言语的客户端库列表,有一个由1000多个贡献者组成的活泼社区。它在2018年加入了 CNCF(云原生计算基金会)。作为一个孵化项目在2018年加入。

咱们将运用NATS的Java客户端,即jnats。为了衔接到NATS JetStream,咱们只需求在pom.xml 中界说一个依靠项。

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>${version}</version>
</dependency>

就这样了!咱们现已预备好了。现在让咱们来看看咱们的一些运用状况。假如你遇到困难,你能够在GitHub上找到完好的源代码。

发布者/订阅者流

让咱们测验经过创立一个新的Stream 和一个主题来界说一个传统的_发布者/订阅_者模型。Stream,在NATS JetStream中代表两个端点之间的任何数据流,是API的中心构建块。

咱们将创立一个单一的类,首要发布一些音讯,然后订阅览取这些音讯并发送承认。

public class PubSubAsync {
// Proceeding code goes here
}

让咱们持续界说一些全局静态设置,如流称号、主题、默认音讯和服务器。

private static final String defaultStream = "pubsubasync-stream";
private static final String defaultSubject = "pubsubasync-subject";
private static final String defaultMessage = "Hello User";
private static final int defaultMessageCount = 2;
private static final String defaultServer = "nats://localhost:4222";

咱们将在今后逐渐设置流的时候运用这些设置,以防止在其间硬编码变量。

让咱们首要设置一个Connection 到NATS JetStream服务器,实例化一个JetStreamManagement 实例,用来增加Stream 实例,以及一个StreamConnfiguration 实例–经过生成器规划方式树立,以便在界说设置时有灵活性。

与NATS服务器的衔接或许会失利,所以你要把*一切的程序代码包在一个try-catch 块中。咱们将运用一个 try-with-resources块,由于这是一个可封闭的衔接。

try (Connection nc = Nats.connect(defaultServer)) {
    // Creating streams, managers, sending messages, subscribing, etc.
} catch (Exception e) {
    e.printStackTrace();
}

try 块中,咱们将首要创立一个JetStreamManagement 实例,以及一个StreamConfigurationJetStream 上下文。

JetStream 类是该结构的中心API。JetStream 经过将音讯推送到订阅者正在收听的_主题_,间接地将音讯_发布_给_订阅_者。它还将用户_订阅_到主题上。

_主题_是在构建StreamConfiguration 时界说的,而JetStreamManagement 实例让咱们将具有该装备的Streams 增加到咱们的管道中。咱们将在后边的章节中更具体地介绍JetStreamManagement 。让咱们创立一个单一的流来发布音讯给一个主题,并创立JetStream 上下文来办理发布和订阅发送给该主题的音讯。

JetStreamManagement jsm = nc.jetStreamManagement();
// Create a stream, here will use an in-memory storage type, and one subject
StreamConfiguration sc = StreamConfiguration.builder()
        .name(defaultStream)
        .storageType(StorageType.Memory)
        .subjects(defaultSubject)
        .build();
// Add a stream via the `JetStreamManagement` instance and capture its info in a `StreamInfo` object
StreamInfo streamInfo = jsm.addStream(sc);
JsonUtils.printFormatted(streamInfo);
// Create a JetStream context. This hangs off the original connection
// allowing us to produce data to publish into streams and consume data from
// JetStream consumers.
JetStream js = nc.jetStream();         

现在,咱们能够持续创立一个Future列表来保存咱们的音讯成果,由于咱们正在处理异步音讯,不知道它们_何时_会回来。当经过JetStream 实例的publishAsync() 办法发布音讯时,会回来一个PublishAck ,表明未来客户端对接纳的承认。

假如你想阅览更多关于Future 接口的信息,请阅览咱们的《Java中的未来接口指南》。

此外,关于每个音讯,咱们将创立一个Message 实例,它承受一个_主题_和_数据_。咱们要向谁发送音讯以及音讯是什么。运用NatsMessage.builder() 办法,咱们能够很简略地树立一个咱们想发送的音讯,并省略某些咱们没有任何用途的参数。

一旦树立了一个Message ,咱们就能够经过JetStream‘的publishAsync() 办法异步发布它。

// Create a future for asynchronous message processing
List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
int stop = defaultMessageCount + 1;
for (int x = 1; x < stop; x++) {
    String data = defaultMessage + "-" + x;
    // Create a typical NATS message
    Message msg = NatsMessage.builder()
            .subject(defaultSubject)
            .data(data, StandardCharsets.UTF_8)
            .build();
    System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubject);
    // Publish a message and add the result to our `CompletableFuture` list
    futures.add(js.publishAsync(msg));
}

一旦咱们发送了音讯,咱们很或许想知道它们产生了什么,是否有任何问题被提出。经过迭代咱们的futures 列表,咱们能够查看CompletableFuture 实例是否_完成了_,假如完成了就打印它们的内容,假如没有完成果从头排队,今后再查看。

// Get Acknowledgement for the messages
while (futures.size() > 0) {
    CompletableFuture<PublishAck> f = futures.remove(0);
    if (f.isDone()) {
        try {
            PublishAck pa = f.get();
            System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n",
                    defaultSubject, pa.getStream(), pa.getSeqno());
        }
        catch (ExecutionException ee) {
            System.out.println("Publish Failed " + ee);
        }
    }
    else {
        // Re-queue it and try again
        futures.add(f);
    }
} 

关于一个_发布者_来说,咱们需求一个订阅_者_来发布(合理的),避免信息在没有什么意义的状况下悬空。一个_订阅者_被创立为一个JetStreamSubscription 实例,由JetStream 上下文的subscribe() 办法回来。

// Subscribe to the messages that have been published to the subject
JetStreamSubscription sub = js.subscribe(defaultSubject);
List<Message> messages = new ArrayList<>();
// Retrieve the next message and kick off an iteration of all the messages
Message msg = sub.nextMessage(Duration.ofSeconds(1));
boolean first = true;
while (msg != null) {
    if (first) {
        first = false;
        System.out.print("Read/Ack ->");
   }
   messages.add(msg);
   if (msg.isJetStream()) {
        msg.ack();
        System.out.print(" " + new String(msg.getData()) + "\n");                    
    }
    else if (msg.isStatusMessage()) {
            System.out.print(" !" + msg.getStatus().getCode() + "!");
    }
    JsonUtils.printFormatted(msg.metaData());
    msg = sub.nextMessage(Duration.ofSeconds(1));
}
// Make sure the message goes through before we close
nc.flush(Duration.ZERO);
nc.close();

将一切这些联系起来,当咱们运转代码时–咱们应该看到这样的信息。

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

咱们现已成功地树立了一个数据的Stream ,它将音讯传递给一个_主题_,而咱们的订阅者正在观察它们的异步抵达。但有时,咱们的主题称号在咱们想要订阅它们之前是不知道的。例如,你或许会_生成_主题称号,并想在新主题创立时订阅它们。或许,有一整个具有共同前缀的主题列表,你想订阅。

在这两种状况下–而不是杂乱的循环和生成-订阅逻辑–你能够运用_通配符_来针对不止一个主题。

通配符发布者/订阅者流

NATS支撑_层次化_的符号,以支撑通配符订阅。作为本指南开始时的复习。

A 主题在NATS中是一个简略的字符串,代表对数据的爱好。它被_分层符号_以支撑_通配符订阅_。

  • foo.* 匹配 _foo.bar_和 foo.baz
  • foo.*.bar匹配 _foo.a.bar_和 foo.b.bar
  • _foo.>_匹配上述任何一个
  • _>_匹配NATS中的一切内容

这些通配符能够在发布者或订阅者中装备,也能够在两者中装备。咱们稍后会看一下这个典型的比如。咱们现在要运用的办法背面的逻辑与咱们之前看到的大致相同。

public class PubWildcardSubWildcard {
	private static final String defaultStream = "pubsubwildcardasync-stream";
	private static final String defaultSubjectWildcard = "audit.us.*";
	private static final String defaultSubjectSpecific = "audit.us.east";
	private static final String defaultMessage = "Audit User";
	private static final int defaultMessageCount = 2;
	private static final String defaultServer = "nats://localhost:4222";
	public static void main( String[] args ) {
	    System.out.printf("\nPublishing to %s. Server is %s\n\n", defaultSubjectWildcard, defaultServer);
		  try (Connection nc = Nats.connect(defaultServer)) {      
          JetStreamManagement jsm = nc.jetStreamManagement();
         StreamConfiguration sc = StreamConfiguration.builder()
                 .name(defaultStream)
                 .storageType(StorageType.Memory)
                 .subjects(defaultSubjectWildcard)
                 .build();
         StreamInfo streamInfo = jsm.addStream(sc);
         JsonUtils.printFormatted(streamInfo);
         JetStream js = nc.jetStream();            
         List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
         int stop = defaultMessageCount + 1;
         for (int x = 1; x < stop; x++) {
             String data = defaultMessage + "-" + x;
             Message msg = NatsMessage.builder()
                     .subject(defaultSubjectSpecific)
                     .data(data, StandardCharsets.UTF_8)
                     .build();
             System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubjectSpecific);
             futures.add(js.publishAsync(msg));
         }
         while (futures.size() > 0) {
             CompletableFuture<PublishAck> f = futures.remove(0);
             if (f.isDone()) {
                 try {
                     PublishAck pa = f.get();
                     System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n",
                     		defaultSubjectSpecific, pa.getStream(), pa.getSeqno());
                 }
                 catch (ExecutionException ee) {
                     System.out.println("Publish Failed " + ee);
                 }
             }
             else {
                 futures.add(f);
             }
        }
         JetStreamSubscription sub = js.subscribe(defaultSubjectWildcard);
         List<Message> messages = new ArrayList<>();
         Message msg = sub.nextMessage(Duration.ofSeconds(1));
         boolean first = true;
         while (msg != null) {
             if (first) {
                 first = false;
                 System.out.print("Read/Ack ->");
             }
             messages.add(msg);
             if (msg.isJetStream()) {
                 msg.ack();
                 System.out.print(" " + new String(msg.getData()) + "\n");            
             }
             else if (msg.isStatusMessage()) {
                     System.out.print(" !" + msg.getStatus().getCode() + "!");
             }
             JsonUtils.printFormatted(msg.metaData());
             msg = sub.nextMessage(Duration.ofSeconds(1));
         }
          nc.flush(Duration.ZERO)
          nc.close();
     }
     catch (Exception e) {
         e.printStackTrace();
     }
}
}

当咱们运转这段代码时,咱们会看到。

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

作为Pub/Sub方式的替代品,运用msg.getReplyTo() ,咱们能够开始构建一个_Request-Reply_方式的完成,经过构建行列组和通道来订阅和撤销订阅–咱们能够构建一个_行列组_方式的完成。

这是或许的,由于咱们根本没有为NATS做任何特定方式的装备–你想运用的特定方式只取决于你怎么运用这个库

JetStream办理

在某一点上,你或许想观察或办理你的流。为了做到这一点,咱们将研究NATS JetStream中的流的生命周期。

  • 创立或增加一个带有主题的流
  • 经过增加一个主题来更新一个流
  • 获取流的信息
  • 铲除一个流中的信息
  • 删去一个流

为了演示这些,让咱们创立一个有几个静态字段和只要一个main() 办法的类。在这个类中,咱们将测试其间的一些操作,但依据你的架构和这些操作的触发器,你会想相应地附加上接下来的代码段。

public class NatsJsManageStreams {
    private static final String STREAM1 = "manage-stream1";
    private static final String STREAM2 = "manage-stream2";
    private static final String SUBJECT1 = "manage-subject1";
    private static final String SUBJECT2 = "manage-subject2";
    private static final String SUBJECT3 = "manage-subject3";
    private static final String SUBJECT4 = "manage-subject4";
    private static final String defaultServer = "nats://localhost:4222";
    public static void main(String[] args) {
        try (Connection nc = Nats.connect(defaultServer)) {
            JetStreamManagement jsm = nc.jetStreamManagement();
            // Management code
            // ...
          // Make sure the message goes through before we close
            nc.flush(Duration.ZERO);
            nc.close();
        } catch (Exception exp) {
            exp.printStackTrace();
        }
    }
}

在剩下的示例中,咱们将运用同一个JetStreamManagement 实例,由于咱们在一个单一的类中运用它们。不过,请记住,在实在国际的场景中,你永久不会/很少会创立一个多流设置。相反,你一般会将主题增加到现有的流中以从头运用资源。

**留意:**在整个比如中,咱们将运用一个自界说的_实用程序类来_处理流的创立或更新,异步发布而无需等候,或读取有无承认的音讯 –NatsJsUtils 。这个实用类能够在GitHub上找到。

创立或增加一个带有主题的流

咱们第一次创立一个Stream ,咱们只是设置了它的名字、主题和存储策略。还有其他各种设置,咱们能够经过构建器的办法进行调整。

// 1. Create (add) a stream with a subject
System.out.println("\n----------\n1. Configure And Add Stream 1");
StreamConfiguration streamConfig = StreamConfiguration.builder()
        .name(STREAM1)
        .subjects(SUBJECT1)
        // .retentionPolicy()
        // .maxConsumers(...)
        // .maxBytes(...)
        // .maxAge(...)
        // .maxMsgSize(...)
         .storageType(StorageType.Memory)
        // .replicas(...)
        // .noAck(...)
        // .template(...)
        // .discardPolicy(...)
        .build();
StreamInfo streamInfo = jsm.addStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);

RetentionPolicy 设置音讯被删去的时刻–当对它们没有爱好时(没有顾客会消费它),当它们被消费时,等等。你能够约束顾客的数量,音讯能够有多长的字节,它能够被持久化多长时刻,是否需求一个ACK 呼应–等等。

在最简略的方式下–你供给一个称号、主题和存储类型,然后build() 。咱们能够在一个Stream ,作为JetStreamManagement 实例的addStream() 办法的回来类型,经过NatsJsUtils 类漂亮地打印出信息。

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

用一个主题更新一个流

你能够经过JetStreamManagement 实例的updateStream() 办法来更新现有的流。咱们将从头运用streamConfig 参考变量,并依据从现有的StreamInfo 实例中提取的装备,为咱们想要更新的流树立一个新的装备build()

// 2. Update stream, in this case, adding a new subject
// -  StreamConfiguration is immutable once created
// -  but the builder can help with that.
System.out.println("----------\n2. Update Stream 1");
streamConfig = StreamConfiguration.builder(streamInfo.getConfiguration())
        .addSubjects(SUBJECT2).build();
streamInfo = jsm.updateStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);
// 3. Create (add) another stream with 2 subjects
System.out.println("----------\n3. Configure And Add Stream 2");
streamConfig = StreamConfiguration.builder()
        .name(STREAM2)
        .storageType(StorageType.Memory)
        .subjects(SUBJECT3, SUBJECT4)
        .build();
streamInfo = jsm.addStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);

这样做的成果是。

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

获取流的信息

// 4. Get information on streams
// 4.0 publish some message for more interesting stream state information
// -   SUBJECT1 is associated with STREAM1
// 4.1 getStreamInfo on a specific stream
// 4.2 get a list of all streams
// 4.3 get a list of StreamInfo's for all streams
System.out.println("----------\n4.1 getStreamInfo");
NatsJsUtils.publish(nc, SUBJECT1, 5);
streamInfo = jsm.getStreamInfo(STREAM1);
NatsJsUtils.printStreamInfo(streamInfo);
System.out.println("----------\n4.2 getStreamNames");
List<String> streamNames = jsm.getStreamNames();
NatsJsUtils.printObject(streamNames);
System.out.println("----------\n4.2 getStreamNames");
List<StreamInfo> streamInfos = jsm.getStreams();
NatsJsUtils.printStreamInfoList(streamInfos);

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

铲除一个流

你能够很简略地铲除一个流中的一切信息,把它彻底清空。

// 5. Purge a stream of it's messages
System.out.println("----------\n5. Purge stream");
PurgeResponse purgeResponse = jsm.purgeStream(STREAM1);
NatsJsUtils.printObject(purgeResponse);

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

删去一个数据流

或许,假如你必定现已用完了一个信息流–你能够轻松地删去它。

// 6. Delete a stream
System.out.println("----------\n6. Delete stream");
jsm.deleteStream(STREAM2);
System.out.println("----------\n");

用NATS JetStream在Java中进行异步的Pub/Sub信息传递

处理安全问题

NATS JetStream支撑用TLS对衔接进行加密。TLS能够用来加密/解密客户/服务器衔接之间的流量,并查看服务器的身份。当在TLS方式下启用时,NATS将要求一切客户端用TLS衔接。

你能够经过加载一切的Keystores和Truststores来界说一个SSLContext ,然后在衔接到NATS时将SSLContext作为一个选项来重载。让咱们界说一个SSLUtils 类,咱们能够用它来加载一个钥匙库,创立钥匙办理器和一个SSL上下文。

class SSLUtils {
    public static String KEYSTORE_PATH = "keystore.jks";
    public static String TRUSTSTORE_PATH = "truststore.jks";
    public static String STORE_PASSWORD = "password";
    public static String KEY_PASSWORD = "password";
    public static String ALGORITHM = "SunX509";
    public static KeyStore loadKeystore(String path) throws Exception {
        KeyStore store = KeyStore.getInstance("JKS");
        BufferedInputStream in = new BufferedInputStream(new FileInputStream(path));
        try {
            store.load(in, STORE_PASSWORD.toCharArray());
        } finally {
            if (in != null) {
                in.close();
            }
        }
        return store;
    }
    public static KeyManager[] createTestKeyManagers() throws Exception {
        KeyStore store = loadKeystore(KEYSTORE_PATH);
        KeyManagerFactory factory = KeyManagerFactory.getInstance(ALGORITHM);
        factory.init(store, KEY_PASSWORD.toCharArray());
        return factory.getKeyManagers();
    }
    public static TrustManager[] createTestTrustManagers() throws Exception {
        KeyStore store = loadKeystore(TRUSTSTORE_PATH);
        TrustManagerFactory factory = TrustManagerFactory.getInstance(ALGORITHM);
        factory.init(store);
        return factory.getTrustManagers();
    }
    public static SSLContext createSSLContext() throws Exception {
        SSLContext ctx = SSLContext.getInstance(Options.DEFAULT_SSL_PROTOCOL);
        ctx.init(createTestKeyManagers(), createTestTrustManagers(), new SecureRandom());
        return ctx;
    }
}

然后,咱们的实用类都预备好了–咱们能够在创立NATS衔接时向sslContext() builder办法供给由它创立的SSLContext

public class NatsConnectTLS {
    public static void main(String[] args) {
        try {
            SSLContext ctx = SSLUtils.createSSLContext();
            Options options = new Options.Builder()
                                .server("nats://localhost:4222")
                                .sslContext(ctx) // Set the SSL context
                                .build();
            Connection nc = Nats.connect(options);
            // Do something with the connection
            nc.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

咱们还能够界说一个认证机制来约束对NATS体系的拜访。客户端没有对拜访操控的操控权,但客户端供给了与体系进行身份验证所需的装备,绑定到一个账户,并要求TLS。

在设置Options 时,能够经过userInfo() 办法设置一个简略的装备,用一个_用户名_和_暗码_来衔接。

Options options = new Options.Builder().
                            .server("nats://localhost:4222")
                            .userInfo("myname","password") // Set a user and plain text password
                            .build();
Connection nc = Nats.connect(options);

然后,在创立一个衔接时,咱们能够经过在URL中供给用户名和暗码来衔接到NATS服务器。

Connection nc = Nats.connect("nats://myname:password@localhost:4222");

相同,咱们也能够经过认证令牌,如JWTs,或隐秘作为以下装备的一部分。

Options options = new Options.Builder()
                            .server("nats://localhost:4222")
                            .token("mytoken") // Set a token
                            .build();
Connection nc = Nats.connect(options);

咱们现在能够像下面这样衔接到NATS Url。

Connection nc = Nats.connect("nats://mytoken@localhost:4222"); // Token in URL

结论

当你考虑运用分布式流媒体体系作为构建分布式微服务集群、依据物联网的体系、下一代边缘体系的神经体系时,你能够考虑运用NATS JetStream,与其他流行的、强壮的结构(如Apache Kafka)比较,这是一个轻量级的挑选。在一个数据驱动的国际里,处理大量的事情和音讯流正变得越来越遍及。NATS JetStream供给了分布式安全、多租户和横向扩展的才能。

一如既往,你能够在GitHub上找到完好的源代码。