本文为稀土技能社区首发签约文章,30天内禁止转载,30天后未获授权禁止转载,侵权必究!
前语
什么是 Streaming ?
百度百科却是有关于 Streaming Media 的解释:
流媒体(streaming media)是指将一连串的媒体数据压缩后,经过网上分段发送数据,在网上即时传输影音以供欣赏的一种技能与进程,此技能使得数据包得以像流水一样发送;假如不运用此技能,就必须在运用前下载整个媒体文件。
放到 HTTP 恳求上,Streaming 差不多也是相同的意思,将数据分段发送给客户端。
一个比较常见的运用是 ChatGPT 的打字作用:
假如咱们要用 Next.js 完成一个具有 Streaming 作用的接口该怎么完成呢?
注:本篇的终究咱们会调用 OpenAI 的接口来完成这样一个作用。
Fetch API
咱们先从 Fetch API 开端说起。
Fetch API 想必大家现已很了解了,咱们常常会这样运用 fetch:
fetch("http://example.com/movies.json")
.then((response) => response.json())
.then((data) => console.log(data));
在这个比如中,咱们获取了一个 JSON 文件并将其打印。为了获取 JSON 的内容,咱们需要运用 json()
办法(该办法回来一个将 response body 解析成 JSON 的 promise)。
实际上,response 还有一个 body 只读特点,它是一个简略的 getter,用于露出一个 ReadableStream 类型的 body 内容。简略来说,fetch 的 response.body 会回来一个流类型的内容。这样的设计在恳求大体积文件的时分会很有用。
ReadableStream
怎么读取
response.body 回来一个 ReadableStream 类型的内容。ReadableStream 有一个 getReader() 实例办法,它会创立一个读取器并将流确定。一旦流被确定,其他读取器将不能读取它,直到它被开释。
读取器是 ReadableStreamDefaultReader 类型,它是用于读取来自网络供给的流数据(例如 fetch 恳求)的默许 reader。它有一个 read() 实例办法,回来一个 Promise,供给对流内部行列中下一个分块的拜访权限。而这个 Promise 的值,其 resolve / reject 的结果取决于流的状况:
- 假如有 chuck 可用,则 promise 将运用
{ value: theChunk, done: false }
方式的目标来 resolve。 - 假如流现已封闭,则 promise 将运用
{ value: undefined, done: true }
方式的目标来 resolve。 - 假如流发生过错,promise 将因相关过错被回绝。
依据 MDN ReadableStream.getReader() 中的比如尝试读取一下 ReadableStream 中的内容:
const decoder = new TextDecoder('utf-8');
fetch('https://api.thecatapi.com/v1/images/search')
.then((response) => response.body)
.then((body) => {
const reader = body.getReader();
reader.read().then(function process({ done, value }) {
if (done) {
console.log('Stream finished');
return;
}
const text = decoder.decode(value);
console.log('Received data chunk', text);
return reader.read().then(process);
});
})
这儿咱们写了一个递归,不断读取流中的内容,直到流封闭(done 为 true)。咱们能够仿制这段代码,在浏览器中运转,作用如下:
由于接口本身不是流式,所以这儿只有一个 chunk,正是接口的回来内容。
怎么创立
知道了怎么读取,咱们又该怎么创立一个 ReadableStream 类型的内容呢?
能够运用 ReadableStream() 结构函数,示例代码如下:
const stream = new ReadableStream(
{
start(controller) {},
pull(controller) {},
cancel() {},
type,
autoAllocateChunkSize,
}
);
结构函数第一个参数目标包括着五个特点,仅有第一个是必要的:
-
start(controller)
—— 一个在 ReadableStream 构建后,立即被调用一次的办法。在这个办法中,你应该包括设置流功能的代码,例如开端生成数据或许以其他的方式拜访资源时 -
pull(controller)
—— 一个办法,当被包括时,它会被重复的调用直到填满流的内置行列。当排入更多的分块时,这能够用于控制流 -
cancel()
—— 一个办法,当被包括时,假如运用发出流将被取消的信号,它将被调用(例如,调用 ReadableStream.cancel())。内容应该采纳任何必要的办法开释对流源的拜访 -
type
和autoAllocateChunkSize
—— 当它们被包括时,会被用来表明流将是一个字节流。字节流将在未来的教程中单独包括,由于它们在目的和用例上与惯例的(默许的)流有些不同。它们也未在任何地方施行。
让咱们写个比如了解一下:
fetch('https://mdn.github.io/dom-examples/streams/simple-pump/tortoise.png')
.then(response => response.body)
.then(rs => {
const reader = rs.getReader();
return new ReadableStream({
async start(controller) {
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
controller.enqueue(value);
}
controller.close();
reader.releaseLock();
}
})
})
.then(rs => new Response(rs))
.then(response => response.blob())
.then(blob => URL.createObjectURL(blob))
.then(url => {
var img = new Image();
img.src = url;
document.body.append(img)
})
.catch(console.error);
在这个比如中,咱们 fetch 了一张图片,先用读取器拜访流,依据流的内容创立新的流文件(相当于仿制一遍),然后获取新的流文件,终究将其转换为图片元素,添加到 body 中。咱们仿制这段代码到浏览器中,作用如下:
在这段代码中,可能有点困惑的是 start 函数的第一个参数 controller,它是 ReadableStreamDefaultController 类型,用于控制 ReadableStream 的状况和内部行列。
简略来说,它有三个实例办法,close()
用于封闭流,enqueue()
用于加入流,error()
用于报错。
Next.js 完成 Streaming
根底示例
有了这些根底知识,让咱们用 Next.js 完成一个 Streaming 接口吧。
运用官方脚手架创立一个 Next.js 项目:
npx create-next-app@latest
运转作用如下:
为了样式美观,咱们会用到 Tailwind CSS,所以留意勾选 Tailwind CSS,其他随意。
新建 api/chat/route.js
,代码如下:
function iteratorToStream(iterator) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next()
if (done) {
controller.close()
} else {
controller.enqueue(value)
}
},
})
}
function sleep(time) {
return new Promise((resolve) => {
setTimeout(resolve, time)
})
}
const encoder = new TextEncoder()
async function* makeIterator() {
yield encoder.encode('<p>One</p>')
await sleep(1000)
yield encoder.encode('<p>Two</p>')
await sleep(1000)
yield encoder.encode('<p>Three</p>')
}
export async function GET() {
const iterator = makeIterator()
const stream = iteratorToStream(iterator)
return new Response(stream)
}
这段代码是 Next.js 官方供给的关于运用底层 API 完成 Streaming 的示例代码,其间又参阅了 MDN ReadableStream 的示例代码。代码逻辑并不复杂,主要功能是在运转迭代器,不断将内容推到流中。为了作用明显,咱们加了 sleep 函数。
本地运转 npm run dev
,此时拜访 http://localhost:3000/api/chat,作用如下:
留意:Next.js 开发方式默许开启 React Strict Mode,这会导致恳求调用两次,影响这儿的结果。你能够在 next.config.js
配置中封闭 React Strict Mode:
const nextConfig = {
reactStrictMode: false,
};
export default nextConfig;
跟着恳求的继续连接,这些内容会间隔 1s 呈现。检查此接口的呼应头:
恳求之所以能够继续回来数据,也是得益于 HTTP 的 Transfer-Encoding 标头的值为 chunked,表明数据将以一系列分块的方式进行发送。
分块传输编码(Chunked transfer encoding)是超文本传输协议(HTTP)中的一种数据传输机制,答应 HTTP由网页服务器发送给客户端运用( 通常是网页浏览器)的数据能够分成多个部分。分块传输编码只在 HTTP 协议1.1版别(HTTP/1.1)中供给。
接口写好了,前端又该怎么调用呢?
这个时分就要用到前面讲 ReadableStream 读取的内容了。修正 app/page.js
,代码如下:
'use client'
const decoder = new TextDecoder('utf-8');
import { useEffect, useState } from "react";
export default function Chat() {
const [text, setText] = useState('')
useEffect(() => {
const fetchData = async () => {
const response = await fetch("http://localhost:3000/api/chat");
const reader = response.body.getReader();
reader.read().then(function process({ done, value }) {
if (done) {
console.log('Stream finished');
return;
}
const text = decoder.decode(value);
console.log('Received data chunk', text);
setText((value) => {
return value + text
})
return reader.read().then(process);
});
}
fetchData()
}, [])
return (
<div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
{text}
</div>
);
}
刷新页面,交互作用如下:
调用 OpenAI 接口
写 Streaming 接口,一个很常见的运用是后端调用大模型的接口,比如 OpenAI 的接口:
import OpenAI from "openai";
const openai = new OpenAI();
async function main() {
const stream = await openai.chat.completions.create({
model: "gpt-4",
messages: [{ role: "user", content: "Say this is a test" }],
stream: true,
});
for await (const chunk of stream) {
process.stdout.write(chunk.choices[0]?.delta?.content || "");
}
}
main();
让咱们完成一下开头的那个作用。
为此你需要预备一个 OpenAI API 3.5 的 KEY。修正 api/chat/route.js
,代码如下:
import OpenAI from 'openai';
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY || '',
baseURL: "https://api.openai-proxy.com/v1"
});
const encoder = new TextEncoder()
async function* makeIterator(response) {
for await (const chunk of response) {
const delta = chunk.choices[0].delta.content
yield encoder.encode(delta)
}
}
function iteratorToStream(iterator) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next()
if (done) {
controller.close()
} else {
controller.enqueue(value)
}
},
})
}
export async function POST(req) {
const { messages } = await req.json();
const response = await openai.chat.completions.create({
model: 'gpt-3.5-turbo',
stream: true,
messages,
});
return new Response(iteratorToStream(makeIterator(response)))
}
新建 .env.local
,代码如下:
OPENAI_API_KEY=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
修正 app/page.js
,代码如下:
'use client';
import { useState, useEffect } from "react";
const decoder = new TextDecoder('utf-8');
export default function Chat() {
const [text, setText] = useState('')
const [input, setInput] = useState('')
const handleInputChange = (e) => {
setInput(e.target.value)
}
const handleSubmit = async (e) => {
e.preventDefault()
setText('')
setInput('')
const response = await fetchData(input)
const reader = response.body.getReader();
reader.read().then(function process({ done, value }) {
if (done) {
console.log('Stream finished');
return;
}
const text = decoder.decode(value);
console.log('Received data chunk', text);
setText((value) => {
return value + text
})
return reader.read().then(process);
});
}
const fetchData = async (input) => {
const response = await fetch("http://localhost:3000/api/chat", {
method: "POST",
body: JSON.stringify({messages: [{ role: "user", content: input }]})
});
return response
}
return (
<div className="flex flex-col w-full max-w-md p-2 mx-auto stretch">
<div className="whitespace-pre-wrap">
{text ? 'AI: ' + text : ''}
</div>
<form onSubmit={handleSubmit}>
<input
className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
value={input}
placeholder="Say something..."
onChange={handleInputChange}
/>
</form>
</div>
);
}
交互作用如下:
从右侧浏览器的打印中,咱们也能够看出,跟着内容的不断回来,React 在不断的烘托内容,这才完成了打字流的作用。
总结
本篇咱们从 fetch API 开端讲起,response.body 回来的正是一个 ReadableStream 类型的只读流。接下来咱们讲了 ReadableStream 中的内容怎么读取以及 ReadableStream 怎么创立。借助底层 API 完成流的时分,就需要经过创立 ReadableStream 的方式来完成。
终究咱们讲了两个全栈示例,后端接口怎么创立,前端又该怎么调用,期望对大家事务中完成 Streaming 有借鉴意义。