简介
当咱们构建大规模的分布式应用集群时,咱们运用一切的尽力将单体分解成小的容器化作业负载,这些容器之间彼此通讯并同享信息以履行各种操作。
咱们没有花太多时刻去规划一个 音讯传递体系.
音讯传递一般被视为任何_大规模分布式体系_的_中枢神经体系_。一般状况下,单体内部的内存通讯被转化为线上通讯。
假如咱们对_集群_内的一切通讯进行布线,就会构成相似网状的模块,每个服务以同步的办法调用另一个服务,由于在恳求-呼应的生命周期中有大量的等候时刻,这并不抱负。
这一点 紊乱的网状结构能够经过引进一个 异步音讯传递集群而不是同步的。
在两个微服务之间没有_点对点的通讯_,咱们能够把它们的音讯托付给一种 纽带和辐条拓扑结构.因而,音讯传递是衔接整个体系的_胶水_。
在本指南中,咱们将运用 NATS JetStream来履行异步音讯传递,经过 发布/订阅方式.
那么,咱们怎么为咱们的应用程序挑选一个音讯代理或音讯传递架构呢?
挑选一个音讯传递体系或许会让人感觉很吃力,现已有大量的挑选,并且每天都有新的挑选呈现,每个挑选都有不同的优势。
挑选一个分布式的音讯传递体系
最值得留意的是,咱们现已有了广泛流行和适当频频运用的Apache Kafka,它一般被称为_分布式日志存储_。
在Kafka中发布到主题的音讯会持续一段时刻,而.NET的概念答应音讯在多个实例中均匀分布。 顾客群体答应音讯在同一服务的多个实例中均匀分布。它的功用十分强壮,但伴随着强壮的功用而来的是巨大的职责和保护。Kafka显着难以保护,关于任何期望掌握该技术的团队来说,都有一个陡峭的学习曲线。
另一个独特的挑选是RabbitMQ。RabbitMQ运用高档音讯行列协议进行音讯传递。它也显着是轻量级的。
RabbitMQ 没有运用独特的顾客组的概念,而是采取了更简略的办法,即让客户端消费_行列_。假如一个客户端不承认一个音讯,它将回到行列中,由另一个客户端处理。
一切这些产品都有一些甜头,并在它们的用例中大放异彩。
那么,假如有人想实在承受具有一个简略而又超高功能的体系的主意,而不需求保护它的额定开支呢?假如有人想做传统的pub/sub,但也想做request/reply,甚至想做scatter-gather,一起又想坚持简略和简便呢?
这就是 NATS信息传递体系或许是最适合你的解决方案的地方。
介绍一下NATS
NATS是一个经过出产验证的、云原生的音讯传递体系,是为那些想花更多时刻完成事务逻辑,而少花时刻忧虑_怎么做音讯传递的_开发者或运营商而规划的。
它是一个令人难以置信的快速、开源的音讯传递体系,树立在一个简略而强壮的中心之上。服务器运用依据文本的协议,所以尽管有一些特定言语的客户端库,但你彻底能够经过_telnet_进入NATS服务器来发送和接纳信息。
NATS被规划成永久在线,衔接,并预备承受命令。假如你满足老,知道什么是_拨号音_,那么值得一提的是,NATS团队喜爱用这个比方来规划。
NATS的一些突出特点包括。
- _超高_的功能
-
低装备
- 客户端只需求一个URL和凭证
- 服务器自动发现自己
- 能够在不影响运转服务的状况下扩展架构
- 自愈且始终可用
- 支撑多种交给方式。
- 最多一次(NATS中心)
- 至少一次(NATS流或JetStream)
- 将音讯存储到持久性存储,并按时刻或顺序重放
- 支撑通配符
- 在REST加密的数据
- 净化特定的音讯(GDPR
- 横向可扩展性
- 完好的TLS支撑。CA证书,双向支撑
- 支撑标准的用户/暗码认证/JWT的运用
- 权限约束
- 具有数据隔离的安全多租户
- 账户之间同享数据
- 具有30多个用不同言语编写的客户端库
信息传递方式
NATS支撑4种首要的通讯方式。它们是
- 依据主题
- 发布-订阅
- 恳求-回复/涣散-收集
- 行列组
每一个都是不同的方式,都有其运用案例,有一些重叠。答应一切这四种方式给了NATS极大的灵活性和功用,以应对多个应用程序之间的各种不同状况,或一个大型单体。
依据主题的音讯传递
A 主题在NATS中是一个简略的字符串,代表对数据的爱好。它被分_层符号_以支撑_通配符订阅_。
- foo.* 匹配 _foo.bar_和 foo.baz
- foo.*.bar匹配 _foo.a.bar_和 foo.b.bar
- _foo.>_匹配上述任何一个
- _>_匹配NATS中的一切内容
这种音讯传递方式答应发布者运用一个_Subject_来同享数据,而顾客能够经过运用通配符来监听这些Subject来接纳这些音讯。
从某种意义上说,这种方式是依据观察者规划方式的,它一般有一个_主题_和_观察者_。
例如,假如有人向_’audit.us.east’_发送音讯,那么一切监听该切当主题或通配符主题的订阅者都会收到这个音讯。
发布-订阅音讯
这是传统的音讯传递方式之一,其间 _发布者_将一个音讯发布到一个 _订阅者_列表,其间每个订阅者都独自订阅了它。
这相似于通讯,这种方式被_广泛_用于各种体系中。从告诉/警报体系到VoD平台,如YouTube。
这就是咱们在本指南中要运用的方式。
恳求-回复音讯/涣散-收集方式
当咱们进行REST API调用时,咱们发出一个HTTP恳求并收到一个呼应,咱们运用的是传统的同步恳求-呼应方式。_恳求-回应方式一般很困难,或许有时需求杂乱的解决方案或妥协。这种方式在运用NATS完成时适当简略,由于它只需求你在发布音讯时供给一个“回复到 “_主题。
这种方式也能够被称为 _涣散-集合_方式,发布者向未知数量的订阅者一起发布一个主题的音讯。然后一切监听这个主题的听众都会活泼起来并开始处理。然后,发布者将等候堆集来自部分或悉数订阅者的一切回复。
行列组
有时在一个分布式集群中,你必须对多个应用程序或同一应用程序的多个实例进行_负载平衡_。这种方式将是一个完美的解决方案,能够在订阅了同一主题的多个订阅者之间完成音讯的_负载平衡_。
这个解决方案最好的部分是,与其他音讯体系不同,它不需求在NATS服务器进行任何装备。行列组是由应用程序和他们的行列订阅者界说的,并在他们之间进行办理。
为了创立一个行列订阅,一切的订阅者都注册一个行列称号。当注册的主题上的音讯被发布时,组中的一个成员被随机挑选来接纳音讯。尽管行列组有多个订阅者,但每个音讯只被一个人消费。
一切这些方式在NATS服务器上需求零装备。
它彻底由应用程序或客户端库驱动。因而,让咱们研究一下jnatsJava客户端库,看看咱们怎么界说其间的一些方式并履行异步音讯传递。
基本的NATS服务器、NATS流和NATS JetStream
第一个 _NATS云原生_信息传递生态体系是以 NATS服务器依据 _‘最多一次’(At-most once_交给方式–音讯最多交给一次。它曾经以难以置信的速度将发布的音讯转发给顾客,为该行业设定了新的功能界限。关于一些应用来说,基本的NATS供给的功能超过了丢失音讯的潜在丢失。
但是,在’最多一次’的交给方式下,假如任何一个订户呈现故障,发送到的音讯将永久不会抵达,因而,没有确保数据的交给。
这相似于大多数流媒体服务所运用的超高速UDP协议,数据的速度比数据的完好性更重要。你甘愿在视频中丢失几个像素或分辨率较低,也不愿意长时刻等候听到或人的声音。
但这并不是你想在金融交易中产生的事情。在这里和那里丢失一点或许会改动或人的账单或收件人的地址。
作为对这个问题的回应 NATS流媒体引进了NATS,它用一些功能来交换音讯的持久性。没有牺牲太多的功能,NATS流是一个轻量级和高功能的平台,在引擎盖下运用基本的NATS。它的构建办法是 _‘至少一次_交给模型,具有为发布者和订阅者发送ACK
音讯的才能。
这相似于TCP,它确保数据的完好性,假如没有收到
ACK
,就会从头发送包,表明客户端或许没有收到包。
当音讯被发布后,它们会被持久化一段时刻(可定制),这样假如顾客没有收到它,它就能够被从头播放给顾客。尽管这个组件的功能十分好,并且是轻量级的,但在才能和成熟度方面,它和Kafka等分布式流媒体体系相同强壮。
开发人员提出了一些要求,如分布式安全、涣散式办理、多租户、超级集群的全球扩展以及数据的安全同享,这些要求在NATS 2.0年代催生了下一代NATS流,被称为 NATS JetStream.
关于具有分布式集群的现代流媒体体系,建议运用最新的 NATS JetStream供给。JetStream_的创立是为了解决当今流媒体技术所发现的问题–杂乱性、脆弱性和缺少扩展性。咱们将在本文中进一步讨论_JetStream。
用NATS JetStream完成Java中的异步Pub/Sub音讯传递
项目设置
运转或安装一个_NATS JetStream_服务器是十分简略的。无论你想在Windows、Mac或Linux机器上保管这个集群,Docker引擎都能使设置变得十分简略。
咱们将运用一个Docker容器来保管JetStream服务器。为了运转Docker镜像,咱们能够简略地运转。
$ docker run -ti -p 4222:4222 --name jetstream synadia/jsm:latest server
一旦你运转该程序,你会看到一些相似的内容。
NATS有一个巨大的不同言语的客户端库列表,有一个由1000多个贡献者组成的活泼社区。它在2018年加入了 CNCF(云原生计算基金会)。作为一个孵化项目在2018年加入。
咱们将运用NATS的Java客户端,即jnats。为了衔接到NATS JetStream,咱们只需求在pom.xml
中界说一个依靠项。
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>${version}</version>
</dependency>
就这样了!咱们现已预备好了。现在让咱们来看看咱们的一些运用状况。假如你遇到困难,你能够在GitHub上找到完好的源代码。
发布者/订阅者流
让咱们测验经过创立一个新的Stream
和一个主题来界说一个传统的_发布者/订阅_者模型。Stream
,在NATS JetStream中代表两个端点之间的任何数据流,是API的中心构建块。
咱们将创立一个单一的类,首要发布一些音讯,然后订阅览取这些音讯并发送承认。
public class PubSubAsync {
// Proceeding code goes here
}
让咱们持续界说一些全局静态设置,如流称号、主题、默认音讯和服务器。
private static final String defaultStream = "pubsubasync-stream";
private static final String defaultSubject = "pubsubasync-subject";
private static final String defaultMessage = "Hello User";
private static final int defaultMessageCount = 2;
private static final String defaultServer = "nats://localhost:4222";
咱们将在今后逐渐设置流的时候运用这些设置,以防止在其间硬编码变量。
让咱们首要设置一个Connection
到NATS JetStream服务器,实例化一个JetStreamManagement
实例,用来增加Stream
实例,以及一个StreamConnfiguration
实例–经过生成器规划方式树立,以便在界说设置时有灵活性。
与NATS服务器的衔接或许会失利,所以你要把*一切的程序代码包在一个try-catch
块中。咱们将运用一个 try-with-resources
块,由于这是一个可封闭的衔接。
try (Connection nc = Nats.connect(defaultServer)) {
// Creating streams, managers, sending messages, subscribing, etc.
} catch (Exception e) {
e.printStackTrace();
}
在try
块中,咱们将首要创立一个JetStreamManagement
实例,以及一个StreamConfiguration
和JetStream
上下文。
JetStream
类是该结构的中心API。JetStream
经过将音讯推送到订阅者正在收听的_主题_,间接地将音讯_发布_给_订阅_者。它还将用户_订阅_到主题上。
_主题_是在构建StreamConfiguration
时界说的,而JetStreamManagement
实例让咱们将具有该装备的Stream
s 增加到咱们的管道中。咱们将在后边的章节中更具体地介绍JetStreamManagement
。让咱们创立一个单一的流来发布音讯给一个主题,并创立JetStream
上下文来办理发布和订阅发送给该主题的音讯。
JetStreamManagement jsm = nc.jetStreamManagement();
// Create a stream, here will use an in-memory storage type, and one subject
StreamConfiguration sc = StreamConfiguration.builder()
.name(defaultStream)
.storageType(StorageType.Memory)
.subjects(defaultSubject)
.build();
// Add a stream via the `JetStreamManagement` instance and capture its info in a `StreamInfo` object
StreamInfo streamInfo = jsm.addStream(sc);
JsonUtils.printFormatted(streamInfo);
// Create a JetStream context. This hangs off the original connection
// allowing us to produce data to publish into streams and consume data from
// JetStream consumers.
JetStream js = nc.jetStream();
现在,咱们能够持续创立一个Future
列表来保存咱们的音讯成果,由于咱们正在处理异步音讯,不知道它们_何时_会回来。当经过JetStream
实例的publishAsync()
办法发布音讯时,会回来一个PublishAck
,表明未来客户端对接纳的承认。
假如你想阅览更多关于
Future
接口的信息,请阅览咱们的《Java中的未来接口指南》。
此外,关于每个音讯,咱们将创立一个Message
实例,它承受一个_主题_和_数据_。咱们要向谁发送音讯以及音讯是什么。运用NatsMessage.builder()
办法,咱们能够很简略地树立一个咱们想发送的音讯,并省略某些咱们没有任何用途的参数。
一旦树立了一个Message
,咱们就能够经过JetStream
‘的publishAsync()
办法异步发布它。
// Create a future for asynchronous message processing
List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
int stop = defaultMessageCount + 1;
for (int x = 1; x < stop; x++) {
String data = defaultMessage + "-" + x;
// Create a typical NATS message
Message msg = NatsMessage.builder()
.subject(defaultSubject)
.data(data, StandardCharsets.UTF_8)
.build();
System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubject);
// Publish a message and add the result to our `CompletableFuture` list
futures.add(js.publishAsync(msg));
}
一旦咱们发送了音讯,咱们很或许想知道它们产生了什么,是否有任何问题被提出。经过迭代咱们的futures
列表,咱们能够查看CompletableFuture
实例是否_完成了_,假如完成了就打印它们的内容,假如没有完成果从头排队,今后再查看。
// Get Acknowledgement for the messages
while (futures.size() > 0) {
CompletableFuture<PublishAck> f = futures.remove(0);
if (f.isDone()) {
try {
PublishAck pa = f.get();
System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n",
defaultSubject, pa.getStream(), pa.getSeqno());
}
catch (ExecutionException ee) {
System.out.println("Publish Failed " + ee);
}
}
else {
// Re-queue it and try again
futures.add(f);
}
}
关于一个_发布者_来说,咱们需求一个订阅_者_来发布(合理的),避免信息在没有什么意义的状况下悬空。一个_订阅者_被创立为一个JetStreamSubscription
实例,由JetStream
上下文的subscribe()
办法回来。
// Subscribe to the messages that have been published to the subject
JetStreamSubscription sub = js.subscribe(defaultSubject);
List<Message> messages = new ArrayList<>();
// Retrieve the next message and kick off an iteration of all the messages
Message msg = sub.nextMessage(Duration.ofSeconds(1));
boolean first = true;
while (msg != null) {
if (first) {
first = false;
System.out.print("Read/Ack ->");
}
messages.add(msg);
if (msg.isJetStream()) {
msg.ack();
System.out.print(" " + new String(msg.getData()) + "\n");
}
else if (msg.isStatusMessage()) {
System.out.print(" !" + msg.getStatus().getCode() + "!");
}
JsonUtils.printFormatted(msg.metaData());
msg = sub.nextMessage(Duration.ofSeconds(1));
}
// Make sure the message goes through before we close
nc.flush(Duration.ZERO);
nc.close();
将一切这些联系起来,当咱们运转代码时–咱们应该看到这样的信息。
咱们现已成功地树立了一个数据的Stream
,它将音讯传递给一个_主题_,而咱们的订阅者正在观察它们的异步抵达。但有时,咱们的主题称号在咱们想要订阅它们之前是不知道的。例如,你或许会_生成_主题称号,并想在新主题创立时订阅它们。或许,有一整个具有共同前缀的主题列表,你想订阅。
在这两种状况下–而不是杂乱的循环和生成-订阅逻辑–你能够运用_通配符_来针对不止一个主题。
通配符发布者/订阅者流
NATS支撑_层次化_的符号,以支撑通配符订阅。作为本指南开始时的复习。
A 主题在NATS中是一个简略的字符串,代表对数据的爱好。它被_分层符号_以支撑_通配符订阅_。
- foo.* 匹配 _foo.bar_和 foo.baz
- foo.*.bar匹配 _foo.a.bar_和 foo.b.bar
- _foo.>_匹配上述任何一个
- _>_匹配NATS中的一切内容
这些通配符能够在发布者或订阅者中装备,也能够在两者中装备。咱们稍后会看一下这个典型的比如。咱们现在要运用的办法背面的逻辑与咱们之前看到的大致相同。
public class PubWildcardSubWildcard {
private static final String defaultStream = "pubsubwildcardasync-stream";
private static final String defaultSubjectWildcard = "audit.us.*";
private static final String defaultSubjectSpecific = "audit.us.east";
private static final String defaultMessage = "Audit User";
private static final int defaultMessageCount = 2;
private static final String defaultServer = "nats://localhost:4222";
public static void main( String[] args ) {
System.out.printf("\nPublishing to %s. Server is %s\n\n", defaultSubjectWildcard, defaultServer);
try (Connection nc = Nats.connect(defaultServer)) {
JetStreamManagement jsm = nc.jetStreamManagement();
StreamConfiguration sc = StreamConfiguration.builder()
.name(defaultStream)
.storageType(StorageType.Memory)
.subjects(defaultSubjectWildcard)
.build();
StreamInfo streamInfo = jsm.addStream(sc);
JsonUtils.printFormatted(streamInfo);
JetStream js = nc.jetStream();
List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
int stop = defaultMessageCount + 1;
for (int x = 1; x < stop; x++) {
String data = defaultMessage + "-" + x;
Message msg = NatsMessage.builder()
.subject(defaultSubjectSpecific)
.data(data, StandardCharsets.UTF_8)
.build();
System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubjectSpecific);
futures.add(js.publishAsync(msg));
}
while (futures.size() > 0) {
CompletableFuture<PublishAck> f = futures.remove(0);
if (f.isDone()) {
try {
PublishAck pa = f.get();
System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n",
defaultSubjectSpecific, pa.getStream(), pa.getSeqno());
}
catch (ExecutionException ee) {
System.out.println("Publish Failed " + ee);
}
}
else {
futures.add(f);
}
}
JetStreamSubscription sub = js.subscribe(defaultSubjectWildcard);
List<Message> messages = new ArrayList<>();
Message msg = sub.nextMessage(Duration.ofSeconds(1));
boolean first = true;
while (msg != null) {
if (first) {
first = false;
System.out.print("Read/Ack ->");
}
messages.add(msg);
if (msg.isJetStream()) {
msg.ack();
System.out.print(" " + new String(msg.getData()) + "\n");
}
else if (msg.isStatusMessage()) {
System.out.print(" !" + msg.getStatus().getCode() + "!");
}
JsonUtils.printFormatted(msg.metaData());
msg = sub.nextMessage(Duration.ofSeconds(1));
}
nc.flush(Duration.ZERO)
nc.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
当咱们运转这段代码时,咱们会看到。
作为Pub/Sub方式的替代品,运用msg.getReplyTo()
,咱们能够开始构建一个_Request-Reply_方式的完成,经过构建行列组和通道来订阅和撤销订阅–咱们能够构建一个_行列组_方式的完成。
这是或许的,由于咱们根本没有为NATS做任何特定方式的装备–你想运用的特定方式只取决于你怎么运用这个库。
JetStream办理
在某一点上,你或许想观察或办理你的流。为了做到这一点,咱们将研究NATS JetStream中的流的生命周期。
- 创立或增加一个带有主题的流
- 经过增加一个主题来更新一个流
- 获取流的信息
- 铲除一个流中的信息
- 删去一个流
为了演示这些,让咱们创立一个有几个静态字段和只要一个main()
办法的类。在这个类中,咱们将测试其间的一些操作,但依据你的架构和这些操作的触发器,你会想相应地附加上接下来的代码段。
public class NatsJsManageStreams {
private static final String STREAM1 = "manage-stream1";
private static final String STREAM2 = "manage-stream2";
private static final String SUBJECT1 = "manage-subject1";
private static final String SUBJECT2 = "manage-subject2";
private static final String SUBJECT3 = "manage-subject3";
private static final String SUBJECT4 = "manage-subject4";
private static final String defaultServer = "nats://localhost:4222";
public static void main(String[] args) {
try (Connection nc = Nats.connect(defaultServer)) {
JetStreamManagement jsm = nc.jetStreamManagement();
// Management code
// ...
// Make sure the message goes through before we close
nc.flush(Duration.ZERO);
nc.close();
} catch (Exception exp) {
exp.printStackTrace();
}
}
}
在剩下的示例中,咱们将运用同一个JetStreamManagement
实例,由于咱们在一个单一的类中运用它们。不过,请记住,在实在国际的场景中,你永久不会/很少会创立一个多流设置。相反,你一般会将主题增加到现有的流中以从头运用资源。
**留意:**在整个比如中,咱们将运用一个自界说的_实用程序类来_处理流的创立或更新,异步发布而无需等候,或读取有无承认的音讯 –
NatsJsUtils
。这个实用类能够在GitHub上找到。
创立或增加一个带有主题的流
咱们第一次创立一个Stream
,咱们只是设置了它的名字、主题和存储策略。还有其他各种设置,咱们能够经过构建器的办法进行调整。
// 1. Create (add) a stream with a subject
System.out.println("\n----------\n1. Configure And Add Stream 1");
StreamConfiguration streamConfig = StreamConfiguration.builder()
.name(STREAM1)
.subjects(SUBJECT1)
// .retentionPolicy()
// .maxConsumers(...)
// .maxBytes(...)
// .maxAge(...)
// .maxMsgSize(...)
.storageType(StorageType.Memory)
// .replicas(...)
// .noAck(...)
// .template(...)
// .discardPolicy(...)
.build();
StreamInfo streamInfo = jsm.addStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);
RetentionPolicy
设置音讯被删去的时刻–当对它们没有爱好时(没有顾客会消费它),当它们被消费时,等等。你能够约束顾客的数量,音讯能够有多长的字节,它能够被持久化多长时刻,是否需求一个ACK
呼应–等等。
在最简略的方式下–你供给一个称号、主题和存储类型,然后build()
。咱们能够在一个Stream
,作为JetStreamManagement
实例的addStream()
办法的回来类型,经过NatsJsUtils
类漂亮地打印出信息。
用一个主题更新一个流
你能够经过JetStreamManagement
实例的updateStream()
办法来更新现有的流。咱们将从头运用streamConfig
参考变量,并依据从现有的StreamInfo
实例中提取的装备,为咱们想要更新的流树立一个新的装备build()
。
// 2. Update stream, in this case, adding a new subject
// - StreamConfiguration is immutable once created
// - but the builder can help with that.
System.out.println("----------\n2. Update Stream 1");
streamConfig = StreamConfiguration.builder(streamInfo.getConfiguration())
.addSubjects(SUBJECT2).build();
streamInfo = jsm.updateStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);
// 3. Create (add) another stream with 2 subjects
System.out.println("----------\n3. Configure And Add Stream 2");
streamConfig = StreamConfiguration.builder()
.name(STREAM2)
.storageType(StorageType.Memory)
.subjects(SUBJECT3, SUBJECT4)
.build();
streamInfo = jsm.addStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);
这样做的成果是。
获取流的信息
// 4. Get information on streams
// 4.0 publish some message for more interesting stream state information
// - SUBJECT1 is associated with STREAM1
// 4.1 getStreamInfo on a specific stream
// 4.2 get a list of all streams
// 4.3 get a list of StreamInfo's for all streams
System.out.println("----------\n4.1 getStreamInfo");
NatsJsUtils.publish(nc, SUBJECT1, 5);
streamInfo = jsm.getStreamInfo(STREAM1);
NatsJsUtils.printStreamInfo(streamInfo);
System.out.println("----------\n4.2 getStreamNames");
List<String> streamNames = jsm.getStreamNames();
NatsJsUtils.printObject(streamNames);
System.out.println("----------\n4.2 getStreamNames");
List<StreamInfo> streamInfos = jsm.getStreams();
NatsJsUtils.printStreamInfoList(streamInfos);
铲除一个流
你能够很简略地铲除一个流中的一切信息,把它彻底清空。
// 5. Purge a stream of it's messages
System.out.println("----------\n5. Purge stream");
PurgeResponse purgeResponse = jsm.purgeStream(STREAM1);
NatsJsUtils.printObject(purgeResponse);
删去一个数据流
或许,假如你必定现已用完了一个信息流–你能够轻松地删去它。
// 6. Delete a stream
System.out.println("----------\n6. Delete stream");
jsm.deleteStream(STREAM2);
System.out.println("----------\n");
处理安全问题
NATS JetStream支撑用TLS对衔接进行加密。TLS能够用来加密/解密客户/服务器衔接之间的流量,并查看服务器的身份。当在TLS方式下启用时,NATS将要求一切客户端用TLS衔接。
你能够经过加载一切的Keystores和Truststores来界说一个SSLContext
,然后在衔接到NATS时将SSLContext作为一个选项来重载。让咱们界说一个SSLUtils
类,咱们能够用它来加载一个钥匙库,创立钥匙办理器和一个SSL上下文。
class SSLUtils {
public static String KEYSTORE_PATH = "keystore.jks";
public static String TRUSTSTORE_PATH = "truststore.jks";
public static String STORE_PASSWORD = "password";
public static String KEY_PASSWORD = "password";
public static String ALGORITHM = "SunX509";
public static KeyStore loadKeystore(String path) throws Exception {
KeyStore store = KeyStore.getInstance("JKS");
BufferedInputStream in = new BufferedInputStream(new FileInputStream(path));
try {
store.load(in, STORE_PASSWORD.toCharArray());
} finally {
if (in != null) {
in.close();
}
}
return store;
}
public static KeyManager[] createTestKeyManagers() throws Exception {
KeyStore store = loadKeystore(KEYSTORE_PATH);
KeyManagerFactory factory = KeyManagerFactory.getInstance(ALGORITHM);
factory.init(store, KEY_PASSWORD.toCharArray());
return factory.getKeyManagers();
}
public static TrustManager[] createTestTrustManagers() throws Exception {
KeyStore store = loadKeystore(TRUSTSTORE_PATH);
TrustManagerFactory factory = TrustManagerFactory.getInstance(ALGORITHM);
factory.init(store);
return factory.getTrustManagers();
}
public static SSLContext createSSLContext() throws Exception {
SSLContext ctx = SSLContext.getInstance(Options.DEFAULT_SSL_PROTOCOL);
ctx.init(createTestKeyManagers(), createTestTrustManagers(), new SecureRandom());
return ctx;
}
}
然后,咱们的实用类都预备好了–咱们能够在创立NATS衔接时向sslContext()
builder办法供给由它创立的SSLContext
。
public class NatsConnectTLS {
public static void main(String[] args) {
try {
SSLContext ctx = SSLUtils.createSSLContext();
Options options = new Options.Builder()
.server("nats://localhost:4222")
.sslContext(ctx) // Set the SSL context
.build();
Connection nc = Nats.connect(options);
// Do something with the connection
nc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
咱们还能够界说一个认证机制来约束对NATS体系的拜访。客户端没有对拜访操控的操控权,但客户端供给了与体系进行身份验证所需的装备,绑定到一个账户,并要求TLS。
在设置Options
时,能够经过userInfo()
办法设置一个简略的装备,用一个_用户名_和_暗码_来衔接。
Options options = new Options.Builder().
.server("nats://localhost:4222")
.userInfo("myname","password") // Set a user and plain text password
.build();
Connection nc = Nats.connect(options);
然后,在创立一个衔接时,咱们能够经过在URL中供给用户名和暗码来衔接到NATS服务器。
Connection nc = Nats.connect("nats://myname:password@localhost:4222");
相同,咱们也能够经过认证令牌,如JWTs,或隐秘作为以下装备的一部分。
Options options = new Options.Builder()
.server("nats://localhost:4222")
.token("mytoken") // Set a token
.build();
Connection nc = Nats.connect(options);
咱们现在能够像下面这样衔接到NATS Url。
Connection nc = Nats.connect("nats://mytoken@localhost:4222"); // Token in URL
结论
当你考虑运用分布式流媒体体系作为构建分布式微服务集群、依据物联网的体系、下一代边缘体系的神经体系时,你能够考虑运用NATS JetStream,与其他流行的、强壮的结构(如Apache Kafka)比较,这是一个轻量级的挑选。在一个数据驱动的国际里,处理大量的事情和音讯流正变得越来越遍及。NATS JetStream供给了分布式安全、多租户和横向扩展的才能。
一如既往,你能够在GitHub上找到完好的源代码。