package api
import "bufio"
import "bytes"
import "context"
import "encoding/json"
import "fmt"
import "io/ioutil"
import "net/http"
import "strings"
// MessagesStream POSTs req to the Klex /chat/stream endpoint and delivers
// each text delta of the streamed response to cb.
//
// req.Model must be set to one of the Klex LLM function names. Its value is
// sent as the "f_name" field of the JSON body, and the Model field of the
// embedded request is cleared before marshalling. req is passed by value,
// so the caller's copy is not modified.
//
// An error is returned when req.Model is empty, when the request cannot be
// marshalled, built or sent, when the server answers with a status other
// than 200 (the response body is included in the message), or when
// parseStreamSSE reports an error. Marshal, request and transport errors
// are wrapped with %w, so errors.Is and errors.As can inspect the cause
// (for example to detect a cancelled ctx).
func (c *Client) MessagesStream(
	ctx context.Context,
	req MessagesRequest,
	cb func(string),
) error {
	fname := req.Model
	if fname == "" {
		return fmt.Errorf("MessagesRequest.Model is empty")
	}
	// The function name travels as f_name, so blank the model field.
	req.Model = ""

	// streamReq is the wire format: the original request's fields plus
	// f_name at the top level of the JSON object.
	type streamReq struct {
		FName string `json:"f_name"`
		MessagesRequest
	}
	body, err := json.Marshal(streamReq{FName: fname, MessagesRequest: req})
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}

	r, err := http.NewRequestWithContext(
		ctx, http.MethodPost, c.KlexURL+"/chat/stream", bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("new request: %w", err)
	}
	r.Header.Set("Authorization", "Bearer "+c.APIKey)
	r.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(r)
	if err != nil {
		return fmt.Errorf("http: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: if reading fails, b still holds whatever part of
		// the body arrived, and the status code is reported either way.
		b, _ := ioutil.ReadAll(resp.Body)
		return fmt.Errorf("status %d: %s", resp.StatusCode, b)
	}
	return parseStreamSSE(resp.Body, cb)
}
// parseStreamSSE reads SSE events from the /chat/stream endpoint and passes
// each text delta to cb. The payload of every "data" line is a JSON object:
//
//	{"text":"token"} — text delta
//	{"done":true}    — stream complete
//	{"error":"msg"}  — error
//
// In the SSE format the single space after "data:" is optional, so both
// "data: {...}" and "data:{...}" are accepted. Lines that are not data
// fields, and data payloads that are not valid JSON, are skipped.
//
// Within one event, error takes precedence over done, and done over text.
// The function returns nil on a done event or when r is exhausted, an error
// carrying the server's message on an error event, and the scanner's error
// if reading fails (for example when a line exceeds the 1 MiB limit).
func parseStreamSSE(
	r interface{ Read([]byte) (int, error) },
	cb func(string),
) error {
	const field = "data:"

	scanner := bufio.NewScanner(r)
	// Start with a 64 KiB buffer and allow a single line to grow to 1 MiB.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, field) {
			continue
		}
		// Strip the field name and at most one leading space.
		data := strings.TrimPrefix(line[len(field):], " ")

		var event struct {
			Text  string `json:"text"`
			Done  bool   `json:"done"`
			Error string `json:"error"`
		}
		if err := json.Unmarshal([]byte(data), &event); err != nil {
			// Deliberately lenient: payloads that are not JSON are
			// ignored rather than aborting the stream.
			continue
		}

		if event.Error != "" {
			return fmt.Errorf("stream: %s", event.Error)
		}
		if event.Done {
			return nil
		}
		if event.Text != "" {
			cb(event.Text)
		}
	}
	return scanner.Err()
}