1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #include <transport/TTransportUtils.h>
11 namespace facebook
{ namespace thrift
{ namespace transport
{
13 uint32_t TBufferedTransport::read(uint8_t* buf
, uint32_t len
) {
16 // We don't have enough data yet
17 if (rLen_
-rPos_
< need
) {
18 // Copy out whatever we have
19 if (rLen_
-rPos_
> 0) {
20 memcpy(buf
, rBuf_
+rPos_
, rLen_
-rPos_
);
24 // Get more from underlying transport up to buffer size
25 rLen_
= transport_
->read(rBuf_
, rBufSize_
);
29 // Hand over whatever we have
31 if (rLen_
-rPos_
< give
) {
34 memcpy(buf
, rBuf_
+rPos_
, give
);
40 void TBufferedTransport::write(const uint8_t* buf
, uint32_t len
) {
47 while ((len
-pos
) + wLen_
>= wBufSize_
) {
48 uint32_t copy
= wBufSize_
- wLen_
;
49 memcpy(wBuf_
+ wLen_
, buf
+ pos
, copy
);
51 transport_
->write(wBuf_
, wBufSize_
);
56 if ((len
- pos
) > 0) {
57 memcpy(wBuf_
+ wLen_
, buf
+ pos
, len
- pos
);
62 bool TBufferedTransport::borrow(uint8_t* buf
, uint32_t len
) {
63 // Don't try to be clever with shifting buffers.
64 // If we have enough data, give it, otherwise
65 // let the protcol use its slow path.
66 if (rLen_
-rPos_
>= len
) {
67 memcpy(buf
, rBuf_
+rPos_
, len
);
73 void TBufferedTransport::consume(uint32_t len
) {
74 if (rLen_
-rPos_
>= len
) {
77 throw TTransportException(TTransportException::BAD_ARGS
,
78 "consume did not follow a borrow.");
82 void TBufferedTransport::flush() {
83 // Write out any data waiting in the write buffer
85 transport_
->write(wBuf_
, wLen_
);
89 // Flush the underlying transport
93 uint32_t TFramedTransport::read(uint8_t* buf
, uint32_t len
) {
95 return transport_
->read(buf
, len
);
100 // We don't have enough data yet
101 if (rLen_
-rPos_
< need
) {
102 // Copy out whatever we have
103 if (rLen_
-rPos_
> 0) {
104 memcpy(buf
, rBuf_
+rPos_
, rLen_
-rPos_
);
109 // Read another chunk
113 // Hand over whatever we have
114 uint32_t give
= need
;
115 if (rLen_
-rPos_
< give
) {
118 memcpy(buf
, rBuf_
+rPos_
, give
);
124 void TFramedTransport::readFrame() {
125 // Get rid of the old frame
131 // Read in the next chunk size
133 transport_
->readAll((uint8_t*)&sz
, 4);
134 sz
= (int32_t)ntohl(sz
);
137 throw TTransportException("Frame size has negative value");
140 // Read the frame payload, reset markers
141 rBuf_
= new uint8_t[sz
];
142 transport_
->readAll(rBuf_
, sz
);
147 void TFramedTransport::write(const uint8_t* buf
, uint32_t len
) {
152 // Shortcut out if not write mode
154 transport_
->write(buf
, len
);
158 // Need to grow the buffer
159 if (len
+ wLen_
>= wBufSize_
) {
161 // Double buffer size until sufficient
162 while (wBufSize_
< len
+ wLen_
) {
166 // Allocate new buffer
167 uint8_t* wBuf2
= new uint8_t[wBufSize_
];
169 // Copy the old buffer to the new one
170 memcpy(wBuf2
, wBuf_
, wLen_
);
172 // Now point buf to the new one
177 // Copy data into buffer
178 memcpy(wBuf_
+ wLen_
, buf
, len
);
182 void TFramedTransport::flush() {
190 sz
= (int32_t)htonl(sz
);
192 transport_
->write((const uint8_t*)&sz
, 4);
196 transport_
->write(wBuf_
, wLen_
);
202 // Flush the underlying
206 bool TFramedTransport::borrow(uint8_t* buf
, uint32_t len
) {
207 // Don't try to be clever with shifting buffers.
208 // If we have enough data, give it, otherwise
209 // let the protcol use its slow path.
210 if (read_
&& (rLen_
-rPos_
>= len
)) {
211 memcpy(buf
, rBuf_
+rPos_
, len
);
217 void TFramedTransport::consume(uint32_t len
) {
218 if (rLen_
-rPos_
>= len
) {
221 throw TTransportException(TTransportException::BAD_ARGS
,
222 "consume did not follow a borrow.");
226 uint32_t TMemoryBuffer::read(uint8_t* buf
, uint32_t len
) {
227 // Check avaible data for reading
228 uint32_t avail
= wPos_
- rPos_
;
233 // Decide how much to give
239 // Copy into buffer and increment rPos_
240 memcpy(buf
, buffer_
+ rPos_
, give
);
246 uint32_t TMemoryBuffer::readAppendToString(std::string
& str
, uint32_t len
) {
247 // Don't get some stupid assertion failure.
248 if (buffer_
== NULL
) {
252 // Check avaible data for reading
253 uint32_t avail
= wPos_
- rPos_
;
258 // Device how much to give
264 // Reserve memory, copy into string, and increment rPos_
265 str
.reserve(str
.length()+give
);
266 str
.append((char*)buffer_
+ rPos_
, give
);
272 void TMemoryBuffer::write(const uint8_t* buf
, uint32_t len
) {
273 // Check available space
274 uint32_t avail
= bufferSize_
- wPos_
;
279 throw TTransportException("Insufficient space in external MemoryBuffer");
281 while (len
> avail
) {
283 avail
= bufferSize_
- wPos_
;
285 buffer_
= (uint8_t*)realloc(buffer_
, bufferSize_
);
286 if (buffer_
== NULL
) {
287 throw TTransportException("Out of memory.");
291 // Copy into the buffer and increment wPos_
292 memcpy(buffer_
+ wPos_
, buf
, len
);
296 bool TMemoryBuffer::borrow(uint8_t* buf
, uint32_t len
) {
297 // Don't try to be clever with shifting buffers.
298 // If we have enough data, give it, otherwise
299 // let the protcol use its slow path.
300 if (wPos_
-rPos_
>= len
) {
301 memcpy(buf
, buffer_
+ rPos_
, len
);
307 void TMemoryBuffer::consume(uint32_t len
) {
308 if (wPos_
-rPos_
>= len
) {
311 throw TTransportException(TTransportException::BAD_ARGS
,
312 "consume did not follow a borrow.");
316 uint32_t TPipedTransport::read(uint8_t* buf
, uint32_t len
) {
319 // We don't have enough data yet
320 if (rLen_
-rPos_
< need
) {
321 // Copy out whatever we have
322 if (rLen_
-rPos_
> 0) {
323 memcpy(buf
, rBuf_
+rPos_
, rLen_
-rPos_
);
329 // Double the size of the underlying buffer if it is full
330 if (rLen_
== rBufSize_
) {
332 rBuf_
= (uint8_t *)realloc(rBuf_
, sizeof(uint8_t) * rBufSize_
);
335 // try to fill up the buffer
336 rLen_
+= srcTrans_
->read(rBuf_
+rPos_
, rBufSize_
- rPos_
);
340 // Hand over whatever we have
341 uint32_t give
= need
;
342 if (rLen_
-rPos_
< give
) {
346 memcpy(buf
, rBuf_
+rPos_
, give
);
354 void TPipedTransport::write(const uint8_t* buf
, uint32_t len
) {
359 // Make the buffer as big as it needs to be
360 if ((len
+ wLen_
) >= wBufSize_
) {
361 uint32_t newBufSize
= wBufSize_
*2;
362 while ((len
+ wLen_
) >= newBufSize
) {
365 wBuf_
= (uint8_t *)realloc(wBuf_
, sizeof(uint8_t) * newBufSize
);
366 wBufSize_
= newBufSize
;
369 // Copy into the buffer
370 memcpy(wBuf_
+ wLen_
, buf
, len
);
374 void TPipedTransport::flush() {
375 // Write out any data waiting in the write buffer
377 srcTrans_
->write(wBuf_
, wLen_
);
381 // Flush the underlying transport
385 TPipedFileReaderTransport::TPipedFileReaderTransport(boost::shared_ptr
<TFileReaderTransport
> srcTrans
, boost::shared_ptr
<TTransport
> dstTrans
)
386 : TPipedTransport(srcTrans
, dstTrans
),
387 srcTrans_(srcTrans
) {
390 TPipedFileReaderTransport::~TPipedFileReaderTransport() {
393 bool TPipedFileReaderTransport::isOpen() {
394 return TPipedTransport::isOpen();
397 bool TPipedFileReaderTransport::peek() {
398 return TPipedTransport::peek();
401 void TPipedFileReaderTransport::open() {
402 TPipedTransport::open();
405 void TPipedFileReaderTransport::close() {
406 TPipedTransport::close();
409 uint32_t TPipedFileReaderTransport::read(uint8_t* buf
, uint32_t len
) {
410 return TPipedTransport::read(buf
, len
);
413 uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf
, uint32_t len
) {
418 get
= read(buf
+have
, len
-have
);
420 throw TEOFException();
428 void TPipedFileReaderTransport::readEnd() {
429 TPipedTransport::readEnd();
432 void TPipedFileReaderTransport::write(const uint8_t* buf
, uint32_t len
) {
433 TPipedTransport::write(buf
, len
);
436 void TPipedFileReaderTransport::writeEnd() {
437 TPipedTransport::writeEnd();
440 void TPipedFileReaderTransport::flush() {
441 TPipedTransport::flush();
444 int32_t TPipedFileReaderTransport::getReadTimeout() {
445 return srcTrans_
->getReadTimeout();
448 void TPipedFileReaderTransport::setReadTimeout(int32_t readTimeout
) {
449 srcTrans_
->setReadTimeout(readTimeout
);
452 uint32_t TPipedFileReaderTransport::getNumChunks() {
453 return srcTrans_
->getNumChunks();
456 uint32_t TPipedFileReaderTransport::getCurChunk() {
457 return srcTrans_
->getCurChunk();
460 void TPipedFileReaderTransport::seekToChunk(int32_t chunk
) {
461 srcTrans_
->seekToChunk(chunk
);
464 void TPipedFileReaderTransport::seekToEnd() {
465 srcTrans_
->seekToEnd();
468 }}} // facebook::thrift::transport