r1347@opsdev009 (orig r71314): mcslee | 2007-11-26 11:05:29 -0800
[amiethrift.git] / lib / cpp / src / transport / TTransportUtils.h
blob1ebbfbda464e84c57133f9079da6a3be2f7cb5fe
1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
3 //
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
8 #define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
10 #include <string>
11 #include <algorithm>
12 #include <transport/TTransport.h>
13 #include <transport/TFileTransport.h>
15 namespace facebook { namespace thrift { namespace transport {
17 /**
18 * The null transport is a dummy transport that doesn't actually do anything.
19 * It's sort of an analogy to /dev/null, you can never read anything from it
20 * and it will let you write anything you want to it, though it won't actually
21 * go anywhere.
23 * @author Mark Slee <mcslee@facebook.com>
25 class TNullTransport : public TTransport {
26 public:
27 TNullTransport() {}
29 ~TNullTransport() {}
31 bool isOpen() {
32 return true;
35 void open() {}
37 void write(const uint8_t* buf, uint32_t len) {
38 return;
44 /**
45 * Buffered transport. For reads it will read more data than is requested
46 * and will serve future data out of a local buffer. For writes, data is
47 * stored to an in memory buffer before being written out.
49 * @author Mark Slee <mcslee@facebook.com>
51 class TBufferedTransport : public TTransport {
52 public:
53 TBufferedTransport(boost::shared_ptr<TTransport> transport) :
54 transport_(transport),
55 rBufSize_(512), rPos_(0), rLen_(0),
56 wBufSize_(512), wLen_(0) {
57 rBuf_ = new uint8_t[rBufSize_];
58 wBuf_ = new uint8_t[wBufSize_];
61 TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) :
62 transport_(transport),
63 rBufSize_(sz), rPos_(0), rLen_(0),
64 wBufSize_(sz), wLen_(0) {
65 rBuf_ = new uint8_t[rBufSize_];
66 wBuf_ = new uint8_t[wBufSize_];
69 TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) :
70 transport_(transport),
71 rBufSize_(rsz), rPos_(0), rLen_(0),
72 wBufSize_(wsz), wLen_(0) {
73 rBuf_ = new uint8_t[rBufSize_];
74 wBuf_ = new uint8_t[wBufSize_];
77 ~TBufferedTransport() {
78 delete [] rBuf_;
79 delete [] wBuf_;
82 bool isOpen() {
83 return transport_->isOpen();
86 bool peek() {
87 if (rPos_ >= rLen_) {
88 rLen_ = transport_->read(rBuf_, rBufSize_);
89 rPos_ = 0;
91 return (rLen_ > rPos_);
94 void open() {
95 transport_->open();
98 void close() {
99 flush();
100 transport_->close();
103 uint32_t read(uint8_t* buf, uint32_t len);
105 void write(const uint8_t* buf, uint32_t len);
107 void flush();
109 bool borrow(uint8_t* buf, uint32_t len);
111 void consume(uint32_t len);
113 protected:
114 boost::shared_ptr<TTransport> transport_;
115 uint8_t* rBuf_;
116 uint32_t rBufSize_;
117 uint32_t rPos_;
118 uint32_t rLen_;
120 uint8_t* wBuf_;
121 uint32_t wBufSize_;
122 uint32_t wLen_;
126 * Wraps a transport into a buffered one.
128 * @author Mark Slee <mcslee@facebook.com>
130 class TBufferedTransportFactory : public TTransportFactory {
131 public:
132 TBufferedTransportFactory() {}
134 virtual ~TBufferedTransportFactory() {}
137 * Wraps the transport into a buffered one.
139 virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
140 return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
146 * Framed transport. All writes go into an in-memory buffer until flush is
147 * called, at which point the transport writes the length of the entire
148 * binary chunk followed by the data payload. This allows the receiver on the
149 * other end to always do fixed-length reads.
151 * @author Mark Slee <mcslee@facebook.com>
153 class TFramedTransport : public TTransport {
154 public:
155 TFramedTransport(boost::shared_ptr<TTransport> transport) :
156 transport_(transport),
157 rPos_(0),
158 rLen_(0),
159 read_(true),
160 wBufSize_(512),
161 wLen_(0),
162 write_(true) {
163 rBuf_ = NULL;
164 wBuf_ = new uint8_t[wBufSize_];
167 TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) :
168 transport_(transport),
169 rPos_(0),
170 rLen_(0),
171 read_(true),
172 wBufSize_(sz),
173 wLen_(0),
174 write_(true) {
175 rBuf_ = NULL;
176 wBuf_ = new uint8_t[wBufSize_];
179 ~TFramedTransport() {
180 if (rBuf_ != NULL) {
181 delete [] rBuf_;
183 if (wBuf_ != NULL) {
184 delete [] wBuf_;
188 void setRead(bool read) {
189 read_ = read;
192 void setWrite(bool write) {
193 write_ = write;
196 void open() {
197 transport_->open();
200 bool isOpen() {
201 return transport_->isOpen();
204 bool peek() {
205 if (rPos_ < rLen_) {
206 return true;
208 return transport_->peek();
211 void close() {
212 if (wLen_ > 0) {
213 flush();
215 transport_->close();
218 uint32_t read(uint8_t* buf, uint32_t len);
220 void write(const uint8_t* buf, uint32_t len);
222 void flush();
224 bool borrow(uint8_t* buf, uint32_t len);
226 void consume(uint32_t len);
228 protected:
229 boost::shared_ptr<TTransport> transport_;
230 uint8_t* rBuf_;
231 uint32_t rPos_;
232 uint32_t rLen_;
233 bool read_;
235 uint8_t* wBuf_;
236 uint32_t wBufSize_;
237 uint32_t wLen_;
238 bool write_;
241 * Reads a frame of input from the underlying stream.
243 void readFrame();
247 * A memory buffer is a tranpsort that simply reads from and writes to an
248 * in memory buffer. Anytime you call write on it, the data is simply placed
249 * into a buffer, and anytime you call read, data is read from that buffer.
251 * The buffers are allocated using C constructs malloc,realloc, and the size
252 * doubles as necessary.
254 * @author Mark Slee <mcslee@facebook.com>
256 class TMemoryBuffer : public TTransport {
257 private:
259 // Common initialization done by all constructors.
260 void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
261 if (buf == NULL && size != 0) {
262 assert(owner);
263 buf = (uint8_t*)malloc(size);
264 if (buf == NULL) {
265 throw TTransportException("Out of memory");
269 buffer_ = buf;
270 bufferSize_ = size;
271 owner_ = owner;
272 wPos_ = wPos;
273 rPos_ = 0;
276 public:
277 static const uint32_t defaultSize = 1024;
279 TMemoryBuffer() {
280 initCommon(NULL, defaultSize, true, 0);
283 TMemoryBuffer(uint32_t sz) {
284 initCommon(NULL, sz, true, 0);
287 // transferOwnership should be true if you want TMemoryBuffer to call free(buf).
288 TMemoryBuffer(uint8_t* buf, int sz, bool transferOwnership = false) {
289 initCommon(buf, sz, transferOwnership, sz);
292 // copy should be true if you want TMemoryBuffer to make a copy of the string.
293 // If you set copy to false, the string must not be destroyed before you are
294 // done with the TMemoryBuffer.
295 TMemoryBuffer(const std::string& str, bool copy = false) {
296 if (copy) {
297 initCommon(NULL, str.length(), true, 0);
298 this->write(reinterpret_cast<const uint8_t*>(str.data()), str.length());
299 } else {
300 // This first argument should be equivalent to the following:
301 // const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data()))
302 initCommon((uint8_t*)str.data(), str.length(), false, str.length());
306 ~TMemoryBuffer() {
307 if (owner_) {
308 free(buffer_);
309 buffer_ = NULL;
313 bool isOpen() {
314 return true;
317 bool peek() {
318 return (rPos_ < wPos_);
321 void open() {}
323 void close() {}
325 void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
326 *bufPtr = buffer_;
327 *sz = wPos_;
330 std::string getBufferAsString() {
331 if (buffer_ == NULL) {
332 return "";
334 return std::string((char*)buffer_, (std::string::size_type)wPos_);
337 void appendBufferToString(std::string& str) {
338 if (buffer_ == NULL) {
339 return;
341 str.append((char*)buffer_, wPos_);
344 void resetBuffer() {
345 wPos_ = 0;
346 rPos_ = 0;
349 // transferOwnership should be true if you want TMemoryBuffer to call free(buf).
350 void resetBuffer(uint8_t* buf, uint32_t sz, bool transferOwnership = false) {
351 if (owner_) {
352 free(buffer_);
354 owner_ = transferOwnership;
355 buffer_ = buf;
356 bufferSize_ = sz;
357 wPos_ = sz;
358 rPos_ = 0;
361 // See the constructor that takes a string.
362 void resetFromString(const std::string& str, bool copy = false) {
363 if (copy) {
364 uint8_t* buf = (uint8_t*)malloc(str.length());
365 if (buf == NULL) {
366 throw TTransportException("Out of memory");
368 std::copy(str.begin(), str.end(), buf);
369 resetBuffer(buf, str.length(), true);
370 } else {
371 // See the above comment about const_cast.
372 resetBuffer((uint8_t*)str.data(), str.length(), false);
376 uint32_t read(uint8_t* buf, uint32_t len);
378 std::string readAsString(uint32_t len) {
379 std::string str;
380 (void)readAppendToString(str, len);
381 return str;
384 uint32_t readAppendToString(std::string& str, uint32_t len);
386 void readEnd() {
387 if (rPos_ == wPos_) {
388 resetBuffer();
392 void write(const uint8_t* buf, uint32_t len);
394 uint32_t available() {
395 return wPos_ - rPos_;
398 bool borrow(uint8_t* buf, uint32_t len);
400 void consume(uint32_t len);
402 private:
403 // Data buffer
404 uint8_t* buffer_;
406 // Allocated buffer size
407 uint32_t bufferSize_;
409 // Where the write is at
410 uint32_t wPos_;
412 // Where the reader is at
413 uint32_t rPos_;
415 // Is this object the owner of the buffer?
416 bool owner_;
421 * TPipedTransport. This transport allows piping of a request from one
422 * transport to another either when readEnd() or writeEnd(). The typical
423 * use case for this is to log a request or a reply to disk.
424 * The underlying buffer expands to a keep a copy of the entire
425 * request/response.
427 * @author Aditya Agarwal <aditya@facebook.com>
429 class TPipedTransport : virtual public TTransport {
430 public:
431 TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
432 boost::shared_ptr<TTransport> dstTrans) :
433 srcTrans_(srcTrans),
434 dstTrans_(dstTrans),
435 rBufSize_(512), rPos_(0), rLen_(0),
436 wBufSize_(512), wLen_(0) {
438 // default is to to pipe the request when readEnd() is called
439 pipeOnRead_ = true;
440 pipeOnWrite_ = false;
442 rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
443 wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
446 TPipedTransport(boost::shared_ptr<TTransport> srcTrans,
447 boost::shared_ptr<TTransport> dstTrans,
448 uint32_t sz) :
449 srcTrans_(srcTrans),
450 dstTrans_(dstTrans),
451 rBufSize_(512), rPos_(0), rLen_(0),
452 wBufSize_(sz), wLen_(0) {
454 rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_);
455 wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_);
458 ~TPipedTransport() {
459 free(rBuf_);
460 free(wBuf_);
463 bool isOpen() {
464 return srcTrans_->isOpen();
467 bool peek() {
468 if (rPos_ >= rLen_) {
469 // Double the size of the underlying buffer if it is full
470 if (rLen_ == rBufSize_) {
471 rBufSize_ *=2;
472 rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
475 // try to fill up the buffer
476 rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
478 return (rLen_ > rPos_);
482 void open() {
483 srcTrans_->open();
486 void close() {
487 srcTrans_->close();
490 void setPipeOnRead(bool pipeVal) {
491 pipeOnRead_ = pipeVal;
494 void setPipeOnWrite(bool pipeVal) {
495 pipeOnWrite_ = pipeVal;
498 uint32_t read(uint8_t* buf, uint32_t len);
500 void readEnd() {
502 if (pipeOnRead_) {
503 dstTrans_->write(rBuf_, rLen_);
504 dstTrans_->flush();
507 srcTrans_->readEnd();
509 // reset state
510 rLen_ = 0;
511 rPos_ = 0;
514 void write(const uint8_t* buf, uint32_t len);
516 void writeEnd() {
517 if (pipeOnWrite_) {
518 dstTrans_->write(wBuf_, wLen_);
519 dstTrans_->flush();
523 void flush();
525 boost::shared_ptr<TTransport> getTargetTransport() {
526 return dstTrans_;
529 protected:
530 boost::shared_ptr<TTransport> srcTrans_;
531 boost::shared_ptr<TTransport> dstTrans_;
533 uint8_t* rBuf_;
534 uint32_t rBufSize_;
535 uint32_t rPos_;
536 uint32_t rLen_;
538 uint8_t* wBuf_;
539 uint32_t wBufSize_;
540 uint32_t wLen_;
542 bool pipeOnRead_;
543 bool pipeOnWrite_;
548 * Wraps a transport into a pipedTransport instance.
550 * @author Aditya Agarwal <aditya@facebook.com>
552 class TPipedTransportFactory : public TTransportFactory {
553 public:
554 TPipedTransportFactory() {}
555 TPipedTransportFactory(boost::shared_ptr<TTransport> dstTrans) {
556 initializeTargetTransport(dstTrans);
558 virtual ~TPipedTransportFactory() {}
561 * Wraps the base transport into a piped transport.
563 virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
564 return boost::shared_ptr<TTransport>(new TPipedTransport(srcTrans, dstTrans_));
567 virtual void initializeTargetTransport(boost::shared_ptr<TTransport> dstTrans) {
568 if (dstTrans_.get() == NULL) {
569 dstTrans_ = dstTrans;
570 } else {
571 throw TException("Target transport already initialized");
575 protected:
576 boost::shared_ptr<TTransport> dstTrans_;
580 * TPipedFileTransport. This is just like a TTransport, except that
581 * it is a templatized class, so that clients who rely on a specific
582 * TTransport can still access the original transport.
584 * @author James Wang <jwang@facebook.com>
586 class TPipedFileReaderTransport : public TPipedTransport,
587 public TFileReaderTransport {
588 public:
589 TPipedFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans, boost::shared_ptr<TTransport> dstTrans);
591 ~TPipedFileReaderTransport();
593 // TTransport functions
594 bool isOpen();
595 bool peek();
596 void open();
597 void close();
598 uint32_t read(uint8_t* buf, uint32_t len);
599 uint32_t readAll(uint8_t* buf, uint32_t len);
600 void readEnd();
601 void write(const uint8_t* buf, uint32_t len);
602 void writeEnd();
603 void flush();
605 // TFileReaderTransport functions
606 int32_t getReadTimeout();
607 void setReadTimeout(int32_t readTimeout);
608 uint32_t getNumChunks();
609 uint32_t getCurChunk();
610 void seekToChunk(int32_t chunk);
611 void seekToEnd();
613 protected:
614 // shouldn't be used
615 TPipedFileReaderTransport();
616 boost::shared_ptr<TFileReaderTransport> srcTrans_;
620 * Creates a TPipedFileReaderTransport from a filepath and a destination transport
622 * @author James Wang <jwang@facebook.com>
624 class TPipedFileReaderTransportFactory : public TPipedTransportFactory {
625 public:
626 TPipedFileReaderTransportFactory() {}
627 TPipedFileReaderTransportFactory(boost::shared_ptr<TTransport> dstTrans)
628 : TPipedTransportFactory(dstTrans)
630 virtual ~TPipedFileReaderTransportFactory() {}
632 boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> srcTrans) {
633 boost::shared_ptr<TFileReaderTransport> pFileReaderTransport = boost::dynamic_pointer_cast<TFileReaderTransport>(srcTrans);
634 if (pFileReaderTransport.get() != NULL) {
635 return getFileReaderTransport(pFileReaderTransport);
636 } else {
637 return boost::shared_ptr<TTransport>();
641 boost::shared_ptr<TFileReaderTransport> getFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans) {
642 return boost::shared_ptr<TFileReaderTransport>(new TPipedFileReaderTransport(srcTrans, dstTrans_));
646 }}} // facebook::thrift::transport
648 #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_