Source file src/net/sendfile_test.go

     1  // Copyright 2016 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package net
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"crypto/sha256"
    11  	"encoding/hex"
    12  	"errors"
    13  	"fmt"
    14  	"internal/poll"
    15  	"io"
    16  	"math/rand"
    17  	"os"
    18  	"runtime"
    19  	"strconv"
    20  	"sync"
    21  	"testing"
    22  	"time"
    23  )
    24  
    25  const (
    26  	newton       = "../testdata/Isaac.Newton-Opticks.txt"
    27  	newtonLen    = 567198
    28  	newtonSHA256 = "d4a9ac22462b35e7821a4f2706c211093da678620a8f9997989ee7cf8d507bbd"
    29  )
    30  
    31  // expectSendfile runs f, and verifies that internal/poll.SendFile successfully handles
    32  // a write to wantConn during f's execution.
    33  //
    34  // On platforms where supportsSendfile is false, expectSendfile runs f but does not
    35  // expect a call to SendFile.
    36  func expectSendfile(t *testing.T, wantConn Conn, f func()) {
    37  	t.Helper()
    38  	if !supportsSendfile {
    39  		f()
    40  		return
    41  	}
    42  	orig := poll.TestHookDidSendFile
    43  	defer func() {
    44  		poll.TestHookDidSendFile = orig
    45  	}()
    46  	var (
    47  		called     bool
    48  		gotHandled bool
    49  		gotFD      *poll.FD
    50  		gotErr     error
    51  	)
    52  	poll.TestHookDidSendFile = func(dstFD *poll.FD, src uintptr, written int64, err error, handled bool) {
    53  		if called {
    54  			t.Error("internal/poll.SendFile called multiple times, want one call")
    55  		}
    56  		called = true
    57  		gotHandled = handled
    58  		gotFD = dstFD
    59  		gotErr = err
    60  	}
    61  	f()
    62  	if !called {
    63  		t.Error("internal/poll.SendFile was not called, want it to be")
    64  		return
    65  	}
    66  	if !gotHandled {
    67  		t.Error("internal/poll.SendFile did not handle the write, want it to, error:", gotErr)
    68  		return
    69  	}
    70  	if &wantConn.(*TCPConn).fd.pfd != gotFD {
    71  		t.Error("internal.poll.SendFile called with unexpected FD")
    72  	}
    73  }
    74  
    75  func TestSendfile(t *testing.T) { testSendfile(t, newton, newtonSHA256, newtonLen, 0) }
    76  func TestSendfileWithExactLimit(t *testing.T) {
    77  	testSendfile(t, newton, newtonSHA256, newtonLen, newtonLen)
    78  }
    79  func TestSendfileWithLimitLargerThanFile(t *testing.T) {
    80  	testSendfile(t, newton, newtonSHA256, newtonLen, newtonLen*2)
    81  }
    82  func TestSendfileWithLargeFile(t *testing.T) {
    83  	// Some platforms are not capable of handling large files with sendfile
    84  	// due to limited system resource, so we only run this test on amd64 and
    85  	// arm64 for the moment.
    86  	if runtime.GOARCH != "amd64" && runtime.GOARCH != "arm64" {
    87  		t.Skip("skipping on non-amd64 and non-arm64 platforms")
    88  	}
    89  	// Also skip it during short testing.
    90  	if testing.Short() {
    91  		t.Skip("Skip it during short testing")
    92  	}
    93  
    94  	// We're using 1<<31 - 1 as the chunk size for sendfile currently,
    95  	// make an edge case file that is 1 byte bigger than that.
    96  	f := createTempFile(t, 1<<31)
    97  	// For big file like this, only verify the transmission of the file,
    98  	// skip the content check.
    99  	testSendfile(t, f.Name(), "", 1<<31, 0)
   100  }
   101  func testSendfile(t *testing.T, filePath, fileHash string, size, limit int64) {
   102  	ln := newLocalListener(t, "tcp")
   103  	defer ln.Close()
   104  
   105  	errc := make(chan error, 1)
   106  	go func(ln Listener) {
   107  		// Wait for a connection.
   108  		conn, err := ln.Accept()
   109  		if err != nil {
   110  			errc <- err
   111  			close(errc)
   112  			return
   113  		}
   114  
   115  		go func() {
   116  			defer close(errc)
   117  			defer conn.Close()
   118  
   119  			f, err := os.Open(filePath)
   120  			if err != nil {
   121  				errc <- err
   122  				return
   123  			}
   124  			defer f.Close()
   125  
   126  			// Return file data using io.Copy, which should use
   127  			// sendFile if available.
   128  			var sbytes int64
   129  			expectSendfile(t, conn, func() {
   130  				if limit > 0 {
   131  					sbytes, err = io.CopyN(conn, f, limit)
   132  					if err == io.EOF && limit > size {
   133  						err = nil
   134  					}
   135  				} else {
   136  					sbytes, err = io.Copy(conn, f)
   137  				}
   138  			})
   139  			if err != nil {
   140  				errc <- err
   141  				return
   142  			}
   143  
   144  			if sbytes != size {
   145  				errc <- fmt.Errorf("sent %d bytes; expected %d", sbytes, size)
   146  				return
   147  			}
   148  		}()
   149  	}(ln)
   150  
   151  	// Connect to listener to retrieve file and verify digest matches
   152  	// expected.
   153  	c, err := Dial("tcp", ln.Addr().String())
   154  	if err != nil {
   155  		t.Fatal(err)
   156  	}
   157  	defer c.Close()
   158  
   159  	h := sha256.New()
   160  	rbytes, err := io.Copy(h, c)
   161  	if err != nil {
   162  		t.Error(err)
   163  	}
   164  
   165  	if rbytes != size {
   166  		t.Errorf("received %d bytes; expected %d", rbytes, size)
   167  	}
   168  
   169  	if len(fileHash) > 0 && hex.EncodeToString(h.Sum(nil)) != newtonSHA256 {
   170  		t.Error("retrieved data hash did not match")
   171  	}
   172  
   173  	for err := range errc {
   174  		t.Error(err)
   175  	}
   176  }
   177  
   178  func TestSendfileParts(t *testing.T) {
   179  	ln := newLocalListener(t, "tcp")
   180  	defer ln.Close()
   181  
   182  	errc := make(chan error, 1)
   183  	go func(ln Listener) {
   184  		// Wait for a connection.
   185  		conn, err := ln.Accept()
   186  		if err != nil {
   187  			errc <- err
   188  			close(errc)
   189  			return
   190  		}
   191  
   192  		go func() {
   193  			defer close(errc)
   194  			defer conn.Close()
   195  
   196  			f, err := os.Open(newton)
   197  			if err != nil {
   198  				errc <- err
   199  				return
   200  			}
   201  			defer f.Close()
   202  
   203  			for i := 0; i < 3; i++ {
   204  				// Return file data using io.CopyN, which should use
   205  				// sendFile if available.
   206  				expectSendfile(t, conn, func() {
   207  					_, err = io.CopyN(conn, f, 3)
   208  				})
   209  				if err != nil {
   210  					errc <- err
   211  					return
   212  				}
   213  			}
   214  		}()
   215  	}(ln)
   216  
   217  	c, err := Dial("tcp", ln.Addr().String())
   218  	if err != nil {
   219  		t.Fatal(err)
   220  	}
   221  	defer c.Close()
   222  
   223  	buf := new(bytes.Buffer)
   224  	buf.ReadFrom(c)
   225  
   226  	if want, have := "Produced ", buf.String(); have != want {
   227  		t.Errorf("unexpected server reply %q, want %q", have, want)
   228  	}
   229  
   230  	for err := range errc {
   231  		t.Error(err)
   232  	}
   233  }
   234  
   235  func TestSendfileSeeked(t *testing.T) {
   236  	ln := newLocalListener(t, "tcp")
   237  	defer ln.Close()
   238  
   239  	const seekTo = 65 << 10
   240  	const sendSize = 10 << 10
   241  
   242  	errc := make(chan error, 1)
   243  	go func(ln Listener) {
   244  		// Wait for a connection.
   245  		conn, err := ln.Accept()
   246  		if err != nil {
   247  			errc <- err
   248  			close(errc)
   249  			return
   250  		}
   251  
   252  		go func() {
   253  			defer close(errc)
   254  			defer conn.Close()
   255  
   256  			f, err := os.Open(newton)
   257  			if err != nil {
   258  				errc <- err
   259  				return
   260  			}
   261  			defer f.Close()
   262  			if _, err := f.Seek(seekTo, io.SeekStart); err != nil {
   263  				errc <- err
   264  				return
   265  			}
   266  
   267  			expectSendfile(t, conn, func() {
   268  				_, err = io.CopyN(conn, f, sendSize)
   269  			})
   270  			if err != nil {
   271  				errc <- err
   272  				return
   273  			}
   274  		}()
   275  	}(ln)
   276  
   277  	c, err := Dial("tcp", ln.Addr().String())
   278  	if err != nil {
   279  		t.Fatal(err)
   280  	}
   281  	defer c.Close()
   282  
   283  	buf := new(bytes.Buffer)
   284  	buf.ReadFrom(c)
   285  
   286  	if buf.Len() != sendSize {
   287  		t.Errorf("Got %d bytes; want %d", buf.Len(), sendSize)
   288  	}
   289  
   290  	for err := range errc {
   291  		t.Error(err)
   292  	}
   293  }
   294  
   295  // Test that sendfile doesn't put a pipe into blocking mode.
   296  func TestSendfilePipe(t *testing.T) {
   297  	switch runtime.GOOS {
   298  	case "plan9", "windows", "js", "wasip1":
   299  		// These systems don't support deadlines on pipes.
   300  		t.Skipf("skipping on %s", runtime.GOOS)
   301  	}
   302  
   303  	t.Parallel()
   304  
   305  	ln := newLocalListener(t, "tcp")
   306  	defer ln.Close()
   307  
   308  	r, w, err := os.Pipe()
   309  	if err != nil {
   310  		t.Fatal(err)
   311  	}
   312  	defer w.Close()
   313  	defer r.Close()
   314  
   315  	copied := make(chan bool)
   316  
   317  	var wg sync.WaitGroup
   318  	wg.Add(1)
   319  	go func() {
   320  		// Accept a connection and copy 1 byte from the read end of
   321  		// the pipe to the connection. This will call into sendfile.
   322  		defer wg.Done()
   323  		conn, err := ln.Accept()
   324  		if err != nil {
   325  			t.Error(err)
   326  			return
   327  		}
   328  		defer conn.Close()
   329  		// The comment above states that this should call into sendfile,
   330  		// but empirically it doesn't seem to do so at this time.
   331  		// If it does, or does on some platforms, this CopyN should be wrapped
   332  		// in expectSendfile.
   333  		_, err = io.CopyN(conn, r, 1)
   334  		if err != nil {
   335  			t.Error(err)
   336  			return
   337  		}
   338  		// Signal the main goroutine that we've copied the byte.
   339  		close(copied)
   340  	}()
   341  
   342  	wg.Add(1)
   343  	go func() {
   344  		// Write 1 byte to the write end of the pipe.
   345  		defer wg.Done()
   346  		_, err := w.Write([]byte{'a'})
   347  		if err != nil {
   348  			t.Error(err)
   349  		}
   350  	}()
   351  
   352  	wg.Add(1)
   353  	go func() {
   354  		// Connect to the server started two goroutines up and
   355  		// discard any data that it writes.
   356  		defer wg.Done()
   357  		conn, err := Dial("tcp", ln.Addr().String())
   358  		if err != nil {
   359  			t.Error(err)
   360  			return
   361  		}
   362  		defer conn.Close()
   363  		io.Copy(io.Discard, conn)
   364  	}()
   365  
   366  	// Wait for the byte to be copied, meaning that sendfile has
   367  	// been called on the pipe.
   368  	<-copied
   369  
   370  	// Set a very short deadline on the read end of the pipe.
   371  	if err := r.SetDeadline(time.Now().Add(time.Microsecond)); err != nil {
   372  		t.Fatal(err)
   373  	}
   374  
   375  	wg.Add(1)
   376  	go func() {
   377  		// Wait for much longer than the deadline and write a byte
   378  		// to the pipe.
   379  		defer wg.Done()
   380  		time.Sleep(50 * time.Millisecond)
   381  		w.Write([]byte{'b'})
   382  	}()
   383  
   384  	// If this read does not time out, the pipe was incorrectly
   385  	// put into blocking mode.
   386  	_, err = r.Read(make([]byte, 1))
   387  	if err == nil {
   388  		t.Error("Read did not time out")
   389  	} else if !os.IsTimeout(err) {
   390  		t.Errorf("got error %v, expected a time out", err)
   391  	}
   392  
   393  	wg.Wait()
   394  }
   395  
   396  // Issue 43822: tests that returns EOF when conn write timeout.
   397  func TestSendfileOnWriteTimeoutExceeded(t *testing.T) {
   398  	ln := newLocalListener(t, "tcp")
   399  	defer ln.Close()
   400  
   401  	errc := make(chan error, 1)
   402  	go func(ln Listener) (retErr error) {
   403  		defer func() {
   404  			errc <- retErr
   405  			close(errc)
   406  		}()
   407  
   408  		conn, err := ln.Accept()
   409  		if err != nil {
   410  			return err
   411  		}
   412  		defer conn.Close()
   413  
   414  		// Set the write deadline in the past(1h ago). It makes
   415  		// sure that it is always write timeout.
   416  		if err := conn.SetWriteDeadline(time.Now().Add(-1 * time.Hour)); err != nil {
   417  			return err
   418  		}
   419  
   420  		f, err := os.Open(newton)
   421  		if err != nil {
   422  			return err
   423  		}
   424  		defer f.Close()
   425  
   426  		// We expect this to use sendfile, but as of the time this comment was written
   427  		// poll.SendFile on an FD past its timeout can return an error indicating that
   428  		// it didn't handle the operation, resulting in a non-sendfile retry.
   429  		// So don't use expectSendfile here.
   430  		_, err = io.Copy(conn, f)
   431  		if errors.Is(err, os.ErrDeadlineExceeded) {
   432  			return nil
   433  		}
   434  
   435  		if err == nil {
   436  			err = fmt.Errorf("expected ErrDeadlineExceeded, but got nil")
   437  		}
   438  		return err
   439  	}(ln)
   440  
   441  	conn, err := Dial("tcp", ln.Addr().String())
   442  	if err != nil {
   443  		t.Fatal(err)
   444  	}
   445  	defer conn.Close()
   446  
   447  	n, err := io.Copy(io.Discard, conn)
   448  	if err != nil {
   449  		t.Fatalf("expected nil error, but got %v", err)
   450  	}
   451  	if n != 0 {
   452  		t.Fatalf("expected receive zero, but got %d byte(s)", n)
   453  	}
   454  
   455  	if err := <-errc; err != nil {
   456  		t.Fatal(err)
   457  	}
   458  }
   459  
   460  func BenchmarkSendfileZeroBytes(b *testing.B) {
   461  	var (
   462  		wg          sync.WaitGroup
   463  		ctx, cancel = context.WithCancel(context.Background())
   464  	)
   465  
   466  	defer wg.Wait()
   467  
   468  	ln := newLocalListener(b, "tcp")
   469  	defer ln.Close()
   470  
   471  	tempFile, err := os.CreateTemp(b.TempDir(), "test.txt")
   472  	if err != nil {
   473  		b.Fatalf("failed to create temp file: %v", err)
   474  	}
   475  	defer tempFile.Close()
   476  
   477  	fileName := tempFile.Name()
   478  
   479  	dataSize := b.N
   480  	wg.Add(1)
   481  	go func(f *os.File) {
   482  		defer wg.Done()
   483  
   484  		for i := 0; i < dataSize; i++ {
   485  			if _, err := f.Write([]byte{1}); err != nil {
   486  				b.Errorf("failed to write: %v", err)
   487  				return
   488  			}
   489  			if i%1000 == 0 {
   490  				f.Sync()
   491  			}
   492  		}
   493  	}(tempFile)
   494  
   495  	b.ResetTimer()
   496  	b.ReportAllocs()
   497  
   498  	wg.Add(1)
   499  	go func(ln Listener, fileName string) {
   500  		defer wg.Done()
   501  
   502  		conn, err := ln.Accept()
   503  		if err != nil {
   504  			b.Errorf("failed to accept: %v", err)
   505  			return
   506  		}
   507  		defer conn.Close()
   508  
   509  		f, err := os.OpenFile(fileName, os.O_RDONLY, 0660)
   510  		if err != nil {
   511  			b.Errorf("failed to open file: %v", err)
   512  			return
   513  		}
   514  		defer f.Close()
   515  
   516  		for {
   517  			if ctx.Err() != nil {
   518  				return
   519  			}
   520  
   521  			if _, err := io.Copy(conn, f); err != nil {
   522  				b.Errorf("failed to copy: %v", err)
   523  				return
   524  			}
   525  		}
   526  	}(ln, fileName)
   527  
   528  	conn, err := Dial("tcp", ln.Addr().String())
   529  	if err != nil {
   530  		b.Fatalf("failed to dial: %v", err)
   531  	}
   532  	defer conn.Close()
   533  
   534  	n, err := io.CopyN(io.Discard, conn, int64(dataSize))
   535  	if err != nil {
   536  		b.Fatalf("failed to copy: %v", err)
   537  	}
   538  	if n != int64(dataSize) {
   539  		b.Fatalf("expected %d copied bytes, but got %d", dataSize, n)
   540  	}
   541  
   542  	cancel()
   543  }
   544  
   545  func BenchmarkSendFile(b *testing.B) {
   546  	if runtime.GOOS == "windows" {
   547  		// TODO(panjf2000): Windows has not yet implemented FileConn,
   548  		//		remove this when it's implemented in https://go.dev/issues/9503.
   549  		b.Skipf("skipping on %s", runtime.GOOS)
   550  	}
   551  
   552  	b.Run("file-to-tcp", func(b *testing.B) { benchmarkSendFile(b, "tcp") })
   553  	b.Run("file-to-unix", func(b *testing.B) { benchmarkSendFile(b, "unix") })
   554  }
   555  
   556  func benchmarkSendFile(b *testing.B, proto string) {
   557  	for i := 0; i <= 10; i++ {
   558  		size := 1 << (i + 10)
   559  		bench := sendFileBench{
   560  			proto:     proto,
   561  			chunkSize: size,
   562  		}
   563  		b.Run(strconv.Itoa(size), bench.benchSendFile)
   564  	}
   565  }
   566  
   567  type sendFileBench struct {
   568  	proto     string
   569  	chunkSize int
   570  }
   571  
   572  func (bench sendFileBench) benchSendFile(b *testing.B) {
   573  	fileSize := b.N * bench.chunkSize
   574  	f := createTempFile(b, int64(fileSize))
   575  
   576  	client, server := spawnTestSocketPair(b, bench.proto)
   577  	defer server.Close()
   578  
   579  	cleanUp, err := startTestSocketPeer(b, client, "r", bench.chunkSize, fileSize)
   580  	if err != nil {
   581  		client.Close()
   582  		b.Fatal(err)
   583  	}
   584  	defer cleanUp(b)
   585  
   586  	b.ReportAllocs()
   587  	b.SetBytes(int64(bench.chunkSize))
   588  	b.ResetTimer()
   589  
   590  	// Data go from file to socket via sendfile(2).
   591  	sent, err := io.Copy(server, f)
   592  	if err != nil {
   593  		b.Fatalf("failed to copy data with sendfile, error: %v", err)
   594  	}
   595  	if sent != int64(fileSize) {
   596  		b.Fatalf("bytes sent mismatch, got: %d, want: %d", sent, fileSize)
   597  	}
   598  }
   599  
   600  func createTempFile(tb testing.TB, size int64) *os.File {
   601  	f, err := os.CreateTemp(tb.TempDir(), "sendfile-bench")
   602  	if err != nil {
   603  		tb.Fatalf("failed to create temporary file: %v", err)
   604  	}
   605  	tb.Cleanup(func() {
   606  		f.Close()
   607  	})
   608  
   609  	if _, err := io.CopyN(f, newRandReader(tb), size); err != nil {
   610  		tb.Fatalf("failed to fill the file with random data: %v", err)
   611  	}
   612  	if _, err := f.Seek(0, io.SeekStart); err != nil {
   613  		tb.Fatalf("failed to rewind the file: %v", err)
   614  	}
   615  
   616  	return f
   617  }
   618  
   619  func newRandReader(tb testing.TB) io.Reader {
   620  	seed := time.Now().UnixNano()
   621  	tb.Logf("Deterministic RNG seed based on timestamp: 0x%x", seed)
   622  	return rand.New(rand.NewSource(seed))
   623  }
   624  

View as plain text