3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Stub for Google storage."""
28 from google
.appengine
.api
import datastore
29 from google
.appengine
.api
import namespace_manager
30 from google
.appengine
.api
.blobstore
import blobstore_stub
31 from google
.appengine
.ext
import db
32 from google
.appengine
.ext
.cloudstorage
import common
# Content type assigned to an object when the client supplies none
# (mirrors the GCS production default).
_GCS_DEFAULT_CONTENT_TYPE = 'binary/octet-stream'
class _AE_GCSFileInfo_(db.Model):
  """Store GCS specific info.

  GCS allows user to define arbitrary metadata via header x-goog-meta-foo: bar.
  These headers are returned when user does a GET or HEAD on the object.

  The entity key name is the file's blobkey (see
  CloudStorageStub._filename_to_blobkey).
  """

  # Full GCS path of the object, of the form /bucket/filename.
  filename = db.StringProperty(required=True)
  # True once the upload is finalized; False while an upload is in progress.
  finalized = db.BooleanProperty(required=True)

  # User specified request headers, flattened to 'header:value' strings.
  raw_options = db.StringListProperty()

  # Object size in bytes; set at finalization.
  size = db.IntegerProperty()

  # Next expected upload offset. -1 marks a finalized file.
  next_offset = db.IntegerProperty(default=0)

  # Creation (finalization) time, UTC.
  creation = db.DateTimeProperty()

  content_type = db.StringProperty(default=_GCS_DEFAULT_CONTENT_TYPE)
  etag = db.ByteStringProperty()

  def get_options(self):
    """Return user options as a dict mapping header name to value."""
    return dict(o.split(':', 1) for o in self.raw_options)

  def set_options(self, options_dict):
    """Store user options, mirroring 'content-type' into content_type.

    Args:
      options_dict: a dict of user specified request headers.
    """
    self.raw_options = [
        '%s:%s' % (k.lower(), v) for k, v in options_dict.iteritems()]
    if 'content-type' in options_dict:
      self.content_type = options_dict['content-type']

  # Expose the raw_options list as a dict-valued property.
  options = property(get_options, set_options)

  @classmethod
  def kind(cls):
    # Share the datastore kind blobstore uses for Google Storage file
    # records so both stubs see the same entities.
    return blobstore_stub._GS_INFO_KIND
class _AE_GCSPartialFile_(db.Model):
  """Store partial content for uploading files.

  Entities are child entities of the corresponding _AE_GCSFileInfo_.
  The key name is the chunk's zero-padded start offset, so ordering
  children by key yields chunks in offset order.
  """

  # End offset of this chunk; presumably exclusive (next_offset is set to
  # end + 1 by the uploader) — TODO confirm.
  end = db.IntegerProperty(required=True)

  # Blobkey under which this chunk's bytes are stored in blob storage.
  partial_content = db.TextProperty(required=True)
class CloudStorageStub(object):
  """Google Cloud Storage stub implementation.

  We use blobstore stub to store files. All metadata are stored
  in datastore.

  Note: this Google Cloud Storage stub is designed to work with
  apphosting.ext.cloudstorage.storage_api.py.
  It only implements the part of GCS storage_api.py uses, and its interface
  maps to GCS XML APIs.
  """

  def __init__(self, blob_storage):
    """Initialize.

    Args:
      blob_storage:
          apphosting.api.blobstore.blobstore_stub.BlobStorage instance
    """
    self.blob_storage = blob_storage
  def _filename_to_blobkey(self, filename):
    """Get blobkey for filename.

    Args:
      filename: gcs filename of form /bucket/filename.

    Returns:
      blobinfo's datastore's key name, aka, blobkey.
    """
    common.validate_file_path(filename)
    # Strip the leading '/' so the encoded key is built from
    # 'bucket/filename'.
    return blobstore_stub.BlobstoreServiceStub.CreateEncodedGoogleStorageKey(
        filename[1:])
  @db.non_transactional
  def post_start_creation(self, filename, options):
    """Start object creation with a POST.

    This implements the resumable upload XML API.

    Only major limitation of current implementation is that we don't
    support multiple upload sessions for the same GCS file. Previous
    _AE_GCSFileInfo (which represents either a finalized file, or
    an upload session) will be removed when a new upload session is
    created.

    Args:
      filename: gcs filename of form /bucket/filename.
      options: a dict containing all user specified request headers.
        e.g. {'content-type': 'foo', 'x-goog-meta-bar': 'bar'}.

    Returns:
      a token (blobkey) used for continuing upload.
    """
    ns = namespace_manager.get_namespace()
    try:
      # All stub metadata lives in the empty namespace regardless of the
      # caller's current namespace.
      namespace_manager.set_namespace('')
      common.validate_file_path(filename)
      token = self._filename_to_blobkey(filename)
      gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)
      # Remove any previous finalized file or in-progress upload session
      # for the same filename.
      self._cleanup_old_file(gcs_file)
      new_file = _AE_GCSFileInfo_(key_name=token,
                                  filename=filename,
                                  finalized=False)
      new_file.options = options
      new_file.put()
      return token
    finally:
      namespace_manager.set_namespace(ns)
  @db.non_transactional
  def _cleanup_old_file(self, gcs_file):
    """Clean up the old version of a file.

    The old version may or may not be finalized yet. Either way,
    when user tries to create a file that already exists, we delete the
    old version first.

    Args:
      gcs_file: an instance of _AE_GCSFileInfo_. May be None (no-op).
    """
    if gcs_file:
      if gcs_file.finalized:
        # Finalized file: its content lives under the file's own blobkey.
        blobkey = gcs_file.key().name()
        self.blob_storage.DeleteBlob(blobkey)
      else:
        # In-progress upload: drop its partial-content child entities.
        # NOTE(review): the per-chunk blobs referenced by these entities are
        # not deleted here — possible blob leak; confirm intent.
        db.delete(_AE_GCSPartialFile_.all().ancestor(gcs_file))
      gcs_file.delete()
194 @db.non_transactional
195 def put_empty(self
, token
):
196 """Empty put is used to query upload progress.
198 The file must has not finished upload.
201 token: upload token returned by post_start_creation.
204 last offset uploaded. -1 if none has been uploaded.
207 ValueError: if token matches no in progress uploads.
209 ns
= namespace_manager
.get_namespace()
211 namespace_manager
.set_namespace('')
212 gcs_file
= _AE_GCSFileInfo_
.get_by_key_name(token
)
214 raise ValueError('Invalid token', httplib
.BAD_REQUEST
)
215 return gcs_file
.next_offset
- 1
217 namespace_manager
.set_namespace(ns
)
  @db.non_transactional
  def put_continue_creation(self, token, content, content_range,
                            length=None,
                            _upload_filename=None):
    """Continue object upload with PUTs.

    This implements the resumable upload XML API.

    Args:
      token: upload token returned by post_start_creation.
      content: object content. None if no content was provided with this
        request.
      content_range: a (start, end) tuple specifying the content range of this
        chunk. Both are inclusive according to XML API. None is content is None.
      length: file length, if this is the last chunk of file content.
      _upload_filename: internal use. Might be removed any time! This is
        used by blobstore to pass in the upload filename from user.

    Returns:
      _AE_GCSFileInfo entity for this file if the file is finalized.

    Raises:
      ValueError: if something is invalid. The exception.args is a tuple of
      (msg, http status code).
    """
    ns = namespace_manager.get_namespace()
    try:
      namespace_manager.set_namespace('')
      gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)
      if not gcs_file:
        raise ValueError('Invalid token', httplib.BAD_REQUEST)
      if gcs_file.next_offset == -1:
        # NOTE(review): status code reconstructed — confirm against SDK.
        raise ValueError('Received more uploads after file %s '
                         'was finalized.' % gcs_file.filename,
                         httplib.OK)
      if content:
        start, end = content_range
        if len(content) != (end - start + 1):
          raise ValueError('Invalid content range %d-%d' % content_range,
                           httplib.REQUESTED_RANGE_NOT_SATISFIABLE)

        if start > gcs_file.next_offset:
          # A gap before this chunk: reject.
          raise ValueError('Expect start offset %s, got %s' %
                           (gcs_file.next_offset, start),
                           httplib.REQUESTED_RANGE_NOT_SATISFIABLE)
        elif end < gcs_file.next_offset:
          # Entire chunk was already received; ack as a no-op.
          return
        else:
          # Drop the already-received head of the chunk, then store the rest.
          content = content[gcs_file.next_offset - start:]
          start = gcs_file.next_offset
          blobkey = '%s-%d-%d' % (token, start, end)
          self.blob_storage.StoreBlob(blobkey, StringIO.StringIO(content))
          # Key name is the zero-padded start offset so chunks sort by
          # offset under the parent _AE_GCSFileInfo_.
          new_content = _AE_GCSPartialFile_(
              parent=gcs_file,
              key_name='%020d' % start,
              partial_content=blobkey,
              end=end + 1)
          new_content.put()
          gcs_file.next_offset = end + 1
          gcs_file.put()
      if length is not None and length != gcs_file.next_offset:
        raise ValueError(
            'Got finalization request with wrong file length. '
            'Expecting %s, got %s' % (gcs_file.next_offset, length),
            httplib.REQUESTED_RANGE_NOT_SATISFIABLE)
      elif length is not None:
        return self._end_creation(token, _upload_filename)
    finally:
      namespace_manager.set_namespace(ns)
  @db.non_transactional
  def put_copy(self, src, dst, options):
    """Copy file from src to dst.

    Args:
      src: /bucket/filename. This file must exist.
      dst: /bucket/filename.
      options: a dict containing all user specified request headers.
        e.g. {'content-type': 'foo', 'x-goog-meta-bar': 'bar'}. If None,
        old metadata is copied.
    """
    common.validate_file_path(src)
    common.validate_file_path(dst)

    ns = namespace_manager.get_namespace()
    try:
      namespace_manager.set_namespace('')
      src_blobkey = self._filename_to_blobkey(src)
      source = _AE_GCSFileInfo_.get_by_key_name(src_blobkey)
      token = self._filename_to_blobkey(dst)
      # The copy is created already finalized.
      new_file = _AE_GCSFileInfo_(key_name=token,
                                  filename=dst,
                                  finalized=True)
      if options:
        new_file.options = options
      else:
        new_file.options = source.options
      new_file.etag = source.etag
      new_file.size = source.size
      new_file.creation = source.creation
      new_file.put()
    finally:
      namespace_manager.set_namespace(ns)

    if src_blobkey != token:
      # Copy content only when src and dst differ; a same-path copy only
      # rewrites metadata above.
      local_file = self.blob_storage.OpenBlob(src_blobkey)
      self.blob_storage.StoreBlob(token, local_file)
  @db.non_transactional
  def _end_creation(self, token, _upload_filename):
    """End object upload.

    Args:
      token: upload token returned by post_start_creation.
      _upload_filename: upload filename from the user, stored in blobinfo.

    Returns:
      _AE_GCSFileInfo Entity for this file.

    Raises:
      ValueError: if token is invalid. Or file is corrupted during upload.

    Save file content to blobstore. Save blobinfo and _AE_GCSFileInfo.
    """
    gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)
    if not gcs_file:
      raise ValueError('Invalid token')
    if gcs_file.finalized:
      # Already finalized: finalization is idempotent.
      return gcs_file

    error_msg, content = self._get_content(gcs_file)
    if error_msg:
      raise ValueError(error_msg)

    gcs_file.etag = hashlib.md5(content).hexdigest()
    gcs_file.creation = datetime.datetime.utcnow()
    gcs_file.size = len(content)

    # Also create a __BlobInfo__ entity (empty namespace) so the blobstore
    # API can see this object.
    blob_info = datastore.Entity('__BlobInfo__', name=str(token), namespace='')
    blob_info['content_type'] = gcs_file.content_type
    blob_info['creation'] = gcs_file.creation
    blob_info['filename'] = _upload_filename
    blob_info['md5_hash'] = gcs_file.etag
    blob_info['size'] = gcs_file.size
    datastore.Put(blob_info)

    self.blob_storage.StoreBlob(token, StringIO.StringIO(content))

    gcs_file.finalized = True
    # -1 marks the upload session as closed; further PUTs are rejected.
    gcs_file.next_offset = -1
    gcs_file.put()
    return gcs_file
