本文为稀土技能社区首发签约文章,30天内禁止转载,30天后未获授权禁止转载,侵权必究!

前语

什么是 Streaming ?

百度百科却是有关于 Streaming Media 的解释:

流媒体(streaming media)是指将一连串的媒体数据压缩后,经过网上分段发送数据,在网上即时传输影音以供欣赏的一种技能与进程,此技能使得数据包得以像流水一样发送;假如不运用此技能,就必须在运用前下载整个媒体文件。

放到 HTTP 恳求上,Streaming 差不多也是相同的意思,将数据分段发送给客户端。

一个比较常见的运用是 ChatGPT 的打字作用:

怎么用 Next.js v14 完成一个 Streaming 接口?

假如咱们要用 Next.js 完成一个具有 Streaming 作用的接口该怎么完成呢?

注:本篇的终究咱们会调用 OpenAI 的接口来完成这样一个作用。

Fetch API

咱们先从 Fetch API 开端说起。

Fetch API 想必大家现已很了解了,咱们常常会这样运用 fetch:

fetch("http://example.com/movies.json")
  .then((response) => response.json())
  .then((data) => console.log(data));

在这个比如中,咱们获取了一个 JSON 文件并将其打印。为了获取 JSON 的内容,咱们需要运用 json() 办法(该办法回来一个将 response body 解析成 JSON 的 promise)。

实际上,response 还有一个 body 只读特点,它是一个简略的 getter,用于露出一个 ReadableStream 类型的 body 内容。简略来说,fetch 的 response.body 会回来一个流类型的内容。这样的设计在恳求大体积文件的时分会很有用。

ReadableStream

怎么读取

response.body 回来一个 ReadableStream 类型的内容。ReadableStream 有一个 getReader() 实例办法,它会创立一个读取器并将流确定。一旦流被确定,其他读取器将不能读取它,直到它被开释。

读取器是 ReadableStreamDefaultReader 类型,它是用于读取来自网络供给的流数据(例如 fetch 恳求)的默许 reader。它有一个 read() 实例办法,回来一个 Promise,供给对流内部行列中下一个分块的拜访权限。而这个 Promise 的值,其 resolve / reject 的结果取决于流的状况:

  • 假如有 chuck 可用,则 promise 将运用 { value: theChunk, done: false } 方式的目标来 resolve。
  • 假如流现已封闭,则 promise 将运用 { value: undefined, done: true } 方式的目标来 resolve。
  • 假如流发生过错,promise 将因相关过错被回绝。

依据 MDN ReadableStream.getReader() 中的比如尝试读取一下 ReadableStream 中的内容:

const decoder = new TextDecoder('utf-8');
fetch('https://api.thecatapi.com/v1/images/search')
  .then((response) => response.body)
  .then((body) => {
    const reader = body.getReader();
    reader.read().then(function process({ done, value }) {
      if (done) {
        console.log('Stream finished');
        return;
      }
      const text = decoder.decode(value);
      console.log('Received data chunk', text);
      return reader.read().then(process);
    });
  })

这儿咱们写了一个递归,不断读取流中的内容,直到流封闭(done 为 true)。咱们能够仿制这段代码,在浏览器中运转,作用如下:

怎么用 Next.js v14 完成一个 Streaming 接口?

由于接口本身不是流式,所以这儿只有一个 chunk,正是接口的回来内容。

怎么创立

知道了怎么读取,咱们又该怎么创立一个 ReadableStream 类型的内容呢?

能够运用 ReadableStream() 结构函数,示例代码如下:

const stream = new ReadableStream(
  {
    start(controller) {},
    pull(controller) {},
    cancel() {},
    type,
    autoAllocateChunkSize,
  }
);

结构函数第一个参数目标包括着五个特点,仅有第一个是必要的:

  • start(controller) —— 一个在 ReadableStream 构建后,立即被调用一次的办法。在这个办法中,你应该包括设置流功能的代码,例如开端生成数据或许以其他的方式拜访资源时

  • pull(controller) —— 一个办法,当被包括时,它会被重复的调用直到填满流的内置行列。当排入更多的分块时,这能够用于控制流

  • cancel() —— 一个办法,当被包括时,假如运用发出流将被取消的信号,它将被调用(例如,调用 ReadableStream.cancel())。内容应该采纳任何必要的办法开释对流源的拜访

  • typeautoAllocateChunkSize —— 当它们被包括时,会被用来表明流将是一个字节流。字节流将在未来的教程中单独包括,由于它们在目的和用例上与惯例的(默许的)流有些不同。它们也未在任何地方施行。

