사내 챗봇/LLM 기능을 붙이면서, 답변을 한 번에 반환하는 게 아니라 토큰(혹은 chunk) 단위로 프론트에 흘려주는 SSE(또는 스트리밍 응답)를 구현해야 했습니다!
(기존에 MVP 용으로 개발했던 FastAPI 서버를 다 버리고 다시 새롭게 구현..ㅜㅜ.이제는 처음부터 끝까지 제가 하나씩 고쳐가고 패키지 구조를 잡고, common 모듈로 분리할 수 있고,, 재사용성을 고려한 코드 설계를 할 수 있다는 장점이 보였지만..
Spring 쪽을 하며 Fast 쪽도 하려니 조금 힘들 거 같아, SpringBoot 추가 기능 개발은 제 손을 떠나갔습니다..다만 FastAPI 로 MVC를 구현하는 것은 좀 오랜만이라 공부하며 하다보니 느릿느릿하게 개발하게 된 듯.. )
FastAPI에서 StreamingResponse는 간단해 보이지만, 실제로는 “동기/비동기 경계”, “블로킹 I/O”, “버퍼링”, “저장 로직 위치” 때문에 한 번에 되는 경우가 거의 없습니다. 이번 글은 제가 해당 기능을 개발하며 실제로 겪은 에러 흐름과, 왜 그 에러가 터졌는지, 그리고 어떻게 고쳤는지를 기록해놨습니다 ! (나중에 다시 보며 상기해야죵..)
1. 개발 배경: 왜 SSE(스트리밍)가 필요했나
LLM 답변을 사용자에게 빠르게 보여주려면, 모델이 전체 답을 다 만든 뒤 반환하는 방식보다 생성되는 대로 바로 출력하는 방식이 UX가 훨씬 좋다. 그래서 백엔드는 다음과 같은 구조를 목표로 했습니다.
- 프론트 → FastAPI: 채팅 요청(POST)
- FastAPI → Ollama: 스트리밍 생성 요청
- FastAPI → 프론트: 생성되는 chunk를 즉시 전송
스트리밍이 끝나면: 전체 답(full_answer)을 컨텍스트 저장 API(Spring)로 저장
그런데 이제 여기서 FastAPI 로 들어온 요청 중 데이터가 저장되어야 하는 것들은 SpringBoot로 전달해야되는 통신이 여러개 붙었음..
문제는 여기서 “Ollama 스트림 읽기”는 requests 기반이라 동기(generator)로 돌아가고, FastAPI의 StreamingResponse는 이벤트 루프(비동기 실행 컨텍스트) 위에서 동작한다는 점...
2. 초기 구현: 겉으로는 맞아 보였지만 실제로는 깨졌다
처음엔 StreamingResponse에 async generator를 감싸서 넣었습니다.
( 사실 이것도 기존 코드를 다 뜯어 고쳤었음.. 원래 SSE 만 되어있었는데, 여기서 SpringBoot 와의 통신이 하나 들어갔어야 됐어서.. 고민고민하다가 finally 때 처리하기로 함.)
@router.post("")
def chat_stream(req: ChatRequest):
async def proxy_stream():
answer_chunks = []
try:
async for chunk in generate_stream_response(prompt):
answer_chunks.append(chunk)
yield chunk
finally:
full_answer = "".join(answer_chunks).strip()
save_context_and_metadata(
)
return StreamingResponse(proxy_stream(), media_type="text/plain; charset=utf-8")
여기서 generate_stream_response()는 이런 형태? 였습니다.
def generate_stream_response(prompt: str):
url = f"{settings.OLLAMA_BASE_URL}/api/generate"
payload = {"model": settings.DEFAULT_MODEL, "prompt": prompt, "stream": True}
with requests.post(url, json=payload, stream=True, timeout=settings.REQUEST_TIMEOUT) as r:
r.raise_for_status()
for line in r.iter_lines():
if not line:
continue
data = json.loads(line.decode("utf-8"))
if "response" in data:
yield data["response"]
if data.get("done"):
break
겉으로 보면 “async for로 chunk를 계속 yield”하는 것처럼 보이지만, 실제로는 바로 에러...
3. 터졌던 에러 1: async for는 async iterator만 가능하다
에러 메시지 핵심
TypeError: 'async for' requires an object with __aiter__ method, got generator
원인은?
- generate_stream_response()는 def로 정의된 “동기 generator”
- async for는 __aiter__를 가진 “비동기 iterator”만 순회 가능
즉, “동기 generator를 비동기처럼 돌릴 수 없다”가 첫 번째 문제였습니다..
4. 동기/비동기 경계: 왜 여기서 동기면 안 되고, 비동기면 안 되나
이 지점에서 개념을 정리!
4.1 FastAPI 이벤트 루프에서 동기 작업을 돌리면 안 되는 이유
FastAPI(정확히는 Starlette/Uvicorn)는 이벤트 루프 기반이다. 이벤트 루프는 동시에 여러 요청을 처리하지만, 그 전제는 각 요청 핸들러가 오래 막히지 않는다......
requests.post(..., stream=True)는 블로킹 I/O입니다.
즉, 네트워크에서 줄 하나 받을 때까지 현재 스레드를 점유해서. 이걸 이벤트 루프 스레드에서 실행하면 다음 문제가 생깁니다.
- 다른 요청이 들어와도 이벤트 루프가 막혀서 응답이 느려짐
- 스트리밍 중간에 지연이 커지고, 프론트에 chunk가 늦게 도착
- 특정 환경에서는 “모아서 한 번에 나가는 것처럼” 보이는 현상까지 발생
4.2 그런데 “비동기만 쓰면” 되는 게 아닌 이유
그렇다고 async def generate_stream_response()로 바꾸면 끝이냐? 노노..
지금 Ollama 호출은 requests임.
requests는 async 라이브러리가 아닙니다.... await requests.post(...) 같은 건 존재하지 않는다...
즉,
- 외부 호출을 async로 만들려면 httpx.AsyncClient 같은 라이브러리로 전환하거나
- 동기 호출을 이벤트 루프 밖(스레드)에서 돌리고, 결과만 async로 전달해야 한다
이번 프로젝트에서는 “기존 requests 기반 코드를 크게 바꾸지 않고” 해결해야 했기 때문에 두 번째 방식을 택했습니다!
5. 해결책: 동기 generator를 스레드에서 돌리고, async로 흘려보내는 브릿지
핵심은
- producer(동기): 스레드에서 for chunk in gen_factory(): ...
- consumer(비동기): async generator에서 채널을 async for로 읽어서 yield
이를 위해 anyio의 memory object stream을 사용했습니다.
import anyio
from anyio import create_memory_object_stream
from typing import Callable, Iterator, AsyncIterator
async def stream_sync_generator(
gen_factory: Callable[[], Iterator[str]],
*,
max_buffer_size: int = 200,
) -> AsyncIterator[str]:
send_chan, recv_chan = create_memory_object_stream(max_buffer_size)
def producer():
try:
for chunk in gen_factory():
send_chan.send_nowait(chunk)
finally:
send_chan.close()
async with anyio.create_task_group() as tg:
tg.start_soon(anyio.to_thread.run_sync, producer)
async for chunk in recv_chan:
yield chunk
여기서 중요한 포인트는 create_memory_object_stream 사용법임
처음엔 아래처럼 작성했다가 에러가 났습니당.
create_memory_object_stream(str, max_buffer_size=max_buffer_size)
그리고 에러는 이렇게 터짐 ...
TypeError: create_memory_object_stream.__new__() got multiple values for argument 'max_buffer_size'
anyio의 create_memory_object_stream는 타입을 인자로 받지 않는다. 첫 번째 인자는 max_buffer_size다. 타입을 넘기면, 내부적으로 max_buffer_size 자리에 들어가버려 충돌한다.
그래서 올바른 호출은 아래처럼 해야 합니다.
send_chan, recv_chan = create_memory_object_stream(max_buffer_size)
6. 저장 로직에서 터진 에러 2: run_sync에 kwargs를 넣으면 스트림이 깨진다
스트리밍이 끝난 후 full_answer를 Spring으로 저장하려고 finally에서 저장을 호출했다. 이때 저장 함수는 requests.put(...) 같은 블로킹 호출이므로 이벤트 루프를 막지 않기 위해 anyio.to_thread.run_sync로 넘겼습니다.
그런데 여기서 또 문제가 생김.
다음처럼 kwargs를 직접 넘기면 anyio 버전에 따라 에러가 나면서, 스트리밍 자체가 종료 시점에 깨집니다.
await anyio.to_thread.run_sync(
save_context_and_metadata,
userId=req.user_id,
roomId=req.room_id,
messageId="",
message=full_answer,
)
그래서 이 부분은 functools.partial로 해결!. “인자가 바인딩된 함수”를 만들어서 run_sync에는 함수 하나만 넘겨주는 방식입니다.
from functools import partial
await anyio.to_thread.run_sync(
partial(
save_context_and_metadata,
userId=req.user_id,
roomId=req.room_id,
messageId="",
message=full_answer,
context=ctx.context or "",
)
)
여기서도 실수 포인트가 있었는데, 저장 요청 바디에 context가 필수인데, 이를 누락하면 Spring에서 400을 반환하고, raise_for_status()에서 또 예외가 발생한다.
결국 스트리밍 끝자락에서 저장이 실패하면서 “응답이 끊긴 것처럼” 보일 수 있습니다.
그래서 저장 payload의 required 필드는 모두 채우는 것이 중요.
7. 최종 구현: “스트리밍은 계속 흘러가고, 끝나면 저장”이 되는 구조
최종적으로 컨트롤러는 이렇게 정리
from fastapi.responses import StreamingResponse
from functools import partial
import anyio
@router.post("")
def chat_stream(req: ChatRequest):
"""
async def proxy_stream():
answer_chunks: list[str] = []
try:
async for chunk in stream_sync_generator(lambda: generate_stream_response(prompt)):
answer_chunks.append(chunk)
yield chunk
finally:
full_answer = "".join(answer_chunks).strip()
await anyio.to_thread.run_sync(
partial(
)
)
return StreamingResponse(proxy_stream(), media_type="text/plain; charset=utf-8")
8. 정리: SSE/스트리밍 구현에서 내가 얻은 교훈
- StreamingResponse는 “그냥 yield 하면 되겠지” 수준이 아니다. 동기/비동기 경계가 핵심이다.
- requests 기반 스트림은 동기다. 이벤트 루프에서 직접 돌리면 지연/버퍼링/장애로 이어진다.
- 동기 generator를 그대로 유지하고 싶으면, 스레드에서 돌리고 async로 흘리는 브릿지가 필요하다.
- 스트리밍 종료 후 저장 같은 블로킹 작업은 반드시 스레드로 빼야 한다. 그렇지 않으면 마지막에 스트림이 깨질 수 있다.
- anyio/Starlette는 버전에 따라 함수 시그니처/kwargs 처리 차이가 있어서, partial 같은 안전장치가 실전에서 매우 유용했다.
9. 다음에 개선할 포인트
이번 구현은 “기존 requests/Ollama 호출을 최대한 유지”하면서 스트리밍을 살린 방식!
더 깔끔하게 가려면 다음도 고려할 수 있을 듯 해용.
- Ollama 호출을 httpx.AsyncClient로 바꾸고, 진짜 async 스트림으로 처리
- 프론트에서 fetch streaming으로 chunk를 즉시 렌더링하는 방식 정리
- 스트림 중간에 클라이언트가 끊었을 때 저장 로직을 스킵하는 disconnect 처리
이 글은 제가 실제로 SSE/스트리밍을 붙이면서 “왜 동기면 안 되는지”, “왜 무조건 비동기가 정답도 아닌지”, 그리고 “실무에서 어떤 형태로 연결해야 하는지”를 에러 기반으로 정리한 기록입니다.
다음 글에서는 Nginx 뒤에서 스트리밍이 “한꺼번에” 모여서 오는 문제까지 포함해서, 배포 환경에서의 최종 점검 포인트를 더 정리해볼 생각입니다!!!!
'ServerDev > FastAPI' 카테고리의 다른 글
| [FastAPI 운영 로그 설계] 로그 포맷과 클라이언트 IP (0) | 2025.12.27 |
|---|---|
| [FastAPI] 30명 동시 접속 RAG 챗봇, 왜 FastAPI 비동기(Async)가 필수일까? (0) | 2025.12.18 |
| [FastAPI/AI] OCR, LLM(Ollama), RAG 구현 시 절대 BackgroundTasks만 믿으면 안 되는 이유 (0) | 2025.12.13 |
| [FastAPI] 미들웨어 - 작동 원리부터 ELK 연동을 위한 JSON 구조화 로깅까지 (0) | 2025.12.13 |
| [FastAPI] 미들웨어(Middleware) - 개념부터 커스텀 구현까지 (0) | 2025.12.12 |