Source file src/cmd/go/internal/cache/prog.go

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package cache
     6  
     7  import (
     8  	"bufio"
     9  	"cmd/go/internal/base"
    10  	"cmd/go/internal/cacheprog"
    11  	"cmd/internal/quoted"
    12  	"context"
    13  	"crypto/sha256"
    14  	"encoding/base64"
    15  	"encoding/json"
    16  	"errors"
    17  	"fmt"
    18  	"io"
    19  	"log"
    20  	"os"
    21  	"os/exec"
    22  	"sync"
    23  	"sync/atomic"
    24  	"time"
    25  )
    26  
// ProgCache implements Cache via JSON messages over stdin/stdout to a child
// helper process which can then implement whatever caching policy/mechanism it
// wants.
//
// See https://github.com/golang/go/issues/59719
type ProgCache struct {
	cmd    *exec.Cmd
	stdout io.ReadCloser  // from the child process
	stdin  io.WriteCloser // to the child process
	bw     *bufio.Writer  // to stdin
	jenc   *json.Encoder  // to bw

	// can are the commands that the child process declared that it supports.
	// This is effectively the versioning mechanism.
	can map[cacheprog.Cmd]bool

	// fuzzDirCache is another Cache implementation to use for the FuzzDir
	// method. In practice this is the default GOCACHE disk-based
	// implementation.
	//
	// TODO(bradfitz): maybe this isn't ideal. But we'd need to extend the Cache
	// interface and the fuzzing callers to be less disk-y to do more here.
	fuzzDirCache Cache

	closing      atomic.Bool        // set first thing in Close; readLoop consults it when a read fails
	ctx          context.Context    // valid until Close via ctxCancel
	ctxCancel    context.CancelFunc // called on Close
	readLoopDone chan struct{}      // closed when readLoop returns

	// mu guards the fields below it.
	//
	// nextID is the ID assigned to the most recently issued request; it is
	// incremented before each use, so ID 0 is left for the child's initial
	// capabilities message. inFlight maps the ID of each outstanding request
	// to the channel on which readLoop delivers its response; readLoop sets
	// it to nil when it shuts down during Close, after which no new requests
	// are accepted.
	mu         sync.Mutex // guards following fields
	nextID     int64
	inFlight   map[int64]chan<- *cacheprog.Response
	outputFile map[OutputID]string // object => abs path on disk

	// writeMu serializes writing to the child process.
	// It must never be held at the same time as mu.
	writeMu sync.Mutex
}
    65  
    66  // startCacheProg starts the prog binary (with optional space-separated flags)
    67  // and returns a Cache implementation that talks to it.
    68  //
    69  // It blocks a few seconds to wait for the child process to successfully start
    70  // and advertise its capabilities.
    71  func startCacheProg(progAndArgs string, fuzzDirCache Cache) Cache {
    72  	if fuzzDirCache == nil {
    73  		panic("missing fuzzDirCache")
    74  	}
    75  	args, err := quoted.Split(progAndArgs)
    76  	if err != nil {
    77  		base.Fatalf("GOCACHEPROG args: %v", err)
    78  	}
    79  	var prog string
    80  	if len(args) > 0 {
    81  		prog = args[0]
    82  		args = args[1:]
    83  	}
    84  
    85  	ctx, ctxCancel := context.WithCancel(context.Background())
    86  
    87  	cmd := exec.CommandContext(ctx, prog, args...)
    88  	out, err := cmd.StdoutPipe()
    89  	if err != nil {
    90  		base.Fatalf("StdoutPipe to GOCACHEPROG: %v", err)
    91  	}
    92  	in, err := cmd.StdinPipe()
    93  	if err != nil {
    94  		base.Fatalf("StdinPipe to GOCACHEPROG: %v", err)
    95  	}
    96  	cmd.Stderr = os.Stderr
    97  	// On close, we cancel the context. Rather than killing the helper,
    98  	// close its stdin.
    99  	cmd.Cancel = in.Close
   100  
   101  	if err := cmd.Start(); err != nil {
   102  		base.Fatalf("error starting GOCACHEPROG program %q: %v", prog, err)
   103  	}
   104  
   105  	pc := &ProgCache{
   106  		ctx:          ctx,
   107  		ctxCancel:    ctxCancel,
   108  		fuzzDirCache: fuzzDirCache,
   109  		cmd:          cmd,
   110  		stdout:       out,
   111  		stdin:        in,
   112  		bw:           bufio.NewWriter(in),
   113  		inFlight:     make(map[int64]chan<- *cacheprog.Response),
   114  		outputFile:   make(map[OutputID]string),
   115  		readLoopDone: make(chan struct{}),
   116  	}
   117  
   118  	// Register our interest in the initial protocol message from the child to
   119  	// us, saying what it can do.
   120  	capResc := make(chan *cacheprog.Response, 1)
   121  	pc.inFlight[0] = capResc
   122  
   123  	pc.jenc = json.NewEncoder(pc.bw)
   124  	go pc.readLoop(pc.readLoopDone)
   125  
   126  	// Give the child process a few seconds to report its capabilities. This
   127  	// should be instant and not require any slow work by the program.
   128  	timer := time.NewTicker(5 * time.Second)
   129  	defer timer.Stop()
   130  	for {
   131  		select {
   132  		case <-timer.C:
   133  			log.Printf("# still waiting for GOCACHEPROG %v ...", prog)
   134  		case capRes := <-capResc:
   135  			can := map[cacheprog.Cmd]bool{}
   136  			for _, cmd := range capRes.KnownCommands {
   137  				can[cmd] = true
   138  			}
   139  			if len(can) == 0 {
   140  				base.Fatalf("GOCACHEPROG %v declared no supported commands", prog)
   141  			}
   142  			pc.can = can
   143  			return pc
   144  		}
   145  	}
   146  }
   147  
   148  func (c *ProgCache) readLoop(readLoopDone chan<- struct{}) {
   149  	defer close(readLoopDone)
   150  	jd := json.NewDecoder(c.stdout)
   151  	for {
   152  		res := new(cacheprog.Response)
   153  		if err := jd.Decode(res); err != nil {
   154  			if c.closing.Load() {
   155  				c.mu.Lock()
   156  				for _, ch := range c.inFlight {
   157  					close(ch)
   158  				}
   159  				c.inFlight = nil
   160  				c.mu.Unlock()
   161  				return // quietly
   162  			}
   163  			if err == io.EOF {
   164  				c.mu.Lock()
   165  				inFlight := len(c.inFlight)
   166  				c.mu.Unlock()
   167  				base.Fatalf("GOCACHEPROG exited pre-Close with %v pending requests", inFlight)
   168  			}
   169  			base.Fatalf("error reading JSON from GOCACHEPROG: %v", err)
   170  		}
   171  		c.mu.Lock()
   172  		ch, ok := c.inFlight[res.ID]
   173  		delete(c.inFlight, res.ID)
   174  		c.mu.Unlock()
   175  		if ok {
   176  			ch <- res
   177  		} else {
   178  			base.Fatalf("GOCACHEPROG sent response for unknown request ID %v", res.ID)
   179  		}
   180  	}
   181  }
   182  
