code.oscarkilo.com/klex-git

Hash:
ac29de5196813cb508f22bdc06d22ca4175064c8
Author:
Igor Naverniouk <[email protected]>
Date:
Mon Sep 23 11:40:52 2024 -0700
Message:
using the new dataset API
diff --git a/api/api.go b/api/api.go
index 57fd8cb..9565c5c 100644
--- a/api/api.go
+++ b/api/api.go
@@ -46,11 +46,13 @@ func (c *Client) call(method, path string, req, res interface{}) error {
if err != nil {
return fmt.Errorf("Response error: %v", err)
}
- if resHttp.StatusCode != 200 {
+ if resHttp.StatusCode != 200 && resHttp.StatusCode != 204 {
return fmt.Errorf("Status %d; response=%s", resHttp.StatusCode, resBody)
}
- if err := json.Unmarshal(resBody, res); err != nil {
- return fmt.Errorf("Bad response %s\nerror=%v", resBody, err)
+ if res != nil {
+ if err := json.Unmarshal(resBody, res); err != nil {
+ return fmt.Errorf("Bad response %s\nerror=%v", resBody, err)
+ }
}
return nil
}
@@ -93,3 +95,36 @@ func (c *Client) NewDataset(name string, data map[string]string) error {
}
return nil
}
+
+// BeginNewDataset starts a new dataset upload using the v2 API.
+// Returns the version key to use in UploadKv() and EndNewDataset().
+// Keep the key secret until EndNewDataset() returns successfully.
+func (c *Client) BeginNewDataset(name string) (string, error) {
+ req := BeginNewDatasetRequest{Name: name}
+ var res BeginNewDatasetResponse
+ err := c.call("POST", "/datasets/begin_new", req, &res)
+ if err != nil {
+ return "", fmt.Errorf("Error POSTing to /datasets/begin_new: %v", err)
+ }
+ return res.VersionKey, nil
+}
+
+// UploadKv uploads more key-value pairs of the dataset being created.
+func (c *Client) UploadKV(versionKey string, records []KV) error {
+ req := UploadKVRequest{VersionKey: versionKey, Records: records}
+ err := c.call("POST", "/datasets/upload_kv", req, nil)
+ if err != nil {
+ return fmt.Errorf("Error POSTing to /datasets/upload_kv: %v", err)
+ }
+ return nil
+}
+
+// EndNewDataset commits the dataset being created.
+func (c *Client) EndNewDataset(name, version_key string, size int) error {
+ req := EndNewDatasetRequest{Name: name, VersionKey: version_key, Size: size}
+ err := c.call("POST", "/datasets/end_new", req, nil)
+ if err != nil {
+ return fmt.Errorf("Error POSTing to /datasets/end_new: %v", err)
+ }
+ return nil
+}
diff --git a/api/datasets.go b/api/datasets.go
index a6e0fa1..c6406d1 100644
--- a/api/datasets.go
+++ b/api/datasets.go
@@ -40,8 +40,8 @@ type ListDatasetsResponse struct {
}

type KV struct {
- Key string `json:"key"`
- Value string `json:"value"`
+ Key string `json:"k"`
+ Value string `json:"v"`
}

type GetDatasetResponse struct {
diff --git a/commit/main.go b/commit/main.go
index c78722d..7c81732 100644
--- a/commit/main.go
+++ b/commit/main.go
@@ -38,7 +38,7 @@ func main() {
log.Printf("plan:\n%s\n", plan.DebugString())

for _, ds := range plan.ChangedDatasets {
- err = plan.UploadDataset(ds)
+ err = plan.UploadDataset2(ds)
if err != nil {
panic(err)
}
diff --git a/plan/plan.go b/plan/plan.go
index 5517795..3b62133 100644
--- a/plan/plan.go
+++ b/plan/plan.go
@@ -3,10 +3,12 @@ package plan
import (
"encoding/json"
"fmt"
+ "io/ioutil"
"log"
"path"
"sort"
"strings"
+ "time"

"oscarkilo.com/klex-git/api"
"oscarkilo.com/klex-git/config"
@@ -119,3 +121,61 @@ func (p *Plan) UploadDataset(ds string) error {
log.Printf("uploading dataset %s", ds)
return p.Client.NewDataset(to, data)
}
+
+// UploadDataset2 is the new, incremental way of uploading datasets.
+func (p *Plan) UploadDataset2(ds string) error {
+ from := path.Join(p.Config.DatasetsDir, ds)
+ to := path.Join(p.Config.ProjectName, ds)
+
+ log.Printf("Initializing upload of dataset %s", ds)
+ ve_key, err := p.Client.BeginNewDataset(to)
+ if err != nil {
+ return fmt.Errorf("Error initializing dataset %s: %v", ds, err)
+ }
+
+ log.Printf("uploading dataset %s", ds)
+ files, err := ioutil.ReadDir(from)
+ if err != nil {
+ return fmt.Errorf("in ReadDir(): %v", err)
+ }
+
+ var batch []api.KV
+ batch_size := 0
+ last_status_time := time.Now()
+ last_status_percent := 0
+ for i, f := range files {
+ fpath := path.Join(from, f.Name())
+ contents, err := ioutil.ReadFile(fpath)
+ if err != nil {
+ return fmt.Errorf("in ReadFile(): %v", err)
+ }
+ batch = append(batch, api.KV{f.Name(), string(contents)})
+ batch_size += 16 + len(f.Name()) + len(contents)
+ if i == len(files)-1 || batch_size > (1<<20) {
+ err := p.Client.UploadKV(ve_key, batch)
+ if err != nil {
+ return fmt.Errorf("in UploadKV(): %v", err)
+ }
+ batch = nil
+ status_percent := 100 * (i + 1) / len(files)
+ if time.Since(last_status_time) > time.Second {
+ if status_percent > last_status_percent {
+ log.Printf(
+ "Uploaded %d of %d entries [%d%%]",
+ i+1, len(files), status_percent,
+ )
+ last_status_time = time.Now()
+ last_status_percent = status_percent
+ }
+ }
+ }
+ }
+
+ log.Printf("Finalizing upload of dataset %s", ds)
+ err = p.Client.EndNewDataset(to, ve_key, len(files))
+ if err != nil {
+ return fmt.Errorf("Error finalizing dataset %s: %v", ds, err)
+ }
+
+ return nil
+}
a/api/api.go
b/api/api.go
1
package api
1
package api
2
2
3
// This file is for Golang clients of Klex.
3
// This file is for Golang clients of Klex.
4
4
5
import (
5
import (
6
"bytes"
6
"bytes"
7
"encoding/json"
7
"encoding/json"
8
"fmt"
8
"fmt"
9
"io/ioutil"
9
"io/ioutil"
10
"log"
10
"log"
11
"net/http"
11
"net/http"
12
"sort"
12
"sort"
13
)
13
)
14
14
15
type Client struct {
15
type Client struct {
16
KlexURL string
16
KlexURL string
17
APIKey string
17
APIKey string
18
}
18
}
19
19
20
func NewClient(klexURL, apiKey string) *Client {
20
func NewClient(klexURL, apiKey string) *Client {
21
if klexURL == "" || apiKey == "" {
21
if klexURL == "" || apiKey == "" {
22
log.Printf("NewClient: missing klexURL or apiKey")
22
log.Printf("NewClient: missing klexURL or apiKey")
23
return nil
23
return nil
24
}
24
}
25
return &Client{klexURL, apiKey}
25
return &Client{klexURL, apiKey}
26
}
26
}
27
27
28
func (c *Client) call(method, path string, req, res interface{}) error {
28
func (c *Client) call(method, path string, req, res interface{}) error {
29
reqBody, err := json.Marshal(req)
29
reqBody, err := json.Marshal(req)
30
if err != nil {
30
if err != nil {
31
return fmt.Errorf("Cannot marshal request: %v", err)
31
return fmt.Errorf("Cannot marshal request: %v", err)
32
}
32
}
33
reqBytes := bytes.NewBuffer(reqBody)
33
reqBytes := bytes.NewBuffer(reqBody)
34
r, err := http.NewRequest(method, c.KlexURL + path, reqBytes)
34
r, err := http.NewRequest(method, c.KlexURL + path, reqBytes)
35
if err != nil {
35
if err != nil {
36
return fmt.Errorf("In http.NewRequest: %v", err)
36
return fmt.Errorf("In http.NewRequest: %v", err)
37
}
37
}
38
r.Header.Set("Authorization", "Bearer " + c.APIKey)
38
r.Header.Set("Authorization", "Bearer " + c.APIKey)
39
r.Header.Set("Content-Type", "application/json")
39
r.Header.Set("Content-Type", "application/json")
40
resHttp, err := http.DefaultClient.Do(r)
40
resHttp, err := http.DefaultClient.Do(r)
41
if err != nil {
41
if err != nil {
42
return fmt.Errorf("http.DefaultClient.Do: %v", err)
42
return fmt.Errorf("http.DefaultClient.Do: %v", err)
43
}
43
}
44
defer resHttp.Body.Close()
44
defer resHttp.Body.Close()
45
resBody, err := ioutil.ReadAll(resHttp.Body)
45
resBody, err := ioutil.ReadAll(resHttp.Body)
46
if err != nil {
46
if err != nil {
47
return fmt.Errorf("Response error: %v", err)
47
return fmt.Errorf("Response error: %v", err)
48
}
48
}
49
if resHttp.StatusCode != 200 {
49
if resHttp.StatusCode != 200 && resHttp.StatusCode != 204 {
50
return fmt.Errorf("Status %d; response=%s", resHttp.StatusCode, resBody)
50
return fmt.Errorf("Status %d; response=%s", resHttp.StatusCode, resBody)
51
}
51
}
52
if err := json.Unmarshal(resBody, res); err != nil {
52
if res != nil {
53
return fmt.Errorf("Bad response %s\nerror=%v", resBody, err)
53
if err := json.Unmarshal(resBody, res); err != nil {
54
return fmt.Errorf("Bad response %s\nerror=%v", resBody, err)
55
}
54
}
56
}
55
return nil
57
return nil
56
}
58
}
57
59
58
// F executes a function on one given input.
60
// F executes a function on one given input.
59
func (c *Client) F(f, in string) (string, error) {
61
func (c *Client) F(f, in string) (string, error) {
60
var res FResponse
62
var res FResponse
61
err := c.call("POST", "/f", FRequest{FName: f, In: in}, &res)
63
err := c.call("POST", "/f", FRequest{FName: f, In: in}, &res)
62
if err != nil {
64
if err != nil {
63
return "", err
65
return "", err
64
}
66
}
65
if res.Err != "" {
67
if res.Err != "" {
66
return "", fmt.Errorf(res.Err)
68
return "", fmt.Errorf(res.Err)
67
}
69
}
68
return res.Out, nil
70
return res.Out, nil
69
}
71
}
70
72
71
// NewDataset creates a new dataset or updates an existing one.
73
// NewDataset creates a new dataset or updates an existing one.
72
// This is the simplest way, meant for datasets smaller than ~1GB.
74
// This is the simplest way, meant for datasets smaller than ~1GB.
73
func (c *Client) NewDataset(name string, data map[string]string) error {
75
func (c *Client) NewDataset(name string, data map[string]string) error {
74
// TODO: this loses key names; make a new NewDataset API.
76
// TODO: this loses key names; make a new NewDataset API.
75
req := NewDatasetRequest{Name: name, Data: nil}
77
req := NewDatasetRequest{Name: name, Data: nil}
76
keys := make([]string, 0, len(data))
78
keys := make([]string, 0, len(data))
77
for k := range data {
79
for k := range data {
78
keys = append(keys, k)
80
keys = append(keys, k)
79
}
81
}
80
sort.Strings(keys)
82
sort.Strings(keys)
81
for _, k := range keys {
83
for _, k := range keys {
82
req.Data = append(req.Data, data[k])
84
req.Data = append(req.Data, data[k])
83
}
85
}
84
86
85
var res NewDatasetResponse
87
var res NewDatasetResponse
86
err := c.call("POST", "/datasets/new", req, &res)
88
err := c.call("POST", "/datasets/new", req, &res)
87
if err != nil {
89
if err != nil {
88
return fmt.Errorf("Error POSTing to /datasets/new: %v", err)
90
return fmt.Errorf("Error POSTing to /datasets/new: %v", err)
89
}
91
}
90
if res.Name != name || res.Size != len(data) {
92
if res.Name != name || res.Size != len(data) {
91
pretty, _ := json.MarshalIndent(res, "", " ")
93
pretty, _ := json.MarshalIndent(res, "", " ")
92
return fmt.Errorf("Unexpected response from /datasets/new: %s", pretty)
94
return fmt.Errorf("Unexpected response from /datasets/new: %s", pretty)
93
}
95
}
94
return nil
96
return nil
95
}
97
}
98
99
// BeginNewDataset starts a new dataset upload using the v2 API.
100
// Returns the version key to use in UploadKv() and EndNewDataset().
101
// Keep the key secret until EndNewDataset() returns successfully.
102
func (c *Client) BeginNewDataset(name string) (string, error) {
103
req := BeginNewDatasetRequest{Name: name}
104
var res BeginNewDatasetResponse
105
err := c.call("POST", "/datasets/begin_new", req, &res)
106
if err != nil {
107
return "", fmt.Errorf("Error POSTing to /datasets/begin_new: %v", err)
108
}
109
return res.VersionKey, nil
110
}
111
112
// UploadKv uploads more key-value pairs of the dataset being created.
113
func (c *Client) UploadKV(versionKey string, records []KV) error {
114
req := UploadKVRequest{VersionKey: versionKey, Records: records}
115
err := c.call("POST", "/datasets/upload_kv", req, nil)
116
if err != nil {
117
return fmt.Errorf("Error POSTing to /datasets/upload_kv: %v", err)
118
}
119
return nil
120
}
121
122
// EndNewDataset commits the dataset being created.
123
func (c *Client) EndNewDataset(name, version_key string, size int) error {
124
req := EndNewDatasetRequest{Name: name, VersionKey: version_key, Size: size}
125
err := c.call("POST", "/datasets/end_new", req, nil)
126
if err != nil {
127
return fmt.Errorf("Error POSTing to /datasets/end_new: %v", err)
128
}
129
return nil
130
}
a/api/datasets.go
b/api/datasets.go
1
package api
1
package api
2
2
3
type NewDatasetRequest struct {
3
type NewDatasetRequest struct {
4
Name string `json:"name"` // no need to be unique
4
Name string `json:"name"` // no need to be unique
5
Data []string `json:"data"`
5
Data []string `json:"data"`
6
}
6
}
7
7
8
type NewDatasetResponse struct {
8
type NewDatasetResponse struct {
9
Name string `json:"name"`
9
Name string `json:"name"`
10
DateCreated string `json:"date_created"` // RFC8601 with millis
10
DateCreated string `json:"date_created"` // RFC8601 with millis
11
Size int `json:"size"`
11
Size int `json:"size"`
12
}
12
}
13
13
14
type RenameDatasetRequest struct {
14
type RenameDatasetRequest struct {
15
OldName string `json:"old_name"`
15
OldName string `json:"old_name"`
16
NewName string `json:"new_name"`
16
NewName string `json:"new_name"`
17
}
17
}
18
18
19
type DeleteDatasetRequest struct {
19
type DeleteDatasetRequest struct {
20
Name string `json:"name"`
20
Name string `json:"name"`
21
}
21
}
22
22
23
type DatasetVersion struct {
23
type DatasetVersion struct {
24
Key string `json:"key"`
24
Key string `json:"key"`
25
Size int `json:"size"`
25
Size int `json:"size"`
26
Date string `json:"date"`
26
Date string `json:"date"`
27
}
27
}
28
28
29
type Dataset struct {
29
type Dataset struct {
30
Name string `json:"name"`
30
Name string `json:"name"`
31
OwnerOwid string `json:"owner_owid"`
31
OwnerOwid string `json:"owner_owid"`
32
OwnerUsername string `json:"owner_username"`
32
OwnerUsername string `json:"owner_username"`
33
ReaderOwid string `json:"reader_owid"`
33
ReaderOwid string `json:"reader_owid"`
34
ReaderUsername string `json:"reader_username"`
34
ReaderUsername string `json:"reader_username"`
35
Versions []DatasetVersion `json:"versions"`
35
Versions []DatasetVersion `json:"versions"`
36
}
36
}
37
37
38
type ListDatasetsResponse struct {
38
type ListDatasetsResponse struct {
39
Datasets []Dataset `json:"datasets"`
39
Datasets []Dataset `json:"datasets"`
40
}
40
}
41
41
42
type KV struct {
42
type KV struct {
43
Key string `json:"key"`
43
Key string `json:"k"`
44
Value string `json:"value"`
44
Value string `json:"v"`
45
}
45
}
46
46
47
type GetDatasetResponse struct {
47
type GetDatasetResponse struct {
48
DatasetVersion DatasetVersion `json:"dataset_version"`
48
DatasetVersion DatasetVersion `json:"dataset_version"`
49
Records []KV `json:"records"`
49
Records []KV `json:"records"`
50
}
50
}
51
51
52
// BeginNewDatasetRequest is for the new API for uploading datasets.
52
// BeginNewDatasetRequest is for the new API for uploading datasets.
53
// It's meant to work for huge datasets as well as small ones.
53
// It's meant to work for huge datasets as well as small ones.
54
// The flow goes like this:
54
// The flow goes like this:
55
// 1. Call /klex/datasets/begin_new and get a version key.
55
// 1. Call /klex/datasets/begin_new and get a version key.
56
// 2. Repeatedly call /klex/datasets/upload_kv with chunks of data.
56
// 2. Repeatedly call /klex/datasets/upload_kv with chunks of data.
57
// 3. Call /klex/datasets/end_new to commit and name the dataset.
57
// 3. Call /klex/datasets/end_new to commit and name the dataset.
58
type BeginNewDatasetRequest struct {
58
type BeginNewDatasetRequest struct {
59
Name string `json:"name"`
59
Name string `json:"name"`
60
}
60
}
61
61
62
type BeginNewDatasetResponse struct {
62
type BeginNewDatasetResponse struct {
63
VersionKey string `json:"version_key"`
63
VersionKey string `json:"version_key"`
64
}
64
}
65
65
66
// UploadKVRequest is for uploading some of the kv-pairs of a new dataset.
66
// UploadKVRequest is for uploading some of the kv-pairs of a new dataset.
67
type UploadKVRequest struct {
67
type UploadKVRequest struct {
68
VersionKey string `json:"version_key"`
68
VersionKey string `json:"version_key"`
69
Records []KV `json:"records"`
69
Records []KV `json:"records"`
70
}
70
}
71
71
72
type EndNewDatasetRequest struct {
72
type EndNewDatasetRequest struct {
73
Name string `json:"name"`
73
Name string `json:"name"`
74
VersionKey string `json:"version_key"`
74
VersionKey string `json:"version_key"`
75
Size int `json:"size"`
75
Size int `json:"size"`
76
}
76
}
a/commit/main.go
b/commit/main.go
1
package main
1
package main
2
2
3
// This binary is executed during a commit to the client Git repo.
3
// This binary is executed during a commit to the client Git repo.
4
// It figures out which files the commit affects, and then makes
4
// It figures out which files the commit affects, and then makes
5
// the appropriate calls to Klex.
5
// the appropriate calls to Klex.
6
6
7
import (
7
import (
8
"log"
8
"log"
9
9
10
"oscarkilo.com/klex-git/api"
10
"oscarkilo.com/klex-git/api"
11
"oscarkilo.com/klex-git/config"
11
"oscarkilo.com/klex-git/config"
12
"oscarkilo.com/klex-git/plan"
12
"oscarkilo.com/klex-git/plan"
13
"oscarkilo.com/klex-git/util"
13
"oscarkilo.com/klex-git/util"
14
)
14
)
15
15
16
func main() {
16
func main() {
17
log.SetFlags(log.LstdFlags | log.Lshortfile)
17
log.SetFlags(log.LstdFlags | log.Lshortfile)
18
log.SetPrefix("klex-git: ")
18
log.SetPrefix("klex-git: ")
19
19
20
err := util.CdRoot()
20
err := util.CdRoot()
21
if err != nil {
21
if err != nil {
22
panic(err)
22
panic(err)
23
}
23
}
24
24
25
config, err := config.ReadConfig()
25
config, err := config.ReadConfig()
26
if err != nil {
26
if err != nil {
27
panic(err)
27
panic(err)
28
}
28
}
29
29
30
commit, err := util.GetCommitInfo()
30
commit, err := util.GetCommitInfo()
31
if err != nil {
31
if err != nil {
32
panic(err)
32
panic(err)
33
}
33
}
34
34
35
client := api.NewClient(config.KlexUrl, config.ApiKey)
35
client := api.NewClient(config.KlexUrl, config.ApiKey)
36
log.Printf("handling %s\n", commit.DebugString())
36
log.Printf("handling %s\n", commit.DebugString())
37
plan := plan.NewPlan(config, commit, client)
37
plan := plan.NewPlan(config, commit, client)
38
log.Printf("plan:\n%s\n", plan.DebugString())
38
log.Printf("plan:\n%s\n", plan.DebugString())
39
39
40
for _, ds := range plan.ChangedDatasets {
40
for _, ds := range plan.ChangedDatasets {
41
err = plan.UploadDataset(ds)
41
err = plan.UploadDataset2(ds)
42
if err != nil {
42
if err != nil {
43
panic(err)
43
panic(err)
44
}
44
}
45
log.Printf("uploaded dataset: %s\n", ds)
45
log.Printf("uploaded dataset: %s\n", ds)
46
}
46
}
47
}
47
}
a/plan/plan.go
b/plan/plan.go
1
package plan
1
package plan
2
2
3
import (
3
import (
4
"encoding/json"
4
"encoding/json"
5
"fmt"
5
"fmt"
6
"io/ioutil"
6
"log"
7
"log"
7
"path"
8
"path"
8
"sort"
9
"sort"
9
"strings"
10
"strings"
11
"time"
10
12
11
"oscarkilo.com/klex-git/api"
13
"oscarkilo.com/klex-git/api"
12
"oscarkilo.com/klex-git/config"
14
"oscarkilo.com/klex-git/config"
13
"oscarkilo.com/klex-git/util"
15
"oscarkilo.com/klex-git/util"
14
)
16
)
15
17
16
// Plan is a list of steps that brings this repo in sync with Klex.
18
// Plan is a list of steps that brings this repo in sync with Klex.
17
type Plan struct {
19
type Plan struct {
18
Config *config.Config `json:"-"`
20
Config *config.Config `json:"-"`
19
Commit *util.CommitInfo `json:"-"`
21
Commit *util.CommitInfo `json:"-"`
20
Client *api.Client `json:"-"`
22
Client *api.Client `json:"-"`
21
ChangedDatasets []string `json:"changed_datasets"`
23
ChangedDatasets []string `json:"changed_datasets"`
22
ChangedFunctions []string `json:"changed_functions"`
24
ChangedFunctions []string `json:"changed_functions"`
23
ChangedPipelines []string `json:"changed_pipelines"`
25
ChangedPipelines []string `json:"changed_pipelines"`
24
}
26
}
25
27
26
func NewPlan(cfg *config.Config, ci *util.CommitInfo, cl *api.Client) *Plan {
28
func NewPlan(cfg *config.Config, ci *util.CommitInfo, cl *api.Client) *Plan {
27
p := &Plan{
29
p := &Plan{
28
Config: cfg,
30
Config: cfg,
29
Commit: ci,
31
Commit: ci,
30
Client: cl,
32
Client: cl,
31
}
33
}
32
p.findChangedDatasets()
34
p.findChangedDatasets()
33
p.findChangedFunctions()
35
p.findChangedFunctions()
34
p.findChangedPipelines()
36
p.findChangedPipelines()
35
return p
37
return p
36
}
38
}
37
39
38
func (p *Plan) DebugString() string {
40
func (p *Plan) DebugString() string {
39
b, err := json.MarshalIndent(p, "", " ")
41
b, err := json.MarshalIndent(p, "", " ")
40
if err != nil {
42
if err != nil {
41
return fmt.Sprintf("JSON error: %v", err)
43
return fmt.Sprintf("JSON error: %v", err)
42
}
44
}
43
return string(b)
45
return string(b)
44
}
46
}
45
47
46
// findChangedFilesMatching returns the sorted list of changed files that:
48
// findChangedFilesMatching returns the sorted list of changed files that:
47
// - have the given path prefix
49
// - have the given path prefix
48
// - have the given string suffix
50
// - have the given string suffix
49
// - have been transformed by the given function
51
// - have been transformed by the given function
50
// - aren't "" after the transformation
52
// - aren't "" after the transformation
51
func (p *Plan) findChangedFilesMatching(
53
func (p *Plan) findChangedFilesMatching(
52
pref, suff string,
54
pref, suff string,
53
transform func(string) string,
55
transform func(string) string,
54
) []string {
56
) []string {
55
seen := make(map[string]bool)
57
seen := make(map[string]bool)
56
for _, fpath := range p.Commit.ChangedPaths {
58
for _, fpath := range p.Commit.ChangedPaths {
57
if !util.IsDir(fpath) {
59
if !util.IsDir(fpath) {
58
if rest, ok := util.StripPathPrefix(fpath, pref); ok {
60
if rest, ok := util.StripPathPrefix(fpath, pref); ok {
59
if strings.HasSuffix(rest, suff) {
61
if strings.HasSuffix(rest, suff) {
60
seen[transform(rest)] = true
62
seen[transform(rest)] = true
61
}
63
}
62
}
64
}
63
}
65
}
64
}
66
}
65
delete(seen, "") // in case it snuck in there
67
delete(seen, "") // in case it snuck in there
66
list := make([]string, 0, len(seen))
68
list := make([]string, 0, len(seen))
67
for f := range seen {
69
for f := range seen {
68
list = append(list, f)
70
list = append(list, f)
69
}
71
}
70
sort.Strings(list)
72
sort.Strings(list)
71
return list
73
return list
72
}
74
}
73
75
74
func (p *Plan) findChangedDatasets() {
76
func (p *Plan) findChangedDatasets() {
75
if p.Config.DatasetsDir == "" {
77
if p.Config.DatasetsDir == "" {
76
return
78
return
77
}
79
}
78
p.ChangedDatasets = p.findChangedFilesMatching(
80
p.ChangedDatasets = p.findChangedFilesMatching(
79
p.Config.DatasetsDir,
81
p.Config.DatasetsDir,
80
"",
82
"",
81
path.Dir,
83
path.Dir,
82
)
84
)
83
}
85
}
84
86
85
func (p *Plan) findChangedFunctions() {
87
func (p *Plan) findChangedFunctions() {
86
if p.Config.FunctionsDir == "" {
88
if p.Config.FunctionsDir == "" {
87
return
89
return
88
}
90
}
89
p.ChangedFunctions = p.findChangedFilesMatching(
91
p.ChangedFunctions = p.findChangedFilesMatching(
90
p.Config.FunctionsDir,
92
p.Config.FunctionsDir,
91
".js",
93
".js",
92
func(f string) string {
94
func(f string) string {
93
return strings.TrimSuffix(f, ".js")
95
return strings.TrimSuffix(f, ".js")
94
},
96
},
95
)
97
)
96
}
98
}
97
99
98
func (p *Plan) findChangedPipelines() {
100
func (p *Plan) findChangedPipelines() {
99
if p.Config.PipelinesDir == "" {
101
if p.Config.PipelinesDir == "" {
100
return
102
return
101
}
103
}
102
p.ChangedPipelines = p.findChangedFilesMatching(
104
p.ChangedPipelines = p.findChangedFilesMatching(
103
p.Config.PipelinesDir,
105
p.Config.PipelinesDir,
104
".js",
106
".js",
105
func(f string) string {
107
func(f string) string {
106
return strings.TrimSuffix(f, ".js")
108
return strings.TrimSuffix(f, ".js")
107
},
109
},
108
)
110
)
109
}
111
}
110
112
111
func (p *Plan) UploadDataset(ds string) error {
113
func (p *Plan) UploadDataset(ds string) error {
112
from := path.Join(p.Config.DatasetsDir, ds)
114
from := path.Join(p.Config.DatasetsDir, ds)
113
to := path.Join(p.Config.ProjectName, ds)
115
to := path.Join(p.Config.ProjectName, ds)
114
log.Printf("reading dataset %s", ds)
116
log.Printf("reading dataset %s", ds)
115
data, err := util.ReadDir(from)
117
data, err := util.ReadDir(from)
116
if err != nil {
118
if err != nil {
117
return fmt.Errorf("Error reading directory %s: %v", from, err)
119
return fmt.Errorf("Error reading directory %s: %v", from, err)
118
}
120
}
119
log.Printf("uploading dataset %s", ds)
121
log.Printf("uploading dataset %s", ds)
120
return p.Client.NewDataset(to, data)
122
return p.Client.NewDataset(to, data)
121
}
123
}
124
125
// UploadDataset2 is the new, incremental way of uploading datasets.
126
func (p *Plan) UploadDataset2(ds string) error {
127
from := path.Join(p.Config.DatasetsDir, ds)
128
to := path.Join(p.Config.ProjectName, ds)
129
130
log.Printf("Initializing upload of dataset %s", ds)
131
ve_key, err := p.Client.BeginNewDataset(to)
132
if err != nil {
133
return fmt.Errorf("Error initializing dataset %s: %v", ds, err)
134
}
135
136
log.Printf("uploading dataset %s", ds)
137
files, err := ioutil.ReadDir(from)
138
if err != nil {
139
return fmt.Errorf("in ReadDir(): %v", err)
140
}
141
142
var batch []api.KV
143
batch_size := 0
144
last_status_time := time.Now()
145
last_status_percent := 0
146
for i, f := range files {
147
fpath := path.Join(from, f.Name())
148
contents, err := ioutil.ReadFile(fpath)
149
if err != nil {
150
return fmt.Errorf("in ReadFile(): %v", err)
151
}
152
batch = append(batch, api.KV{f.Name(), string(contents)})
153
batch_size += 16 + len(f.Name()) + len(contents)
154
if i == len(files)-1 || batch_size > (1<<20) {
155
err := p.Client.UploadKV(ve_key, batch)
156
if err != nil {
157
return fmt.Errorf("in UploadKV(): %v", err)
158
}
159
batch = nil
160
status_percent := 100 * (i + 1) / len(files)
161
if time.Since(last_status_time) > time.Second {
162
if status_percent > last_status_percent {
163
log.Printf(
164
"Uploaded %d of %d entries [%d%%]",
165
i+1, len(files), status_percent,
166
)
167
last_status_time = time.Now()
168
last_status_percent = status_percent
169
}
170
}
171
}
172
}
173
174
log.Printf("Finalizing upload of dataset %s", ds)
175
err = p.Client.EndNewDataset(to, ve_key, len(files))
176
if err != nil {
177
return fmt.Errorf("Error finalizing dataset %s: %v", ds, err)
178
}
179
180
return nil
181
}