go - How to handle HTTP request timeout when using OpenAI streaming API and prevent “http2: response body closed” error? - Stack

I am working on an application that uses the OpenAI API for streaming chat completion. The streaming wo

I am working on an application that uses the OpenAI API for streaming chat completion. The streaming works well, but I want to handle timeouts if the API doesn’t respond within a certain period. When I add a timeout check, I encounter the error "http2: response body closed", and no response is received. Below is the code that works without the timeout check, but it doesn’t handle timeout cases:

Code that works (without timeout check):

func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
    url := ";

    // Payload to send to the API
    payload := map[string]interface{}{
        "model":       model,
        "messages":    messages,
        "max_tokens":  maxTokens,
        "temperature": temperature,
        "stream":      true, // Request to return as a stream
    }

    requestBody, err := json.Marshal(payload)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal payload: %w", err)
    }

    // Create HTTP request
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }

    g.addCommonHeaders(req) // Add necessary headers

    // Send request and get response
    resp, err := g.Client.Do(req)
    if err != nil {
        return nil, fmt.Errorf("failed to send request: %w", err)
    }

    // Check HTTP status code
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return nil, fmt.Errorf("API error: %s", string(body))
    }

    // Create channel to pass stream data
    dataChannel := make(chan string)

    // Process stream in a goroutine
    go func() {
        defer close(dataChannel)
        defer resp.Body.Close()

        scanner := bufio.NewScanner(resp.Body)
        for scanner.Scan() {
            line := scanner.Text()

            // Check if the line doesn't contain data
            if len(line) < 6 || line[:6] != "data: " {
                continue
            }

            // Extract the JSON content after "data: "
            chunk := line[6:]
            if chunk == "[DONE]" {
                break
            }

            // Send chunk to the channel
            dataChannel <- chunk
        }

        // Check for scanner errors (if any)
        if err := scanner.Err(); err != nil {
            fmt.Printf("Error reading streaming response: %v\n", err)
        }
    }()

    return dataChannel, nil
}

Code with Timeout (with error “http2: response body closed”):

func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
    url := ";

    // Payload to send to the API
    payload := map[string]interface{}{
        "model":       model,
        "messages":    messages,
        "max_tokens":  maxTokens,
        "temperature": temperature,
        "stream":      true, // Request to return as a stream
    }

    requestBody, err := json.Marshal(payload)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal payload: %w", err)
    }

    // Create HTTP request
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }

    g.addCommonHeaders(req) // Add necessary headers

    // Set up the context with timeout for HTTP request (10 seconds)
    reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    req = req.WithContext(reqCtx) // Send the request with the timeout context

    // Send request and get response
    resp, err := g.Client.Do(req)
    if err != nil {
        // Check if the error is a timeout and switch to Perplexity API
        if reqCtx.Err() == context.DeadlineExceeded {
            fmt.Println("OpenAI API call timed out, switching to Perplexity API...")
            return g.callPerplexityApi(messages, model, maxTokens, temperature)
        }
        return nil, fmt.Errorf("failed to send request: %w", err)
    }
    defer resp.Body.Close()

    // Check HTTP status code
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return nil, fmt.Errorf("OpenAI API error: %s", string(body))
    }

    // Create channel to pass stream data
    dataChannel := make(chan string)

    // Process stream in a goroutine
    go func() {
        defer close(dataChannel)

        // Use scanner to read lines from the response body
        scanner := bufio.NewScanner(resp.Body)
        for scanner.Scan() {
            line := scanner.Text()

            // Check if the line doesn't contain data
            if len(line) < 6 || line[:6] != "data: " {
                continue
            }

            // Extract the JSON content after "data: "
            chunk := line[6:]
            if chunk == "[DONE]" {
                break
            }

            // Send chunk to the channel
            dataChannel <- chunk
        }

        // Check for scanner errors (if any)
        if err := scanner.Err(); err != nil {
            fmt.Printf("Error reading streaming response from OpenAI API: %v\n", err)
        }
    }()

    return dataChannel, nil
}

Problem:

  • Current Working Code: The code works fine for streaming without timeout checks.
  • Issue with Timeout: When I add the context timeout for the HTTP request (set to 10 seconds), I encounter the error “http2: response body closed” and no response is received from the OpenAI API.
  • What I expect: I want to ensure that if the OpenAI API does not respond within 10 seconds, I should switch to an alternative API provider like Perplexity.

