/* sys/kern/sys_pipe.c */

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@backplane.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/sysmsg.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/socket.h>
#include <sys/kern_syscall.h>
#include <sys/lock.h>
#include <sys/mutex.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>
#include <sys/signal2.h>
#include <sys/mutex2.h>

#include <machine/cpufunc.h>

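/*
 * Per-cpu lock used to limit the number of threads concurrently tearing
 * down pipe buffer KVA in the kernel_map (see pipeclose()).
 */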
struct pipegdlock {
	struct mtx	mtx;
} __cachealign;

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
		struct ucred *cred, struct sysmsg *msg);

__read_mostly static struct fileops pipeops = {
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_kqfilter = pipe_kqfilter,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_shutdown = pipe_shutdown
};

static void filt_pipedetach(struct knote *kn);
static int filt_piperead(struct knote *kn, long hint);
static int filt_pipewrite(struct knote *kn, long hint);

__read_mostly static struct filterops pipe_rfiltops =
	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
__read_mostly static struct filterops pipe_wfiltops =
	{ FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

#define PIPEQ_MAX_CACHE 16	/* per-cpu pipe structure cache */

__read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
__read_mostly static struct pipegdlock *pipe_gdlocks;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
	   CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");

/*
 * The pipe buffer size can be changed at any time.  Only new pipe()s
 * are affected.  Note that due to cpu cache effects, you do not want
 * to make this value too large.
 */
__read_mostly static int pipe_size = 32768;
SYSCTL_INT(_kern_pipe, OID_AUTO, size,
	   CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");

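/*
 * NOTE: pipespace() rounds the requested size to a page multiple and
 *	 clamps it to the range [16384, 1MB], so out-of-range sysctl
 *	 values are corrected when the next pipe is created.
 */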

/*
 * Reader/writer delay loop.  When the reader exhausts the pipe buffer
 * or the write completely fills the pipe buffer and would otherwise sleep,
 * it first busy-loops for a few microseconds waiting for data or buffer
 * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
 * and also helps when the user program uses a large data buffer in its
 * UIOs.
 *
 * This defaults to 4uS.
 */
#ifdef _RDTSC_SUPPORTED_
__read_mostly static int pipe_delay = 4000;	/* 4uS default */
SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
	   CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
#endif

/*
 * Auto-size pipe cache to reduce kmem allocations and frees.
 */
static
void
pipeinit(void *dummy)
{
	size_t mbytes = kmem_lim_size();
	int n;

	if (pipe_maxcache == PIPEQ_MAX_CACHE) {
		if (mbytes >= 7 * 1024)
			pipe_maxcache *= 2;
		if (mbytes >= 15 * 1024)
			pipe_maxcache *= 2;
	}

	/*
	 * Detune the pcpu caching a bit on systems with an insane number
	 * of cpu threads to reduce memory waste.
	 */
	if (ncpus > 64) {
		pipe_maxcache = pipe_maxcache * 64 / ncpus;
		if (pipe_maxcache < PIPEQ_MAX_CACHE)
			pipe_maxcache = PIPEQ_MAX_CACHE;
	}

	pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
			       M_PIPE, M_WAITOK | M_ZERO);
	for (n = 0; n < ncpus; ++n)
		mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
}
SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);

static void pipeclose (struct pipe *pipe,
		struct pipebuf *pbr, struct pipebuf *pbw);
static void pipe_free_kmem (struct pipebuf *buf);
static int pipe_create (struct pipe **pipep);

/*
 * Test and clear the specified flag, wakeup(pb) if it was set.
 * This function must also act as a memory barrier.
 */
static __inline void
pipesignal(struct pipebuf *pb, uint32_t flags)
{
	uint32_t oflags;
	uint32_t nflags;

	for (;;) {
		oflags = pb->state;
		cpu_ccfence();
		nflags = oflags & ~flags;
		if (atomic_cmpset_int(&pb->state, oflags, nflags))
			break;
	}
	if (oflags & flags)
		wakeup(pb);
}

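/*
 * Post a kqueue event on the pipebuf and, if requested and the pipe is in
 * async (FIOASYNC) mode, deliver SIGIO to the registered owner.
 */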
static __inline void
pipewakeup(struct pipebuf *pb, int dosigio)
{
	if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
		lwkt_gettoken(&sigio_token);
		pgsigio(pb->sigio, SIGIO, 0);
		lwkt_reltoken(&sigio_token);
	}
	KNOTE(&pb->kq.ki_note, 0);
}

/*
 * These routines are called before and after a UIO.  The UIO
 * may block, causing our held tokens to be lost temporarily.
 *
 * We use these routines to serialize reads against other reads
 * and writes against other writes.
 *
 * The appropriate token is held on entry so *ipp does not race.
 */
static __inline int
pipe_start_uio(int *ipp)
{
	int error;

	while (*ipp) {
		*ipp = -1;
		error = tsleep(ipp, PCATCH, "pipexx", 0);
		if (error)
			return (error);
	}
	*ipp = 1;
	return (0);
}

static __inline void
pipe_end_uio(int *ipp)
{
	if (*ipp < 0) {
		*ipp = 0;
		wakeup(ipp);
	} else {
		KKASSERT(*ipp > 0);
		*ipp = 0;
	}
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 *
 * MPSAFE
 */
int
sys_pipe(struct sysmsg *sysmsg, const struct pipe_args *uap)
{
	return kern_pipe(sysmsg->sysmsg_fds, 0);
}

int
sys_pipe2(struct sysmsg *sysmsg, const struct pipe2_args *uap)
{
	return kern_pipe(sysmsg->sysmsg_fds, uap->flags);
}

int
kern_pipe(long *fds, int flags)
{
	struct thread *td = curthread;
	struct filedesc *fdp = td->td_proc->p_fd;
	struct file *rf, *wf;
	struct pipe *pipe;
	int fd1, fd2, error;

	pipe = NULL;
	if (pipe_create(&pipe)) {
		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (ENFILE);
	}

	error = falloc(td->td_lwp, &rf, &fd1);
	if (error) {
		pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (error);
	}
	fds[0] = fd1;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
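	/*
	 * The low bit of f_data selects which pipebuf is this descriptor's
	 * read side: 0 for fds[0] (bufferA) and 1 for fds[1] (bufferB).
	 * pipe_read(), pipe_write() and friends mask the bit off to recover
	 * the struct pipe pointer.
	 */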
	rf->f_type = DTYPE_PIPE;
	rf->f_flag = FREAD | FWRITE;
	rf->f_ops = &pipeops;
	rf->f_data = (void *)((intptr_t)pipe | 0);
	if (flags & O_NONBLOCK)
		rf->f_flag |= O_NONBLOCK;
	if (flags & O_CLOEXEC)
		fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;

	error = falloc(td->td_lwp, &wf, &fd2);
	if (error) {
		fsetfd(fdp, NULL, fd1);
		fdrop(rf);
		/* pipeA has been closed by fdrop() */
		/* close pipeB here */
		pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
		return (error);
	}
	wf->f_type = DTYPE_PIPE;
	wf->f_flag = FREAD | FWRITE;
	wf->f_ops = &pipeops;
	wf->f_data = (void *)((intptr_t)pipe | 1);
	if (flags & O_NONBLOCK)
		wf->f_flag |= O_NONBLOCK;
	if (flags & O_CLOEXEC)
		fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;

	fds[1] = fd2;

	/*
	 * Once activated the peer relationship remains valid until
	 * both sides are closed.
	 */
	fsetfd(fdp, rf, fd1);
	fsetfd(fdp, wf, fd2);
	fdrop(rf);
	fdrop(wf);

	return (0);
}

/*
 * [re]allocates KVA for the pipe's circular buffer.  The space is
 * pageable.  Called twice to setup full-duplex communications.
 *
 * NOTE: Independent vm_object's are used to improve performance.
 *
 * Returns 0 on success, ENOMEM on failure.
 */
static int
pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
{
	struct vm_object *object;
	caddr_t buffer;
	vm_pindex_t npages;
	int error;

	size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
	if (size < 16384)
		size = 16384;
	if (size > 1024*1024)
		size = 1024*1024;

	npages = round_page(size) / PAGE_SIZE;
	object = pb->object;

	/*
	 * [re]create the object if necessary and reserve space for it
	 * in the kernel_map.  The object and memory are pageable.  On
	 * success, free the old resources before assigning the new
	 * ones.
	 */
	if (object == NULL || object->size != npages) {
		object = vm_object_allocate(OBJT_DEFAULT, npages);
		buffer = (caddr_t)vm_map_min(kernel_map);

		error = vm_map_find(kernel_map, object, NULL,
				    0, (vm_offset_t *)&buffer, size,
				    PAGE_SIZE, TRUE,
				    VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
				    VM_PROT_ALL, VM_PROT_ALL, 0);

		if (error != KERN_SUCCESS) {
			vm_object_deallocate(object);
			return (ENOMEM);
		}
		pipe_free_kmem(pb);
		pb->object = object;
		pb->buffer = buffer;
		pb->size = size;
	}
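
	/*
	 * rindex and windex are free-running byte counters; their difference
	 * is the number of bytes in the FIFO and they are masked with
	 * (size - 1) when used to index the buffer.  Start both at zero for
	 * the (re)allocated buffer.
	 */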
	pb->rindex = 0;
	pb->windex = 0;

	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.
 *
 * Returns 0 on success, else an error code (typically ENOMEM).  Caller
 * must still deallocate the pipe on failure.
 */
static int
pipe_create(struct pipe **pipep)
{
	globaldata_t gd = mycpu;
	struct pipe *pipe;
	int error;

	if ((pipe = gd->gd_pipeq) != NULL) {
		gd->gd_pipeq = pipe->next;
		--gd->gd_pipeqcount;
		pipe->next = NULL;
	} else {
		pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
		pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
		lwkt_token_init(&pipe->bufferA.rlock, "piper");
		lwkt_token_init(&pipe->bufferA.wlock, "pipew");
		lwkt_token_init(&pipe->bufferB.rlock, "piper");
		lwkt_token_init(&pipe->bufferB.wlock, "pipew");
	}
	*pipep = pipe;
	if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
		return (error);
	}
	if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
		return (error);
	}
	vfs_timestamp(&pipe->ctime);
	pipe->bufferA.atime = pipe->ctime;
	pipe->bufferA.mtime = pipe->ctime;
	pipe->bufferB.atime = pipe->ctime;
	pipe->bufferB.mtime = pipe->ctime;
	pipe->open_count = 2;

	return (0);
}

/*
 * Read data from a pipe
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	size_t nread = 0;
	size_t size;	/* total bytes available */
	size_t nsize;	/* total bytes to read */
	size_t rindex;	/* contiguous bytes available */
	int notify_writer;
	int bigread;
	int bigcount;
	int error;
	int nbio;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}
	atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);

	if (uio->uio_resid == 0)
		return(0);

	/*
	 * Calculate nbio
	 */
	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * 'quick' NBIO test before things get expensive.
	 */
	if (nbio && rpb->rindex == rpb->windex &&
	    (rpb->state & PIPE_REOF) == 0) {
		return EAGAIN;
	}

	/*
	 * Reads are serialized.  Note however that buffer.buffer and
	 * buffer.size can change out from under us when the number
	 * of bytes in the buffer are zero due to the write-side doing a
	 * pipespace().
	 */
	lwkt_gettoken(&rpb->rlock);
	error = pipe_start_uio(&rpb->rip);
	if (error) {
		lwkt_reltoken(&rpb->rlock);
		return (error);
	}
	notify_writer = 0;

	bigread = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		/*
		 * Don't hog the cpu.
		 */
		if (bigread && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		/*
		 * lfence required to avoid read-reordering of buffer
		 * contents prior to validation of size.
		 */
		size = rpb->windex - rpb->rindex;
		cpu_lfence();
		if (size) {
			rindex = rpb->rindex & (rpb->size - 1);
			nsize = size;
			if (nsize > rpb->size - rindex)
				nsize = rpb->size - rindex;
			nsize = szmin(nsize, uio->uio_resid);

			/*
			 * Limit how much we move in one go so we have a
			 * chance to kick the writer while data is still
			 * available in the pipe.  This avoids getting into
			 * a ping-pong with the writer.
			 */
			if (nsize > (rpb->size >> 1))
				nsize = rpb->size >> 1;

			error = uiomove(&rpb->buffer[rindex], nsize, uio);
			if (error)
				break;
			rpb->rindex += nsize;
			nread += nsize;

			/*
			 * If the FIFO is still over half full just continue
			 * and do not try to notify the writer yet.  If
			 * less than half full notify any waiting writer.
			 */
			if (size - nsize > (rpb->size >> 1)) {
				notify_writer = 0;
			} else {
				notify_writer = 1;
				pipesignal(rpb, PIPE_WANTW);
			}
			continue;
		}

		/*
		 * If the "write-side" was blocked we wake it up.  This code
		 * is reached when the buffer is completely emptied.
		 */
		pipesignal(rpb, PIPE_WANTW);

		/*
		 * Pick up our copy loop again if the writer sent data to
		 * us while we were messing around.
		 *
		 * On a SMP box poll up to pipe_delay nanoseconds for new
		 * data.  Typically a value of 2000 to 4000 is sufficient
		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
		 * is used for synchronous communications with small packets,
		 * and 8000 or so (8uS) will pipeline large buffer xfers
		 * between cpus over a pipe.
		 *
		 * For synchronous communications a hit means doing a
		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
		 * whereas a miss requiring a tsleep/wakeup sequence
		 * will take 7uS or more.
		 */
		if (rpb->windex != rpb->rindex)
			continue;

#ifdef _RDTSC_SUPPORTED_
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				cpu_lfence();
				if (rpb->windex != rpb->rindex) {
					good = 1;
					break;
				}
				cpu_pause();
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Detect EOF condition, do not set error.
		 */
		if (rpb->state & PIPE_REOF)
			break;

		/*
		 * Break if some data was read, or if this was a non-blocking
		 * read.
		 */
		if (nread > 0)
			break;

		if (nbio) {
			error = EAGAIN;
			break;
		}

		/*
		 * Last chance, interlock with WANTR
		 */
		tsleep_interlock(rpb, PCATCH);
		atomic_set_int(&rpb->state, PIPE_WANTR);

		/*
		 * Retest bytes available after memory barrier above.
		 */
		size = rpb->windex - rpb->rindex;
		if (size)
			continue;

		/*
		 * Retest EOF after memory barrier above.
		 */
		if (rpb->state & PIPE_REOF)
			break;

		/*
		 * Wait for more data or state change
		 */
		error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
		if (error)
			break;
	}
	pipe_end_uio(&rpb->rip);

	/*
	 * Update last access time
	 */
	if (error == 0 && nread && rpb->lticks != ticks) {
		vfs_timestamp(&rpb->atime);
		rpb->lticks = ticks;
	}

	/*
	 * If we drained the FIFO more than half way then handle
	 * write blocking hysteresis.
	 *
	 * Note that PIPE_WANTW cannot be set by the writer without
	 * it holding both rlock and wlock, so we can test it
	 * while holding just rlock.
	 */
	if (notify_writer) {
		/*
		 * Synchronous blocking is done on the pipe involved
		 */
		pipesignal(rpb, PIPE_WANTW);

		/*
		 * But we may also have to deal with a kqueue which is
		 * stored on the same pipe as its descriptor, so a
		 * EVFILT_WRITE event waiting for our side to drain will
		 * be on the other side.
		 */
		pipewakeup(wpb, 0);
	}
	/*size = rpb->windex - rpb->rindex;*/
	lwkt_reltoken(&rpb->rlock);

	return (error);
}

static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	size_t windex;
	size_t space;
	size_t wcount;
	size_t orig_resid;
	int bigwrite;
	int bigcount;
	int error;
	int nbio;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * Calculate nbio
	 */
	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * 'quick' NBIO test before things get expensive.
	 */
	if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
	    uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
		return EAGAIN;
	}

	/*
	 * Writes go to the peer.  The peer will always exist.
	 */
	lwkt_gettoken(&wpb->wlock);
	if (wpb->state & PIPE_WEOF) {
		lwkt_reltoken(&wpb->wlock);
		return (EPIPE);
	}

	/*
	 * Degenerate case (EPIPE takes prec)
	 */
	if (uio->uio_resid == 0) {
		lwkt_reltoken(&wpb->wlock);
		return(0);
	}

	/*
	 * Writes are serialized (start_uio must be called with wlock)
	 */
	error = pipe_start_uio(&wpb->wip);
	if (error) {
		lwkt_reltoken(&wpb->wlock);
		return (error);
	}

	orig_resid = uio->uio_resid;
	wcount = 0;

	bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		/*
		 * Don't hog the cpu.
		 */
		if (bigwrite && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		windex = wpb->windex & (wpb->size - 1);
		space = wpb->size - (wpb->windex - wpb->rindex);

		/*
		 * Writes of size <= PIPE_BUF must be atomic.
		 */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Write to fill, read size handles write hysteresis.  Also
		 * additional restrictions can cause select-based non-blocking
		 * writes to spin.
		 */
		if (space > 0) {
			size_t segsize;

			/*
			 * We want to notify a potentially waiting reader
			 * before we exhaust the write buffer for SMP
			 * pipelining.  Otherwise the write/read will begin
			 * to ping-pong.
			 */
			space = szmin(space, uio->uio_resid);
			if (space > (wpb->size >> 1))
				space = (wpb->size >> 1);

			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = wpb->size - windex;
			if (segsize > space)
				segsize = space;

			/*
			 * If this is the first loop and the reader is
			 * blocked, do a preemptive wakeup of the reader.
			 *
			 * On SMP the IPI latency plus the wlock interlock
			 * on the reader side is the fastest way to get the
			 * reader going.  (The scheduler will hard loop on
			 * lock tokens).
			 */
			if (wcount == 0)
				pipesignal(wpb, PIPE_WANTR);

			/*
			 * Transfer segment, which may include a wrap-around.
			 * Update windex to account for both all in one go
			 * so the reader can read() the data atomically.
			 */
			error = uiomove(&wpb->buffer[windex], segsize, uio);
			if (error == 0 && segsize < space) {
				segsize = space - segsize;
				error = uiomove(&wpb->buffer[0], segsize, uio);
			}
			if (error)
				break;

			/*
			 * Memory fence prior to windex updating (note: not
			 * needed so this is a NOP on Intel).
			 */
			cpu_sfence();
			wpb->windex += space;

			/*
			 * Signal reader
			 */
			if (wcount != 0)
				pipesignal(wpb, PIPE_WANTR);
			wcount += space;
			continue;
		}

		/*
		 * Wakeup any pending reader
		 */
		pipesignal(wpb, PIPE_WANTR);

		/*
		 * don't block on non-blocking I/O
		 */
		if (nbio) {
			error = EAGAIN;
			break;
		}

#ifdef _RDTSC_SUPPORTED_
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				cpu_lfence();
				space = wpb->size - (wpb->windex - wpb->rindex);
				if ((space < uio->uio_resid) &&
				    (orig_resid <= PIPE_BUF)) {
					space = 0;
				}
				if (space) {
					good = 1;
					break;
				}
				cpu_pause();
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Interlocked test.  Atomic op enforces the memory barrier.
		 */
		tsleep_interlock(wpb, PCATCH);
		atomic_set_int(&wpb->state, PIPE_WANTW);

		/*
		 * Retest space available after memory barrier above.
		 * Writes of size <= PIPE_BUF must be atomic.
		 */
		space = wpb->size - (wpb->windex - wpb->rindex);
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Retest EOF after memory barrier above.
		 */
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		/*
		 * We have no more space and have something to offer,
		 * wake up select/poll/kq.
		 */
		if (space == 0) {
			pipewakeup(wpb, 1);
			error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
		}

		/*
		 * Break out if we errored or the read side wants us to go
		 * away.
		 */
		if (error)
			break;
		if (wpb->state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}
	}
	pipe_end_uio(&wpb->wip);

	/*
	 * If we have put any characters in the buffer, we wake up
	 * the reader.
	 *
	 * Both rlock and wlock are required to be able to modify pipe_state.
	 */
	if (wpb->windex != wpb->rindex) {
		pipesignal(wpb, PIPE_WANTR);
		pipewakeup(wpb, 1);
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpb->rindex == wpb->windex) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0 && wpb->lticks != ticks) {
		vfs_timestamp(&wpb->mtime);
		wpb->lticks = ticks;
	}

	/*
	 * We have something to offer,
	 * wake up select/poll/kq.
	 */
	/*space = wpb->windex - wpb->rindex;*/
	lwkt_reltoken(&wpb->wlock);

	return (error);
}

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
	   struct ucred *cred, struct sysmsg *msg)
{
	struct pipebuf *rpb;
	struct pipe *pipe;
	int error;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
	} else {
		rpb = &pipe->bufferA;
	}

	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);

	switch (cmd) {
	case FIOASYNC:
		if (*(int *)data) {
			atomic_set_int(&rpb->state, PIPE_ASYNC);
		} else {
			atomic_clear_int(&rpb->state, PIPE_ASYNC);
		}
		error = 0;
		break;
	case FIONREAD:
		*(int *)data = (int)(rpb->windex - rpb->rindex);
		error = 0;
		break;
	case FIOSETOWN:
		error = fsetown(*(int *)data, &rpb->sigio);
		break;
	case FIOGETOWN:
		*(int *)data = fgetown(&rpb->sigio);
		error = 0;
		break;
	case TIOCSPGRP:
		/* This is deprecated, FIOSETOWN should be used instead. */
		error = fsetown(-(*(int *)data), &rpb->sigio);
		break;

	case TIOCGPGRP:
		/* This is deprecated, FIOGETOWN should be used instead. */
		*(int *)data = -fgetown(&rpb->sigio);
		error = 0;
		break;
	default:
		error = ENOTTY;
		break;
	}
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	return (error);
}

/*
 * MPSAFE
 */
static int
pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
{
	struct pipebuf *rpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
	} else {
		rpb = &pipe->bufferA;
	}

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = rpb->size;
	ub->st_size = rpb->windex - rpb->rindex;
	ub->st_blocks = howmany(ub->st_size, ub->st_blksize);
	ub->st_atimespec = rpb->atime;
	ub->st_mtimespec = rpb->mtime;
	ub->st_ctimespec = pipe->ctime;
	ub->st_uid = fp->f_cred->cr_uid;
	ub->st_gid = fp->f_cred->cr_gid;
	ub->st_ino = pipe->inum;
	/*
	 * Left as 0: st_dev, st_nlink, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return (0);
}

static int
pipe_close(struct file *fp)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(&rpb->sigio);
	pipeclose(pipe, rpb, wpb);

	return (0);
}

/*
 * Shutdown one or both directions of a full-duplex pipe.
 */
static int
pipe_shutdown(struct file *fp, int how)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int error = EPIPE;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * We modify pipe_state on both pipes, which means we need
	 * all four tokens!
	 */
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);

	switch(how) {
	case SHUT_RDWR:
	case SHUT_RD:
		/*
		 * EOF on my reads and peer writes
		 */
		atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
		if (rpb->state & PIPE_WANTR) {
			rpb->state &= ~PIPE_WANTR;
			wakeup(rpb);
		}
		if (rpb->state & PIPE_WANTW) {
			rpb->state &= ~PIPE_WANTW;
			wakeup(rpb);
		}
		error = 0;
		if (how == SHUT_RD)
			break;
		/* fall through */
	case SHUT_WR:
		/*
		 * EOF on peer reads and my writes
		 */
		atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
		if (wpb->state & PIPE_WANTR) {
			wpb->state &= ~PIPE_WANTR;
			wakeup(wpb);
		}
		if (wpb->state & PIPE_WANTW) {
			wpb->state &= ~PIPE_WANTW;
			wakeup(wpb);
		}
		error = 0;
		break;
	}
	pipewakeup(rpb, 1);
	pipewakeup(wpb, 1);

	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	return (error);
}

/*
 * Destroy the pipe buffer.
 */
static void
pipe_free_kmem(struct pipebuf *pb)
{
	if (pb->buffer != NULL) {
		kmem_free(kernel_map, (vm_offset_t)pb->buffer, pb->size);
		pb->buffer = NULL;
		pb->object = NULL;
	}
}

/*
 * Close one half of the pipe.  We are closing the pipe for reading on rpb
 * and writing on wpb.  This routine must be called twice with the pipebufs
 * reversed to close both directions.
 */
static void
pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
{
	globaldata_t gd;

	if (pipe == NULL)
		return;

	/*
	 * We need both the read and write tokens to modify pipe_state.
	 */
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);

	/*
	 * Set our state, wakeup anyone waiting in select/poll/kq, and
	 * wakeup anyone blocked on our pipe.  No action if our side
	 * is already closed.
	 */
	if (rpb->state & PIPE_CLOSED) {
		lwkt_reltoken(&rpb->wlock);
		lwkt_reltoken(&rpb->rlock);
		return;
	}

	atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
	pipewakeup(rpb, 1);
	if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
		rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(rpb);
	}
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	/*
	 * Disconnect from peer.
	 */
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);

	atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
	pipewakeup(wpb, 1);
	if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
		wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(wpb);
	}
	if (SLIST_FIRST(&wpb->kq.ki_note))
		KNOTE(&wpb->kq.ki_note, 0);
	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);

	/*
	 * Free resources once both sides are closed.  We maintain a pcpu
	 * cache to improve performance, so the actual tear-down case is
	 * limited to bulk situations.
	 *
	 * However, the bulk tear-down case can cause intense contention
	 * on the kernel_map when, e.g. hundreds to hundreds of thousands
	 * of processes are killed at the same time.  To deal with this we
	 * use a pcpu mutex to maintain concurrency but also limit the
	 * number of threads banging on the map and pmap.
	 *
	 * We use the mtx mechanism instead of the lockmgr mechanism because
	 * the mtx mechanism utilizes a queued design which will not break
	 * down in the face of thousands to hundreds of thousands of
	 * processes trying to free pipes simultaneously.  The lockmgr
	 * mechanism will wind up waking them all up each time a lock
	 * cycles.
	 */
	if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
		gd = mycpu;
		if (gd->gd_pipeqcount >= pipe_maxcache) {
			mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
			pipe_free_kmem(rpb);
			pipe_free_kmem(wpb);
			mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
			kfree(pipe, M_PIPE);
		} else {
			rpb->state = 0;
			wpb->state = 0;
			pipe->next = gd->gd_pipeq;
			gd->gd_pipeq = pipe;
			++gd->gd_pipeqcount;
		}
	}
}

static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		break;
	default:
		return (EOPNOTSUPP);
	}
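
	/*
	 * Tag kn_hook with the same low-bit convention used for f_data so
	 * that filt_pipedetach() can recover the pipebuf the knote was
	 * attached to.
	 */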
	if (rpb == &pipe->bufferA)
		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
	else
		kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);

	knote_insert(&rpb->kq.ki_note, kn);

	return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;

	pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
	if ((intptr_t)kn->kn_hook & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}
	knote_remove(&rpb->kq.ki_note, kn);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int ready = 0;

	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
	if ((intptr_t)kn->kn_fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * We shouldn't need the pipe locks because the knote itself is
	 * locked via KN_PROCESSING.  If we lose a race against the writer,
	 * the writer will just issue a KNOTE() after us.
	 */
#if 0
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);
#endif

	kn->kn_data = rpb->windex - rpb->rindex;
	if (kn->kn_data < 0)
		kn->kn_data = 0;

	if (rpb->state & PIPE_REOF) {
		/*
		 * Only set NODATA if all data has been exhausted
		 */
		if (kn->kn_data == 0)
			kn->kn_flags |= EV_NODATA;
		kn->kn_flags |= EV_EOF;

		/*
		 * Only set HUP if the pipe is completely closed.
		 * half-closed does not count (to make the behavior
		 * the same as linux).
		 */
		if (wpb->state & PIPE_CLOSED) {
			kn->kn_flags |= EV_HUP;
			ready = 1;
		}
	}

#if 0
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);
#endif

	if (!ready && (kn->kn_sfflags & NOTE_HUPONLY) == 0)
		ready = kn->kn_data > 0;

	return (ready);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int ready = 0;

	pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
	if ((intptr_t)kn->kn_fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	kn->kn_data = 0;
	if (wpb->state & PIPE_CLOSED) {
		kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
		return (1);
	}

	/*
	 * We shouldn't need the pipe locks because the knote itself is
	 * locked via KN_PROCESSING.  If we lose a race against the reader,
	 * the reader will just issue a KNOTE() after us.
	 */
#if 0
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);
#endif

	if (wpb->state & PIPE_WEOF) {
		kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
		ready = 1;
	}

	if (!ready) {
		kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
		if (kn->kn_data < 0)
			kn->kn_data = 0;
	}

#if 0
	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);
#endif

	if (!ready)
		ready = kn->kn_data >= PIPE_BUF;

	return (ready);
}