2 Copyright © 2008, The AROS Development Team. All rights reserved.
8 - buffered one-directional pipes
9 - both ends opened at once with FSA_PIPE
10 - FSA_OPEN and FSA_OPEN_FILE used only for duplicating locks (file name "")
11 - no support for multiple readers (however there may be multiple duplicated pipe ends)
16 #include <aros/debug.h>
18 #include <exec/errors.h>
19 #include <exec/resident.h>
20 #include <exec/memory.h>
21 #include <exec/semaphores.h>
22 #include <exec/lists.h>
23 #include <exec/nodes.h>
24 #include <proto/exec.h>
25 #include <utility/tagitem.h>
26 #include <dos/dosextens.h>
27 #include <dos/filesystem.h>
28 #include <clib/macros.h>
29 #include <proto/dos.h>
30 #include <aros/symbolsets.h>
31 #include <aros/asmcall.h>
32 #if defined(__GNUC__) || defined(__INTEL_COMPILER)
33 #include "xpipe_handler_gcc.h"
36 #include LC_LIBDEFS_FILE
41 AROS_UFP3(LONG
, xpipeproc
,
42 AROS_UFPA(char *,argstr
,A0
),
43 AROS_UFPA(ULONG
,argsize
,D0
),
44 AROS_UFPA(struct ExecBase
*,SysBase
,A6
));
49 struct IOFileSys
*iofs
;
56 struct List readers
; /* List of XPipeEnd structures */
57 struct List writers
; /* List of XPipeEnd structures */
58 struct List pendingreads
;
59 struct List pendingwrites
;
76 unsigned int bufpos
; /* Used only for readers */
79 enum { NONE
, PROCESS_READS
, PROCESS_WRITES
};
80 enum { NT_PIPEROOT
, NT_PIPEEND
};
82 static ULONG
SendRequest (struct XPipeBase
*xpipebase
, struct IOFileSys
*iofs
, BOOL abort
);
84 static int GM_UNIQUENAME(Init
)(LIBBASETYPEPTR xpipebase
)
86 DOSBase
= (struct DosLibrary
*)OpenLibrary("dos.library",39);
90 struct TagItem taglist
[]=
92 {NP_Entry
, (IPTR
)xpipeproc
},
93 {NP_Name
, (IPTR
)"xpipe.handler process"},
94 {NP_UserData
, (IPTR
)xpipebase
},
98 xpipebase
->proc
= CreateNewProc(taglist
);
103 CloseLibrary((struct Library
*)DOSBase
);
109 static int GM_UNIQUENAME(Open
)
111 LIBBASETYPEPTR xpipebase
,
112 struct IOFileSys
*iofs
,
117 struct XPipeUnit
*un
;
119 /* Mark message as replied. */
120 iofs
->IOFS
.io_Message
.mn_Node
.ln_Type
=NT_REPLYMSG
;
122 un
= AllocVec(sizeof(struct XPipeUnit
), MEMF_PUBLIC
| MEMF_CLEAR
);
126 un
->node
.ln_Type
= NT_PIPEROOT
;
127 iofs
->IOFS
.io_Unit
=(struct Unit
*)un
;
128 iofs
->IOFS
.io_Device
=&xpipebase
->device
;
132 iofs
->io_DosError
=ERROR_NO_FREE_STORE
;
133 iofs
->IOFS
.io_Error
=IOERR_OPENFAIL
;
138 static int GM_UNIQUENAME(Close
)
140 LIBBASETYPEPTR xpipebase
,
141 struct IOFileSys
*iofs
144 struct XPipeUnit
*un
= (struct XPipeUnit
*)iofs
->IOFS
.io_Unit
;
146 if(!IsListEmpty(&un
->pipes
))
148 iofs
->io_DosError
=ERROR_OBJECT_IN_USE
;
159 static int GM_UNIQUENAME(Expunge
)(LIBBASETYPEPTR xpipebase
)
162 This function is single-threaded by exec by calling Forbid.
163 Never break the Forbid() or strange things might happen.
166 SendRequest(xpipebase
, NULL
, TRUE
);
168 /* Free all resources */
169 CloseLibrary((struct Library
*)xpipebase
->dosbase
);
174 AROS_LH1(void, beginio
,
175 AROS_LHA(struct IOFileSys
*, iofs
, A1
),
176 struct XPipeBase
*, xpipebase
, 5, Xpipe
)
180 BOOL enqueued
= FALSE
;
182 D(bug("[xpipe] COMMAND %d\n", iofs
->IOFS
.io_Command
));
183 switch(iofs
->IOFS
.io_Command
)
193 error
= SendRequest(xpipebase
, iofs
, FALSE
);
198 error
= ERROR_SEEK_ERROR
;
200 case FSA_IS_FILESYSTEM
:
201 iofs
->io_Union
.io_IS_FILESYSTEM
.io_IsFilesystem
= FALSE
;
204 case FSA_DELETE_OBJECT
:
205 case FSA_SET_FILE_SIZE
:
206 case FSA_EXAMINE_ALL
:
207 case FSA_CREATE_HARDLINK
:
208 case FSA_CREATE_SOFTLINK
:
210 case FSA_EXAMINE_NEXT
:
211 error
= ERROR_NOT_IMPLEMENTED
;
215 error
= ERROR_ACTION_NOT_KNOWN
;
220 iofs
->io_DosError
=error
;
222 /* If the quick bit is not set and the request hasn't been redirected
223 send the message to the port
225 if(!(iofs
->IOFS
.io_Flags
&IOF_QUICK
) && !enqueued
)
226 ReplyMsg(&iofs
->IOFS
.io_Message
);
231 ADD2INITLIB(GM_UNIQUENAME(Init
), 0)
232 ADD2OPENDEV(GM_UNIQUENAME(Open
), 0)
233 ADD2CLOSEDEV(GM_UNIQUENAME(Close
), 0)
234 ADD2EXPUNGELIB(GM_UNIQUENAME(Expunge
), 0)
236 AROS_LH1(LONG
, abortio
,
237 AROS_LHA(struct IOFileSys
*, iofs
, A1
),
238 struct XPipeBase
*, xpipebase
, 6, Xpipe
)
242 return SendRequest(xpipebase
, iofs
, TRUE
);
247 static ULONG
SendRequest(struct XPipeBase
*xpipebase
, struct IOFileSys
*iofs
, BOOL abort
)
249 struct XPipeMessage
*msg
= AllocVec(sizeof(*msg
), MEMF_PUBLIC
);
253 msg
->msg
.mn_Node
.ln_Type
= NT_MESSAGE
;
254 msg
->msg
.mn_Node
.ln_Name
= "XPIPEMSG";
255 msg
->msg
.mn_Length
= sizeof(struct XPipeMessage
);
261 iofs
->IOFS
.io_Message
.mn_Node
.ln_Type
= NT_MESSAGE
;
262 iofs
->IOFS
.io_Flags
&= ~IOF_QUICK
;
265 PutMsg(&xpipebase
->proc
->pr_MsgPort
, (struct Message
*)msg
);
270 return ERROR_NO_FREE_STORE
;
273 /* The helper process */
275 #define SendBack(msg, err) \
277 msg->iofs->io_DosError = err; \
278 ReplyMsg(&(msg)->iofs->IOFS.io_Message); \
282 LONG
DuplicatePipeEnd(struct XPipeEnd
**oldend
, ULONG mode
)
284 struct XPipeEnd
*newend
;
285 struct XPipe
*pipe
= (*oldend
)->pipe
;
287 if ((newend
= AllocVec(sizeof(struct XPipeEnd
), MEMF_PUBLIC
| MEMF_CLEAR
)) == NULL
)
289 return ERROR_NO_FREE_STORE
;;
292 newend
->node
.ln_Type
= NT_PIPEEND
;
294 newend
->pipe
= (*oldend
)->pipe
;
295 newend
->bufpos
= (*oldend
)->bufpos
;
297 if (mode
& FMF_WRITE
)
299 D(bug("[xpipe] Cloned pipe end is a writer\n"));
300 ADDTAIL(&pipe
->writers
, newend
);
304 D(bug("[xpipe] Cloned pipe end is a reader\n"));
305 ADDTAIL(&pipe
->readers
, newend
);
312 ULONG
ReadWouldBlock (struct XPipe
*pipe
, ULONG length
)
316 ListLength (&pipe
->writers
, numwriters
);
318 /* If there are no writers, we won't block */
322 if(pipe
->bufstart
<= pipe
->bufend
)
323 curlen
= pipe
->bufend
- pipe
->bufstart
;
325 curlen
= (pipe
->bufend
+ sizeof(pipe
->buffer
) - pipe
->bufstart
) % sizeof(pipe
->buffer
);
333 ULONG
WriteWouldBlock (struct XPipe
*pipe
, ULONG length
)
336 if(pipe
->bufstart
<= pipe
->bufend
)
337 curlen
= pipe
->bufend
- pipe
->bufstart
;
339 curlen
= (pipe
->bufend
+ sizeof(pipe
->buffer
) - pipe
->bufstart
) % sizeof(pipe
->buffer
);
341 if(sizeof(pipe
->buffer
) - curlen
- 1 < length
)
347 LONG
ReadFromPipe (struct XPipe
*pipe
, APTR buffer
, ULONG length
)
349 D(bug("[xpipe] ReadFromPipe(%p, %p, %d)\n", pipe
, buffer
, length
));
350 D(bug("[xpipe] buffer before read from %d to %d\n", pipe
->bufstart
, pipe
->bufend
));
351 if(pipe
->bufstart
<= pipe
->bufend
)
353 /* read bytes from between bufstart and bufend */
354 int bytestoread
= MIN((pipe
->bufend
- pipe
->bufstart
), length
);
355 CopyMem((APTR
)((IPTR
)pipe
->buffer
+ pipe
->bufstart
), buffer
, bytestoread
);
357 pipe
->bufstart
+= bytestoread
;
358 D(bug("[xpipe] buffer after read from %d to %d\n", pipe
->bufstart
, pipe
->bufend
));
363 /* read bytes from bufstart to the end of the buffer */
364 int bytestoread1
= MIN((sizeof(pipe
->buffer
) - pipe
->bufstart
), length
);
365 CopyMem((APTR
)((IPTR
)pipe
->buffer
+ pipe
->bufstart
), buffer
, bytestoread1
);
366 pipe
->bufstart
= (pipe
->bufstart
+ bytestoread1
) % sizeof(pipe
->buffer
);
367 if(bytestoread1
== length
)
369 D(bug("[xpipe] buffer after read from %d to %d\n", pipe
->bufstart
, pipe
->bufend
));
372 /* and if it's not enough then from the beginning of the buffer to
374 int bytestoread2
= MIN(pipe
->bufend
, (length
- bytestoread1
));
375 CopyMem(pipe
->buffer
, (APTR
)((IPTR
) buffer
+ bytestoread1
), bytestoread2
);
376 pipe
->bufstart
= bytestoread2
;
377 D(bug("[xpipe] buffer after read from %d to %d\n", pipe
->bufstart
, pipe
->bufend
));
378 return bytestoread1
+ bytestoread2
;
382 LONG
WriteToPipe (struct XPipe
*pipe
, APTR buffer
, ULONG length
)
384 D(bug("[xpipe] WriteToPipe(%p, %p, %d)\n", pipe
, buffer
, length
));
385 D(bug("[xpipe] buffer before write from %d to %d\n", pipe
->bufstart
, pipe
->bufend
));
386 if(pipe
->bufstart
> pipe
->bufend
)
388 /* write between bufend and bufstart, left one byte */
389 int bytestowrite
= MIN((pipe
->bufstart
- pipe
->bufend
- 1), length
);
390 CopyMem(buffer
, (APTR
)((IPTR
)pipe
->buffer
+ pipe
->bufend
), bytestowrite
);
392 pipe
->bufend
+= bytestowrite
;
393 D(bug("[xpipe] buffer after write from %d to %d\n", pipe
->bufstart
, pipe
->bufend
));
398 /* write bytes from bufend to the end of the buffer */
399 int bytestowrite1
= MIN((sizeof(pipe
->buffer
) - pipe
->bufend
), length
);
400 /* if there's no space left at the beginning then make sure we leave
401 at least one byte free at the end */
402 if(pipe
->bufstart
== 0 && bytestowrite1
+ pipe
->bufend
== sizeof(pipe
->buffer
)) bytestowrite1
--;
403 CopyMem(buffer
, (APTR
)((IPTR
)pipe
->buffer
+ pipe
->bufend
), bytestowrite1
);
404 pipe
->bufend
= (pipe
->bufend
+ bytestowrite1
) % sizeof(pipe
->buffer
);
406 D(bug("[xpipe] wrote %d bytes in first part\n", bytestowrite1
));
407 /* skip the second part if there's no space free at the beginning */
408 if(bytestowrite1
== length
|| pipe
->bufstart
== 0)
410 D(bug("[xpipe] buffer after write from %d to %d\n", pipe
->bufstart
, pipe
->bufend
));
411 return bytestowrite1
;
413 /* and if it's not enough then from the beginning of the buffer to
414 bufstart - 1 (one byte free left) */
415 int bytestowrite2
= MIN(pipe
->bufstart
- 1, (length
- bytestowrite1
));
416 D(bug("[xpipe] wrote %d bytes in second part\n", bytestowrite2
));
417 CopyMem((APTR
)((IPTR
) buffer
+ bytestowrite1
), pipe
->buffer
, bytestowrite2
);
418 pipe
->bufend
= bytestowrite2
;
419 D(bug("[xpipe] buffer after write from %d to %d\n", pipe
->bufstart
, pipe
->bufend
));
420 return bytestowrite1
+ bytestowrite2
;
424 /* Pump data from write request buffers to read request buffers through
426 void pump(struct XPipe
*pipe
, int operation
)
428 while(operation
!= NONE
)
434 D(bug("[xpipe] Processing pending reads\n"));
436 /* Ok, we have some new data, use them to continue with pending reads */
437 struct XPipeMessage
*readmsg
;
438 struct Node
*tempnode
;
439 ForeachNodeSafe (&pipe
->pendingreads
, readmsg
, tempnode
)
441 struct XPipeEnd
*reader
= (struct XPipeEnd
*) readmsg
->iofs
->IOFS
.io_Unit
;
442 struct XPipe
*pipe
= reader
->pipe
;
443 int nread
= readmsg
->curlen
;
445 readmsg
->curlen
+= ReadFromPipe (
447 (APTR
) ((IPTR
) readmsg
->iofs
->io_Union
.io_READ_WRITE
.io_Buffer
+ readmsg
->curlen
),
448 readmsg
->iofs
->io_Union
.io_READ_WRITE
.io_Length
- readmsg
->curlen
450 nread
= readmsg
->curlen
- nread
;
451 D(bug("[xpipe] managed to pump %d bytes from pipe (%d from requested %d)\n", nread
, readmsg
->curlen
, readmsg
->iofs
->io_Union
.io_READ_WRITE
.io_Length
));
454 /* We managed to free some buffer space, now we can
456 operation
= PROCESS_WRITES
;
459 if(readmsg
->iofs
->io_Union
.io_READ_WRITE
.io_Length
== readmsg
->curlen
)
461 D(bug("[xpipe] completed read request %p\n", readmsg
));
462 /* hooray, we managed to read all data */
463 Remove ((struct Node
*) readmsg
);
464 SendBack (readmsg
, 0);
468 /* Since we didn't manage to complete this request,
469 there's no point in processing other */
477 D(bug("[xpipe] Processing pending writes\n"));
479 /* Ok, we have some new data, use them to continue with pending reads */
480 struct XPipeMessage
*writemsg
;
481 struct Node
*tempnode
;
482 ForeachNodeSafe (&pipe
->pendingwrites
, writemsg
, tempnode
)
484 struct XPipeEnd
*writer
= (struct XPipeEnd
*) writemsg
->iofs
->IOFS
.io_Unit
;
485 struct XPipe
*pipe
= writer
->pipe
;
486 int nwrote
= writemsg
->curlen
;
488 writemsg
->curlen
+= WriteToPipe (
490 (APTR
) ((IPTR
) writemsg
->iofs
->io_Union
.io_READ_WRITE
.io_Buffer
+ writemsg
->curlen
),
491 writemsg
->iofs
->io_Union
.io_READ_WRITE
.io_Length
- writemsg
->curlen
493 nwrote
= writemsg
->curlen
- nwrote
;
494 D(bug("[xpipe] managed to pump %d bytes to pipe (%d from requested %d)\n", nwrote
, writemsg
->curlen
, writemsg
->iofs
->io_Union
.io_READ_WRITE
.io_Length
));
497 /* We managed to write some data, now we can process
498 some pending reads. */
499 operation
= PROCESS_READS
;
502 if(writemsg
->iofs
->io_Union
.io_READ_WRITE
.io_Length
== writemsg
->curlen
)
504 D(bug("[xpipe] completed write request %p\n", writemsg
));
505 /* hooray, we managed to write all data */
506 Remove ((struct Node
*) writemsg
);
507 SendBack (writemsg
, 0);
511 /* Since we didn't manage to complete this request,
512 there's no point in processing other */
522 AROS_UFH3(LONG
, xpipeproc
,
523 AROS_UFHA(char *,argstr
,A0
),
524 AROS_UFHA(ULONG
,argsize
,D0
),
525 AROS_UFHA(struct ExecBase
*,SysBase
,A6
))
530 struct XPipeBase
*xpipebase
;
531 struct XPipeMessage
*msg
;
535 me
= (struct Process
*)FindTask(0);
536 xpipebase
= me
->pr_Task
.tc_UserData
;
541 WaitPort (&(me
->pr_MsgPort
));
545 (msg
=(struct XPipeMessage
*)GetMsg (&(me
->pr_MsgPort
))) &&
546 (cont
= (msg
->iofs
!= 0))
549 D(bug("[xpipe] Message received.\n"));
551 pn
= (struct Node
*)msg
->iofs
->IOFS
.io_Unit
;
553 switch (msg
->iofs
->IOFS
.io_Command
)
557 D(bug("[xpipe] Cmd is FSA_OPEN\n"));
559 if(pn
->ln_Type
!= NT_PIPEEND
)
561 SendBack (msg
, ERROR_OBJECT_NOT_FOUND
);
565 if (msg
->iofs
->io_Union
.io_OPEN
.io_Filename
[0])
567 SendBack (msg
, ERROR_OBJECT_NOT_FOUND
);
571 D(bug("[xpipe] Cloning pipe end: %p with mode \n", msg
->iofs
->IOFS
.io_Unit
, msg
->iofs
->io_Union
.io_OPEN
.io_FileMode
));
573 LONG error
= DuplicatePipeEnd (
574 (struct XPipeEnd
**) &msg
->iofs
->IOFS
.io_Unit
,
575 msg
->iofs
->io_Union
.io_OPEN
.io_FileMode
578 D(bug("[xpipe] Cloned pipe end: %p\n", msg
->iofs
->IOFS
.io_Unit
));
580 SendBack (msg
, error
);
585 D(bug("[xpipe] Cmd is FSA_OPEN_FILE\n"));
587 if(pn
->ln_Type
!= NT_PIPEEND
)
589 SendBack (msg
, ERROR_OBJECT_NOT_FOUND
);
593 if (msg
->iofs
->io_Union
.io_OPEN_FILE
.io_Filename
[0])
595 SendBack (msg
, ERROR_OBJECT_NOT_FOUND
);
599 D(bug("[xpipe] Cloning pipe end: %p with mode %d\n", msg
->iofs
->IOFS
.io_Unit
, msg
->iofs
->io_Union
.io_OPEN_FILE
.io_FileMode
));
601 LONG error
= DuplicatePipeEnd (
602 (struct XPipeEnd
**) &msg
->iofs
->IOFS
.io_Unit
,
603 msg
->iofs
->io_Union
.io_OPEN_FILE
.io_FileMode
605 D(bug("[xpipe] Cloned pipe end: %p\n", msg
->iofs
->IOFS
.io_Unit
));
606 SendBack (msg
, error
);
611 struct XPipeEnd
*reader
, *writer
;
613 struct XPipeUnit
*un
;
615 D(bug("[xpipe] Cmd is FSA_PIPE\n"));
617 if(pn
->ln_Type
!= NT_PIPEROOT
)
619 SendBack (msg
, ERROR_OBJECT_WRONG_TYPE
);
622 un
= (struct XPipeUnit
*) pn
;
624 if ((reader
= AllocVec (sizeof(struct XPipeEnd
), MEMF_PUBLIC
| MEMF_CLEAR
)) == NULL
)
626 SendBack (msg
, ERROR_NO_FREE_STORE
);
629 reader
->node
.ln_Type
= NT_PIPEEND
;
630 reader
->mode
= FMF_READ
;
633 if ((writer
= AllocVec (sizeof(struct XPipeEnd
), MEMF_PUBLIC
| MEMF_CLEAR
)) == NULL
)
636 SendBack (msg
, ERROR_NO_FREE_STORE
);
639 writer
->node
.ln_Type
= NT_PIPEEND
;
640 writer
->mode
= FMF_WRITE
;
643 if ((pipe
= AllocVec (sizeof(struct XPipe
), MEMF_PUBLIC
| MEMF_CLEAR
)) == NULL
)
647 SendBack (msg
, ERROR_NO_FREE_STORE
);
652 NEWLIST (&pipe
->readers
);
653 NEWLIST (&pipe
->pendingreads
);
654 ADDTAIL (&pipe
->readers
, reader
);
655 NEWLIST (&pipe
->writers
);
656 NEWLIST (&pipe
->pendingwrites
);
657 ADDTAIL (&pipe
->writers
, writer
);
658 ADDTAIL (&un
->pipes
, pipe
);
660 D(bug("[xpipe] Opened pipe with read end: %p and write end: %p\n", reader
, writer
));
662 msg
->iofs
->IOFS
.io_Unit
= (struct Unit
*) reader
;
663 msg
->iofs
->io_Union
.io_PIPE
.io_Writer
= (struct Unit
*) writer
;
672 D(bug("[xpipe] Cmd is FSA_CLOSE\n"));
674 if(pn
->ln_Type
!= NT_PIPEEND
)
676 SendBack (msg
, ERROR_OBJECT_NOT_FOUND
);
679 struct XPipeEnd
*pipeend
= (struct XPipeEnd
*) pn
;
680 struct XPipe
*pipe
= pipeend
->pipe
;
682 D(bug("[xpipe] Closing pipe end %p\n", pipeend
));
684 Remove ((struct Node
*) pipeend
);
687 int numreaders
, numwriters
;
688 ListLength (&pipe
->readers
, numreaders
);
689 ListLength (&pipe
->writers
, numwriters
);
691 /* If all writing ends are closed we have EOF, so finish all pending read requests */
694 D(bug("[xpipe] Processing pending reads\n"));
695 struct XPipeMessage
*readmsg
;
696 struct Node
*tempnode
;
697 ForeachNodeSafe (&pipe
->pendingreads
, readmsg
, tempnode
)
699 D(bug("[xpipe] Pending read msg %p\n", readmsg
));
700 readmsg
->iofs
->io_Union
.io_READ_WRITE
.io_Length
= readmsg
->curlen
;
701 Remove((struct Node
*)readmsg
);
702 SendBack (readmsg
, 0);
706 /* If there are no pipe ends left, close the pipe */
707 if (numreaders
== 0 && numwriters
== 0)
709 D(bug("[xpipe] No ends left, closing the pipe\n"));
710 Remove ((struct Node
*) pipe
);
719 struct ExAllData
*ead
= msg
->iofs
->io_Union
.io_EXAMINE
.io_ead
;
720 const ULONG type
= msg
->iofs
->io_Union
.io_EXAMINE
.io_Mode
;
721 const ULONG size
= msg
->iofs
->io_Union
.io_EXAMINE
.io_Size
;
724 static const ULONG sizes
[]=
727 offsetof(struct ExAllData
,ed_Type
),
728 offsetof(struct ExAllData
,ed_Size
),
729 offsetof(struct ExAllData
,ed_Prot
),
730 offsetof(struct ExAllData
,ed_Days
),
731 offsetof(struct ExAllData
,ed_Comment
),
732 offsetof(struct ExAllData
,ed_OwnerUID
),
733 sizeof(struct ExAllData
)
736 D(bug("[xpipe] Cmd is EXAMINE\n"));
740 D(bug("[xpipe] The user requested an invalid type\n"));
741 SendBack (msg
, ERROR_BAD_NUMBER
);
745 next
= (STRPTR
)ead
+ sizes
[type
];
746 end
= (STRPTR
)ead
+ size
;
748 if(next
>end
) /* > is correct. Not >= */
750 SendBack (msg
, ERROR_BUFFER_OVERFLOW
);
757 ead
->ed_OwnerUID
= 0;
758 ead
->ed_OwnerGID
= 0;
762 ead
->ed_Comment
= NULL
;
780 ead
->ed_Type
= ST_PIPEFILE
;
786 ead
->ed_Name
[0] = '\0';
797 D(bug("[xpipe] Cmd is FSA_WRITE.\n"));
799 if(pn
->ln_Type
!= NT_PIPEEND
)
801 SendBack (msg
, ERROR_OBJECT_NOT_FOUND
);
804 struct XPipeEnd
*writer
= (struct XPipeEnd
*) pn
;
806 D(bug("[xpipe] Writer end %p\n", writer
));
807 struct XPipe
*pipe
= writer
->pipe
;
809 int length
= msg
->iofs
->io_Union
.io_READ_WRITE
.io_Length
;
810 D(bug("[xpipe] length is %d.\n", length
));
812 if (!(writer
->mode
& FMF_WRITE
))
814 D(bug("[xpipe] User tried to write to the wrong end of the pipe.\n"));
815 SendBack (msg
, ERROR_WRITE_PROTECTED
);
819 ListLength (&pipe
->readers
, numreaders
);
822 D(bug("[xpipe] There are no open read ends: PIPE BROKEN.\n"));
823 SendBack (msg
, ERROR_BROKEN_PIPE
);
827 if (WriteWouldBlock (pipe
, length
))
829 if(writer
->mode
& FMF_NONBLOCK
)
831 D(bug("[xpipe] There is not enough space and the pipe is in nonblocking mode, so return EWOULDBLOCK\n"));
832 SendBack (msg
, ERROR_WOULD_BLOCK
);
836 /* Write as much as we can, enqueue the request and reply when it's finished */
837 D(bug("[xpipe] Enqueing the message\n"));
838 AddTail (&pipe
->pendingwrites
, (struct Node
*)msg
);
841 msg
->curlen
= WriteToPipe (
843 msg
->iofs
->io_Union
.io_READ_WRITE
.io_Buffer
,
846 D(bug("[xpipe] Wrote %d bytes from requested %d\n", msg
->curlen
, length
));
848 if(length
== msg
->curlen
)
850 /* Managed to write everything */
855 pump(pipe
, PROCESS_READS
);
861 D(bug("[xpipe] Cmd is FSA_READ.\n"));
863 if(pn
->ln_Type
!= NT_PIPEEND
)
865 SendBack (msg
, ERROR_OBJECT_NOT_FOUND
);
868 struct XPipeEnd
*reader
= (struct XPipeEnd
*) pn
;
870 D(bug("[xpipe] Reader end %p\n", reader
));
871 struct XPipe
*pipe
= reader
->pipe
;
873 ListLength(&pipe
->writers
, numwriters
);
874 int length
= msg
->iofs
->io_Union
.io_READ_WRITE
.io_Length
;
875 D(bug("[xpipe] length is %d.\n", length
));
877 if (!(reader
->mode
& FMF_READ
))
879 D(bug("[xpipe] User tried to read from the wrong end of the pipe.\n"));
880 SendBack(msg
, ERROR_READ_PROTECTED
);
884 if (ReadWouldBlock (pipe
, length
))
886 if(reader
->mode
& FMF_NONBLOCK
)
888 D(bug("[xpipe] There is not enough data to read and the pipe is in nonblocking mode, so return EWOULDBLOCK\n"));
889 SendBack(msg
, ERROR_WOULD_BLOCK
);
893 /* Read as much as we can, enqueue the request and reply when it's finished */
894 D(bug("[xpipe] Enqueing the message\n"));
895 AddTail (&pipe
->pendingreads
, (struct Node
*)msg
);
898 msg
->curlen
= ReadFromPipe (
900 (APTR
) msg
->iofs
->io_Union
.io_READ_WRITE
.io_Buffer
,
903 D(bug("[xpipe] Read %d bytes\n", msg
->curlen
));
905 if(length
== msg
->curlen
|| numwriters
== 0)
907 /* Managed to read everything or there are no more writers (EOF) */
908 msg
->iofs
->io_Union
.io_READ_WRITE
.io_Length
= msg
->curlen
;
913 pump(pipe
, PROCESS_WRITES
);