1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
8 #define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
12 #include <transport/TTransport.h>
13 #include <transport/TFileTransport.h>
15 namespace facebook
{ namespace thrift
{ namespace transport
{
18 * The null transport is a dummy transport that doesn't actually do anything.
19 * It's sort of an analogy to /dev/null, you can never read anything from it
20 * and it will let you write anything you want to it, though it won't actually
23 * @author Mark Slee <mcslee@facebook.com>
25 class TNullTransport
: public TTransport
{
37 void write(const uint8_t* buf
, uint32_t len
) {
45 * Buffered transport. For reads it will read more data than is requested
46 * and will serve future data out of a local buffer. For writes, data is
47 * stored to an in memory buffer before being written out.
49 * @author Mark Slee <mcslee@facebook.com>
51 class TBufferedTransport
: public TTransport
{
53 TBufferedTransport(boost::shared_ptr
<TTransport
> transport
) :
54 transport_(transport
),
55 rBufSize_(512), rPos_(0), rLen_(0),
56 wBufSize_(512), wLen_(0) {
57 rBuf_
= new uint8_t[rBufSize_
];
58 wBuf_
= new uint8_t[wBufSize_
];
61 TBufferedTransport(boost::shared_ptr
<TTransport
> transport
, uint32_t sz
) :
62 transport_(transport
),
63 rBufSize_(sz
), rPos_(0), rLen_(0),
64 wBufSize_(sz
), wLen_(0) {
65 rBuf_
= new uint8_t[rBufSize_
];
66 wBuf_
= new uint8_t[wBufSize_
];
69 TBufferedTransport(boost::shared_ptr
<TTransport
> transport
, uint32_t rsz
, uint32_t wsz
) :
70 transport_(transport
),
71 rBufSize_(rsz
), rPos_(0), rLen_(0),
72 wBufSize_(wsz
), wLen_(0) {
73 rBuf_
= new uint8_t[rBufSize_
];
74 wBuf_
= new uint8_t[wBufSize_
];
77 ~TBufferedTransport() {
83 return transport_
->isOpen();
88 rLen_
= transport_
->read(rBuf_
, rBufSize_
);
91 return (rLen_
> rPos_
);
103 uint32_t read(uint8_t* buf
, uint32_t len
);
105 void write(const uint8_t* buf
, uint32_t len
);
109 bool borrow(uint8_t* buf
, uint32_t len
);
111 void consume(uint32_t len
);
114 boost::shared_ptr
<TTransport
> transport_
;
126 * Wraps a transport into a buffered one.
128 * @author Mark Slee <mcslee@facebook.com>
130 class TBufferedTransportFactory
: public TTransportFactory
{
132 TBufferedTransportFactory() {}
134 virtual ~TBufferedTransportFactory() {}
137 * Wraps the transport into a buffered one.
139 virtual boost::shared_ptr
<TTransport
> getTransport(boost::shared_ptr
<TTransport
> trans
) {
140 return boost::shared_ptr
<TTransport
>(new TBufferedTransport(trans
));
146 * Framed transport. All writes go into an in-memory buffer until flush is
147 * called, at which point the transport writes the length of the entire
148 * binary chunk followed by the data payload. This allows the receiver on the
149 * other end to always do fixed-length reads.
151 * @author Mark Slee <mcslee@facebook.com>
153 class TFramedTransport
: public TTransport
{
155 TFramedTransport(boost::shared_ptr
<TTransport
> transport
) :
156 transport_(transport
),
164 wBuf_
= new uint8_t[wBufSize_
];
167 TFramedTransport(boost::shared_ptr
<TTransport
> transport
, uint32_t sz
) :
168 transport_(transport
),
176 wBuf_
= new uint8_t[wBufSize_
];
179 ~TFramedTransport() {
188 void setRead(bool read
) {
192 void setWrite(bool write
) {
201 return transport_
->isOpen();
208 return transport_
->peek();
218 uint32_t read(uint8_t* buf
, uint32_t len
);
220 void write(const uint8_t* buf
, uint32_t len
);
224 bool borrow(uint8_t* buf
, uint32_t len
);
226 void consume(uint32_t len
);
229 boost::shared_ptr
<TTransport
> transport_
;
241 * Reads a frame of input from the underlying stream.
247 * A memory buffer is a tranpsort that simply reads from and writes to an
248 * in memory buffer. Anytime you call write on it, the data is simply placed
249 * into a buffer, and anytime you call read, data is read from that buffer.
251 * The buffers are allocated using C constructs malloc,realloc, and the size
252 * doubles as necessary.
254 * @author Mark Slee <mcslee@facebook.com>
256 class TMemoryBuffer
: public TTransport
{
259 // Common initialization done by all constructors.
260 void initCommon(uint8_t* buf
, uint32_t size
, bool owner
, uint32_t wPos
) {
261 if (buf
== NULL
&& size
!= 0) {
263 buf
= (uint8_t*)malloc(size
);
265 throw TTransportException("Out of memory");
277 static const uint32_t defaultSize
= 1024;
280 initCommon(NULL
, defaultSize
, true, 0);
283 TMemoryBuffer(uint32_t sz
) {
284 initCommon(NULL
, sz
, true, 0);
287 // transferOwnership should be true if you want TMemoryBuffer to call free(buf).
288 TMemoryBuffer(uint8_t* buf
, int sz
, bool transferOwnership
= false) {
289 initCommon(buf
, sz
, transferOwnership
, sz
);
292 // copy should be true if you want TMemoryBuffer to make a copy of the string.
293 // If you set copy to false, the string must not be destroyed before you are
294 // done with the TMemoryBuffer.
295 TMemoryBuffer(const std::string
& str
, bool copy
= false) {
297 initCommon(NULL
, str
.length(), true, 0);
298 this->write(reinterpret_cast<const uint8_t*>(str
.data()), str
.length());
300 // This first argument should be equivalent to the following:
301 // const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data()))
302 initCommon((uint8_t*)str
.data(), str
.length(), false, str
.length());
318 return (rPos_
< wPos_
);
325 void getBuffer(uint8_t** bufPtr
, uint32_t* sz
) {
330 std::string
getBufferAsString() {
331 if (buffer_
== NULL
) {
334 return std::string((char*)buffer_
, (std::string::size_type
)wPos_
);
337 void appendBufferToString(std::string
& str
) {
338 if (buffer_
== NULL
) {
341 str
.append((char*)buffer_
, wPos_
);
349 // transferOwnership should be true if you want TMemoryBuffer to call free(buf).
350 void resetBuffer(uint8_t* buf
, uint32_t sz
, bool transferOwnership
= false) {
354 owner_
= transferOwnership
;
361 // See the constructor that takes a string.
362 void resetFromString(const std::string
& str
, bool copy
= false) {
364 uint8_t* buf
= (uint8_t*)malloc(str
.length());
366 throw TTransportException("Out of memory");
368 std::copy(str
.begin(), str
.end(), buf
);
369 resetBuffer(buf
, str
.length(), true);
371 // See the above comment about const_cast.
372 resetBuffer((uint8_t*)str
.data(), str
.length(), false);
376 uint32_t read(uint8_t* buf
, uint32_t len
);
378 std::string
readAsString(uint32_t len
) {
380 (void)readAppendToString(str
, len
);
384 uint32_t readAppendToString(std::string
& str
, uint32_t len
);
387 if (rPos_
== wPos_
) {
392 void write(const uint8_t* buf
, uint32_t len
);
394 uint32_t available() {
395 return wPos_
- rPos_
;
398 bool borrow(uint8_t* buf
, uint32_t len
);
400 void consume(uint32_t len
);
406 // Allocated buffer size
407 uint32_t bufferSize_
;
409 // Where the write is at
412 // Where the reader is at
415 // Is this object the owner of the buffer?
421 * TPipedTransport. This transport allows piping of a request from one
422 * transport to another either when readEnd() or writeEnd(). The typical
423 * use case for this is to log a request or a reply to disk.
424 * The underlying buffer expands to a keep a copy of the entire
427 * @author Aditya Agarwal <aditya@facebook.com>
429 class TPipedTransport
: virtual public TTransport
{
431 TPipedTransport(boost::shared_ptr
<TTransport
> srcTrans
,
432 boost::shared_ptr
<TTransport
> dstTrans
) :
435 rBufSize_(512), rPos_(0), rLen_(0),
436 wBufSize_(512), wLen_(0) {
438 // default is to to pipe the request when readEnd() is called
440 pipeOnWrite_
= false;
442 rBuf_
= (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_
);
443 wBuf_
= (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_
);
446 TPipedTransport(boost::shared_ptr
<TTransport
> srcTrans
,
447 boost::shared_ptr
<TTransport
> dstTrans
,
451 rBufSize_(512), rPos_(0), rLen_(0),
452 wBufSize_(sz
), wLen_(0) {
454 rBuf_
= (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_
);
455 wBuf_
= (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_
);
464 return srcTrans_
->isOpen();
468 if (rPos_
>= rLen_
) {
469 // Double the size of the underlying buffer if it is full
470 if (rLen_
== rBufSize_
) {
472 rBuf_
= (uint8_t *)realloc(rBuf_
, sizeof(uint8_t) * rBufSize_
);
475 // try to fill up the buffer
476 rLen_
+= srcTrans_
->read(rBuf_
+rPos_
, rBufSize_
- rPos_
);
478 return (rLen_
> rPos_
);
490 void setPipeOnRead(bool pipeVal
) {
491 pipeOnRead_
= pipeVal
;
494 void setPipeOnWrite(bool pipeVal
) {
495 pipeOnWrite_
= pipeVal
;
498 uint32_t read(uint8_t* buf
, uint32_t len
);
503 dstTrans_
->write(rBuf_
, rLen_
);
507 srcTrans_
->readEnd();
514 void write(const uint8_t* buf
, uint32_t len
);
518 dstTrans_
->write(wBuf_
, wLen_
);
525 boost::shared_ptr
<TTransport
> getTargetTransport() {
530 boost::shared_ptr
<TTransport
> srcTrans_
;
531 boost::shared_ptr
<TTransport
> dstTrans_
;
548 * Wraps a transport into a pipedTransport instance.
550 * @author Aditya Agarwal <aditya@facebook.com>
552 class TPipedTransportFactory
: public TTransportFactory
{
554 TPipedTransportFactory() {}
555 TPipedTransportFactory(boost::shared_ptr
<TTransport
> dstTrans
) {
556 initializeTargetTransport(dstTrans
);
558 virtual ~TPipedTransportFactory() {}
561 * Wraps the base transport into a piped transport.
563 virtual boost::shared_ptr
<TTransport
> getTransport(boost::shared_ptr
<TTransport
> srcTrans
) {
564 return boost::shared_ptr
<TTransport
>(new TPipedTransport(srcTrans
, dstTrans_
));
567 virtual void initializeTargetTransport(boost::shared_ptr
<TTransport
> dstTrans
) {
568 if (dstTrans_
.get() == NULL
) {
569 dstTrans_
= dstTrans
;
571 throw TException("Target transport already initialized");
576 boost::shared_ptr
<TTransport
> dstTrans_
;
580 * TPipedFileTransport. This is just like a TTransport, except that
581 * it is a templatized class, so that clients who rely on a specific
582 * TTransport can still access the original transport.
584 * @author James Wang <jwang@facebook.com>
586 class TPipedFileReaderTransport
: public TPipedTransport
,
587 public TFileReaderTransport
{
589 TPipedFileReaderTransport(boost::shared_ptr
<TFileReaderTransport
> srcTrans
, boost::shared_ptr
<TTransport
> dstTrans
);
591 ~TPipedFileReaderTransport();
593 // TTransport functions
598 uint32_t read(uint8_t* buf
, uint32_t len
);
599 uint32_t readAll(uint8_t* buf
, uint32_t len
);
601 void write(const uint8_t* buf
, uint32_t len
);
605 // TFileReaderTransport functions
606 int32_t getReadTimeout();
607 void setReadTimeout(int32_t readTimeout
);
608 uint32_t getNumChunks();
609 uint32_t getCurChunk();
610 void seekToChunk(int32_t chunk
);
615 TPipedFileReaderTransport();
616 boost::shared_ptr
<TFileReaderTransport
> srcTrans_
;
620 * Creates a TPipedFileReaderTransport from a filepath and a destination transport
622 * @author James Wang <jwang@facebook.com>
624 class TPipedFileReaderTransportFactory
: public TPipedTransportFactory
{
626 TPipedFileReaderTransportFactory() {}
627 TPipedFileReaderTransportFactory(boost::shared_ptr
<TTransport
> dstTrans
)
628 : TPipedTransportFactory(dstTrans
)
630 virtual ~TPipedFileReaderTransportFactory() {}
632 boost::shared_ptr
<TTransport
> getTransport(boost::shared_ptr
<TTransport
> srcTrans
) {
633 boost::shared_ptr
<TFileReaderTransport
> pFileReaderTransport
= boost::dynamic_pointer_cast
<TFileReaderTransport
>(srcTrans
);
634 if (pFileReaderTransport
.get() != NULL
) {
635 return getFileReaderTransport(pFileReaderTransport
);
637 return boost::shared_ptr
<TTransport
>();
641 boost::shared_ptr
<TFileReaderTransport
> getFileReaderTransport(boost::shared_ptr
<TFileReaderTransport
> srcTrans
) {
642 return boost::shared_ptr
<TFileReaderTransport
>(new TPipedFileReaderTransport(srcTrans
, dstTrans_
));
646 }}} // facebook::thrift::transport
648 #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_