package plan

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"path"
	"sort"
	"strings"
	"time"

	"oscarkilo.com/klex-git/api"
	"oscarkilo.com/klex-git/config"
	"oscarkilo.com/klex-git/util"
)

18
// Plan is a list of steps that brings this repo in sync with Klex.
18
// Plan is a list of steps that brings this repo in sync with Klex.
19
type Plan struct {
19
type Plan struct {
20
Config *config.Config `json:"-"`
20
Config *config.Config `json:"-"`
21
Commit *util.CommitInfo `json:"-"`
21
Commit *util.CommitInfo `json:"-"`
22
Client *api.Client `json:"-"`
22
Client *api.Client `json:"-"`
23
ChangedDatasets []string `json:"changed_datasets"`
23
ChangedDatasets []string `json:"changed_datasets"`
24
ChangedFunctions []string `json:"changed_functions"`
24
ChangedFunctions []string `json:"changed_functions"`
25
ChangedPipelines []string `json:"changed_pipelines"`
25
ChangedPipelines []string `json:"changed_pipelines"`
26
}
26
}
27
27
28
func NewPlan(cfg *config.Config, ci *util.CommitInfo, cl *api.Client) *Plan {
28
func NewPlan(cfg *config.Config, ci *util.CommitInfo, cl *api.Client) *Plan {
29
p := &Plan{
29
p := &Plan{
30
Config: cfg,
30
Config: cfg,
31
Commit: ci,
31
Commit: ci,
32
Client: cl,
32
Client: cl,
33
}
33
}
34
p.findChangedDatasets()
34
p.findChangedDatasets()
35
p.findChangedFunctions()
35
p.findChangedFunctions()
36
p.findChangedPipelines()
36
p.findChangedPipelines()
37
return p
37
return p
38
}
38
}
39
39
40
func (p *Plan) DebugString() string {
40
func (p *Plan) DebugString() string {
41
b, err := json.MarshalIndent(p, "", " ")
41
b, err := json.MarshalIndent(p, "", " ")
42
if err != nil {
42
if err != nil {
43
return fmt.Sprintf("JSON error: %v", err)
43
return fmt.Sprintf("JSON error: %v", err)
44
}
44
}
45
return string(b)
45
return string(b)
46
}
46
}
47
47
48
// findChangedFilesMatching returns the sorted list of changed files that:
48
// findChangedFilesMatching returns the sorted list of changed files that:
49
// - have the given path prefix
49
// - have the given path prefix
50
// - have the given string suffix
50
// - have the given string suffix
51
// - have been transformed by the given function
51
// - have been transformed by the given function
52
// - aren't "" after the transformation
52
// - aren't "" after the transformation
53
func (p *Plan) findChangedFilesMatching(
53
func (p *Plan) findChangedFilesMatching(
54
pref, suff string,
54
pref, suff string,
55
transform func(string) string,
55
transform func(string) string,
56
) []string {
56
) []string {
57
seen := make(map[string]bool)
57
seen := make(map[string]bool)
58
for _, fpath := range p.Commit.ChangedPaths {
58
for _, fpath := range p.Commit.ChangedPaths {
59
if !util.IsDir(fpath) {
59
if !util.IsDir(fpath) {
60
if rest, ok := util.StripPathPrefix(fpath, pref); ok {
60
if rest, ok := util.StripPathPrefix(fpath, pref); ok {
61
if strings.HasSuffix(rest, suff) {
61
if strings.HasSuffix(rest, suff) {
62
seen[transform(rest)] = true
62
seen[transform(rest)] = true
63
}
63
}
64
}
64
}
65
}
65
}
66
}
66
}
67
delete(seen, "") // in case it snuck in there
67
delete(seen, "") // in case it snuck in there
68
list := make([]string, 0, len(seen))
68
list := make([]string, 0, len(seen))
69
for f := range seen {
69
for f := range seen {
70
list = append(list, f)
70
list = append(list, f)
71
}
71
}
72
sort.Strings(list)
72
sort.Strings(list)
73
return list
73
return list
74
}
74
}
75
75
76
func (p *Plan) findChangedDatasets() {
76
func (p *Plan) findChangedDatasets() {
77
if p.Config.DatasetsDir == "" {
77
if p.Config.DatasetsDir == "" {
78
return
78
return
79
}
79
}
80
p.ChangedDatasets = p.findChangedFilesMatching(
80
p.ChangedDatasets = p.findChangedFilesMatching(
81
p.Config.DatasetsDir,
81
p.Config.DatasetsDir,
82
"",
82
"",
83
path.Dir,
83
path.Dir,
84
)
84
)
85
}
85
}
86
86
87
func (p *Plan) findChangedFunctions() {
87
func (p *Plan) findChangedFunctions() {
88
if p.Config.FunctionsDir == "" {
88
if p.Config.FunctionsDir == "" {
89
return
89
return
90
}
90
}
91
p.ChangedFunctions = p.findChangedFilesMatching(
91
p.ChangedFunctions = p.findChangedFilesMatching(
92
p.Config.FunctionsDir,
92
p.Config.FunctionsDir,
93
".js",
93
".js",
94
func(f string) string {
94
func(f string) string {
95
return strings.TrimSuffix(f, ".js")
95
return strings.TrimSuffix(f, ".js")
96
},
96
},
97
)
97
)
98
}
98
}
99
99
100
func (p *Plan) findChangedPipelines() {
100
func (p *Plan) findChangedPipelines() {
101
if p.Config.PipelinesDir == "" {
101
if p.Config.PipelinesDir == "" {
102
return
102
return
103
}
103
}
104
p.ChangedPipelines = p.findChangedFilesMatching(
104
p.ChangedPipelines = p.findChangedFilesMatching(
105
p.Config.PipelinesDir,
105
p.Config.PipelinesDir,
106
".js",
106
".js",
107
func(f string) string {
107
func(f string) string {
108
return strings.TrimSuffix(f, ".js")
108
return strings.TrimSuffix(f, ".js")
109
},
109
},
110
)
110
)
111
}
111
}
112
112
113
func (p *Plan) UploadDataset(ds string) error {
113
func (p *Plan) UploadDataset(ds string) error {
114
from := path.Join(p.Config.DatasetsDir, ds)
114
from := path.Join(p.Config.DatasetsDir, ds)
115
to := path.Join(p.Config.ProjectName, ds)
115
to := path.Join(p.Config.ProjectName, ds)
116
log.Printf("reading dataset %s", ds)
116
log.Printf("reading dataset %s", ds)
117
data, err := util.ReadDir(from)
117
data, err := util.ReadDir(from)
118
if err != nil {
118
if err != nil {
119
return fmt.Errorf("Error reading directory %s: %v", from, err)
119
return fmt.Errorf("Error reading directory %s: %v", from, err)
120
}
120
}
121
log.Printf("uploading dataset %s", ds)
121
log.Printf("uploading dataset %s", ds)
122
return p.Client.NewDataset(to, data)
122
return p.Client.NewDataset(to, data)
123
}
123
}
124
124
125
// UploadDataset2 is the new, incremental way of uploading datasets.
125
// UploadDataset2 is the new, incremental way of uploading datasets.
126
func (p *Plan) UploadDataset2(ds string) error {
126
func (p *Plan) UploadDataset2(ds string) error {
127
from := path.Join(p.Config.DatasetsDir, ds)
127
from := path.Join(p.Config.DatasetsDir, ds)
128
to := path.Join(p.Config.ProjectName, ds)
128
to := path.Join(p.Config.ProjectName, ds)
129
129
130
log.Printf("Initializing upload of dataset %s", ds)
130
log.Printf("Initializing upload of dataset %s", ds)
131
ve_key, err := p.Client.BeginNewDataset(to)
131
ve_key, err := p.Client.BeginNewDataset(to)
132
if err != nil {
132
if err != nil {
133
return fmt.Errorf("Error initializing dataset %s: %v", ds, err)
133
return fmt.Errorf("Error initializing dataset %s: %v", ds, err)
134
}
134
}
135
135
136
log.Printf("ploading dataset %s", ds)
136
log.Printf("ploading dataset %s", ds)
137
files, err := ioutil.ReadDir(from)
137
files, err := ioutil.ReadDir(from)
138
if err != nil {
138
if err != nil {
139
return fmt.Errorf("in ReadDir(): %v", err)
139
return fmt.Errorf("in ReadDir(): %v", err)
140
}
140
}
141
141
142
var batch []api.KV
142
var batch []api.KV
143
batch_size := 0
143
batch_size := 0
144
last_status_time := time.Now()
144
last_status_time := time.Now()
145
last_status_percent := 0
145
last_status_percent := 0
146
for i, f := range files {
146
for i, f := range files {
147
fpath := path.Join(from, f.Name())
147
fpath := path.Join(from, f.Name())
148
contents, err := ioutil.ReadFile(fpath)
148
contents, err := ioutil.ReadFile(fpath)
149
if err != nil {
149
if err != nil {
150
return fmt.Errorf("in ReadFile(): %v", err)
150
return fmt.Errorf("in ReadFile(): %v", err)
151
}
151
}
152
batch = append(batch, api.KV{f.Name(), string(contents)})
152
batch = append(batch, api.KV{f.Name(), string(contents)})
153
batch_size += 16 + len(f.Name()) + len(contents)
153
batch_size += 16 + len(f.Name()) + len(contents)
154
if i == len(files)-1 || batch_size > (1<<20) {
154
if i == len(files)-1 || batch_size > (1<<20) {
155
err := p.Client.UploadKV(ve_key, batch)
155
err := p.Client.UploadKV(ve_key, batch)
156
if err != nil {
156
if err != nil {
157
return fmt.Errorf("in UploadKV(): %v", err)
157
return fmt.Errorf("in UploadKV(): %v", err)
158
}
158
}
159
batch = nil
159
batch = nil
160
status_percent := 100 * (i + 1) / len(files)
161
if time.Since(last_status_time) > time.Second {
160
if time.Since(last_status_time) > time.Second {
161
status_percent := 100 * (i + 1) / len(files)
162
if status_percent > last_status_percent {
162
if status_percent > last_status_percent {
163
log.Printf(
163
log.Printf(
164
"Uploaded %d of %d entries [%d%%]",
164
"Uploaded %d of %d entries [%d%%]",
165
i+1, len(files), status_percent,
165
i+1, len(files), status_percent,
166
)
166
)
167
last_status_time = time.Now()
167
last_status_time = time.Now()
168
last_status_percent = status_percent
168
last_status_percent = status_percent
169
}
169
}
170
}
170
}
171
}
171
}
172
}
172
}
173
173
174
log.Printf("Finalizing upload of dataset %s", ds)
174
log.Printf("Finalizing upload of dataset %s", ds)
175
err = p.Client.EndNewDataset(to, ve_key, len(files))
175
err = p.Client.EndNewDataset(to, ve_key, len(files))
176
if err != nil {
176
if err != nil {
177
return fmt.Errorf("Error finalizing dataset %s: %v", ds, err)
177
return fmt.Errorf("Error finalizing dataset %s: %v", ds, err)
178
}
178
}
179
179
180
return nil
180
return nil
181
}
181
}