385 @db.transactional(propagation
=db
.INDEPENDENT
)
386 def _get_content(self
, gcs_file
):
387 """Aggregate all partial content of the gcs_file.
390 gcs_file: an instance of _AE_GCSFileInfo_.
393 (error_msg, content) tuple. error_msg is set if the file is
394 corrupted during upload. Otherwise content is set to the
395 aggregation of all partial contents.
400 for partial
in (_AE_GCSPartialFile_
.all(namespace
='').ancestor(gcs_file
).
402 start
= int(partial
.key().name())
404 if start
< previous_end
:
405 error_msg
= 'File is corrupted due to missing chunks.'
406 elif start
> previous_end
:
407 error_msg
= 'File is corrupted due to overlapping chunks'
408 previous_end
= partial
.end
409 content
+= self
.blob_storage
.OpenBlob(partial
.partial_content
).read()
410 self
.blob_storage
.DeleteBlob(partial
.partial_content
)
415 return error_msg
, content
  @db.non_transactional
  def get_bucket(self,
                 bucketpath,
                 prefix,
                 marker,
                 max_keys,
                 delimiter):
    """Get bucket listing with a GET.

    How GCS listbucket work in production:
    GCS tries to return as many items as possible in a single response. If
    there are more items satisfying user's query and the current request
    took too long (e.g spent on skipping files in a subdir) or items to return
    gets too big (> max_keys), it returns fast and sets IsTruncated
    and NextMarker for continuation. They serve redundant purpose: if
    NextMarker is set, IsTruncated is True.

    Note NextMarker is not where GCS scan left off. It is
    only valid for the exact same type of query the marker was generated from.
    For example, if a marker is generated from query with delimiter, the marker
    is the name of a subdir (instead of the last file within the subdir). Thus
    you can't use this marker to issue a query without delimiter.

    Args:
      bucketpath: gcs bucket path of form '/bucket'
      prefix: prefix to limit listing.
      marker: a str after which to start listing. Exclusive.
      max_keys: max items we scan & return.
      delimiter: delimiter for directory.

    See https://developers.google.com/storage/docs/reference-methods#getbucket
    for details.

    Returns:
      A tuple of (a list of GCSFileStat for files or directories sorted by
      filename, next_marker to use as next marker, is_truncated boolean to
      indicate if there are more results satisfying query).
    """
    common.validate_bucket_path(bucketpath)
    q = _AE_GCSFileInfo_.all(namespace='')
    fully_qualified_prefix = '/'.join([bucketpath, prefix])
    if marker:
      q.filter('filename >', '/'.join([bucketpath, marker]))
    else:
      q.filter('filename >=', fully_qualified_prefix)

    result = set()
    name = None
    first = True
    first_dir = None
    for info in q.run():
      # Stop once past the requested prefix or at max_keys results.
      if not info.filename.startswith(fully_qualified_prefix):
        break
      if len(result) == max_keys:
        break

      # Re-fetch by key in case the index row is stale.
      info = db.get(info.key())
      if not info:
        continue

      name = info.filename
      if delimiter:
        # Directory mode: collapse everything under a subdir into one entry.
        start_index = name.find(delimiter, len(fully_qualified_prefix))
        if start_index != -1:
          name = name[:start_index + len(delimiter)]

          # When continuing from a marker, skip the marker's own subdir.
          if marker and (first or name == first_dir):
            first_dir = name
          else:
            result.add(common.GCSFileStat(name, st_size=None,
                                          st_ctime=None, etag=None,
                                          is_dir=True))
          first = False
      else:
        result.add(common.GCSFileStat(
            filename=name,
            st_size=info.size,
            st_ctime=calendar.timegm(info.creation.utctimetuple()),
            etag=info.etag))

    def is_truncated():
      """Check if there are more results satisfying the query."""
      if not result:
        return False
      q = _AE_GCSFileInfo_.all(namespace='')
      q.filter('filename >', name)
      info = None
      if delimiter and name.endswith(delimiter):
        # name is a subdir: advance past everything inside it.
        for info in q.run():
          if not info.filename.startswith(name):
            break
        # NOTE(review): if the query yields nothing, info stays None and the
        # next line would raise AttributeError — confirm this is unreachable.
        if info.filename.startswith(name):
          info = None
      else:
        info = q.get()
      if info is None or not info.filename.startswith(fully_qualified_prefix):
        return False
      return True

    result = list(result)
    result.sort()
    truncated = is_truncated()
    next_marker = name if truncated else None

    return result, next_marker, truncated
  @db.non_transactional
  def get_object(self, filename, start=0, end=None):
    """Get file content with a GET.

    Args:
      filename: gcs filename of form '/bucket/filename'.
      start: start offset to request. Inclusive.
      end: end offset to request. Inclusive.

    Returns:
      The segment of file content requested.

    Raises:
      ValueError: if file doesn't exist.
    """
    common.validate_file_path(filename)
    blobkey = self._filename_to_blobkey(filename)
    key = blobstore_stub.BlobstoreServiceStub.ToDatastoreBlobKey(blobkey)
    gcsfileinfo = db.get(key)
    if not gcsfileinfo or not gcsfileinfo.finalized:
      raise ValueError('File does not exist.')
    local_file = self.blob_storage.OpenBlob(blobkey)
    local_file.seek(start)
    # NOTE(review): a falsy end (end=0) falls into the read-to-EOF branch
    # instead of returning a single byte — confirm callers never pass end=0.
    if end:
      return local_file.read(end - start + 1)
    else:
      return local_file.read()
  @db.non_transactional
  def head_object(self, filename):
    """Get file stat with a HEAD.

    Args:
      filename: gcs filename of form '/bucket/filename'

    Returns:
      A GCSFileStat object containing file stat. None if file doesn't exist.
    """
    common.validate_file_path(filename)
    blobkey = self._filename_to_blobkey(filename)
    key = blobstore_stub.BlobstoreServiceStub.ToDatastoreBlobKey(blobkey)
    info = db.get(key)
    # Only finalized files are visible to HEAD.
    if info and info.finalized:
      metadata = common.get_metadata(info.options)
      filestat = common.GCSFileStat(
          filename=info.filename,
          st_size=info.size,
          etag=info.etag,
          st_ctime=calendar.timegm(info.creation.utctimetuple()),
          content_type=info.content_type,
          metadata=metadata)
      return filestat
    return None
  @db.non_transactional
  def delete_object(self, filename):
    """Delete file with a DELETE.

    Args:
      filename: gcs filename of form '/bucket/filename'

    Returns:
      True if file is deleted. False if file doesn't exist.
    """
    common.validate_file_path(filename)
    blobkey = self._filename_to_blobkey(filename)
    key = blobstore_stub.BlobstoreServiceStub.ToDatastoreBlobKey(blobkey)
    gcsfileinfo = db.get(key)
    if not gcsfileinfo:
      return False

    # Removes both the __BlobInfo__/_AE_GCSFileInfo_ entity and the content.
    blobstore_stub.BlobstoreServiceStub.DeleteBlob(blobkey, self.blob_storage)
    return True