r1359@opsdev009 (orig r71576): mcslee | 2007-11-27 16:12:11 -0800
[amiethrift.git] / lib / cpp / src / transport / TFileTransport.cpp
blob8b5571549ae79ade4c046d491ce15634dbb37535
1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
3 //
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #ifdef HAVE_CONFIG_H
8 #include "config.h"
9 #endif
11 #include "TFileTransport.h"
12 #include "TTransportUtils.h"
14 #include <pthread.h>
15 #ifndef HAVE_CLOCK_GETTIME
16 #include <time.h>
17 #else
18 #include <sys/time.h>
19 #endif
20 #include <fcntl.h>
21 #include <errno.h>
22 #include <unistd.h>
23 #ifdef HAVE_STRINGS_H
24 #include <strings.h>
25 #endif
26 #include <iostream>
27 #include <sys/stat.h>
29 namespace facebook { namespace thrift { namespace transport {
31 using boost::shared_ptr;
32 using namespace std;
33 using namespace facebook::thrift::protocol;
35 #ifndef HAVE_CLOCK_GETTIME
37 /**
38 * Fake clock_gettime for systems like darwin
40 * @author Paul Querna <pquerna@apache.org>
42 #define CLOCK_REALTIME 0
43 static int clock_gettime(int clk_id /*ignored*/, struct timespec *tp) {
44 struct timeval now;
46 int rv = gettimeofday(&now, NULL);
47 if (rv != 0) {
48 return rv;
51 tp->tv_sec = now.tv_sec;
52 tp->tv_nsec = now.tv_usec * 1000;
53 return 0;
55 #endif
57 TFileTransport::TFileTransport(string path, bool readOnly)
58 : readState_()
59 , readBuff_(NULL)
60 , currentEvent_(NULL)
61 , readBuffSize_(DEFAULT_READ_BUFF_SIZE)
62 , readTimeout_(NO_TAIL_READ_TIMEOUT)
63 , chunkSize_(DEFAULT_CHUNK_SIZE)
64 , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE)
65 , flushMaxUs_(DEFAULT_FLUSH_MAX_US)
66 , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES)
67 , maxEventSize_(DEFAULT_MAX_EVENT_SIZE)
68 , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS)
69 , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
70 , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
71 , writerThreadId_(0)
72 , dequeueBuffer_(NULL)
73 , enqueueBuffer_(NULL)
74 , closing_(false)
75 , forceFlush_(false)
76 , filename_(path)
77 , fd_(0)
78 , bufferAndThreadInitialized_(false)
79 , offset_(0)
80 , lastBadChunk_(0)
81 , numCorruptedEventsInChunk_(0)
82 , readOnly_(readOnly)
84 // initialize all the condition vars/mutexes
85 pthread_mutex_init(&mutex_, NULL);
86 pthread_cond_init(&notFull_, NULL);
87 pthread_cond_init(&notEmpty_, NULL);
88 pthread_cond_init(&flushed_, NULL);
90 openLogFile();
93 void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) {
94 filename_ = filename;
95 offset_ = offset;
97 // check if current file is still open
98 if (fd_ > 0) {
99 // flush any events in the queue
100 flush();
101 fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str());
102 if (-1 == ::close(fd_)) {
103 GlobalOutput("TFileTransport: error in file close");
104 throw TTransportException("TFileTransport: error in file close");
108 if (fd) {
109 fd_ = fd;
110 } else {
111 // open file if the input fd is 0
112 openLogFile();
117 TFileTransport::~TFileTransport() {
118 // flush the buffer if a writer thread is active
119 if (writerThreadId_ > 0) {
120 // reduce the flush timeout so that closing is quicker
121 setFlushMaxUs(300*1000);
123 // flush output buffer
124 flush();
126 // set state to closing
127 closing_ = true;
129 // TODO: make sure event queue is empty
130 // currently only the write buffer is flushed
131 // we dont actually wait until the queue is empty. This shouldn't be a big
132 // deal in the common case because writing is quick
134 pthread_join(writerThreadId_, NULL);
135 writerThreadId_ = 0;
138 if (dequeueBuffer_) {
139 delete dequeueBuffer_;
140 dequeueBuffer_ = NULL;
143 if (enqueueBuffer_) {
144 delete enqueueBuffer_;
145 enqueueBuffer_ = NULL;
148 if (readBuff_) {
149 delete readBuff_;
150 readBuff_ = NULL;
153 if (currentEvent_) {
154 delete currentEvent_;
155 currentEvent_ = NULL;
158 // close logfile
159 if (fd_ > 0) {
160 if(-1 == ::close(fd_)) {
161 GlobalOutput("TFileTransport: error in file close");
166 bool TFileTransport::initBufferAndWriteThread() {
167 if (bufferAndThreadInitialized_) {
168 T_ERROR("Trying to double-init TFileTransport");
169 return false;
172 if (writerThreadId_ == 0) {
173 if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
174 T_ERROR("Could not create writer thread");
175 return false;
179 dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
180 enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
181 bufferAndThreadInitialized_ = true;
183 return true;
186 void TFileTransport::write(const uint8_t* buf, uint32_t len) {
187 if (readOnly_) {
188 throw TTransportException("TFileTransport: attempting to write to file opened readonly");
191 enqueueEvent(buf, len, false);
194 void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
195 // can't enqueue more events if file is going to close
196 if (closing_) {
197 return;
200 // make sure that event size is valid
201 if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
202 T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
203 return;
206 if (eventLen == 0) {
207 T_ERROR("cannot enqueue an empty event");
208 return;
211 eventInfo* toEnqueue = new eventInfo();
212 toEnqueue->eventBuff_ = (uint8_t *)malloc((sizeof(uint8_t) * eventLen) + 4);
213 // first 4 bytes is the event length
214 memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
215 // actual event contents
216 memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
217 toEnqueue->eventSize_ = eventLen + 4;
219 // lock mutex
220 pthread_mutex_lock(&mutex_);
222 // make sure that enqueue buffer is initialized and writer thread is running
223 if (!bufferAndThreadInitialized_) {
224 if (!initBufferAndWriteThread()) {
225 delete toEnqueue;
226 pthread_mutex_unlock(&mutex_);
227 return;
231 // Can't enqueue while buffer is full
232 while (enqueueBuffer_->isFull()) {
233 pthread_cond_wait(&notFull_, &mutex_);
236 // add to the buffer
237 if (!enqueueBuffer_->addEvent(toEnqueue)) {
238 delete toEnqueue;
239 pthread_mutex_unlock(&mutex_);
240 return;
243 // signal anybody who's waiting for the buffer to be non-empty
244 pthread_cond_signal(&notEmpty_);
246 if (blockUntilFlush) {
247 pthread_cond_wait(&flushed_, &mutex_);
250 // this really should be a loop where it makes sure it got flushed
251 // because condition variables can get triggered by the os for no reason
252 // it is probably a non-factor for the time being
253 pthread_mutex_unlock(&mutex_);
256 bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
257 pthread_mutex_lock(&mutex_);
258 if (deadline != NULL) {
259 // if we were handed a deadline time struct, do a timed wait
260 pthread_cond_timedwait(&notEmpty_, &mutex_, deadline);
261 } else {
262 // just wait until the buffer gets an item
263 pthread_cond_wait(&notEmpty_, &mutex_);
266 bool swapped = false;
268 // could be empty if we timed out
269 if (!enqueueBuffer_->isEmpty()) {
270 TFileTransportBuffer *temp = enqueueBuffer_;
271 enqueueBuffer_ = dequeueBuffer_;
272 dequeueBuffer_ = temp;
274 swapped = true;
277 // unlock the mutex and signal if required
278 pthread_mutex_unlock(&mutex_);
280 if (swapped) {
281 pthread_cond_signal(&notFull_);
284 return swapped;
288 void TFileTransport::writerThread() {
289 // open file if it is not open
290 if(!fd_) {
291 openLogFile();
294 // set the offset to the correct value (EOF)
295 seekToEnd();
297 // Figure out the next time by which a flush must take place
299 struct timespec ts_next_flush;
300 getNextFlushTime(&ts_next_flush);
301 uint32_t unflushed = 0;
303 while(1) {
304 // this will only be true when the destructor is being invoked
305 if(closing_) {
306 // empty out both the buffers
307 if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
308 if (-1 == ::close(fd_)) {
309 GlobalOutput("TFileTransport: error in close");
310 throw TTransportException("TFileTransport: error in file close");
312 // just be safe and sync to disk
313 fsync(fd_);
314 fd_ = 0;
315 pthread_exit(NULL);
316 return;
320 if (swapEventBuffers(&ts_next_flush)) {
321 eventInfo* outEvent;
322 while (NULL != (outEvent = dequeueBuffer_->getNext())) {
323 if (!outEvent) {
324 T_DEBUG_L(1, "Got an empty event");
325 return;
328 // sanity check on event
329 if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
330 T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
331 continue;
334 // If chunking is required, then make sure that msg does not cross chunk boundary
335 if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
337 // event size must be less than chunk size
338 if(outEvent->eventSize_ > chunkSize_) {
339 T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
340 outEvent->eventSize_, chunkSize_);
341 continue;
344 int64_t chunk1 = offset_/chunkSize_;
345 int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
347 // if adding this event will cross a chunk boundary, pad the chunk with zeros
348 if (chunk1 != chunk2) {
349 // refetch the offset to keep in sync
350 offset_ = lseek(fd_, 0, SEEK_CUR);
351 int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
353 // sanity check
354 if (padding <= 0) {
355 T_DEBUG("Padding is empty, skipping event");
356 continue;
358 if (padding > (int32_t)chunkSize_) {
359 T_DEBUG("padding is larger than chunk size, skipping event");
360 continue;
362 uint8_t zeros[padding];
363 bzero(zeros, padding);
364 //T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)",
365 //padding, offset_, chunk2);
366 if (-1 == ::write(fd_, zeros, padding)) {
367 GlobalOutput("TFileTransport: error while padding zeros");
368 throw TTransportException("TFileTransport: error while padding zeros");
370 unflushed += padding;
371 offset_ += padding;
375 // write the dequeued event to the file
376 if (outEvent->eventSize_ > 0) {
377 if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
378 GlobalOutput("TFileTransport: error while writing event");
379 throw TTransportException("TFileTransport: error while writing event");
382 unflushed += outEvent->eventSize_;
383 offset_ += outEvent->eventSize_;
386 dequeueBuffer_->reset();
389 bool flushTimeElapsed = false;
390 struct timespec current_time;
391 clock_gettime(CLOCK_REALTIME, &current_time);
393 if (current_time.tv_sec > ts_next_flush.tv_sec ||
394 (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {
395 flushTimeElapsed = true;
396 getNextFlushTime(&ts_next_flush);
399 // couple of cases from which a flush could be triggered
400 if ((flushTimeElapsed && unflushed > 0) ||
401 unflushed > flushMaxBytes_ ||
402 forceFlush_) {
404 // sync (force flush) file to disk
405 fsync(fd_);
406 unflushed = 0;
408 // notify anybody waiting for flush completion
409 forceFlush_ = false;
410 pthread_cond_broadcast(&flushed_);
415 void TFileTransport::flush() {
416 // file must be open for writing for any flushing to take place
417 if (writerThreadId_ <= 0) {
418 return;
420 // wait for flush to take place
421 pthread_mutex_lock(&mutex_);
423 forceFlush_ = true;
425 while (forceFlush_) {
426 pthread_cond_wait(&flushed_, &mutex_);
429 pthread_mutex_unlock(&mutex_);
433 uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
434 uint32_t have = 0;
435 uint32_t get = 0;
437 while (have < len) {
438 get = read(buf+have, len-have);
439 if (get <= 0) {
440 throw TEOFException();
442 have += get;
445 return have;
448 uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
449 // check if there an event is ready to be read
450 if (!currentEvent_) {
451 currentEvent_ = readEvent();
454 // did not manage to read an event from the file. This could have happened
455 // if the timeout expired or there was some other error
456 if (!currentEvent_) {
457 return 0;
460 // read as much of the current event as possible
461 int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
462 if (remaining <= (int32_t)len) {
463 // copy over anything thats remaining
464 if (remaining > 0) {
465 memcpy(buf,
466 currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
467 remaining);
469 delete(currentEvent_);
470 currentEvent_ = NULL;
471 return remaining;
474 // read as much as possible
475 memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
476 currentEvent_->eventBuffPos_ += len;
477 return len;
480 eventInfo* TFileTransport::readEvent() {
481 int readTries = 0;
483 if (!readBuff_) {
484 readBuff_ = new uint8_t[readBuffSize_];
487 while (1) {
488 // read from the file if read buffer is exhausted
489 if (readState_.bufferPtr_ == readState_.bufferLen_) {
490 // advance the offset pointer
491 offset_ += readState_.bufferLen_;
492 readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
493 // if (readState_.bufferLen_) {
494 // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
495 // }
496 readState_.bufferPtr_ = 0;
497 readState_.lastDispatchPtr_ = 0;
499 // read error
500 if (readState_.bufferLen_ == -1) {
501 readState_.resetAllValues();
502 GlobalOutput("TFileTransport: error while reading from file");
503 throw TTransportException("TFileTransport: error while reading from file");
504 } else if (readState_.bufferLen_ == 0) { // EOF
505 // wait indefinitely if there is no timeout
506 if (readTimeout_ == TAIL_READ_TIMEOUT) {
507 usleep(eofSleepTime_);
508 continue;
509 } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
510 // reset state
511 readState_.resetState(0);
512 return NULL;
513 } else if (readTimeout_ > 0) {
514 // timeout already expired once
515 if (readTries > 0) {
516 readState_.resetState(0);
517 return NULL;
518 } else {
519 usleep(readTimeout_ * 1000);
520 readTries++;
521 continue;
527 readTries = 0;
529 // attempt to read an event from the buffer
530 while(readState_.bufferPtr_ < readState_.bufferLen_) {
531 if (readState_.readingSize_) {
532 if(readState_.eventSizeBuffPos_ == 0) {
533 if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
534 ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
535 // skip one byte towards chunk boundary
536 // T_DEBUG_L(1, "Skipping a byte");
537 readState_.bufferPtr_++;
538 continue;
542 readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
543 readBuff_[readState_.bufferPtr_++];
544 if (readState_.eventSizeBuffPos_ == 4) {
545 // 0 length event indicates padding
546 if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
547 // T_DEBUG_L(1, "Got padding");
548 readState_.resetState(readState_.lastDispatchPtr_);
549 continue;
551 // got a valid event
552 readState_.readingSize_ = false;
553 if (readState_.event_) {
554 delete(readState_.event_);
556 readState_.event_ = new eventInfo();
557 readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
559 // check if the event is corrupted and perform recovery if required
560 if (isEventCorrupted()) {
561 performRecovery();
562 // start from the top
563 break;
566 } else {
567 if (!readState_.event_->eventBuff_) {
568 readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
569 readState_.event_->eventBuffPos_ = 0;
571 // take either the entire event or the remaining bytes in the buffer
572 int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
573 readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);
575 // copy data from read buffer into event buffer
576 memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
577 readBuff_ + readState_.bufferPtr_,
578 reclaimBuffer);
580 // increment position ptrs
581 readState_.event_->eventBuffPos_ += reclaimBuffer;
582 readState_.bufferPtr_ += reclaimBuffer;
584 // check if the event has been read in full
585 if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
586 // set the completed event to the current event
587 eventInfo* completeEvent = readState_.event_;
588 completeEvent->eventBuffPos_ = 0;
590 readState_.event_ = NULL;
591 readState_.resetState(readState_.bufferPtr_);
593 // exit criteria
594 return completeEvent;
602 bool TFileTransport::isEventCorrupted() {
603 // an error is triggered if:
604 if ( (maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
605 // 1. Event size is larger than user-speficied max-event size
606 T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
607 readState_.event_->eventSize_, maxEventSize_);
608 return true;
609 } else if (readState_.event_->eventSize_ > chunkSize_) {
610 // 2. Event size is larger than chunk size
611 T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
612 readState_.event_->eventSize_, chunkSize_);
613 return true;
614 } else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=
615 ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) {
616 // 3. size indicates that event crosses chunk boundary
617 T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%ld",
618 readState_.event_->eventSize_, offset_ + readState_.bufferPtr_ + 4);
619 return true;
622 return false;
625 void TFileTransport::performRecovery() {
626 // perform some kickass recovery
627 uint32_t curChunk = getCurChunk();
628 if (lastBadChunk_ == curChunk) {
629 numCorruptedEventsInChunk_++;
630 } else {
631 lastBadChunk_ = curChunk;
632 numCorruptedEventsInChunk_ = 1;
635 if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
636 // maybe there was an error in reading the file from disk
637 // seek to the beginning of chunk and try again
638 seekToChunk(curChunk);
639 } else {
641 // just skip ahead to the next chunk if we not already at the last chunk
642 if (curChunk != (getNumChunks() - 1)) {
643 seekToChunk(curChunk + 1);
644 } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
645 // if tailing the file, wait until there is enough data to start
646 // the next chunk
647 while(curChunk == (getNumChunks() - 1)) {
648 usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US);
650 seekToChunk(curChunk + 1);
651 } else {
652 // pretty hosed at this stage, rewind the file back to the last successful
653 // point and punt on the error
654 readState_.resetState(readState_.lastDispatchPtr_);
655 currentEvent_ = NULL;
656 char errorMsg[1024];
657 sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
658 offset_ + readState_.lastDispatchPtr_);
659 GlobalOutput(errorMsg);
660 throw TTransportException(errorMsg);
666 void TFileTransport::seekToChunk(int32_t chunk) {
667 if (fd_ <= 0) {
668 throw TTransportException("File not open");
671 int32_t numChunks = getNumChunks();
673 // file is empty, seeking to chunk is pointless
674 if (numChunks == 0) {
675 return;
678 // negative indicates reverse seek (from the end)
679 if (chunk < 0) {
680 chunk += numChunks;
683 // too large a value for reverse seek, just seek to beginning
684 if (chunk < 0) {
685 T_DEBUG("Incorrect value for reverse seek. Seeking to beginning...", chunk)
686 chunk = 0;
689 // cannot seek past EOF
690 bool seekToEnd = false;
691 uint32_t minEndOffset = 0;
692 if (chunk >= numChunks) {
693 T_DEBUG("Trying to seek past EOF. Seeking to EOF instead...");
694 seekToEnd = true;
695 chunk = numChunks - 1;
696 // this is the min offset to process events till
697 minEndOffset = lseek(fd_, 0, SEEK_END);
700 off_t newOffset = off_t(chunk) * chunkSize_;
701 offset_ = lseek(fd_, newOffset, SEEK_SET);
702 readState_.resetAllValues();
703 currentEvent_ = NULL;
704 if (offset_ == -1) {
705 GlobalOutput("TFileTransport: lseek error in seekToChunk");
706 throw TTransportException("TFileTransport: lseek error in seekToChunk");
709 // seek to EOF if user wanted to go to last chunk
710 if (seekToEnd) {
711 uint32_t oldReadTimeout = getReadTimeout();
712 setReadTimeout(NO_TAIL_READ_TIMEOUT);
713 // keep on reading unti the last event at point of seekChunk call
714 while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
715 setReadTimeout(oldReadTimeout);
720 void TFileTransport::seekToEnd() {
721 seekToChunk(getNumChunks());
724 uint32_t TFileTransport::getNumChunks() {
725 if (fd_ <= 0) {
726 return 0;
728 struct stat f_info;
729 fstat(fd_, &f_info);
730 if (f_info.st_size > 0) {
731 return ((f_info.st_size)/chunkSize_) + 1;
734 // empty file has no chunks
735 return 0;
738 uint32_t TFileTransport::getCurChunk() {
739 return offset_/chunkSize_;
742 // Utility Functions
743 void TFileTransport::openLogFile() {
744 mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
745 int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
746 fd_ = ::open(filename_.c_str(), flags, mode);
747 offset_ = 0;
749 // make sure open call was successful
750 if(fd_ == -1) {
751 char errorMsg[1024];
752 sprintf(errorMsg, "TFileTransport: Could not open file: %s", filename_.c_str());
753 GlobalOutput(errorMsg);
754 throw TTransportException(errorMsg);
759 void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) {
760 clock_gettime(CLOCK_REALTIME, ts_next_flush);
761 ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000;
762 if (ts_next_flush->tv_nsec > 1000000000) {
763 ts_next_flush->tv_nsec -= 1000000000;
764 ts_next_flush->tv_sec += 1;
766 ts_next_flush->tv_sec += flushMaxUs_ / 1000000;
769 TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
770 : bufferMode_(WRITE)
771 , writePoint_(0)
772 , readPoint_(0)
773 , size_(size)
775 buffer_ = new eventInfo*[size];
778 TFileTransportBuffer::~TFileTransportBuffer() {
779 if (buffer_) {
780 for (uint32_t i = 0; i < writePoint_; i++) {
781 delete buffer_[i];
783 delete[] buffer_;
784 buffer_ = NULL;
788 bool TFileTransportBuffer::addEvent(eventInfo *event) {
789 if (bufferMode_ == READ) {
790 GlobalOutput("Trying to write to a buffer in read mode");
792 if (writePoint_ < size_) {
793 buffer_[writePoint_++] = event;
794 return true;
795 } else {
796 // buffer is full
797 return false;
801 eventInfo* TFileTransportBuffer::getNext() {
802 if (bufferMode_ == WRITE) {
803 bufferMode_ = READ;
805 if (readPoint_ < writePoint_) {
806 return buffer_[readPoint_++];
807 } else {
808 // no more entries
809 return NULL;
813 void TFileTransportBuffer::reset() {
814 if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
815 T_DEBUG("Resetting a buffer with unread entries");
817 // Clean up the old entries
818 for (uint32_t i = 0; i < writePoint_; i++) {
819 delete buffer_[i];
821 bufferMode_ = WRITE;
822 writePoint_ = 0;
823 readPoint_ = 0;
826 bool TFileTransportBuffer::isFull() {
827 return writePoint_ == size_;
830 bool TFileTransportBuffer::isEmpty() {
831 return writePoint_ == 0;
834 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
835 shared_ptr<TProtocolFactory> protocolFactory,
836 shared_ptr<TFileReaderTransport> inputTransport):
837 processor_(processor),
838 inputProtocolFactory_(protocolFactory),
839 outputProtocolFactory_(protocolFactory),
840 inputTransport_(inputTransport) {
842 // default the output transport to a null transport (common case)
843 outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
846 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
847 shared_ptr<TProtocolFactory> inputProtocolFactory,
848 shared_ptr<TProtocolFactory> outputProtocolFactory,
849 shared_ptr<TFileReaderTransport> inputTransport):
850 processor_(processor),
851 inputProtocolFactory_(inputProtocolFactory),
852 outputProtocolFactory_(outputProtocolFactory),
853 inputTransport_(inputTransport) {
855 // default the output transport to a null transport (common case)
856 outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
859 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
860 shared_ptr<TProtocolFactory> protocolFactory,
861 shared_ptr<TFileReaderTransport> inputTransport,
862 shared_ptr<TTransport> outputTransport):
863 processor_(processor),
864 inputProtocolFactory_(protocolFactory),
865 outputProtocolFactory_(protocolFactory),
866 inputTransport_(inputTransport),
867 outputTransport_(outputTransport) {};
869 void TFileProcessor::process(uint32_t numEvents, bool tail) {
870 shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
871 shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
873 // set the read timeout to 0 if tailing is required
874 int32_t oldReadTimeout = inputTransport_->getReadTimeout();
875 if (tail) {
876 // save old read timeout so it can be restored
877 inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
880 uint32_t numProcessed = 0;
881 while(1) {
882 // bad form to use exceptions for flow control but there is really
883 // no other way around it
884 try {
885 processor_->process(inputProtocol, outputProtocol);
886 numProcessed++;
887 if ( (numEvents > 0) && (numProcessed == numEvents)) {
888 return;
890 } catch (TEOFException& teof) {
891 if (!tail) {
892 break;
894 } catch (TException &te) {
895 cerr << te.what() << endl;
896 break;
900 // restore old read timeout
901 if (tail) {
902 inputTransport_->setReadTimeout(oldReadTimeout);
907 void TFileProcessor::processChunk() {
908 shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
909 shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
911 uint32_t curChunk = inputTransport_->getCurChunk();
913 while(1) {
914 // bad form to use exceptions for flow control but there is really
915 // no other way around it
916 try {
917 processor_->process(inputProtocol, outputProtocol);
918 if (curChunk != inputTransport_->getCurChunk()) {
919 break;
921 } catch (TEOFException& teof) {
922 break;
923 } catch (TException &te) {
924 cerr << te.what() << endl;
925 break;
930 }}} // facebook::thrift::transport