[FastAPI - SSE Streaming] FastAPI SSE 스트리밍(StreamingResponse) 실전 적용기: 동기 generator를 비동기로 “진짜 스트리밍” 만들기

2025. 12. 19. 14:21·ServerDev/FastAPI

사내 챗봇/LLM 기능을 붙이면서, 답변을 한 번에 반환하는 게 아니라 토큰(혹은 chunk) 단위로 프론트에 흘려주는 SSE(또는 스트리밍 응답)를 구현해야 했습니다!

(기존에 MVP 용으로 개발했던 FastAPI 서버를 다 버리고 다시 새롭게 구현..ㅜㅜ.이제는 처음부터 끝까지 제가 하나씩 고쳐가고 패키지 구조를 잡고, common 모듈로 분리할 수 있고,, 재사용성을 고려한 코드 설계를 할 수 있다는 장점이 보였지만..

Spring 쪽을 하며 Fast 쪽도 하려니 조금 힘들 거 같아, SpringBoot 추가 기능 개발은 제 손을 떠나갔습니다..다만 FastAPI 로 MVC를 구현하는 것은 좀 오랜만이라 공부하며 하다보니 느릿느릿하게 개발하게 된 듯.. )

 

FastAPI에서 StreamingResponse는 간단해 보이지만, 실제로는 “동기/비동기 경계”, “블로킹 I/O”, “버퍼링”, “저장 로직 위치” 때문에 한 번에 되는 경우가 거의 없습니다. 이번 글은 제가 해당 기능을 개발하며 실제로 겪은 에러 흐름과, 왜 그 에러가 터졌는지, 그리고 어떻게 고쳤는지를 기록해놨습니다 ! (나중에 다시 보며 상기해야죵..)


1. 개발 배경: 왜 SSE(스트리밍)가 필요했나

LLM 답변을 사용자에게 빠르게 보여주려면, 모델이 전체 답을 다 만든 뒤 반환하는 방식보다 생성되는 대로 바로 출력하는 방식이 UX가 훨씬 좋다. 그래서 백엔드는 다음과 같은 구조를 목표로 했습니다. 

 

  • 프론트 → FastAPI: 채팅 요청(POST)
  • FastAPI → Ollama: 스트리밍 생성 요청
  • FastAPI → 프론트: 생성되는 chunk를 즉시 전송
스트리밍이 끝나면: 전체 답(full_answer)을 컨텍스트 저장 API(Spring)로 저장
그런데 이제 여기서 FastAPI 로 들어온 요청 중 데이터가 저장되어야 하는 것들은 SpringBoot로 전달해야되는 통신이 여러개 붙었음..

 

문제는 여기서 “Ollama 스트림 읽기”는 requests 기반이라 동기(generator)로 돌아가고, FastAPI의 StreamingResponse는 이벤트 루프(비동기 실행 컨텍스트) 위에서 동작한다는 점...


2. 초기 구현: 겉으로는 맞아 보였지만 실제로는 깨졌다

처음엔 StreamingResponse에 async generator를 감싸서 넣었습니다. 
( 사실 이것도 기존 코드를 다 뜯어 고쳤었음.. 원래 SSE 만 되어있었는데, 여기서 SpringBoot 와의 통신이 하나 들어갔어야 됐어서.. 고민고민하다가 finally 때 처리하기로 함.)

@router.post("")
def chat_stream(req: ChatRequest):


        async def proxy_stream():
            answer_chunks = []
            try:
                async for chunk in generate_stream_response(prompt):
                    answer_chunks.append(chunk)
                    yield chunk
            finally:
                full_answer = "".join(answer_chunks).strip()
                save_context_and_metadata(
            
                )

        return StreamingResponse(proxy_stream(), media_type="text/plain; charset=utf-8")

 

여기서 generate_stream_response()는 이런 형태? 였습니다.

