I am working on an application that uses the OpenAI API for streaming chat completion. The streaming works well, but I want to handle timeouts if the API doesn’t respond within a certain period. When I add a timeout check, I encounter the error "http2: response body closed", and no response is received. Below is the code that works without the timeout check, but it doesn’t handle timeout cases:
Code that works (without timeout check):
func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
url := ";
// Payload to send to the API
payload := map[string]interface{}{
"model": model,
"messages": messages,
"max_tokens": maxTokens,
"temperature": temperature,
"stream": true, // Request to return as a stream
}
requestBody, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("failed to marshal payload: %w", err)
}
// Create HTTP request
req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
g.addCommonHeaders(req) // Add necessary headers
// Send request and get response
resp, err := g.Client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to send request: %w", err)
}
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("API error: %s", string(body))
}
// Create channel to pass stream data
dataChannel := make(chan string)
// Process stream in a goroutine
go func() {
defer close(dataChannel)
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
// Check if the line doesn't contain data
if len(line) < 6 || line[:6] != "data: " {
continue
}
// Extract the JSON content after "data: "
chunk := line[6:]
if chunk == "[DONE]" {
break
}
// Send chunk to the channel
dataChannel <- chunk
}
// Check for scanner errors (if any)
if err := scanner.Err(); err != nil {
fmt.Printf("Error reading streaming response: %v\n", err)
}
}()
return dataChannel, nil
}
Code with Timeout (with error “http2: response body closed”):
// CreateChatCompletionStream is the timeout variant that the question reports
// as failing with "http2: response body closed". The code is kept exactly as
// posted because it is the subject of the question; NOTE comments mark the
// lines responsible.
//
// It posts a streaming chat-completion request under a 10-second context,
// returns g.callPerplexityApi's result when Do fails and the context deadline
// has passed, and otherwise returns a channel fed by a goroutine that scans
// resp.Body for "data: " lines until "[DONE]" or the end of the body.
func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
// NOTE(review): the string literal below is unterminated as captured;
// presumably it held the chat-completions endpoint URL — confirm.
url := ";
// Payload to send to the API
payload := map[string]interface{}{
"model": model,
"messages": messages,
"max_tokens": maxTokens,
"temperature": temperature,
"stream": true, // Request to return as a stream
}
requestBody, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("failed to marshal payload: %w", err)
}
// Create HTTP request
req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
g.addCommonHeaders(req) // Add necessary headers
// Set up the context with timeout for HTTP request (10 seconds)
// NOTE: a request context governs the whole exchange, including reading the
// response body, not only the wait for the response to start.
reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// NOTE: this deferred cancel runs when this function returns, which happens
// before the goroutine below has finished reading the stream.
defer cancel()
req = req.WithContext(reqCtx) // Send the request with the timeout context
// Send request and get response
resp, err := g.Client.Do(req)
if err != nil {
// Check if the error is a timeout and switch to Perplexity API
if reqCtx.Err() == context.DeadlineExceeded {
fmt.Println("OpenAI API call timed out, switching to Perplexity API...")
return g.callPerplexityApi(messages, model, maxTokens, temperature)
}
return nil, fmt.Errorf("failed to send request: %w", err)
}
// NOTE: this deferred Close also runs as soon as the function returns, while
// the goroutine below is still scanning resp.Body; that matches the reported
// "http2: response body closed" error.
defer resp.Body.Close()
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
// The error from reading the body is discarded here.
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("OpenAI API error: %s", string(body))
}
// Create channel to pass stream data
dataChannel := make(chan string)
// Process stream in a goroutine
go func() {
defer close(dataChannel)
// Use scanner to read lines from the response body
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
// Check if the line doesn't contain data
if len(line) < 6 || line[:6] != "data: " {
continue
}
// Extract the JSON content after "data: "
chunk := line[6:]
if chunk == "[DONE]" {
break
}
// Send chunk to the channel
dataChannel <- chunk
}
// Check for scanner errors (if any)
if err := scanner.Err(); err != nil {
fmt.Printf("Error reading streaming response from OpenAI API: %v\n", err)
}
}()
return dataChannel, nil
}
Problem:
- Current Working Code: The code works fine for streaming without timeout checks.
- Issue with Timeout: When I add the context timeout for the HTTP request (set to 10 seconds), I encounter the error “http2: response body closed” and no response is received from the OpenAI API.
- What I expect: I want to ensure that if the OpenAI API does not respond within 10 seconds, I should switch to an alternative API provider like Perplexity.
Question:
- Why does adding a context timeout for the HTTP request cause the error "http2: response body closed" when streaming?
- How can I add a timeout for the HTTP request while avoiding this issue and ensuring the stream continues properly if the request completes successfully?
- Is there a better approach to handle HTTP request timeouts for streaming APIs like OpenAI’s in Go without closing the response body prematurely?
I am working on an application that uses the OpenAI API for streaming chat completion. The streaming works well, but I want to handle timeouts if the API doesn’t respond within a certain period. When I add a timeout check, I encounter the error "http2: response body closed", and no response is received. Below is the code that works without the timeout check, but it doesn’t handle timeout cases:
Code that works (without timeout check):
// CreateChatCompletionStream posts a streaming chat-completion request and
// returns a channel that carries the text following "data: " on each matching
// line of the response. Lines without that prefix are skipped. The channel is
// closed when the "[DONE]" marker arrives, the body ends, or reading fails.
//
// No timeout is applied here beyond whatever g.Client is configured with.
//
// On the success path the response body is owned by the reader goroutine,
// which closes it when it finishes. On the non-200 path the body is closed
// before returning so the connection is not leaked.
//
// It returns an error if the payload cannot be marshalled, the request cannot
// be built or sent, or the API answers with a status other than 200.
func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
	const (
		// The host was captured as "api.openai", which is not a resolvable
		// name; the chat-completions endpoint lives on api.openai.com.
		endpoint   = "https://api.openai.com/v1/chat/completions"
		dataPrefix = "data: "
		doneMarker = "[DONE]"
	)

	// Payload to send to the API; "stream": true asks for a streamed response.
	payload := map[string]interface{}{
		"model":       model,
		"messages":    messages,
		"max_tokens":  maxTokens,
		"temperature": temperature,
		"stream":      true,
	}
	requestBody, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal payload: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(requestBody))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	g.addCommonHeaders(req)

	resp, err := g.Client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to send request: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		// No goroutine takes ownership of the body on this path, so it has to
		// be closed here.
		defer resp.Body.Close()
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			return nil, fmt.Errorf("API error: %s (reading error body: %v)", resp.Status, readErr)
		}
		return nil, fmt.Errorf("API error: %s", string(body))
	}

	dataChannel := make(chan string)
	go func() {
		// Deferred calls run last-in first-out: the body is closed first,
		// then the channel, so a closed channel implies a released body.
		defer close(dataChannel)
		defer resp.Body.Close()

		scanner := bufio.NewScanner(resp.Body)
		for scanner.Scan() {
			line := scanner.Text()
			if len(line) < len(dataPrefix) || line[:len(dataPrefix)] != dataPrefix {
				continue
			}
			chunk := line[len(dataPrefix):]
			if chunk == doneMarker {
				break
			}
			dataChannel <- chunk
		}
		if err := scanner.Err(); err != nil {
			fmt.Printf("Error reading streaming response: %v\n", err)
		}
	}()
	return dataChannel, nil
}
Code with Timeout (with error “http2: response body closed”):
// CreateChatCompletionStream is the timeout variant that the question reports
// as failing with "http2: response body closed". The code is kept exactly as
// posted because it is the subject of the question; NOTE comments mark the
// lines responsible.
//
// It posts a streaming chat-completion request under a 10-second context,
// returns g.callPerplexityApi's result when Do fails and the context deadline
// has passed, and otherwise returns a channel fed by a goroutine that scans
// resp.Body for "data: " lines until "[DONE]" or the end of the body.
func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
// NOTE(review): the host "api.openai" looks truncated; presumably
// api.openai.com was intended — confirm.
url := "https://api.openai/v1/chat/completions"
// Payload to send to the API
payload := map[string]interface{}{
"model": model,
"messages": messages,
"max_tokens": maxTokens,
"temperature": temperature,
"stream": true, // Request to return as a stream
}
requestBody, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("failed to marshal payload: %w", err)
}
// Create HTTP request
req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
g.addCommonHeaders(req) // Add necessary headers
// Set up the context with timeout for HTTP request (10 seconds)
// NOTE: a request context governs the whole exchange, including reading the
// response body, not only the wait for the response to start.
reqCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// NOTE: this deferred cancel runs when this function returns, which happens
// before the goroutine below has finished reading the stream.
defer cancel()
req = req.WithContext(reqCtx) // Send the request with the timeout context
// Send request and get response
resp, err := g.Client.Do(req)
if err != nil {
// Check if the error is a timeout and switch to Perplexity API
if reqCtx.Err() == context.DeadlineExceeded {
fmt.Println("OpenAI API call timed out, switching to Perplexity API...")
return g.callPerplexityApi(messages, model, maxTokens, temperature)
}
return nil, fmt.Errorf("failed to send request: %w", err)
}
// NOTE: this deferred Close also runs as soon as the function returns, while
// the goroutine below is still scanning resp.Body; that matches the reported
// "http2: response body closed" error.
defer resp.Body.Close()
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
// The error from reading the body is discarded here.
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("OpenAI API error: %s", string(body))
}
// Create channel to pass stream data
dataChannel := make(chan string)
// Process stream in a goroutine
go func() {
defer close(dataChannel)
// Use scanner to read lines from the response body
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
// Check if the line doesn't contain data
if len(line) < 6 || line[:6] != "data: " {
continue
}
// Extract the JSON content after "data: "
chunk := line[6:]
if chunk == "[DONE]" {
break
}
// Send chunk to the channel
dataChannel <- chunk
}
// Check for scanner errors (if any)
if err := scanner.Err(); err != nil {
fmt.Printf("Error reading streaming response from OpenAI API: %v\n", err)
}
}()
return dataChannel, nil
}
Problem:
- Current Working Code: The code works fine for streaming without timeout checks.
- Issue with Timeout: When I add the context timeout for the HTTP request (set to 10 seconds), I encounter the error “http2: response body closed” and no response is received from the OpenAI API.
- What I expect: I want to ensure that if the OpenAI API does not respond within 10 seconds, I should switch to an alternative API provider like Perplexity.
Question:
- Why does adding a context timeout for the HTTP request cause the error "http2: response body closed" when streaming?
- How can I add a timeout for the HTTP request while avoiding this issue and ensuring the stream continues properly if the request completes successfully?
- Is there a better approach to handle HTTP request timeouts for streaming APIs like OpenAI’s in Go without closing the response body prematurely?
1 Answer
(Score: -1) To handle timeouts for an HTTP request, the proper way is to configure them on the HTTP client, like this:
package main
import (
"bufio"
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// gptAdaptorClient embeds http.Client, so the embedded value is reachable as
// the field Client and its methods (such as Do) are promoted onto the adaptor.
type gptAdaptorClient struct {
http.Client
}
// httpClient is a package-level client with a 30-second overall timeout.
//
// TLS certificate verification is left enabled. The previous configuration
// set InsecureSkipVerify, which accepts any certificate and so lets anyone on
// the network path read or alter requests and responses. A minimum TLS
// version is pinned instead.
//
// Client.Timeout covers the whole exchange, including reading the response
// body, so this client is not suitable for long-lived streaming responses.
var httpClient = &http.Client{
	Timeout: 30 * time.Second,
	Transport: &http.Transport{
		TLSClientConfig: &tls.Config{
			MinVersion: tls.VersionTLS12,
		},
	},
}
// NewGptAdaptorClient returns an adaptor whose HTTP client gives the server
// 10 seconds to start answering a request.
//
// The limit is set through Transport.ResponseHeaderTimeout rather than
// Client.Timeout. Client.Timeout keeps running while the response body is
// being read, so it would cut every streamed completion off after 10 seconds.
// ResponseHeaderTimeout only bounds the wait for the response headers once
// the request has been written, and leaves the body stream unbounded.
// Connection setup is still governed by the transport's own dial and TLS
// handshake timeouts.
func NewGptAdaptorClient() *gptAdaptorClient {
	const headerTimeout = 10 * time.Second

	// Start from a copy of the default transport so its proxy, dial and
	// HTTP/2 settings are kept and the shared default is not mutated.
	var transport *http.Transport
	if base, ok := http.DefaultTransport.(*http.Transport); ok {
		transport = base.Clone()
	} else {
		transport = &http.Transport{}
	}
	transport.ResponseHeaderTimeout = headerTimeout

	return &gptAdaptorClient{
		Client: http.Client{Transport: transport},
	}
}
// addCommonHeaders is a placeholder: it accepts the outgoing request and
// leaves it unmodified.
func (g *gptAdaptorClient) addCommonHeaders(req *http.Request) {
}
// callPerplexityApi is a stub for the fallback provider. It ignores all of
// its arguments and returns a nil channel together with a nil error.
func (g *gptAdaptorClient) callPerplexityApi(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
	var (
		stream <-chan string
		err    error
	)
	return stream, err
}
// CreateChatCompletionStream posts a streaming chat-completion request and
// returns a channel that carries the text following "data: " on each matching
// line of the response. Lines without that prefix are skipped. The channel is
// closed when the "[DONE]" marker arrives, the body ends, or reading fails.
//
// If sending the request fails with a timeout, the call is handed over to
// g.callPerplexityApi and its result is returned. Any other send failure, a
// marshalling failure, or a status other than 200 is returned as an error.
//
// The timeout itself comes from how g.Client is configured. A client that
// sets Client.Timeout will also interrupt the stream once that time elapses,
// because that timer keeps running while the body is read. To bound only the
// wait for the response to start, configure Transport.ResponseHeaderTimeout.
//
// On the success path the response body is owned by the reader goroutine and
// closed by it. Closing it from this function (for example with a defer)
// would end the stream as soon as the function returns.
func (g *gptAdaptorClient) CreateChatCompletionStream(messages []map[string]string, model string, maxTokens int, temperature float64) (<-chan string, error) {
	const (
		// The host was captured as "api.openai", which is not a resolvable
		// name; the chat-completions endpoint lives on api.openai.com.
		endpoint   = "https://api.openai.com/v1/chat/completions"
		dataPrefix = "data: "
		doneMarker = "[DONE]"
	)

	// Payload to send to the API; "stream": true asks for a streamed response.
	payload := map[string]interface{}{
		"model":       model,
		"messages":    messages,
		"max_tokens":  maxTokens,
		"temperature": temperature,
		"stream":      true,
	}
	requestBody, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal payload: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(requestBody))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	g.addCommonHeaders(req)

	resp, err := g.Client.Do(req)
	if err != nil {
		// resp is nil whenever Do fails, so the timeout has to be read from
		// the error. Client.Do returns a *url.Error, whose Timeout method
		// reports whether the request timed out.
		if te, ok := err.(interface{ Timeout() bool }); ok && te.Timeout() {
			fmt.Println("OpenAI API call timed out, switching to Perplexity API...")
			return g.callPerplexityApi(messages, model, maxTokens, temperature)
		}
		return nil, fmt.Errorf("failed to send request: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		// No goroutine takes ownership of the body on this path, so it has to
		// be closed here.
		defer resp.Body.Close()
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			return nil, fmt.Errorf("OpenAI API error: %s (reading error body: %v)", resp.Status, readErr)
		}
		return nil, fmt.Errorf("OpenAI API error: %s", string(body))
	}

	dataChannel := make(chan string)
	go func() {
		// Deferred calls run last-in first-out: the body is closed first,
		// then the channel, so a closed channel implies a released body.
		defer close(dataChannel)
		defer resp.Body.Close()

		scanner := bufio.NewScanner(resp.Body)
		for scanner.Scan() {
			line := scanner.Text()
			if len(line) < len(dataPrefix) || line[:len(dataPrefix)] != dataPrefix {
				continue
			}
			chunk := line[len(dataPrefix):]
			if chunk == doneMarker {
				break
			}
			dataChannel <- chunk
		}
		if err := scanner.Err(); err != nil {
			fmt.Printf("Error reading streaming response from OpenAI API: %v\n", err)
		}
	}()
	return dataChannel, nil
}
发布者:admin,转载请注明出处:http://www.yc00.com/questions/1745239971a4618122.html
Comment: see `ResponseHeaderTimeout` (a field of `http.Transport`). – Mr_Pink, commented Feb 4 at 19:37