code.oscarkilo.com/klex-git/plan/plan.go

package plan

import (
  "encoding/json"
  "fmt"
  "io/ioutil"
  "log"
  "path"
  "sort"
  "strings"
  "time"

  "oscarkilo.com/klex-git/api"
  "oscarkilo.com/klex-git/config"
  "oscarkilo.com/klex-git/util"
)

// Plan is a list of steps that brings this repo in sync with Klex.
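// The Config, Commit, and Client fields are tagged `json:"-"` so that
// DebugString prints only the computed change lists.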
type Plan struct {
  Config *config.Config `json:"-"`
  Commit *util.CommitInfo `json:"-"`
  Client *api.Client `json:"-"`
  ChangedDatasets []string `json:"changed_datasets"`
  ChangedFunctions []string `json:"changed_functions"`
  ChangedPipelines []string `json:"changed_pipelines"`
}

func NewPlan(cfg *config.Config, ci *util.CommitInfo, cl *api.Client) *Plan {
  p := &Plan{
    Config: cfg,
    Commit: ci,
    Client: cl,
  }
  p.findChangedDatasets()
  p.findChangedFunctions()
  p.findChangedPipelines()
  return p
}
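
// A minimal usage sketch (hypothetical values; the Config, CommitInfo, and
// Client are normally constructed elsewhere in klex-git):
//
//   cfg := &config.Config{ProjectName: "demo", DatasetsDir: "datasets"}
//   ci := &util.CommitInfo{ChangedPaths: []string{"datasets/foo/part-0"}}
//   p := NewPlan(cfg, ci, client) // client is a previously built *api.Client
//   fmt.Println(p.DebugString())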

func (p *Plan) DebugString() string {
  b, err := json.MarshalIndent(p, "", "  ")
  if err != nil {
    return fmt.Sprintf("JSON error: %v", err)
  }
  return string(b)
}

// findChangedFilesMatching returns a sorted, de-duplicated list of names
// derived from the changed files that:
// - are not directories
// - have the given path prefix (which is stripped)
// - have the given suffix
// Each matching path is passed through transform; empty results are dropped.
func (p *Plan) findChangedFilesMatching(
  pref, suff string,
  transform func(string) string,
) []string {
  seen := make(map[string]bool)
  for _, fpath := range p.Commit.ChangedPaths {
    if !util.IsDir(fpath) {
      if rest, ok := util.StripPathPrefix(fpath, pref); ok {
        if strings.HasSuffix(rest, suff) {
          seen[transform(rest)] = true
        }
      }
    }
  }
  delete(seen, "")  // in case it snuck in there
  list := make([]string, 0, len(seen))
  for f := range seen {
    list = append(list, f)
  }
  sort.Strings(list)
  return list
}
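
// For instance, with prefix "datasets" a changed path
// "datasets/foo/part-001" strips to "foo/part-001" (assuming
// util.StripPathPrefix also consumes the separator) and path.Dir maps it to
// the dataset name "foo"; a function file "bar.js" maps to "bar" via
// strings.TrimSuffix.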

func (p *Plan) findChangedDatasets() {
  if p.Config.DatasetsDir == "" {
    return
  }
  p.ChangedDatasets = p.findChangedFilesMatching(
    p.Config.DatasetsDir,
    "",
    path.Dir,
  )
}

func (p *Plan) findChangedFunctions() {
  if p.Config.FunctionsDir == "" {
    return
  }
  p.ChangedFunctions = p.findChangedFilesMatching(
    p.Config.FunctionsDir,
    ".js",
    func(f string) string {
      return strings.TrimSuffix(f, ".js")
    },
  )
}

func (p *Plan) findChangedPipelines() {
  if p.Config.PipelinesDir == "" {
    return
  }
  p.ChangedPipelines = p.findChangedFilesMatching(
    p.Config.PipelinesDir,
    ".js",
    func(f string) string {
      return strings.TrimSuffix(f, ".js")
    },
  )
}

func (p *Plan) UploadDataset(ds string) error {
  from := path.Join(p.Config.DatasetsDir, ds)
  to := path.Join(p.Config.ProjectName, ds)
  log.Printf("reading dataset %s", ds)
  data, err := util.ReadDir(from)
  if err != nil {
    return fmt.Errorf("Error reading directory %s: %v", from, err)
  }
  log.Printf("uploading dataset %s", ds)
  return p.Client.NewDataset(to, data)
}

// UploadDataset2 uploads a dataset incrementally, batching entries over
// several UploadKV calls instead of sending everything at once like
// UploadDataset.
func (p *Plan) UploadDataset2(ds string) error {
  from := path.Join(p.Config.DatasetsDir, ds)
  to := path.Join(p.Config.ProjectName, ds)

  log.Printf("Initializing upload of dataset %s", ds)
  veKey, err := p.Client.BeginNewDataset(to)
  if err != nil {
    return fmt.Errorf("error initializing dataset %s: %v", ds, err)
  }

  log.Printf("Uploading dataset %s", ds)
  files, err := os.ReadDir(from)
  if err != nil {
    return fmt.Errorf("in ReadDir(): %v", err)
  }

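  // Accumulate entries until roughly 1 MiB is buffered, then flush them with
  // a single UploadKV call. The 16 bytes added per entry are a rough
  // allowance for per-entry framing overhead (an estimate, not a measured
  // constant).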
  var batch []api.KV
  batchSize := 0
  lastStatusTime := time.Now()
  lastStatusPercent := 0
  for i, f := range files {
    fpath := path.Join(from, f.Name())
    contents, err := os.ReadFile(fpath)
    if err != nil {
      return fmt.Errorf("in ReadFile(): %v", err)
    }
    batch = append(batch, api.KV{f.Name(), string(contents)})
    batchSize += 16 + len(f.Name()) + len(contents)
    if i == len(files)-1 || batchSize > (1<<20) {
      err := p.Client.UploadKV(veKey, batch)
      if err != nil {
        return fmt.Errorf("in UploadKV(): %v", err)
      }
      batch = nil
      batchSize = 0 // start the next batch from an empty buffer
      if time.Since(lastStatusTime) > time.Second {
        statusPercent := 100 * (i + 1) / len(files)
        if statusPercent > lastStatusPercent {
          log.Printf(
            "Uploaded %d of %d entries [%d%%]",
            i+1, len(files), statusPercent,
          )
          lastStatusTime = time.Now()
          lastStatusPercent = statusPercent
        }
      }
    }
  }

  log.Printf("Finalizing upload of dataset %s", ds)
  err = p.Client.EndNewDataset(to, veKey, len(files))
  if err != nil {
    return fmt.Errorf("error finalizing dataset %s: %v", ds, err)
  }

  return nil
}
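
// A plausible driver loop (a sketch; retry policy and the choice between
// UploadDataset and UploadDataset2 are left to the caller):
//
//   for _, ds := range p.ChangedDatasets {
//     if err := p.UploadDataset2(ds); err != nil {
//       log.Fatalf("uploading dataset %s: %v", ds, err)
//     }
//   }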