def generate_stream_response(prompt: str):
    url = f"{settings.OLLAMA_BASE_URL}/api/generate"
    payload = {"model": settings.DEFAULT_MODEL, "prompt": prompt, "stream": True}

    with requests.post(url, json=payload, stream=True, timeout=settings.REQUEST_TIMEOUT) as r:
        r.raise_for_status()
        for line in r.iter_lines():
            if not line:
                continue
            data = json.loads(line.decode("utf-8"))
            if "response" in data:
                yield data["response"]
            if data.get("done"):
                break

 

겉으로 보면 “async for로 chunk를 계속 yield”하는 것처럼 보이지만, 실제로는 바로 에러...


3. 터졌던 에러 1: async for는 async iterator만 가능하다

에러 메시지 핵심

TypeError: 'async for' requires an object with __aiter__ method, got generator

원인은?

  • generate_stream_response()는 def로 정의된 “동기 generator”
  • async for는 __aiter__를 가진 “비동기 iterator”만 순회 가능

즉, “동기 generator를 비동기처럼 돌릴 수 없다”가 첫 번째 문제였습니다..


4. 동기/비동기 경계: 왜 여기서 동기면 안 되고, 비동기면 안 되나

이 지점에서 개념을 정리!

4.1 FastAPI 이벤트 루프에서 동기 작업을 돌리면 안 되는 이유

FastAPI(정확히는 Starlette/Uvicorn)는 이벤트 루프 기반이다. 이벤트 루프는 동시에 여러 요청을 처리하지만, 그 전제는 각 요청 핸들러가 오래 막히지 않는다......

requests.post(..., stream=True)는 블로킹 I/O입니다.

즉, 네트워크에서 줄 하나 받을 때까지 현재 스레드를 점유해서. 이걸 이벤트 루프 스레드에서 실행하면 다음 문제가 생깁니다. 

  • 다른 요청이 들어와도 이벤트 루프가 막혀서 응답이 느려짐
  • 스트리밍 중간에 지연이 커지고, 프론트에 chunk가 늦게 도착
  • 특정 환경에서는 “모아서 한 번에 나가는 것처럼” 보이는 현상까지 발생

4.2 그런데 “비동기만 쓰면” 되는 게 아닌 이유

그렇다고 async def generate_stream_response()로 바꾸면 끝이냐? 노노..

지금 Ollama 호출은 requests임.

requests는 async 라이브러리가 아닙니다.... await requests.post(...) 같은 건 존재하지 않는다...

즉,

  • 외부 호출을 async로 만들려면 httpx.AsyncClient 같은 라이브러리로 전환하거나
  • 동기 호출을 이벤트 루프 밖(스레드)에서 돌리고, 결과만 async로 전달해야 한다

이번 프로젝트에서는 “기존 requests 기반 코드를 크게 바꾸지 않고” 해결해야 했기 때문에 두 번째 방식을 택했습니다! 


5. 해결책: 동기 generator를 스레드에서 돌리고, async로 흘려보내는 브릿지

핵심은 

  • producer(동기): 스레드에서 for chunk in gen_factory(): ...
  • consumer(비동기): async generator에서 채널을 async for로 읽어서 yield

이를 위해 anyio의 memory object stream을 사용했습니다.

import anyio
from anyio import create_memory_object_stream
from typing import Callable, Iterator, AsyncIterator

async def stream_sync_generator(
    gen_factory: Callable[[], Iterator[str]],
    *,
    max_buffer_size: int = 200,
) -> AsyncIterator[str]:

    send_chan, recv_chan = create_memory_object_stream(max_buffer_size)

    def producer():
        try:
            for chunk in gen_factory():
                send_chan.send_nowait(chunk)
        finally:
            send_chan.close()

    async with anyio.create_task_group() as tg:
        tg.start_soon(anyio.to_thread.run_sync, producer)

        async for chunk in recv_chan:
            yield chunk

 

여기서 중요한 포인트는 create_memory_object_stream 사용법임

 

처음엔 아래처럼 작성했다가 에러가 났습니당.

create_memory_object_stream(str, max_buffer_size=max_buffer_size)

그리고 에러는 이렇게 터짐 ...

TypeError: create_memory_object_stream.__new__() got multiple values for argument 'max_buffer_size'

