Source file src/internal/trace/generation.go

// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package trace

import (
	"bufio"
	"bytes"
	"cmp"
	"encoding/binary"
	"fmt"
	"io"
	"slices"
	"strings"
	"time"

	"internal/trace/tracev2"
	"internal/trace/version"
)

// generation contains all the trace data for a single
// trace generation. It is purely data: it does not
// track any parse state nor does it contain a cursor
// into the generation.
type generation struct {
	gen        uint64
	batches    map[ThreadID][]batch
	batchMs    []ThreadID
	cpuSamples []cpuSample
	minTs      timestamp
	*evTable
}

// spilledBatch represents a batch that was read out for the next generation,
// while reading the previous one. It's passed on when parsing the next
// generation.
type spilledBatch struct {
	gen uint64
	*batch
}

// readGeneration buffers and decodes the structural elements of a trace generation
// out of r. spill is the first batch of the new generation (already buffered and
// parsed from reading the last generation). Returns the generation and the first
// batch read of the next generation, if any.
//
// If the returned generation is non-nil, it is valid and must be processed before
// handling the returned error.
func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
	g := &generation{
		evTable: &evTable{
			pcs: make(map[uint64]frame),
		},
		batches: make(map[ThreadID][]batch),
	}
	// Process the spilled batch.
	if spill != nil {
		g.gen = spill.gen
		g.minTs = spill.batch.time
		if err := processBatch(g, *spill.batch, ver); err != nil {
			return nil, nil, err
		}
		spill = nil
	}
	// Read batches one at a time until we either hit EOF or
	// the next generation.
	var spillErr error
	for {
		b, gen, err := readBatch(r)
		if err == io.EOF {
			break
		}
		if err != nil {
			if g.gen != 0 {
				// This is an error reading the first batch of the next generation.
				// This is fine. Let's forge ahead assuming that what we've got so
				// far is fine.
				spillErr = err
				break
			}
			return nil, nil, err
		}
		if gen == 0 {
			// 0 is a sentinel used by the runtime, so we'll never see it.
			return nil, nil, fmt.Errorf("invalid generation number %d", gen)
		}
		if g.gen == 0 {
			// Initialize gen.
			g.gen = gen
		}
		if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
			spill = &spilledBatch{gen: gen, batch: &b}
			break
		}
		if gen != g.gen {
			// N.B. Fail as fast as possible if we see this. At first it
			// may seem prudent to be fault-tolerant and assume we have a
			// complete generation, parsing and returning that first. However,
			// if the batches are mixed across generations then it's likely
			// we won't be able to parse this generation correctly at all.
			// Rather than return a cryptic error in that case, indicate the
			// problem as soon as we see it.
			return nil, nil, fmt.Errorf("generations out of order")
		}
		if g.minTs == 0 || b.time < g.minTs {
			g.minTs = b.time
		}
		if err := processBatch(g, b, ver); err != nil {
			return nil, nil, err
		}
	}

	// Check some invariants.
	if g.freq == 0 {
		return nil, nil, fmt.Errorf("no frequency event found")
	}
	if ver >= version.Go125 && !g.hasClockSnapshot {
		return nil, nil, fmt.Errorf("no clock snapshot event found")
	}

	// N.B. Trust that the batch order is correct. We can't validate the batch order
	// by timestamp because the timestamps could just be plain wrong. The source of
	// truth is the order things appear in the trace and the partial order sequence
	// numbers on certain events. If it turns out the batch order is actually incorrect
	// we'll very likely fail to advance a partial order from the frontier.

	// Compactify stacks and strings for better lookup performance later.
	g.stacks.compactify()
	g.strings.compactify()

	// Validate stacks.
	if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
		return nil, nil, err
	}

	// Fix up the CPU sample timestamps, now that we have freq.
	for i := range g.cpuSamples {
		s := &g.cpuSamples[i]
		s.time = g.freq.mul(timestamp(s.time))
	}
	// Sort the CPU samples.
	slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
		return cmp.Compare(a.time, b.time)
	})
	return g, spill, spillErr
}
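
// A minimal sketch of how readGeneration might be driven by a caller: call it
// in a loop, handing the spilled batch from one call to the next, and process
// each non-nil generation before acting on any returned error. The process
// helper, br (a *bufio.Reader), and ver (a version.Version) below are
// placeholders for the caller's own state, not names defined in this package.
//
//	var spill *spilledBatch
//	for {
//		g, next, err := readGeneration(br, spill, ver)
//		if g != nil {
//			process(g) // Valid even when err != nil.
//		}
//		if err != nil {
//			return err
//		}
//		if next == nil {
//			break // No batch from a following generation: end of trace.
//		}
//		spill = next
//	}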

// processBatch adds the batch to the generation.
func processBatch(g *generation, b batch, ver version.Version) error {
	switch {
	case b.isStringsBatch():
		if err := addStrings(&g.strings, b); err != nil {
			return err
		}
	case b.isStacksBatch():
		if err := addStacks(&g.stacks, g.pcs, b); err != nil {
			return err
		}
	case b.isCPUSamplesBatch():
		samples, err := addCPUSamples(g.cpuSamples, b)
		if err != nil {
			return err
		}
		g.cpuSamples = samples
	case b.isSyncBatch(ver):
		if err := setSyncBatch(&g.sync, b, ver); err != nil {
			return err
		}
	case b.exp != tracev2.NoExperiment:
		if g.expBatches == nil {
			g.expBatches = make(map[tracev2.Experiment][]ExperimentalBatch)
		}
		if err := addExperimentalBatch(g.expBatches, b); err != nil {
			return err
		}
	default:
		if _, ok := g.batches[b.m]; !ok {
			g.batchMs = append(g.batchMs, b.m)
		}
		g.batches[b.m] = append(g.batches[b.m], b)
	}
	return nil
}

// validateStackStrings makes sure all the string references in
// the stack table are present in the string table.
func validateStackStrings(
	stacks *dataTable[stackID, stack],
	strings *dataTable[stringID, string],
	frames map[uint64]frame,
) error {
	var err error
	stacks.forEach(func(id stackID, stk stack) bool {
		for _, pc := range stk.pcs {
			frame, ok := frames[pc]
			if !ok {
				err = fmt.Errorf("found unknown pc %x for stack %d", pc, id)
				return false
			}
			_, ok = strings.get(frame.funcID)
			if !ok {
				err = fmt.Errorf("found invalid func string ID %d for stack %d", frame.funcID, id)
				return false
			}
			_, ok = strings.get(frame.fileID)
			if !ok {
				err = fmt.Errorf("found invalid file string ID %d for stack %d", frame.fileID, id)
				return false
			}
		}
		return true
	})
	return err
}

// addStrings takes a batch whose first byte is an EvStrings event
// (indicating that the batch contains only strings) and adds each
// string contained therein to the provided strings map.
func addStrings(stringTable *dataTable[stringID, string], b batch) error {
	if !b.isStringsBatch() {
		return fmt.Errorf("internal error: addStrings called on non-string batch")
	}
	r := bytes.NewReader(b.data)
	hdr, err := r.ReadByte() // Consume the EvStrings byte.
	if err != nil || tracev2.EventType(hdr) != tracev2.EvStrings {
		return fmt.Errorf("missing strings batch header")
	}

	var sb strings.Builder
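	// Each remaining entry is expected to be encoded as an EvString byte,
	// a uvarint string ID, a uvarint length, and then that many bytes of data.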
	for r.Len() != 0 {
		// Read the header.
		ev, err := r.ReadByte()
		if err != nil {
			return err
		}
		if tracev2.EventType(ev) != tracev2.EvString {
			return fmt.Errorf("expected string event, got %d", ev)
		}

		// Read the string's ID.
		id, err := binary.ReadUvarint(r)
		if err != nil {
			return err
		}

		// Read the string's length.
		len, err := binary.ReadUvarint(r)
		if err != nil {
			return err
		}
		if len > tracev2.MaxEventTrailerDataSize {
			return fmt.Errorf("invalid string size %d, maximum is %d", len, tracev2.MaxEventTrailerDataSize)
		}

		// Copy out the string.
		n, err := io.CopyN(&sb, r, int64(len))
		if n != int64(len) {
			return fmt.Errorf("failed to read full string: read %d but wanted %d", n, len)
		}
		if err != nil {
			return fmt.Errorf("copying string data: %w", err)
		}

		// Add the string to the map.
		s := sb.String()
		sb.Reset()
		if err := stringTable.insert(stringID(id), s); err != nil {
			return err
		}
	}
	return nil
}

// addStacks takes a batch whose first byte is an EvStacks event
// (indicating that the batch contains only stacks) and adds each
// stack contained therein to the provided stacks map.
func addStacks(stackTable *dataTable[stackID, stack], pcs map[uint64]frame, b batch) error {
	if !b.isStacksBatch() {
		return fmt.Errorf("internal error: addStacks called on non-stacks batch")
	}
	r := bytes.NewReader(b.data)
	hdr, err := r.ReadByte() // Consume the EvStacks byte.
	if err != nil || tracev2.EventType(hdr) != tracev2.EvStacks {
		return fmt.Errorf("missing stacks batch header")
	}

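	// Each remaining entry is expected to be encoded as an EvStack byte, a
	// uvarint stack ID, a uvarint frame count, and then that many frames, each
	// a group of four uvarints: pc, funcID, fileID, and line.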
	for r.Len() != 0 {
		// Read the header.
		ev, err := r.ReadByte()
		if err != nil {
			return err
		}
		if tracev2.EventType(ev) != tracev2.EvStack {
			return fmt.Errorf("expected stack event, got %d", ev)
		}

		// Read the stack's ID.
		id, err := binary.ReadUvarint(r)
		if err != nil {
			return err
		}

		// Read how many frames are in this stack.
		nFrames, err := binary.ReadUvarint(r)
		if err != nil {
			return err
		}
		if nFrames > tracev2.MaxFramesPerStack {
			return fmt.Errorf("invalid stack size %d, maximum is %d", nFrames, tracev2.MaxFramesPerStack)
		}

		// Each frame consists of 4 fields: pc, funcID (string), fileID (string), line.
		frames := make([]uint64, 0, nFrames)
		for i := uint64(0); i < nFrames; i++ {
			// Read the frame data.
			pc, err := binary.ReadUvarint(r)
			if err != nil {
				return fmt.Errorf("reading frame %d's PC for stack %d: %w", i+1, id, err)
			}
			funcID, err := binary.ReadUvarint(r)
			if err != nil {
				return fmt.Errorf("reading frame %d's funcID for stack %d: %w", i+1, id, err)
			}
			fileID, err := binary.ReadUvarint(r)
			if err != nil {
				return fmt.Errorf("reading frame %d's fileID for stack %d: %w", i+1, id, err)
			}
			line, err := binary.ReadUvarint(r)
			if err != nil {
				return fmt.Errorf("reading frame %d's line for stack %d: %w", i+1, id, err)
			}
			frames = append(frames, pc)

			if _, ok := pcs[pc]; !ok {
				pcs[pc] = frame{
					pc:     pc,
					funcID: stringID(funcID),
					fileID: stringID(fileID),
					line:   line,
				}
			}
		}

		// Add the stack to the map.
		if err := stackTable.insert(stackID(id), stack{pcs: frames}); err != nil {
			return err
		}
	}
	return nil
}

// addCPUSamples takes a batch whose first byte is an EvCPUSamples event
// (indicating that the batch contains only CPU samples) and adds each
// sample contained therein to the provided samples list.
func addCPUSamples(samples []cpuSample, b batch) ([]cpuSample, error) {
	if !b.isCPUSamplesBatch() {
		return nil, fmt.Errorf("internal error: addCPUSamples called on non-CPU-sample batch")
	}
	r := bytes.NewReader(b.data)
	hdr, err := r.ReadByte() // Consume the EvCPUSamples byte.
	if err != nil || tracev2.EventType(hdr) != tracev2.EvCPUSamples {
		return nil, fmt.Errorf("missing CPU samples batch header")
	}

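	// Each remaining entry is expected to be encoded as an EvCPUSample byte
	// followed by uvarints for the timestamp, M, P, G, and stack ID.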
	for r.Len() != 0 {
		// Read the header.
		ev, err := r.ReadByte()
		if err != nil {
			return nil, err
		}
		if tracev2.EventType(ev) != tracev2.EvCPUSample {
			return nil, fmt.Errorf("expected CPU sample event, got %d", ev)
		}

		// Read the sample's timestamp.
		ts, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}

		// Read the sample's M.
		m, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		mid := ThreadID(m)

		// Read the sample's P.
		p, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		pid := ProcID(p)

		// Read the sample's G.
		g, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		goid := GoID(g)
		if g == 0 {
			goid = NoGoroutine
		}

		// Read the sample's stack.
		s, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}

		// Add the sample to the slice.
		samples = append(samples, cpuSample{
			schedCtx: schedCtx{
				M: mid,
				P: pid,
				G: goid,
			},
			time:  Time(ts), // N.B. this is really a "timestamp," not a Time.
			stack: stackID(s),
		})
	}
	return samples, nil
}

// sync holds the per-generation sync data.
type sync struct {
	freq             frequency
	hasClockSnapshot bool
	snapTime         timestamp
	snapMono         uint64
	snapWall         time.Time
}

func setSyncBatch(s *sync, b batch, ver version.Version) error {
	if !b.isSyncBatch(ver) {
		return fmt.Errorf("internal error: setSyncBatch called on non-sync batch")
	}
	r := bytes.NewReader(b.data)
	if ver >= version.Go125 {
		hdr, err := r.ReadByte() // Consume the EvSync byte.
		if err != nil || tracev2.EventType(hdr) != tracev2.EvSync {
			return fmt.Errorf("missing sync batch header")
		}
	}

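	// A sync batch is expected to carry one EvFrequency event and, in Go 1.25+
	// traces, one EvClockSnapshot event whose timestamp is delta-encoded from
	// the batch's own timestamp; duplicates of either are rejected below.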
	lastTs := b.time
	for r.Len() != 0 {
		// Read the header
		ev, err := r.ReadByte()
		if err != nil {
			return err
		}
		et := tracev2.EventType(ev)
		switch {
		case et == tracev2.EvFrequency:
			if s.freq != 0 {
				return fmt.Errorf("found multiple frequency events")
			}
			// Read the frequency. It'll come out as timestamp units per second.
			f, err := binary.ReadUvarint(r)
			if err != nil {
				return err
			}
			// Convert to nanoseconds per timestamp unit.
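			// For example, a clock running at 1 GHz (f = 1e9) yields 1.0 ns per
			// unit, while 500 MHz (f = 5e8) yields 2.0 ns per unit.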
			s.freq = frequency(1.0 / (float64(f) / 1e9))
		case et == tracev2.EvClockSnapshot && ver >= version.Go125:
			if s.hasClockSnapshot {
				return fmt.Errorf("found multiple clock snapshot events")
			}
			s.hasClockSnapshot = true
			// Read the EvClockSnapshot arguments.
			tdiff, err := binary.ReadUvarint(r)
			if err != nil {
				return err
			}
			lastTs += timestamp(tdiff)
			s.snapTime = lastTs
			mono, err := binary.ReadUvarint(r)
			if err != nil {
				return err
			}
			s.snapMono = mono
			sec, err := binary.ReadUvarint(r)
			if err != nil {
				return err
			}
			nsec, err := binary.ReadUvarint(r)
			if err != nil {
				return err
			}
			// TODO(felixge): In theory we could inject s.snapMono into the time
			// value below to make it comparable. But there is no API for this
			// in the time package right now.
			s.snapWall = time.Unix(int64(sec), int64(nsec))
		default:
			return fmt.Errorf("expected frequency or clock snapshot event, got %d", ev)
		}
	}
	return nil
}

// addExperimentalBatch takes an experimental batch and adds it to the list of experimental
// batches for the experiment it's a part of.
func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch, b batch) error {
	if b.exp == tracev2.NoExperiment {
		return fmt.Errorf("internal error: addExperimentalBatch called on non-experimental batch")
	}
	expBatches[b.exp] = append(expBatches[b.exp], ExperimentalBatch{
		Thread: b.m,
		Data:   b.data,
	})
	return nil
}
