server: Allow server side NtQueryVolumeInformationFile implementation.
[wine.git] / server / named_pipe.c
blob6dd2fd628a2f6d237c783a1418e9d9f5e89f4427
1 /*
2 * Server-side pipe management
4 * Copyright (C) 1998 Alexandre Julliard
5 * Copyright (C) 2001 Mike McCormack
6 * Copyright 2016 Jacek Caban for CodeWeavers
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
23 #include "config.h"
24 #include "wine/port.h"
26 #include <assert.h>
27 #include <fcntl.h>
28 #include <string.h>
29 #include <stdarg.h>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <sys/time.h>
33 #include <sys/types.h>
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #include <time.h>
38 #include <unistd.h>
39 #ifdef HAVE_POLL_H
40 #include <poll.h>
41 #endif
43 #include "ntstatus.h"
44 #define WIN32_NO_STATUS
45 #include "windef.h"
46 #include "winternl.h"
47 #include "winioctl.h"
49 #include "file.h"
50 #include "handle.h"
51 #include "thread.h"
52 #include "request.h"
53 #include "security.h"
55 enum pipe_state
57 ps_idle_server,
58 ps_wait_open,
59 ps_connected_server,
60 ps_wait_disconnect,
61 ps_wait_connect
64 struct named_pipe;
66 struct pipe_message
68 struct list entry; /* entry in message queue */
69 data_size_t read_pos; /* already read bytes */
70 struct iosb *iosb; /* message iosb */
71 struct async *async; /* async of pending write */
74 struct pipe_end
76 struct object obj; /* object header */
77 struct fd *fd; /* pipe file descriptor */
78 unsigned int flags; /* pipe flags */
79 struct pipe_end *connection; /* the other end of the pipe */
80 data_size_t buffer_size;/* size of buffered data that doesn't block caller */
81 struct list message_queue;
82 struct async_queue read_q; /* read queue */
83 struct async_queue write_q; /* write queue */
86 struct pipe_server
88 struct pipe_end pipe_end; /* common header for pipe_client and pipe_server */
89 struct fd *ioctl_fd; /* file descriptor for ioctls when not connected */
90 struct list entry; /* entry in named pipe servers list */
91 enum pipe_state state; /* server state */
92 struct pipe_client *client; /* client that this server is connected to */
93 struct named_pipe *pipe;
94 struct timeout_user *flush_poll;
95 unsigned int options; /* pipe options */
98 struct pipe_client
100 struct pipe_end pipe_end; /* common header for pipe_client and pipe_server */
101 struct pipe_server *server; /* server that this client is connected to */
102 unsigned int flags; /* file flags */
105 struct named_pipe
107 struct object obj; /* object header */
108 unsigned int flags;
109 unsigned int sharing;
110 unsigned int maxinstances;
111 unsigned int outsize;
112 unsigned int insize;
113 unsigned int instances;
114 timeout_t timeout;
115 struct list servers; /* list of servers using this pipe */
116 struct async_queue waiters; /* list of clients waiting to connect */
119 struct named_pipe_device
121 struct object obj; /* object header */
122 struct fd *fd; /* pseudo-fd for ioctls */
123 struct namespace *pipes; /* named pipe namespace */
126 static void named_pipe_dump( struct object *obj, int verbose );
127 static unsigned int named_pipe_map_access( struct object *obj, unsigned int access );
128 static int named_pipe_link_name( struct object *obj, struct object_name *name, struct object *parent );
129 static struct object *named_pipe_open_file( struct object *obj, unsigned int access,
130 unsigned int sharing, unsigned int options );
131 static void named_pipe_destroy( struct object *obj );
133 static const struct object_ops named_pipe_ops =
135 sizeof(struct named_pipe), /* size */
136 named_pipe_dump, /* dump */
137 no_get_type, /* get_type */
138 no_add_queue, /* add_queue */
139 NULL, /* remove_queue */
140 NULL, /* signaled */
141 NULL, /* satisfied */
142 no_signal, /* signal */
143 no_get_fd, /* get_fd */
144 named_pipe_map_access, /* map_access */
145 default_get_sd, /* get_sd */
146 default_set_sd, /* set_sd */
147 no_lookup_name, /* lookup_name */
148 named_pipe_link_name, /* link_name */
149 default_unlink_name, /* unlink_name */
150 named_pipe_open_file, /* open_file */
151 no_close_handle, /* close_handle */
152 named_pipe_destroy /* destroy */
155 /* common server and client pipe end functions */
156 static enum server_fd_type pipe_end_get_fd_type( struct fd *fd );
157 static int pipe_end_read( struct fd *fd, struct async *async, file_pos_t pos );
158 static int pipe_end_write( struct fd *fd, struct async *async_data, file_pos_t pos );
159 static void pipe_end_queue_async( struct fd *fd, struct async *async, int type, int count );
160 static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue );
162 /* server end functions */
163 static void pipe_server_dump( struct object *obj, int verbose );
164 static struct fd *pipe_server_get_fd( struct object *obj );
165 static void pipe_server_destroy( struct object *obj);
166 static int pipe_server_flush( struct fd *fd, struct async *async );
167 static int pipe_server_ioctl( struct fd *fd, ioctl_code_t code, struct async *async );
169 static const struct object_ops pipe_server_ops =
171 sizeof(struct pipe_server), /* size */
172 pipe_server_dump, /* dump */
173 no_get_type, /* get_type */
174 add_queue, /* add_queue */
175 remove_queue, /* remove_queue */
176 default_fd_signaled, /* signaled */
177 no_satisfied, /* satisfied */
178 no_signal, /* signal */
179 pipe_server_get_fd, /* get_fd */
180 default_fd_map_access, /* map_access */
181 default_get_sd, /* get_sd */
182 default_set_sd, /* set_sd */
183 no_lookup_name, /* lookup_name */
184 no_link_name, /* link_name */
185 NULL, /* unlink_name */
186 no_open_file, /* open_file */
187 fd_close_handle, /* close_handle */
188 pipe_server_destroy /* destroy */
191 static const struct fd_ops pipe_server_fd_ops =
193 default_fd_get_poll_events, /* get_poll_events */
194 default_poll_event, /* poll_event */
195 pipe_end_get_fd_type, /* get_fd_type */
196 pipe_end_read, /* read */
197 pipe_end_write, /* write */
198 pipe_server_flush, /* flush */
199 no_fd_get_volume_info, /* get_volume_info */
200 pipe_server_ioctl, /* ioctl */
201 pipe_end_queue_async, /* queue_async */
202 pipe_end_reselect_async /* reselect_async */
205 /* client end functions */
206 static void pipe_client_dump( struct object *obj, int verbose );
207 static int pipe_client_signaled( struct object *obj, struct wait_queue_entry *entry );
208 static struct fd *pipe_client_get_fd( struct object *obj );
209 static void pipe_client_destroy( struct object *obj );
210 static int pipe_client_flush( struct fd *fd, struct async *async );
211 static int pipe_client_ioctl( struct fd *fd, ioctl_code_t code, struct async *async );
213 static const struct object_ops pipe_client_ops =
215 sizeof(struct pipe_client), /* size */
216 pipe_client_dump, /* dump */
217 no_get_type, /* get_type */
218 add_queue, /* add_queue */
219 remove_queue, /* remove_queue */
220 pipe_client_signaled, /* signaled */
221 no_satisfied, /* satisfied */
222 no_signal, /* signal */
223 pipe_client_get_fd, /* get_fd */
224 default_fd_map_access, /* map_access */
225 default_get_sd, /* get_sd */
226 default_set_sd, /* set_sd */
227 no_lookup_name, /* lookup_name */
228 no_link_name, /* link_name */
229 NULL, /* unlink_name */
230 no_open_file, /* open_file */
231 fd_close_handle, /* close_handle */
232 pipe_client_destroy /* destroy */
235 static const struct fd_ops pipe_client_fd_ops =
237 default_fd_get_poll_events, /* get_poll_events */
238 default_poll_event, /* poll_event */
239 pipe_end_get_fd_type, /* get_fd_type */
240 pipe_end_read, /* read */
241 pipe_end_write, /* write */
242 pipe_client_flush, /* flush */
243 no_fd_get_volume_info, /* get_volume_info */
244 pipe_client_ioctl, /* ioctl */
245 pipe_end_queue_async, /* queue_async */
246 pipe_end_reselect_async /* reselect_async */
249 static void named_pipe_device_dump( struct object *obj, int verbose );
250 static struct object_type *named_pipe_device_get_type( struct object *obj );
251 static struct fd *named_pipe_device_get_fd( struct object *obj );
252 static struct object *named_pipe_device_lookup_name( struct object *obj,
253 struct unicode_str *name, unsigned int attr );
254 static struct object *named_pipe_device_open_file( struct object *obj, unsigned int access,
255 unsigned int sharing, unsigned int options );
256 static void named_pipe_device_destroy( struct object *obj );
257 static enum server_fd_type named_pipe_device_get_fd_type( struct fd *fd );
258 static int named_pipe_device_ioctl( struct fd *fd, ioctl_code_t code, struct async *async );
260 static const struct object_ops named_pipe_device_ops =
262 sizeof(struct named_pipe_device), /* size */
263 named_pipe_device_dump, /* dump */
264 named_pipe_device_get_type, /* get_type */
265 no_add_queue, /* add_queue */
266 NULL, /* remove_queue */
267 NULL, /* signaled */
268 no_satisfied, /* satisfied */
269 no_signal, /* signal */
270 named_pipe_device_get_fd, /* get_fd */
271 no_map_access, /* map_access */
272 default_get_sd, /* get_sd */
273 default_set_sd, /* set_sd */
274 named_pipe_device_lookup_name, /* lookup_name */
275 directory_link_name, /* link_name */
276 default_unlink_name, /* unlink_name */
277 named_pipe_device_open_file, /* open_file */
278 fd_close_handle, /* close_handle */
279 named_pipe_device_destroy /* destroy */
282 static const struct fd_ops named_pipe_device_fd_ops =
284 default_fd_get_poll_events, /* get_poll_events */
285 default_poll_event, /* poll_event */
286 named_pipe_device_get_fd_type, /* get_fd_type */
287 no_fd_read, /* read */
288 no_fd_write, /* write */
289 no_fd_flush, /* flush */
290 no_fd_get_volume_info, /* get_volume_info */
291 named_pipe_device_ioctl, /* ioctl */
292 default_fd_queue_async, /* queue_async */
293 default_fd_reselect_async /* reselect_async */
296 /* Returns if we handle I/O via server calls. Currently message-mode pipes are handled this way. */
297 static int use_server_io( struct pipe_end *pipe_end )
299 return pipe_end->flags & NAMED_PIPE_MESSAGE_STREAM_WRITE;
302 static void named_pipe_dump( struct object *obj, int verbose )
304 fputs( "Named pipe\n", stderr );
307 static unsigned int named_pipe_map_access( struct object *obj, unsigned int access )
309 if (access & GENERIC_READ) access |= STANDARD_RIGHTS_READ;
310 if (access & GENERIC_WRITE) access |= STANDARD_RIGHTS_WRITE | FILE_CREATE_PIPE_INSTANCE;
311 if (access & GENERIC_EXECUTE) access |= STANDARD_RIGHTS_EXECUTE;
312 if (access & GENERIC_ALL) access |= STANDARD_RIGHTS_ALL;
313 return access & ~(GENERIC_READ | GENERIC_WRITE | GENERIC_EXECUTE | GENERIC_ALL);
316 static void pipe_server_dump( struct object *obj, int verbose )
318 struct pipe_server *server = (struct pipe_server *) obj;
319 assert( obj->ops == &pipe_server_ops );
320 fprintf( stderr, "Named pipe server pipe=%p state=%d\n", server->pipe, server->state );
323 static void pipe_client_dump( struct object *obj, int verbose )
325 struct pipe_client *client = (struct pipe_client *) obj;
326 assert( obj->ops == &pipe_client_ops );
327 fprintf( stderr, "Named pipe client server=%p\n", client->server );
330 static int pipe_client_signaled( struct object *obj, struct wait_queue_entry *entry )
332 struct pipe_client *client = (struct pipe_client *) obj;
334 return client->pipe_end.fd && is_fd_signaled(client->pipe_end.fd);
337 static void named_pipe_destroy( struct object *obj)
339 struct named_pipe *pipe = (struct named_pipe *) obj;
341 assert( list_empty( &pipe->servers ) );
342 assert( !pipe->instances );
343 free_async_queue( &pipe->waiters );
346 static struct fd *pipe_client_get_fd( struct object *obj )
348 struct pipe_client *client = (struct pipe_client *) obj;
349 if (client->pipe_end.fd)
350 return (struct fd *) grab_object( client->pipe_end.fd );
351 set_error( STATUS_PIPE_DISCONNECTED );
352 return NULL;
355 static void set_server_state( struct pipe_server *server, enum pipe_state state )
357 server->state = state;
359 switch(state)
361 case ps_connected_server:
362 case ps_wait_disconnect:
363 assert( server->pipe_end.fd );
364 break;
365 case ps_wait_open:
366 case ps_idle_server:
367 assert( !server->pipe_end.fd );
368 set_no_fd_status( server->ioctl_fd, STATUS_PIPE_LISTENING );
369 break;
370 case ps_wait_connect:
371 assert( !server->pipe_end.fd );
372 set_no_fd_status( server->ioctl_fd, STATUS_PIPE_DISCONNECTED );
373 break;
377 static struct fd *pipe_server_get_fd( struct object *obj )
379 struct pipe_server *server = (struct pipe_server *) obj;
381 return (struct fd *)grab_object( server->pipe_end.fd ? server->pipe_end.fd : server->ioctl_fd );
385 static void notify_empty( struct pipe_server *server )
387 if (!server->flush_poll)
388 return;
389 assert( server->state == ps_connected_server );
390 remove_timeout_user( server->flush_poll );
391 server->flush_poll = NULL;
392 fd_async_wake_up( server->pipe_end.fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
395 static void wake_message( struct pipe_message *message )
397 struct async *async = message->async;
399 message->async = NULL;
400 message->iosb->status = STATUS_SUCCESS;
401 message->iosb->result = message->iosb->in_size;
402 if (async)
404 async_terminate( async, message->iosb->result ? STATUS_ALERTED : STATUS_SUCCESS );
405 release_object( async );
409 static void free_message( struct pipe_message *message )
411 list_remove( &message->entry );
412 if (message->iosb) release_object( message->iosb );
413 free( message );
416 static void pipe_end_disconnect( struct pipe_end *pipe_end, unsigned int status )
418 struct pipe_end *connection = pipe_end->connection;
420 pipe_end->connection = NULL;
422 if (use_server_io( pipe_end ))
424 struct pipe_message *message, *next;
425 struct async *async;
426 if (pipe_end->fd) fd_async_wake_up( pipe_end->fd, ASYNC_TYPE_WAIT, status );
427 async_wake_up( &pipe_end->read_q, status );
428 LIST_FOR_EACH_ENTRY_SAFE( message, next, &pipe_end->message_queue, struct pipe_message, entry )
430 async = message->async;
431 if (async || status == STATUS_PIPE_DISCONNECTED) free_message( message );
432 if (!async) continue;
433 async_terminate( async, status );
434 release_object( async );
436 if (status == STATUS_PIPE_DISCONNECTED) set_fd_signaled( pipe_end->fd, 0 );
438 if (connection)
440 connection->connection = NULL;
441 pipe_end_disconnect( connection, status );
445 static void do_disconnect( struct pipe_server *server )
447 /* we may only have a server fd, if the client disconnected */
448 if (server->client)
450 assert( server->client->server == server );
451 assert( server->client->pipe_end.fd );
452 if (!use_server_io( &server->pipe_end ))
454 release_object( server->client->pipe_end.fd );
455 server->client->pipe_end.fd = NULL;
458 assert( server->pipe_end.fd );
459 if (!use_server_io( &server->pipe_end ))
460 shutdown( get_unix_fd( server->pipe_end.fd ), SHUT_RDWR );
461 release_object( server->pipe_end.fd );
462 server->pipe_end.fd = NULL;
465 static void pipe_end_destroy( struct pipe_end *pipe_end )
467 struct pipe_message *message;
469 while (!list_empty( &pipe_end->message_queue ))
471 message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
472 assert( !message->async );
473 free_message( message );
476 free_async_queue( &pipe_end->read_q );
477 free_async_queue( &pipe_end->write_q );
480 static void pipe_server_destroy( struct object *obj)
482 struct pipe_server *server = (struct pipe_server *)obj;
484 assert( obj->ops == &pipe_server_ops );
486 pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_BROKEN );
488 if (server->pipe_end.fd)
490 notify_empty( server );
491 do_disconnect( server );
494 pipe_end_destroy( &server->pipe_end );
495 if (server->client)
497 server->client->server = NULL;
498 server->client = NULL;
501 assert( server->pipe->instances );
502 server->pipe->instances--;
504 if (server->ioctl_fd) release_object( server->ioctl_fd );
505 list_remove( &server->entry );
506 release_object( server->pipe );
509 static void pipe_client_destroy( struct object *obj)
511 struct pipe_client *client = (struct pipe_client *)obj;
512 struct pipe_server *server = client->server;
514 assert( obj->ops == &pipe_client_ops );
516 pipe_end_disconnect( &client->pipe_end, STATUS_PIPE_BROKEN );
518 if (server)
520 notify_empty( server );
522 switch(server->state)
524 case ps_connected_server:
525 /* Don't destroy the server's fd here as we can't
526 do a successful flush without it. */
527 set_server_state( server, ps_wait_disconnect );
528 break;
529 case ps_idle_server:
530 case ps_wait_open:
531 case ps_wait_disconnect:
532 case ps_wait_connect:
533 assert( 0 );
535 assert( server->client );
536 server->client = NULL;
537 client->server = NULL;
540 pipe_end_destroy( &client->pipe_end );
541 if (client->pipe_end.fd) release_object( client->pipe_end.fd );
544 static void named_pipe_device_dump( struct object *obj, int verbose )
546 fputs( "Named pipe device\n", stderr );
549 static struct object_type *named_pipe_device_get_type( struct object *obj )
551 static const WCHAR name[] = {'D','e','v','i','c','e'};
552 static const struct unicode_str str = { name, sizeof(name) };
553 return get_object_type( &str );
556 static struct fd *named_pipe_device_get_fd( struct object *obj )
558 struct named_pipe_device *device = (struct named_pipe_device *)obj;
559 return (struct fd *)grab_object( device->fd );
562 static struct object *named_pipe_device_lookup_name( struct object *obj, struct unicode_str *name,
563 unsigned int attr )
565 struct named_pipe_device *device = (struct named_pipe_device*)obj;
566 struct object *found;
568 assert( obj->ops == &named_pipe_device_ops );
569 assert( device->pipes );
571 if (!name) return NULL; /* open the device itself */
573 if ((found = find_object( device->pipes, name, attr | OBJ_CASE_INSENSITIVE )))
574 name->len = 0;
576 return found;
579 static struct object *named_pipe_device_open_file( struct object *obj, unsigned int access,
580 unsigned int sharing, unsigned int options )
582 return grab_object( obj );
585 static void named_pipe_device_destroy( struct object *obj )
587 struct named_pipe_device *device = (struct named_pipe_device*)obj;
588 assert( obj->ops == &named_pipe_device_ops );
589 if (device->fd) release_object( device->fd );
590 free( device->pipes );
593 static enum server_fd_type named_pipe_device_get_fd_type( struct fd *fd )
595 return FD_TYPE_DEVICE;
598 struct object *create_named_pipe_device( struct object *root, const struct unicode_str *name )
600 struct named_pipe_device *dev;
602 if ((dev = create_named_object( root, &named_pipe_device_ops, name, 0, NULL )) &&
603 get_error() != STATUS_OBJECT_NAME_EXISTS)
605 dev->pipes = NULL;
606 if (!(dev->fd = alloc_pseudo_fd( &named_pipe_device_fd_ops, &dev->obj, 0 )) ||
607 !(dev->pipes = create_namespace( 7 )))
609 release_object( dev );
610 dev = NULL;
613 return &dev->obj;
616 static int pipe_data_remaining( struct pipe_server *server )
618 struct pollfd pfd;
619 int fd;
621 assert( server->client );
623 if (use_server_io( &server->pipe_end ))
624 return !list_empty( &server->client->pipe_end.message_queue );
626 fd = get_unix_fd( server->client->pipe_end.fd );
627 if (fd < 0)
628 return 0;
629 pfd.fd = fd;
630 pfd.events = POLLIN;
631 pfd.revents = 0;
633 if (0 > poll( &pfd, 1, 0 ))
634 return 0;
636 return pfd.revents&POLLIN;
639 static void check_flushed( void *arg )
641 struct pipe_server *server = (struct pipe_server*) arg;
643 if (pipe_data_remaining( server ))
645 server->flush_poll = add_timeout_user( -TICKS_PER_SEC / 10, check_flushed, server );
647 else
649 server->flush_poll = NULL;
650 fd_async_wake_up( server->pipe_end.fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
654 static int pipe_end_flush( struct pipe_end *pipe_end, struct async *async )
656 if (use_server_io( pipe_end ) && (!pipe_end->connection || list_empty( &pipe_end->connection->message_queue )))
657 return 1;
659 fd_queue_async( pipe_end->fd, async, ASYNC_TYPE_WAIT );
660 set_error( STATUS_PENDING );
661 return 1;
664 static int pipe_server_flush( struct fd *fd, struct async *async )
666 struct pipe_server *server = get_fd_user( fd );
667 obj_handle_t handle;
669 if (!server || server->state != ps_connected_server) return 1;
671 if (!pipe_data_remaining( server )) return 1;
673 handle = pipe_end_flush( &server->pipe_end, async );
675 /* there's no unix way to be alerted when a pipe becomes empty, so resort to polling */
676 if (handle && !use_server_io( &server->pipe_end ) && !server->flush_poll)
677 server->flush_poll = add_timeout_user( -TICKS_PER_SEC / 10, check_flushed, server );
678 return handle;
681 static int pipe_client_flush( struct fd *fd, struct async *async )
683 struct pipe_end *pipe_end = get_fd_user( fd );
684 /* FIXME: Support byte mode. */
685 return use_server_io( pipe_end ) ? pipe_end_flush( pipe_end, async ) : 1;
688 static void message_queue_read( struct pipe_end *pipe_end, struct iosb *iosb )
690 struct pipe_message *message;
692 if (pipe_end->flags & NAMED_PIPE_MESSAGE_STREAM_READ)
694 message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
695 iosb->out_size = min( iosb->out_size, message->iosb->in_size - message->read_pos );
696 iosb->status = message->read_pos + iosb->out_size < message->iosb->in_size
697 ? STATUS_BUFFER_OVERFLOW : STATUS_SUCCESS;
699 else
701 data_size_t avail = 0;
702 LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry )
704 avail += message->iosb->in_size - message->read_pos;
705 if (avail >= iosb->out_size) break;
707 iosb->out_size = min( iosb->out_size, avail );
708 iosb->status = STATUS_SUCCESS;
711 message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
712 if (!message->read_pos && message->iosb->in_size == iosb->out_size) /* fast path */
714 iosb->out_data = message->iosb->in_data;
715 message->iosb->in_data = NULL;
716 wake_message( message );
717 free_message( message );
719 else
721 data_size_t write_pos = 0, writing;
722 char *buf = NULL;
724 if (iosb->out_size && !(buf = iosb->out_data = malloc( iosb->out_size )))
726 iosb->out_size = 0;
727 iosb->status = STATUS_NO_MEMORY;
728 return;
733 message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
734 writing = min( iosb->out_size - write_pos, message->iosb->in_size - message->read_pos );
735 if (writing) memcpy( buf + write_pos, (const char *)message->iosb->in_data + message->read_pos, writing );
736 write_pos += writing;
737 message->read_pos += writing;
738 if (message->read_pos == message->iosb->in_size)
740 wake_message(message);
741 free_message(message);
743 } while (write_pos < iosb->out_size);
745 iosb->result = iosb->out_size;
748 /* We call async_terminate in our reselect implementation, which causes recursive reselect.
749 * We're not interested in such reselect calls, so we ignore them. */
750 static int ignore_reselect;
752 static void reselect_write_queue( struct pipe_end *pipe_end );
754 static void reselect_read_queue( struct pipe_end *pipe_end )
756 struct async *async;
757 struct iosb *iosb;
758 int read_done = 0;
760 ignore_reselect = 1;
761 while (!list_empty( &pipe_end->message_queue ) && (async = find_pending_async( &pipe_end->read_q )))
763 iosb = async_get_iosb( async );
764 message_queue_read( pipe_end, iosb );
765 async_terminate( async, iosb->result ? STATUS_ALERTED : iosb->status );
766 release_object( async );
767 release_object( iosb );
768 read_done = 1;
770 ignore_reselect = 0;
772 if (pipe_end->connection)
774 if (list_empty( &pipe_end->message_queue ))
775 fd_async_wake_up( pipe_end->connection->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
776 else if (read_done)
777 reselect_write_queue( pipe_end->connection );
781 static void reselect_write_queue( struct pipe_end *pipe_end )
783 struct pipe_message *message, *next;
784 struct pipe_end *reader = pipe_end->connection;
785 data_size_t avail = 0;
787 if (!reader) return;
789 ignore_reselect = 1;
791 LIST_FOR_EACH_ENTRY_SAFE( message, next, &reader->message_queue, struct pipe_message, entry )
793 if (message->async && message->iosb->status != STATUS_PENDING)
795 release_object( message->async );
796 message->async = NULL;
797 free_message( message );
799 else
801 avail += message->iosb->in_size - message->read_pos;
802 if (message->iosb->status == STATUS_PENDING && (avail <= reader->buffer_size || !message->iosb->in_size))
803 wake_message( message );
807 ignore_reselect = 0;
808 reselect_read_queue( reader );
811 static int pipe_end_read( struct fd *fd, struct async *async, file_pos_t pos )
813 struct pipe_end *pipe_end = get_fd_user( fd );
815 if (!use_server_io( pipe_end )) return no_fd_read( fd, async, pos );
817 if (!pipe_end->connection && list_empty( &pipe_end->message_queue ))
819 set_error( STATUS_PIPE_BROKEN );
820 return 0;
823 queue_async( &pipe_end->read_q, async );
824 reselect_read_queue( pipe_end );
825 set_error( STATUS_PENDING );
826 return 1;
829 static int pipe_end_write( struct fd *fd, struct async *async, file_pos_t pos )
831 struct pipe_end *write_end = get_fd_user( fd );
832 struct pipe_end *read_end = write_end->connection;
833 struct pipe_message *message;
835 if (!use_server_io( write_end )) return no_fd_write( fd, async, pos );
837 if (!read_end)
839 set_error( STATUS_PIPE_DISCONNECTED );
840 return 0;
843 if (!(message = mem_alloc( sizeof(*message) ))) return 0;
844 message->async = (struct async *)grab_object( async );
845 message->iosb = async_get_iosb( async );
846 message->read_pos = 0;
847 list_add_tail( &read_end->message_queue, &message->entry );
849 queue_async( &write_end->write_q, async );
850 reselect_write_queue( write_end );
851 set_error( STATUS_PENDING );
852 return 1;
855 static void pipe_end_queue_async( struct fd *fd, struct async *async, int type, int count )
857 struct pipe_end *pipe_end = get_fd_user( fd );
858 if (use_server_io( pipe_end )) no_fd_queue_async( fd, async, type, count );
859 else default_fd_queue_async( fd, async, type, count );
862 static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue )
864 struct pipe_end *pipe_end = get_fd_user( fd );
866 if (ignore_reselect) return;
868 if (!use_server_io( pipe_end ))
869 default_fd_reselect_async( fd, queue );
870 else if (&pipe_end->write_q == queue)
871 reselect_write_queue( pipe_end );
872 else if (&pipe_end->read_q == queue)
873 reselect_read_queue( pipe_end );
876 static enum server_fd_type pipe_end_get_fd_type( struct fd *fd )
878 return FD_TYPE_PIPE;
881 static int pipe_end_peek( struct pipe_end *pipe_end )
883 unsigned reply_size = get_reply_max_size();
884 FILE_PIPE_PEEK_BUFFER *buffer;
885 struct pipe_message *message;
886 data_size_t avail = 0;
887 data_size_t message_length = 0;
889 if (!use_server_io( pipe_end ))
891 set_error( STATUS_NOT_SUPPORTED );
892 return 0;
895 if (reply_size < offsetof( FILE_PIPE_PEEK_BUFFER, Data ))
897 set_error( STATUS_INFO_LENGTH_MISMATCH );
898 return 0;
900 reply_size -= offsetof( FILE_PIPE_PEEK_BUFFER, Data );
902 LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry )
903 avail += message->iosb->in_size - message->read_pos;
905 if (avail)
907 message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
908 message_length = message->iosb->in_size - message->read_pos;
909 reply_size = min( reply_size, message_length );
911 else reply_size = 0;
913 if (!(buffer = set_reply_data_size( offsetof( FILE_PIPE_PEEK_BUFFER, Data[reply_size] )))) return 0;
914 buffer->NamedPipeState = 0; /* FIXME */
915 buffer->ReadDataAvailable = avail;
916 buffer->NumberOfMessages = 0; /* FIXME */
917 buffer->MessageLength = message_length;
918 if (reply_size) memcpy( buffer->Data, (const char *)message->iosb->in_data + message->read_pos, reply_size );
919 return 1;
922 static int pipe_server_ioctl( struct fd *fd, ioctl_code_t code, struct async *async )
924 struct pipe_server *server = get_fd_user( fd );
926 switch(code)
928 case FSCTL_PIPE_LISTEN:
929 switch(server->state)
931 case ps_idle_server:
932 case ps_wait_connect:
933 fd_queue_async( server->ioctl_fd, async, ASYNC_TYPE_WAIT );
934 set_server_state( server, ps_wait_open );
935 async_wake_up( &server->pipe->waiters, STATUS_SUCCESS );
936 set_error( STATUS_PENDING );
937 return 1;
938 case ps_connected_server:
939 set_error( STATUS_PIPE_CONNECTED );
940 break;
941 case ps_wait_disconnect:
942 set_error( STATUS_NO_DATA_DETECTED );
943 break;
944 case ps_wait_open:
945 set_error( STATUS_INVALID_HANDLE );
946 break;
948 return 0;
950 case FSCTL_PIPE_DISCONNECT:
951 switch(server->state)
953 case ps_connected_server:
954 assert( server->client );
955 assert( server->client->pipe_end.fd );
957 notify_empty( server );
959 /* dump the client and server fds - client loses all waiting data */
960 pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_DISCONNECTED );
961 do_disconnect( server );
962 server->client->server = NULL;
963 server->client = NULL;
964 set_server_state( server, ps_wait_connect );
965 break;
966 case ps_wait_disconnect:
967 assert( !server->client );
968 pipe_end_disconnect( &server->pipe_end, STATUS_PIPE_DISCONNECTED );
969 do_disconnect( server );
970 set_server_state( server, ps_wait_connect );
971 break;
972 case ps_idle_server:
973 case ps_wait_open:
974 set_error( STATUS_PIPE_LISTENING );
975 return 0;
976 case ps_wait_connect:
977 set_error( STATUS_PIPE_DISCONNECTED );
978 return 0;
980 return 1;
982 case FSCTL_PIPE_PEEK:
983 return pipe_end_peek( &server->pipe_end );
985 default:
986 return default_fd_ioctl( fd, code, async );
990 static int pipe_client_ioctl( struct fd *fd, ioctl_code_t code, struct async *async )
992 struct pipe_client *client = get_fd_user( fd );
994 switch(code)
996 case FSCTL_PIPE_PEEK:
997 return pipe_end_peek( &client->pipe_end );
999 default:
1000 return default_fd_ioctl( fd, code, async );
1004 static struct pipe_server *get_pipe_server_obj( struct process *process,
1005 obj_handle_t handle, unsigned int access )
1007 struct object *obj;
1008 obj = get_handle_obj( process, handle, access, &pipe_server_ops );
1009 return (struct pipe_server *) obj;
1012 static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, data_size_t buffer_size )
1014 pipe_end->fd = NULL;
1015 pipe_end->flags = pipe_flags;
1016 pipe_end->connection = NULL;
1017 pipe_end->buffer_size = buffer_size;
1018 init_async_queue( &pipe_end->read_q );
1019 init_async_queue( &pipe_end->write_q );
1020 list_init( &pipe_end->message_queue );
1023 static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned int options,
1024 unsigned int pipe_flags )
1026 struct pipe_server *server;
1028 server = alloc_object( &pipe_server_ops );
1029 if (!server)
1030 return NULL;
1032 server->pipe = pipe;
1033 server->client = NULL;
1034 server->flush_poll = NULL;
1035 server->options = options;
1036 init_pipe_end( &server->pipe_end, pipe_flags, pipe->insize );
1038 list_add_head( &pipe->servers, &server->entry );
1039 grab_object( pipe );
1040 if (!(server->ioctl_fd = alloc_pseudo_fd( &pipe_server_fd_ops, &server->pipe_end.obj, options )))
1042 release_object( server );
1043 return NULL;
1045 set_fd_signaled( server->ioctl_fd, 1 );
1046 set_server_state( server, ps_idle_server );
1047 return server;
1050 static struct pipe_client *create_pipe_client( unsigned int flags, unsigned int pipe_flags, data_size_t buffer_size )
1052 struct pipe_client *client;
1054 client = alloc_object( &pipe_client_ops );
1055 if (!client)
1056 return NULL;
1058 client->server = NULL;
1059 client->flags = flags;
1060 init_pipe_end( &client->pipe_end, pipe_flags, buffer_size );
1062 return client;
1065 static struct pipe_server *find_available_server( struct named_pipe *pipe )
1067 struct pipe_server *server;
1069 /* look for pipe servers that are listening */
1070 LIST_FOR_EACH_ENTRY( server, &pipe->servers, struct pipe_server, entry )
1072 if (server->state == ps_wait_open)
1073 return (struct pipe_server *)grab_object( server );
1076 /* fall back to pipe servers that are idle */
1077 LIST_FOR_EACH_ENTRY( server, &pipe->servers, struct pipe_server, entry )
1079 if (server->state == ps_idle_server)
1080 return (struct pipe_server *)grab_object( server );
1083 return NULL;
1086 static int named_pipe_link_name( struct object *obj, struct object_name *name, struct object *parent )
1088 struct named_pipe_device *dev = (struct named_pipe_device *)parent;
1090 if (parent->ops != &named_pipe_device_ops)
1092 set_error( STATUS_OBJECT_NAME_INVALID );
1093 return 0;
1095 namespace_add( dev->pipes, name );
1096 name->parent = grab_object( parent );
1097 return 1;
1100 static struct object *named_pipe_open_file( struct object *obj, unsigned int access,
1101 unsigned int sharing, unsigned int options )
1103 struct named_pipe *pipe = (struct named_pipe *)obj;
1104 struct pipe_server *server;
1105 struct pipe_client *client;
1106 unsigned int pipe_sharing;
1107 int fds[2];
1109 if (!(server = find_available_server( pipe )))
1111 set_error( STATUS_PIPE_NOT_AVAILABLE );
1112 return NULL;
1115 pipe_sharing = server->pipe->sharing;
1116 if (((access & GENERIC_READ) && !(pipe_sharing & FILE_SHARE_READ)) ||
1117 ((access & GENERIC_WRITE) && !(pipe_sharing & FILE_SHARE_WRITE)))
1119 set_error( STATUS_ACCESS_DENIED );
1120 release_object( server );
1121 return NULL;
1124 if ((client = create_pipe_client( options, pipe->flags, pipe->outsize )))
1126 if (use_server_io( &server->pipe_end ))
1128 client->pipe_end.fd = alloc_pseudo_fd( &pipe_client_fd_ops, &client->pipe_end.obj, options );
1129 if (client->pipe_end.fd)
1131 set_fd_signaled( client->pipe_end.fd, 1 );
1132 server->pipe_end.fd = (struct fd *)grab_object( server->ioctl_fd );
1133 set_no_fd_status( server->ioctl_fd, STATUS_BAD_DEVICE_TYPE );
1135 else
1137 release_object( client );
1138 client = NULL;
1141 else if (!socketpair( PF_UNIX, SOCK_STREAM, 0, fds ))
1143 assert( !server->pipe_end.fd );
1145 fcntl( fds[0], F_SETFL, O_NONBLOCK );
1146 fcntl( fds[1], F_SETFL, O_NONBLOCK );
1148 if (pipe->insize)
1150 setsockopt( fds[0], SOL_SOCKET, SO_RCVBUF, &pipe->insize, sizeof(pipe->insize) );
1151 setsockopt( fds[1], SOL_SOCKET, SO_RCVBUF, &pipe->insize, sizeof(pipe->insize) );
1153 if (pipe->outsize)
1155 setsockopt( fds[0], SOL_SOCKET, SO_SNDBUF, &pipe->outsize, sizeof(pipe->outsize) );
1156 setsockopt( fds[1], SOL_SOCKET, SO_SNDBUF, &pipe->outsize, sizeof(pipe->outsize) );
1159 client->pipe_end.fd = create_anonymous_fd( &pipe_client_fd_ops, fds[1], &client->pipe_end.obj, options );
1160 server->pipe_end.fd = create_anonymous_fd( &pipe_server_fd_ops, fds[0], &server->pipe_end.obj, server->options );
1161 if (client->pipe_end.fd && server->pipe_end.fd)
1163 fd_copy_completion( server->ioctl_fd, server->pipe_end.fd );
1165 else
1167 release_object( client );
1168 client = NULL;
1171 else
1173 file_set_error();
1174 release_object( client );
1175 client = NULL;
1177 if (client)
1179 allow_fd_caching( client->pipe_end.fd );
1180 allow_fd_caching( server->pipe_end.fd );
1181 if (server->state == ps_wait_open)
1182 fd_async_wake_up( server->ioctl_fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
1183 set_server_state( server, ps_connected_server );
1184 server->client = client;
1185 client->server = server;
1186 server->pipe_end.connection = &client->pipe_end;
1187 client->pipe_end.connection = &server->pipe_end;
1190 release_object( server );
1191 return &client->pipe_end.obj;
1194 static int named_pipe_device_ioctl( struct fd *fd, ioctl_code_t code, struct async *async )
1196 struct named_pipe_device *device = get_fd_user( fd );
1198 switch(code)
1200 case FSCTL_PIPE_WAIT:
1202 const FILE_PIPE_WAIT_FOR_BUFFER *buffer = get_req_data();
1203 data_size_t size = get_req_data_size();
1204 struct named_pipe *pipe;
1205 struct pipe_server *server;
1206 struct unicode_str name;
1207 timeout_t when;
1209 if (size < sizeof(*buffer) ||
1210 size < FIELD_OFFSET(FILE_PIPE_WAIT_FOR_BUFFER, Name[buffer->NameLength/sizeof(WCHAR)]))
1212 set_error( STATUS_INVALID_PARAMETER );
1213 return 0;
1215 name.str = buffer->Name;
1216 name.len = (buffer->NameLength / sizeof(WCHAR)) * sizeof(WCHAR);
1217 if (!(pipe = open_named_object( &device->obj, &named_pipe_ops, &name, 0 ))) return 0;
1219 if (!(server = find_available_server( pipe )))
1221 queue_async( &pipe->waiters, async );
1222 when = buffer->TimeoutSpecified ? buffer->Timeout.QuadPart : pipe->timeout;
1223 async_set_timeout( async, when, STATUS_IO_TIMEOUT );
1224 release_object( pipe );
1225 set_error( STATUS_PENDING );
1226 return 1;
1229 release_object( server );
1230 release_object( pipe );
1231 return 0;
1234 default:
1235 return default_fd_ioctl( fd, code, async );
1240 DECL_HANDLER(create_named_pipe)
1242 struct named_pipe *pipe;
1243 struct pipe_server *server;
1244 struct unicode_str name;
1245 struct object *root;
1246 const struct security_descriptor *sd;
1247 const struct object_attributes *objattr = get_req_object_attributes( &sd, &name, &root );
1249 if (!objattr) return;
1251 if (!req->sharing || (req->sharing & ~(FILE_SHARE_READ | FILE_SHARE_WRITE)) ||
1252 (!(req->flags & NAMED_PIPE_MESSAGE_STREAM_WRITE) && (req->flags & NAMED_PIPE_MESSAGE_STREAM_READ)))
1254 if (root) release_object( root );
1255 set_error( STATUS_INVALID_PARAMETER );
1256 return;
1259 if (!name.len) /* pipes need a root directory even without a name */
1261 if (!objattr->rootdir)
1263 set_error( STATUS_OBJECT_PATH_SYNTAX_BAD );
1264 return;
1266 if (!(root = get_directory_obj( current->process, objattr->rootdir ))) return;
1269 pipe = create_named_object( root, &named_pipe_ops, &name, objattr->attributes | OBJ_OPENIF, NULL );
1271 if (root) release_object( root );
1272 if (!pipe) return;
1274 if (get_error() != STATUS_OBJECT_NAME_EXISTS)
1276 /* initialize it if it didn't already exist */
1277 pipe->instances = 0;
1278 init_async_queue( &pipe->waiters );
1279 list_init( &pipe->servers );
1280 pipe->insize = req->insize;
1281 pipe->outsize = req->outsize;
1282 pipe->maxinstances = req->maxinstances;
1283 pipe->timeout = req->timeout;
1284 pipe->flags = req->flags & NAMED_PIPE_MESSAGE_STREAM_WRITE;
1285 pipe->sharing = req->sharing;
1287 else
1289 if (pipe->maxinstances <= pipe->instances)
1291 set_error( STATUS_INSTANCE_NOT_AVAILABLE );
1292 release_object( pipe );
1293 return;
1295 if (pipe->sharing != req->sharing)
1297 set_error( STATUS_ACCESS_DENIED );
1298 release_object( pipe );
1299 return;
1301 clear_error(); /* clear the name collision */
1304 server = create_pipe_server( pipe, req->options, req->flags );
1305 if (server)
1307 reply->handle = alloc_handle( current->process, server, req->access, objattr->attributes );
1308 server->pipe->instances++;
1309 if (sd) default_set_sd( &server->pipe_end.obj, sd, OWNER_SECURITY_INFORMATION |
1310 GROUP_SECURITY_INFORMATION |
1311 DACL_SECURITY_INFORMATION |
1312 SACL_SECURITY_INFORMATION );
1313 release_object( server );
1316 release_object( pipe );
1319 DECL_HANDLER(get_named_pipe_info)
1321 struct pipe_server *server;
1322 struct pipe_client *client = NULL;
1324 server = get_pipe_server_obj( current->process, req->handle, FILE_READ_ATTRIBUTES );
1325 if (!server)
1327 if (get_error() != STATUS_OBJECT_TYPE_MISMATCH)
1328 return;
1330 clear_error();
1331 client = (struct pipe_client *)get_handle_obj( current->process, req->handle,
1332 0, &pipe_client_ops );
1333 if (!client) return;
1334 server = client->server;
1337 reply->flags = client ? client->pipe_end.flags : server->pipe_end.flags;
1338 if (server)
1340 reply->sharing = server->pipe->sharing;
1341 reply->maxinstances = server->pipe->maxinstances;
1342 reply->instances = server->pipe->instances;
1343 reply->insize = server->pipe->insize;
1344 reply->outsize = server->pipe->outsize;
1347 if (client)
1348 release_object(client);
1349 else
1351 reply->flags |= NAMED_PIPE_SERVER_END;
1352 release_object(server);
1356 DECL_HANDLER(set_named_pipe_info)
1358 struct pipe_server *server;
1359 struct pipe_client *client = NULL;
1361 server = get_pipe_server_obj( current->process, req->handle, FILE_WRITE_ATTRIBUTES );
1362 if (!server)
1364 if (get_error() != STATUS_OBJECT_TYPE_MISMATCH)
1365 return;
1367 clear_error();
1368 client = (struct pipe_client *)get_handle_obj( current->process, req->handle,
1369 0, &pipe_client_ops );
1370 if (!client) return;
1371 if (!(server = client->server))
1373 release_object( client );
1374 return;
1378 if ((req->flags & ~(NAMED_PIPE_MESSAGE_STREAM_READ | NAMED_PIPE_NONBLOCKING_MODE)) ||
1379 ((req->flags & NAMED_PIPE_MESSAGE_STREAM_READ) && !(server->pipe->flags & NAMED_PIPE_MESSAGE_STREAM_WRITE)))
1381 set_error( STATUS_INVALID_PARAMETER );
1383 else if (client)
1385 client->pipe_end.flags = server->pipe->flags | req->flags;
1387 else
1389 server->pipe_end.flags = server->pipe->flags | req->flags;
1392 if (client)
1393 release_object(client);
1394 else
1395 release_object(server);