Question:

  1. Why does adding a context timeout for the HTTP request cause the error "http2: response body closed" when streaming?
  2. How can I add a timeout for the HTTP request while avoiding this issue and ensuring the stream continues properly if the request completes successfully?
  3. Is there a better approach to handle HTTP request timeouts for streaming APIs like OpenAI’s in Go without closing the response body prematurely?

I am working on an application that uses the OpenAI API for streaming chat completion. The streaming works well, but I want to handle timeouts if the API doesn’t respond within a certain period. When I add a timeout check, I encounter the error "http2: response body closed", and no response is received. Below is the code that works without the timeout check, but it doesn’t handle timeout cases:

Code that works (without timeout check):

func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
    url := "https://api.openai/v1/chat/completions"

    // Payload to send to the API
    payload := map[string]interface{}{
        "model":       model,
        "messages":    messages,
        "max_tokens":  maxTokens,
        "temperature": temperature,
        "stream":      true, // Request to return as a stream
    }

    requestBody, err := json.Marshal(payload)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal payload: %w", err)
    }

    // Create HTTP request
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }

    g.addCommonHeaders(req) // Add necessary headers

    // Send request and get response
    resp, err := g.Client.Do(req)
    if err != nil {
        return nil, fmt.Errorf("failed to send request: %w", err)
    }

    // Check HTTP status code
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return nil, fmt.Errorf("API error: %s", string(body))
    }

    // Create channel to pass stream data
    dataChannel := make(chan string)

    // Process stream in a goroutine
    go func() {
        defer close(dataChannel)
        defer resp.Body.Close()

        scanner := bufio.NewScanner(resp.Body)
        for scanner.Scan() {
            line := scanner.Text()

            // Check if the line doesn't contain data
            if len(line) < 6 || line[:6] != "data: " {
                continue
            }

            // Extract the JSON content after "data: "
            chunk := line[6:]
            if chunk == "[DONE]" {
                break
            }

            // Send chunk to the channel
            dataChannel <- chunk
        }

        // Check for scanner errors (if any)
        if err := scanner.Err(); err != nil {
            fmt.Printf("Error reading streaming response: %v\n", err)
        }
    }()

    return dataChannel, nil
}

Code with Timeout (with error “http2: response body closed”):

func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
    url := "https://api.openai/v1/chat/completions"

    // Payload to send to the API
    payload := map[string]interface{}{
        "model":       model,
        "messages":    messages,
        "max_tokens":  maxTokens,
        "temperature": temperature,
        "stream":      true, // Request to return as a stream
    }

    requestBody, err := json.Marshal(payload)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal payload: %w", err)
    }

    // Create HTTP request
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }

    g.addCommonHeaders(req) // Add necessary headers

    // Set up the context with timeout for HTTP request (10 seconds)
    reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    req = req.WithContext(reqCtx) // Send the request with the timeout context

    // Send request and get response
    resp, err := g.Client.Do(req)
    if err != nil {
        // Check if the error is a timeout and switch to Perplexity API
        if reqCtx.Err() == context.DeadlineExceeded {
            fmt.Println("OpenAI API call timed out, switching to Perplexity API...")
            return g.callPerplexityApi(messages, model, maxTokens, temperature)
        }
        return nil, fmt.Errorf("failed to send request: %w", err)
    }
    defer resp.Body.Close()

    // Check HTTP status code
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return nil, fmt.Errorf("OpenAI API error: %s", string(body))
    }

    // Create channel to pass stream data
    dataChannel := make(chan string)

    // Process stream in a goroutine
    go func() {
        defer close(dataChannel)

        // Use scanner to read lines from the response body
        scanner := bufio.NewScanner(resp.Body)
        for scanner.Scan() {
            line := scanner.Text()

            // Check if the line doesn't contain data
            if len(line) < 6 || line[:6] != "data: " {
                continue
            }

            // Extract the JSON content after "data: "
            chunk := line[6:]
            if chunk == "[DONE]" {
                break
            }

            // Send chunk to the channel
            dataChannel <- chunk
        }

        // Check for scanner errors (if any)
        if err := scanner.Err(); err != nil {
            fmt.Printf("Error reading streaming response from OpenAI API: %v\n", err)
        }
    }()

    return dataChannel, nil
}

