3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Stub for Google storage."""
import calendar
import datetime
import hashlib
import httplib
import StringIO

from google.appengine.api import datastore
from google.appengine.api import namespace_manager
from google.appengine.api.blobstore import blobstore_stub
from google.appengine.ext import db
from google.appengine.ext.cloudstorage import common
35 _GCS_DEFAULT_CONTENT_TYPE
= 'binary/octet-stream'
class _AE_GCSFileInfo_(db.Model):
  """Store GCS specific info.

  GCS allows user to define arbitrary metadata via header x-goog-meta-foo: bar.
  These headers are returned when user does a GET or HEAD on the object.

  NOTE(review): entities are looked up with get_by_key_name(token), so the
  key name is presumably the file's blobkey -- confirm against callers.
  """

  filename = db.StringProperty(required=True)
  finalized = db.BooleanProperty(required=True)

  # User request headers, stored flat as 'lowercased-key:value' strings.
  raw_options = db.StringListProperty()

  # Total object size in bytes; set when the upload is finalized.
  size = db.IntegerProperty()

  # Next expected upload offset. -1 once the file is finalized
  # (see _end_creation / put_continue_creation).
  next_offset = db.IntegerProperty(default=0)

  creation = db.DateTimeProperty()

  content_type = db.StringProperty(default=_GCS_DEFAULT_CONTENT_TYPE)
  etag = db.ByteStringProperty()

  def get_options(self):
    """Return user-specified request headers as a dict."""
    return dict(o.split(':', 1) for o in self.raw_options)

  def set_options(self, options_dict):
    """Store user request headers; 'content-type' also updates content_type."""
    self.raw_options = [
        '%s:%s' % (k.lower(), v) for k, v in options_dict.iteritems()]
    if 'content-type' in options_dict:
      self.content_type = options_dict['content-type']

  options = property(get_options, set_options)

  @classmethod
  def kind(cls):
    # Store these entities under the blobstore stub's GS info kind so the
    # blobstore stub can see GCS files.
    return blobstore_stub._GS_INFO_KIND
class _AE_GCSPartialFile_(db.Model):
  """Store partial content for uploading files.

  NOTE(review): entities are created with the owning _AE_GCSFileInfo_ as
  datastore parent and the zero-padded chunk start offset as key name
  (see _get_content, which parses the key name back into an int).
  """

  # End offset of this chunk. _get_content treats it as exclusive (the next
  # chunk's start must equal it for the file to be contiguous).
  end = db.IntegerProperty(required=True)

  # Blobkey under which this chunk's bytes live in blob storage.
  partial_content = db.TextProperty(required=True)
class CloudStorageStub(object):
  """Google Cloud Storage stub implementation.

  We use blobstore stub to store files. All metadata are stored
  in _AE_GCSFileInfo_.

  Note: this Google Cloud Storage stub is designed to work with
  apphosting.ext.cloudstorage.storage_api.py.
  It only implements the part of GCS storage_api.py uses, and its interface
  maps to GCS XML APIs.
  """

  def __init__(self, blob_storage):
    """Initialize.

    Args:
      blob_storage:
          apphosting.api.blobstore.blobstore_stub.BlobStorage instance
    """
    self.blob_storage = blob_storage
118 def _filename_to_blobkey(self
, filename
):
119 """Get blobkey for filename.
122 filename: gcs filename of form /bucket/filename.
125 blobinfo's datastore's key name, aka, blobkey.
127 common
.validate_file_path(filename
)
129 return blobstore_stub
.BlobstoreServiceStub
.CreateEncodedGoogleStorageKey(
132 @db.non_transactional
133 def post_start_creation(self
, filename
, options
):
134 """Start object creation with a POST.
136 This implements the resumable upload XML API.
138 Only major limitation of current implementation is that we don't
139 support multiple upload sessions for the same GCS file. Previous
140 _AE_GCSFileInfo (which represents either a finalized file, or
141 an upload session) will be removed when a new upload session is
145 filename: gcs filename of form /bucket/filename.
146 options: a dict containing all user specified request headers.
147 e.g. {'content-type': 'foo', 'x-goog-meta-bar': 'bar'}.
150 a token (blobkey) used for continuing upload.
152 ns
= namespace_manager
.get_namespace()
154 namespace_manager
.set_namespace('')
155 common
.validate_file_path(filename
)
156 token
= self
._filename
_to
_blobkey
(filename
)
157 gcs_file
= _AE_GCSFileInfo_
.get_by_key_name(token
)
159 self
._cleanup
_old
_file
(gcs_file
)
160 new_file
= _AE_GCSFileInfo_(key_name
=token
,
163 new_file
.options
= options
167 namespace_manager
.set_namespace(ns
)
170 @db.non_transactional
171 def _cleanup_old_file(self
, gcs_file
):
172 """Clean up the old version of a file.
174 The old version may or may not be finalized yet. Either way,
175 when user tries to create a file that already exists, we delete the
179 gcs_file: an instance of _AE_GCSFileInfo_.
182 if gcs_file
.finalized
:
183 blobkey
= gcs_file
.key().name()
184 self
.blob_storage
.DeleteBlob(blobkey
)
186 db
.delete(_AE_GCSPartialFile_
.all().ancestor(gcs_file
))
189 @db.non_transactional
190 def put_empty(self
, token
):
191 """Empty put is used to query upload progress.
193 The file must has not finished upload.
196 token: upload token returned by post_start_creation.
199 last offset uploaded. -1 if none has been uploaded.
202 ValueError: if token matches no in progress uploads.
204 ns
= namespace_manager
.get_namespace()
206 namespace_manager
.set_namespace('')
207 gcs_file
= _AE_GCSFileInfo_
.get_by_key_name(token
)
209 raise ValueError('Invalid token', httplib
.BAD_REQUEST
)
210 return gcs_file
.next_offset
- 1
212 namespace_manager
.set_namespace(ns
)
214 @db.non_transactional
215 def put_continue_creation(self
, token
, content
, content_range
,
217 _upload_filename
=None):
218 """Continue object upload with PUTs.
220 This implements the resumable upload XML API.
223 token: upload token returned by post_start_creation.
224 content: object content. None if no content was provided with this
226 content_range: a (start, end) tuple specifying the content range of this
227 chunk. Both are inclusive according to XML API. None is content is None.
228 length: file length, if this is the last chunk of file content.
229 _upload_filename: internal use. Might be removed any time! This is
230 used by blobstore to pass in the upload filename from user.
233 _AE_GCSFileInfo entity for this file if the file is finalized.
236 ValueError: if something is invalid. The exception.args is a tuple of
237 (msg, http status code).
241 ns
= namespace_manager
.get_namespace()
243 namespace_manager
.set_namespace('')
244 gcs_file
= _AE_GCSFileInfo_
.get_by_key_name(token
)
246 raise ValueError('Invalid token', httplib
.BAD_REQUEST
)
247 if gcs_file
.next_offset
== -1:
248 raise ValueError('Received more uploads after file %s '
249 'was finalized.' % gcs_file
.filename
,
252 start
, end
= content_range
253 if len(content
) != (end
- start
+ 1):
254 raise ValueError('Invalid content range %d-%d' % content_range
,
255 httplib
.REQUESTED_RANGE_NOT_SATISFIABLE
)
257 if start
> gcs_file
.next_offset
:
258 raise ValueError('Expect start offset %s, got %s' %
259 (gcs_file
.next_offset
, start
),
260 httplib
.REQUESTED_RANGE_NOT_SATISFIABLE
)
262 elif end
< gcs_file
.next_offset
:
266 content
= content
[gcs_file
.next_offset
- start
:]
267 start
= gcs_file
.next_offset
268 blobkey
= '%s-%d-%d' % (token
, start
, end
)
269 self
.blob_storage
.StoreBlob(blobkey
, StringIO
.StringIO(content
))
270 new_content
= _AE_GCSPartialFile_(
273 key_name
='%020d' % start
,
274 partial_content
=blobkey
,
278 gcs_file
.next_offset
= end
+ 1
280 if length
is not None and length
!= gcs_file
.next_offset
:
282 'Got finalization request with wrong file length. '
283 'Expecting %s, got %s' % (gcs_file
.next_offset
, length
),
284 httplib
.REQUESTED_RANGE_NOT_SATISFIABLE
)
285 elif length
is not None:
286 return self
._end
_creation
(token
, _upload_filename
)
288 namespace_manager
.set_namespace(ns
)
290 @db.non_transactional
291 def put_copy(self
, src
, dst
):
292 """Copy file from src to dst.
297 src: /bucket/filename. This file must exist.
298 dst: /bucket/filename
300 common
.validate_file_path(src
)
301 common
.validate_file_path(dst
)
304 ns
= namespace_manager
.get_namespace()
306 namespace_manager
.set_namespace('')
307 src_blobkey
= self
._filename
_to
_blobkey
(src
)
308 source
= _AE_GCSFileInfo_
.get_by_key_name(src_blobkey
)
309 token
= self
._filename
_to
_blobkey
(dst
)
310 new_file
= _AE_GCSFileInfo_(key_name
=token
,
313 new_file
.options
= source
.options
314 new_file
.etag
= source
.etag
315 new_file
.size
= source
.size
316 new_file
.creation
= datetime
.datetime
.utcnow()
319 namespace_manager
.set_namespace(ns
)
322 local_file
= self
.blob_storage
.OpenBlob(src_blobkey
)
323 self
.blob_storage
.StoreBlob(token
, local_file
)
325 @db.non_transactional
326 def _end_creation(self
, token
, _upload_filename
):
327 """End object upload.
330 token: upload token returned by post_start_creation.
333 _AE_GCSFileInfo Entity for this file.
336 ValueError: if token is invalid. Or file is corrupted during upload.
338 Save file content to blobstore. Save blobinfo and _AE_GCSFileInfo.
340 gcs_file
= _AE_GCSFileInfo_
.get_by_key_name(token
)
342 raise ValueError('Invalid token')
343 if gcs_file
.finalized
:
346 error_msg
, content
= self
._get
_content
(gcs_file
)
348 raise ValueError(error_msg
)
350 gcs_file
.etag
= hashlib
.md5(content
).hexdigest()
351 gcs_file
.creation
= datetime
.datetime
.utcnow()
352 gcs_file
.size
= len(content
)
356 blob_info
= datastore
.Entity('__BlobInfo__', name
=str(token
), namespace
='')
357 blob_info
['content_type'] = gcs_file
.content_type
358 blob_info
['creation'] = gcs_file
.creation
359 blob_info
['filename'] = _upload_filename
360 blob_info
['md5_hash'] = gcs_file
.etag
361 blob_info
['size'] = gcs_file
.size
362 datastore
.Put(blob_info
)
364 self
.blob_storage
.StoreBlob(token
, StringIO
.StringIO(content
))
366 gcs_file
.finalized
= True
368 gcs_file
.next_offset
= -1
372 @db.transactional(propagation
=db
.INDEPENDENT
)
373 def _get_content(self
, gcs_file
):
374 """Aggregate all partial content of the gcs_file.
377 gcs_file: an instance of _AE_GCSFileInfo_.
380 (error_msg, content) tuple. error_msg is set if the file is
381 corrupted during upload. Otherwise content is set to the
382 aggregation of all partial contents.
387 for partial
in (_AE_GCSPartialFile_
.all(namespace
='').ancestor(gcs_file
).
389 start
= int(partial
.key().name())
391 if start
< previous_end
:
392 error_msg
= 'File is corrupted due to missing chunks.'
393 elif start
> previous_end
:
394 error_msg
= 'File is corrupted due to overlapping chunks'
395 previous_end
= partial
.end
396 content
+= self
.blob_storage
.OpenBlob(partial
.partial_content
).read()
397 self
.blob_storage
.DeleteBlob(partial
.partial_content
)
402 return error_msg
, content
404 @db.non_transactional
411 """Get bucket listing with a GET.
413 How GCS listbucket work in production:
414 GCS tries to return as many items as possible in a single response. If
415 there are more items satisfying user's query and the current request
416 took too long (e.g spent on skipping files in a subdir) or items to return
417 gets too big (> max_keys), it returns fast and sets IsTruncated
418 and NextMarker for continuation. They serve redundant purpose: if
419 NextMarker is set, IsTruncated is True.
421 Note NextMarker is not where GCS scan left off. It is
422 only valid for the exact same type of query the marker was generated from.
423 For example, if a marker is generated from query with delimiter, the marker
424 is the name of a subdir (instead of the last file within the subdir). Thus
425 you can't use this marker to issue a query without delimiter.
428 bucketpath: gcs bucket path of form '/bucket'
429 prefix: prefix to limit listing.
430 marker: a str after which to start listing. Exclusive.
431 max_keys: max items we scan & return.
432 delimiter: delimiter for directory.
434 See https://developers.google.com/storage/docs/reference-methods#getbucket
438 A tuple of (a list of GCSFileStat for files or directories sorted by
439 filename, next_marker to use as next marker, is_truncated boolean to
440 indicate if there are more results satisfying query).
442 common
.validate_bucket_path(bucketpath
)
443 q
= _AE_GCSFileInfo_
.all(namespace
='')
444 fully_qualified_prefix
= '/'.join([bucketpath
, prefix
])
446 q
.filter('filename >', '/'.join([bucketpath
, marker
]))
448 q
.filter('filename >=', fully_qualified_prefix
)
456 if not info
.filename
.startswith(fully_qualified_prefix
):
458 if len(result
) == max_keys
:
462 info
= db
.get(info
.key())
469 start_index
= name
.find(delimiter
, len(fully_qualified_prefix
))
470 if start_index
!= -1:
471 name
= name
[:start_index
+ len(delimiter
)]
474 if marker
and (first
or name
== first_dir
):
479 result
.add(common
.GCSFileStat(name
, st_size
=None,
480 st_ctime
=None, etag
=None,
485 result
.add(common
.GCSFileStat(
488 st_ctime
=calendar
.timegm(info
.creation
.utctimetuple()),
492 """Check if there are more results satisfying the query."""
495 q
= _AE_GCSFileInfo_
.all(namespace
='')
496 q
.filter('filename >', name
)
499 if delimiter
and name
.endswith(delimiter
):
502 if not info
.filename
.startswith(name
):
504 if info
.filename
.startswith(name
):
508 if info
is None or not info
.filename
.startswith(fully_qualified_prefix
):
512 result
= list(result
)
514 truncated
= is_truncated()
515 next_marker
= name
if truncated
else None
517 return result
, next_marker
, truncated
519 @db.non_transactional
520 def get_object(self
, filename
, start
=0, end
=None):
521 """Get file content with a GET.
524 filename: gcs filename of form '/bucket/filename'.
525 start: start offset to request. Inclusive.
526 end: end offset to request. Inclusive.
529 The segment of file content requested.
532 ValueError: if file doesn't exist.
534 common
.validate_file_path(filename
)
535 blobkey
= self
._filename
_to
_blobkey
(filename
)
536 key
= blobstore_stub
.BlobstoreServiceStub
.ToDatastoreBlobKey(blobkey
)
537 gcsfileinfo
= db
.get(key
)
538 if not gcsfileinfo
or not gcsfileinfo
.finalized
:
539 raise ValueError('File does not exist.')
540 local_file
= self
.blob_storage
.OpenBlob(blobkey
)
541 local_file
.seek(start
)
543 return local_file
.read(end
- start
+ 1)
545 return local_file
.read()
547 @db.non_transactional
548 def head_object(self
, filename
):
549 """Get file stat with a HEAD.
552 filename: gcs filename of form '/bucket/filename'
555 A GCSFileStat object containing file stat. None if file doesn't exist.
557 common
.validate_file_path(filename
)
558 blobkey
= self
._filename
_to
_blobkey
(filename
)
559 key
= blobstore_stub
.BlobstoreServiceStub
.ToDatastoreBlobKey(blobkey
)
561 if info
and info
.finalized
:
562 metadata
= common
.get_metadata(info
.options
)
563 filestat
= common
.GCSFileStat(
564 filename
=info
.filename
,
567 st_ctime
=calendar
.timegm(info
.creation
.utctimetuple()),
568 content_type
=info
.content_type
,
573 @db.non_transactional
574 def delete_object(self
, filename
):
575 """Delete file with a DELETE.
578 filename: gcs filename of form '/bucket/filename'
581 True if file is deleted. False if file doesn't exist.
583 common
.validate_file_path(filename
)
584 blobkey
= self
._filename
_to
_blobkey
(filename
)
585 key
= blobstore_stub
.BlobstoreServiceStub
.ToDatastoreBlobKey(blobkey
)
586 gcsfileinfo
= db
.get(key
)
590 blobstore_stub
.BlobstoreServiceStub
.DeleteBlob(blobkey
, self
.blob_storage
)