code.oscarkilo.com/klex-git/api/stream.go

..
api.go
datasets.go
embed.go
f.go
funcs.go
messages.go
pipelines.go
stream.go
worker.go
package api

import "bufio"
import "bytes"
import "context"
import "encoding/json"
import "fmt"
import "io/ioutil"
import "net/http"
import "strings"

// MessagesStream calls the streaming endpoint and
// delivers text deltas via cb. Set req.Model to one
// of the Klex LLM function names.
func (c *Client) MessagesStream(
  ctx context.Context,
  req MessagesRequest,
  cb func(string),
) error {
  f := req.Model
  req.Model = ""
  if f == "" {
    return fmt.Errorf(
      "MessagesRequest.Model is empty")
  }

  type streamReq struct {
    FName string `json:"f_name"`
    MessagesRequest
  }
  body, err := json.Marshal(
    streamReq{FName: f, MessagesRequest: req})
  if err != nil {
    return fmt.Errorf("marshal: %v", err)
  }

  r, err := http.NewRequestWithContext(
    ctx, "POST",
    c.KlexURL+"/chat/stream",
    bytes.NewReader(body))
  if err != nil {
    return fmt.Errorf("new request: %v", err)
  }
  r.Header.Set(
    "Authorization", "Bearer "+c.APIKey)
  r.Header.Set(
    "Content-Type", "application/json")

  resp, err := http.DefaultClient.Do(r)
  if err != nil {
    return fmt.Errorf("http: %v", err)
  }
  defer resp.Body.Close()

  if resp.StatusCode != 200 {
    b, _ := ioutil.ReadAll(resp.Body)
    return fmt.Errorf(
      "status %d: %s", resp.StatusCode, b)
  }

  return parseStreamSSE(resp.Body, cb)
}

// parseStreamSSE reads SSE events from the funky
// /chat/stream endpoint. Events are JSON objects:
//
//	{"text":"token"}  — text delta
//	{"done":true}     — stream complete
//	{"error":"msg"}   — error
func parseStreamSSE(
  r interface{ Read([]byte) (int, error) },
  cb func(string),
) error {
  scanner := bufio.NewScanner(r)
  scanner.Buffer(
    make([]byte, 0, 64*1024), 1024*1024)
  for scanner.Scan() {
    line := scanner.Text()
    if !strings.HasPrefix(line, "data: ") {
      continue
    }
    data := line[6:]
    var event struct {
      Text  string `json:"text"`
      Done  bool   `json:"done"`
      Error string `json:"error"`
    }
    if err := json.Unmarshal(
      []byte(data), &event); err != nil {
      continue
    }
    if event.Error != "" {
      return fmt.Errorf("stream: %s", event.Error)
    }
    if event.Done {
      return nil
    }
    if event.Text != "" {
      cb(event.Text)
    }
  }
  return scanner.Err()
}