// errCacheprogClosed reports that a request could not be completed because
// the read loop has already shut down: writeToChild returns it when the
// in-flight map has been set to nil, and send returns it when the response
// channel is closed without delivering a response. Close deliberately
// ignores it so the child may exit without answering the close request.
var errCacheprogClosed = errors.New("GOCACHEPROG program closed unexpectedly")
   184  
   185  func (c *ProgCache) send(ctx context.Context, req *cacheprog.Request) (*cacheprog.Response, error) {
   186  	resc := make(chan *cacheprog.Response, 1)
   187  	if err := c.writeToChild(req, resc); err != nil {
   188  		return nil, err
   189  	}
   190  	select {
   191  	case res := <-resc:
   192  		if res == nil {
   193  			return nil, errCacheprogClosed
   194  		}
   195  		if res.Err != "" {
   196  			return nil, errors.New(res.Err)
   197  		}
   198  		return res, nil
   199  	case <-ctx.Done():
   200  		return nil, ctx.Err()
   201  	}
   202  }
   203  
   204  func (c *ProgCache) writeToChild(req *cacheprog.Request, resc chan<- *cacheprog.Response) (err error) {
   205  	c.mu.Lock()
   206  	if c.inFlight == nil {
   207  		return errCacheprogClosed
   208  	}
   209  	c.nextID++
   210  	req.ID = c.nextID
   211  	c.inFlight[req.ID] = resc
   212  	c.mu.Unlock()
   213  
   214  	defer func() {
   215  		if err != nil {
   216  			c.mu.Lock()
   217  			if c.inFlight != nil {
   218  				delete(c.inFlight, req.ID)
   219  			}
   220  			c.mu.Unlock()
   221  		}
   222  	}()
   223  
   224  	c.writeMu.Lock()
   225  	defer c.writeMu.Unlock()
   226  
   227  	if err := c.jenc.Encode(req); err != nil {
   228  		return err
   229  	}
   230  	if err := c.bw.WriteByte('\n'); err != nil {
   231  		return err
   232  	}
   233  	if req.Body != nil && req.BodySize > 0 {
   234  		if err := c.bw.WriteByte('"'); err != nil {
   235  			return err
   236  		}
   237  		e := base64.NewEncoder(base64.StdEncoding, c.bw)
   238  		wrote, err := io.Copy(e, req.Body)
   239  		if err != nil {
   240  			return err
   241  		}
   242  		if err := e.Close(); err != nil {
   243  			return nil
   244  		}
   245  		if wrote != req.BodySize {
   246  			return fmt.Errorf("short write writing body to GOCACHEPROG for action %x, output %x: wrote %v; expected %v",
   247  				req.ActionID, req.OutputID, wrote, req.BodySize)
   248  		}
   249  		if _, err := c.bw.WriteString("\"\n"); err != nil {
   250  			return err
   251  		}
   252  	}
   253  	if err := c.bw.Flush(); err != nil {
   254  		return err
   255  	}
   256  	return nil
   257  }
   258  
   259  func (c *ProgCache) Get(a ActionID) (Entry, error) {
   260  	if !c.can[cacheprog.CmdGet] {
   261  		// They can't do a "get". Maybe they're a write-only cache.
   262  		//
   263  		// TODO(bradfitz,bcmills): figure out the proper error type here. Maybe
   264  		// errors.ErrUnsupported? Is entryNotFoundError even appropriate? There
   265  		// might be places where we rely on the fact that a recent Put can be
   266  		// read through a corresponding Get. Audit callers and check, and document
   267  		// error types on the Cache interface.
   268  		return Entry{}, &entryNotFoundError{}
   269  	}
   270  	res, err := c.send(c.ctx, &cacheprog.Request{
   271  		Command:  cacheprog.CmdGet,
   272  		ActionID: a[:],
   273  	})
   274  	if err != nil {
   275  		return Entry{}, err // TODO(bradfitz): or entryNotFoundError? Audit callers.
   276  	}
   277  	if res.Miss {
   278  		return Entry{}, &entryNotFoundError{}
   279  	}
   280  	e := Entry{
   281  		Size: res.Size,
   282  	}
   283  	if res.Time != nil {
   284  		e.Time = *res.Time
   285  	} else {
   286  		e.Time = time.Now()
   287  	}
   288  	if res.DiskPath == "" {
   289  		return Entry{}, &entryNotFoundError{errors.New("GOCACHEPROG didn't populate DiskPath on get hit")}
   290  	}
   291  	if copy(e.OutputID[:], res.OutputID) != len(res.OutputID) {
   292  		return Entry{}, &entryNotFoundError{errors.New("incomplete ProgResponse OutputID")}
   293  	}
   294  	c.noteOutputFile(e.OutputID, res.DiskPath)
   295  	return e, nil
   296  }
   297  
