1
2
3
4
5 package cache
6
7 import (
8 "bufio"
9 "cmd/go/internal/base"
10 "cmd/go/internal/cacheprog"
11 "cmd/internal/quoted"
12 "context"
13 "crypto/sha256"
14 "encoding/base64"
15 "encoding/json"
16 "errors"
17 "fmt"
18 "io"
19 "log"
20 "os"
21 "os/exec"
22 "sync"
23 "sync/atomic"
24 "time"
25 )
26
27
28
29
30
31
32 type ProgCache struct {
33 cmd *exec.Cmd
34 stdout io.ReadCloser
35 stdin io.WriteCloser
36 bw *bufio.Writer
37 jenc *json.Encoder
38
39
40
41 can map[cacheprog.Cmd]bool
42
43
44
45
46
47
48
49 fuzzDirCache Cache
50
51 closing atomic.Bool
52 ctx context.Context
53 ctxCancel context.CancelFunc
54 readLoopDone chan struct{}
55
56 mu sync.Mutex
57 nextID int64
58 inFlight map[int64]chan<- *cacheprog.Response
59 outputFile map[OutputID]string
60
61
62
63 writeMu sync.Mutex
64 }
65
66
67
68
69
70
71 func startCacheProg(progAndArgs string, fuzzDirCache Cache) Cache {
72 if fuzzDirCache == nil {
73 panic("missing fuzzDirCache")
74 }
75 args, err := quoted.Split(progAndArgs)
76 if err != nil {
77 base.Fatalf("GOCACHEPROG args: %v", err)
78 }
79 var prog string
80 if len(args) > 0 {
81 prog = args[0]
82 args = args[1:]
83 }
84
85 ctx, ctxCancel := context.WithCancel(context.Background())
86
87 cmd := exec.CommandContext(ctx, prog, args...)
88 out, err := cmd.StdoutPipe()
89 if err != nil {
90 base.Fatalf("StdoutPipe to GOCACHEPROG: %v", err)
91 }
92 in, err := cmd.StdinPipe()
93 if err != nil {
94 base.Fatalf("StdinPipe to GOCACHEPROG: %v", err)
95 }
96 cmd.Stderr = os.Stderr
97
98
99 cmd.Cancel = in.Close
100
101 if err := cmd.Start(); err != nil {
102 base.Fatalf("error starting GOCACHEPROG program %q: %v", prog, err)
103 }
104
105 pc := &ProgCache{
106 ctx: ctx,
107 ctxCancel: ctxCancel,
108 fuzzDirCache: fuzzDirCache,
109 cmd: cmd,
110 stdout: out,
111 stdin: in,
112 bw: bufio.NewWriter(in),
113 inFlight: make(map[int64]chan<- *cacheprog.Response),
114 outputFile: make(map[OutputID]string),
115 readLoopDone: make(chan struct{}),
116 }
117
118
119
120 capResc := make(chan *cacheprog.Response, 1)
121 pc.inFlight[0] = capResc
122
123 pc.jenc = json.NewEncoder(pc.bw)
124 go pc.readLoop(pc.readLoopDone)
125
126
127
128 timer := time.NewTicker(5 * time.Second)
129 defer timer.Stop()
130 for {
131 select {
132 case <-timer.C:
133 log.Printf("# still waiting for GOCACHEPROG %v ...", prog)
134 case capRes := <-capResc:
135 can := map[cacheprog.Cmd]bool{}
136 for _, cmd := range capRes.KnownCommands {
137 can[cmd] = true
138 }
139 if len(can) == 0 {
140 base.Fatalf("GOCACHEPROG %v declared no supported commands", prog)
141 }
142 pc.can = can
143 return pc
144 }
145 }
146 }
147
148 func (c *ProgCache) readLoop(readLoopDone chan<- struct{}) {
149 defer close(readLoopDone)
150 jd := json.NewDecoder(c.stdout)
151 for {
152 res := new(cacheprog.Response)
153 if err := jd.Decode(res); err != nil {
154 if c.closing.Load() {
155 c.mu.Lock()
156 for _, ch := range c.inFlight {
157 close(ch)
158 }
159 c.inFlight = nil
160 c.mu.Unlock()
161 return
162 }
163 if err == io.EOF {
164 c.mu.Lock()
165 inFlight := len(c.inFlight)
166 c.mu.Unlock()
167 base.Fatalf("GOCACHEPROG exited pre-Close with %v pending requests", inFlight)
168 }
169 base.Fatalf("error reading JSON from GOCACHEPROG: %v", err)
170 }
171 c.mu.Lock()
172 ch, ok := c.inFlight[res.ID]
173 delete(c.inFlight, res.ID)
174 c.mu.Unlock()
175 if ok {
176 ch <- res
177 } else {
178 base.Fatalf("GOCACHEPROG sent response for unknown request ID %v", res.ID)
179 }
180 }
181 }
182
// errCacheprogClosed is returned for requests that cannot be completed
// because the read loop has already shut down the connection to the
// GOCACHEPROG child.
var errCacheprogClosed = errors.New("GOCACHEPROG program closed unexpectedly")
184
185 func (c *ProgCache) send(ctx context.Context, req *cacheprog.Request) (*cacheprog.Response, error) {
186 resc := make(chan *cacheprog.Response, 1)
187 if err := c.writeToChild(req, resc); err != nil {
188 return nil, err
189 }
190 select {
191 case res := <-resc:
192 if res == nil {
193 return nil, errCacheprogClosed
194 }
195 if res.Err != "" {
196 return nil, errors.New(res.Err)
197 }
198 return res, nil
199 case <-ctx.Done():
200 return nil, ctx.Err()
201 }
202 }
203
204 func (c *ProgCache) writeToChild(req *cacheprog.Request, resc chan<- *cacheprog.Response) (err error) {
205 c.mu.Lock()
206 if c.inFlight == nil {
207 return errCacheprogClosed
208 }
209 c.nextID++
210 req.ID = c.nextID
211 c.inFlight[req.ID] = resc
212 c.mu.Unlock()
213
214 defer func() {
215 if err != nil {
216 c.mu.Lock()
217 if c.inFlight != nil {
218 delete(c.inFlight, req.ID)
219 }
220 c.mu.Unlock()
221 }
222 }()
223
224 c.writeMu.Lock()
225 defer c.writeMu.Unlock()
226
227 if err := c.jenc.Encode(req); err != nil {
228 return err
229 }
230 if err := c.bw.WriteByte('\n'); err != nil {
231 return err
232 }
233 if req.Body != nil && req.BodySize > 0 {
234 if err := c.bw.WriteByte('"'); err != nil {
235 return err
236 }
237 e := base64.NewEncoder(base64.StdEncoding, c.bw)
238 wrote, err := io.Copy(e, req.Body)
239 if err != nil {
240 return err
241 }
242 if err := e.Close(); err != nil {
243 return nil
244 }
245 if wrote != req.BodySize {
246 return fmt.Errorf("short write writing body to GOCACHEPROG for action %x, output %x: wrote %v; expected %v",
247 req.ActionID, req.OutputID, wrote, req.BodySize)
248 }
249 if _, err := c.bw.WriteString("\"\n"); err != nil {
250 return err
251 }
252 }
253 if err := c.bw.Flush(); err != nil {
254 return err
255 }
256 return nil
257 }
258
259 func (c *ProgCache) Get(a ActionID) (Entry, error) {
260 if !c.can[cacheprog.CmdGet] {
261
262
263
264
265
266
267
268 return Entry{}, &entryNotFoundError{}
269 }
270 res, err := c.send(c.ctx, &cacheprog.Request{
271 Command: cacheprog.CmdGet,
272 ActionID: a[:],
273 })
274 if err != nil {
275 return Entry{}, err
276 }
277 if res.Miss {
278 return Entry{}, &entryNotFoundError{}
279 }
280 e := Entry{
281 Size: res.Size,
282 }
283 if res.Time != nil {
284 e.Time = *res.Time
285 } else {
286 e.Time = time.Now()
287 }
288 if res.DiskPath == "" {
289 return Entry{}, &entryNotFoundError{errors.New("GOCACHEPROG didn't populate DiskPath on get hit")}
290 }
291 if copy(e.OutputID[:], res.OutputID) != len(res.OutputID) {
292 return Entry{}, &entryNotFoundError{errors.New("incomplete ProgResponse OutputID")}
293 }
294 c.noteOutputFile(e.OutputID, res.DiskPath)
295 return e, nil
296 }
297
298 func (c *ProgCache) noteOutputFile(o OutputID, diskPath string) {
299 c.mu.Lock()
300 defer c.mu.Unlock()
301 c.outputFile[o] = diskPath
302 }
303
304 func (c *ProgCache) OutputFile(o OutputID) string {
305 c.mu.Lock()
306 defer c.mu.Unlock()
307 return c.outputFile[o]
308 }
309
310 func (c *ProgCache) Put(a ActionID, file io.ReadSeeker) (_ OutputID, size int64, _ error) {
311
312 h := sha256.New()
313 if _, err := file.Seek(0, 0); err != nil {
314 return OutputID{}, 0, err
315 }
316 size, err := io.Copy(h, file)
317 if err != nil {
318 return OutputID{}, 0, err
319 }
320 var out OutputID
321 h.Sum(out[:0])
322
323 if _, err := file.Seek(0, 0); err != nil {
324 return OutputID{}, 0, err
325 }
326
327 if !c.can[cacheprog.CmdPut] {
328
329 return out, size, nil
330 }
331
332 res, err := c.send(c.ctx, &cacheprog.Request{
333 Command: cacheprog.CmdPut,
334 ActionID: a[:],
335 OutputID: out[:],
336 Body: file,
337 BodySize: size,
338 })
339 if err != nil {
340 return OutputID{}, 0, err
341 }
342 if res.DiskPath == "" {
343 return OutputID{}, 0, errors.New("GOCACHEPROG didn't return DiskPath in put response")
344 }
345 c.noteOutputFile(out, res.DiskPath)
346 return out, size, err
347 }
348
349 func (c *ProgCache) Close() error {
350 c.closing.Store(true)
351 var err error
352
353
354
355
356 if c.can[cacheprog.CmdClose] {
357 _, err = c.send(c.ctx, &cacheprog.Request{Command: cacheprog.CmdClose})
358 if errors.Is(err, errCacheprogClosed) {
359
360 err = nil
361 }
362 }
363
364 c.ctxCancel()
365
366 <-c.readLoopDone
367 return err
368 }
369
370 func (c *ProgCache) FuzzDir() string {
371
372
373 return c.fuzzDirCache.FuzzDir()
374 }
375
View as plain text