sys/kern/sys_pipe.c

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 * Copyright (c) 2003-2017 The DragonFly Project. All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@backplane.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/socket.h>
#include <sys/kern_syscall.h>
#include <sys/lock.h>
#include <sys/mutex.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>
#include <sys/signal2.h>
#include <sys/mutex2.h>

#include <machine/cpufunc.h>

struct pipegdlock {
	struct mtx	mtx;
} __cachealign;
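
/*
 * One mutex per cpu, each padded to its own cache line by __cachealign.
 * pipeclose() uses these to limit how many threads bang on the kernel_map
 * and pmap at once during bulk pipe teardown (see the comment above the
 * teardown code in pipeclose()).
 */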

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
		struct ucred *cred, struct sysmsg *msg);

static struct fileops pipeops = {
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_kqfilter = pipe_kqfilter,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_shutdown = pipe_shutdown
};

static void filt_pipedetach(struct knote *kn);
static int filt_piperead(struct knote *kn, long hint);
static int filt_pipewrite(struct knote *kn, long hint);

static struct filterops pipe_rfiltops =
	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
static struct filterops pipe_wfiltops =
	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

#define PIPEQ_MAX_CACHE 16	/* per-cpu pipe structure cache */

static int pipe_maxcache = PIPEQ_MAX_CACHE;
static struct pipegdlock *pipe_gdlocks;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
	CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");

/*
 * The pipe buffer size can be changed at any time.  Only new pipe()s
 * are affected.  Note that due to cpu cache effects, you do not want
 * to make this value too large.
 */
static int pipe_size = 32768;
SYSCTL_INT(_kern_pipe, OID_AUTO, size,
	CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");

/*
 * Reader/writer delay loop.  When the reader exhausts the pipe buffer
 * or the write completely fills the pipe buffer and would otherwise sleep,
 * it first busy-loops for a few microseconds waiting for data or buffer
 * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
 * and also helps when the user program uses a large data buffer in its
 * UIOs.
 *
 * This defaults to 4uS.
 */
#ifdef _RDTSC_SUPPORTED_
static int pipe_delay = 4000;	/* 4uS default */
SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
	CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
#endif

/*
 * Auto-size pipe cache to reduce kmem allocations and frees.
 */
static
void
pipeinit(void *dummy)
{
	size_t mbytes = kmem_lim_size();
	int n;
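
	/*
	 * Scale the per-cpu cache limit up on machines with more memory.
	 * kmem_lim_size() appears to be expressed in megabytes here (the
	 * local is named mbytes), which would put the thresholds below at
	 * roughly 7GB and 15GB; treat the exact units as an assumption.
	 */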
	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
		if (mbytes >= 7 * 1024)
			pipe_maxcache *= 2;
		if (mbytes >= 15 * 1024)
			pipe_maxcache *= 2;
	}
	pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
			       M_PIPE, M_WAITOK | M_ZERO);
	for (n = 0; n < ncpus; ++n)
		mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
}
SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);

static void pipeclose (struct pipe *pipe,
		struct pipebuf *pbr, struct pipebuf *pbw);
static void pipe_free_kmem (struct pipebuf *buf);
static int pipe_create (struct pipe **pipep);

/*
 * Test and clear the specified flag, wakeup(pb) if it was set.
 * This function must also act as a memory barrier.
 */
static __inline void
pipesignal(struct pipebuf *pb, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = pb->state;
		cpu_ccfence();
		nflags = oflags & ~flags;
		if (atomic_cmpset_int(&pb->state, oflags, nflags))
			break;
	}
	if (oflags & flags)
		wakeup(pb);
}

static __inline void
pipewakeup(struct pipebuf *pb, int dosigio)
{
	if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
		lwkt_gettoken(&sigio_token);
		pgsigio(pb->sigio, SIGIO, 0);
		lwkt_reltoken(&sigio_token);
	}
	KNOTE(&pb->kq.ki_note, 0);
}

/*
 * These routines are called before and after a UIO.  The UIO
 * may block, causing our held tokens to be lost temporarily.
 *
 * We use these routines to serialize reads against other reads
 * and writes against other writes.
 *
 * The appropriate token is held on entry so *ipp does not race.
 */
static __inline int
pipe_start_uio(int *ipp)
{
	int error;

	while (*ipp) {
		*ipp = -1;
		error = tsleep(ipp, PCATCH, "pipexx", 0);
		if (error)
			return (error);
	}
	*ipp = 1;
	return (0);
}
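
/*
 * The *ipp word encodes the serialization state: 0 means no uio is in
 * progress, 1 means a uio is in progress with no waiters, and -1 means a
 * uio is in progress with at least one other thread sleeping on ipp, so
 * pipe_end_uio() only has to wakeup() in that last case.
 */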

static __inline void
pipe_end_uio(int *ipp)
{
	if (*ipp < 0) {
		*ipp = 0;
		wakeup(ipp);
	} else {
		KKASSERT(*ipp > 0);
		*ipp = 0;
	}
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 *
 * MPSAFE
 */
int
sys_pipe(struct pipe_args *uap)
{
	return kern_pipe(uap->sysmsg_fds, 0);
}

int
sys_pipe2(struct pipe2_args *uap)
{
	return kern_pipe(uap->sysmsg_fds, uap->flags);
}

int
kern_pipe(long *fds, int flags)
{
	struct thread *td = curthread;
	struct filedesc *fdp = td->td_proc->p_fd;
	struct file *rf, *wf;
	struct pipe *pipe;
	int fd1, fd2, error;

	pipe = NULL;
	if (pipe_create(&pipe)) {
		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (ENFILE);
	}

	error = falloc(td->td_lwp, &rf, &fd1);
	if (error) {
		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (error);
	}
	fds[0] = fd1;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
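
	/*
	 * The low bit stored in f_data selects which pipebuf a descriptor
	 * reads from: fds[0] (bit clear) reads bufferA and writes bufferB,
	 * fds[1] (bit set) reads bufferB and writes bufferA.  pipe_read()
	 * and pipe_write() mask the bit off to recover the pipe pointer.
	 */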
	rf->f_type = DTYPE_PIPE;
	rf->f_flag = FREAD | FWRITE;
	rf->f_ops = &pipeops;
	rf->f_data = (void *)((intptr_t)pipe | 0);
	if (flags & O_NONBLOCK)
		rf->f_flag |= O_NONBLOCK;
	if (flags & O_CLOEXEC)
		fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;

	error = falloc(td->td_lwp, &wf, &fd2);
	if (error) {
		fsetfd(fdp, NULL, fd1);
		fdrop(rf);
		/* pipeA has been closed by fdrop() */
		/* close pipeB here */
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (error);
	}
	wf->f_type = DTYPE_PIPE;
	wf->f_flag = FREAD | FWRITE;
	wf->f_ops = &pipeops;
	wf->f_data = (void *)((intptr_t)pipe | 1);
	if (flags & O_NONBLOCK)
		wf->f_flag |= O_NONBLOCK;
	if (flags & O_CLOEXEC)
		fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;

	fds[1] = fd2;

	/*
	 * Once activated the peer relationship remains valid until
	 * both sides are closed.
	 */
	fsetfd(fdp, rf, fd1);
	fsetfd(fdp, wf, fd2);
	fdrop(rf);
	fdrop(wf);

	return (0);
}

/*
 * [re]allocates KVA for the pipe's circular buffer.  The space is
 * pageable.  Called twice to setup full-duplex communications.
 *
 * NOTE: Independent vm_object's are used to improve performance.
 *
 * Returns 0 on success, ENOMEM on failure.
 */
static int
pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
{
	struct vm_object *object;
	caddr_t buffer;
	vm_pindex_t npages;
	int error;

	size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
	if (size < 16384)
		size = 16384;
	if (size > 1024*1024)
		size = 1024*1024;
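
	/*
	 * Note: pipe_read() and pipe_write() index the buffer by masking
	 * with (size - 1), so the effective size is expected to stay a
	 * power of two (the default and both clamp limits are).
	 */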

	npages = round_page(size) / PAGE_SIZE;
	object = pb->object;

	/*
	 * [re]create the object if necessary and reserve space for it
	 * in the kernel_map.  The object and memory are pageable.  On
	 * success, free the old resources before assigning the new
	 * ones.
	 */
	if (object == NULL || object->size != npages) {
		object = vm_object_allocate(OBJT_DEFAULT, npages);
		buffer = (caddr_t)vm_map_min(&kernel_map);

		error = vm_map_find(&kernel_map, object, NULL,
				    0, (vm_offset_t *)&buffer, size,
				    PAGE_SIZE, TRUE,
				    VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
				    VM_PROT_ALL, VM_PROT_ALL, 0);

		if (error != KERN_SUCCESS) {
			vm_object_deallocate(object);
			return (ENOMEM);
		}
		pipe_free_kmem(pb);
		pb->object = object;
		pb->buffer = buffer;
		pb->size = size;
	}
	pb->rindex = 0;
	pb->windex = 0;

	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.
 *
 * Returns 0 on success, else an error code (typically ENOMEM).  Caller
 * must still deallocate the pipe on failure.
 */
static int
pipe_create(struct pipe **pipep)
{
	globaldata_t gd = mycpu;
	struct pipe *pipe;
	int error;
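
	/*
	 * Try the per-cpu cache first.  A cached pipe keeps its tokens and
	 * its previously allocated buffers, which is why lwkt_token_init()
	 * is only done on the kmalloc path and why pipespace() below can
	 * usually avoid touching the kernel_map.
	 */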
	if ((pipe = gd->gd_pipeq) != NULL) {
		gd->gd_pipeq = pipe->next;
		--gd->gd_pipeqcount;
		pipe->next = NULL;
	} else {
		pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
		lwkt_token_init(&pipe->bufferA.rlock, "piper");
		lwkt_token_init(&pipe->bufferA.wlock, "pipew");
		lwkt_token_init(&pipe->bufferB.rlock, "piper");
		lwkt_token_init(&pipe->bufferB.wlock, "pipew");
	}
	*pipep = pipe;
	if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
		return (error);
	}
	if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
		return (error);
	}
	vfs_timestamp(&pipe->ctime);
	pipe->bufferA.atime = pipe->ctime;
	pipe->bufferA.mtime = pipe->ctime;
	pipe->bufferB.atime = pipe->ctime;
	pipe->bufferB.mtime = pipe->ctime;
	pipe->open_count = 2;

	return (0);
}

/*
 * Read data from a pipe
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	size_t nread = 0;
	size_t size;	/* total bytes available */
	size_t nsize;	/* total bytes to read */
	size_t rindex;	/* contiguous bytes available */
	int notify_writer;
	int bigread;
	int bigcount;
	int error;
	int nbio;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}
	atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);

	if (uio->uio_resid == 0)
		return(0);

	/*
	 * Calculate nbio
	 */
	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * 'quick' NBIO test before things get expensive.
	 */
	if (nbio && rpb->rindex == rpb->windex &&
	    (rpb->state & PIPE_REOF) == 0) {
		return EAGAIN;
	}

	/*
	 * Reads are serialized.  Note however that buffer.buffer and
	 * buffer.size can change out from under us when the number
	 * of bytes in the buffer is zero due to the write-side doing a
	 * pipespace().
	 */
	lwkt_gettoken(&rpb->rlock);
	error = pipe_start_uio(&rpb->rip);
	if (error) {
		lwkt_reltoken(&rpb->rlock);
		return (error);
	}
	notify_writer = 0;

	bigread = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;
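
	/*
	 * Note: rindex and windex only ever increase.  The number of bytes
	 * in the FIFO is always (windex - rindex) and buffer offsets are
	 * formed by masking an index with (size - 1).
	 */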
	while (uio->uio_resid) {
		/*
		 * Don't hog the cpu.
		 */
		if (bigread && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		/*
		 * lfence required to avoid read-reordering of buffer
		 * contents prior to validation of size.
		 */
		size = rpb->windex - rpb->rindex;
		cpu_lfence();
		if (size) {
			rindex = rpb->rindex & (rpb->size - 1);
			nsize = size;
			if (nsize > rpb->size - rindex)
				nsize = rpb->size - rindex;
			nsize = szmin(nsize, uio->uio_resid);

			/*
			 * Limit how much we move in one go so we have a
			 * chance to kick the writer while data is still
			 * available in the pipe.  This avoids getting into
			 * a ping-pong with the writer.
			 */
			if (nsize > (rpb->size >> 1))
				nsize = rpb->size >> 1;

			error = uiomove(&rpb->buffer[rindex], nsize, uio);
			if (error)
				break;
			rpb->rindex += nsize;
			nread += nsize;

			/*
			 * If the FIFO is still over half full just continue
			 * and do not try to notify the writer yet.  If
			 * less than half full notify any waiting writer.
			 */
			if (size - nsize > (rpb->size >> 1)) {
				notify_writer = 0;
			} else {
				notify_writer = 1;
				pipesignal(rpb, PIPE_WANTW);
			}
			continue;
		}

		/*
		 * If the "write-side" was blocked we wake it up.  This code
		 * is reached when the buffer is completely emptied.
		 */
		pipesignal(rpb, PIPE_WANTW);

		/*
		 * Pick up our copy loop again if the writer sent data to
		 * us while we were messing around.
		 *
		 * On a SMP box poll up to pipe_delay nanoseconds for new
		 * data.  Typically a value of 2000 to 4000 is sufficient
		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
		 * is used for synchronous communications with small packets,
		 * and 8000 or so (8uS) will pipeline large buffer xfers
		 * between cpus over a pipe.
		 *
		 * For synchronous communications a hit means doing a
		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
		 * whereas a miss requiring a tsleep/wakeup sequence
		 * will take 7uS or more.
		 */
		if (rpb->windex != rpb->rindex)
			continue;

#ifdef _RDTSC_SUPPORTED_
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				cpu_lfence();
				if (rpb->windex != rpb->rindex) {
					good = 1;
					break;
				}
				cpu_pause();
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Detect EOF condition, do not set error.
		 */
		if (rpb->state & PIPE_REOF)
			break;

		/*
		 * Break if some data was read, or if this was a non-blocking
		 * read.
		 */
		if (nread > 0)
			break;

		if (nbio) {
			error = EAGAIN;
			break;
		}

		/*
		 * Last chance, interlock with WANTR
		 */
		tsleep_interlock(rpb, PCATCH);
		atomic_set_int(&rpb->state, PIPE_WANTR);
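
		/*
		 * The interlock and the atomic op above publish our intent
		 * to sleep before the FIFO is re-tested, so a wakeup that
		 * races in after this point is remembered and consumed by
		 * the PINTERLOCKED tsleep() below rather than lost.
		 */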

		/*
		 * Retest bytes available after memory barrier above.
		 */
		size = rpb->windex - rpb->rindex;
		if (size)
			continue;

		/*
		 * Retest EOF after memory barrier above.
		 */
		if (rpb->state & PIPE_REOF)
			break;

		/*
		 * Wait for more data or state change
		 */
		error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
		if (error)
			break;
	}
	pipe_end_uio(&rpb->rip);

	/*
	 * Update last access time
	 */
	if (error == 0 && nread)
		vfs_timestamp(&rpb->atime);

	/*
	 * If we drained the FIFO more than half way then handle
	 * write blocking hysteresis.
	 *
	 * Note that PIPE_WANTW cannot be set by the writer without
	 * it holding both rlock and wlock, so we can test it
	 * while holding just rlock.
	 */
	if (notify_writer) {
		/*
		 * Synchronous blocking is done on the pipe involved
		 */
		pipesignal(rpb, PIPE_WANTW);

		/*
		 * But we may also have to deal with a kqueue which is
		 * stored on the same pipe as its descriptor, so an
		 * EVFILT_WRITE event waiting for our side to drain will
		 * be on the other side.
		 */
		pipewakeup(wpb, 0);
	}
	/*size = rpb->windex - rpb->rindex;*/
	lwkt_reltoken(&rpb->rlock);

	return (error);
}

static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	size_t windex;
	size_t space;
	size_t wcount;
	size_t orig_resid;
	int bigwrite;
	int bigcount;
	int error;
	int nbio;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * Calculate nbio
	 */
	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * 'quick' NBIO test before things get expensive.
	 */
	if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
	    uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
		return EAGAIN;
	}

	/*
	 * Writes go to the peer.  The peer will always exist.
	 */
	lwkt_gettoken(&wpb->wlock);
	if (wpb->state & PIPE_WEOF) {
		lwkt_reltoken(&wpb->wlock);
		return (EPIPE);
	}

	/*
	 * Degenerate case (EPIPE takes prec)
	 */
	if (uio->uio_resid == 0) {
		lwkt_reltoken(&wpb->wlock);
		return(0);
	}

	/*
	 * Writes are serialized (start_uio must be called with wlock)
	 */
	error = pipe_start_uio(&wpb->wip);
	if (error) {
		lwkt_reltoken(&wpb->wlock);
		return (error);
	}

	orig_resid = uio->uio_resid;
	wcount = 0;

	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		/*
		 * Don't hog the cpu.
		 */
		if (bigwrite && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		windex = wpb->windex & (wpb->size - 1);
		space = wpb->size - (wpb->windex - wpb->rindex);

		/*
		 * Writes of size <= PIPE_BUF must be atomic.
		 */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Write to fill, read size handles write hysteresis.  Also
		 * additional restrictions can cause select-based non-blocking
		 * writes to spin.
		 */
		if (space > 0) {
			size_t segsize;

			/*
			 * We want to notify a potentially waiting reader
			 * before we exhaust the write buffer for SMP
			 * pipelining.  Otherwise the write/read will begin
			 * to ping-pong.
			 */
			space = szmin(space, uio->uio_resid);
			if (space > (wpb->size >> 1))
				space = (wpb->size >> 1);

			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = wpb->size - windex;
			if (segsize > space)
				segsize = space;

			/*
			 * If this is the first loop and the reader is
			 * blocked, do a preemptive wakeup of the reader.
			 *
			 * On SMP the IPI latency plus the wlock interlock
			 * on the reader side is the fastest way to get the
			 * reader going.  (The scheduler will hard loop on
			 * lock tokens).
			 */
			if (wcount == 0)
				pipesignal(wpb, PIPE_WANTR);

			/*
			 * Transfer segment, which may include a wrap-around.
			 * Update windex to account for both segments in one
			 * go so the reader can read() the data atomically.
			 */
			error = uiomove(&wpb->buffer[windex], segsize, uio);
			if (error == 0 && segsize < space) {
				segsize = space - segsize;
				error = uiomove(&wpb->buffer[0], segsize, uio);
			}
			if (error)
				break;

			/*
			 * Memory fence prior to windex updating (note: not
			 * needed so this is a NOP on Intel).
			 */
			cpu_sfence();
			wpb->windex += space;

			/*
			 * Signal reader
			 */
			if (wcount != 0)
				pipesignal(wpb, PIPE_WANTR);
			wcount += space;
			continue;
		}

		/*
		 * Wakeup any pending reader
		 */
		pipesignal(wpb, PIPE_WANTR);

		/*
		 * don't block on non-blocking I/O
		 */
		if (nbio) {
			error = EAGAIN;
			break;
		}

#ifdef _RDTSC_SUPPORTED_
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				cpu_lfence();
				space = wpb->size - (wpb->windex - wpb->rindex);
				if ((space < uio->uio_resid) &&
				    (orig_resid <= PIPE_BUF)) {
					space = 0;
				}
				if (space) {
					good = 1;
					break;
				}
				cpu_pause();
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Interlocked test.  Atomic op enforces the memory barrier.
		 */
		tsleep_interlock(wpb, PCATCH);
		atomic_set_int(&wpb->state, PIPE_WANTW);

		/*
		 * Retest space available after memory barrier above.
		 * Writes of size <= PIPE_BUF must be atomic.
		 */
		space = wpb->size - (wpb->windex - wpb->rindex);
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Retest EOF after memory barrier above.
		 */
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		/*
		 * We have no more space and have something to offer,
		 * wake up select/poll/kq.
		 */
		if (space == 0) {
			pipewakeup(wpb, 1);
			error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
		}

		/*
		 * Break out if we errored or the read side wants us to go
		 * away.
		 */
		if (error)
			break;
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}
	}
	pipe_end_uio(&wpb->wip);

	/*
	 * If we have put any characters in the buffer, we wake up
	 * the reader.
	 *
	 * Both rlock and wlock are required to be able to modify pipe_state.
	 */
	if (wpb->windex != wpb->rindex) {
		pipesignal(wpb, PIPE_WANTR);
		pipewakeup(wpb, 1);
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpb->rindex == wpb->windex) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		vfs_timestamp(&wpb->mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll/kq.
	 */
	/*space = wpb->windex - wpb->rindex;*/
	lwkt_reltoken(&wpb->wlock);

	return (error);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
	   struct ucred *cred, struct sysmsg *msg)
{
	struct pipebuf *rpb;
	struct pipe *pipe;
	int error;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
	} else {
		rpb = &pipe->bufferA;
	}
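
	/*
	 * These ioctls all operate on the pipebuf this descriptor reads
	 * from: the async flag, the FIONREAD byte count and the SIGIO
	 * ownership live on the read side.
	 */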

	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);

	switch (cmd) {
	case FIOASYNC:
		if (*(int *)data) {
			atomic_set_int(&rpb->state, PIPE_ASYNC);
		} else {
			atomic_clear_int(&rpb->state, PIPE_ASYNC);
		}
		error = 0;
		break;
	case FIONREAD:
		*(int *)data = (int)(rpb->windex - rpb->rindex);
		error = 0;
		break;
	case FIOSETOWN:
		error = fsetown(*(int *)data, &rpb->sigio);
		break;
	case FIOGETOWN:
		*(int *)data = fgetown(&rpb->sigio);
		error = 0;
		break;
	case TIOCSPGRP:
		/* This is deprecated, FIOSETOWN should be used instead. */
		error = fsetown(-(*(int *)data), &rpb->sigio);
		break;

	case TIOCGPGRP:
		/* This is deprecated, FIOGETOWN should be used instead. */
		*(int *)data = -fgetown(&rpb->sigio);
		error = 0;
		break;
	default:
		error = ENOTTY;
		break;
	}
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	return (error);
}

/*
 * MPSAFE
 */
static int
pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
{
	struct pipebuf *rpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
	} else {
		rpb = &pipe->bufferA;
	}

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = rpb->size;
	ub->st_size = rpb->windex - rpb->rindex;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = rpb->atime;
	ub->st_mtimespec = rpb->mtime;
	ub->st_ctimespec = pipe->ctime;

	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

static int
pipe_close(struct file *fp)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(&rpb->sigio);
	pipeclose(pipe, rpb, wpb);

	return (0);
}

/*
 * Shutdown one or both directions of a full-duplex pipe.
 */
static int
pipe_shutdown(struct file *fp, int how)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int error = EPIPE;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * We modify pipe_state on both pipes, which means we need
	 * all four tokens!
	 */
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);

	switch(how) {
	case SHUT_RDWR:
	case SHUT_RD:
		/*
		 * EOF on my reads and peer writes
		 */
		atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
		if (rpb->state & PIPE_WANTR) {
			rpb->state &= ~PIPE_WANTR;
			wakeup(rpb);
		}
		if (rpb->state & PIPE_WANTW) {
			rpb->state &= ~PIPE_WANTW;
			wakeup(rpb);
		}
		error = 0;
		if (how == SHUT_RD)
			break;
		/* fall through */
	case SHUT_WR:
		/*
		 * EOF on peer reads and my writes
		 */
		atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
		if (wpb->state & PIPE_WANTR) {
			wpb->state &= ~PIPE_WANTR;
			wakeup(wpb);
		}
		if (wpb->state & PIPE_WANTW) {
			wpb->state &= ~PIPE_WANTW;
			wakeup(wpb);
		}
		error = 0;
		break;
	}
	pipewakeup(rpb, 1);
	pipewakeup(wpb, 1);

	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	return (error);
}

/*
 * Destroy the pipe buffer.
 */
static void
pipe_free_kmem(struct pipebuf *pb)
{
	if (pb->buffer != NULL) {
		kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
		pb->buffer = NULL;
		pb->object = NULL;
	}
}

/*
 * Close one half of the pipe.  We are closing the pipe for reading on rpb
 * and writing on wpb.  This routine must be called twice with the pipebufs
 * reversed to close both directions.
 */
static void
pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
{
	globaldata_t gd;

	if (pipe == NULL)
		return;

	/*
	 * We need both the read and write tokens to modify pipe_state.
	 */
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);

	/*
	 * Set our state, wakeup anyone waiting in select/poll/kq, and
	 * wakeup anyone blocked on our pipe.  No action if our side
	 * is already closed.
	 */
	if (rpb->state & PIPE_CLOSED) {
		lwkt_reltoken(&rpb->wlock);
		lwkt_reltoken(&rpb->rlock);
		return;
	}

	atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
	pipewakeup(rpb, 1);
	if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
		rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(rpb);
	}
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	/*
	 * Disconnect from peer.
	 */
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);

	atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
	pipewakeup(wpb, 1);
	if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
		wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(wpb);
	}
	if (SLIST_FIRST(&wpb->kq.ki_note))
		KNOTE(&wpb->kq.ki_note, 0);
	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);

	/*
	 * Free resources once both sides are closed.  We maintain a pcpu
	 * cache to improve performance, so the actual tear-down case is
	 * limited to bulk situations.
	 *
	 * However, the bulk tear-down case can cause intense contention
	 * on the kernel_map when, e.g. hundreds to hundreds of thousands
	 * of processes are killed at the same time.  To deal with this we
	 * use a pcpu mutex to maintain concurrency but also limit the
	 * number of threads banging on the map and pmap.
	 *
	 * We use the mtx mechanism instead of the lockmgr mechanism because
	 * the mtx mechanism utilizes a queued design which will not break
	 * down in the face of thousands to hundreds of thousands of
	 * processes trying to free pipes simultaneously.  The lockmgr
	 * mechanism will wind up waking them all up each time a lock
	 * cycles.
	 */
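
	/*
	 * atomic_fetchadd_int() returns the value prior to the decrement,
	 * so a result of 1 means this call dropped the last reference and
	 * is responsible for caching or destroying the pipe.
	 */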
	if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
		gd = mycpu;
		if (gd->gd_pipeqcount >= pipe_maxcache) {
			mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
			pipe_free_kmem(rpb);
			pipe_free_kmem(wpb);
			mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
			kfree(pipe, M_PIPE);
		} else {
			rpb->state = 0;
			wpb->state = 0;
			pipe->next = gd->gd_pipeq;
			gd->gd_pipeq = pipe;
			++gd->gd_pipeqcount;
		}
	}
}

static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		if (wpb->state & PIPE_CLOSED) {
			/* other end of pipe has been closed */
			return (EPIPE);
		}
		break;
	default:
		return (EOPNOTSUPP);
	}
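
	/*
	 * Record which side the knote was attached to in kn_hook using the
	 * same low-bit encoding as f_data, so filt_pipedetach() can locate
	 * the correct pipebuf without consulting f_data.
	 */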
	if (rpb == &pipe->bufferA)
		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
	else
		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);

	knote_insert(&rpb->kq.ki_note, kn);

	return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
	if ((intptr_t)kn->kn_hook & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}
	knote_remove(&rpb->kq.ki_note, kn);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int ready = 0;

	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
	if ((intptr_t)kn->kn_fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);

	kn->kn_data = rpb->windex - rpb->rindex;

	if (rpb->state & PIPE_REOF) {
		/*
		 * Only set NODATA if all data has been exhausted
		 */
		if (kn->kn_data == 0)
			kn->kn_flags |= EV_NODATA;
		kn->kn_flags |= EV_EOF;
		ready = 1;
	}

	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	if (!ready)
		ready = kn->kn_data > 0;

	return (ready);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int ready = 0;

	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
	if ((intptr_t)kn->kn_fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	kn->kn_data = 0;
	if (wpb->state & PIPE_CLOSED) {
		kn->kn_flags |= (EV_EOF | EV_NODATA);
		return (1);
	}

	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);

	if (wpb->state & PIPE_WEOF) {
		kn->kn_flags |= (EV_EOF | EV_NODATA);
		ready = 1;
	}

	if (!ready)
		kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);

	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);
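
	/*
	 * The write filter only reports ready once at least PIPE_BUF bytes
	 * of space are free, matching the rule that writes of size
	 * <= PIPE_BUF must complete atomically.
	 */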
	if (!ready)
		ready = kn->kn_data >= PIPE_BUF;

	return (ready);
}