1
2
3
4
5 package trace
6
7 import (
8 "bufio"
9 "bytes"
10 "cmp"
11 "encoding/binary"
12 "fmt"
13 "io"
14 "slices"
15 "strings"
16 "time"
17
18 "internal/trace/tracev2"
19 "internal/trace/version"
20 )
21
22
23
24
25
// generation holds the data decoded from all batches belonging to a single
// trace generation, as assembled by readGeneration.
type generation struct {
	// gen is the generation number shared by every batch in this generation.
	gen uint64
	// batches holds the batches that are not consumed by one of the special
	// tables (strings, stacks, CPU samples, sync, experimental), grouped by
	// the thread that wrote them, in the order they were read.
	batches map[ThreadID][]batch
	// batchMs lists the keys of batches in the order each thread was first
	// seen (presumably so callers can iterate batches in a stable order).
	batchMs []ThreadID
	// cpuSamples holds this generation's CPU samples. Once readGeneration
	// returns, their times have been converted using the generation's
	// frequency and the slice is sorted by time.
	cpuSamples []cpuSample
	// minTs is the smallest batch timestamp seen in this generation.
	minTs timestamp
	// evTable is embedded so its tables (strings, stacks, pcs, experimental
	// batches, and sync data) can be accessed directly on the generation.
	*evTable
}
34
35
36
37
// spilledBatch is a batch that readGeneration read from the stream but that
// belongs to the following generation (its generation number is one greater
// than the generation being read). It is returned to the caller so it can be
// passed back in and processed first when the next generation is read.
type spilledBatch struct {
	// gen is the generation number the batch belongs to.
	gen uint64
	// batch is the already-read batch itself.
	*batch
}
42
43
44
45
46
47
48
49
// readGeneration reads one generation's worth of batches from r and decodes
// them into a generation.
//
// spill, if non-nil, is a batch left over from the previous call. It fixes
// this generation's number and is processed before anything else is read.
// Batches are then read until EOF, or until a batch belonging to the next
// generation (number g.gen+1) is read. That batch is returned as the new
// spill so the caller can hand it to the next call.
//
// If reading a batch fails after this generation's number is already known,
// the loop stops. That read error is returned as the third result, alongside
// the generation (still subject to the invariant checks below) and a nil
// spill, so the caller can use the data read so far.
//
// Any other failure returns a nil generation. Such failures include:
//   - a zero or out-of-order generation number;
//   - a batch that fails to decode;
//   - a missing frequency event;
//   - a missing clock snapshot (Go 1.25+ traces);
//   - a stack that references an unknown PC or string.
func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
	g := &generation{
		evTable: &evTable{
			pcs: make(map[uint64]frame),
		},
		batches: make(map[ThreadID][]batch),
	}
	// Process the batch carried over from the previous call first; it fixes
	// this generation's number and seeds minTs.
	if spill != nil {
		g.gen = spill.gen
		g.minTs = spill.batch.time
		if err := processBatch(g, *spill.batch, ver); err != nil {
			return nil, nil, err
		}
		spill = nil
	}

	// Read batches one at a time until EOF or the start of the next
	// generation.
	var spillErr error
	for {
		b, gen, err := readBatch(r)
		if err == io.EOF {
			break
		}
		if err != nil {
			if g.gen != 0 {
				// Part of this generation has already been read. Treat the
				// failed read as its end and report the error alongside the
				// data instead of discarding what was read.
				spillErr = err
				break
			}
			return nil, nil, err
		}
		if gen == 0 {
			// Zero is never accepted as a generation number.
			return nil, nil, fmt.Errorf("invalid generation number %d", gen)
		}
		if g.gen == 0 {
			// First batch and no spill: it determines the generation number.
			g.gen = gen
		}
		if gen == g.gen+1 {
			// Start of the next generation: stash it for the next call.
			spill = &spilledBatch{gen: gen, batch: &b}
			break
		}
		if gen != g.gen {
			// Fail fast on any other generation number. If batches from
			// different generations are interleaved, this generation likely
			// cannot be parsed correctly anyway. Reporting the real problem
			// now is clearer than a more cryptic error later.
			return nil, nil, fmt.Errorf("generations out of order")
		}
		if g.minTs == 0 || b.time < g.minTs {
			g.minTs = b.time
		}
		if err := processBatch(g, b, ver); err != nil {
			return nil, nil, err
		}
	}

	// Check invariants that must hold once the whole generation is read.
	if g.freq == 0 {
		return nil, nil, fmt.Errorf("no frequency event found")
	}
	if ver >= version.Go125 && !g.hasClockSnapshot {
		return nil, nil, fmt.Errorf("no clock snapshot event found")
	}

	// Batches are kept in the order they were read from the trace; they are
	// not re-sorted or validated by timestamp here.

	// Compactify the stack and string tables (presumably to speed up later
	// lookups) now that no more entries will be added.
	g.stacks.compactify()
	g.strings.compactify()

	// Every PC referenced by a stack must have a frame whose function and
	// file string IDs exist in the string table.
	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
		return nil, nil, err
	}

	// CPU sample times were stored as raw timestamp values by
	// addCPUSamples; convert them now that the frequency is known.
	for i := range g.cpuSamples {
		s := &g.cpuSamples[i]
		s.time = g.freq.mul(timestamp(s.time))
	}
	// Order the samples by their converted time.
	slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
		return cmp.Compare(a.time, b.time)
	})
	return g, spill, spillErr
}
148
149
150 func processBatch(g *generation, b batch, ver version.Version) error {
151 switch {
152 case b.isStringsBatch():
153 if err := addStrings(&g.strings, b); err != nil {
154 return err
155 }
156 case b.isStacksBatch():
157 if err := addStacks(&g.stacks, g.pcs, b); err != nil {
158 return err
159 }
160 case b.isCPUSamplesBatch():
161 samples, err := addCPUSamples(g.cpuSamples, b)
162 if err != nil {
163 return err
164 }
165 g.cpuSamples = samples
166 case b.isSyncBatch(ver):
167 if err := setSyncBatch(&g.sync, b, ver); err != nil {
168 return err
169 }
170 case b.exp != tracev2.NoExperiment:
171 if g.expBatches == nil {
172 g.expBatches = make(map[tracev2.Experiment][]ExperimentalBatch)
173 }
174 if err := addExperimentalBatch(g.expBatches, b); err != nil {
175 return err
176 }
177 default:
178 if _, ok := g.batches[b.m]; !ok {
179 g.batchMs = append(g.batchMs, b.m)
180 }
181 g.batches[b.m] = append(g.batches[b.m], b)
182 }
183 return nil
184 }
185
186
187
188 func validateStackStrings(
189 stacks *dataTable[stackID, stack],
190 strings *dataTable[stringID, string],
191 frames map[uint64]frame,
192 ) error {
193 var err error
194 stacks.forEach(func(id stackID, stk stack) bool {
195 for _, pc := range stk.pcs {
196 frame, ok := frames[pc]
197 if !ok {
198 err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
199 return false
200 }
201 _, ok = strings.get(frame.funcID)
202 if !ok {
203 err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
204 return false
205 }
206 _, ok = strings.get(frame.fileID)
207 if !ok {
208 err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
209 return false
210 }
211 }
212 return true
213 })
214 return err
215 }
216
217
218
219
220 func addStrings(stringTable *dataTable[stringID, string], b batch) error {
221 if !b.isStringsBatch() {
222 return fmt.Errorf("internal error: addStrings called on non-string batch")
223 }
224 r := bytes.NewReader(b.data)
225 hdr, err := r.ReadByte()
226 if err != nil || tracev2.EventType(hdr) != tracev2.EvStrings {
227 return fmt.Errorf("missing strings batch header")
228 }
229
230 var sb strings.Builder
231 for r.Len() != 0 {
232
233 ev, err := r.ReadByte()
234 if err != nil {
235 return err
236 }
237 if tracev2.EventType(ev) != tracev2.EvString {
238 return fmt.Errorf("expected string event, got %d", ev)
239 }
240
241
242 id, err := binary.ReadUvarint(r)
243 if err != nil {
244 return err
245 }
246
247
248 len, err := binary.ReadUvarint(r)
249 if err != nil {
250 return err
251 }
252 if len > tracev2.MaxEventTrailerDataSize {
253 return fmt.Errorf("invalid string size %d, maximum is %d", len, tracev2.MaxEventTrailerDataSize)
254 }
255
256
257 n, err := io.CopyN(&sb, r, int64(len))
258 if n != int64(len) {
259 return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
260 }
261 if err != nil {
262 return fmt.Errorf("copying string data: %w", err)
263 }
264
265
266 s := sb.String()
267 sb.Reset()
268 if err := stringTable.insert(stringID(id), s); err != nil {
269 return err
270 }
271 }
272 return nil
273 }
274
275
276
277
278 func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
279 if !b.isStacksBatch() {
280 return fmt.Errorf("internal error: addStacks called on non-stacks batch")
281 }
282 r := bytes.NewReader(b.data)
283 hdr, err := r.ReadByte()
284 if err != nil || tracev2.EventType(hdr) != tracev2.EvStacks {
285 return fmt.Errorf("missing stacks batch header")
286 }
287
288 for r.Len() != 0 {
289
290 ev, err := r.ReadByte()
291 if err != nil {
292 return err
293 }
294 if tracev2.EventType(ev) != tracev2.EvStack {
295 return fmt.Errorf("expected stack event, got %d", ev)
296 }
297
298
299 id, err := binary.ReadUvarint(r)
300 if err != nil {
301 return err
302 }
303
304
305 nFrames, err := binary.ReadUvarint(r)
306 if err != nil {
307 return err
308 }
309 if nFrames > tracev2.MaxFramesPerStack {
310 return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, tracev2.MaxFramesPerStack)
311 }
312
313
314 frames := make([]uint64, 0, nFrames)
315 for i := uint64(0); i < nFrames; i++ {
316
317 pc, err := binary.ReadUvarint(r)
318 if err != nil {
319 return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
320 }
321 funcID, err := binary.ReadUvarint(r)
322 if err != nil {
323 return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
324 }
325 fileID, err := binary.ReadUvarint(r)
326 if err != nil {
327 return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
328 }
329 line, err := binary.ReadUvarint(r)
330 if err != nil {
331 return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
332 }
333 frames = append(frames, pc)
334
335 if _, ok := pcs[pc]; !ok {
336 pcs[pc] = frame{
337 pc: pc,
338 funcID: stringID(funcID),
339 fileID: stringID(fileID),
340 line: line,
341 }
342 }
343 }
344
345
346 if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
347 return err
348 }
349 }
350 return nil
351 }
352
353
354
355
356 func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
357 if !b.isCPUSamplesBatch() {
358 return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
359 }
360 r := bytes.NewReader(b.data)
361 hdr, err := r.ReadByte()
362 if err != nil || tracev2.EventType(hdr) != tracev2.EvCPUSamples {
363 return nil, fmt.Errorf("missing CPU samples batch header")
364 }
365
366 for r.Len() != 0 {
367
368 ev, err := r.ReadByte()
369 if err != nil {
370 return nil, err
371 }
372 if tracev2.EventType(ev) != tracev2.EvCPUSample {
373 return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
374 }
375
376
377 ts, err := binary.ReadUvarint(r)
378 if err != nil {
379 return nil, err
380 }
381
382
383 m, err := binary.ReadUvarint(r)
384 if err != nil {
385 return nil, err
386 }
387 mid := ThreadID(m)
388
389
390 p, err := binary.ReadUvarint(r)
391 if err != nil {
392 return nil, err
393 }
394 pid := ProcID(p)
395
396
397 g, err := binary.ReadUvarint(r)
398 if err != nil {
399 return nil, err
400 }
401 goid := GoID(g)
402 if g == 0 {
403 goid = NoGoroutine
404 }
405
406
407 s, err := binary.ReadUvarint(r)
408 if err != nil {
409 return nil, err
410 }
411
412
413 samples = append(samples, cpuSample{
414 schedCtx: schedCtx{
415 M: mid,
416 P: pid,
417 G: goid,
418 },
419 time: Time(ts),
420 stack: stackID(s),
421 })
422 }
423 return samples, nil
424 }
425
426
// sync holds the clock-related data that setSyncBatch decodes from a
// generation's sync batch.
type sync struct {
	// freq is the factor for converting trace timestamp units to
	// nanoseconds. setSyncBatch sets it to 1e9/f, where f is the value of
	// the EvFrequency event (presumably ticks per second). Zero means no
	// frequency event has been seen.
	freq frequency
	// hasClockSnapshot reports whether an EvClockSnapshot event was seen.
	// Such events are only decoded for Go 1.25+ traces.
	hasClockSnapshot bool
	// snapTime is the trace timestamp of the clock snapshot: the sync
	// batch's time plus the snapshot's timestamp delta.
	snapTime timestamp
	// snapMono is the snapshot's monotonic clock value as recorded in the
	// trace (presumably a nanosecond reading; verify against the writer).
	snapMono uint64
	// snapWall is the snapshot's wall-clock time, built from its seconds and
	// nanoseconds fields.
	snapWall time.Time
}
434
435 func setSyncBatch(s *sync, b batch, ver version.Version) error {
436 if !b.isSyncBatch(ver) {
437 return fmt.Errorf("internal error: setSyncBatch called on non-sync batch")
438 }
439 r := bytes.NewReader(b.data)
440 if ver >= version.Go125 {
441 hdr, err := r.ReadByte()
442 if err != nil || tracev2.EventType(hdr) != tracev2.EvSync {
443 return fmt.Errorf("missing sync batch header")
444 }
445 }
446
447 lastTs := b.time
448 for r.Len() != 0 {
449
450 ev, err := r.ReadByte()
451 if err != nil {
452 return err
453 }
454 et := tracev2.EventType(ev)
455 switch {
456 case et == tracev2.EvFrequency:
457 if s.freq != 0 {
458 return fmt.Errorf("found multiple frequency events")
459 }
460
461 f, err := binary.ReadUvarint(r)
462 if err != nil {
463 return err
464 }
465
466 s.freq = frequency(1.0 / (float64(f) / 1e9))
467 case et == tracev2.EvClockSnapshot && ver >= version.Go125:
468 if s.hasClockSnapshot {
469 return fmt.Errorf("found multiple clock snapshot events")
470 }
471 s.hasClockSnapshot = true
472
473 tdiff, err := binary.ReadUvarint(r)
474 if err != nil {
475 return err
476 }
477 lastTs += timestamp(tdiff)
478 s.snapTime = lastTs
479 mono, err := binary.ReadUvarint(r)
480 if err != nil {
481 return err
482 }
483 s.snapMono = mono
484 sec, err := binary.ReadUvarint(r)
485 if err != nil {
486 return err
487 }
488 nsec, err := binary.ReadUvarint(r)
489 if err != nil {
490 return err
491 }
492
493
494
495 s.snapWall = time.Unix(int64(sec), int64(nsec))
496 default:
497 return fmt.Errorf("expected frequency or clock snapshot event, got %d", ev)
498 }
499 }
500 return nil
501 }
502
503
504
505 func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch, b batch) error {
506 if b.exp == tracev2.NoExperiment {
507 return fmt.Errorf("internal error: addExperimentalBatch called on non-experimental batch")
508 }
509 expBatches[b.exp] = append(expBatches[b.exp], ExperimentalBatch{
510 Thread: b.m,
511 Data: b.data,
512 })
513 return nil
514 }
515
View as plain text