让咱们写个比如了解一下:

fetch('https://mdn.github.io/dom-examples/streams/simple-pump/tortoise.png')
  .then(response => response.body)
  .then(rs => {
    const reader = rs.getReader();
    return new ReadableStream({
      async start(controller) {
        while (true) {
          const { done, value } = await reader.read();
          if (done) {
            break;
          }
          controller.enqueue(value);
        }
        controller.close();
        reader.releaseLock();
      }
    })
  })
  .then(rs => new Response(rs))
  .then(response => response.blob())
  .then(blob => URL.createObjectURL(blob))
  .then(url => {
    var img = new Image(); 
    img.src = url;  
    document.body.append(img)
  })
  .catch(console.error);

在这个比如中,咱们 fetch 了一张图片,先用读取器拜访流,依据流的内容创立新的流文件(相当于仿制一遍),然后获取新的流文件,终究将其转换为图片元素,添加到 body 中。咱们仿制这段代码到浏览器中,作用如下:

在这段代码中,可能有点困惑的是 start 函数的第一个参数 controller,它是 ReadableStreamDefaultController 类型,用于控制 ReadableStream 的状况和内部行列。

简略来说,它有三个实例办法,close() 用于封闭流,enqueue() 用于加入流,error() 用于报错。

Next.js 完成 Streaming

根底示例

有了这些根底知识,让咱们用 Next.js 完成一个 Streaming 接口吧。

运用官方脚手架创立一个 Next.js 项目:

npx create-next-app@latest

运转作用如下:

怎么用 Next.js v14 完成一个 Streaming 接口?

为了样式美观,咱们会用到 Tailwind CSS,所以留意勾选 Tailwind CSS,其他随意。

新建 api/chat/route.js,代码如下:

function iteratorToStream(iterator) {
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next()
      if (done) {
        controller.close()
      } else {
        controller.enqueue(value)
      }
    },
  })
}
function sleep(time) {
  return new Promise((resolve) => {
    setTimeout(resolve, time)
  })
}
const encoder = new TextEncoder()
async function* makeIterator() {
  yield encoder.encode('<p>One</p>')
  await sleep(1000)
  yield encoder.encode('<p>Two</p>')
  await sleep(1000)
  yield encoder.encode('<p>Three</p>')
}
export async function GET() {
  const iterator = makeIterator()
  const stream = iteratorToStream(iterator)
  return new Response(stream)
}

这段代码是 Next.js 官方供给的关于运用底层 API 完成 Streaming 的示例代码,其间又参阅了 MDN ReadableStream 的示例代码。代码逻辑并不复杂,主要功能是在运转迭代器,不断将内容推到流中。为了作用明显,咱们加了 sleep 函数。

本地运转 npm run dev,此时拜访 http://localhost:3000/api/chat,作用如下:

留意:Next.js 开发方式默许开启 React Strict Mode,这会导致恳求调用两次,影响这儿的结果。你能够在 next.config.js配置中封闭 React Strict Mode:

const nextConfig = {
  reactStrictMode: false,
};
export default nextConfig;

跟着恳求的继续连接,这些内容会间隔 1s 呈现。检查此接口的呼应头:

怎么用 Next.js v14 完成一个 Streaming 接口?

恳求之所以能够继续回来数据,也是得益于 HTTP 的 Transfer-Encoding 标头的值为 chunked,表明数据将以一系列分块的方式进行发送。

分块传输编码(Chunked transfer encoding)是超文本传输协议(HTTP)中的一种数据传输机制,答应 HTTP由网页服务器发送给客户端运用( 通常是网页浏览器)的数据能够分成多个部分。分块传输编码只在 HTTP 协议1.1版别(HTTP/1.1)中供给。

接口写好了,前端又该怎么调用呢?

这个时分就要用到前面讲 ReadableStream 读取的内容了。修正 app/page.js,代码如下:

