2 Copyright (C) 2007 Klarälvdalens Datakonsult AB
4 KDPipeIODevice is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Library General Public
6 License as published by the Free Software Foundation; either
7 version 2 of the License, or (at your option) any later version.
9 KDPipeIODevice is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Library General Public License for more details.
14 You should have received a copy of the GNU Library General Public License
15 along with KDPipeIODevice; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
17 Boston, MA 02110-1301, USA.
20 #include <config-kleopatra.h>
22 #include "kdpipeiodevice.h"
24 #include <QtCore/QDebug>
25 #include <QtCore/QMutex>
26 #include <QtCore/QPointer>
27 #include <QtCore/QThread>
28 #include <QtCore/QWaitCondition>
46 #ifndef KDAB_CHECK_THIS
47 # define KDAB_CHECK_CTOR (void)1
48 # define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
49 # define KDAB_CHECK_THIS KDAB_CHECK_CTOR
52 #define LOCKED( d ) const QMutexLocker locker( &d->mutex )
53 #define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )
55 const unsigned int BUFFER_SIZE
= 4096;
56 const bool ALLOW_QIODEVICE_BUFFERING
= true;
59 KDPipeIODevice::DebugLevel s_debugLevel
= KDPipeIODevice::NoDebug
;
62 #define qDebug if( s_debugLevel == KDPipeIODevice::NoDebug ){}else qDebug
66 class Reader
: public QThread
{
69 Reader( int fd
, Qt::HANDLE handle
);
72 qint64
readData( char * data
, qint64 maxSize
);
74 unsigned int bytesInBuffer() const {
75 return ( wptr
+ sizeof buffer
- rptr
) % sizeof buffer
;
78 bool bufferFull() const {
79 return bytesInBuffer() == sizeof buffer
- 1;
82 bool bufferEmpty() const {
83 return bytesInBuffer() == 0;
86 bool bufferContains( char ch
) {
87 const unsigned int bib
= bytesInBuffer();
88 for ( unsigned int i
= rptr
; i
< rptr
+ bib
; ++i
)
89 if ( buffer
[i
%sizeof buffer
] == ch
)
94 void notifyReadyRead();
100 /* reimp */ void run();
107 QWaitCondition waitForCancelCondition
;
108 QWaitCondition bufferNotFullCondition
;
109 QWaitCondition bufferNotEmptyCondition
;
110 QWaitCondition hasStarted
;
111 QWaitCondition readyReadSentCondition
;
112 QWaitCondition blockedConsumerIsDoneCondition
;
119 bool consumerBlocksOnUs
;
122 unsigned int rptr
, wptr
;
123 char buffer
[BUFFER_SIZE
+1]; // need to keep one byte free to detect empty state
127 Reader::Reader( int fd_
, Qt::HANDLE handle_
) : QThread(),
131 bufferNotFullCondition(),
132 bufferNotEmptyCondition(),
137 eofShortCut( false ),
140 consumerBlocksOnUs( false ),
150 class Writer
: public QThread
{
153 Writer( int fd
, Qt::HANDLE handle
);
156 qint64
writeData( const char * data
, qint64 size
);
158 unsigned int bytesInBuffer() const { return numBytesInBuffer
; }
160 bool bufferFull() const {
161 return numBytesInBuffer
== sizeof buffer
;
164 bool bufferEmpty() const {
165 return numBytesInBuffer
== 0;
169 void bytesWritten( qint64
);
172 /* reimp */ void run();
179 QWaitCondition bufferEmptyCondition
;
180 QWaitCondition bufferNotEmptyCondition
;
181 QWaitCondition hasStarted
;
186 unsigned int numBytesInBuffer
;
187 char buffer
[BUFFER_SIZE
];
191 Writer::Writer( int fd_
, Qt::HANDLE handle_
) : QThread(),
195 bufferEmptyCondition(),
196 bufferNotEmptyCondition(),
201 numBytesInBuffer( 0 )
209 class KDPipeIODevice::Private
: public QObject
{
211 friend class ::KDPipeIODevice
;
212 KDPipeIODevice
* const q
;
215 explicit Private( KDPipeIODevice
* qq
);
218 bool doOpen( int, Qt::HANDLE
, OpenMode
);
219 bool startReaderThread();
220 bool startWriterThread();
224 void emitReadyRead();
231 bool triedToStartReader
;
232 bool triedToStartWriter
;
235 KDPipeIODevice::DebugLevel
KDPipeIODevice::debugLevel()
240 void KDPipeIODevice::setDebugLevel( KDPipeIODevice::DebugLevel level
)
242 s_debugLevel
= level
;
245 KDPipeIODevice::Private::Private( KDPipeIODevice
* qq
) : QObject( qq
), q( qq
),
250 triedToStartReader( false ),
251 triedToStartWriter( false )
256 KDPipeIODevice::Private::~Private() {
257 qDebug( "KDPipeIODevice::~Private(): Destroying %p", ( void* ) q
);
260 KDPipeIODevice::KDPipeIODevice( QObject
* p
)
261 : QIODevice( p
), d( new Private( this ) )
266 KDPipeIODevice::KDPipeIODevice( int fd
, OpenMode mode
, QObject
* p
)
267 : QIODevice( p
), d( new Private( this ) )
273 KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle
, OpenMode mode
, QObject
* p
)
274 : QIODevice( p
), d( new Private( this ) )
277 open( handle
, mode
);
280 KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR
;
287 bool KDPipeIODevice::open( int fd
, OpenMode mode
) { KDAB_CHECK_THIS
;
289 return d
->doOpen( fd
, (HANDLE
)_get_osfhandle( fd
), mode
);
291 return d
->doOpen( fd
, 0, mode
);
295 bool KDPipeIODevice::open( Qt::HANDLE h
, OpenMode mode
) { KDAB_CHECK_THIS
;
297 return d
->doOpen( -1, h
, mode
);
301 assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." );
306 bool KDPipeIODevice::Private::startReaderThread()
308 if ( triedToStartReader
)
310 triedToStartReader
= true;
311 if ( reader
&& !reader
->isRunning() && !reader
->isFinished() ) {
312 qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
314 qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
315 reader
->start( QThread::HighestPriority
);
316 qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
317 const bool hasStarted
= reader
->hasStarted
.wait( &reader
->mutex
, 1000 );
318 qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
325 bool KDPipeIODevice::Private::startWriterThread()
327 if ( triedToStartWriter
)
329 triedToStartWriter
= true;
330 if ( writer
&& !writer
->isRunning() && !writer
->isFinished() ) {
333 writer
->start( QThread::HighestPriority
);
334 if ( !writer
->hasStarted
.wait( &writer
->mutex
, 1000 ) )
340 void KDPipeIODevice::Private::emitReadyRead()
342 QPointer
<Private
> thisPointer( this );
343 qDebug( "KDPipeIODevice::Private::emitReadyRead %p", ( void* ) this );
349 bool mustNotify
= false;
351 qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", (
353 synchronized( reader
) {
354 qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", (
356 reader
->readyReadSentCondition
.wakeAll();
357 mustNotify
= !reader
->bufferEmpty() && reader
->isReading
;
358 qDebug( "KDPipeIODevice::Private::emitReadyRead %p: buffer empty: %d reader in ReadFile: %d", ( void* )this, reader
->bufferEmpty(), reader
->isReading
);
361 qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving", ( void* ) this );
365 bool KDPipeIODevice::Private::doOpen( int fd_
, Qt::HANDLE handle_
, OpenMode mode_
) {
378 if ( !(mode_
& ReadWrite
) )
379 return false; // need to have at least read -or- write
382 std::auto_ptr
<Reader
> reader_
;
383 std::auto_ptr
<Writer
> writer_
;
385 if ( mode_
& ReadOnly
) {
386 reader_
.reset( new Reader( fd_
, handle_
) );
387 qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", ( void * )this,
388 ( void* )reader_
.get(), fd_
);
389 connect( reader_
.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()),
390 Qt::QueuedConnection
);
392 if ( mode_
& WriteOnly
) {
393 writer_
.reset( new Writer( fd_
, handle_
) );
394 qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d",
395 ( void * )this, ( void* )writer_
.get(), fd_
);
396 connect( writer_
.get(), SIGNAL(bytesWritten(qint64
)), q
, SIGNAL(bytesWritten(qint64
)),
397 Qt::QueuedConnection
);
403 reader
= reader_
.release();
404 writer
= writer_
.release();
406 q
->setOpenMode( mode_
|Unbuffered
);
410 int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS
;
415 Qt::HANDLE
KDPipeIODevice::handle() const { KDAB_CHECK_THIS
;
419 qint64
KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS
;
420 const qint64 base
= QIODevice::bytesAvailable();
421 if ( !d
->triedToStartReader
) {
422 d
->startReaderThread();
426 synchronized( d
->reader
) {
427 const qint64 inBuffer
= d
->reader
->bytesInBuffer();
428 return base
+ inBuffer
;
434 qint64
KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS
;
435 d
->startWriterThread();
436 const qint64 base
= QIODevice::bytesToWrite();
438 synchronized( d
->writer
) return base
+ d
->writer
->bytesInBuffer();
443 bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS
;
444 d
->startReaderThread();
445 if ( QIODevice::canReadLine() )
448 synchronized( d
->reader
) return d
->reader
->bufferContains( '\n' );
453 bool KDPipeIODevice::isSequential() const {
457 bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS
;
458 d
->startReaderThread();
459 if ( !QIODevice::atEnd() ) {
460 qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", ( void * )this, static_cast<long>(bytesAvailable()) );
465 if ( d
->reader
->eofShortCut
)
468 const bool eof
= ( d
->reader
->error
|| d
->reader
->eof
) && d
->reader
->bufferEmpty();
470 if ( !d
->reader
->error
&& !d
->reader
->eof
) {
471 qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof",
474 if ( !d
->reader
->bufferEmpty() ) {
475 qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()",
482 bool KDPipeIODevice::waitForBytesWritten( int msecs
) { KDAB_CHECK_THIS
;
483 d
->startWriterThread();
484 Writer
* const w
= d
->writer
;
488 qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area",
489 ( void* )this, ( void* ) w
);
490 return w
->bufferEmpty() || w
->error
|| w
->bufferEmptyCondition
.wait( &w
->mutex
, msecs
) ;
493 bool KDPipeIODevice::waitForReadyRead( int msecs
) { KDAB_CHECK_THIS
;
494 qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", ( void* ) this);
495 d
->startReaderThread();
496 if ( ALLOW_QIODEVICE_BUFFERING
) {
497 if ( bytesAvailable() > 0 )
500 Reader
* const r
= d
->reader
;
501 if ( !r
|| r
->eofShortCut
)
504 if ( r
->bytesInBuffer() != 0 || r
->eof
|| r
->error
)
506 assert( false ); // ### wtf?
507 return r
->bufferNotEmptyCondition
.wait( &r
->mutex
, msecs
) ;
510 template <typename T
>
511 class TemporaryValue
{
513 TemporaryValue( T
& var_
, const T
& tv
) : var( var_
), oldValue( var_
) { var
= tv
; }
514 ~TemporaryValue() { var
= oldValue
; }
521 bool KDPipeIODevice::readWouldBlock() const
523 d
->startReaderThread();
525 return d
->reader
->bufferEmpty() && !d
->reader
->eof
&& !d
->reader
->error
;
528 bool KDPipeIODevice::writeWouldBlock() const
530 d
->startWriterThread();
532 return !d
->writer
->bufferEmpty() && !d
->writer
->error
;
536 qint64
KDPipeIODevice::readData( char * data
, qint64 maxSize
) { KDAB_CHECK_THIS
;
537 qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", ( void* )this, data
, maxSize
);
538 d
->startReaderThread();
539 Reader
* const r
= d
->reader
;
544 //assert( r->isRunning() ); // wrong (might be eof, error)
545 assert( data
|| maxSize
== 0 );
546 assert( maxSize
>= 0 );
548 if ( r
->eofShortCut
) {
549 qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", ( void* )this );
556 if ( ALLOW_QIODEVICE_BUFFERING
) {
557 if ( bytesAvailable() > 0 )
558 maxSize
= std::min( maxSize
, bytesAvailable() ); // don't block
560 qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", ( void* ) this );
562 qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", ( void* ) this );
564 r
->readyReadSentCondition
.wakeAll();
565 if ( /* maxSize > 0 && */ r
->bufferEmpty() && !r
->error
&& !r
->eof
) { // ### block on maxSize == 0?
566 qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", ( void*) this );
567 const TemporaryValue
<bool> tmp( d
->reader
->consumerBlocksOnUs
, true );
568 r
->bufferNotEmptyCondition
.wait( &r
->mutex
);
569 r
->blockedConsumerIsDoneCondition
.wakeAll();
570 qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)",
574 if ( r
->bufferEmpty() ) {
575 qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", ( void* ) this );
576 // woken with an empty buffer must mean either EOF or error:
577 assert( r
->eof
|| r
->error
);
578 r
->eofShortCut
= true;
579 return r
->eof
? 0 : -1 ;
582 qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes",
583 ( void* )this, maxSize
);
584 const qint64 bytesRead
= r
->readData( data
, maxSize
);
585 qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", ( void* )this, bytesRead
);
586 qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", ( void* )this, d
->fd
, data
);
591 qint64
Reader::readData( char * data
, qint64 maxSize
) {
592 qint64 numRead
= rptr
< wptr
? wptr
- rptr
: sizeof buffer
- rptr
;
593 if ( numRead
> maxSize
)
596 qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld",
597 ( void* )this, data
, maxSize
, rptr
, wptr
, bytesInBuffer(), numRead
);
599 memcpy( data
, buffer
+ rptr
, numRead
);
601 rptr
= ( rptr
+ numRead
) % sizeof buffer
;
603 if ( !bufferFull() ) {
604 qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", ( void* ) this );
605 bufferNotFullCondition
.wakeAll();
611 qint64
KDPipeIODevice::writeData( const char * data
, qint64 size
) { KDAB_CHECK_THIS
;
612 d
->startWriterThread();
613 Writer
* const w
= d
->writer
;
616 assert( w
->error
|| w
->isRunning() );
617 assert( data
|| size
== 0 );
622 while ( !w
->error
&& !w
->bufferEmpty() ) {
623 qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", ( void* ) this );
624 w
->bufferEmptyCondition
.wait( &w
->mutex
);
625 qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", ( void* ) this );
631 assert( w
->bufferEmpty() );
633 return w
->writeData( data
, size
);
636 qint64
Writer::writeData( const char * data
, qint64 size
) {
637 assert( bufferEmpty() );
639 if ( size
> static_cast<qint64
>( sizeof buffer
) )
640 size
= sizeof buffer
;
642 memcpy( buffer
, data
, size
);
644 numBytesInBuffer
= size
;
646 if ( !bufferEmpty() ) {
647 bufferNotEmptyCondition
.wakeAll();
652 void KDPipeIODevice::Private::stopThreads()
654 if ( triedToStartWriter
)
656 if ( writer
&& q
->bytesToWrite() > 0 )
657 q
->waitForBytesWritten( -1 );
659 assert( q
->bytesToWrite() == 0 );
661 if ( Reader
* & r
= reader
) {
662 disconnect( r
, SIGNAL(readyRead()), this, SLOT(emitReadyRead()) );
664 // tell thread to cancel:
666 // and wake it, so it can terminate:
667 r
->waitForCancelCondition
.wakeAll();
668 r
->bufferNotFullCondition
.wakeAll();
669 r
->readyReadSentCondition
.wakeAll();
672 if ( Writer
* & w
= writer
) {
674 // tell thread to cancel:
676 // and wake it, so it can terminate:
677 w
->bufferNotEmptyCondition
.wakeAll();
682 void KDPipeIODevice::close() { KDAB_CHECK_THIS
;
683 qDebug( "KDPipeIODevice::close(%p)", ( void* ) this );
687 // tell clients we're about to close:
691 #define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
692 qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", ( void* )this, ( void* ) d
->writer
);
693 waitAndDelete( d
->writer
);
694 qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", ( void* )this, ( void* ) d
->reader
);
697 d
->reader
->readyReadSentCondition
.wakeAll();
699 waitAndDelete( d
->reader
);
705 CloseHandle( d
->handle
);
710 setOpenMode( NotOpen
);
719 // too bad QThread doesn't have that itself; a signal isn't enough
720 hasStarted
.wakeAll();
722 qDebug( "%p: Reader::run: started",( void* ) this );
725 if ( !cancel
&& ( eof
|| error
) ) {
726 //notify the client until the buffer is empty and then once
727 //again so he receives eof/error. After that, wait for him
729 const bool wasEmpty
= bufferEmpty();
730 qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", ( void* )this, eof
, error
);
732 if ( !cancel
&& wasEmpty
)
733 waitForCancelCondition
.wait( &mutex
);
734 } else if ( !cancel
&& !bufferFull() && !bufferEmpty() ) {
735 qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", ( void* ) this );
739 while ( !cancel
&& !error
&& bufferFull() ) {
741 if ( !cancel
&& bufferFull() ) {
742 qDebug( "%p: Reader::run: buffer is full, going to sleep", ( void* )this );
743 bufferNotFullCondition
.wait( &mutex
);
748 qDebug( "%p: Reader::run: detected cancel", ( void* )this );
752 if ( !eof
&& !error
) {
753 if ( rptr
== wptr
) // optimize for larger chunks in case the buffer is empty
756 unsigned int numBytes
= ( rptr
+ sizeof buffer
- wptr
- 1 ) % sizeof buffer
;
757 if ( numBytes
> sizeof buffer
- wptr
)
758 numBytes
= sizeof buffer
- wptr
;
760 qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", ( void* )this, rptr
, wptr
, numBytes
);
762 assert( numBytes
> 0 );
764 qDebug( "%p: Reader::run: trying to read %d bytes from fd %d", ( void* )this, numBytes
, fd
);
769 const bool ok
= ReadFile( handle
, buffer
+ wptr
, numBytes
, &numRead
, 0 );
773 if ( numRead
== 0 ) {
774 qDebug( "%p: Reader::run: got eof (numRead==0)", ( void* ) this );
778 errorCode
= static_cast<int>( GetLastError() );
779 if ( errorCode
== ERROR_BROKEN_PIPE
) {
780 assert( numRead
== 0 );
781 qDebug( "%p: Reader::run: got eof (broken pipe)", ( void* ) this );
784 assert( numRead
== 0 );
785 qDebug( "%p: Reader::run: got error: %s (%d)", ( void* ) this, strerror( errorCode
), errorCode
);
793 numRead
= ::read( fd
, buffer
+ wptr
, numBytes
);
794 } while ( numRead
== -1 && errno
== EINTR
);
800 qDebug( "%p: Reader::run: got error: %d", ( void* )this, errorCode
);
801 } else if ( numRead
== 0 ) {
802 qDebug( "%p: Reader::run: eof detected", ( void* )this );
806 qDebug( "%p (fd=%d): Reader::run: read %ld bytes", ( void* ) this, fd
, static_cast<long>(numRead
) );
807 qDebug( "%p (fd=%d): Reader::run: %s", ( void* )this, fd
, buffer
);
810 qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", ( void* )this, rptr
, wptr
);
811 wptr
= ( wptr
+ numRead
) % sizeof buffer
;
812 qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", ( void* )this, rptr
, wptr
);
817 qDebug( "%p: Reader::run: terminated", ( void* )this );
820 void Reader::notifyReadyRead()
822 qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
825 if ( consumerBlocksOnUs
) {
826 bufferNotEmptyCondition
.wakeAll();
827 blockedConsumerIsDoneCondition
.wait( &mutex
);
830 qDebug( "notifyReadyRead: emit signal" );
832 readyReadSentCondition
.wait( &mutex
);
833 qDebug( "notifyReadyRead: returning from waiting, leave" );
840 // too bad QThread doesn't have that itself; a signal isn't enough
841 hasStarted
.wakeAll();
843 qDebug( ) << this << "Writer::run: started";
847 while ( !cancel
&& bufferEmpty() ) {
848 qDebug( ) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
849 bufferEmptyCondition
.wakeAll();
850 emit
bytesWritten( 0 );
851 qDebug( ) << this << "Writer::run: buffer is empty, going to sleep";
852 bufferNotEmptyCondition
.wait( &mutex
);
853 qDebug( ) << this << "Writer::run: woke up";
857 qDebug( ) << this << "Writer::run: detected cancel";
861 assert( numBytesInBuffer
> 0 );
863 qDebug( ) << this << "Writer::run: Trying to write " << numBytesInBuffer
<< "bytes";
864 qint64 totalWritten
= 0;
869 qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:",
870 ( void*) this, fd
, numBytesInBuffer
, buffer
);
871 qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", ( void* ) this, fd
);
872 if ( !WriteFile( handle
, buffer
+ totalWritten
, numBytesInBuffer
- totalWritten
, &numWritten
, 0 ) ) {
874 errorCode
= static_cast<int>( GetLastError() );
875 qDebug( "%p: Writer::run: got error code: %d", ( void* ) this, errorCode
);
882 numWritten
= ::write( fd
, buffer
+ totalWritten
, numBytesInBuffer
- totalWritten
);
883 } while ( numWritten
== -1 && errno
== EINTR
);
885 if ( numWritten
< 0 ) {
888 qDebug( "%p: Writer::run: got error code: %s (%d)", ( void* )this, strerror( errorCode
), errorCode
);
893 qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", ( void* )this, fd
, numBytesInBuffer
, buffer
);
894 totalWritten
+= numWritten
;
896 } while ( totalWritten
< numBytesInBuffer
);
898 qDebug() << this << "Writer::run: wrote " << totalWritten
<< "bytes";
899 numBytesInBuffer
= 0;
900 qDebug() << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
901 bufferEmptyCondition
.wakeAll();
902 emit
bytesWritten( totalWritten
);
905 qDebug() << this << "Writer::run: terminating";
906 numBytesInBuffer
= 0;
907 qDebug() << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
908 bufferEmptyCondition
.wakeAll();
909 emit
bytesWritten( 0 );
913 std::pair
<KDPipeIODevice
*,KDPipeIODevice
*> KDPipeIODevice::makePairOfConnectedPipes() {
914 KDPipeIODevice
* read
= 0;
915 KDPipeIODevice
* write
= 0;
919 SECURITY_ATTRIBUTES sa
;
920 memset( &sa
, 0, sizeof(sa
) );
921 sa
.nLength
= sizeof(sa
);
922 sa
.bInheritHandle
= TRUE
;
923 if ( CreatePipe( &rh
, &wh
, &sa
, BUFFER_SIZE
) ) {
924 read
= new KDPipeIODevice
;
925 read
->open( rh
, ReadOnly
);
926 write
= new KDPipeIODevice
;
927 write
->open( wh
, WriteOnly
);
931 if ( pipe( fds
) == 0 ) {
932 read
= new KDPipeIODevice
;
933 read
->open( fds
[0], ReadOnly
);
934 write
= new KDPipeIODevice
;
935 write
->open( fds
[1], WriteOnly
);
938 return std::make_pair( read
, write
);
941 #ifdef KDAB_DEFINE_CHECKS
942 KDAB_DEFINE_CHECKS( KDPipeIODevice
) {
944 assert( openMode() == NotOpen
);
945 assert( !d
->reader
);
946 assert( !d
->writer
);
948 assert( !d
->handle
);
953 assert( openMode() != NotOpen
);
954 assert( openMode() & ReadWrite
);
955 if ( openMode() & ReadOnly
) {
957 synchronized( d
->reader
)
958 assert( d
->reader
->eof
|| d
->reader
->error
|| d
->reader
->isRunning() );
960 if ( openMode() & WriteOnly
) {
962 synchronized( d
->writer
)
963 assert( d
->writer
->error
|| d
->writer
->isRunning() );
968 assert( d
->fd
>= 0 );
972 #endif // KDAB_DEFINE_CHECKS
974 #include "moc_kdpipeiodevice.cpp"
975 #include "kdpipeiodevice.moc"