package plan

import (
  "encoding/json"
  "fmt"
  "io/ioutil"
  "log"
  "path"
  "sort"
  "strings"
  "time"

  "oscarkilo.com/klex-git/api"
  "oscarkilo.com/klex-git/config"
  "oscarkilo.com/klex-git/util"
)

// Plan is a list of steps that brings this repo in sync with Klex.
type Plan struct {
  Config           *config.Config   `json:"-"`
  Commit           *util.CommitInfo `json:"-"`
  Client           *api.Client      `json:"-"`
  ChangedDatasets  []string         `json:"changed_datasets"`
  ChangedFunctions []string         `json:"changed_functions"`
  ChangedPipelines []string         `json:"changed_pipelines"`
}

// NewPlan builds a Plan by scanning the commit's changed paths for
// datasets, functions, and pipelines that need to be synced.
func NewPlan(cfg *config.Config, ci *util.CommitInfo, cl *api.Client) *Plan {
  p := &Plan{
    Config: cfg,
    Commit: ci,
    Client: cl,
  }
  p.findChangedDatasets()
  p.findChangedFunctions()
  p.findChangedPipelines()
  return p
}

// DebugString returns the Plan as indented JSON for logging.
func (p *Plan) DebugString() string {
  b, err := json.MarshalIndent(p, "", " ")
  if err != nil {
    return fmt.Sprintf("JSON error: %v", err)
  }
  return string(b)
}

// findChangedFilesMatching returns the sorted list of changed files that:
// - have the given path prefix
// - have the given string suffix
// - have been transformed by the given function
// - aren't "" after the transformation
func (p *Plan) findChangedFilesMatching(
  pref, suff string,
  transform func(string) string,
) []string {
  seen := make(map[string]bool)
  for _, fpath := range p.Commit.ChangedPaths {
    if !util.IsDir(fpath) {
      if rest, ok := util.StripPathPrefix(fpath, pref); ok {
        if strings.HasSuffix(rest, suff) {
          seen[transform(rest)] = true
        }
      }
    }
  }
  delete(seen, "") // in case it snuck in there
  list := make([]string, 0, len(seen))
  for f := range seen {
    list = append(list, f)
  }
  sort.Strings(list)
  return list
}

// findChangedDatasets records the datasets containing changed files:
// a changed path <DatasetsDir>/foo/bar maps to dataset "foo" via path.Dir.
func (p *Plan) findChangedDatasets() {
  if p.Config.DatasetsDir == "" {
    return
  }
  p.ChangedDatasets = p.findChangedFilesMatching(
    p.Config.DatasetsDir,
    "",
    path.Dir,
  )
}

// findChangedFunctions records the names of changed .js functions.
func (p *Plan) findChangedFunctions() {
  if p.Config.FunctionsDir == "" {
    return
  }
  p.ChangedFunctions = p.findChangedFilesMatching(
    p.Config.FunctionsDir,
    ".js",
    func(f string) string { return strings.TrimSuffix(f, ".js") },
  )
}

// findChangedPipelines records the names of changed .js pipelines.
func (p *Plan) findChangedPipelines() {
  if p.Config.PipelinesDir == "" {
    return
  }
  p.ChangedPipelines = p.findChangedFilesMatching(
    p.Config.PipelinesDir,
    ".js",
    func(f string) string { return strings.TrimSuffix(f, ".js") },
  )
}

// UploadDataset reads an entire dataset directory into memory and uploads
// it in a single request.
func (p *Plan) UploadDataset(ds string) error {
  from := path.Join(p.Config.DatasetsDir, ds)
  to := path.Join(p.Config.ProjectName, ds)
  log.Printf("reading dataset %s", ds)
  data, err := util.ReadDir(from)
  if err != nil {
    return fmt.Errorf("error reading directory %s: %v", from, err)
  }
  log.Printf("uploading dataset %s", ds)
  return p.Client.NewDataset(to, data)
}
// UploadDataset2 is the new, incremental way of uploading datasets: the
// directory is streamed to the server in batches of roughly 1 MiB instead
// of being read into memory all at once.
func (p *Plan) UploadDataset2(ds string) error {
  from := path.Join(p.Config.DatasetsDir, ds)
  to := path.Join(p.Config.ProjectName, ds)

  log.Printf("Initializing upload of dataset %s", ds)
  veKey, err := p.Client.BeginNewDataset(to)
  if err != nil {
    return fmt.Errorf("error initializing dataset %s: %v", ds, err)
  }

  log.Printf("Uploading dataset %s", ds)
  files, err := ioutil.ReadDir(from)
  if err != nil {
    return fmt.Errorf("in ReadDir(): %v", err)
  }

  var batch []api.KV
  batchSize := 0
  lastStatusTime := time.Now()
  lastStatusPercent := 0
  for i, f := range files {
    fpath := path.Join(from, f.Name())
    contents, err := ioutil.ReadFile(fpath)
    if err != nil {
      return fmt.Errorf("in ReadFile(): %v", err)
    }
    batch = append(batch, api.KV{f.Name(), string(contents)})
    batchSize += 16 + len(f.Name()) + len(contents)

    // Flush on the last file, or once the batch exceeds ~1 MiB.
    if i == len(files)-1 || batchSize > (1<<20) {
      err := p.Client.UploadKV(veKey, batch)
      if err != nil {
        return fmt.Errorf("in UploadKV(): %v", err)
      }
      batch = nil
      batchSize = 0 // reset so the next batch accumulates from zero

      // Log progress at most once per second, and only when the
      // percentage has actually advanced.
      if time.Since(lastStatusTime) > time.Second {
        statusPercent := 100 * (i + 1) / len(files)
        if statusPercent > lastStatusPercent {
          log.Printf(
            "Uploaded %d of %d entries [%d%%]",
            i+1, len(files), statusPercent,
          )
          lastStatusTime = time.Now()
          lastStatusPercent = statusPercent
        }
      }
    }
  }

  log.Printf("Finalizing upload of dataset %s", ds)
  err = p.Client.EndNewDataset(to, veKey, len(files))
  if err != nil {
    return fmt.Errorf("error finalizing dataset %s: %v", ds, err)
  }
  return nil
}
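// A minimal sketch of how a caller might consume a Plan, assuming some
// entry point that already holds a config.Config, util.CommitInfo, and
// api.Client (the cfg/commitInfo/client names and the error policy below
// are hypothetical, not part of this package):
//
//	p := plan.NewPlan(cfg, commitInfo, client)
//	log.Printf("plan: %s", p.DebugString())
//	for _, ds := range p.ChangedDatasets {
//	    if err := p.UploadDataset2(ds); err != nil {
//	        log.Fatalf("uploading dataset %s: %v", ds, err)
//	    }
//	}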