r1355@opsdev009 (orig r71477): mcslee | 2007-11-27 00:42:19 -0800
[amiethrift.git] / lib / cpp / src / transport / TTransportUtils.cpp
blobb10958cbddeffd8ca562d3e93c66cb60159f1c57
1 // Copyright (c) 2006- Facebook
2 // Distributed under the Thrift Software License
3 //
4 // See accompanying file LICENSE or visit the Thrift site at:
5 // http://developers.facebook.com/thrift/
7 #include <transport/TTransportUtils.h>
9 using std::string;
11 namespace facebook { namespace thrift { namespace transport {
13 uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) {
14 uint32_t need = len;
16 // We don't have enough data yet
17 if (rLen_-rPos_ < need) {
18 // Copy out whatever we have
19 if (rLen_-rPos_ > 0) {
20 memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
21 need -= rLen_-rPos_;
22 buf += rLen_-rPos_;
24 // Get more from underlying transport up to buffer size
25 rLen_ = transport_->read(rBuf_, rBufSize_);
26 rPos_ = 0;
29 // Hand over whatever we have
30 uint32_t give = need;
31 if (rLen_-rPos_ < give) {
32 give = rLen_-rPos_;
34 memcpy(buf, rBuf_+rPos_, give);
35 rPos_ += give;
36 need -= give;
37 return (len - need);
40 void TBufferedTransport::write(const uint8_t* buf, uint32_t len) {
41 if (len == 0) {
42 return;
45 uint32_t pos = 0;
47 while ((len-pos) + wLen_ >= wBufSize_) {
48 uint32_t copy = wBufSize_ - wLen_;
49 memcpy(wBuf_ + wLen_, buf + pos, copy);
51 transport_->write(wBuf_, wBufSize_);
52 pos += copy;
53 wLen_ = 0;
56 if ((len - pos) > 0) {
57 memcpy(wBuf_ + wLen_, buf + pos, len - pos);
58 wLen_ += len - pos;
62 bool TBufferedTransport::borrow(uint8_t* buf, uint32_t len) {
63 // Don't try to be clever with shifting buffers.
64 // If we have enough data, give it, otherwise
65 // let the protcol use its slow path.
66 if (rLen_-rPos_ >= len) {
67 memcpy(buf, rBuf_+rPos_, len);
68 return true;
70 return false;
73 void TBufferedTransport::consume(uint32_t len) {
74 if (rLen_-rPos_ >= len) {
75 rPos_ += len;
76 } else {
77 throw TTransportException(TTransportException::BAD_ARGS,
78 "consume did not follow a borrow.");
82 void TBufferedTransport::flush() {
83 // Write out any data waiting in the write buffer
84 if (wLen_ > 0) {
85 transport_->write(wBuf_, wLen_);
86 wLen_ = 0;
89 // Flush the underlying transport
90 transport_->flush();
93 uint32_t TFramedTransport::read(uint8_t* buf, uint32_t len) {
94 if (!read_) {
95 return transport_->read(buf, len);
98 uint32_t need = len;
100 // We don't have enough data yet
101 if (rLen_-rPos_ < need) {
102 // Copy out whatever we have
103 if (rLen_-rPos_ > 0) {
104 memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
105 need -= rLen_-rPos_;
106 buf += rLen_-rPos_;
109 // Read another chunk
110 readFrame();
113 // Hand over whatever we have
114 uint32_t give = need;
115 if (rLen_-rPos_ < give) {
116 give = rLen_-rPos_;
118 memcpy(buf, rBuf_+rPos_, give);
119 rPos_ += give;
120 need -= give;
121 return (len - need);
124 void TFramedTransport::readFrame() {
125 // Get rid of the old frame
126 if (rBuf_ != NULL) {
127 delete [] rBuf_;
128 rBuf_ = NULL;
131 // Read in the next chunk size
132 int32_t sz;
133 transport_->readAll((uint8_t*)&sz, 4);
134 sz = (int32_t)ntohl(sz);
136 if (sz < 0) {
137 throw TTransportException("Frame size has negative value");
140 // Read the frame payload, reset markers
141 rBuf_ = new uint8_t[sz];
142 transport_->readAll(rBuf_, sz);
143 rPos_ = 0;
144 rLen_ = sz;
147 void TFramedTransport::write(const uint8_t* buf, uint32_t len) {
148 if (len == 0) {
149 return;
152 // Shortcut out if not write mode
153 if (!write_) {
154 transport_->write(buf, len);
155 return;
158 // Need to grow the buffer
159 if (len + wLen_ >= wBufSize_) {
161 // Double buffer size until sufficient
162 while (wBufSize_ < len + wLen_) {
163 wBufSize_ *= 2;
166 // Allocate new buffer
167 uint8_t* wBuf2 = new uint8_t[wBufSize_];
169 // Copy the old buffer to the new one
170 memcpy(wBuf2, wBuf_, wLen_);
172 // Now point buf to the new one
173 delete [] wBuf_;
174 wBuf_ = wBuf2;
177 // Copy data into buffer
178 memcpy(wBuf_ + wLen_, buf, len);
179 wLen_ += len;
182 void TFramedTransport::flush() {
183 if (!write_) {
184 transport_->flush();
185 return;
188 // Write frame size
189 int32_t sz = wLen_;
190 sz = (int32_t)htonl(sz);
192 transport_->write((const uint8_t*)&sz, 4);
194 // Write frame body
195 if (wLen_ > 0) {
196 transport_->write(wBuf_, wLen_);
199 // All done
200 wLen_ = 0;
202 // Flush the underlying
203 transport_->flush();
206 bool TFramedTransport::borrow(uint8_t* buf, uint32_t len) {
207 // Don't try to be clever with shifting buffers.
208 // If we have enough data, give it, otherwise
209 // let the protcol use its slow path.
210 if (read_ && (rLen_-rPos_ >= len)) {
211 memcpy(buf, rBuf_+rPos_, len);
212 return true;
214 return false;
217 void TFramedTransport::consume(uint32_t len) {
218 if (rLen_-rPos_ >= len) {
219 rPos_ += len;
220 } else {
221 throw TTransportException(TTransportException::BAD_ARGS,
222 "consume did not follow a borrow.");
226 uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) {
227 // Check avaible data for reading
228 uint32_t avail = wPos_ - rPos_;
229 if (avail == 0) {
230 return 0;
233 // Decide how much to give
234 uint32_t give = len;
235 if (avail < len) {
236 give = avail;
239 // Copy into buffer and increment rPos_
240 memcpy(buf, buffer_ + rPos_, give);
241 rPos_ += give;
243 return give;
246 uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) {
247 // Don't get some stupid assertion failure.
248 if (buffer_ == NULL) {
249 return 0;
252 // Check avaible data for reading
253 uint32_t avail = wPos_ - rPos_;
254 if (avail == 0) {
255 return 0;
258 // Device how much to give
259 uint32_t give = len;
260 if (avail < len) {
261 give = avail;
264 // Reserve memory, copy into string, and increment rPos_
265 str.reserve(str.length()+give);
266 str.append((char*)buffer_ + rPos_, give);
267 rPos_ += give;
269 return give;
272 void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) {
273 // Check available space
274 uint32_t avail = bufferSize_ - wPos_;
276 // Grow the buffer
277 if (len > avail) {
278 if (!owner_) {
279 throw TTransportException("Insufficient space in external MemoryBuffer");
281 while (len > avail) {
282 bufferSize_ *= 2;
283 avail = bufferSize_ - wPos_;
285 buffer_ = (uint8_t*)realloc(buffer_, bufferSize_);
286 if (buffer_ == NULL) {
287 throw TTransportException("Out of memory.");
291 // Copy into the buffer and increment wPos_
292 memcpy(buffer_ + wPos_, buf, len);
293 wPos_ += len;
296 bool TMemoryBuffer::borrow(uint8_t* buf, uint32_t len) {
297 // Don't try to be clever with shifting buffers.
298 // If we have enough data, give it, otherwise
299 // let the protcol use its slow path.
300 if (wPos_-rPos_ >= len) {
301 memcpy(buf, buffer_ + rPos_, len);
302 return true;
304 return false;
307 void TMemoryBuffer::consume(uint32_t len) {
308 if (wPos_-rPos_ >= len) {
309 rPos_ += len;
310 } else {
311 throw TTransportException(TTransportException::BAD_ARGS,
312 "consume did not follow a borrow.");
316 uint32_t TPipedTransport::read(uint8_t* buf, uint32_t len) {
317 uint32_t need = len;
319 // We don't have enough data yet
320 if (rLen_-rPos_ < need) {
321 // Copy out whatever we have
322 if (rLen_-rPos_ > 0) {
323 memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
324 need -= rLen_-rPos_;
325 buf += rLen_-rPos_;
326 rPos_ = rLen_;
329 // Double the size of the underlying buffer if it is full
330 if (rLen_ == rBufSize_) {
331 rBufSize_ *=2;
332 rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
335 // try to fill up the buffer
336 rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_);
340 // Hand over whatever we have
341 uint32_t give = need;
342 if (rLen_-rPos_ < give) {
343 give = rLen_-rPos_;
345 if (give > 0) {
346 memcpy(buf, rBuf_+rPos_, give);
347 rPos_ += give;
348 need -= give;
351 return (len - need);
354 void TPipedTransport::write(const uint8_t* buf, uint32_t len) {
355 if (len == 0) {
356 return;
359 // Make the buffer as big as it needs to be
360 if ((len + wLen_) >= wBufSize_) {
361 uint32_t newBufSize = wBufSize_*2;
362 while ((len + wLen_) >= newBufSize) {
363 newBufSize *= 2;
365 wBuf_ = (uint8_t *)realloc(wBuf_, sizeof(uint8_t) * newBufSize);
366 wBufSize_ = newBufSize;
369 // Copy into the buffer
370 memcpy(wBuf_ + wLen_, buf, len);
371 wLen_ += len;
374 void TPipedTransport::flush() {
375 // Write out any data waiting in the write buffer
376 if (wLen_ > 0) {
377 srcTrans_->write(wBuf_, wLen_);
378 wLen_ = 0;
381 // Flush the underlying transport
382 srcTrans_->flush();
385 TPipedFileReaderTransport::TPipedFileReaderTransport(boost::shared_ptr<TFileReaderTransport> srcTrans, boost::shared_ptr<TTransport> dstTrans)
386 : TPipedTransport(srcTrans, dstTrans),
387 srcTrans_(srcTrans) {
390 TPipedFileReaderTransport::~TPipedFileReaderTransport() {
393 bool TPipedFileReaderTransport::isOpen() {
394 return TPipedTransport::isOpen();
397 bool TPipedFileReaderTransport::peek() {
398 return TPipedTransport::peek();
401 void TPipedFileReaderTransport::open() {
402 TPipedTransport::open();
405 void TPipedFileReaderTransport::close() {
406 TPipedTransport::close();
409 uint32_t TPipedFileReaderTransport::read(uint8_t* buf, uint32_t len) {
410 return TPipedTransport::read(buf, len);
413 uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf, uint32_t len) {
414 uint32_t have = 0;
415 uint32_t get = 0;
417 while (have < len) {
418 get = read(buf+have, len-have);
419 if (get <= 0) {
420 throw TEOFException();
422 have += get;
425 return have;
428 void TPipedFileReaderTransport::readEnd() {
429 TPipedTransport::readEnd();
432 void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) {
433 TPipedTransport::write(buf, len);
436 void TPipedFileReaderTransport::writeEnd() {
437 TPipedTransport::writeEnd();
440 void TPipedFileReaderTransport::flush() {
441 TPipedTransport::flush();
444 int32_t TPipedFileReaderTransport::getReadTimeout() {
445 return srcTrans_->getReadTimeout();
448 void TPipedFileReaderTransport::setReadTimeout(int32_t readTimeout) {
449 srcTrans_->setReadTimeout(readTimeout);
452 uint32_t TPipedFileReaderTransport::getNumChunks() {
453 return srcTrans_->getNumChunks();
456 uint32_t TPipedFileReaderTransport::getCurChunk() {
457 return srcTrans_->getCurChunk();
460 void TPipedFileReaderTransport::seekToChunk(int32_t chunk) {
461 srcTrans_->seekToChunk(chunk);
464 void TPipedFileReaderTransport::seekToEnd() {
465 srcTrans_->seekToEnd();
468 }}} // facebook::thrift::transport