1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
8 #define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
12 #include <transport/TTransport.h>
13 #include <transport/TFileTransport.h>
15 namespace facebook
{ namespace thrift
{ namespace transport
{
18 * The null transport is a dummy transport that doesn't actually do anything.
19 * It's roughly analogous to /dev/null: you can never read anything from it
20 * and it will let you write anything you want to it, though it won't actually
23 * @author Mark Slee <mcslee@facebook.com>
25 class TNullTransport
: public TTransport
{
37 void write(const uint8_t* buf
, uint32_t len
) {
45 * Buffered transport. For reads it will read more data than is requested
46 * and will serve future data out of a local buffer. For writes, data is
47 * stored to an in memory buffer before being written out.
49 * @author Mark Slee <mcslee@facebook.com>
51 class TBufferedTransport
: public TTransport
{
53 TBufferedTransport(boost::shared_ptr
<TTransport
> transport
) :
54 transport_(transport
),
55 rBufSize_(512), rPos_(0), rLen_(0),
56 wBufSize_(512), wLen_(0) {
57 rBuf_
= new uint8_t[rBufSize_
];
58 wBuf_
= new uint8_t[wBufSize_
];
61 TBufferedTransport(boost::shared_ptr
<TTransport
> transport
, uint32_t sz
) :
62 transport_(transport
),
63 rBufSize_(sz
), rPos_(0), rLen_(0),
64 wBufSize_(sz
), wLen_(0) {
65 rBuf_
= new uint8_t[rBufSize_
];
66 wBuf_
= new uint8_t[wBufSize_
];
69 TBufferedTransport(boost::shared_ptr
<TTransport
> transport
, uint32_t rsz
, uint32_t wsz
) :
70 transport_(transport
),
71 rBufSize_(rsz
), rPos_(0), rLen_(0),
72 wBufSize_(wsz
), wLen_(0) {
73 rBuf_
= new uint8_t[rBufSize_
];
74 wBuf_
= new uint8_t[wBufSize_
];
77 ~TBufferedTransport() {
83 return transport_
->isOpen();
88 rLen_
= transport_
->read(rBuf_
, rBufSize_
);
91 return (rLen_
> rPos_
);
103 uint32_t read(uint8_t* buf
, uint32_t len
);
105 void write(const uint8_t* buf
, uint32_t len
);
109 bool borrow(uint8_t* buf
, uint32_t len
);
111 void consume(uint32_t len
);
113 boost::shared_ptr
<TTransport
> getUnderlyingTransport() {
118 boost::shared_ptr
<TTransport
> transport_
;
130 * Wraps a transport into a buffered one.
132 * @author Mark Slee <mcslee@facebook.com>
134 class TBufferedTransportFactory
: public TTransportFactory
{
136 TBufferedTransportFactory() {}
138 virtual ~TBufferedTransportFactory() {}
141 * Wraps the transport into a buffered one.
143 virtual boost::shared_ptr
<TTransport
> getTransport(boost::shared_ptr
<TTransport
> trans
) {
144 return boost::shared_ptr
<TTransport
>(new TBufferedTransport(trans
));
150 * Framed transport. All writes go into an in-memory buffer until flush is
151 * called, at which point the transport writes the length of the entire
152 * binary chunk followed by the data payload. This allows the receiver on the
153 * other end to always do fixed-length reads.
155 * @author Mark Slee <mcslee@facebook.com>
157 class TFramedTransport
: public TTransport
{
159 TFramedTransport(boost::shared_ptr
<TTransport
> transport
) :
160 transport_(transport
),
168 wBuf_
= new uint8_t[wBufSize_
];
171 TFramedTransport(boost::shared_ptr
<TTransport
> transport
, uint32_t sz
) :
172 transport_(transport
),
180 wBuf_
= new uint8_t[wBufSize_
];
183 ~TFramedTransport() {
192 void setRead(bool read
) {
196 void setWrite(bool write
) {
205 return transport_
->isOpen();
212 return transport_
->peek();
222 uint32_t read(uint8_t* buf
, uint32_t len
);
224 void write(const uint8_t* buf
, uint32_t len
);
228 bool borrow(uint8_t* buf
, uint32_t len
);
230 void consume(uint32_t len
);
232 boost::shared_ptr
<TTransport
> getUnderlyingTransport() {
237 boost::shared_ptr
<TTransport
> transport_
;
249 * Reads a frame of input from the underlying stream.
255 * Wraps a transport into a framed one.
257 * @author Dave Simpson <dave@powerset.com>
259 class TFramedTransportFactory
: public TTransportFactory
{
261 TFramedTransportFactory() {}
263 virtual ~TFramedTransportFactory() {}
266 * Wraps the transport into a framed one.
268 virtual boost::shared_ptr
<TTransport
> getTransport(boost::shared_ptr
<TTransport
> trans
) {
269 return boost::shared_ptr
<TTransport
>(new TFramedTransport(trans
));
276 * A memory buffer is a transport that simply reads from and writes to an
277 * in memory buffer. Anytime you call write on it, the data is simply placed
278 * into a buffer, and anytime you call read, data is read from that buffer.
280 * The buffers are allocated using the C constructs malloc and realloc, and the size
281 * doubles as necessary.
283 * @author Mark Slee <mcslee@facebook.com>
285 class TMemoryBuffer
: public TTransport
{
288 // Common initialization done by all constructors.
289 void initCommon(uint8_t* buf
, uint32_t size
, bool owner
, uint32_t wPos
) {
290 if (buf
== NULL
&& size
!= 0) {
292 buf
= (uint8_t*)malloc(size
);
294 throw TTransportException("Out of memory");
// Size passed to initCommon(NULL, defaultSize, true, 0) just below.
// initCommon() malloc()s `size` bytes when it is handed a NULL buffer and a
// non-zero size, so this is the byte count of that allocation.
306 static const uint32_t defaultSize
= 1024;
309 initCommon(NULL
, defaultSize
, true, 0);
312 TMemoryBuffer(uint32_t sz
) {
313 initCommon(NULL
, sz
, true, 0);
316 // transferOwnership should be true if you want TMemoryBuffer to call free(buf).
317 TMemoryBuffer(uint8_t* buf
, int sz
, bool transferOwnership
= false) {
318 initCommon(buf
, sz
, transferOwnership
, sz
);
321 // copy should be true if you want TMemoryBuffer to make a copy of the string.
322 // If you set copy to false, the string must not be destroyed before you are
323 // done with the TMemoryBuffer.
324 TMemoryBuffer(const std::string
& str
, bool copy
= false) {
326 initCommon(NULL
, str
.length(), true, 0);
327 this->write(reinterpret_cast<const uint8_t*>(str
.data()), str
.length());
329 // This first argument should be equivalent to the following:
330 // const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data()))
331 initCommon((uint8_t*)str
.data(), str
.length(), false, str
.length());
347 return (rPos_
< wPos_
);
354 void getBuffer(uint8_t** bufPtr
, uint32_t* sz
) {
359 std::string
getBufferAsString() {
360 if (buffer_
== NULL
) {
363 return std::string((char*)buffer_
, (std::string::size_type
)wPos_
);
366 void appendBufferToString(std::string
& str
) {
367 if (buffer_
== NULL
) {
370 str
.append((char*)buffer_
, wPos_
);
378 // transferOwnership should be true if you want TMemoryBuffer to call free(buf).
379 void resetBuffer(uint8_t* buf
, uint32_t sz
, bool transferOwnership
= false) {
383 owner_
= transferOwnership
;
390 // See the constructor that takes a string.
391 void resetFromString(const std::string
& str
, bool copy
= false) {
393 uint8_t* buf
= (uint8_t*)malloc(str
.length());
395 throw TTransportException("Out of memory");
397 std::copy(str
.begin(), str
.end(), buf
);
398 resetBuffer(buf
, str
.length(), true);
400 // See the above comment about const_cast.
401 resetBuffer((uint8_t*)str
.data(), str
.length(), false);
405 uint32_t read(uint8_t* buf
, uint32_t len
);
407 std::string
readAsString(uint32_t len
) {
409 (void)readAppendToString(str
, len
);
// Declaration only; no body is given at this point in the header.
// readAsString() above calls this with (str, len) and explicitly discards
// the uint32_t result.
// NOTE(review): presumably appends up to `len` buffered bytes to `str` and
// returns how many were appended -- confirm against the definition.
413 uint32_t readAppendToString(std::string
& str
, uint32_t len
);
416 if (rPos_
== wPos_
) {
421 void write(const uint8_t* buf
, uint32_t len
);
423 uint32_t available() {
424 return wPos_
- rPos_
;
427 bool borrow(uint8_t* buf
, uint32_t len
);
429 void consume(uint32_t len
);
435 // Allocated buffer size
436 uint32_t bufferSize_
;
438 // Where the write is at
441 // Where the reader is at
444 // Is this object the owner of the buffer?
450 * TPipedTransport. This transport allows piping of a request from one
451 * transport to another either when readEnd() or writeEnd() is called. The typical
452 * use case for this is to log a request or a reply to disk.
453 * The underlying buffer expands to keep a copy of the entire
456 * @author Aditya Agarwal <aditya@facebook.com>
458 class TPipedTransport
: virtual public TTransport
{
460 TPipedTransport(boost::shared_ptr
<TTransport
> srcTrans
,
461 boost::shared_ptr
<TTransport
> dstTrans
) :
464 rBufSize_(512), rPos_(0), rLen_(0),
465 wBufSize_(512), wLen_(0) {
467 // default is to pipe the request when readEnd() is called
469 pipeOnWrite_
= false;
471 rBuf_
= (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_
);
472 wBuf_
= (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_
);
475 TPipedTransport(boost::shared_ptr
<TTransport
> srcTrans
,
476 boost::shared_ptr
<TTransport
> dstTrans
,
480 rBufSize_(512), rPos_(0), rLen_(0),
481 wBufSize_(sz
), wLen_(0) {
483 rBuf_
= (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_
);
484 wBuf_
= (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_
);
493 return srcTrans_
->isOpen();
497 if (rPos_
>= rLen_
) {
498 // Double the size of the underlying buffer if it is full
499 if (rLen_
== rBufSize_
) {
501 rBuf_
= (uint8_t *)realloc(rBuf_
, sizeof(uint8_t) * rBufSize_
);
504 // try to fill up the buffer
505 rLen_
+= srcTrans_
->read(rBuf_
+rPos_
, rBufSize_
- rPos_
);
507 return (rLen_
> rPos_
);
519 void setPipeOnRead(bool pipeVal
) {
520 pipeOnRead_
= pipeVal
;
523 void setPipeOnWrite(bool pipeVal
) {
524 pipeOnWrite_
= pipeVal
;
527 uint32_t read(uint8_t* buf
, uint32_t len
);
532 dstTrans_
->write(rBuf_
, rLen_
);
536 srcTrans_
->readEnd();
543 void write(const uint8_t* buf
, uint32_t len
);
547 dstTrans_
->write(wBuf_
, wLen_
);
554 boost::shared_ptr
<TTransport
> getTargetTransport() {
559 boost::shared_ptr
<TTransport
> srcTrans_
;
560 boost::shared_ptr
<TTransport
> dstTrans_
;
577 * Wraps a transport into a pipedTransport instance.
579 * @author Aditya Agarwal <aditya@facebook.com>
581 class TPipedTransportFactory
: public TTransportFactory
{
583 TPipedTransportFactory() {}
584 TPipedTransportFactory(boost::shared_ptr
<TTransport
> dstTrans
) {
585 initializeTargetTransport(dstTrans
);
587 virtual ~TPipedTransportFactory() {}
590 * Wraps the base transport into a piped transport.
592 virtual boost::shared_ptr
<TTransport
> getTransport(boost::shared_ptr
<TTransport
> srcTrans
) {
593 return boost::shared_ptr
<TTransport
>(new TPipedTransport(srcTrans
, dstTrans_
));
596 virtual void initializeTargetTransport(boost::shared_ptr
<TTransport
> dstTrans
) {
597 if (dstTrans_
.get() == NULL
) {
598 dstTrans_
= dstTrans
;
600 throw TException("Target transport already initialized");
605 boost::shared_ptr
<TTransport
> dstTrans_
;
609 * TPipedFileReaderTransport. This is just like a TPipedTransport, except that
610 * it also derives from TFileReaderTransport, so that clients who rely on a specific
611 * TTransport can still access the original transport.
613 * @author James Wang <jwang@facebook.com>
615 class TPipedFileReaderTransport
: public TPipedTransport
,
616 public TFileReaderTransport
{
618 TPipedFileReaderTransport(boost::shared_ptr
<TFileReaderTransport
> srcTrans
, boost::shared_ptr
<TTransport
> dstTrans
);
620 ~TPipedFileReaderTransport();
622 // TTransport functions
627 uint32_t read(uint8_t* buf
, uint32_t len
);
628 uint32_t readAll(uint8_t* buf
, uint32_t len
);
630 void write(const uint8_t* buf
, uint32_t len
);
634 // TFileReaderTransport functions
635 int32_t getReadTimeout();
636 void setReadTimeout(int32_t readTimeout
);
637 uint32_t getNumChunks();
638 uint32_t getCurChunk();
639 void seekToChunk(int32_t chunk
);
644 TPipedFileReaderTransport();
645 boost::shared_ptr
<TFileReaderTransport
> srcTrans_
;
649 * Creates a TPipedFileReaderTransport from a filepath and a destination transport
651 * @author James Wang <jwang@facebook.com>
653 class TPipedFileReaderTransportFactory
: public TPipedTransportFactory
{
655 TPipedFileReaderTransportFactory() {}
656 TPipedFileReaderTransportFactory(boost::shared_ptr
<TTransport
> dstTrans
)
657 : TPipedTransportFactory(dstTrans
)
659 virtual ~TPipedFileReaderTransportFactory() {}
661 boost::shared_ptr
<TTransport
> getTransport(boost::shared_ptr
<TTransport
> srcTrans
) {
662 boost::shared_ptr
<TFileReaderTransport
> pFileReaderTransport
= boost::dynamic_pointer_cast
<TFileReaderTransport
>(srcTrans
);
663 if (pFileReaderTransport
.get() != NULL
) {
664 return getFileReaderTransport(pFileReaderTransport
);
666 return boost::shared_ptr
<TTransport
>();
670 boost::shared_ptr
<TFileReaderTransport
> getFileReaderTransport(boost::shared_ptr
<TFileReaderTransport
> srcTrans
) {
671 return boost::shared_ptr
<TFileReaderTransport
>(new TPipedFileReaderTransport(srcTrans
, dstTrans_
));
675 }}} // facebook::thrift::transport
677 #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_