'use client'
const decoder = new TextDecoder('utf-8');
import { useEffect, useState } from "react";
export default function Chat() {
  const [text, setText] = useState('')
  useEffect(() => {
    const fetchData = async () => {
      const response = await fetch("http://localhost:3000/api/chat");
      const reader = response.body.getReader();
      reader.read().then(function process({ done, value }) {
        if (done) {
          console.log('Stream finished');
          return;
        }
        const text = decoder.decode(value);
        console.log('Received data chunk', text);
        setText((value) => {
          return value + text
        })
        return reader.read().then(process);
      });
    }
    fetchData()
  }, [])
  return (
    <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
      {text}
    </div>
  );
}

刷新页面,交互作用如下:

调用 OpenAI 接口

写 Streaming 接口,一个很常见的运用是后端调用大模型的接口,比如 OpenAI 的接口:

import OpenAI from "openai";
const openai = new OpenAI();
async function main() {
    const stream = await openai.chat.completions.create({
        model: "gpt-4",
        messages: [{ role: "user", content: "Say this is a test" }],
        stream: true,
    });
    for await (const chunk of stream) {
        process.stdout.write(chunk.choices[0]?.delta?.content || "");
    }
}
main();

让咱们完成一下开头的那个作用。

为此你需要预备一个 OpenAI API 3.5 的 KEY。修正 api/chat/route.js,代码如下:

import OpenAI from 'openai';
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY || '',
  baseURL: "https://api.openai-proxy.com/v1"
});
const encoder = new TextEncoder()
async function* makeIterator(response) {
  for await (const chunk of response) {
    const delta = chunk.choices[0].delta.content
    yield encoder.encode(delta)
  }
}
function iteratorToStream(iterator) {
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next()
      if (done) {
        controller.close()
      } else {
        controller.enqueue(value)
      }
    },
  })
}
export async function POST(req) {
  const { messages } = await req.json();
  const response = await openai.chat.completions.create({
    model: 'gpt-3.5-turbo',
    stream: true,
    messages,
  });
  return new Response(iteratorToStream(makeIterator(response)))
}

新建 .env.local,代码如下:

OPENAI_API_KEY=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

修正 app/page.js,代码如下:

'use client';
import { useState, useEffect } from "react";
const decoder = new TextDecoder('utf-8');
export default function Chat() {
  const [text, setText] = useState('')
  const [input, setInput] = useState('')
  const handleInputChange = (e) => {
    setInput(e.target.value)
  }
  const handleSubmit = async (e) => {
    e.preventDefault()
    setText('')
    setInput('')
    const response = await fetchData(input)
    const reader = response.body.getReader();
    reader.read().then(function process({ done, value }) {
      if (done) {
        console.log('Stream finished');
        return;
      }
      const text = decoder.decode(value);
      console.log('Received data chunk', text);
      setText((value) => {
        return value + text
      })
      return reader.read().then(process);
    });
  }
  const fetchData = async (input) => {
    const response = await fetch("http://localhost:3000/api/chat", {
      method: "POST",
      body: JSON.stringify({messages: [{ role: "user", content: input }]})
    });
    return response
  }
  return (
    <div className="flex flex-col w-full max-w-md p-2 mx-auto stretch">
        <div className="whitespace-pre-wrap">
          {text ? 'AI: ' + text : ''}
        </div>
      <form onSubmit={handleSubmit}>
        <input
          className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
          value={input}
          placeholder="Say something..."
          onChange={handleInputChange}
        />
      </form>
    </div>
  );
}

交互作用如下:

怎么用 Next.js v14 完成一个 Streaming 接口?

从右侧浏览器的打印中,咱们也能够看出,跟着内容的不断回来,React 在不断的烘托内容,这才完成了打字流的作用。

总结

本篇咱们从 fetch API 开端讲起,response.body 回来的正是一个 ReadableStream 类型的只读流。接下来咱们讲了 ReadableStream 中的内容怎么读取以及 ReadableStream 怎么创立。借助底层 API 完成流的时分,就需要经过创立 ReadableStream 的方式来完成。

终究咱们讲了两个全栈示例,后端接口怎么创立,前端又该怎么调用,期望对大家事务中完成 Streaming 有借鉴意义。

参阅链接

  1. developer.mozilla.org/zh-CN/docs/…
  2. developer.mozilla.org/zh-CN/docs/…