anyio의 create_memory_object_stream는 타입을 인자로 받지 않는다. 첫 번째 인자는 max_buffer_size다. 타입을 넘기면, 내부적으로 max_buffer_size 자리에 들어가버려 충돌한다.

 

그래서 올바른 호출은 아래처럼 해야 합니다.

 

 

send_chan, recv_chan = create_memory_object_stream(max_buffer_size)

 

 


 

6. 저장 로직에서 터진 에러 2: run_sync에 kwargs를 넣으면 스트림이 깨진다

 

스트리밍이 끝난 후 full_answer를 Spring으로 저장하려고 finally에서 저장을 호출했다. 이때 저장 함수는 requests.put(...) 같은 블로킹 호출이므로 이벤트 루프를 막지 않기 위해 anyio.to_thread.run_sync로 넘겼습니다.

그런데 여기서 또 문제가 생김.

다음처럼 kwargs를 직접 넘기면 anyio 버전에 따라 에러가 나면서, 스트리밍 자체가 종료 시점에 깨집니다.

 

await anyio.to_thread.run_sync(
    save_context_and_metadata,
    userId=req.user_id,
    roomId=req.room_id,
    messageId="",
    message=full_answer,
)

 

그래서 이 부분은 functools.partial로 해결!. “인자가 바인딩된 함수”를 만들어서 run_sync에는 함수 하나만 넘겨주는 방식입니다.

from functools import partial

await anyio.to_thread.run_sync(
    partial(
        save_context_and_metadata,
        userId=req.user_id,
        roomId=req.room_id,
        messageId="",
        message=full_answer,
        context=ctx.context or "",
    )
)

 

여기서도 실수 포인트가 있었는데, 저장 요청 바디에 context가 필수인데, 이를 누락하면 Spring에서 400을 반환하고, raise_for_status()에서 또 예외가 발생한다.

결국 스트리밍 끝자락에서 저장이 실패하면서 “응답이 끊긴 것처럼” 보일 수 있습니다.

그래서 저장 payload의 required 필드는 모두 채우는 것이 중요.


7. 최종 구현: “스트리밍은 계속 흘러가고, 끝나면 저장”이 되는 구조

최종적으로 컨트롤러는 이렇게 정리

from fastapi.responses import StreamingResponse
from functools import partial
import anyio

