Better wording
[kdepim.git] / kleopatra / utils / kdpipeiodevice.cpp
blobc0da352aed3a59a549d8223665c79ea1e465822c
1 /*
2 Copyright (C) 2007 Klarälvdalens Datakonsult AB
4 KDPipeIODevice is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Library General Public
6 License as published by the Free Software Foundation; either
7 version 2 of the License, or (at your option) any later version.
9 KDPipeIODevice is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU Library General Public License for more details.
14 You should have received a copy of the GNU Library General Public License
15 along with KDPipeIODevice; see the file COPYING.LIB. If not, write to the
16 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
17 Boston, MA 02110-1301, USA.
20 #include <config-kleopatra.h>
22 #include "kdpipeiodevice.h"
24 #include <QtCore/QDebug>
25 #include <QtCore/QMutex>
26 #include <QtCore/QPointer>
27 #include <QtCore/QThread>
28 #include <QtCore/QWaitCondition>
30 #include <cassert>
31 #include <cstring>
32 #include <memory>
33 #include <algorithm>
35 #ifdef Q_OS_WIN32
36 # ifndef NOMINMAX
37 # define NOMINMAX
38 # endif
39 # include <windows.h>
40 # include <io.h>
41 #else
42 # include <unistd.h>
43 # include <errno.h>
44 #endif
46 #ifndef KDAB_CHECK_THIS
47 # define KDAB_CHECK_CTOR (void)1
48 # define KDAB_CHECK_DTOR KDAB_CHECK_CTOR
49 # define KDAB_CHECK_THIS KDAB_CHECK_CTOR
50 #endif
52 #define LOCKED( d ) const QMutexLocker locker( &d->mutex )
53 #define synchronized( d ) if ( int i = 0 ) {} else for ( const QMutexLocker locker( &d->mutex ) ; !i ; ++i )
55 const unsigned int BUFFER_SIZE = 4096;
56 const bool ALLOW_QIODEVICE_BUFFERING = true;
58 namespace {
59 KDPipeIODevice::DebugLevel s_debugLevel = KDPipeIODevice::NoDebug;
62 #define qDebug if( s_debugLevel == KDPipeIODevice::NoDebug ){}else qDebug
64 namespace {
66 class Reader : public QThread {
67 Q_OBJECT
68 public:
69 Reader( int fd, Qt::HANDLE handle );
70 ~Reader();
72 qint64 readData( char * data, qint64 maxSize );
74 unsigned int bytesInBuffer() const {
75 return ( wptr + sizeof buffer - rptr ) % sizeof buffer ;
78 bool bufferFull() const {
79 return bytesInBuffer() == sizeof buffer - 1;
82 bool bufferEmpty() const {
83 return bytesInBuffer() == 0;
86 bool bufferContains( char ch ) {
87 const unsigned int bib = bytesInBuffer();
88 for ( unsigned int i = rptr ; i < rptr + bib ; ++i )
89 if ( buffer[i%sizeof buffer] == ch )
90 return true;
91 return false;
94 void notifyReadyRead();
96 Q_SIGNALS:
97 void readyRead();
99 protected:
100 /* reimp */ void run();
102 private:
103 int fd;
104 Qt::HANDLE handle;
105 public:
106 QMutex mutex;
107 QWaitCondition waitForCancelCondition;
108 QWaitCondition bufferNotFullCondition;
109 QWaitCondition bufferNotEmptyCondition;
110 QWaitCondition hasStarted;
111 QWaitCondition readyReadSentCondition;
112 QWaitCondition blockedConsumerIsDoneCondition;
113 bool cancel;
114 bool eof;
115 bool error;
116 bool eofShortCut;
117 int errorCode;
118 bool isReading;
119 bool consumerBlocksOnUs;
121 private:
122 unsigned int rptr, wptr;
123 char buffer[BUFFER_SIZE+1]; // need to keep one byte free to detect empty state
127 Reader::Reader( int fd_, Qt::HANDLE handle_ ) : QThread(),
128 fd( fd_ ),
129 handle( handle_ ),
130 mutex(),
131 bufferNotFullCondition(),
132 bufferNotEmptyCondition(),
133 hasStarted(),
134 cancel( false ),
135 eof( false ),
136 error( false ),
137 eofShortCut( false ),
138 errorCode( 0 ),
139 isReading( false ),
140 consumerBlocksOnUs( false ),
141 rptr( 0 ),
142 wptr( 0 )
147 Reader::~Reader() {}
150 class Writer : public QThread {
151 Q_OBJECT
152 public:
153 Writer( int fd, Qt::HANDLE handle );
154 ~Writer();
156 qint64 writeData( const char * data, qint64 size );
158 unsigned int bytesInBuffer() const { return numBytesInBuffer; }
160 bool bufferFull() const {
161 return numBytesInBuffer == sizeof buffer;
164 bool bufferEmpty() const {
165 return numBytesInBuffer == 0;
168 Q_SIGNALS:
169 void bytesWritten( qint64 );
171 protected:
172 /* reimp */ void run();
174 private:
175 int fd;
176 Qt::HANDLE handle;
177 public:
178 QMutex mutex;
179 QWaitCondition bufferEmptyCondition;
180 QWaitCondition bufferNotEmptyCondition;
181 QWaitCondition hasStarted;
182 bool cancel;
183 bool error;
184 int errorCode;
185 private:
186 unsigned int numBytesInBuffer;
187 char buffer[BUFFER_SIZE];
191 Writer::Writer( int fd_, Qt::HANDLE handle_ ) : QThread(),
192 fd( fd_ ),
193 handle( handle_ ),
194 mutex(),
195 bufferEmptyCondition(),
196 bufferNotEmptyCondition(),
197 hasStarted(),
198 cancel( false ),
199 error( false ),
200 errorCode( 0 ),
201 numBytesInBuffer( 0 )
206 Writer::~Writer() {}
209 class KDPipeIODevice::Private : public QObject {
210 Q_OBJECT
211 friend class ::KDPipeIODevice;
212 KDPipeIODevice * const q;
213 public:
215 explicit Private( KDPipeIODevice * qq );
216 ~Private();
218 bool doOpen( int, Qt::HANDLE, OpenMode );
219 bool startReaderThread();
220 bool startWriterThread();
221 void stopThreads();
223 public Q_SLOTS:
224 void emitReadyRead();
226 private:
227 int fd;
228 Qt::HANDLE handle;
229 Reader * reader;
230 Writer * writer;
231 bool triedToStartReader;
232 bool triedToStartWriter;
235 KDPipeIODevice::DebugLevel KDPipeIODevice::debugLevel()
237 return s_debugLevel;
240 void KDPipeIODevice::setDebugLevel( KDPipeIODevice::DebugLevel level )
242 s_debugLevel = level;
245 KDPipeIODevice::Private::Private( KDPipeIODevice * qq ) : QObject( qq ), q( qq ),
246 fd( -1 ),
247 handle( 0 ),
248 reader( 0 ),
249 writer( 0 ),
250 triedToStartReader( false ),
251 triedToStartWriter( false )
256 KDPipeIODevice::Private::~Private() {
257 qDebug( "KDPipeIODevice::~Private(): Destroying %p", ( void* ) q );
260 KDPipeIODevice::KDPipeIODevice( QObject * p )
261 : QIODevice( p ), d( new Private( this ) )
263 KDAB_CHECK_CTOR;
266 KDPipeIODevice::KDPipeIODevice( int fd, OpenMode mode, QObject * p )
267 : QIODevice( p ), d( new Private( this ) )
269 KDAB_CHECK_CTOR;
270 open( fd, mode );
273 KDPipeIODevice::KDPipeIODevice( Qt::HANDLE handle, OpenMode mode, QObject * p )
274 : QIODevice( p ), d( new Private( this ) )
276 KDAB_CHECK_CTOR;
277 open( handle, mode );
280 KDPipeIODevice::~KDPipeIODevice() { KDAB_CHECK_DTOR;
281 if ( isOpen() )
282 close();
283 delete d; d = 0;
287 bool KDPipeIODevice::open( int fd, OpenMode mode ) { KDAB_CHECK_THIS;
288 #ifdef Q_OS_WIN32
289 return d->doOpen( fd, (HANDLE)_get_osfhandle( fd ), mode );
290 #else
291 return d->doOpen( fd, 0, mode );
292 #endif
295 bool KDPipeIODevice::open( Qt::HANDLE h, OpenMode mode ) { KDAB_CHECK_THIS;
296 #ifdef Q_OS_WIN32
297 return d->doOpen( -1, h, mode );
298 #else
299 Q_UNUSED( h );
300 Q_UNUSED( mode );
301 assert( !"KDPipeIODevice::open( Qt::HANDLE, OpenMode ) should never be called except on Windows." );
302 return false;
303 #endif
306 bool KDPipeIODevice::Private::startReaderThread()
308 if ( triedToStartReader )
309 return true;
310 triedToStartReader = true;
311 if ( reader && !reader->isRunning() && !reader->isFinished() ) {
312 qDebug("KDPipeIODevice::Private::startReaderThread(): locking reader (CONSUMER THREAD)" );
313 LOCKED( reader );
314 qDebug("KDPipeIODevice::Private::startReaderThread(): locked reader (CONSUMER THREAD)" );
315 reader->start( QThread::HighestPriority );
316 qDebug("KDPipeIODevice::Private::startReaderThread(): waiting for hasStarted (CONSUMER THREAD)" );
317 const bool hasStarted = reader->hasStarted.wait( &reader->mutex, 1000 );
318 qDebug("KDPipeIODevice::Private::startReaderThread(): returned from hasStarted (CONSUMER THREAD)" );
320 return hasStarted;
322 return true;
325 bool KDPipeIODevice::Private::startWriterThread()
327 if ( triedToStartWriter )
328 return true;
329 triedToStartWriter = true;
330 if ( writer && !writer->isRunning() && !writer->isFinished() ) {
331 LOCKED( writer );
333 writer->start( QThread::HighestPriority );
334 if ( !writer->hasStarted.wait( &writer->mutex, 1000 ) )
335 return false;
337 return true;
340 void KDPipeIODevice::Private::emitReadyRead()
342 QPointer<Private> thisPointer( this );
343 qDebug( "KDPipeIODevice::Private::emitReadyRead %p", ( void* ) this );
345 emit q->readyRead();
347 if ( !thisPointer )
348 return;
349 bool mustNotify = false;
350 if ( reader ) {
351 qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locking reader (CONSUMER THREAD)", (
352 void* ) this );
353 synchronized( reader ) {
354 qDebug( "KDPipeIODevice::Private::emitReadyRead %p: locked reader (CONSUMER THREAD)", (
355 void* ) this );
356 reader->readyReadSentCondition.wakeAll();
357 mustNotify = !reader->bufferEmpty() && reader->isReading;
358 qDebug( "KDPipeIODevice::Private::emitReadyRead %p: buffer empty: %d reader in ReadFile: %d", ( void* )this, reader->bufferEmpty(), reader->isReading );
361 qDebug( "KDPipeIODevice::Private::emitReadyRead %p leaving", ( void* ) this );
365 bool KDPipeIODevice::Private::doOpen( int fd_, Qt::HANDLE handle_, OpenMode mode_ ) {
367 if ( q->isOpen() )
368 return false;
370 #ifdef Q_OS_WIN32
371 if ( !handle_ )
372 return false;
373 #else
374 if ( fd_ < 0 )
375 return false;
376 #endif
378 if ( !(mode_ & ReadWrite) )
379 return false; // need to have at least read -or- write
382 std::auto_ptr<Reader> reader_;
383 std::auto_ptr<Writer> writer_;
385 if ( mode_ & ReadOnly ) {
386 reader_.reset( new Reader( fd_, handle_ ) );
387 qDebug( "KDPipeIODevice::doOpen (%p): created reader (%p) for fd %d", ( void * )this,
388 ( void* )reader_.get(), fd_ );
389 connect( reader_.get(), SIGNAL(readyRead()), this, SLOT(emitReadyRead()),
390 Qt::QueuedConnection );
392 if ( mode_ & WriteOnly ) {
393 writer_.reset( new Writer( fd_, handle_ ) );
394 qDebug( "KDPipeIODevice::doOpen (%p): created writer (%p) for fd %d",
395 ( void * )this, ( void* )writer_.get(), fd_ );
396 connect( writer_.get(), SIGNAL(bytesWritten(qint64)), q, SIGNAL(bytesWritten(qint64)),
397 Qt::QueuedConnection );
400 // commit to *this:
401 fd = fd_;
402 handle = handle_;
403 reader = reader_.release();
404 writer = writer_.release();
406 q->setOpenMode( mode_|Unbuffered );
407 return true;
410 int KDPipeIODevice::descriptor() const { KDAB_CHECK_THIS;
411 return d->fd;
415 Qt::HANDLE KDPipeIODevice::handle() const { KDAB_CHECK_THIS;
416 return d->handle;
419 qint64 KDPipeIODevice::bytesAvailable() const { KDAB_CHECK_THIS;
420 const qint64 base = QIODevice::bytesAvailable();
421 if ( !d->triedToStartReader ) {
422 d->startReaderThread();
423 return base;
425 if ( d->reader ) {
426 synchronized( d->reader ) {
427 const qint64 inBuffer = d->reader->bytesInBuffer();
428 return base + inBuffer;
431 return base;
434 qint64 KDPipeIODevice::bytesToWrite() const { KDAB_CHECK_THIS;
435 d->startWriterThread();
436 const qint64 base = QIODevice::bytesToWrite();
437 if ( d->writer ) {
438 synchronized( d->writer ) return base + d->writer->bytesInBuffer();
440 return base;
443 bool KDPipeIODevice::canReadLine() const { KDAB_CHECK_THIS;
444 d->startReaderThread();
445 if ( QIODevice::canReadLine() )
446 return true;
447 if ( d->reader ) {
448 synchronized( d->reader ) return d->reader->bufferContains( '\n' );
450 return true;
453 bool KDPipeIODevice::isSequential() const {
454 return true;
457 bool KDPipeIODevice::atEnd() const { KDAB_CHECK_THIS;
458 d->startReaderThread();
459 if ( !QIODevice::atEnd() ) {
460 qDebug( "%p: KDPipeIODevice::atEnd returns false since QIODevice::atEnd does (with bytesAvailable=%ld)", ( void * )this, static_cast<long>(bytesAvailable()) );
461 return false;
463 if ( !isOpen() )
464 return true;
465 if ( d->reader->eofShortCut )
466 return true;
467 LOCKED( d->reader );
468 const bool eof = ( d->reader->error || d->reader->eof ) && d->reader->bufferEmpty();
469 if ( !eof ) {
470 if ( !d->reader->error && !d->reader->eof ) {
471 qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->error && !reader->eof",
472 ( void* )( this ) );
474 if ( !d->reader->bufferEmpty() ) {
475 qDebug( "%p: KDPipeIODevice::atEnd returns false since !reader->bufferEmpty()",
476 ( void*) this );
479 return eof;
482 bool KDPipeIODevice::waitForBytesWritten( int msecs ) { KDAB_CHECK_THIS;
483 d->startWriterThread();
484 Writer * const w = d->writer;
485 if ( !w )
486 return true;
487 LOCKED( w );
488 qDebug( "KDPipeIODevice::waitForBytesWritten (%p,w=%p): entered locked area",
489 ( void* )this, ( void* ) w );
490 return w->bufferEmpty() || w->error || w->bufferEmptyCondition.wait( &w->mutex, msecs ) ;
493 bool KDPipeIODevice::waitForReadyRead( int msecs ) { KDAB_CHECK_THIS;
494 qDebug( "KDPipeIODEvice::waitForReadyRead()(%p)", ( void* ) this);
495 d->startReaderThread();
496 if ( ALLOW_QIODEVICE_BUFFERING ) {
497 if ( bytesAvailable() > 0 )
498 return true;
500 Reader * const r = d->reader;
501 if ( !r || r->eofShortCut )
502 return true;
503 LOCKED( r );
504 if ( r->bytesInBuffer() != 0 || r->eof || r->error )
505 return true;
506 assert( false ); // ### wtf?
507 return r->bufferNotEmptyCondition.wait( &r->mutex, msecs ) ;
510 template <typename T>
511 class TemporaryValue {
512 public:
513 TemporaryValue( T& var_, const T& tv ) : var( var_ ), oldValue( var_ ) { var = tv; }
514 ~TemporaryValue() { var = oldValue; }
515 private:
516 T& var;
517 const T oldValue;
521 bool KDPipeIODevice::readWouldBlock() const
523 d->startReaderThread();
524 LOCKED( d->reader );
525 return d->reader->bufferEmpty() && !d->reader->eof && !d->reader->error;
528 bool KDPipeIODevice::writeWouldBlock() const
530 d->startWriterThread();
531 LOCKED( d->writer );
532 return !d->writer->bufferEmpty() && !d->writer->error;
536 qint64 KDPipeIODevice::readData( char * data, qint64 maxSize ) { KDAB_CHECK_THIS;
537 qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld", ( void* )this, data, maxSize );
538 d->startReaderThread();
539 Reader * const r = d->reader;
541 assert( r );
544 //assert( r->isRunning() ); // wrong (might be eof, error)
545 assert( data || maxSize == 0 );
546 assert( maxSize >= 0 );
548 if ( r->eofShortCut ) {
549 qDebug( "%p: KDPipeIODevice::readData: hit eofShortCut, returning 0", ( void* )this );
550 return 0;
553 if ( maxSize < 0 )
554 maxSize = 0;
556 if ( ALLOW_QIODEVICE_BUFFERING ) {
557 if ( bytesAvailable() > 0 )
558 maxSize = std::min( maxSize, bytesAvailable() ); // don't block
560 qDebug( "%p: KDPipeIODevice::readData: try to lock reader (CONSUMER THREAD)", ( void* ) this );
561 LOCKED( r );
562 qDebug( "%p: KDPipeIODevice::readData: locked reader (CONSUMER THREAD)", ( void* ) this );
564 r->readyReadSentCondition.wakeAll();
565 if ( /* maxSize > 0 && */ r->bufferEmpty() && !r->error && !r->eof ) { // ### block on maxSize == 0?
566 qDebug( "%p: KDPipeIODevice::readData: waiting for bufferNotEmptyCondition (CONSUMER THREAD)", ( void*) this );
567 const TemporaryValue<bool> tmp( d->reader->consumerBlocksOnUs, true );
568 r->bufferNotEmptyCondition.wait( &r->mutex );
569 r->blockedConsumerIsDoneCondition.wakeAll();
570 qDebug( "%p: KDPipeIODevice::readData: woke up from bufferNotEmptyCondition (CONSUMER THREAD)",
571 ( void*) this );
574 if ( r->bufferEmpty() ) {
575 qDebug( "%p: KDPipeIODevice::readData: got empty buffer, signal eof", ( void* ) this );
576 // woken with an empty buffer must mean either EOF or error:
577 assert( r->eof || r->error );
578 r->eofShortCut = true;
579 return r->eof ? 0 : -1 ;
582 qDebug( "%p: KDPipeIODevice::readData: got bufferNotEmptyCondition, trying to read %lld bytes",
583 ( void* )this, maxSize );
584 const qint64 bytesRead = r->readData( data, maxSize );
585 qDebug( "%p: KDPipeIODevice::readData: read %lld bytes", ( void* )this, bytesRead );
586 qDebug( "%p (fd=%d): KDPipeIODevice::readData: %s", ( void* )this, d->fd, data );
588 return bytesRead;
591 qint64 Reader::readData( char * data, qint64 maxSize ) {
592 qint64 numRead = rptr < wptr ? wptr - rptr : sizeof buffer - rptr ;
593 if ( numRead > maxSize )
594 numRead = maxSize;
596 qDebug( "%p: KDPipeIODevice::readData: data=%p, maxSize=%lld; rptr=%u, wptr=%u (bytesInBuffer=%u); -> numRead=%lld",
597 ( void* )this, data, maxSize, rptr, wptr, bytesInBuffer(), numRead );
599 memcpy( data, buffer + rptr, numRead );
601 rptr = ( rptr + numRead ) % sizeof buffer ;
603 if ( !bufferFull() ) {
604 qDebug( "%p: KDPipeIODevice::readData: signal bufferNotFullCondition", ( void* ) this );
605 bufferNotFullCondition.wakeAll();
608 return numRead;
611 qint64 KDPipeIODevice::writeData( const char * data, qint64 size ) { KDAB_CHECK_THIS;
612 d->startWriterThread();
613 Writer * const w = d->writer;
615 assert( w );
616 assert( w->error || w->isRunning() );
617 assert( data || size == 0 );
618 assert( size >= 0 );
620 LOCKED( w );
622 while ( !w->error && !w->bufferEmpty() ) {
623 qDebug( "%p: KDPipeIODevice::writeData: wait for empty buffer", ( void* ) this );
624 w->bufferEmptyCondition.wait( &w->mutex );
625 qDebug( "%p: KDPipeIODevice::writeData: empty buffer signaled", ( void* ) this );
628 if ( w->error )
629 return -1;
631 assert( w->bufferEmpty() );
633 return w->writeData( data, size );
636 qint64 Writer::writeData( const char * data, qint64 size ) {
637 assert( bufferEmpty() );
639 if ( size > static_cast<qint64>( sizeof buffer ) )
640 size = sizeof buffer;
642 memcpy( buffer, data, size );
644 numBytesInBuffer = size;
646 if ( !bufferEmpty() ) {
647 bufferNotEmptyCondition.wakeAll();
649 return size;
652 void KDPipeIODevice::Private::stopThreads()
654 if ( triedToStartWriter )
656 if ( writer && q->bytesToWrite() > 0 )
657 q->waitForBytesWritten( -1 );
659 assert( q->bytesToWrite() == 0 );
661 if ( Reader * & r = reader ) {
662 disconnect( r, SIGNAL(readyRead()), this, SLOT(emitReadyRead()) );
663 synchronized( r ) {
664 // tell thread to cancel:
665 r->cancel = true;
666 // and wake it, so it can terminate:
667 r->waitForCancelCondition.wakeAll();
668 r->bufferNotFullCondition.wakeAll();
669 r->readyReadSentCondition.wakeAll();
672 if ( Writer * & w = writer ) {
673 synchronized( w ) {
674 // tell thread to cancel:
675 w->cancel = true;
676 // and wake it, so it can terminate:
677 w->bufferNotEmptyCondition.wakeAll();
682 void KDPipeIODevice::close() { KDAB_CHECK_THIS;
683 qDebug( "KDPipeIODevice::close(%p)", ( void* ) this );
684 if ( !isOpen() )
685 return;
687 // tell clients we're about to close:
688 emit aboutToClose();
689 d->stopThreads();
691 #define waitAndDelete( t ) if ( t ) { t->wait(); QThread* const t2 = t; t = 0; delete t2; }
692 qDebug( "KPipeIODevice::close(%p): wait and closing writer %p", ( void* )this, ( void* ) d->writer );
693 waitAndDelete( d->writer );
694 qDebug( "KPipeIODevice::close(%p): wait and closing reader %p", ( void* )this, ( void* ) d->reader );
695 if ( d->reader ) {
696 LOCKED( d->reader );
697 d->reader->readyReadSentCondition.wakeAll();
699 waitAndDelete( d->reader );
700 #undef waitAndDelete
701 #ifdef Q_OS_WIN32
702 if ( d->fd != -1 )
703 _close( d->fd );
704 else
705 CloseHandle( d->handle );
706 #else
707 ::close( d->fd );
708 #endif
710 setOpenMode( NotOpen );
711 d->fd = -1;
712 d->handle = 0;
715 void Reader::run() {
717 LOCKED( this );
719 // too bad QThread doesn't have that itself; a signal isn't enough
720 hasStarted.wakeAll();
722 qDebug( "%p: Reader::run: started",( void* ) this );
724 while ( true ) {
725 if ( !cancel && ( eof || error ) ) {
726 //notify the client until the buffer is empty and then once
727 //again so he receives eof/error. After that, wait for him
728 //to cancel
729 const bool wasEmpty = bufferEmpty();
730 qDebug( "%p: Reader::run: received eof(%d) or error(%d), waking everyone", ( void* )this, eof, error );
731 notifyReadyRead();
732 if ( !cancel && wasEmpty )
733 waitForCancelCondition.wait( &mutex );
734 } else if ( !cancel && !bufferFull() && !bufferEmpty() ) {
735 qDebug( "%p: Reader::run: buffer no longer empty, waking everyone", ( void* ) this );
736 notifyReadyRead();
739 while ( !cancel && !error && bufferFull() ) {
740 notifyReadyRead();
741 if ( !cancel && bufferFull() ) {
742 qDebug( "%p: Reader::run: buffer is full, going to sleep", ( void* )this );
743 bufferNotFullCondition.wait( &mutex );
747 if ( cancel ) {
748 qDebug( "%p: Reader::run: detected cancel", ( void* )this );
749 goto leave;
752 if ( !eof && !error ) {
753 if ( rptr == wptr ) // optimize for larger chunks in case the buffer is empty
754 rptr = wptr = 0;
756 unsigned int numBytes = ( rptr + sizeof buffer - wptr - 1 ) % sizeof buffer;
757 if ( numBytes > sizeof buffer - wptr )
758 numBytes = sizeof buffer - wptr;
760 qDebug( "%p: Reader::run: rptr=%d, wptr=%d -> numBytes=%d", ( void* )this, rptr, wptr, numBytes );
762 assert( numBytes > 0 );
764 qDebug( "%p: Reader::run: trying to read %d bytes from fd %d", ( void* )this, numBytes, fd );
765 #ifdef Q_OS_WIN32
766 isReading = true;
767 mutex.unlock();
768 DWORD numRead;
769 const bool ok = ReadFile( handle, buffer + wptr, numBytes, &numRead, 0 );
770 mutex.lock();
771 isReading = false;
772 if ( ok ) {
773 if ( numRead == 0 ) {
774 qDebug( "%p: Reader::run: got eof (numRead==0)", ( void* ) this );
775 eof = true;
777 } else { // !ok
778 errorCode = static_cast<int>( GetLastError() );
779 if ( errorCode == ERROR_BROKEN_PIPE ) {
780 assert( numRead == 0 );
781 qDebug( "%p: Reader::run: got eof (broken pipe)", ( void* ) this );
782 eof = true;
783 } else {
784 assert( numRead == 0 );
785 qDebug( "%p: Reader::run: got error: %s (%d)", ( void* ) this, strerror( errorCode ), errorCode );
786 error = true;
789 #else
790 qint64 numRead;
791 mutex.unlock();
792 do {
793 numRead = ::read( fd, buffer + wptr, numBytes );
794 } while ( numRead == -1 && errno == EINTR );
795 mutex.lock();
797 if ( numRead < 0 ) {
798 errorCode = errno;
799 error = true;
800 qDebug( "%p: Reader::run: got error: %d", ( void* )this, errorCode );
801 } else if ( numRead == 0 ) {
802 qDebug( "%p: Reader::run: eof detected", ( void* )this );
803 eof = true;
805 #endif
806 qDebug( "%p (fd=%d): Reader::run: read %ld bytes", ( void* ) this, fd, static_cast<long>(numRead) );
807 qDebug( "%p (fd=%d): Reader::run: %s", ( void* )this, fd, buffer );
809 if ( numRead > 0 ) {
810 qDebug( "%p: Reader::run: buffer before: rptr=%4d, wptr=%4d", ( void* )this, rptr, wptr );
811 wptr = ( wptr + numRead ) % sizeof buffer;
812 qDebug( "%p: Reader::run: buffer after: rptr=%4d, wptr=%4d", ( void* )this, rptr, wptr );
816 leave:
817 qDebug( "%p: Reader::run: terminated", ( void* )this );
820 void Reader::notifyReadyRead()
822 qDebug( "notifyReadyRead: %d bytes available", bytesInBuffer() );
823 assert( !cancel );
825 if ( consumerBlocksOnUs ) {
826 bufferNotEmptyCondition.wakeAll();
827 blockedConsumerIsDoneCondition.wait( &mutex );
828 return;
830 qDebug( "notifyReadyRead: emit signal" );
831 emit readyRead();
832 readyReadSentCondition.wait( &mutex );
833 qDebug( "notifyReadyRead: returning from waiting, leave" );
836 void Writer::run() {
838 LOCKED( this );
840 // too bad QThread doesn't have that itself; a signal isn't enough
841 hasStarted.wakeAll();
843 qDebug( ) << this << "Writer::run: started";
845 while ( true ) {
847 while ( !cancel && bufferEmpty() ) {
848 qDebug( ) << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
849 bufferEmptyCondition.wakeAll();
850 emit bytesWritten( 0 );
851 qDebug( ) << this << "Writer::run: buffer is empty, going to sleep";
852 bufferNotEmptyCondition.wait( &mutex );
853 qDebug( ) << this << "Writer::run: woke up";
856 if ( cancel ) {
857 qDebug( ) << this << "Writer::run: detected cancel";
858 goto leave;
861 assert( numBytesInBuffer > 0 );
863 qDebug( ) << this << "Writer::run: Trying to write " << numBytesInBuffer << "bytes";
864 qint64 totalWritten = 0;
865 do {
866 mutex.unlock();
867 #ifdef Q_OS_WIN32
868 DWORD numWritten;
869 qDebug( "%p (fd=%d): Writer::run: buffer before WriteFile (numBytes=%lld): %s:",
870 ( void*) this, fd, numBytesInBuffer, buffer );
871 qDebug( "%p (fd=%d): Writer::run: Going into WriteFile", ( void* ) this, fd );
872 if ( !WriteFile( handle, buffer + totalWritten, numBytesInBuffer - totalWritten, &numWritten, 0 ) ) {
873 mutex.lock();
874 errorCode = static_cast<int>( GetLastError() );
875 qDebug( "%p: Writer::run: got error code: %d", ( void* ) this, errorCode );
876 error = true;
877 goto leave;
879 #else
880 qint64 numWritten;
881 do {
882 numWritten = ::write( fd, buffer + totalWritten, numBytesInBuffer - totalWritten );
883 } while ( numWritten == -1 && errno == EINTR );
885 if ( numWritten < 0 ) {
886 mutex.lock();
887 errorCode = errno;
888 qDebug( "%p: Writer::run: got error code: %s (%d)", ( void* )this, strerror( errorCode ), errorCode );
889 error = true;
890 goto leave;
892 #endif
893 qDebug( "%p (fd=%d): Writer::run: buffer after WriteFile (numBytes=%u): %s:", ( void* )this, fd, numBytesInBuffer, buffer );
894 totalWritten += numWritten;
895 mutex.lock();
896 } while ( totalWritten < numBytesInBuffer );
898 qDebug() << this << "Writer::run: wrote " << totalWritten << "bytes";
899 numBytesInBuffer = 0;
900 qDebug() << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
901 bufferEmptyCondition.wakeAll();
902 emit bytesWritten( totalWritten );
904 leave:
905 qDebug() << this << "Writer::run: terminating";
906 numBytesInBuffer = 0;
907 qDebug() << this << "Writer::run: buffer is empty, wake bufferEmptyCond listeners";
908 bufferEmptyCondition.wakeAll();
909 emit bytesWritten( 0 );
912 // static
913 std::pair<KDPipeIODevice*,KDPipeIODevice*> KDPipeIODevice::makePairOfConnectedPipes() {
914 KDPipeIODevice * read = 0;
915 KDPipeIODevice * write = 0;
916 #ifdef Q_OS_WIN32
917 HANDLE rh;
918 HANDLE wh;
919 SECURITY_ATTRIBUTES sa;
920 memset( &sa, 0, sizeof(sa) );
921 sa.nLength = sizeof(sa);
922 sa.bInheritHandle = TRUE;
923 if ( CreatePipe( &rh, &wh, &sa, BUFFER_SIZE ) ) {
924 read = new KDPipeIODevice;
925 read->open( rh, ReadOnly );
926 write = new KDPipeIODevice;
927 write->open( wh, WriteOnly );
929 #else
930 int fds[2];
931 if ( pipe( fds ) == 0 ) {
932 read = new KDPipeIODevice;
933 read->open( fds[0], ReadOnly );
934 write = new KDPipeIODevice;
935 write->open( fds[1], WriteOnly );
937 #endif
938 return std::make_pair( read, write );
941 #ifdef KDAB_DEFINE_CHECKS
942 KDAB_DEFINE_CHECKS( KDPipeIODevice ) {
943 if ( !isOpen() ) {
944 assert( openMode() == NotOpen );
945 assert( !d->reader );
946 assert( !d->writer );
947 #ifdef Q_OS_WIN32
948 assert( !d->handle );
949 #else
950 assert( d->fd < 0 );
951 #endif
952 } else {
953 assert( openMode() != NotOpen );
954 assert( openMode() & ReadWrite );
955 if ( openMode() & ReadOnly ) {
956 assert( d->reader );
957 synchronized( d->reader )
958 assert( d->reader->eof || d->reader->error || d->reader->isRunning() );
960 if ( openMode() & WriteOnly ) {
961 assert( d->writer );
962 synchronized( d->writer )
963 assert( d->writer->error || d->writer->isRunning() );
965 #ifdef Q_OS_WIN32
966 assert( d->handle );
967 #else
968 assert( d->fd >= 0 );
969 #endif
972 #endif // KDAB_DEFINE_CHECKS
974 #include "moc_kdpipeiodevice.cpp"
975 #include "kdpipeiodevice.moc"