code.oscarkilo.com/klex-git

Hash:
671a286e9c91448a439826242af3e3f30fe0b545
Author:
Igor Naverniouk <[email protected]>
Date:
Tue Mar 3 16:30:44 2026 -0500
Message:
Add streaming client for Klex chat/stream endpoint MessagesStream(ctx, req, cb) calls /klex/chat/stream with SSE parsing. Text deltas are delivered via callback. Used by corp's KlexAgent to stream LLM responses to Telegram in real time.
diff --git a/api/stream.go b/api/stream.go
new file mode 100644
index 0000000..194d61e
--- /dev/null
+++ b/api/stream.go
@@ -0,0 +1,103 @@
+package api
+
+import "bufio"
+import "bytes"
+import "context"
+import "encoding/json"
+import "fmt"
+import "io/ioutil"
+import "net/http"
+import "strings"
+
+// MessagesStream calls the streaming endpoint and
+// delivers text deltas via cb. Set req.Model to one
+// of the Klex LLM function names.
+func (c *Client) MessagesStream(
+ ctx context.Context,
+ req MessagesRequest,
+ cb func(string),
+) error {
+ f := req.Model
+ req.Model = ""
+ if f == "" {
+ return fmt.Errorf(
+ "MessagesRequest.Model is empty")
+ }
+
+ type streamReq struct {
+ FName string `json:"f_name"`
+ MessagesRequest
+ }
+ body, err := json.Marshal(
+ streamReq{FName: f, MessagesRequest: req})
+ if err != nil {
+ return fmt.Errorf("marshal: %v", err)
+ }
+
+ r, err := http.NewRequestWithContext(
+ ctx, "POST",
+ c.KlexURL+"/chat/stream",
+ bytes.NewReader(body))
+ if err != nil {
+ return fmt.Errorf("new request: %v", err)
+ }
+ r.Header.Set(
+ "Authorization", "Bearer "+c.APIKey)
+ r.Header.Set(
+ "Content-Type", "application/json")
+
+ resp, err := http.DefaultClient.Do(r)
+ if err != nil {
+ return fmt.Errorf("http: %v", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ b, _ := ioutil.ReadAll(resp.Body)
+ return fmt.Errorf(
+ "status %d: %s", resp.StatusCode, b)
+ }
+
+ return parseStreamSSE(resp.Body, cb)
+}
+
+// parseStreamSSE reads SSE events from the funky
+// /chat/stream endpoint. Events are JSON objects:
+//
+// {"text":"token"} — text delta
+// {"done":true} — stream complete
+// {"error":"msg"} — error
+func parseStreamSSE(
+ r interface{ Read([]byte) (int, error) },
+ cb func(string),
+) error {
+ scanner := bufio.NewScanner(r)
+ scanner.Buffer(
+ make([]byte, 0, 64*1024), 1024*1024)
+ for scanner.Scan() {
+ line := scanner.Text()
+ if !strings.HasPrefix(line, "data: ") {
+ continue
+ }
+ data := line[6:]
+ var event struct {
+ Text string `json:"text"`
+ Done bool `json:"done"`
+ Error string `json:"error"`
+ }
+ if err := json.Unmarshal(
+ []byte(data), &event); err != nil {
+ continue
+ }
+ if event.Error != "" {
+ return fmt.Errorf("stream: %s", event.Error)
+ }
+ if event.Done {
+ return nil
+ }
+ if event.Text != "" {
+ cb(event.Text)
+ }
+ }
+ return scanner.Err()
+}
/dev/null
b/api/stream.go
1
package api
2
3
import "bufio"
4
import "bytes"
5
import "context"
6
import "encoding/json"
7
import "fmt"
8
import "io/ioutil"
9
import "net/http"
10
import "strings"
11
12
// MessagesStream calls the streaming endpoint and
13
// delivers text deltas via cb. Set req.Model to one
14
// of the Klex LLM function names.
15
func (c *Client) MessagesStream(
16
ctx context.Context,
17
req MessagesRequest,
18
cb func(string),
19
) error {
20
f := req.Model
21
req.Model = ""
22
if f == "" {
23
return fmt.Errorf(
24
"MessagesRequest.Model is empty")
25
}
26
27
type streamReq struct {
28
FName string `json:"f_name"`
29
MessagesRequest
30
}
31
body, err := json.Marshal(
32
streamReq{FName: f, MessagesRequest: req})
33
if err != nil {
34
return fmt.Errorf("marshal: %v", err)
35
}
36
37
r, err := http.NewRequestWithContext(
38
ctx, "POST",
39
c.KlexURL+"/chat/stream",
40
bytes.NewReader(body))
41
if err != nil {
42
return fmt.Errorf("new request: %v", err)
43
}
44
r.Header.Set(
45
"Authorization", "Bearer "+c.APIKey)
46
r.Header.Set(
47
"Content-Type", "application/json")
48
49
resp, err := http.DefaultClient.Do(r)
50
if err != nil {
51
return fmt.Errorf("http: %v", err)
52
}
53
defer resp.Body.Close()
54
55
if resp.StatusCode != 200 {
56
b, _ := ioutil.ReadAll(resp.Body)
57
return fmt.Errorf(
58
"status %d: %s", resp.StatusCode, b)
59
}
60
61
return parseStreamSSE(resp.Body, cb)
62
}
63
64
// parseStreamSSE reads SSE events from the funky
65
// /chat/stream endpoint. Events are JSON objects:
66
//
67
// {"text":"token"} — text delta
68
// {"done":true} — stream complete
69
// {"error":"msg"} — error
70
func parseStreamSSE(
71
r interface{ Read([]byte) (int, error) },
72
cb func(string),
73
) error {
74
scanner := bufio.NewScanner(r)
75
scanner.Buffer(
76
make([]byte, 0, 64*1024), 1024*1024)
77
for scanner.Scan() {
78
line := scanner.Text()
79
if !strings.HasPrefix(line, "data: ") {
80
continue
81
}
82
data := line[6:]
83
var event struct {
84
Text string `json:"text"`
85
Done bool `json:"done"`
86
Error string `json:"error"`
87
}
88
if err := json.Unmarshal(
89
[]byte(data), &event); err != nil {
90
continue
91
}
92
if event.Error != "" {
93
return fmt.Errorf("stream: %s", event.Error)
94
}
95
if event.Done {
96
return nil
97
}
98
if event.Text != "" {
99
cb(event.Text)
100
}
101
}
102
return scanner.Err()
103
}