// noteOutputFile records, while holding c.mu, that the object with output ID
// o is stored at diskPath. Get and Put call it with the DiskPath reported by
// the child so that OutputFile can return it later. A later call for the same
// o replaces the earlier path.
func (c *ProgCache) noteOutputFile(o OutputID, diskPath string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.outputFile[o] = diskPath
}
   303  
// OutputFile returns the disk path most recently recorded for output ID o by
// noteOutputFile, or the empty string if no path has been recorded for it.
func (c *ProgCache) OutputFile(o OutputID) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.outputFile[o]
}
   309  
   310  func (c *ProgCache) Put(a ActionID, file io.ReadSeeker) (_ OutputID, size int64, _ error) {
   311  	// Compute output ID.
   312  	h := sha256.New()
   313  	if _, err := file.Seek(0, 0); err != nil {
   314  		return OutputID{}, 0, err
   315  	}
   316  	size, err := io.Copy(h, file)
   317  	if err != nil {
   318  		return OutputID{}, 0, err
   319  	}
   320  	var out OutputID
   321  	h.Sum(out[:0])
   322  
   323  	if _, err := file.Seek(0, 0); err != nil {
   324  		return OutputID{}, 0, err
   325  	}
   326  
   327  	if !c.can[cacheprog.CmdPut] {
   328  		// Child is a read-only cache. Do nothing.
   329  		return out, size, nil
   330  	}
   331  
   332  	res, err := c.send(c.ctx, &cacheprog.Request{
   333  		Command:  cacheprog.CmdPut,
   334  		ActionID: a[:],
   335  		OutputID: out[:],
   336  		Body:     file,
   337  		BodySize: size,
   338  	})
   339  	if err != nil {
   340  		return OutputID{}, 0, err
   341  	}
   342  	if res.DiskPath == "" {
   343  		return OutputID{}, 0, errors.New("GOCACHEPROG didn't return DiskPath in put response")
   344  	}
   345  	c.noteOutputFile(out, res.DiskPath)
   346  	return out, size, err
   347  }
   348  
   349  func (c *ProgCache) Close() error {
   350  	c.closing.Store(true)
   351  	var err error
   352  
   353  	// First write a "close" message to the child so it can exit nicely
   354  	// and clean up if it wants. Only after that exchange do we cancel
   355  	// the context that kills the process.
   356  	if c.can[cacheprog.CmdClose] {
   357  		_, err = c.send(c.ctx, &cacheprog.Request{Command: cacheprog.CmdClose})
   358  		if errors.Is(err, errCacheprogClosed) {
   359  			// Allow the child to quit without responding to close.
   360  			err = nil
   361  		}
   362  	}
   363  	// Cancel the context, which will close the helper's stdin.
   364  	c.ctxCancel()
   365  	// Wait until the helper closes its stdout.
   366  	<-c.readLoopDone
   367  	return err
   368  }
   369  
// FuzzDir returns whatever c.fuzzDirCache.FuzzDir reports; the child process
// is not consulted.
func (c *ProgCache) FuzzDir() string {
	// TODO(bradfitz): figure out what to do here. For now just use the
	// disk-based default.
	return c.fuzzDirCache.FuzzDir()
}
   375  

View as plain text