4 namespace Doctrine\CouchDB\HTTP
;
8 * Streams the multipart data from the source to the target, so that the
9 * transfer has a smaller memory footprint.
11 * Class MultipartParserAndSender
12 * @package Doctrine\CouchDB\HTTP
// Holds one client and one open connection handle for each side of the transfer.
14 class MultipartParserAndSender
// StreamClient built in the constructor from the source client's options.
19 protected $sourceClient;
// SocketClient built in the constructor from the target client's options.
24 protected $targetClient;
// Handle returned by $sourceClient->getConnection(); read with fgets()/feof()
// and released with fclose().
29 protected $sourceConnection;
// Handle returned by $targetClient->getConnection(); sendStream() opens it when
// it is null and sets it back to null after closing it.
34 protected $targetConnection;
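37 * @param AbstractHTTPClient $source Client whose options are used to build the source StreamClient.
38 * @param AbstractHTTPClient $target Client whose options are used to build the target SocketClient.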
// Builds the two internal clients from the option arrays of the given clients:
// a StreamClient for the source side and a SocketClient for the target side.
// Only getOptions() is called on $source and $target.
// NOTE(review): the embedded numbering jumps from 49 to 51 and from 60 to 62,
// so one argument line appears to be missing between 'password' and 'ssl' in
// each constructor call of this listing — confirm against the full file.
40 public function __construct(
41 AbstractHTTPClient
$source,
42 AbstractHTTPClient
$target
// Source side: reuse the connection settings of $source.
44 $sourceOptions = $source->getOptions();
45 $this->sourceClient
= new StreamClient(
46 $sourceOptions['host'],
47 $sourceOptions['port'],
48 $sourceOptions['username'],
49 $sourceOptions['password'],
51 $sourceOptions['ssl'],
52 $sourceOptions['path']
// Target side: reuse the connection settings of $target.
55 $targetOptions = $target->getOptions();
56 $this->targetClient
= new SocketClient(
57 $targetOptions['host'],
58 $targetOptions['port'],
59 $targetOptions['username'],
60 $targetOptions['password'],
62 $targetOptions['ssl'],
63 $targetOptions['path']
68 * Perform request to the source, parse the multipart response,
69 * stream the documents with attachments to the target and return
70 * the responses along with docs that did not have any attachments.
72 * @param string $sourceMethod
73 * @param string $sourcePath
74 * @param string $targetPath
75 * @param string $sourceData
76 * @param array $sourceHeaders
77 * @return array|ErrorResponse|string
78 * @throws HTTPException
// Opens a connection through the source client and branches on the status
// found in the source response headers:
//  - empty status: the source connection is closed and an HTTPException built
//    by HTTPException::readFailure() is thrown;
//  - status other than 200: the rest of the source stream is appended to $body
//    and an ErrorResponse is returned;
//  - remaining code: $body is assigned the result of parseAndSend($targetPath)
//    and the source connection is closed.
// NOTE(review): this listing skips several original lines (most of the
// parameter list, the getConnection() arguments, braces, try openers), so the
// exact control flow between the visible statements must be confirmed against
// the full file.
81 public function request(
86 array $sourceHeaders = array()
88 $this->sourceConnection
= $this->sourceClient
->getConnection(
94 $sourceResponseHeaders = $this->sourceClient
->getStreamHeaders();
97 if (empty($sourceResponseHeaders['status'])) {
99 // Close the connection resource.
// NOTE(review): fclose() signals failure through its return value rather than
// by throwing an \Exception, so the catch below only matters if an error
// handler converts warnings into exceptions — confirm.
100 fclose($this->sourceConnection
);
101 } catch (\Exception
$e) {
// NOTE(review): the message passed below probably means "no status code";
// confirm nothing matches on the text before changing it.
104 throw HTTPException
::readFailure(
105 $this->sourceClient
->getOptions()['ip'],
106 $this->sourceClient
->getOptions()['port'],
107 'Received an empty response or not status code',
112 } elseif ($sourceResponseHeaders['status'] != 200) {
// Drain whatever the source sent so it can be handed back in the ErrorResponse.
113 while (!feof($this->sourceConnection
)) {
114 $body .= fgets($this->sourceConnection
);
117 fclose($this->sourceConnection
);
118 } catch (\Exception
$e) {
121 return new ErrorResponse(
122 $sourceResponseHeaders['status'],
123 $sourceResponseHeaders,
129 // Body is an array containing:
130 // 1) Array of json string documents that don't have
131 // attachments. These should be posted using the Bulk API.
132 // 2) Responses of posting docs with attachments.
133 $body = $this->parseAndSend($targetPath);
135 fclose($this->sourceConnection
);
136 } catch (\Exception
$e) {
140 } catch(\Exception
$e) {
/**
 * Fetch the next line from the source connection.
 *
 * When $maxLength is given it is handed to fgets() as the length argument;
 * when it is null, fgets() is called without a length.
 *
 * @param int|null $maxLength Length argument for fgets(), or null for none.
 * @return string|false Whatever fgets() returns for the source connection.
 */
protected function getNextLineFromSourceConnection($maxLength = null)
{
    $connection = $this->sourceConnection;

    return $maxLength === null
        ? fgets($connection)
        : fgets($connection, $maxLength);
}
165 * Parses multipart data. Returns an array having:
166 * 1) Array of json docs(which are strings) that don't have attachments.
167 * These should be posted using the Bulk API.
168 * 2) Responses of posting docs with attachments.
173 * @throws HTTPException
// Walks the multipart response on the source connection. The first line read
// is taken as the main boundary. Parts whose Content-Type value contains
// 'multipart/related' are forwarded through sendStream() and the results are
// collected in $responses; parts whose value is exactly 'application/json'
// have their JSON line pushed onto $docStack.
// Returns array($docStack, $responses).
// NOTE(review): this listing omits some original lines (braces, the first
// branch of the if chain, the assignments of $docStack and $boundary, the
// leading sendStream() arguments); confirm details against the full file.
175 protected function parseAndSend($targetPath)
177 // Main response boundary of the multipart stream.
178 $mainBoundary = trim($this->getNextLineFromSourceConnection());
180 // Docs that don't have attachment.
181 // These should be posted using Bulk upload.
184 // Responses from posting docs that have attachments.
185 $responses = array();
187 while (!feof($this->sourceConnection
)) {
189 $line = ltrim($this->getNextLineFromSourceConnection());
193 } elseif (strpos($line, 'Content-Type') !== false) {
// NOTE(review): explode() has no limit here, so $value only receives the text
// between the first and the second ':' of the line.
196 list($header, $value) = explode(':', $line);
197 $header = trim($header);
198 $value = trim($value);
// A ';' separates the media type from its parameter (boundary or error).
201 if (strpos($value, ';') !== false) {
202 list($type, $info) = explode(';', $value, 2);
205 // Get the boundary for the current doc.
206 if (strpos($info, 'boundary') !== false) {
209 } elseif (strpos($info, 'error') !== false) {
211 // Missing revs at the source. Continue till the end
// Skips lines until one contains the main boundary.
// NOTE(review): getNextLineFromSourceConnection() returns fgets()'s result,
// which is false at EOF; strpos() on that never finds the boundary, so a
// truncated stream would keep this loop spinning — confirm.
213 while (strpos($this->getNextLineFromSourceConnection(), $mainBoundary) === false) ;
218 throw new \
Exception('Unknown parameter with Content-Type.');
222 // Doc with attachments.
223 if (strpos($value, 'multipart/related') !== false) {
225 if ($boundary == '') {
226 throw new \
Exception('Boundary not set for multipart/related data.');
// Keep only the text after the first '=' of the boundary parameter.
230 $boundary = explode('=', $boundary, 2)[1];
233 $responses[] = $this->sendStream(
237 array('Content-Type' => 'multipart/related; boundary=' . $boundary));
238 } catch (\Exception
$e) {
243 } elseif ($value == 'application/json') {
244 // JSON doc without attachment.
// Skip blank lines; the first non-blank line is taken as the JSON document.
// NOTE(review): same EOF concern as the boundary loops — trim() of a false
// read is '', so this loop would not end on a truncated stream; confirm.
247 while(trim(($jsonDoc = $this->getNextLineFromSourceConnection())) == '');
248 array_push($docStack, trim($jsonDoc));
250 // Continue till the end of this document.
251 while (strpos($this->getNextLineFromSourceConnection(), $mainBoundary) === false) ;
254 throw new \
UnexpectedValueException('This value is not supported.');
257 throw new \
Exception('The first line is not the Content-Type.');
260 return array($docStack, $responses);
265 * Reads multipart data from sourceConnection and streams it to the
266 * targetConnection. Returns the JSON-decoded body of the target's response,
267 * or an array holding the status code in case there is no body.
272 * @param array $requestHeaders
273 * @return mixed|string
275 * @throws HTTPException
// Streams one multipart document from the source connection to the target
// connection, then reads the target's HTTP response and returns either the
// JSON-decoded body or array("status" => <status code>) when the body is empty.
// The Content-Length request header is computed up front from the document's
// _attachments metadata.
// NOTE(review): this listing omits some original lines (the parameters before
// $requestHeaders, several initialisations, braces, the case labels of the
// switch); confirm details against the full file before relying on these notes.
277 protected function sendStream(
281 $requestHeaders = array()
283 $dataStream = $this->sourceConnection
;
286 // Read the json doc. Use _attachments field to find the total
287 // Content-Length and create the request header with initial doc data.
288 // At present CouchDB can't handle chunked data and needs
289 // Content-Length header.
292 $attachmentCount = 0;
293 $totalAttachmentLength = 0;
294 $streamLine = $this->getNextLineFromSourceConnection();
298 trim($streamLine) == ''
302 if (strpos($streamLine, 'Content-Type: application/json') !== false) {
305 $streamLine = $this->getNextLineFromSourceConnection();
// Length of the boundary taken from the Content-Type request header, plus 2
// (presumably for the leading "--" — TODO confirm).
307 $docBoundaryLength = strlen(explode('=', $requestHeaders['Content-Type'], 2)[1]) +
2;
// $streamLine is decoded as the JSON document; its _attachments entries drive
// the length calculation below.
// NOTE(review): neither a json_decode() failure nor a missing '_attachments'
// key is checked before the foreach.
308 $json = json_decode($streamLine, true);
309 foreach ($json['_attachments'] as $docName => $metaData) {
310 // Quotes and a "/r/n"
311 $totalAttachmentLength +
= strlen('Content-Disposition: attachment; filename=') +
strlen($docName) +
4;
312 $totalAttachmentLength +
= strlen('Content-Type: ') +
strlen($metaData['content_type']) +
2;
313 $totalAttachmentLength +
= strlen('Content-Length: ');
// Attachments with an 'encoding' entry contribute encoded_length plus a
// Content-Encoding header line; the statement using 'length' further down
// presumably belongs to the alternative branch, whose keyword line is not
// visible in this listing.
314 if (isset($metaData['encoding'])) {
315 $totalAttachmentLength +
= $metaData['encoded_length'] +
strlen($metaData['encoded_length']) +
2;
316 $totalAttachmentLength +
= strlen('Content-Encoding: ') +
strlen($metaData['encoding']) +
2;
318 $totalAttachmentLength +
= $metaData['length'] +
strlen($metaData['length']) +
2;
320 $totalAttachmentLength +
= 2;
324 // Add Content-Length to the headers.
325 $requestHeaders['Content-Length'] = strlen($str) +
strlen($streamLine)
326 +
$totalAttachmentLength +
$attachmentCount * (2 +
$docBoundaryLength) +
$docBoundaryLength +
2;
// Open the target connection only when none is currently held.
329 if ($this->targetConnection
== null) {
330 $this->targetConnection
= $this->targetClient
->getConnection(
337 // Write the initial body data.
338 fwrite($this->targetConnection
, $str);
// Forwards data while the source is not at EOF; the loop condition also
// involves $streamEnd (the right-hand side of that comparison is on a line
// missing from this listing).
340 // Write the rest of the data including attachments line by line or in
342 while(!feof($dataStream) &&
343 ($streamEnd === null ||
344 strpos($streamLine, $streamEnd) ===
349 $length = strlen($streamLine);
// fwrite() may accept fewer bytes than offered, so the unsent remainder is
// written again until $totalSent reaches $length ($totalSent is updated on
// lines not visible in this listing).
350 while($totalSent != $length) {
351 $sent = fwrite($this->targetConnection
, substr($streamLine,$totalSent));
352 if ($sent === false) {
// NOTE(review): \HTTPException is a fully qualified name in the global
// namespace, whereas the other throws in this method use the unqualified
// HTTPException of this file's namespace — likely unintended; confirm that a
// global HTTPException class exists, otherwise this line fails when reached.
353 throw new \
HTTPException('Stream write error.');
358 // Use maxLength while reading the data as there may be no newlines
359 // in the binary and compressed attachments, or the lines may be
361 $streamLine = $this->getNextLineFromSourceConnection(100000);
364 // Read response headers
367 'connection' => ($this->targetClient
->getOptions()['keep-alive'] ?
'Keep-Alive' : 'Close'),
371 // Remove leading newlines, should not occur at all, actually.
372 while ((($line = fgets($this->targetConnection
)) !== false) &&
373 (($lineContent = rtrim($line)) === ''));
375 // Throw exception, if connection has been aborted by the server, and
376 // leave handling to the user for now.
377 if ($line === false) {
378 // sendStream can't be called in recursion as the source stream can be
380 $error = error_get_last();
381 throw HTTPException
::connectionFailure(
382 $this->targetClient
->getOptions()['ip'],
383 $this->targetClient
->getOptions()['port'],
390 // Also store raw headers for later logging
391 $rawHeaders .= $lineContent . "\n";
392 // Extract header values
// The status line yields the HTTP version and the numeric status; the other
// visible assignment splits a header line at the first ':' and stores the
// value under the lower-cased header name.
393 if (preg_match('(^HTTP/(?P<version>\d+\.\d+)\s+(?P<status>\d+))S', $lineContent, $match)) {
394 $headers['version'] = $match['version'];
395 $headers['status'] = (int) $match['status'];
397 list($key, $value) = explode(':', $lineContent, 2);
398 $headers[strtolower($key)] = ltrim($value);
400 } while ((($line = fgets($this->targetConnection
)) !== false) &&
401 (($lineContent = rtrim($line)) !== ''));
404 // Read response body
407 // HTTP 1.1 supports chunked transfer encoding, if the according
408 // header is not set, just read the specified amount of bytes.
409 $bytesToRead = (int) (isset( $headers['content-length']) ?
$headers['content-length'] : 0);
410 // Read body only as specified by chunk sizes, everything else
411 // are just footnotes, which are not relevant for us.
412 while ($bytesToRead > 0) {
// fgets() returns at most length - 1 bytes, hence $bytesToRead + 1.
413 $body .= $read = fgets($this->targetConnection
, $bytesToRead +
1);
414 $bytesToRead -= strlen($read);
417 // Reset the connection if the server asks for it.
// NOTE(review): the comparison is case-sensitive; a header value such as
// "keep-alive" in another letter case would cause the connection to be closed
// here — confirm this is intended.
418 if ($headers['connection'] !== 'Keep-Alive') {
419 fclose($this->targetConnection
);
420 $this->targetConnection
= null;
422 // Handle some response state as special cases
423 switch ($headers['status']) {
428 // Temporary redirect.
429 // sendStream can't be called in recursion as the source stream can be
431 throw HTTPException
::fromResponse($path, new Response($headers['status'], $headers, $body));
433 return ($body != '' ?
json_decode($body, true) : array("status" => $headers['status'])) ;