Source file src/internal/poll/splice_linux.go
1 // Copyright 2018 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package poll 6 7 import ( 8 "internal/syscall/unix" 9 "runtime" 10 "sync" 11 "syscall" 12 ) 13 14 const ( 15 // spliceNonblock doesn't make the splice itself necessarily nonblocking 16 // (because the actual file descriptors that are spliced from/to may block 17 // unless they have the O_NONBLOCK flag set), but it makes the splice pipe 18 // operations nonblocking. 19 spliceNonblock = 0x2 20 21 // maxSpliceSize is the maximum amount of data Splice asks 22 // the kernel to move in a single call to splice(2). 23 // We use 1MB as Splice writes data through a pipe, and 1MB is the default maximum pipe buffer size, 24 // which is determined by /proc/sys/fs/pipe-max-size. 25 maxSpliceSize = 1 << 20 26 ) 27 28 // Splice transfers at most remain bytes of data from src to dst, using the 29 // splice system call to minimize copies of data from and to userspace. 30 // 31 // Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer. 32 // src and dst must both be stream-oriented sockets. 33 func Splice(dst, src *FD, remain int64) (written int64, handled bool, err error) { 34 p, err := getPipe() 35 if err != nil { 36 return 0, false, err 37 } 38 defer putPipe(p) 39 var inPipe, n int 40 for err == nil && remain > 0 { 41 max := maxSpliceSize 42 if int64(max) > remain { 43 max = int(remain) 44 } 45 inPipe, err = spliceDrain(p.wfd, src, max) 46 // The operation is considered handled if splice returns no 47 // error, or an error other than EINVAL. An EINVAL means the 48 // kernel does not support splice for the socket type of src. 49 // The failed syscall does not consume any data so it is safe 50 // to fall back to a generic copy. 51 // 52 // spliceDrain should never return EAGAIN, so if err != nil, 53 // Splice cannot continue. 54 // 55 // If inPipe == 0 && err == nil, src is at EOF, and the 56 // transfer is complete. 57 handled = handled || (err != syscall.EINVAL) 58 if err != nil || inPipe == 0 { 59 break 60 } 61 p.data += inPipe 62 63 n, err = splicePump(dst, p.rfd, inPipe) 64 if n > 0 { 65 written += int64(n) 66 remain -= int64(n) 67 p.data -= n 68 } 69 } 70 if err != nil { 71 return written, handled, err 72 } 73 return written, true, nil 74 } 75 76 // spliceDrain moves data from a socket to a pipe. 77 // 78 // Invariant: when entering spliceDrain, the pipe is empty. It is either in its 79 // initial state, or splicePump has emptied it previously. 80 // 81 // Given this, spliceDrain can reasonably assume that the pipe is ready for 82 // writing, so if splice returns EAGAIN, it must be because the socket is not 83 // ready for reading. 84 // 85 // If spliceDrain returns (0, nil), src is at EOF. 86 func spliceDrain(pipefd int, sock *FD, max int) (int, error) { 87 if err := sock.readLock(); err != nil { 88 return 0, err 89 } 90 defer sock.readUnlock() 91 if err := sock.pd.prepareRead(sock.isFile); err != nil { 92 return 0, err 93 } 94 for { 95 // In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here, 96 // because it could return EAGAIN ceaselessly when the write end of the pipe is full, 97 // but this shouldn't be a concern here, since the pipe buffer must be sufficient for 98 // this data transmission on the basis of the workflow in Splice. 99 n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock) 100 if err == syscall.EINTR { 101 continue 102 } 103 if err != syscall.EAGAIN { 104 return n, err 105 } 106 if sock.pd.pollable() { 107 if err := sock.pd.waitRead(sock.isFile); err != nil { 108 return n, err 109 } 110 } 111 } 112 } 113 114 // splicePump moves all the buffered data from a pipe to a socket. 115 // 116 // Invariant: when entering splicePump, there are exactly inPipe 117 // bytes of data in the pipe, from a previous call to spliceDrain. 118 // 119 // By analogy to the condition from spliceDrain, splicePump 120 // only needs to poll the socket for readiness, if splice returns 121 // EAGAIN. 122 // 123 // If splicePump cannot move all the data in a single call to 124 // splice(2), it loops over the buffered data until it has written 125 // all of it to the socket. This behavior is similar to the Write 126 // step of an io.Copy in userspace. 127 func splicePump(sock *FD, pipefd int, inPipe int) (int, error) { 128 if err := sock.writeLock(); err != nil { 129 return 0, err 130 } 131 defer sock.writeUnlock() 132 if err := sock.pd.prepareWrite(sock.isFile); err != nil { 133 return 0, err 134 } 135 written := 0 136 for inPipe > 0 { 137 // In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here, 138 // because it could return EAGAIN ceaselessly when the read end of the pipe is empty, 139 // but this shouldn't be a concern here, since the pipe buffer must contain inPipe size of 140 // data on the basis of the workflow in Splice. 141 n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock) 142 if err == syscall.EINTR { 143 continue 144 } 145 // Here, the condition n == 0 && err == nil should never be 146 // observed, since Splice controls the write side of the pipe. 147 if n > 0 { 148 inPipe -= n 149 written += n 150 continue 151 } 152 if err != syscall.EAGAIN { 153 return written, err 154 } 155 if sock.pd.pollable() { 156 if err := sock.pd.waitWrite(sock.isFile); err != nil { 157 return written, err 158 } 159 } 160 } 161 return written, nil 162 } 163 164 // splice wraps the splice system call. Since the current implementation 165 // only uses splice on sockets and pipes, the offset arguments are unused. 166 // splice returns int instead of int64, because callers never ask it to 167 // move more data in a single call than can fit in an int32. 168 func splice(out int, in int, max int, flags int) (int, error) { 169 n, err := syscall.Splice(in, nil, out, nil, max, flags) 170 return int(n), err 171 } 172 173 type splicePipeFields struct { 174 rfd int 175 wfd int 176 data int 177 } 178 179 type splicePipe struct { 180 splicePipeFields 181 cleanup runtime.Cleanup 182 } 183 184 // splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers. 185 // The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up 186 // a finalizer for each pipe to close its file descriptors before the actual GC. 187 var splicePipePool = sync.Pool{New: newPoolPipe} 188 189 func newPoolPipe() any { 190 // Discard the error which occurred during the creation of pipe buffer, 191 // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback. 192 p := newPipe() 193 if p == nil { 194 return nil 195 } 196 197 p.cleanup = runtime.AddCleanup(p, func(spf splicePipeFields) { 198 destroyPipe(&splicePipe{splicePipeFields: spf}) 199 }, p.splicePipeFields) 200 return p 201 } 202 203 // getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache. 204 func getPipe() (*splicePipe, error) { 205 v := splicePipePool.Get() 206 if v == nil { 207 return nil, syscall.EINVAL 208 } 209 return v.(*splicePipe), nil 210 } 211 212 func putPipe(p *splicePipe) { 213 // If there is still data left in the pipe, 214 // then close and discard it instead of putting it back into the pool. 215 if p.data != 0 { 216 p.cleanup.Stop() 217 destroyPipe(p) 218 return 219 } 220 splicePipePool.Put(p) 221 } 222 223 // newPipe sets up a pipe for a splice operation. 224 func newPipe() *splicePipe { 225 var fds [2]int 226 if err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil { 227 return nil 228 } 229 230 // Splice will loop writing maxSpliceSize bytes from the source to the pipe, 231 // and then write those bytes from the pipe to the destination. 232 // Set the pipe buffer size to maxSpliceSize to optimize that. 233 // Ignore errors here, as a smaller buffer size will work, 234 // although it will require more system calls. 235 unix.Fcntl(fds[0], syscall.F_SETPIPE_SZ, maxSpliceSize) 236 237 return &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}} 238 } 239 240 // destroyPipe destroys a pipe. 241 func destroyPipe(p *splicePipe) { 242 CloseFunc(p.rfd) 243 CloseFunc(p.wfd) 244 } 245