Problem:

  • Current Working Code: The code works fine for streaming without timeout checks.
  • Issue with Timeout: When I add the context timeout for the HTTP request (set to 10 seconds), I encounter the error “http2: response body closed” and no response is received from the OpenAI API.
  • What I expect: I want to ensure that if the OpenAI API does not respond within 10 seconds, I should switch to an alternative API provider like Perplexity.

Question:

  1. Why does adding a context timeout for the HTTP request cause the error "http2: response body closed" when streaming?
  2. How can I add a timeout for the HTTP request while avoiding this issue and ensuring the stream continues properly if the request completes successfully?
  3. Is there a better approach to handle HTTP request timeouts for streaming APIs like OpenAI’s in Go without closing the response body prematurely?
Share Improve this question asked Feb 4 at 18:19 minhminh 1352 gold badges2 silver badges7 bronze badges 2
  • 1 The transfer of the response body is part of the entire "request" cycle, so is subject to the request timeout. Do you just want a timeout of the initial response header? Your http transport can handle more detailed timeouts, e.g. ResponseHeaderTimeout. – Mr_Pink Commented Feb 4 at 19:37
  • 1 When a timeout occurs, no response is received from the OpenAI API. That's why the timeout occurred. Therefore your question (2) doesn't make sense. – user207421 Commented Feb 5 at 5:45
Add a comment  | 

1 Answer 1

Reset to default -1

To handle timeout in http request the proper way is to use httpclient setup like this:

package main

import (
    "bufio"
    "bytes"
    "crypto/tls"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"
)

type gptAdaptorClient struct {
    http.Client
}

var httpClient = &http.Client{
    Timeout: 30 * time.Second,
    Transport: &http.Transport{
        TLSClientConfig: &tls.Config{
            InsecureSkipVerify: true,
        },
    },
}

func NewGptAdaptorClient() *gptAdaptorClient {
    // Set up timeout for HTTP request (10 seconds)
    return &gptAdaptorClient{
        Client: http.Client{
            Timeout: 10 * time.Second,
        },
    }
}

func (g *gptAdaptorClient) addCommonHeaders(req *http.Request) {}
func (g *gptAdaptorClient) callPerplexityApi(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
    return nil, nil
}

func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
    url := "https://api.openai/v1/chat/completions"

    // Payload to send to the API
    payload := map[string]interface{}{
        "model":       model,
        "messages":    messages,
        "max_tokens":  maxTokens,
        "temperature": temperature,
        "stream":      true, // Request to return as a stream
    }

    requestBody, err := json.Marshal(payload)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal payload: %w", err)
    }

    // Create HTTP request
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
    if err != nil {
        return nil, fmt.Errorf("failed to create request: %w", err)
    }

    g.addCommonHeaders(req) // Add necessary headers

    // Send request and get response
    resp, err := g.Client.Do(req)
    if err != nil {
        // Check if the error is a timeout and switch to Perplexity API
        if resp.StatusCode == http.StatusGatewayTimeout {
            fmt.Println("OpenAI API call timed out, switching to Perplexity API...")
            return g.callPerplexityApi(messages, model, maxTokens, temperature)
        }
        return nil, fmt.Errorf("failed to send request: %w", err)
    }
    defer resp.Body.Close()

    // Check HTTP status code
    if resp.StatusCode != http.StatusOK {
        body, _ := io.ReadAll(resp.Body)
        return nil, fmt.Errorf("OpenAI API error: %s", string(body))
    }

    // Create channel to pass stream data
    dataChannel := make(chan string)

    // Process stream in a goroutine
    go func() {
        defer close(dataChannel)

        // Use scanner to read lines from the response body
        scanner := bufio.NewScanner(resp.Body)
        for scanner.Scan() {
            line := scanner.Text()

            // Check if the line doesn't contain data
            if len(line) < 6 || line[:6] != "data: " {
                continue
            }

            // Extract the JSON content after "data: "
            chunk := line[6:]
            if chunk == "[DONE]" {
                break
            }

            // Send chunk to the channel
            dataChannel <- chunk
        }

        // Check for scanner errors (if any)
        if err := scanner.Err(); err != nil {
            fmt.Printf("Error reading streaming response from OpenAI API: %v\n", err)
        }
    }()

    return dataChannel, nil
}

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1745239971a4618122.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信