App Engine Python SDK version 1.8.1
[gae.git] / python / php / sdk / google / appengine / ext / cloud_storage_streams / CloudStorageWriteClient.php
blobf1a37a14935b4d888ccf24ad3cb4a4305dcc293b
1 <?php
2 /**
3 * Copyright 2007 Google Inc.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 /**
18 * Cloud Storage Write Client implements the stream wrapper functions required
19 * to write to a Google Cloud Storage object.
23 namespace google\appengine\ext\cloud_storage_streams;
25 require_once 'google/appengine/ext/cloud_storage_streams/CloudStorageClient.php';
27 // TODO: Retry on transient errors.
29 final class CloudStorageWriteClient extends CloudStorageClient {
30 // GS requires all chunks of data written to be multiples of 256K except for
31 // the last chunk.
32 const WRITE_CHUNK_SIZE = 262144;
34 // Content Range Header format when the total length is unknown.
35 const PARTIAL_CONTENT_RANGE_FORMAT = "bytes %d-%d/*";
37 // Content Range Header format when the length is known.
38 const FINAL_CONTENT_RANGE_FORMAT = "bytes %d-%d/%d";
40 // Content Range Header for final chunk with no new data
41 const FINAL_CONTENT_RANGE_NO_DATA = "bytes */%d";
43 private static $upload_start_header = ["x-goog-resumable" => "start"];
45 // The array of bytes to be written to GS
46 private $byte_buffer;
48 // The resumable upload ID we are using for this upload.
49 private $upload_id;
51 // The offset in the file where the current buffer starts
52 private $buffer_start_offset;
54 // The number of bytes we've written to GS so far.
55 private $total_bytes_uploaded;
57 public function __construct($bucket, $object, $context) {
58 parent::__construct($bucket, $object, $context);
61 /**
62 * Called when the stream is being opened. Try and start a resumable upload
63 * here.
65 * @return true if the streamable upload started, false otherwise.
67 public function initialize() {
68 $headers = self::$upload_start_header;
70 $token_header = $this->getOAuthTokenHeader(parent::WRITE_SCOPE);
71 if ($token_header === false) {
72 trigger_error("Unable to acquire OAuth token.", E_USER_WARNING);
73 return false;
75 $headers = array_merge($headers, $token_header);
77 if (array_key_exists("Content-Type", $this->context_options)) {
78 $headers["Content-Type"] = $this->context_options["Content-Type"];
81 if (array_key_exists("acl", $this->context_options)) {
82 $acl = $this->context_options["acl"];
83 if (in_array($acl, parent::$valid_acl_values)) {
84 $headers["x-goog-acl"] = $acl;
85 } else {
86 trigger_error(sprintf("Invalid ACL value: %s", $acl), E_USER_WARNING);
87 return false;
91 $http_response = $this->makeHttpRequest($this->url,
92 "POST",
93 $headers);
95 if ($http_response === false) {
96 trigger_error("Unable to connect to Google Cloud Storage Service.",
97 E_USER_WARNING);
98 return false;
101 $status_code = $http_response['status_code'];
102 if ($status_code == HttpResponse::FORBIDDEN) {
103 trigger_error("Access Denied", E_USER_WARNING);
104 return false;
106 if ($status_code != HttpResponse::CREATED) {
107 trigger_error(sprintf("Error connecting to Google Cloud Storage: %s",
108 HttpResponse::getStatusMessage($status_code)),
109 E_USER_WARNING);
110 return false;
113 $location = $this->getHeaderValue("Location", $http_response['headers']);
115 $query_str = parse_url($location)["query"];
116 parse_str($query_str, $query_arr);
117 $this->upload_id = $query_arr["upload_id"];
119 if (!isset($this->upload_id)) {
120 trigger_error(sprintf("Location Header was not returned (%s).",
121 implode(",",
122 array_keys($http_response['headers']))),
123 E_USER_WARNING);
124 return false;
127 $this->buffer_start_offset = 0;
128 $this->total_bytes_uploaded = 0;
129 $this->byte_buffer = "";
130 return true;
134 * Return the number of bytes written.
136 public function write($data) {
137 $this->byte_buffer .= $data;
138 $current_buffer_len = strlen($this->byte_buffer);
139 $data_len = strlen($data);
140 // If this data doesn't fill the buffer then write it and return.
141 if ($current_buffer_len < self::WRITE_CHUNK_SIZE) {
142 return $data_len;
145 // Write out this data
146 if (!$this->writeBufferToGS()) {
147 // Remove the bytes we added to the buffer
148 $this->byte_buffer = substr($this->byte_buffer, 0, -strlen($data));
149 return 0;
152 // We wrote the buffered content - but only return the amount of $data
153 // we wrote as per the contract of write()
154 return $data_len;
158 * Because of the write byte alignment required by GS we will not write any
159 * data on a flush. If there is data remaining in the buffer we'll write it
160 * during close.
162 public function flush() {
163 return true;
167 * When closing the stream we need to complete the upload.
169 public function close() {
170 $this->writeBufferToGS(true);
173 private function writeBufferToGS($complete = false) {
174 $headers = $this->getOAuthTokenHeader(parent::WRITE_SCOPE);
175 if ($headers === false) {
176 trigger_error("Unable to acquire OAuth token.", E_USER_ERROR);
177 return false;
180 $buffer_len = strlen($this->byte_buffer);
182 if ($complete) {
183 $write_size = $buffer_len;
184 } else {
185 // Incomplete writes should never be less than WRITE_CHUNK_SIZE
186 assert($buffer_len >= self::WRITE_CHUNK_SIZE);
187 // Is PHP the only language in the world where the quotient of two
188 // integers is a double?
189 $write_size =
190 floor($buffer_len / self::WRITE_CHUNK_SIZE) * self::WRITE_CHUNK_SIZE;
193 // Determine the final byte of the buffer we're writing for Range header.
194 if ($write_size !== 0) {
195 $write_end_byte = $this->buffer_start_offset + $write_size - 1;
196 $body = substr($this->byte_buffer, 0, $write_size);
197 } else {
198 $body = null;
201 if ($complete) {
202 $object_length = $this->buffer_start_offset + $write_size;
203 if ($write_size === 0) {
204 $headers['Content-Range'] = sprintf(self::FINAL_CONTENT_RANGE_NO_DATA,
205 $object_length);
206 } else {
207 $headers['Content-Range'] = sprintf(self::FINAL_CONTENT_RANGE_FORMAT,
208 $this->buffer_start_offset,
209 $write_end_byte,
210 $object_length);
212 } else {
213 $headers['Content-Range'] = sprintf(self::PARTIAL_CONTENT_RANGE_FORMAT,
214 $this->buffer_start_offset,
215 $write_end_byte);
218 $url = sprintf("%s?upload_id=%s", $this->url, $this->upload_id);
219 $http_response = $this->makeHttpRequest($url, "PUT", $headers, $body);
220 $code = $http_response['status_code'];
222 // TODO: Retry on some status codes.
223 if (($complete && $code != HttpResponse::OK) ||
224 (!$complete && $code != HttpResponse::RESUME_INCOMPLETE)) {
225 trigger_error(
226 sprintf("Error writing to Google Cloud Storage Service: %s",
227 HttpResponse::getStatusMessage($code)),
228 E_USER_WARNING);
229 return false;
231 // Buffer flushed, update pointers if we actually wrote something.
232 if ($write_size !== 0) {
233 $this->buffer_start_offset = $write_end_byte + 1;
234 $this->byte_buffer = substr($this->byte_buffer, $write_size);
236 return true;