Source file src/net/http/internal/http2/writesched_priority_rfc9218_test.go

     1  // Copyright 2025 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http2
     6  
     7  import (
     8  	"reflect"
     9  	"testing"
    10  )
    11  
    12  func TestPrioritySchedulerUrgency(t *testing.T) {
    13  	const maxFrameSize = 16
    14  	sc := &serverConn{maxFrameSize: maxFrameSize}
    15  	ws := newPriorityWriteSchedulerRFC9218()
    16  	streams := make([]*stream, 5)
    17  	for i := range streams {
    18  		streamID := uint32(i) + 1
    19  		streams[i] = &stream{
    20  			id: streamID,
    21  			sc: sc,
    22  		}
    23  		streams[i].flow.add(1 << 20) // arbitrary large value
    24  		ws.OpenStream(streamID, OpenStreamOptions{
    25  			priority: PriorityParam{
    26  				urgency:     7,
    27  				incremental: 0,
    28  			},
    29  		})
    30  		wr := FrameWriteRequest{
    31  			write: &writeData{
    32  				streamID:  streamID,
    33  				p:         make([]byte, maxFrameSize*(i+1)),
    34  				endStream: false,
    35  			},
    36  			stream: streams[i],
    37  		}
    38  		ws.Push(wr)
    39  	}
    40  	// Raise the urgency of all even-numbered streams.
    41  	for i := range streams {
    42  		streamID := uint32(i) + 1
    43  		if streamID%2 == 1 {
    44  			continue
    45  		}
    46  		ws.AdjustStream(streamID, PriorityParam{
    47  			urgency:     0,
    48  			incremental: 0,
    49  		})
    50  	}
    51  	const controlFrames = 2
    52  	for range controlFrames {
    53  		ws.Push(makeWriteNonStreamRequest())
    54  	}
    55  
    56  	// We should get the control frames first.
    57  	for range controlFrames {
    58  		wr, ok := ws.Pop()
    59  		if !ok || wr.StreamID() != 0 {
    60  			t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
    61  		}
    62  	}
    63  
    64  	// Each stream should write maxFrameSize bytes until it runs out of data.
    65  	// Higher-urgency even-numbered streams should come first.
    66  	want := []uint32{2, 2, 4, 4, 4, 4, 1, 3, 3, 3, 5, 5, 5, 5, 5}
    67  	var got []uint32
    68  	for {
    69  		wr, ok := ws.Pop()
    70  		if !ok {
    71  			break
    72  		}
    73  		if wr.DataSize() != maxFrameSize {
    74  			t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
    75  		}
    76  		got = append(got, wr.StreamID())
    77  	}
    78  	if !reflect.DeepEqual(got, want) {
    79  		t.Fatalf("popped streams %v, want %v", got, want)
    80  	}
    81  }
    82  
    83  func TestPrioritySchedulerIncremental(t *testing.T) {
    84  	const maxFrameSize = 16
    85  	sc := &serverConn{maxFrameSize: maxFrameSize}
    86  	ws := newPriorityWriteSchedulerRFC9218()
    87  	streams := make([]*stream, 5)
    88  	for i := range streams {
    89  		streamID := uint32(i) + 1
    90  		streams[i] = &stream{
    91  			id: streamID,
    92  			sc: sc,
    93  		}
    94  		streams[i].flow.add(1 << 20) // arbitrary large value
    95  		ws.OpenStream(streamID, OpenStreamOptions{
    96  			priority: PriorityParam{
    97  				urgency:     7,
    98  				incremental: 0,
    99  			},
   100  		})
   101  		wr := FrameWriteRequest{
   102  			write: &writeData{
   103  				streamID:  streamID,
   104  				p:         make([]byte, maxFrameSize*(i+1)),
   105  				endStream: false,
   106  			},
   107  			stream: streams[i],
   108  		}
   109  		ws.Push(wr)
   110  	}
   111  	// Make even-numbered streams incremental.
   112  	for i := range streams {
   113  		streamID := uint32(i) + 1
   114  		if streamID%2 == 1 {
   115  			continue
   116  		}
   117  		ws.AdjustStream(streamID, PriorityParam{
   118  			urgency:     7,
   119  			incremental: 1,
   120  		})
   121  	}
   122  	const controlFrames = 2
   123  	for range controlFrames {
   124  		ws.Push(makeWriteNonStreamRequest())
   125  	}
   126  
   127  	// We should get the control frames first.
   128  	for range controlFrames {
   129  		wr, ok := ws.Pop()
   130  		if !ok || wr.StreamID() != 0 {
   131  			t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
   132  		}
   133  	}
   134  
   135  	// Each stream should write maxFrameSize bytes until it runs out of data.
   136  	// We should:
   137  	// - Round-robin between even and odd-numbered streams as they have
   138  	// different i but the same u.
   139  	// - Amongst even-numbered streams, round-robin writes as they are
   140  	// incremental.
   141  	// - Among odd-numbered streams, do not round-robin as they are
   142  	// non-incremental.
   143  	want := []uint32{2, 1, 4, 3, 2, 3, 4, 3, 4, 5, 4, 5, 5, 5, 5}
   144  	var got []uint32
   145  	for {
   146  		wr, ok := ws.Pop()
   147  		if !ok {
   148  			break
   149  		}
   150  		if wr.DataSize() != maxFrameSize {
   151  			t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
   152  		}
   153  		got = append(got, wr.StreamID())
   154  	}
   155  	if !reflect.DeepEqual(got, want) {
   156  		t.Fatalf("popped streams %v, want %v", got, want)
   157  	}
   158  }
   159  
   160  func TestPrioritySchedulerUrgencyAndIncremental(t *testing.T) {
   161  	const maxFrameSize = 16
   162  	sc := &serverConn{maxFrameSize: maxFrameSize}
   163  	ws := newPriorityWriteSchedulerRFC9218()
   164  	streams := make([]*stream, 6)
   165  	for i := range streams {
   166  		streamID := uint32(i) + 1
   167  		streams[i] = &stream{
   168  			id: streamID,
   169  			sc: sc,
   170  		}
   171  		streams[i].flow.add(1 << 20) // arbitrary large value
   172  		ws.OpenStream(streamID, OpenStreamOptions{
   173  			priority: PriorityParam{
   174  				urgency:     7,
   175  				incremental: 0,
   176  			},
   177  		})
   178  		wr := FrameWriteRequest{
   179  			write: &writeData{
   180  				streamID:  streamID,
   181  				p:         make([]byte, maxFrameSize*(i+1)),
   182  				endStream: false,
   183  			},
   184  			stream: streams[i],
   185  		}
   186  		ws.Push(wr)
   187  	}
   188  	// Make even-numbered streams incremental and of higher urgency.
   189  	for i := range streams {
   190  		streamID := uint32(i) + 1
   191  		if streamID%2 == 1 {
   192  			continue
   193  		}
   194  		ws.AdjustStream(streamID, PriorityParam{
   195  			urgency:     0,
   196  			incremental: 1,
   197  		})
   198  	}
   199  	// Close stream 1 and 4
   200  	ws.CloseStream(1)
   201  	ws.CloseStream(4)
   202  	const controlFrames = 2
   203  	for range controlFrames {
   204  		ws.Push(makeWriteNonStreamRequest())
   205  	}
   206  
   207  	// We should get the control frames first.
   208  	for range controlFrames {
   209  		wr, ok := ws.Pop()
   210  		if !ok || wr.StreamID() != 0 {
   211  			t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
   212  		}
   213  	}
   214  
   215  	// Each stream should write maxFrameSize bytes until it runs out of data.
   216  	// We should:
   217  	// - Get even-numbered streams first that are written in a round-robin
   218  	// manner as they have higher urgency and are incremental.
   219  	// - Get odd-numbered streams after that are written one-by-one to
   220  	// completion as they are of lower urgency and are not incremental.
   221  	// - Skip stream 1 and 4 that have been closed.
   222  	want := []uint32{2, 6, 2, 6, 6, 6, 6, 6, 3, 3, 3, 5, 5, 5, 5, 5}
   223  	var got []uint32
   224  	for {
   225  		wr, ok := ws.Pop()
   226  		if !ok {
   227  			break
   228  		}
   229  		if wr.DataSize() != maxFrameSize {
   230  			t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
   231  		}
   232  		got = append(got, wr.StreamID())
   233  	}
   234  	if !reflect.DeepEqual(got, want) {
   235  		t.Fatalf("popped streams %v, want %v", got, want)
   236  	}
   237  }
   238  
   239  func TestPrioritySchedulerIdempotentUpdate(t *testing.T) {
   240  	const maxFrameSize = 16
   241  	sc := &serverConn{maxFrameSize: maxFrameSize}
   242  	ws := newPriorityWriteSchedulerRFC9218()
   243  	streams := make([]*stream, 6)
   244  	for i := range streams {
   245  		streamID := uint32(i) + 1
   246  		streams[i] = &stream{
   247  			id: streamID,
   248  			sc: sc,
   249  		}
   250  		streams[i].flow.add(1 << 20) // arbitrary large value
   251  		ws.OpenStream(streamID, OpenStreamOptions{
   252  			priority: PriorityParam{
   253  				urgency:     7,
   254  				incremental: 0,
   255  			},
   256  		})
   257  		wr := FrameWriteRequest{
   258  			write: &writeData{
   259  				streamID:  streamID,
   260  				p:         make([]byte, maxFrameSize*(i+1)),
   261  				endStream: false,
   262  			},
   263  			stream: streams[i],
   264  		}
   265  		ws.Push(wr)
   266  	}
   267  	// Make even-numbered streams incremental and of higher urgency.
   268  	for i := range streams {
   269  		streamID := uint32(i) + 1
   270  		if streamID%2 == 1 {
   271  			continue
   272  		}
   273  		ws.AdjustStream(streamID, PriorityParam{
   274  			urgency:     0,
   275  			incremental: 1,
   276  		})
   277  	}
   278  	ws.CloseStream(1)
   279  	// Repeat the same priority update to ensure idempotency.
   280  	for i := range streams {
   281  		streamID := uint32(i) + 1
   282  		if streamID%2 == 1 {
   283  			continue
   284  		}
   285  		ws.AdjustStream(streamID, PriorityParam{
   286  			urgency:     0,
   287  			incremental: 1,
   288  		})
   289  	}
   290  	ws.CloseStream(2)
   291  	const controlFrames = 2
   292  	for range controlFrames {
   293  		ws.Push(makeWriteNonStreamRequest())
   294  	}
   295  
   296  	// We should get the control frames first.
   297  	for range controlFrames {
   298  		wr, ok := ws.Pop()
   299  		if !ok || wr.StreamID() != 0 {
   300  			t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
   301  		}
   302  	}
   303  
   304  	// Each stream should write maxFrameSize bytes until it runs out of data.
   305  	// We should:
   306  	// - Get even-numbered streams first that are written in a round-robin
   307  	// manner as they have higher urgency and are incremental.
   308  	// - Get odd-numbered streams after that are written one-by-one to
   309  	// completion as they are of lower urgency and are not incremental.
   310  	// - Skip stream 1 and 4 that have been closed.
   311  	want := []uint32{4, 6, 4, 6, 4, 6, 4, 6, 6, 6, 3, 3, 3, 5, 5, 5, 5, 5}
   312  	var got []uint32
   313  	for {
   314  		wr, ok := ws.Pop()
   315  		if !ok {
   316  			break
   317  		}
   318  		if wr.DataSize() != maxFrameSize {
   319  			t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
   320  		}
   321  		got = append(got, wr.StreamID())
   322  	}
   323  	if !reflect.DeepEqual(got, want) {
   324  		t.Fatalf("popped streams %v, want %v", got, want)
   325  	}
   326  }
   327  
   328  func TestPrioritySchedulerBuffersPriorityUpdate(t *testing.T) {
   329  	const maxFrameSize = 16
   330  	sc := &serverConn{maxFrameSize: maxFrameSize}
   331  	ws := newPriorityWriteSchedulerRFC9218()
   332  
   333  	// Priorities are adjusted for streams that are not open yet.
   334  	ws.AdjustStream(1, PriorityParam{urgency: 0})
   335  	ws.AdjustStream(5, PriorityParam{urgency: 0})
   336  	for _, streamID := range []uint32{1, 3, 5} {
   337  		stream := &stream{
   338  			id: streamID,
   339  			sc: sc,
   340  		}
   341  		stream.flow.add(1 << 20) // arbitrary large value
   342  		ws.OpenStream(streamID, OpenStreamOptions{
   343  			priority: PriorityParam{
   344  				urgency:     7,
   345  				incremental: 1,
   346  			},
   347  		})
   348  		wr := FrameWriteRequest{
   349  			write: &writeData{
   350  				streamID:  streamID,
   351  				p:         make([]byte, maxFrameSize*(3)),
   352  				endStream: false,
   353  			},
   354  			stream: stream,
   355  		}
   356  		ws.Push(wr)
   357  	}
   358  
   359  	const controlFrames = 2
   360  	for range controlFrames {
   361  		ws.Push(makeWriteNonStreamRequest())
   362  	}
   363  
   364  	// We should get the control frames first.
   365  	for range controlFrames {
   366  		wr, ok := ws.Pop()
   367  		if !ok || wr.StreamID() != 0 {
   368  			t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
   369  		}
   370  	}
   371  
   372  	// The most recent priority adjustment is buffered and applied. Older ones
   373  	// are ignored.
   374  	want := []uint32{5, 5, 5, 1, 3, 1, 3, 1, 3}
   375  	var got []uint32
   376  	for {
   377  		wr, ok := ws.Pop()
   378  		if !ok {
   379  			break
   380  		}
   381  		if wr.DataSize() != maxFrameSize {
   382  			t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
   383  		}
   384  		got = append(got, wr.StreamID())
   385  	}
   386  	if !reflect.DeepEqual(got, want) {
   387  		t.Fatalf("popped streams %v, want %v", got, want)
   388  	}
   389  }
   390  

View as plain text