r1359@opsdev009 (orig r71576): mcslee | 2007-11-27 16:12:11 -0800
[amiethrift.git] / lib / cpp / src / transport / TTransportUtils.h
blob38d27871ca0021191eb95fb61cc79d85b29f73ae
1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
3 //
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
8 #define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
10 #include <string>
11 #include <algorithm>
12 #include <transport/TTransport.h>
13 #include <transport/TFileTransport.h>
15 namespace facebook { namespace thrift { namespace transport {
17 /**
18 * The null transport is a dummy transport that doesn't actually do anything.
19 * It's sort of an analogy to /dev/null, you can never read anything from it
20 * and it will let you write anything you want to it, though it won't actually
21 * go anywhere.
23 * @author Mark Slee <mcslee@facebook.com>
25 class TNullTransport : public TTransport {
26 public:
27 TNullTransport() {}
29 ~TNullTransport() {}
31 bool isOpen() {
32 return true;
35 void open() {}
37 void write(const uint8_t* buf, uint32_t len) {
38 return;
44 /**
45 * Buffered transport. For reads it will read more data than is requested
46 * and will serve future data out of a local buffer. For writes, data is
47 * stored to an in memory buffer before being written out.
49 * @author Mark Slee <mcslee@facebook.com>
51 class TBufferedTransport : public TTransport {
52 public:
53 TBufferedTransport(boost::shared_ptr<TTransport> transport) :
54 transport_(transport),
55 rBufSize_(512), rPos_(0), rLen_(0),
56 wBufSize_(512), wLen_(0) {
57 rBuf_ = new uint8_t[rBufSize_];
58 wBuf_ = new uint8_t[wBufSize_];
61 TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) :
62 transport_(transport),
63 rBufSize_(sz), rPos_(0), rLen_(0),
64 wBufSize_(sz), wLen_(0) {
65 rBuf_ = new uint8_t[rBufSize_];
66 wBuf_ = new uint8_t[wBufSize_];
69 TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) :
70 transport_(transport),
71 rBufSize_(rsz), rPos_(0), rLen_(0),
72 wBufSize_(wsz), wLen_(0) {
73 rBuf_ = new uint8_t[rBufSize_];
74 wBuf_ = new uint8_t[wBufSize_];
77 ~TBufferedTransport() {
78 delete [] rBuf_;
79 delete [] wBuf_;
82 bool isOpen() {
83 return transport_->isOpen();
86 bool peek() {
87 if (rPos_ >= rLen_) {
88 rLen_ = transport_->read(rBuf_, rBufSize_);
89 rPos_ = 0;
91 return (rLen_ > rPos_);
94 void open() {
95 transport_->open();
98 void close() {
99 flush();
100 transport_->close();
103 uint32_t read(uint8_t* buf, uint32_t len);
105 void write(const uint8_t* buf, uint32_t len);
107 void flush();
109 bool borrow(uint8_t* buf, uint32_t len);
111 void consume(uint32_t len);
113 boost::shared_ptr<TTransport> getUnderlyingTransport() {
114 return transport_;
117 protected:
118 boost::shared_ptr<TTransport> transport_;
119 uint8_t* rBuf_;
120 uint32_t rBufSize_;
121 uint32_t rPos_;
122 uint32_t rLen_;
124 uint8_t* wBuf_;
125 uint32_t wBufSize_;
126 uint32_t wLen_;
130 * Wraps a transport into a buffered one.
132 * @author Mark Slee <mcslee@facebook.com>
134 class TBufferedTransportFactory : public TTransportFactory {
135 public:
136 TBufferedTransportFactory() {}
138 virtual ~TBufferedTransportFactory() {}
141 * Wraps the transport into a buffered one.
143 virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
144 return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
150 * Framed transport. All writes go into an in-memory buffer until flush is
151 * called, at which point the transport writes the length of the entire
152 * binary chunk followed by the data payload. This allows the receiver on the
153 * other end to always do fixed-length reads.
155 * @author Mark Slee <mcslee@facebook.com>
157 class TFramedTransport : public TTransport {
158 public:
159 TFramedTransport(boost::shared_ptr<TTransport> transport) :
160 transport_(transport),
161 rPos_(0),
162 rLen_(0),
163 read_(true),
164 wBufSize_(512),
165 wLen_(0),
166 write_(true) {
167 rBuf_ = NULL;
168 wBuf_ = new uint8_t[wBufSize_];
171 TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) :
172 transport_(transport),
173 rPos_(0),
174 rLen_(0),
175 read_(true),
176 wBufSize_(sz),
177 wLen_(0),
178 write_(true) {
179 rBuf_ = NULL;
180 wBuf_ = new uint8_t[wBufSize_];
183 ~TFramedTransport() {
184 if (rBuf_ != NULL) {
185 delete [] rBuf_;
187 if (wBuf_ != NULL) {
188 delete [] wBuf_;
192 void setRead(bool read) {
193 read_ = read;
196 void setWrite(bool write) {
197 write_ = write;
200 void open() {
201 transport_->open();
204 bool isOpen() {
205 return transport_->isOpen();
208 bool peek() {
209 if (rPos_ < rLen_) {
210 return true;
212 return transport_->peek();
215 void close() {
216 if (wLen_ > 0) {
217 flush();
219 transport_->close();
222 uint32_t read(uint8_t* buf, uint32_t len);
224 void write(const uint8_t* buf, uint32_t len);
226 void flush();
228 bool borrow(uint8_t* buf, uint32_t len);
230 void consume(uint32_t len);
232 boost::shared_ptr<TTransport> getUnderlyingTransport() {
233 return transport_;
236 protected:
237 boost::shared_ptr<TTransport> transport_;
238 uint8_t* rBuf_;
239 uint32_t rPos_;
240 uint32_t rLen_;
241 bool read_;
243 uint8_t* wBuf_;
244 uint32_t wBufSize_;
245 uint32_t wLen_;
246 bool write_;
249 * Reads a frame of input from the underlying stream.
251 void readFrame();
255 * Wraps a transport into a framed one.
257 * @author Dave Simpson <dave@powerset.com>
259 class TFramedTransportFactory : public TTransportFactory {
260 public:
261 TFramedTransportFactory() {}
263 virtual ~TFramedTransportFactory() {}
266 * Wraps the transport into a framed one.
268 virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
269 return boost::shared_ptr<TTransport>(new TFramedTransport(trans));
276 * A memory buffer is a tranpsort that simply reads from and writes to an
277 * in memory buffer. Anytime you call write on it, the data is simply placed
278 * into a buffer, and anytime you call read, data is read from that buffer.
280 * The buffers are allocated using C constructs malloc,realloc, and the size
281 * doubles as necessary.
283 * @author Mark Slee <mcslee@facebook.com>
285 class TMemoryBuffer : public TTransport {
286 private:
288 // Common initialization done by all constructors.
289 void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
290 if (buf == NULL && size != 0) {
291 assert(owner);
292 buf = (uint8_t*)malloc(size);
293 if (buf == NULL) {
294 throw TTransportException("Out of memory");
298 buffer_ = buf;
299 bufferSize_ = size;
300 owner_ = owner;
301 wPos_ = wPos;
302 rPos_ = 0;
305 public:
306 static const uint32_t defaultSize = 1024;
308 TMemoryBuffer() {
309 initCommon(NULL, defaultSize, true, 0);
312 TMemoryBuffer(uint32_t sz) {
313 initCommon(NULL, sz, true, 0);
316 // transferOwnership should be true if you want TMemoryBuffer to call free(buf).
317 TMemoryBuffer(uint8_t* buf, int sz, bool transferOwnership = false) {
318 initCommon(buf, sz, transferOwnership, sz);
321 // copy should be true if you want TMemoryBuffer to make a copy of the string.
322 // If you set copy to false, the string must not be destroyed before you are
323 // done with the TMemoryBuffer.
324 TMemoryBuffer(const std::string& str, bool copy = false) {
325 if (copy) {
326 initCommon(NULL, str.length(), true, 0);
327 this->write(reinterpret_cast<const uint8_t*>(str.data()), str.length());
328 } else {
329 // This first argument should be equivalent to the following:
330 // const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data()))
331 initCommon((uint8_t*)str.data(), str.length(), false, str.length());
335 ~TMemoryBuffer() {
336 if (owner_) {
337 free(buffer_);
338 buffer_ = NULL;
342 bool isOpen() {
343 return true;
346 bool peek() {
347 return (rPos_ < wPos_);
350 void open() {}
352 void close() {}
354 void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
355 *bufPtr = buffer_;
356 *sz = wPos_;
359 std::string getBufferAsString() {
360 if (buffer_ == NULL) {
361 return "";
363 return std::string((char*)buffer_, (std::string::size_type)wPos_);
366 void appendBufferToString(std::string& str) {
367 if (buffer_ == NULL) {
368 return;
370 str.append((char*)buffer_, wPos_);
373 void resetBuffer() {
374 wPos_ = 0;
375 rPos_ = 0;
378 // transferOwnership should be true if you want TMemoryBuffer to call free(buf).
379 void resetBuffer(uint8_t* buf, uint32_t sz, bool transferOwnership = false) {
380 if (owner_) {
381 free(buffer_);
383 owner_ = transferOwnership;
384 buffer_ = buf;
385 bufferSize_ = sz;
386 wPos_ = sz;
387 rPos_ = 0;
390 // See the constructor that takes a string.
391 void resetFromString(const std::string& str, bool copy = false) {
392 if (copy) {
393 uint8_t* buf = (uint8_t*)malloc(str.length());
394 if (buf == NULL) {
395 throw TTransportException("Out of memory");
397 std::copy(str.begin(), str.end(), buf);
398 resetBuffer(buf, str.length(), true);
399 } else {
400 // See the above comment about const_cast.
401 resetBuffer((uint8_t*)str.data(), str.length(), false);
405 uint32_t read(uint8_t* buf, uint32_t len);
407 std::string readAsString(uint32_t len) {
408 std::string str;
409 (void)readAppendToString(str, len);
410 return str;
413 uint32_t readAppendToString(std::string& str, uint32_t len);
415 void readEnd() {
416 if (rPos_ == wPos_) {
417 resetBuffer();
421 void write(const uint8_t* buf, uint32_t len);
423 uint32_t available() {
424 return wPos_ - rPos_;
427 bool borrow(uint8_t* buf, uint32_t len);
429 void consume(uint32_t len);
431 private:
432 // Data buffer
433 uint8_t* buffer_;
435 // Allocated buffer size
436 uint32_t bufferSize_;
438 // Where the write is at
439 uint32_t wPos_;
441 // Where the reader is at
442 uint32_t rPos_;
444 // Is this object the owner of the buffer?
445 bool owner_;
450 * TPipedTransport. This transport allows piping of a request from one
451 * transport to another either when readEnd() or writeEnd(). The typical
452 * use case for this is to log a request or a reply to disk.
453 * The underlying buffer expands to a keep a copy of the entire
454 * request/response.
456 * @author Aditya Agarwal <aditya@facebook.com>
458 class TPipedTransport : virtual public TTransport {
459 public:
460 TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
461 boost::shared_ptr<TTransport> dstTrans) :
462 srcTrans_(srcTrans),
463 dstTrans_(dstTrans),
464 rBufSize_(512), rPos_(0), rLen_(0),
465 wBufSize_(512), wLen_(0) {
467 // default is to to pipe the request when readEnd() is called
468 pipeOnRead_ = true;
469 pipeOnWrite_ = false;
471 rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
472 wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
475 TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
476 boost::shared_ptr<TTransport> dstTrans,
477 uint32_t sz) :
478 srcTrans_(srcTrans),
479 dstTrans_(dstTrans),
480 rBufSize_(512), rPos_(0), rLen_(0),
481 wBufSize_(sz), wLen_(0) {
483 rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
484 wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
487 ~TPipedTransport() {
488 free(rBuf_);
489 free(wBuf_);
492 bool isOpen() {
493 return srcTrans_->isOpen();
496 bool peek() {
497 if (rPos_ >= rLen_) {
498 // Double the size of the underlying buffer if it is full
499 if (rLen_ == rBufSize_) {
500 rBufSize_ *=2;
501 rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
504 // try to fill up the buffer
505 rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
507 return (rLen_ > rPos_);
511 void open() {
512 srcTrans_->open();
515 void close() {
516 srcTrans_->close();
519 void setPipeOnRead(bool pipeVal) {
520 pipeOnRead_ = pipeVal;
523 void setPipeOnWrite(bool pipeVal) {
524 pipeOnWrite_ = pipeVal;
527 uint32_t read(uint8_t* buf, uint32_t len);
529 void readEnd() {
531 if (pipeOnRead_) {
532 dstTrans_->write(rBuf_, rLen_);
533 dstTrans_->flush();
536 srcTrans_->readEnd();
538 // reset state
539 rLen_ = 0;
540 rPos_ = 0;
543 void write(const uint8_t* buf, uint32_t len);
545 void writeEnd() {
546 if (pipeOnWrite_) {
547 dstTrans_->write(wBuf_, wLen_);
548 dstTrans_->flush();
552 void flush();
554 boost::shared_ptr<TTransport> getTargetTransport() {
555 return dstTrans_;
558 protected:
559 boost::shared_ptr<TTransport> srcTrans_;
560 boost::shared_ptr<TTransport> dstTrans_;
562 uint8_t* rBuf_;
563 uint32_t rBufSize_;
564 uint32_t rPos_;
565 uint32_t rLen_;
567 uint8_t* wBuf_;
568 uint32_t wBufSize_;
569 uint32_t wLen_;
571 bool pipeOnRead_;
572 bool pipeOnWrite_;
577 * Wraps a transport into a pipedTransport instance.
579 * @author Aditya Agarwal <aditya@facebook.com>
581 class TPipedTransportFactory : public TTransportFactory {
582 public:
583 TPipedTransportFactory() {}
584 TPipedTransportFactory(boost::shared_ptr<TTransport> dstTrans) {
585 initializeTargetTransport(dstTrans);
587 virtual ~TPipedTransportFactory() {}
590 * Wraps the base transport into a piped transport.
592 virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
593 return boost::shared_ptr<TTransport>(new TPipedTransport(srcTrans, dstTrans_));
596 virtual void initializeTargetTransport(boost::shared_ptr<TTransport> dstTrans) {
597 if (dstTrans_.get() == NULL) {
598 dstTrans_ = dstTrans;
599 } else {
600 throw TException("Target transport already initialized");
604 protected:
605 boost::shared_ptr<TTransport> dstTrans_;
609 * TPipedFileTransport. This is just like a TTransport, except that
610 * it is a templatized class, so that clients who rely on a specific
611 * TTransport can still access the original transport.
613 * @author James Wang <jwang@facebook.com>
615 class TPipedFileReaderTransport : public TPipedTransport,
616 public TFileReaderTransport {
617 public:
618 TPipedFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans, boost::shared_ptr<TTransport> dstTrans);
620 ~TPipedFileReaderTransport();
622 // TTransport functions
623 bool isOpen();
624 bool peek();
625 void open();
626 void close();
627 uint32_t read(uint8_t* buf, uint32_t len);
628 uint32_t readAll(uint8_t* buf, uint32_t len);
629 void readEnd();
630 void write(const uint8_t* buf, uint32_t len);
631 void writeEnd();
632 void flush();
634 // TFileReaderTransport functions
635 int32_t getReadTimeout();
636 void setReadTimeout(int32_t readTimeout);
637 uint32_t getNumChunks();
638 uint32_t getCurChunk();
639 void seekToChunk(int32_t chunk);
640 void seekToEnd();
642 protected:
643 // shouldn't be used
644 TPipedFileReaderTransport();
645 boost::shared_ptr<TFileReaderTransport> srcTrans_;
649 * Creates a TPipedFileReaderTransport from a filepath and a destination transport
651 * @author James Wang <jwang@facebook.com>
653 class TPipedFileReaderTransportFactory : public TPipedTransportFactory {
654 public:
655 TPipedFileReaderTransportFactory() {}
656 TPipedFileReaderTransportFactory(boost::shared_ptr<TTransport> dstTrans)
657 : TPipedTransportFactory(dstTrans)
659 virtual ~TPipedFileReaderTransportFactory() {}
661 boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
662 boost::shared_ptr<TFileReaderTransport> pFileReaderTransport = boost::dynamic_pointer_cast<TFileReaderTransport>(srcTrans);
663 if (pFileReaderTransport.get() != NULL) {
664 return getFileReaderTransport(pFileReaderTransport);
665 } else {
666 return boost::shared_ptr<TTransport>();
670 boost::shared_ptr<TFileReaderTransport> getFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans) {
671 return boost::shared_ptr<TFileReaderTransport>(new TPipedFileReaderTransport(srcTrans, dstTrans_));
675 }}} // facebook::thrift::transport
677 #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_