1
2
3
4
5 package trace
6
7 import (
8 "bytes"
9 "encoding/binary"
10 "fmt"
11 "io"
12
13 "internal/trace/tracev2"
14 "internal/trace/version"
15 )
16
17
// timestamp is a raw timestamp value exactly as decoded from a batch header
// (see readBatch, which converts the uvarint it reads into this type).
//
// NOTE(review): the unit and epoch of the value are not established in this
// file; confirm against the code that interprets it.
type timestamp uint64
19
20
21
// batch is one batch read from the trace stream: the fields decoded from its
// header plus its payload bytes, which are stored without being parsed.
type batch struct {
	// m is the M ID decoded from the batch header.
	m ThreadID
	// time is the timestamp decoded from the batch header.
	time timestamp
	// data is the raw batch payload. The is*Batch helpers classify a batch
	// by interpreting data[0] as an event type.
	data []byte
	// exp is tracev2.NoExperiment unless the batch was read from an
	// experimental batch header, in which case it is the experiment byte
	// that followed the batch type.
	exp tracev2.Experiment
}
28
29 func (b *batch) isStringsBatch() bool {
30 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStrings
31 }
32
33 func (b *batch) isStacksBatch() bool {
34 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvStacks
35 }
36
37 func (b *batch) isCPUSamplesBatch() bool {
38 return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvCPUSamples
39 }
40
41 func (b *batch) isSyncBatch(ver version.Version) bool {
42 return (b.exp == tracev2.NoExperiment && len(b.data) > 0) &&
43 ((tracev2.EventType(b.data[0]) == tracev2.EvFrequency && ver < version.Go125) ||
44 (tracev2.EventType(b.data[0]) == tracev2.EvSync && ver >= version.Go125))
45 }
46
47
48 func readBatch(r interface {
49 io.Reader
50 io.ByteReader
51 }) (batch, uint64, error) {
52
53 b, err := r.ReadByte()
54 if err != nil {
55 return batch{}, 0, err
56 }
57 if typ := tracev2.EventType(b); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
58 return batch{}, 0, fmt.Errorf("expected batch event, got event %d", typ)
59 }
60
61
62 exp := tracev2.NoExperiment
63 if tracev2.EventType(b) == tracev2.EvExperimentalBatch {
64 e, err := r.ReadByte()
65 if err != nil {
66 return batch{}, 0, err
67 }
68 exp = tracev2.Experiment(e)
69 }
70
71
72
73 gen, err := binary.ReadUvarint(r)
74 if err != nil {
75 return batch{}, gen, fmt.Errorf("error reading batch gen: %w", err)
76 }
77 m, err := binary.ReadUvarint(r)
78 if err != nil {
79 return batch{}, gen, fmt.Errorf("error reading batch M ID: %w", err)
80 }
81 ts, err := binary.ReadUvarint(r)
82 if err != nil {
83 return batch{}, gen, fmt.Errorf("error reading batch timestamp: %w", err)
84 }
85
86
87 size, err := binary.ReadUvarint(r)
88 if err != nil {
89 return batch{}, gen, fmt.Errorf("error reading batch size: %w", err)
90 }
91 if size > tracev2.MaxBatchSize {
92 return batch{}, gen, fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
93 }
94
95
96 var data bytes.Buffer
97 data.Grow(int(size))
98 n, err := io.CopyN(&data, r, int64(size))
99 if n != int64(size) {
100 return batch{}, gen, fmt.Errorf("failed to read full batch: read %d but wanted %d", n, size)
101 }
102 if err != nil {
103 return batch{}, gen, fmt.Errorf("copying batch data: %w", err)
104 }
105
106
107 return batch{
108 m: ThreadID(m),
109 time: timestamp(ts),
110 data: data.Bytes(),
111 exp: exp,
112 }, gen, nil
113 }
114
View as plain text