@router.post("")
def chat_stream(req: ChatRequest):
    
    """

        async def proxy_stream():
            answer_chunks: list[str] = []
            try:
                async for chunk in stream_sync_generator(lambda: generate_stream_response(prompt)):
                    answer_chunks.append(chunk)
                    yield chunk
            finally:
                full_answer = "".join(answer_chunks).strip()

                await anyio.to_thread.run_sync(
                    partial(

                    )
                )

        return StreamingResponse(proxy_stream(), media_type="text/plain; charset=utf-8")

8. 정리: SSE/스트리밍 구현에서 내가 얻은 교훈

  1. StreamingResponse는 “그냥 yield 하면 되겠지” 수준이 아니다. 동기/비동기 경계가 핵심이다.
  2. requests 기반 스트림은 동기다. 이벤트 루프에서 직접 돌리면 지연/버퍼링/장애로 이어진다.
  3. 동기 generator를 그대로 유지하고 싶으면, 스레드에서 돌리고 async로 흘리는 브릿지가 필요하다.
  4. 스트리밍 종료 후 저장 같은 블로킹 작업은 반드시 스레드로 빼야 한다. 그렇지 않으면 마지막에 스트림이 깨질 수 있다.
  5. anyio/Starlette는 버전에 따라 함수 시그니처/kwargs 처리 차이가 있어서, partial 같은 안전장치가 실전에서 매우 유용했다.

 


 

9. 다음에 개선할 포인트

 

이번 구현은 “기존 requests/Ollama 호출을 최대한 유지”하면서 스트리밍을 살린 방식!

더 깔끔하게 가려면 다음도 고려할 수 있을 듯 해용.

 

  • Ollama 호출을 httpx.AsyncClient로 바꾸고, 진짜 async 스트림으로 처리
  • 프론트에서 fetch streaming으로 chunk를 즉시 렌더링하는 방식 정리
  • 스트림 중간에 클라이언트가 끊었을 때 저장 로직을 스킵하는 disconnect 처리

이 글은 제가 실제로 SSE/스트리밍을 붙이면서 “왜 동기면 안 되는지”, “왜 무조건 비동기가 정답도 아닌지”, 그리고 “실무에서 어떤 형태로 연결해야 하는지”를 에러 기반으로 정리한 기록입니다.

다음 글에서는 Nginx 뒤에서 스트리밍이 “한꺼번에” 모여서 오는 문제까지 포함해서, 배포 환경에서의 최종 점검 포인트를 더 정리해볼 생각입니다!!!!

'ServerDev > FastAPI' 카테고리의 다른 글

[FastAPI 운영 로그 설계] 로그 포맷과 클라이언트 IP  (0) 2025.12.27
[FastAPI] 30명 동시 접속 RAG 챗봇, 왜 FastAPI 비동기(Async)가 필수일까?  (0) 2025.12.18
[FastAPI/AI] OCR, LLM(Ollama), RAG 구현 시 절대 BackgroundTasks만 믿으면 안 되는 이유  (0) 2025.12.13
[FastAPI] 미들웨어 - 작동 원리부터 ELK 연동을 위한 JSON 구조화 로깅까지  (0) 2025.12.13
[FastAPI] 미들웨어(Middleware) - 개념부터 커스텀 구현까지  (0) 2025.12.12
'ServerDev/FastAPI' 카테고리의 다른 글
  • [FastAPI 운영 로그 설계] 로그 포맷과 클라이언트 IP
  • [FastAPI] 30명 동시 접속 RAG 챗봇, 왜 FastAPI 비동기(Async)가 필수일까?
  • [FastAPI/AI] OCR, LLM(Ollama), RAG 구현 시 절대 BackgroundTasks만 믿으면 안 되는 이유
  • [FastAPI] 미들웨어 - 작동 원리부터 ELK 연동을 위한 JSON 구조화 로깅까지
yeseul-kim01
yeseul-kim01
  • yeseul-kim01
    슬 개발일지
    yeseul-kim01
  • 전체
    오늘
    어제
    • 분류 전체보기 (79)
      • 자격증 (1)
        • 정보보안기사 (0)
      • DevOps (17)
        • Docker (6)
        • Kubernetes (1)
        • GitHub Actions (0)
        • AWS (4)
        • Monitoring (1)
        • Nginx (1)
        • GCP (3)
      • ServerDev (34)
        • SpringBoot (13)
        • DJango (5)
        • FastAPI (14)
        • Next (0)
        • Flask (0)
        • Database (2)
      • Algorithm (2)
        • BFS (0)
        • DFS (1)
        • 다익스트라 (0)
      • CS (8)
      • Data Engineering (1)
      • AI&MLOps (2)
      • Architecture (6)
      • Software Engineering (0)
        • Library Packaging (0)
      • Project (5)
        • docx-generator (0)
        • speak-note (2)
        • ms-serving (1)
        • keyshield (2)
      • ProgrammingLanguages (3)
        • Python (1)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    실무일기-인프라편
    depends
    비동기처리
    MLops
    아키텍처
    KServe
    NLP부트캠프
    STT
    실무일기-백엔드편
    백엔드
    FastAPI
    멀티모듈
    di
    프로젝트기록-KeyShield
    Kubernetes
    하이브리드아키텍처
    grpc
    SpeakNote
    SpringBoot
    프로젝트기록-speaknote
    KeyShield
    트러블슈팅
    Django
    아키텍처설계
    실시간시스템
    docker
    rag
    FastAPI - CORS 마스터
    multipartfile
    동시성제어
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
yeseul-kim01
[FastAPI - SSE Streaming] FastAPI SSE 스트리밍(StreamingResponse) 실전 적용기: 동기 generator를 비동기로 “진짜 스트리밍” 만들기
상단으로

티스토리툴바