3 * Copyright 2007 Google Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 * Cloud Storage Write Client implements the stream wrapper functions required
19 * to write to a Google Cloud Storage object.
23 namespace google\appengine\ext\cloud_storage_streams
;
25 require_once 'google/appengine/ext/cloud_storage_streams/CloudStorageClient.php';
27 // TODO: Retry on transient errors.
29 final class CloudStorageWriteClient
extends CloudStorageClient
{
30 // GS requires all chunks of data written to be multiples of 256K except for
32 const WRITE_CHUNK_SIZE
= 262144;
34 // Content Range Header format when the total length is unknown.
35 const PARTIAL_CONTENT_RANGE_FORMAT
= "bytes %d-%d/*";
37 // Content Range Header format when the length is known.
38 const FINAL_CONTENT_RANGE_FORMAT
= "bytes %d-%d/%d";
40 // Content Range Header for final chunk with no new data
41 const FINAL_CONTENT_RANGE_NO_DATA
= "bytes */%d";
43 private static $upload_start_header = ["x-goog-resumable" => "start"];
45 // The array of bytes to be written to GS
48 // The resumable upload ID we are using for this upload.
51 // The offset in the file where the current buffer starts
52 private $buffer_start_offset;
54 // The number of bytes we've written to GS so far.
55 private $total_bytes_uploaded;
57 public function __construct($bucket, $object, $context) {
58 parent
::__construct($bucket, $object, $context);
62 * Called when the stream is being opened. Try and start a resumable upload
65 * @return true if the streamable upload started, false otherwise.
67 public function initialize() {
68 $headers = self
::$upload_start_header;
70 $token_header = $this->getOAuthTokenHeader(parent
::WRITE_SCOPE
);
71 if ($token_header === false) {
72 trigger_error("Unable to acquire OAuth token.", E_USER_WARNING
);
75 $headers = array_merge($headers, $token_header);
77 if (array_key_exists("Content-Type", $this->context_options
)) {
78 $headers["Content-Type"] = $this->context_options
["Content-Type"];
81 if (array_key_exists("acl", $this->context_options
)) {
82 $acl = $this->context_options
["acl"];
83 if (in_array($acl, parent
::$valid_acl_values)) {
84 $headers["x-goog-acl"] = $acl;
86 trigger_error(sprintf("Invalid ACL value: %s", $acl), E_USER_WARNING
);
91 $http_response = $this->makeHttpRequest($this->url
,
95 if ($http_response === false) {
96 trigger_error("Unable to connect to Google Cloud Storage Service.",
101 $status_code = $http_response['status_code'];
102 if ($status_code == HttpResponse
::FORBIDDEN
) {
103 trigger_error("Access Denied", E_USER_WARNING
);
106 if ($status_code != HttpResponse
::CREATED
) {
107 trigger_error(sprintf("Error connecting to Google Cloud Storage: %s",
108 HttpResponse
::getStatusMessage($status_code)),
113 $location = $this->getHeaderValue("Location", $http_response['headers']);
115 $query_str = parse_url($location)["query"];
116 parse_str($query_str, $query_arr);
117 $this->upload_id
= $query_arr["upload_id"];
119 if (!isset($this->upload_id
)) {
120 trigger_error(sprintf("Location Header was not returned (%s).",
122 array_keys($http_response['headers']))),
127 $this->buffer_start_offset
= 0;
128 $this->total_bytes_uploaded
= 0;
129 $this->byte_buffer
= "";
134 * Return the number of bytes written.
136 public function write($data) {
137 $this->byte_buffer
.= $data;
138 $current_buffer_len = strlen($this->byte_buffer
);
139 $data_len = strlen($data);
140 // If this data doesn't fill the buffer then write it and return.
141 if ($current_buffer_len < self
::WRITE_CHUNK_SIZE
) {
145 // Write out this data
146 if (!$this->writeBufferToGS()) {
147 // Remove the bytes we added to the buffer
148 $this->byte_buffer
= substr($this->byte_buffer
, 0, -strlen($data));
152 // We wrote the buffered content - but only return the amount of $data
153 // we wrote as per the contract of write()
158 * Because of the write byte alignment required by GS we will not write any
159 * data on a flush. If there is data remaining in the buffer we'll write it
162 public function flush() {
167 * When closing the stream we need to complete the upload.
169 public function close() {
170 $this->writeBufferToGS(true);
173 private function writeBufferToGS($complete = false) {
174 $headers = $this->getOAuthTokenHeader(parent
::WRITE_SCOPE
);
175 if ($headers === false) {
176 trigger_error("Unable to acquire OAuth token.", E_USER_ERROR
);
180 $buffer_len = strlen($this->byte_buffer
);
183 $write_size = $buffer_len;
185 // Incomplete writes should never be less than WRITE_CHUNK_SIZE
186 assert($buffer_len >= self
::WRITE_CHUNK_SIZE
);
187 // Is PHP the only language in the world where the quotient of two
188 // integers is a double?
190 floor($buffer_len / self
::WRITE_CHUNK_SIZE
) * self
::WRITE_CHUNK_SIZE
;
193 // Determine the final byte of the buffer we're writing for Range header.
194 if ($write_size !== 0) {
195 $write_end_byte = $this->buffer_start_offset +
$write_size - 1;
196 $body = substr($this->byte_buffer
, 0, $write_size);
202 $object_length = $this->buffer_start_offset +
$write_size;
203 if ($write_size === 0) {
204 $headers['Content-Range'] = sprintf(self
::FINAL_CONTENT_RANGE_NO_DATA
,
207 $headers['Content-Range'] = sprintf(self
::FINAL_CONTENT_RANGE_FORMAT
,
208 $this->buffer_start_offset
,
213 $headers['Content-Range'] = sprintf(self
::PARTIAL_CONTENT_RANGE_FORMAT
,
214 $this->buffer_start_offset
,
218 $url = sprintf("%s?upload_id=%s", $this->url
, $this->upload_id
);
219 $http_response = $this->makeHttpRequest($url, "PUT", $headers, $body);
220 $code = $http_response['status_code'];
222 // TODO: Retry on some status codes.
223 if (($complete && $code != HttpResponse
::OK
) ||
224 (!$complete && $code != HttpResponse
::RESUME_INCOMPLETE
)) {
226 sprintf("Error writing to Google Cloud Storage Service: %s",
227 HttpResponse
::getStatusMessage($code)),
231 // Buffer flushed, update pointers if we actually wrote something.
232 if ($write_size !== 0) {
233 $this->buffer_start_offset
= $write_end_byte +
1;
234 $this->byte_buffer
= substr($this->byte_buffer
, $write_size);