Simulate a streaming API with FastAPI

The server code

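import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def fake_text_streaming():
    # Async generator: each yielded bytes object is sent to the client as one chunk.
    for i in range(1000):
        yield b"Fake text #" + str(i).encode() + b"\n"
        # Pause for one second between chunks to simulate a slow data source.
        await asyncio.sleep(1)

# Register the same handler for both POST and GET requests on "/".
@app.post("/")
@app.get("/")
async def main():
    # StreamingResponse iterates the generator and sends each chunk as it is produced.
    return StreamingResponse(fake_text_streaming())

if __name__ == '__main__':
    # Listen on all network interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)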
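
The generator can also be checked on its own, without starting the server. The sketch below is a standalone copy of fake_text_streaming that takes count and delay parameters; both parameters and the collect helper are assumptions added for illustration so that the check finishes quickly, and they are not part of the server code above.

```python
import asyncio

async def fake_text_streaming(count=3, delay=0.01):
    # Same body as the server's generator, but with a configurable
    # number of chunks and pause length (both are illustrative assumptions).
    for i in range(count):
        yield b"Fake text #" + str(i).encode() + b"\n"
        await asyncio.sleep(delay)

async def collect(agen):
    # Drain an async generator into a list of chunks.
    return [chunk async for chunk in agen]

chunks = asyncio.run(collect(fake_text_streaming()))
for chunk in chunks:
    print(chunk.decode(), end="")
```

Each element of chunks is one bytes object ending in a newline, which is the unit StreamingResponse would send to the client.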

How to test?

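After the server starts, use curl to test it. The -N option turns off curl's output buffering so that each line is printed as soon as it arrives, and 127.0.0.1 is used because 0.0.0.0 is a bind address that not every system accepts as a destination. A new line should appear about once per second:

curl -N http://127.0.0.1:8000/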

Fake text #0
Fake text #1
Fake text #2
Fake text #3
Fake text #4
Fake text #5
Fake text #6
Fake text #7
Fake text #8
Fake text #9
Fake text #10
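
The same test can be done from Python. Below is a minimal sketch of a client that uses only the standard library; the function names iter_stream_lines and print_stream, the max_lines limit, and the 127.0.0.1 URL are assumptions chosen for illustration.

```python
import urllib.request

def iter_stream_lines(response):
    # Iterating a file-like HTTP response yields one raw line at a time,
    # so each line can be handled as soon as the server has sent it.
    for raw_line in response:
        yield raw_line.decode("utf-8").rstrip("\n")

def print_stream(url="http://127.0.0.1:8000/", max_lines=5):
    # Open the streaming endpoint and print the first max_lines lines.
    with urllib.request.urlopen(url) as response:
        for count, line in enumerate(iter_stream_lines(response), start=1):
            print(line, flush=True)
            if count >= max_lines:
                break
```

With the server running, calling print_stream() should print the first five lines and then close the connection. Leaving the with block early is what stops the download, since the server would otherwise keep sending lines for about 1000 seconds.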
