1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
11 #include "TFileTransport.h"
12 #include "TTransportUtils.h"
15 #ifndef HAVE_CLOCK_GETTIME
29 namespace facebook
{ namespace thrift
{ namespace transport
{
31 using boost::shared_ptr
;
33 using namespace facebook::thrift::protocol
;
35 #ifndef HAVE_CLOCK_GETTIME
38 * Fake clock_gettime for systems like darwin
40 * @author Paul Querna <pquerna@apache.org>
42 #define CLOCK_REALTIME 0
43 static int clock_gettime(int clk_id
/*ignored*/, struct timespec
*tp
) {
46 int rv
= gettimeofday(&now
, NULL
);
51 tp
->tv_sec
= now
.tv_sec
;
52 tp
->tv_nsec
= now
.tv_usec
* 1000;
57 TFileTransport::TFileTransport(string path
, bool readOnly
)
61 , readBuffSize_(DEFAULT_READ_BUFF_SIZE
)
62 , readTimeout_(NO_TAIL_READ_TIMEOUT
)
63 , chunkSize_(DEFAULT_CHUNK_SIZE
)
64 , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE
)
65 , flushMaxUs_(DEFAULT_FLUSH_MAX_US
)
66 , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES
)
67 , maxEventSize_(DEFAULT_MAX_EVENT_SIZE
)
68 , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS
)
69 , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US
)
70 , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US
)
72 , dequeueBuffer_(NULL
)
73 , enqueueBuffer_(NULL
)
78 , bufferAndThreadInitialized_(false)
81 , numCorruptedEventsInChunk_(0)
84 // initialize all the condition vars/mutexes
85 pthread_mutex_init(&mutex_
, NULL
);
86 pthread_cond_init(¬Full_
, NULL
);
87 pthread_cond_init(¬Empty_
, NULL
);
88 pthread_cond_init(&flushed_
, NULL
);
93 void TFileTransport::resetOutputFile(int fd
, string filename
, int64_t offset
) {
97 // check if current file is still open
99 // flush any events in the queue
101 fprintf(stderr
, "error, current file (%s) not closed\n", filename_
.c_str());
102 if (-1 == ::close(fd_
)) {
103 GlobalOutput("TFileTransport: error in file close");
104 throw TTransportException("TFileTransport: error in file close");
111 // open file if the input fd is 0
117 TFileTransport::~TFileTransport() {
118 // flush the buffer if a writer thread is active
119 if (writerThreadId_
> 0) {
120 // reduce the flush timeout so that closing is quicker
121 setFlushMaxUs(300*1000);
123 // flush output buffer
126 // set state to closing
129 // TODO: make sure event queue is empty
130 // currently only the write buffer is flushed
131 // we dont actually wait until the queue is empty. This shouldn't be a big
132 // deal in the common case because writing is quick
134 pthread_join(writerThreadId_
, NULL
);
138 if (dequeueBuffer_
) {
139 delete dequeueBuffer_
;
140 dequeueBuffer_
= NULL
;
143 if (enqueueBuffer_
) {
144 delete enqueueBuffer_
;
145 enqueueBuffer_
= NULL
;
154 delete currentEvent_
;
155 currentEvent_
= NULL
;
160 if(-1 == ::close(fd_
)) {
161 GlobalOutput("TFileTransport: error in file close");
166 bool TFileTransport::initBufferAndWriteThread() {
167 if (bufferAndThreadInitialized_
) {
168 T_ERROR("Trying to double-init TFileTransport");
172 if (writerThreadId_
== 0) {
173 if (pthread_create(&writerThreadId_
, NULL
, startWriterThread
, (void *)this) != 0) {
174 T_ERROR("Could not create writer thread");
179 dequeueBuffer_
= new TFileTransportBuffer(eventBufferSize_
);
180 enqueueBuffer_
= new TFileTransportBuffer(eventBufferSize_
);
181 bufferAndThreadInitialized_
= true;
186 void TFileTransport::write(const uint8_t* buf
, uint32_t len
) {
188 throw TTransportException("TFileTransport: attempting to write to file opened readonly");
191 enqueueEvent(buf
, len
, false);
194 void TFileTransport::enqueueEvent(const uint8_t* buf
, uint32_t eventLen
, bool blockUntilFlush
) {
195 // can't enqueue more events if file is going to close
200 // make sure that event size is valid
201 if ( (maxEventSize_
> 0) && (eventLen
> maxEventSize_
) ) {
202 T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen
, maxEventSize_
);
207 T_ERROR("cannot enqueue an empty event");
211 eventInfo
* toEnqueue
= new eventInfo();
212 toEnqueue
->eventBuff_
= (uint8_t *)malloc((sizeof(uint8_t) * eventLen
) + 4);
213 // first 4 bytes is the event length
214 memcpy(toEnqueue
->eventBuff_
, (void*)(&eventLen
), 4);
215 // actual event contents
216 memcpy(toEnqueue
->eventBuff_
+ 4, buf
, eventLen
);
217 toEnqueue
->eventSize_
= eventLen
+ 4;
220 pthread_mutex_lock(&mutex_
);
222 // make sure that enqueue buffer is initialized and writer thread is running
223 if (!bufferAndThreadInitialized_
) {
224 if (!initBufferAndWriteThread()) {
226 pthread_mutex_unlock(&mutex_
);
231 // Can't enqueue while buffer is full
232 while (enqueueBuffer_
->isFull()) {
233 pthread_cond_wait(¬Full_
, &mutex_
);
237 if (!enqueueBuffer_
->addEvent(toEnqueue
)) {
239 pthread_mutex_unlock(&mutex_
);
243 // signal anybody who's waiting for the buffer to be non-empty
244 pthread_cond_signal(¬Empty_
);
246 if (blockUntilFlush
) {
247 pthread_cond_wait(&flushed_
, &mutex_
);
250 // this really should be a loop where it makes sure it got flushed
251 // because condition variables can get triggered by the os for no reason
252 // it is probably a non-factor for the time being
253 pthread_mutex_unlock(&mutex_
);
256 bool TFileTransport::swapEventBuffers(struct timespec
* deadline
) {
257 pthread_mutex_lock(&mutex_
);
258 if (deadline
!= NULL
) {
259 // if we were handed a deadline time struct, do a timed wait
260 pthread_cond_timedwait(¬Empty_
, &mutex_
, deadline
);
262 // just wait until the buffer gets an item
263 pthread_cond_wait(¬Empty_
, &mutex_
);
266 bool swapped
= false;
268 // could be empty if we timed out
269 if (!enqueueBuffer_
->isEmpty()) {
270 TFileTransportBuffer
*temp
= enqueueBuffer_
;
271 enqueueBuffer_
= dequeueBuffer_
;
272 dequeueBuffer_
= temp
;
277 // unlock the mutex and signal if required
278 pthread_mutex_unlock(&mutex_
);
281 pthread_cond_signal(¬Full_
);
288 void TFileTransport::writerThread() {
289 // open file if it is not open
294 // set the offset to the correct value (EOF)
297 // Figure out the next time by which a flush must take place
299 struct timespec ts_next_flush
;
300 getNextFlushTime(&ts_next_flush
);
301 uint32_t unflushed
= 0;
304 // this will only be true when the destructor is being invoked
306 // empty out both the buffers
307 if (enqueueBuffer_
->isEmpty() && dequeueBuffer_
->isEmpty()) {
308 if (-1 == ::close(fd_
)) {
309 GlobalOutput("TFileTransport: error in close");
310 throw TTransportException("TFileTransport: error in file close");
312 // just be safe and sync to disk
320 if (swapEventBuffers(&ts_next_flush
)) {
322 while (NULL
!= (outEvent
= dequeueBuffer_
->getNext())) {
324 T_DEBUG_L(1, "Got an empty event");
328 // sanity check on event
329 if ((maxEventSize_
> 0) && (outEvent
->eventSize_
> maxEventSize_
)) {
330 T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent
->eventSize_
, maxEventSize_
);
334 // If chunking is required, then make sure that msg does not cross chunk boundary
335 if ((outEvent
->eventSize_
> 0) && (chunkSize_
!= 0)) {
337 // event size must be less than chunk size
338 if(outEvent
->eventSize_
> chunkSize_
) {
339 T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
340 outEvent
->eventSize_
, chunkSize_
);
344 int64_t chunk1
= offset_
/chunkSize_
;
345 int64_t chunk2
= (offset_
+ outEvent
->eventSize_
- 1)/chunkSize_
;
347 // if adding this event will cross a chunk boundary, pad the chunk with zeros
348 if (chunk1
!= chunk2
) {
349 // refetch the offset to keep in sync
350 offset_
= lseek(fd_
, 0, SEEK_CUR
);
351 int32_t padding
= (int32_t)(chunk2
*chunkSize_
- offset_
);
355 T_DEBUG("Padding is empty, skipping event");
358 if (padding
> (int32_t)chunkSize_
) {
359 T_DEBUG("padding is larger than chunk size, skipping event");
362 uint8_t zeros
[padding
];
363 bzero(zeros
, padding
);
364 //T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)",
365 //padding, offset_, chunk2);
366 if (-1 == ::write(fd_
, zeros
, padding
)) {
367 GlobalOutput("TFileTransport: error while padding zeros");
368 throw TTransportException("TFileTransport: error while padding zeros");
370 unflushed
+= padding
;
375 // write the dequeued event to the file
376 if (outEvent
->eventSize_
> 0) {
377 if (-1 == ::write(fd_
, outEvent
->eventBuff_
, outEvent
->eventSize_
)) {
378 GlobalOutput("TFileTransport: error while writing event");
379 throw TTransportException("TFileTransport: error while writing event");
382 unflushed
+= outEvent
->eventSize_
;
383 offset_
+= outEvent
->eventSize_
;
386 dequeueBuffer_
->reset();
389 bool flushTimeElapsed
= false;
390 struct timespec current_time
;
391 clock_gettime(CLOCK_REALTIME
, ¤t_time
);
393 if (current_time
.tv_sec
> ts_next_flush
.tv_sec
||
394 (current_time
.tv_sec
== ts_next_flush
.tv_sec
&& current_time
.tv_nsec
> ts_next_flush
.tv_nsec
)) {
395 flushTimeElapsed
= true;
396 getNextFlushTime(&ts_next_flush
);
399 // couple of cases from which a flush could be triggered
400 if ((flushTimeElapsed
&& unflushed
> 0) ||
401 unflushed
> flushMaxBytes_
||
404 // sync (force flush) file to disk
408 // notify anybody waiting for flush completion
410 pthread_cond_broadcast(&flushed_
);
415 void TFileTransport::flush() {
416 // file must be open for writing for any flushing to take place
417 if (writerThreadId_
<= 0) {
420 // wait for flush to take place
421 pthread_mutex_lock(&mutex_
);
425 while (forceFlush_
) {
426 pthread_cond_wait(&flushed_
, &mutex_
);
429 pthread_mutex_unlock(&mutex_
);
433 uint32_t TFileTransport::readAll(uint8_t* buf
, uint32_t len
) {
438 get
= read(buf
+have
, len
-have
);
440 throw TEOFException();
448 uint32_t TFileTransport::read(uint8_t* buf
, uint32_t len
) {
449 // check if there an event is ready to be read
450 if (!currentEvent_
) {
451 currentEvent_
= readEvent();
454 // did not manage to read an event from the file. This could have happened
455 // if the timeout expired or there was some other error
456 if (!currentEvent_
) {
460 // read as much of the current event as possible
461 int32_t remaining
= currentEvent_
->eventSize_
- currentEvent_
->eventBuffPos_
;
462 if (remaining
<= (int32_t)len
) {
463 // copy over anything thats remaining
466 currentEvent_
->eventBuff_
+ currentEvent_
->eventBuffPos_
,
469 delete(currentEvent_
);
470 currentEvent_
= NULL
;
474 // read as much as possible
475 memcpy(buf
, currentEvent_
->eventBuff_
+ currentEvent_
->eventBuffPos_
, len
);
476 currentEvent_
->eventBuffPos_
+= len
;
480 eventInfo
* TFileTransport::readEvent() {
484 readBuff_
= new uint8_t[readBuffSize_
];
488 // read from the file if read buffer is exhausted
489 if (readState_
.bufferPtr_
== readState_
.bufferLen_
) {
490 // advance the offset pointer
491 offset_
+= readState_
.bufferLen_
;
492 readState_
.bufferLen_
= ::read(fd_
, readBuff_
, readBuffSize_
);
493 // if (readState_.bufferLen_) {
494 // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
496 readState_
.bufferPtr_
= 0;
497 readState_
.lastDispatchPtr_
= 0;
500 if (readState_
.bufferLen_
== -1) {
501 readState_
.resetAllValues();
502 GlobalOutput("TFileTransport: error while reading from file");
503 throw TTransportException("TFileTransport: error while reading from file");
504 } else if (readState_
.bufferLen_
== 0) { // EOF
505 // wait indefinitely if there is no timeout
506 if (readTimeout_
== TAIL_READ_TIMEOUT
) {
507 usleep(eofSleepTime_
);
509 } else if (readTimeout_
== NO_TAIL_READ_TIMEOUT
) {
511 readState_
.resetState(0);
513 } else if (readTimeout_
> 0) {
514 // timeout already expired once
516 readState_
.resetState(0);
519 usleep(readTimeout_
* 1000);
529 // attempt to read an event from the buffer
530 while(readState_
.bufferPtr_
< readState_
.bufferLen_
) {
531 if (readState_
.readingSize_
) {
532 if(readState_
.eventSizeBuffPos_
== 0) {
533 if ( (offset_
+ readState_
.bufferPtr_
)/chunkSize_
!=
534 ((offset_
+ readState_
.bufferPtr_
+ 3)/chunkSize_
)) {
535 // skip one byte towards chunk boundary
536 // T_DEBUG_L(1, "Skipping a byte");
537 readState_
.bufferPtr_
++;
542 readState_
.eventSizeBuff_
[readState_
.eventSizeBuffPos_
++] =
543 readBuff_
[readState_
.bufferPtr_
++];
544 if (readState_
.eventSizeBuffPos_
== 4) {
545 // 0 length event indicates padding
546 if (*((uint32_t *)(readState_
.eventSizeBuff_
)) == 0) {
547 // T_DEBUG_L(1, "Got padding");
548 readState_
.resetState(readState_
.lastDispatchPtr_
);
552 readState_
.readingSize_
= false;
553 if (readState_
.event_
) {
554 delete(readState_
.event_
);
556 readState_
.event_
= new eventInfo();
557 readState_
.event_
->eventSize_
= *((uint32_t *)(readState_
.eventSizeBuff_
));
559 // check if the event is corrupted and perform recovery if required
560 if (isEventCorrupted()) {
562 // start from the top
567 if (!readState_
.event_
->eventBuff_
) {
568 readState_
.event_
->eventBuff_
= new uint8_t[readState_
.event_
->eventSize_
];
569 readState_
.event_
->eventBuffPos_
= 0;
571 // take either the entire event or the remaining bytes in the buffer
572 int reclaimBuffer
= min((uint32_t)(readState_
.bufferLen_
- readState_
.bufferPtr_
),
573 readState_
.event_
->eventSize_
- readState_
.event_
->eventBuffPos_
);
575 // copy data from read buffer into event buffer
576 memcpy(readState_
.event_
->eventBuff_
+ readState_
.event_
->eventBuffPos_
,
577 readBuff_
+ readState_
.bufferPtr_
,
580 // increment position ptrs
581 readState_
.event_
->eventBuffPos_
+= reclaimBuffer
;
582 readState_
.bufferPtr_
+= reclaimBuffer
;
584 // check if the event has been read in full
585 if (readState_
.event_
->eventBuffPos_
== readState_
.event_
->eventSize_
) {
586 // set the completed event to the current event
587 eventInfo
* completeEvent
= readState_
.event_
;
588 completeEvent
->eventBuffPos_
= 0;
590 readState_
.event_
= NULL
;
591 readState_
.resetState(readState_
.bufferPtr_
);
594 return completeEvent
;
602 bool TFileTransport::isEventCorrupted() {
603 // an error is triggered if:
604 if ( (maxEventSize_
> 0) && (readState_
.event_
->eventSize_
> maxEventSize_
)) {
605 // 1. Event size is larger than user-speficied max-event size
606 T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
607 readState_
.event_
->eventSize_
, maxEventSize_
);
609 } else if (readState_
.event_
->eventSize_
> chunkSize_
) {
610 // 2. Event size is larger than chunk size
611 T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
612 readState_
.event_
->eventSize_
, chunkSize_
);
614 } else if( ((offset_
+ readState_
.bufferPtr_
- 4)/chunkSize_
) !=
615 ((offset_
+ readState_
.bufferPtr_
+ readState_
.event_
->eventSize_
- 1)/chunkSize_
) ) {
616 // 3. size indicates that event crosses chunk boundary
617 T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%ld",
618 readState_
.event_
->eventSize_
, offset_
+ readState_
.bufferPtr_
+ 4);
625 void TFileTransport::performRecovery() {
626 // perform some kickass recovery
627 uint32_t curChunk
= getCurChunk();
628 if (lastBadChunk_
== curChunk
) {
629 numCorruptedEventsInChunk_
++;
631 lastBadChunk_
= curChunk
;
632 numCorruptedEventsInChunk_
= 1;
635 if (numCorruptedEventsInChunk_
< maxCorruptedEvents_
) {
636 // maybe there was an error in reading the file from disk
637 // seek to the beginning of chunk and try again
638 seekToChunk(curChunk
);
641 // just skip ahead to the next chunk if we not already at the last chunk
642 if (curChunk
!= (getNumChunks() - 1)) {
643 seekToChunk(curChunk
+ 1);
644 } else if (readTimeout_
== TAIL_READ_TIMEOUT
) {
645 // if tailing the file, wait until there is enough data to start
647 while(curChunk
== (getNumChunks() - 1)) {
648 usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US
);
650 seekToChunk(curChunk
+ 1);
652 // pretty hosed at this stage, rewind the file back to the last successful
653 // point and punt on the error
654 readState_
.resetState(readState_
.lastDispatchPtr_
);
655 currentEvent_
= NULL
;
657 sprintf(errorMsg
, "TFileTransport: log file corrupted at offset: %lu",
658 offset_
+ readState_
.lastDispatchPtr_
);
659 GlobalOutput(errorMsg
);
660 throw TTransportException(errorMsg
);
666 void TFileTransport::seekToChunk(int32_t chunk
) {
668 throw TTransportException("File not open");
671 int32_t numChunks
= getNumChunks();
673 // file is empty, seeking to chunk is pointless
674 if (numChunks
== 0) {
678 // negative indicates reverse seek (from the end)
683 // too large a value for reverse seek, just seek to beginning
685 T_DEBUG("Incorrect value for reverse seek. Seeking to beginning...", chunk
)
689 // cannot seek past EOF
690 bool seekToEnd
= false;
691 uint32_t minEndOffset
= 0;
692 if (chunk
>= numChunks
) {
693 T_DEBUG("Trying to seek past EOF. Seeking to EOF instead...");
695 chunk
= numChunks
- 1;
696 // this is the min offset to process events till
697 minEndOffset
= lseek(fd_
, 0, SEEK_END
);
700 off_t newOffset
= off_t(chunk
) * chunkSize_
;
701 offset_
= lseek(fd_
, newOffset
, SEEK_SET
);
702 readState_
.resetAllValues();
703 currentEvent_
= NULL
;
705 GlobalOutput("TFileTransport: lseek error in seekToChunk");
706 throw TTransportException("TFileTransport: lseek error in seekToChunk");
709 // seek to EOF if user wanted to go to last chunk
711 uint32_t oldReadTimeout
= getReadTimeout();
712 setReadTimeout(NO_TAIL_READ_TIMEOUT
);
713 // keep on reading unti the last event at point of seekChunk call
714 while( readEvent() && ((offset_
+ readState_
.bufferPtr_
) < minEndOffset
)) {};
715 setReadTimeout(oldReadTimeout
);
720 void TFileTransport::seekToEnd() {
721 seekToChunk(getNumChunks());
724 uint32_t TFileTransport::getNumChunks() {
730 if (f_info
.st_size
> 0) {
731 return ((f_info
.st_size
)/chunkSize_
) + 1;
734 // empty file has no chunks
738 uint32_t TFileTransport::getCurChunk() {
739 return offset_
/chunkSize_
;
743 void TFileTransport::openLogFile() {
744 mode_t mode
= readOnly_
? S_IRUSR
| S_IRGRP
| S_IROTH
: S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IROTH
;
745 int flags
= readOnly_
? O_RDONLY
: O_RDWR
| O_CREAT
| O_APPEND
;
746 fd_
= ::open(filename_
.c_str(), flags
, mode
);
749 // make sure open call was successful
752 sprintf(errorMsg
, "TFileTransport: Could not open file: %s", filename_
.c_str());
753 GlobalOutput(errorMsg
);
754 throw TTransportException(errorMsg
);
759 void TFileTransport::getNextFlushTime(struct timespec
* ts_next_flush
) {
760 clock_gettime(CLOCK_REALTIME
, ts_next_flush
);
761 ts_next_flush
->tv_nsec
+= (flushMaxUs_
% 1000000) * 1000;
762 if (ts_next_flush
->tv_nsec
> 1000000000) {
763 ts_next_flush
->tv_nsec
-= 1000000000;
764 ts_next_flush
->tv_sec
+= 1;
766 ts_next_flush
->tv_sec
+= flushMaxUs_
/ 1000000;
769 TFileTransportBuffer::TFileTransportBuffer(uint32_t size
)
775 buffer_
= new eventInfo
*[size
];
778 TFileTransportBuffer::~TFileTransportBuffer() {
780 for (uint32_t i
= 0; i
< writePoint_
; i
++) {
788 bool TFileTransportBuffer::addEvent(eventInfo
*event
) {
789 if (bufferMode_
== READ
) {
790 GlobalOutput("Trying to write to a buffer in read mode");
792 if (writePoint_
< size_
) {
793 buffer_
[writePoint_
++] = event
;
801 eventInfo
* TFileTransportBuffer::getNext() {
802 if (bufferMode_
== WRITE
) {
805 if (readPoint_
< writePoint_
) {
806 return buffer_
[readPoint_
++];
813 void TFileTransportBuffer::reset() {
814 if (bufferMode_
== WRITE
|| writePoint_
> readPoint_
) {
815 T_DEBUG("Resetting a buffer with unread entries");
817 // Clean up the old entries
818 for (uint32_t i
= 0; i
< writePoint_
; i
++) {
826 bool TFileTransportBuffer::isFull() {
827 return writePoint_
== size_
;
830 bool TFileTransportBuffer::isEmpty() {
831 return writePoint_
== 0;
834 TFileProcessor::TFileProcessor(shared_ptr
<TProcessor
> processor
,
835 shared_ptr
<TProtocolFactory
> protocolFactory
,
836 shared_ptr
<TFileReaderTransport
> inputTransport
):
837 processor_(processor
),
838 inputProtocolFactory_(protocolFactory
),
839 outputProtocolFactory_(protocolFactory
),
840 inputTransport_(inputTransport
) {
842 // default the output transport to a null transport (common case)
843 outputTransport_
= shared_ptr
<TNullTransport
>(new TNullTransport());
846 TFileProcessor::TFileProcessor(shared_ptr
<TProcessor
> processor
,
847 shared_ptr
<TProtocolFactory
> inputProtocolFactory
,
848 shared_ptr
<TProtocolFactory
> outputProtocolFactory
,
849 shared_ptr
<TFileReaderTransport
> inputTransport
):
850 processor_(processor
),
851 inputProtocolFactory_(inputProtocolFactory
),
852 outputProtocolFactory_(outputProtocolFactory
),
853 inputTransport_(inputTransport
) {
855 // default the output transport to a null transport (common case)
856 outputTransport_
= shared_ptr
<TNullTransport
>(new TNullTransport());
859 TFileProcessor::TFileProcessor(shared_ptr
<TProcessor
> processor
,
860 shared_ptr
<TProtocolFactory
> protocolFactory
,
861 shared_ptr
<TFileReaderTransport
> inputTransport
,
862 shared_ptr
<TTransport
> outputTransport
):
863 processor_(processor
),
864 inputProtocolFactory_(protocolFactory
),
865 outputProtocolFactory_(protocolFactory
),
866 inputTransport_(inputTransport
),
867 outputTransport_(outputTransport
) {};
869 void TFileProcessor::process(uint32_t numEvents
, bool tail
) {
870 shared_ptr
<TProtocol
> inputProtocol
= inputProtocolFactory_
->getProtocol(inputTransport_
);
871 shared_ptr
<TProtocol
> outputProtocol
= outputProtocolFactory_
->getProtocol(outputTransport_
);
873 // set the read timeout to 0 if tailing is required
874 int32_t oldReadTimeout
= inputTransport_
->getReadTimeout();
876 // save old read timeout so it can be restored
877 inputTransport_
->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT
);
880 uint32_t numProcessed
= 0;
882 // bad form to use exceptions for flow control but there is really
883 // no other way around it
885 processor_
->process(inputProtocol
, outputProtocol
);
887 if ( (numEvents
> 0) && (numProcessed
== numEvents
)) {
890 } catch (TEOFException
& teof
) {
894 } catch (TException
&te
) {
895 cerr
<< te
.what() << endl
;
900 // restore old read timeout
902 inputTransport_
->setReadTimeout(oldReadTimeout
);
907 void TFileProcessor::processChunk() {
908 shared_ptr
<TProtocol
> inputProtocol
= inputProtocolFactory_
->getProtocol(inputTransport_
);
909 shared_ptr
<TProtocol
> outputProtocol
= outputProtocolFactory_
->getProtocol(outputTransport_
);
911 uint32_t curChunk
= inputTransport_
->getCurChunk();
914 // bad form to use exceptions for flow control but there is really
915 // no other way around it
917 processor_
->process(inputProtocol
, outputProtocol
);
918 if (curChunk
!= inputTransport_
->getCurChunk()) {
921 } catch (TEOFException
& teof
) {
923 } catch (TException
&te
) {
924 cerr
<< te
.what() << endl
;
930 }}} // facebook::thrift::transport