ChatGPT처럼 AI 채팅 메시지를 스트리밍 하려면?
전통적인 HTTP로 구현할 수 있을까?
자주 사용하는 ChatGPT나 Claude, Gemini와 같은 서비스를 보면, AI의 메시지를 조금씩 화면에 출력합니다. 이건 어떻게 구현한걸까요?
기본적으로 웹서비스는 HTTP 프로토콜을 기반으로 동작합니다. 그리고 이 프로토콜은 클라이언트가 서버에 요청하면, 서버가 이에 대한 응답을 하는 단방향 통신으로 동작합니다.
하지만 우리가 원하는건 서버에서 AI의 메시지 토큰을 준비되는대로 바로 클라이언트에 내려주는 것입니다. 우리가 알던 요청-응답 한쌍으로 이루어지는 통신으로는 무리가 있습니다.
WebSocket을 사용하면 구현할 수 있을까?
대안으로, WebSocket을 사용하는 방법이 있지만, 이 역시 좋은 방법은 아닙니다. 그 이유는 아래와 같습니다.
- 메시지를 스트리밍하는건 사실 양방향 채널이 필요 없는 구조다. 클라이언트가 요청하면, 서버에서 여러 차례로 나눠서 응답할 수 있기만 하면 된다.
- HTTP가 아니라서 인증(쿠키, 토큰), CORS, 캐싱, 로깅 등 기존 HTTP 프로토콜의 기능을 그대로 활용할 수 없다.
- 스케일링을 어렵게 만든다.
특히, 로드밸런서 구성이 제일 문제입니다. 일반적인 웹서비스에선 로드밸런서가 트래픽을 여러 서버로 분산시킵니다. 그런데 웹소켓을 사용하게 되면 특정 서버와 클라이언트가 연결을 유지해야 합니다. 따라서 스티키(Sticky) 세션을 통해 연결을 유지시켜줘야 하는데 이러면 수평 확장이 어려워지게 됩니다. 트래픽 분산이 안될테니까요.
정답은 Server Sent Events(SSE)
사실, 이 문제는 HTTP 프로토콜로 해결할 수 있습니다. 일반적인 응답은 아래와 같이 응답 전체가 한번에 옵니다.
1HTTP/1.1 200 OK
2Content-Length: 42
3...
4Content Length가 있으니, 브라우저는 그만큼 받으면 연결을 끝냅니다.
반면, SSE 방식으로 응답하는 경우, 아래와 같이 옵니다.
1HTTP/1.1 200 OK
2Transfer-Encoding: chunked
3Content-Type: text/event-stream
4
5data: 첫 번째 조각
6
7data: 두 번째 조각
8
9...
10
11data: N 번째 조각
12보면 Content-Length가 없고 Transfer-Encoding이 chunked입니다. 이 경우 브라우저는 응답이 아직 안끝났다고 판단하고 연결을 열어둔 채, 데이터가 올 때마다 처리합니다.
이렇게만 해도 기존의 HTTP 프로토콜을 유지하면서 서버는 하나의 요청에 대해 여러 차례로 나누어 응답을 전송할 수 있게 됩니다. AI 메시지를 스트리밍할 수 있게 되는거죠.
생각보다 간단하죠?
그럼 백문이 불여일타라고, 직접 구현해보겠습니다.
SSE를 통한 메시지 스트리밍 기능 직접 구현하기
- 이번 실습에선 Next.js 앱을 만들고, SSE 방식으로 메시지를 응답하는 Route Handler를 구현해보려고 합니다. 먼저 Next 앱을 설치하겠습니다
Next.js 앱 설치 및 실행
1# 보일러 플레이트를 통해 next 앱 생성
2pnpm create next-app@16.2.1 sse-exam --yes
3
4# 생성한 프로젝트로 이동
5cd sse-exam
6
7# next 앱 실행
8pnpm dev
9- 준비가 끝났습니다. 이제 Route Handler를 만들겠습니다.
Route 핸들러 만들기 1: 사전 준비
- app/api/route.ts 파일을 생성하고 아래와 같이 입력합니다.
1// 테스트에서 사용할 더미 토큰 목록
2const tokens = [
3 'Never gonna give you up ',
4 'Never gonna let you down ',
5 'Never gonna run around and desert you ',
6 'Never gonna make you cry ',
7 'Never gonna say goodbye ',
8 'Never gonna tell a lie and hurt you ',
9];
10
11/**
12 * 외부 API 호출을 시뮬레이션하기 위해 매개변수만큼 대기하는 함수
13 * @param ms 대기시간(밀리초)
14 */
15async function sleep(ms: number): Promise<void> {
16 return new Promise((resolve) => {
17 setTimeout(() => {
18 resolve();
19 }, ms);
20 });
21}
22- tokens는 테스트에서 사용할 더미 토큰 목록입니다. 서버에서 응답할 데이터는 필요하니까요.
- sleep는 외부 API 호출로 인해 지연시간이 생기는 것을 시뮬레이션하기 위한 함수입니다. 실제로 GPT나 Anthropic의 API를 호출하면 지연시간이 발생할텐데, 이를 시뮬레이션 하기 위해서 사용합니다.
Route 핸들러 만들기 2: GET 요청을 처리하는 핸들러
그럼 이제 GET 요청을 처리하는 핸들러를 구현해보겠습니다. 마찬가지로 app/api/route.ts 파일에 아래와 같은 함수를 추가합니다.
1export async function GET() {
2 // SSE를 위한 ReadableStream을 만든다.
3 const stream = new ReadableStream({
4 async start(controller) {
5 const encoder = new TextEncoder();
6
7 // 토큰을 인코딩하고 controller에 enqueue
8 for (const token of tokens) {
9 // SSE 표준에 맞게 포맷을 맞춰서 데이터 추가
10 const encodedToken = encoder.encode(`data: ${JSON.stringify({ text: token })}\n\n`);
11
12 await sleep(100);
13 controller.enqueue(encodedToken);
14 }
15
16 controller.enqueue(encoder.encode('data: [DONE]\n\n'));
17 controller.close();
18 },
19 });
20
21 // 응답 객체를 만들고 그 안에 Stream 객체를 넣고 반환
22 return new Response(stream, {
23 headers: {
24 'Content-Type': 'text/event-stream',
25 'Cache-Control': 'no-cache',
26 },
27 });
28}
29복잡해보이는데, 실제로는 꽤 단순합니다. 단순하게 ReadableStream 객체를 만들고, 이를 Response 객체에 담아서 응답하면 끝입니다.
그러면 조금 파고들어 보겠습니다.
ReadableStream 파고들기
- ReadableStream은 전체 데이터를 한번에 전송하는 대신, 조각(Chunk) 단위로 읽고 전송하는 객체입니다.
- 생성자의 인자로 start라는 비동기 함수를 작성할 수 있는데, 여기서 조각 단위로 읽고 전송하는 코드를 작성할 수 있습니다.
- 조각을 읽고 전송하려면, start 콜백의 controller 매개변수의 enqueue를 호출하면 됩니다.
- 모든 전송이 끝났다면 close를 호출해서 스트림을 종료할 수 있습니다.
- 이를 단순화하면 아래와 같이 작성할 수 있습니다.
1const stream = new ReadableStream({
2 start(controller) {
3 // 스트림이 생성될 때 한 번 호출
4 controller.enqueue('첫 번째 조각'); // 데이터 넣기
5 controller.enqueue('두 번째 조각');
6 controller.close(); // 스트림 종료
7 },
8});
9Response 파고들기
1// 응답 객체를 만들고 그 안에 Stream 객체를 넣고 반환
2return new Response(stream, {
3 headers: {
4 'Content-Type': 'text/event-stream',
5 'Cache-Control': 'no-cache',
6 },
7});
8- 응답객체를 생성할 때, 헤더로 몇가지 속성을 정의하고 있습니다.
- 앞서 언급한 것 처럼, Content-Type을 text/event-stream으로 설정합니다.
- 또한 Response의 body로 ReadableStream을 넣었으니, 자동으로 transfer-encoding은 chunked가 됩니다.
- 해당 API를 호출한 클라이언트는 HTTP 응답메시지의 헤더에서 해당 속성을 읽고,
아, 서버에서 조각 단위로 메시지를 전송하겠구나, 나도 결과를 ReadableStream으로 취급해서 읽어야겠다라고 판단합니다. - cache-control같은 경우는 필수는 아닌데, 없으면 운영 환경에서 예상치 못할 캐싱이 발생할 수 있어서 방어적으로 넣었다고 보시면 되겠습니다.
중간 정리
설명이 길어졌는데, 정리하고 넘어가겠습니다.
먼저 SSE 방식의 장점 및 특징은 아래와 같습니다.
- 스트리밍 방식으로 조각조각 응답되는 AI의 메시지를 클라이언트에 스트리밍하려면, SSE 방식을 사용해야 한다.
- SSE는 HTTP 프로토콜을 이용한 방식이므로, 기존 인증(쿠키, 토큰)을 그대로 사용 가능하다.
- WebSocket 방식과 다르게 여전히 단방향 통신이므로 수평확장에 용이하다.
서버의 SSE 구현 방법은 아래와 같습니다.
- ReadableStream 객체를 생성하고 Response 객체의 바디에 담아서 응답한다.
- 헤더로 Content-Type을 text/event-stream으로 설정한다. 바디에 담은 스트림 객체로 인해 transfer-encoding은 자동으로 chunked가 된다.
클라이언트 코드 구현
이번에는 SSE 방식으로 메시지를 전송하는 API를 클라이언트에서 어떻게 호출하는지 알아보겠습니다.
먼저, app/page.tsx를 열고, 아래와 같이 수정합니다.
1'use client';
2
3import { useState } from 'react';
4
5export default function Home() {
6 const [message, setMessage] = useState<string>('');
7
8 const onClick = async () => {
9 // API 호출 코드를 여기서 작성해야함!
10 };
11
12 return (
13 <div className="w-160 h-175 bg-gray-50 border border-gray-300 shadow-md rounded-md mx-auto mt-10 overflow-hidden">
14 <div className="flex justify-between items-center bg-gray-100 p-5 border-b border-b-gray-300">
15 <h1 className="font-bold text-xl">SSE Exam</h1>
16 <button
17 className="px-4 py-2 bg-blue-700 rounded-md text-white font-bold hover:brightness-105 cursor-pointer"
18 onClick={onClick}
19 >
20 Request AI
21 </button>
22 </div>
23
24 <div className="p-5 overflow-auto">
25 {message.length === 0 && <p className="text-gray-500 text-2xl">no data!</p>}
26 {message.length > 0 && message}
27 </div>
28 </div>
29 );
30}
31Request AI버튼을 누르면 서버에서 SSE 방식으로 전송한 메시지를 출력하는 간단한 화면입니다.- 이제 onClick을 구현해보겠습니다. onClick을 아래와 같이 작성해주세요.
1const onClick = async () => {
2 const response = await fetch('/api', { method: 'GET' });
3 const reader = response.body?.getReader();
4
5 if (!reader) {
6 throw new Error('응답 결과가 잘못되었습니다. reader를 가져오지 못했습니다.');
7 }
8
9 const decoder = new TextDecoder();
10
11 while (true) {
12 const { done, value } = await reader.read();
13
14 if (done) break;
15
16 const lines = decoder.decode(value).split('\n\n');
17
18 for (const line of lines) {
19 if (!line.startsWith('data: ') || line === 'data: [DONE]') continue;
20
21 const { text } = JSON.parse(line.slice(6));
22 setMessage((prev) => prev + text);
23 }
24 }
25};
26서버에서 ReadableStream을 바디에 담아 전송했으니, 클라이언트 역시 ReadableStream으로서 읽어야 합니다.
간단하게 response.body.getReader()를 호출하여 스트림을 읽는 객체를 꺼낼 수 있습니다.
더 이상 읽을 데이터가 없을 때 까지(done) 루프를 돌면서 메시지를 읽습니다.
이 때, while(true)루프로 들어갔으니, 해당 루프가 끝날때까지 UI 스레드가 블록되는게 아닌지 걱정하실 수 있는데, 괜찮습니다.
잘 보시면 reader.read()에서 await가 걸려있습니다. 해당 메서드가 비동기라, 서버에서 청크를 보낼 때까지 이벤트 루프의 다른 태스크를 처리합니다.
따라서 해당 코드로 인해 화면이 블로킹 될 일은 없습니다.
"SSE 방식으로 메시지를 조각 단위로 받고 이를 출력하는 화면"
개발자 도구에서 Network 요청 확인해보기.
이제 request ai 버튼을 누르면 화면에 메시지가 조각 단위로 출력되는 것을 확인하실 수 있습니다.
실제로 전송이 어떻게 이루어졌는지 알아보기 위해 개발자도구의 network탭을 열어서 확인해보겠습니다.
먼저 응답 메시지의 헤더입니다
| 헤더 | 값 |
|---|---|
| cache-control | no-cache |
| content-type | text/event-stream |
| transfer-encoding | chunked |
- 앞서 설정한 것 처럼 content type이나 transfer encoding이 잘 설정된 것을 확인할 수 있습니다.
다음으로 EventStream 탭에서 실제 수신된 메시지 목록입니다.
| Type | Data | Time |
|---|---|---|
| message | {"text":"Never gonna give you up "} | 11:04:21.967 |
| message | {"text":"Never gonna let you down "} | 11:04:22.179 |
| message | {"text":"Never gonna run around and desert you "} | 11:04:22.280 |
| message | {"text":"Never gonna make you cry "} | 11:04:22.380 |
| message | {"text":"Never gonna say goodbye "} | 11:04:22.482 |
| message | {"text":"Never gonna tell a lie and hurt you "} | 11:04:22.583 |
EventStream탭은 SSE 방식으로 전송했을때 표시되는 탭인데, 시간순으로 어떤 메시지가 전송되었는지를 확인할 수 있습니다.
복잡한 WebSocket 설정 없이 간단한 HTTP 설정과 ReadableStream을 통해 데이터가 준비되는대로 나누어 응답할 수 있다는게 정말 매력적이네요.
솔직히 해당 기능을 어떻게 구현했는지 조사하기전엔 당연히 WebSocket을 이용해서 구현했을거라고 생각했는데, HTTP로도 이런 기능을 구현할 수 있다는 사실이 정말 신기했습니다.
AI 챗봇이 아니더라도, SSE의 기반이 되는 HTTP chunked transfer 자체는 대용량 파일을 조금씩 전송하는 경우에도 활용할 수 있습니다. 영상이나 음성을 스트리밍 하는 경우도
MediaSource API와 결합하면 브라우저에서 영상 청크를 받으면서 재생할 수 있다고 하네요.
마치며
이번 포스트에선 HTTP의 스펙을 활용해서 ChatGPT나 Claude처럼 AI의 메시지를 클라이언트에 스트리밍하는 방법에 대해 알아보았습니다. 방법을 찾아보기 전에는 WebSocket을 사용해야 하는줄 알고 어렵겠다고 생각했는데, 막상 해보니 쉽고 직관적이어서 매우 놀랐습니다. AI를 활용한 프로젝트를 만들 때 이 방법을 써서 구현해보고 싶다는 생각이 드네요.
긴 포스트 읽어주셔서 감사합니다!