/*
 * Copyright (c) 1996 John S. Dyson
 * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@backplane.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/fcntl.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/signalvar.h>
#include <sys/sysmsg.h>
#include <sys/vnode.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/socket.h>
#include <sys/kern_syscall.h>
#include <sys/mutex.h>

#include <vm/vm_param.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>
#include <sys/signal2.h>
#include <sys/mutex2.h>

#include <machine/cpufunc.h>

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
		struct ucred *cred, struct sysmsg *msg);

__read_mostly static struct fileops pipeops = {
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_kqfilter = pipe_kqfilter,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_shutdown = pipe_shutdown
};

static void filt_pipedetach(struct knote *kn);
static int filt_piperead(struct knote *kn, long hint);
static int filt_pipewrite(struct knote *kn, long hint);

__read_mostly static struct filterops pipe_rfiltops =
	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
__read_mostly static struct filterops pipe_wfiltops =
	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

#define PIPEQ_MAX_CACHE 16	/* per-cpu pipe structure cache */

__read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
__read_mostly static struct pipegdlock *pipe_gdlocks;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
	CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");

/*
 * The pipe buffer size can be changed at any time.  Only new pipe()s
 * are affected.  Note that due to cpu cache effects, you do not want
 * to make this value too large.
 */
__read_mostly static int pipe_size = 32768;
SYSCTL_INT(_kern_pipe, OID_AUTO, size,
	CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");

/*
 * Reader/writer delay loop.  When the reader exhausts the pipe buffer
 * or the write completely fills the pipe buffer and would otherwise sleep,
 * it first busy-loops for a few microseconds waiting for data or buffer
 * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
 * and also helps when the user program uses a large data buffer in its
 * UIOs.
 *
 * This defaults to 4uS.
 */
#ifdef _RDTSC_SUPPORTED_
__read_mostly static int pipe_delay = 4000;	/* 4uS default */
SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
	CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
#endif

/*
 * Auto-size pipe cache to reduce kmem allocations and frees.
 */
static void
pipeinit(void *dummy)
{
	size_t mbytes = kmem_lim_size();
	int n;

	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
		if (mbytes >= 7 * 1024)
			pipe_maxcache *= 2;
		if (mbytes >= 15 * 1024)
			pipe_maxcache *= 2;
	}

	/*
	 * Detune the pcpu caching a bit on systems with an insane number
	 * of cpu threads to reduce memory waste.
	 */
	if (ncpus > 64) {
		pipe_maxcache = pipe_maxcache * 64 / ncpus;
		if (pipe_maxcache < PIPEQ_MAX_CACHE)
			pipe_maxcache = PIPEQ_MAX_CACHE;
	}

	pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
			M_PIPE, M_WAITOK | M_ZERO);
	for (n = 0; n < ncpus; ++n)
		mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
}
SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);

static void pipeclose (struct pipe *pipe,
		struct pipebuf *pbr, struct pipebuf *pbw);
static void pipe_free_kmem (struct pipebuf *buf);
static int pipe_create (struct pipe **pipep);

/*
 * Test and clear the specified flag, wakeup(pb) if it was set.
 * This function must also act as a memory barrier.
 */
static __inline void
pipesignal(struct pipebuf *pb, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = pb->state;
		cpu_ccfence();
		nflags = oflags & ~flags;
		if (atomic_cmpset_int(&pb->state, oflags, nflags))
			break;
	}
	if (oflags & flags)
		wakeup(pb);
}

/*
 * Wake up select/poll/kq waiters and, if requested, post SIGIO when
 * async mode is in effect.
 */
static __inline void
pipewakeup(struct pipebuf *pb, int dosigio)
{
	if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
		lwkt_gettoken(&sigio_token);
		pgsigio(pb->sigio, SIGIO, 0);
		lwkt_reltoken(&sigio_token);
	}
	KNOTE(&pb->kq.ki_note, 0);
}

/*
 * These routines are called before and after a UIO.  The UIO
 * may block, causing our held tokens to be lost temporarily.
 *
 * We use these routines to serialize reads against other reads
 * and writes against other writes.
 *
 * The appropriate token is held on entry so *ipp does not race.
 */
static __inline int
pipe_start_uio(int *ipp)
{
	int error;

	while (*ipp) {
		*ipp = -1;
		error = tsleep(ipp, PCATCH, "pipexx", 0);
		if (error)
			return (error);
	}
	*ipp = 1;
	return (0);
}

static __inline void
pipe_end_uio(int *ipp)
{
	if (*ipp < 0) {
		*ipp = 0;
		wakeup(ipp);
	} else {
		KKASSERT(*ipp > 0);
		*ipp = 0;
	}
}

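/*
 * Usage sketch (mirrors pipe_read() below): the token is held around the
 * whole sequence so *ipp itself cannot race, while the UIO inside may
 * block and temporarily lose the token:
 *
 *	lwkt_gettoken(&rpb->rlock);
 *	error = pipe_start_uio(&rpb->rip);
 *	if (error == 0) {
 *		... uiomove() loop, may block ...
 *		pipe_end_uio(&rpb->rip);
 *	}
 *	lwkt_reltoken(&rpb->rlock);
 */
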
/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 */
int
sys_pipe(struct sysmsg *sysmsg, const struct pipe_args *uap)
{
	return kern_pipe(sysmsg->sysmsg_fds, 0);
}

int
sys_pipe2(struct sysmsg *sysmsg, const struct pipe2_args *uap)
{
	return kern_pipe(sysmsg->sysmsg_fds, uap->flags);
}

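/*
 * Illustrative userland view of the two syscalls above (a sketch, not
 * part of the kernel build):
 *
 *	int fds[2];
 *
 *	if (pipe2(fds, O_NONBLOCK | O_CLOEXEC) == 0) {
 *		fds[0] is the read end, fds[1] the write end; both
 *		carry O_NONBLOCK and close-on-exec per kern_pipe().
 *	}
 */
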
int
kern_pipe(long *fds, int flags)
{
	struct thread *td = curthread;
	struct filedesc *fdp = td->td_proc->p_fd;
	struct file *rf, *wf;
	struct pipe *pipe;
	int fd1, fd2, error;

	pipe = NULL;
	if (pipe_create(&pipe)) {
		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (ENFILE);
	}

	error = falloc(td->td_lwp, &rf, &fd1);
	if (error) {
		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (error);
	}
	fds[0] = fd1;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	rf->f_type = DTYPE_PIPE;
	rf->f_flag = FREAD | FWRITE;
	rf->f_ops = &pipeops;
	rf->f_data = (void *)((intptr_t)pipe | 0);
	if (flags & O_NONBLOCK)
		rf->f_flag |= O_NONBLOCK;
	if (flags & O_CLOEXEC)
		fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;

	error = falloc(td->td_lwp, &wf, &fd2);
	if (error) {
		fsetfd(fdp, NULL, fd1);
		fdrop(rf);
		/* pipeA has been closed by fdrop() */
		/* close pipeB here */
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (error);
	}
	fds[1] = fd2;

	wf->f_type = DTYPE_PIPE;
	wf->f_flag = FREAD | FWRITE;
	wf->f_ops = &pipeops;
	wf->f_data = (void *)((intptr_t)pipe | 1);
	if (flags & O_NONBLOCK)
		wf->f_flag |= O_NONBLOCK;
	if (flags & O_CLOEXEC)
		fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;

	/*
	 * Once activated the peer relationship remains valid until
	 * both sides are closed.
	 */
	fsetfd(fdp, rf, fd1);
	fsetfd(fdp, wf, fd2);
	fdrop(rf);
	fdrop(wf);

	return (0);
}

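/*
 * Note on the f_data encoding set up above: a struct pipe is at least
 * pointer-aligned, so the low bit of f_data is free to select the
 * descriptor's direction.  Every consumer in this file decodes it the
 * same way:
 *
 *	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
 *	if ((intptr_t)fp->f_data & 1)		(write-side descriptor)
 *		rpb = &pipe->bufferB, wpb = &pipe->bufferA;
 *	else					(read-side descriptor)
 *		rpb = &pipe->bufferA, wpb = &pipe->bufferB;
 */
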
/*
 * [re]allocates KVA for the pipe's circular buffer.  The space is
 * pageable.  Called twice to setup full-duplex communications.
 *
 * NOTE: Independent vm_object's are used to improve performance.
 *
 * Returns 0 on success, ENOMEM on failure.
 */
static int
pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
{
	struct vm_object *object;
	caddr_t buffer;
	vm_pindex_t npages;
	int error;

	size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
	if (size < 16384)
		size = 16384;
	if (size > 1024*1024)
		size = 1024*1024;

	npages = round_page(size) / PAGE_SIZE;
	object = pb->object;

	/*
	 * [re]create the object if necessary and reserve space for it
	 * in the kernel_map.  The object and memory are pageable.  On
	 * success, free the old resources before assigning the new
	 * ones.
	 */
	if (object == NULL || object->size != npages) {
		object = vm_object_allocate(OBJT_DEFAULT, npages);
		buffer = (caddr_t)vm_map_min(kernel_map);

		error = vm_map_find(kernel_map, object, NULL,
				0, (vm_offset_t *)&buffer, size,
				PAGE_SIZE, TRUE,
				VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
				VM_PROT_ALL, VM_PROT_ALL, 0);

		if (error != KERN_SUCCESS) {
			vm_object_deallocate(object);
			return (ENOMEM);
		}
		pipe_free_kmem(pb);
		pb->object = object;
		pb->buffer = buffer;
		pb->size = size;
	}
	pb->rindex = 0;
	pb->windex = 0;

	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.
 *
 * Returns 0 on success, else an error code (typically ENOMEM).  Caller
 * must still deallocate the pipe on failure.
 */
static int
pipe_create(struct pipe **pipep)
{
	globaldata_t gd = mycpu;
	struct pipe *pipe;
	int error;

	if ((pipe = gd->gd_pipeq) != NULL) {
		gd->gd_pipeq = pipe->next;
		--gd->gd_pipeqcount;
		pipe->next = NULL;
	} else {
		pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
		pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
		lwkt_token_init(&pipe->bufferA.rlock, "piper");
		lwkt_token_init(&pipe->bufferA.wlock, "pipew");
		lwkt_token_init(&pipe->bufferB.rlock, "piper");
		lwkt_token_init(&pipe->bufferB.wlock, "pipew");
	}
	*pipep = pipe;
	if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
		return (error);
	}
	if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
		return (error);
	}
	vfs_timestamp(&pipe->ctime);
	pipe->bufferA.atime = pipe->ctime;
	pipe->bufferA.mtime = pipe->ctime;
	pipe->bufferB.atime = pipe->ctime;
	pipe->bufferB.mtime = pipe->ctime;
	pipe->open_count = 2;

	return (0);
}

/*
 * Read data from a pipe
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	size_t size;	/* total bytes available */
	size_t nsize;	/* total bytes to read */
	size_t rindex;	/* contiguous bytes available */
	int nbio;
	int error;
	int nread;
	int bigread;
	int bigcount;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}
	atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);

	if (uio->uio_resid == 0)
		return (0);

	/*
	 * Calculate nbio
	 */
	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * 'quick' NBIO test before things get expensive.
	 */
	if (nbio && rpb->rindex == rpb->windex &&
	    (rpb->state & PIPE_REOF) == 0) {
		return (EAGAIN);
	}

	/*
	 * Reads are serialized.  Note however that buffer.buffer and
	 * buffer.size can change out from under us when the number
	 * of bytes in the buffer are zero due to the write-side doing a
	 * pipespace().
	 */
	lwkt_gettoken(&rpb->rlock);
	error = pipe_start_uio(&rpb->rip);
	if (error) {
		lwkt_reltoken(&rpb->rlock);
		return (error);
	}
	nread = 0;
	bigread = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		/*
		 * Don't hog the cpu on large reads, check for signals
		 * every so often.
		 */
		if (bigread && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		/*
		 * lfence required to avoid read-reordering of buffer
		 * contents prior to validation of size.
		 */
		size = rpb->windex - rpb->rindex;
		cpu_lfence();
		if (size) {
			rindex = rpb->rindex & (rpb->size - 1);
			nsize = size;
			if (nsize > rpb->size - rindex)
				nsize = rpb->size - rindex;
			nsize = szmin(nsize, uio->uio_resid);

			/*
			 * Limit how much we move in one go so we have a
			 * chance to kick the writer while data is still
			 * available in the pipe.  This avoids getting into
			 * a ping-pong with the writer.
			 */
			if (nsize > (rpb->size >> 1))
				nsize = rpb->size >> 1;

			error = uiomove(&rpb->buffer[rindex], nsize, uio);
			if (error)
				break;
			rpb->rindex += nsize;
			nread += nsize;

			/*
			 * If the FIFO is still over half full just continue
			 * and do not try to notify the writer yet.  If
			 * less than half full notify any waiting writer.
			 */
			if (size - nsize > (rpb->size >> 1)) {
				continue;
			}
			pipesignal(rpb, PIPE_WANTW);
			continue;
		}

		/*
		 * If the "write-side" was blocked we wake it up.  This code
		 * is reached when the buffer is completely emptied.
		 */
		pipesignal(rpb, PIPE_WANTW);

		/*
		 * Pick up our copy loop again if the writer sent data to
		 * us while we were messing around.
		 *
		 * On a SMP box poll up to pipe_delay nanoseconds for new
		 * data.  Typically a value of 2000 to 4000 is sufficient
		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
		 * is used for synchronous communications with small packets,
		 * and 8000 or so (8uS) will pipeline large buffer xfers
		 * between cpus over a pipe.
		 *
		 * For synchronous communications a hit means doing a
		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
		 * whereas a miss requiring a tsleep/wakeup sequence
		 * will take 7uS or more.
		 */
		if (rpb->windex != rpb->rindex)
			continue;

#ifdef _RDTSC_SUPPORTED_
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				cpu_lfence();
				if (rpb->windex != rpb->rindex) {
					good = 1;
					break;
				}
				cpu_pause();
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Detect EOF condition, do not set error.
		 */
		if (rpb->state & PIPE_REOF)
			break;

		/*
		 * Break if some data was read, or if this was a non-blocking
		 * read.
		 */
		if (nread > 0)
			break;
		if (nbio) {
			error = EAGAIN;
			break;
		}

		/*
		 * Last chance, interlock with WANTR
		 */
		tsleep_interlock(rpb, PCATCH);
		atomic_set_int(&rpb->state, PIPE_WANTR);

		/*
		 * Retest bytes available after memory barrier above.
		 */
		size = rpb->windex - rpb->rindex;
		if (size)
			continue;

		/*
		 * Retest EOF after memory barrier above.
		 */
		if (rpb->state & PIPE_REOF)
			break;

		/*
		 * Wait for more data or state change
		 */
		error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
		if (error)
			break;
	}
	pipe_end_uio(&rpb->rip);

	/*
	 * Update last access time
	 */
	if (error == 0 && nread && rpb->lticks != ticks) {
		vfs_timestamp(&rpb->atime);
		rpb->lticks = ticks;
	}

	/*
	 * If we drained the FIFO more than half way then handle
	 * write blocking hysteresis.
	 *
	 * Note that PIPE_WANTW cannot be set by the writer without
	 * it holding both rlock and wlock, so we can test it
	 * while holding just rlock.
	 */
	if (rpb->state & PIPE_WANTW) {
		/*
		 * Synchronous blocking is done on the pipe involved
		 */
		pipesignal(rpb, PIPE_WANTW);
	}

	/*
	 * But we may also have to deal with a kqueue which is
	 * stored on the same pipe as its descriptor, so a
	 * EVFILT_WRITE event waiting for our side to drain will
	 * be on the other side.
	 */
	pipewakeup(wpb, 0);

	/*size = rpb->windex - rpb->rindex;*/
	lwkt_reltoken(&rpb->rlock);

	return (error);
}

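/*
 * Worked example of the index arithmetic used in pipe_read() (values are
 * illustrative): rindex and windex are free-running counters, so
 * (windex - rindex) is the number of bytes in the FIFO even after the
 * counters wrap, and (index & (size - 1)) converts a counter into a
 * buffer offset because the buffer size is a power of 2.  With
 * size = 32768, rindex = 65530 and windex = 65542, 12 bytes are pending
 * starting at offset 65530 & 32767 = 32762, of which 6 are contiguous at
 * the end of the buffer and 6 wrap around to offset 0.
 */
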
/*
 * Write data to a pipe
 */
static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	size_t windex;
	size_t space;
	size_t wcount;
	size_t segsize;
	size_t orig_resid;
	int bigwrite;
	int bigcount;
	int error;
	int nbio;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * Calculate nbio
	 */
	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * 'quick' NBIO test before things get expensive.
	 */
	if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
	    uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
		return (EAGAIN);
	}

	/*
	 * Writes go to the peer.  The peer will always exist.
	 */
	lwkt_gettoken(&wpb->wlock);
	if (wpb->state & PIPE_WEOF) {
		lwkt_reltoken(&wpb->wlock);
		return (EPIPE);
	}

	/*
	 * Degenerate case (EPIPE takes prec)
	 */
	if (uio->uio_resid == 0) {
		lwkt_reltoken(&wpb->wlock);
		return (0);
	}

	/*
	 * Writes are serialized (start_uio must be called with wlock)
	 */
	error = pipe_start_uio(&wpb->wip);
	if (error) {
		lwkt_reltoken(&wpb->wlock);
		return (error);
	}

	orig_resid = uio->uio_resid;
	wcount = 0;

	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		/*
		 * Don't hog the cpu on large writes, check for signals
		 * every so often.
		 */
		if (bigwrite && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		windex = wpb->windex & (wpb->size - 1);
		space = wpb->size - (wpb->windex - wpb->rindex);

		/*
		 * Writes of size <= PIPE_BUF must be atomic.
		 */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Write to fill, read size handles write hysteresis.  Also
		 * additional restrictions can cause select-based non-blocking
		 * writes to spin.
		 */
		if (space > 0) {
			/*
			 * We want to notify a potentially waiting reader
			 * before we exhaust the write buffer for SMP
			 * pipelining.  Otherwise the write/read will begin
			 * to ping-pong.
			 */
			space = szmin(space, uio->uio_resid);
			if (space > (wpb->size >> 1))
				space = (wpb->size >> 1);

			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = wpb->size - windex;
			if (segsize > space)
				segsize = space;

			/*
			 * If this is the first loop and the reader is
			 * blocked, do a preemptive wakeup of the reader.
			 *
			 * On SMP the IPI latency plus the wlock interlock
			 * on the reader side is the fastest way to get the
			 * reader going.  (The scheduler will hard loop on
			 * lock tokens).
			 */
			if (wcount == 0)
				pipesignal(wpb, PIPE_WANTR);

			/*
			 * Transfer segment, which may include a wrap-around.
			 * Update windex to account for both all in one go
			 * so the reader can read() the data atomically.
			 */
			error = uiomove(&wpb->buffer[windex], segsize, uio);
			if (error == 0 && segsize < space) {
				segsize = space - segsize;
				error = uiomove(&wpb->buffer[0], segsize, uio);
			}
			if (error)
				break;

			/*
			 * Memory fence prior to windex updating (note: not
			 * needed so this is a NOP on Intel).
			 */
			cpu_sfence();
			wpb->windex += space;
			wcount += space;

			/*
			 * Signal reader
			 */
			pipesignal(wpb, PIPE_WANTR);
			continue;
		}

		/*
		 * Wakeup any pending reader
		 */
		pipesignal(wpb, PIPE_WANTR);

		/*
		 * don't block on non-blocking I/O
		 */
		if (nbio) {
			error = EAGAIN;
			break;
		}

#ifdef _RDTSC_SUPPORTED_
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				cpu_lfence();
				space = wpb->size -
					(wpb->windex - wpb->rindex);
				if ((space < uio->uio_resid) &&
				    (orig_resid <= PIPE_BUF)) {
					space = 0;
				}
				if (space) {
					good = 1;
					break;
				}
				cpu_pause();
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Interlocked test.  Atomic op enforces the memory barrier.
		 */
		tsleep_interlock(wpb, PCATCH);
		atomic_set_int(&wpb->state, PIPE_WANTW);

		/*
		 * Retest space available after memory barrier above.
		 * Writes of size <= PIPE_BUF must be atomic.
		 */
		space = wpb->size - (wpb->windex - wpb->rindex);
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;
		if (space)
			continue;

		/*
		 * Retest EOF after memory barrier above.
		 */
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		/*
		 * We have no more space and have something to offer,
		 * wake up select/poll/kq.
		 */
		pipewakeup(wpb, 1);

		error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);

		/*
		 * Break out if we errored or the read side wants us to go
		 * away.
		 */
		if (error)
			break;
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}
	}
	pipe_end_uio(&wpb->wip);

	/*
	 * If we have put any characters in the buffer, we wake up
	 * the reader.
	 *
	 * Both rlock and wlock are required to be able to modify pipe_state.
	 */
	if (wpb->windex != wpb->rindex) {
		pipesignal(wpb, PIPE_WANTR);
		pipewakeup(wpb, 1);
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpb->rindex == wpb->windex) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	/*
	 * Update last modification time
	 */
	if (error == 0 && wpb->lticks != ticks) {
		vfs_timestamp(&wpb->mtime);
		wpb->lticks = ticks;
	}

	/*
	 * We have something to offer,
	 * wake up select/poll/kq.
	 */
	/*space = wpb->windex - wpb->rindex;*/
	lwkt_reltoken(&wpb->wlock);

	return (error);
}

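/*
 * Illustrative consequence of the PIPE_BUF checks in pipe_write():
 * because space is forced to 0 whenever a write with orig_resid <=
 * PIPE_BUF cannot be stored in its entirety, concurrent small writers
 * never have their data interleaved.  A userland sketch:
 *
 *	char msg[PIPE_BUF];
 *
 *	write(fd, msg, sizeof(msg));	(all-or-nothing with respect to
 *					 other writers of <= PIPE_BUF)
 */
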
/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
	struct ucred *cred, struct sysmsg *msg)
{
	struct pipebuf *rpb;
	struct pipe *pipe;
	int error;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
	} else {
		rpb = &pipe->bufferA;
	}

	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);

	switch (cmd) {
	case FIOASYNC:
		if (*(int *)data) {
			atomic_set_int(&rpb->state, PIPE_ASYNC);
		} else {
			atomic_clear_int(&rpb->state, PIPE_ASYNC);
		}
		error = 0;
		break;
	case FIONREAD:
		*(int *)data = (int)(rpb->windex - rpb->rindex);
		error = 0;
		break;
	case FIOSETOWN:
		error = fsetown(*(int *)data, &rpb->sigio);
		break;
	case FIOGETOWN:
		*(int *)data = fgetown(&rpb->sigio);
		error = 0;
		break;
	case TIOCSPGRP:
		/* This is deprecated, FIOSETOWN should be used instead. */
		error = fsetown(-(*(int *)data), &rpb->sigio);
		break;
	case TIOCGPGRP:
		/* This is deprecated, FIOGETOWN should be used instead. */
		*(int *)data = -fgetown(&rpb->sigio);
		error = 0;
		break;
	default:
		error = ENOTTY;
		break;
	}

	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	return (error);
}

static int
pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
{
	struct pipebuf *rpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
	} else {
		rpb = &pipe->bufferA;
	}

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = rpb->size;
	ub->st_size = rpb->windex - rpb->rindex;
	ub->st_blocks = howmany(ub->st_size, ub->st_blksize);
	ub->st_atimespec = rpb->atime;
	ub->st_mtimespec = rpb->mtime;
	ub->st_ctimespec = pipe->ctime;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	ub->st_ino = pipe->inum;
	/*
	 * Left as 0: st_dev, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

static int
pipe_close(struct file *fp)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(&rpb->sigio);
	pipeclose(pipe, rpb, wpb);

	return (0);
}

/*
 * Shutdown one or both directions of a full-duplex pipe.
 */
static int
pipe_shutdown(struct file *fp, int how)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int error;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * We modify pipe_state on both pipes, which means we need
	 * all four tokens!
	 */
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);

	error = 0;
	switch (how) {
	case SHUT_RDWR:
	case SHUT_RD:
		/*
		 * EOF on my reads and peer writes
		 */
		atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
		if (rpb->state & PIPE_WANTR) {
			rpb->state &= ~PIPE_WANTR;
			wakeup(rpb);
		}
		if (rpb->state & PIPE_WANTW) {
			rpb->state &= ~PIPE_WANTW;
			wakeup(rpb);
		}
		if (how == SHUT_RD)
			break;
		/* fall through */
	case SHUT_WR:
		/*
		 * EOF on peer reads and my writes
		 */
		atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
		if (wpb->state & PIPE_WANTR) {
			wpb->state &= ~PIPE_WANTR;
			wakeup(wpb);
		}
		if (wpb->state & PIPE_WANTW) {
			wpb->state &= ~PIPE_WANTW;
			wakeup(wpb);
		}
		break;
	default:
		error = EINVAL;
		break;
	}

	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	return (error);
}

/*
 * Destroy the pipe buffer.
 */
static void
pipe_free_kmem(struct pipebuf *pb)
{
	if (pb->buffer != NULL) {
		kmem_free(kernel_map, (vm_offset_t)pb->buffer, pb->size);
		pb->buffer = NULL;
		pb->object = NULL;
	}
}

/*
 * Close one half of the pipe.  We are closing the pipe for reading on rpb
 * and writing on wpb.  This routine must be called twice with the pipebufs
 * reversed to close both directions.
 */
static void
pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
{
	globaldata_t gd;

	if (pipe == NULL)
		return;

	/*
	 * We need both the read and write tokens to modify pipe_state.
	 */
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);

	/*
	 * Set our state, wakeup anyone waiting in select/poll/kq, and
	 * wakeup anyone blocked on our pipe.  No action if our side
	 * is already closed.
	 */
	if (rpb->state & PIPE_CLOSED) {
		lwkt_reltoken(&rpb->wlock);
		lwkt_reltoken(&rpb->rlock);
		return;
	}

	atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
	pipewakeup(rpb, 1);
	if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
		rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(rpb);
	}
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	/*
	 * Disconnect from peer.
	 */
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);

	atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
	pipewakeup(wpb, 1);
	if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
		wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(wpb);
	}
	if (SLIST_FIRST(&wpb->kq.ki_note))
		KNOTE(&wpb->kq.ki_note, 0);
	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);

	/*
	 * Free resources once both sides are closed.  We maintain a pcpu
	 * cache to improve performance, so the actual tear-down case is
	 * limited to bulk situations.
	 *
	 * However, the bulk tear-down case can cause intense contention
	 * on the kernel_map when, e.g. hundreds to hundreds of thousands
	 * of processes are killed at the same time.  To deal with this we
	 * use a pcpu mutex to maintain concurrency but also limit the
	 * number of threads banging on the map and pmap.
	 *
	 * We use the mtx mechanism instead of the lockmgr mechanism because
	 * the mtx mechanism utilizes a queued design which will not break
	 * down in the face of thousands to hundreds of thousands of
	 * processes trying to free pipes simultaneously.  The lockmgr
	 * mechanism will wind up waking them all up each time a lock
	 * cycles.
	 */
	if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
		gd = mycpu;
		if (gd->gd_pipeqcount >= pipe_maxcache) {
			mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
			pipe_free_kmem(rpb);
			pipe_free_kmem(wpb);
			mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
			kfree(pipe, M_PIPE);
		} else {
			rpb->state = 0;
			wpb->state = 0;
			pipe->next = gd->gd_pipeq;
			gd->gd_pipeq = pipe;
			++gd->gd_pipeqcount;
		}
	}
}

static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		break;
	default:
		return (EOPNOTSUPP);
	}

	/*
	 * Record which pipebuf the knote was attached to in kn_hook,
	 * using the same low-bit convention as f_data.
	 */
	if (rpb == &pipe->bufferA)
		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
	else
		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);

	knote_insert(&rpb->kq.ki_note, kn);

	return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
	if ((intptr_t)kn->kn_hook & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}
	knote_remove(&rpb->kq.ki_note, kn);
}

static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int ready = 0;

	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
	if ((intptr_t)kn->kn_fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * We shouldn't need the pipe locks because the knote itself is
	 * locked via KN_PROCESSING.  If we lose a race against the writer,
	 * the writer will just issue a KNOTE() after us.
	 */
#if 0
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);
#endif

	kn->kn_data = rpb->windex - rpb->rindex;
	if (kn->kn_data < 0)
		kn->kn_data = 0;

	if (rpb->state & PIPE_REOF) {
		/*
		 * Only set NODATA if all data has been exhausted
		 */
		if (kn->kn_data == 0)
			kn->kn_flags |= EV_NODATA;
		kn->kn_flags |= EV_EOF;

		/*
		 * Only set HUP if the pipe is completely closed.
		 * half-closed does not count (to make the behavior
		 * the same as Linux).
		 */
		if (wpb->state & PIPE_CLOSED) {
			kn->kn_flags |= EV_HUP;
			ready = 1;
		}
	}

#if 0
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);
#endif

	if (!ready && (kn->kn_sfflags & NOTE_HUPONLY) == 0)
		ready = kn->kn_data > 0;

	return (ready);
}

static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int ready = 0;

	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
	if ((intptr_t)kn->kn_fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	kn->kn_data = 0;
	if (wpb->state & PIPE_CLOSED) {
		kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
		return (1);
	}

	/*
	 * We shouldn't need the pipe locks because the knote itself is
	 * locked via KN_PROCESSING.  If we lose a race against the reader,
	 * the reader will just issue a KNOTE() after us.
	 */
#if 0
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);
#endif

	if (wpb->state & PIPE_WEOF) {
		kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
		ready = 1;
	}

	kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
	if (kn->kn_data < 0)
		kn->kn_data = 0;

#if 0
	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);
#endif

	if (!ready)
		ready = kn->kn_data >= PIPE_BUF;

	return (ready);
}

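/*
 * Illustrative userland view of the filters above (a sketch, not part of
 * the kernel build): EVFILT_READ returns kn_data = bytes buffered and
 * sets EV_EOF once the write side hits EOF (EV_HUP only when the peer is
 * completely closed, matching Linux); EVFILT_WRITE returns kn_data =
 * buffer space and only fires once at least PIPE_BUF bytes are writable:
 *
 *	struct kevent kev;
 *
 *	EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &kev, 1, NULL, 0, NULL);
 */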