FastAPI inside docker stopped receiving any requests after a while - Stack Overflow

I have a FastAPI app running inside Docker, deployed using Portainer. It works fine for a few minutes, but then it suddenly stops receiving any requests. I don't see any requests in the logs, and a curl against the Docker bridge port simply hangs forever.

The Portainer setup is basic, with only randomized port mappings.
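
When a request hangs forever, it can help to separate three cases before digging into the application: the port refuses connections, the TCP connection is accepted but no HTTP bytes ever come back (which would be consistent with a stalled worker or event loop), or the server replies normally. Below is a minimal stdlib-only sketch of such a probe. The function name `probe_http`, the status labels, and the 3-second default timeout are illustrative assumptions, not part of the original setup.

```python
import socket


def probe_http(host: str, port: int, timeout: float = 3.0) -> str:
    """Classify how a TCP/HTTP endpoint reacts within `timeout` seconds."""
    request = (
        f"GET / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
    ).encode("ascii")
    try:
        # The timeout applies to the connect and to later send/recv calls.
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(request)
            try:
                first_byte = sock.recv(1)
            except socket.timeout:
                # The connection was accepted, but nothing answered in time.
                return "accepted-but-silent"
            return "responding" if first_byte else "closed-without-reply"
    except socket.timeout:
        return "connect-timeout"
    except ConnectionRefusedError:
        return "refused"
```

Calling `probe_http("127.0.0.1", 8121)` from inside the container is probably the more telling check, since Docker's port forwarding on the host may accept the connection on the application's behalf.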

FROM python:3.10-slim

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV POETRY_VERSION=1.8.2

WORKDIR /app

RUN apt-get update && apt-get install -y \
    curl \
    build-essential \
    postgresql-client \
    libpq-dev \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

RUN curl -sSL https://install.python-poetry. | python3 -

ENV PATH="${PATH}:/root/.local/bin"

RUN mkdir -p $HOME/.postgresql

RUN curl --create-dirs -o $HOME/.postgresql/root.crt 'https://cockroachlabs.cloud/clusters/.../cert'

COPY pyproject.toml poetry.lock* ./

RUN poetry config virtualenvs.create false \
    && poetry install --no-interaction --no-ansi

COPY . .

EXPOSE 8121

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8121"]

The project is also split into a module.

The main.py file looks like this:

import secrets
from datetime import datetime
from typing import List

import fastapi
import uvicorn
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

from gpt_proxy.utils import mask_token

from .config import ADMIN_KEY, log
from .db import USERS, add_user, db_close, db_init, del_user
from .firebase_manager import Firebase
from .models import TokenCreate, TokenResponse, UserToken
from .openai_forward import OpenAiForward

app = fastapi.FastAPI()
forwarder = OpenAiForward()
api_key_header = APIKeyHeader(name="X-Admin-Key", auto_error=True)


def verify_admin_key(api_key: str = Security(api_key_header)):
    if api_key != ADMIN_KEY:
        raise HTTPException(status_code=403, detail="Invalid admin key")
    return api_key


@app.on_event("startup")
async def startup():
    log.info("Starting up OpenAI Forward application")
    await db_init()
    log.info("Application startup complete")


@app.on_event("shutdown")
async def shutdown():
    log.info("Shutting down OpenAI Forward application")
    if forwarder.client:
        await forwarder.client.close()
    await db_close()
    log.info("Application shutdown complete")


@app.post("/tokens", response_model=TokenResponse)
async def create_token(
    token_request: TokenCreate, api_key: str = Depends(verify_admin_key)
):
    new_token = f"mn-{secrets.token_urlsafe(32)}"
    return await add_user(username=token_request.username, token=new_token)


@app.delete("/tokens/{username}")
async def delete_token(username: str, api_key: str = Depends(verify_admin_key)):
    await del_user(username)
    return {"message": f"Token for user {username} has been deleted"}


@app.get("/tokens", response_model=List[UserToken])
async def list_users(api_key: str = Depends(verify_admin_key)):
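    # Return masked copies so the shared USERS entries keep their real tokens
    # (assigning user["token"] in place would overwrite them after one call).
    return [{**user, "token": mask_token(user["token"])} for user in USERS]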


@app.route(
    "/{api_path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "PATCH", "TRACE"],
)
async def _handle_openai_request(request: fastapi.Request):
    return await forwarder.reverse_proxy(request)


if __name__ == "__main__":
    log.info("Starting OpenAI Forward server")
    uvicorn.run(app, host="0.0.0.0", port=8010)

Database initialization:

import time

from tortoise import Tortoise

from ..config import DB_URI, log  # assumes DB_URI is defined in config, like ADMIN_KEY
from .models import FirebaseToken, User

USERS = []


async def _do_migration() -> None:
    old_users = {
        ....
    }

    for k, v in old_users.items():
        _ = await User.get_or_create(username=k, token=v, created_at=time.time())


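async def _get_all_users() -> None:
    users = await User.all()
    if not users:
        # First run: seed the table, then re-read it once. Avoiding recursion
        # means an empty migration cannot loop forever during startup.
        await _do_migration()
        users = await User.all()
    for user in users:
        USERS.append(dict(user))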


async def db_init() -> None:
    await Tortoise.init(db_url=DB_URI, modules={"models": ["gpt_proxy.db.models"]})

    await Tortoise.generate_schemas()
    await _get_all_users()

The OpenAI forward class code follows. I'm not sure it is relevant, since earlier requests work just fine, so I suspect the blocking is somewhere else:

import asyncio
import time
from functools import wraps
from typing import Any, AsyncGenerator, Callable, Tuple, Type, TypeVar

import aiohttp
import anyio
import fastapi
from fastapi import HTTPException
from starlette.responses import BackgroundTask, StreamingResponse

from .config import log
from .firebase_manager import Firebase
from .models import ClientConfig
from .utils import get_token_user, header_cloudflare_safe, mask_token

T = TypeVar("T")


def async_retry(
    max_retries: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
) -> Callable:
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> T | None:
            current_delay = delay
            for attempt in range(max_retries + 1):
                try:
                    if attempt > 0:
                        log.info(
                            f"Retrying {func.__name__}, attempt {attempt}/{max_retries} "
                            f"after {current_delay:.2f}s delay"
                        )
                        await anyio.sleep(current_delay)
                        current_delay *= backoff

                    return await func(*args, **kwargs)

                except exceptions as e:
                    log.warning(
                        f"Attempt {attempt + 1}/{max_retries + 1} failed for {func.__name__}: "
                        f"{type(e).__name__}: {str(e)}"
                    )

                    if attempt == max_retries:
                        log.error(
                            f"All retry attempts failed for {func.__name__}. "
                            f"Final exception: {type(e).__name__}: {str(e)}"
                        )
                        raise

            return None

        return wrapper

    return decorator


class OpenAiForward:
    def __init__(self) -> None:
        log.info("Initializing OpenAI Forward")
        self.base_url = "https://api.openai/"
        self.client: aiohttp.ClientSession | None = None
        self.firebase = Firebase()

    async def _init_client(self) -> None:
        if self.client is None:
            log.info("Initializing aiohttp client session")
            tcp_connector = aiohttp.TCPConnector(
                limit=500, limit_per_host=0, force_close=False
            )
            self.client = aiohttp.ClientSession(connector=tcp_connector)
            log.info("aiohttp client session initialized")

    async def _get_token(self, token: str):
        log.info(f"Processing token {mask_token(token)}")
        if token.startswith("mn-"):
            username = await get_token_user(token)
            if not username:
                log.info("Using direct token")
                return None, token
            fb_token = await self.firebase.get_token()
            return username, fb_token
        else:
            log.info("Using direct token")
            return None, token

    async def iter_bytes(
        self, response: aiohttp.ClientResponse, request: fastapi.Request
    ) -> AsyncGenerator[bytes, Any]:
        log.info(f"Streaming response for {request.url.path}")
        async for chunk, _ in response.content.iter_chunks():
            yield chunk

    @async_retry(
        max_retries=3,
        delay=0.2,
        backoff=0.2,
        exceptions=(
            aiohttp.ServerTimeoutError,
            aiohttp.ServerConnectionError,
            aiohttp.ServerDisconnectedError,
            asyncio.TimeoutError,
            anyio.EndOfStream,
            RuntimeError,
        ),
    )
    async def send(
        self, client_config: ClientConfig, data: bytes | None = None
    ) -> aiohttp.ClientResponse | None:
        if not self.client:
            await self._init_client()

        log.info(f"Sending {client_config.method} request to {client_config.url}")
        if self.client:
            return await self.client.request(
                method=client_config.method,
                url=client_config.url,
                data=data,
                headers=client_config.headers,
            )
        return None

    async def prepare_config(self, request: fastapi.Request) -> ClientConfig:
        headers: dict = header_cloudflare_safe(request)
        original_bearer: str = headers.get(
            "Authorization", headers.get("authorization")
        )

        if original_bearer:
            token: str = original_bearer.split()[-1].strip()
            user, replacement_token = await self._get_token(token)

            if replacement_token is None:
                raise HTTPException(status_code=401, detail="Invalid token")

            auth_header = f"Bearer {replacement_token}"
            if "Authorization" in headers:
                headers["Authorization"] = auth_header
            elif "authorization" in headers:
                headers["authorization"] = auth_header

            log.info(
                f"Token processing: User={user or 'direct'}, "
                f"Using={'Firebase' if user else 'direct'} token"
            )

        url = f"https://api.openai/{request.url.path}"
        if request.url.query:
            url = f"{url}?{request.url.query}"

        return ClientConfig(
            headers=headers,
            method=request.method,
            url=url,
        )

    async def reverse_proxy(self, request: fastapi.Request) -> StreamingResponse:
        request_id = str(time.time())
        log.info(
            f"[{request_id}] Incoming request: {request.method} {request.url.path}"
        )

        config = await self.prepare_config(request)
        body = await request.body()
        data = body if body else None

        try:
            log.info(f"[{request_id}] Forwarding request to OpenAI")
            response = await self.send(config, data=data)
            log.info(f"[{request_id}] OpenAI response received: {response.status}")

            return StreamingResponse(
                self.iter_bytes(response, request),
                status_code=response.status,
                media_type=response.headers.get("content-type"),
                background=BackgroundTask(response.release),
            )

        except aiohttp.ClientError as e:
            log.exception(f"[{request_id}] Failed to forward request to OpenAI")
            raise fastapi.HTTPException(
                status_code=fastapi.status.HTTP_502_BAD_GATEWAY,
                detail=f"Failed to forward request: {str(e)}",
            )
        except Exception as e:
            log.exception(f"[{request_id}] Unexpected error during request forwarding")
            raise fastapi.HTTPException(
                status_code=fastapi.status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Internal server error: {str(e)}",
            )
Asked Nov 19, 2024 at 8:46 by Marcel. Edited Nov 19, 2024 at 9:42 by Chris.
  • Please extract a minimal reproducible example, which should answer, for example, whether the PostgreSQL DB has any influence on this. – Ulrich Eckhardt, commented Nov 19, 2024 at 8:48
  • @UlrichEckhardt Right. I've included more details. – Marcel, commented Nov 19, 2024 at 8:57
  • Attaching some sort of debugger that lets you see exactly what code the Python interpreter is currently running could be helpful: stackoverflow/questions/25308847/… I'd also advise adding a few log statements on your happy path so that you can see when requests succeed; that might help you discover where the deadlock happens. – MatsLindh, commented Nov 19, 2024 at 10:00
  • @MatsLindh Thanks for the hint. Anyway, I went through a refactoring and the problem seems to be gone. No idea what fixed it: either launching the web app with the built-in fastapi command, or specifying a count of 3 workers. – Marcel, commented Nov 20, 2024 at 19:49
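
A lightweight way to follow the suggestion about seeing what the interpreter is running, without attaching a debugger, is the standard library's `faulthandler` module. The sketch below registers a signal handler that dumps every thread's traceback; the helper name `install_stack_dump` and the choice of `SIGUSR1` are assumptions for illustration, and `faulthandler.register` is only available on Unix-like systems.

```python
import faulthandler
import signal
import sys
from typing import TextIO


def install_stack_dump(
    stream: TextIO = sys.stderr, signum: int = signal.SIGUSR1
) -> None:
    """Dump the traceback of every thread to `stream` when `signum` arrives."""
    # `stream` must be backed by a real file descriptor (sys.stderr usually is).
    faulthandler.register(signum, file=stream, all_threads=True)
```

Assuming this is called once at import time in main.py, sending the signal with `docker kill --signal=SIGUSR1 <container>` should make the dump appear in the container logs. It shows where each thread is at that moment, not every suspended asyncio task, so an idle event loop will simply appear waiting in its selector.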

1 Answer

I went through a refactoring and the problem seems to be gone. I don't know what fixed it: either launching the web app with the built-in fastapi command, or specifying the number of workers FastAPI can use.

My Dockerfile now looks like this:

FROM python:3.10-slim

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV POETRY_VERSION=1.8.2

WORKDIR /app

RUN apt-get update && apt-get install -y \
    curl \
    build-essential \
    postgresql-client \
    libpq-dev \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

RUN curl -sSL https://install.python-poetry. | python3 -

ENV PATH="${PATH}:/root/.local/bin"

RUN mkdir -p $HOME/.postgresql

RUN curl --create-dirs -o $HOME/.postgresql/root.crt 'https://cockroachlabs.cloud/clusters/1234/cert'

COPY pyproject.toml poetry.lock* ./

RUN poetry config virtualenvs.create false \
    && poetry install --no-interaction --no-ansi

COPY . .

EXPOSE 8121

CMD ["fastapi", "run", "main", "--port", "8121", "--workers", "3"]
