#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
17 """Stub for Google storage."""
22 import calendar
23 import datetime
24 import hashlib
25 import httplib
26 import StringIO
28 from google.appengine.api import datastore
29 from google.appengine.api import namespace_manager
30 from google.appengine.api.blobstore import blobstore_stub
31 from google.appengine.ext import db
32 from google.appengine.ext.cloudstorage import common
35 _GCS_DEFAULT_CONTENT_TYPE = 'binary/octet-stream'


class _AE_GCSFileInfo_(db.Model):
  """Store GCS specific info.

  GCS allows a user to define arbitrary metadata via headers of the form
  x-goog-meta-foo: bar. These headers are returned when the user does a
  GET or HEAD on the object.

  Key name is blobkey.
  """

  filename = db.StringProperty(required=True)
  finalized = db.BooleanProperty(required=True)

  raw_options = db.StringListProperty()

  size = db.IntegerProperty()

  next_offset = db.IntegerProperty(default=0)

  creation = db.DateTimeProperty()

  content_type = db.StringProperty(default=_GCS_DEFAULT_CONTENT_TYPE)
  etag = db.ByteStringProperty()

  def get_options(self):
    return dict(o.split(':', 1) for o in self.raw_options)

  def set_options(self, options_dict):
    self.raw_options = [
        '%s:%s' % (k.lower(), v) for k, v in options_dict.iteritems()]
    if 'content-type' in options_dict:
      self.content_type = options_dict['content-type']

  options = property(get_options, set_options)
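
  # Illustrative round trip (a sketch only; assumes an unsaved
  # _AE_GCSFileInfo_ instance `info`). Header keys are lowercased and stored
  # as 'key:value' strings; list order may vary since dicts are unordered:
  #
  #   info.options = {'Content-Type': 'text/plain', 'x-goog-meta-foo': 'bar'}
  #   info.raw_options  # ['content-type:text/plain', 'x-goog-meta-foo:bar']
  #   info.options      # {'content-type': 'text/plain',
  #                     #  'x-goog-meta-foo': 'bar'}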

  @classmethod
  def kind(cls):
    return blobstore_stub._GS_INFO_KIND


class _AE_GCSPartialFile_(db.Model):
  """Store partial content for uploading files."""

  start = db.IntegerProperty(required=True)
  end = db.IntegerProperty(required=True)

  partial_content = db.TextProperty(required=True)


class CloudStorageStub(object):
  """Google Cloud Storage stub implementation.

  We use the blobstore stub to store files. All metadata is stored
  in _AE_GCSFileInfo_.

  Note: this Google Cloud Storage stub is designed to work with
  apphosting.ext.cloudstorage.storage_api.py.
  It only implements the part of GCS that storage_api.py uses, and its
  interface maps to the GCS XML APIs.
  """

  def __init__(self, blob_storage):
    """Initialize.

    Args:
      blob_storage:
        apphosting.api.blobstore.blobstore_stub.BlobStorage instance
    """
    self.blob_storage = blob_storage

  def _filename_to_blobkey(self, filename):
    """Get blobkey for filename.

    Args:
      filename: gcs filename of form /bucket/filename.

    Returns:
      blobinfo's datastore's key name, aka, blobkey.
    """
    common.validate_file_path(filename)
    return blobstore_stub.BlobstoreServiceStub.CreateEncodedGoogleStorageKey(
        filename[1:])

  @db.non_transactional
  def post_start_creation(self, filename, options):
    """Start object creation with a POST.

    This implements the resumable upload XML API.

    The only major limitation of the current implementation is that it
    doesn't support multiple upload sessions for the same GCS file. The
    previous _AE_GCSFileInfo_ (which represents either a finalized file or
    an upload session) is removed when a new upload session is created.

    Args:
      filename: gcs filename of form /bucket/filename.
      options: a dict containing all user specified request headers.
        e.g. {'content-type': 'foo', 'x-goog-meta-bar': 'bar'}.

    Returns:
      a token (blobkey) used for continuing upload.
    """
    ns = namespace_manager.get_namespace()
    try:
      namespace_manager.set_namespace('')
      common.validate_file_path(filename)
      token = self._filename_to_blobkey(filename)
      gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)

      self._cleanup_old_file(gcs_file)
      new_file = _AE_GCSFileInfo_(key_name=token,
                                  filename=filename,
                                  finalized=False)
      new_file.options = options
      new_file.put()
      return token
    finally:
      namespace_manager.set_namespace(ns)
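
  # Minimal usage sketch (assumes `stub` is a CloudStorageStub wired to a
  # blobstore BlobStorage instance; names are illustrative only):
  #
  #   token = stub.post_start_creation(
  #       '/bucket/file', {'content-type': 'text/plain'})
  #
  # Starting another session for the same filename removes the previous
  # _AE_GCSFileInfo_ via _cleanup_old_file below.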

  @db.non_transactional
  def _cleanup_old_file(self, gcs_file):
    """Clean up the old version of a file.

    The old version may or may not be finalized yet. Either way,
    when the user tries to create a file that already exists, we delete the
    old version first.

    Args:
      gcs_file: an instance of _AE_GCSFileInfo_.
    """
    if gcs_file:
      if gcs_file.finalized:
        blobkey = gcs_file.key().name()
        self.blob_storage.DeleteBlob(blobkey)
      else:
        db.delete(_AE_GCSPartialFile_.all().ancestor(gcs_file))
      gcs_file.delete()

  @db.non_transactional
  def put_empty(self, token):
    """Empty put is used to query upload progress.

    The file must not have finished uploading.

    Args:
      token: upload token returned by post_start_creation.

    Returns:
      The last offset uploaded. -1 if nothing has been uploaded.

    Raises:
      ValueError: if token matches no in-progress upload.
    """
    ns = namespace_manager.get_namespace()
    try:
      namespace_manager.set_namespace('')
      gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)
      if not gcs_file:
        raise ValueError('Invalid token', httplib.BAD_REQUEST)
      return gcs_file.next_offset - 1
    finally:
      namespace_manager.set_namespace(ns)
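
  # Minimal sketch (assumes the `stub` and `token` from the sketch above):
  # after bytes 0..9 have been uploaded, an empty PUT reports the last
  # persisted offset.
  #
  #   stub.put_continue_creation(token, 'x' * 10, (0, 9))
  #   stub.put_empty(token)  # 9; -1 before any bytes have arrived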

  @db.non_transactional
  def put_continue_creation(self, token, content, content_range,
                            length=None,
                            _upload_filename=None):
    """Continue object upload with PUTs.

    This implements the resumable upload XML API.

    Args:
      token: upload token returned by post_start_creation.
      content: object content. None if no content was provided with this
        PUT request.
      content_range: a (start, end) tuple specifying the content range of
        this chunk. Both are inclusive according to the XML API. None if
        content is None.
      length: file length, if this is the last chunk of file content.
      _upload_filename: internal use. Might be removed any time! This is
        used by blobstore to pass in the upload filename from the user.

    Returns:
      _AE_GCSFileInfo_ entity for this file if the file is finalized.

    Raises:
      ValueError: if something is invalid. The exception.args is a tuple of
        (msg, http status code).
    """
    ns = namespace_manager.get_namespace()
    try:
      namespace_manager.set_namespace('')
      gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)
      if not gcs_file:
        raise ValueError('Invalid token', httplib.BAD_REQUEST)
      if gcs_file.next_offset == -1:
        raise ValueError('Received more uploads after file %s '
                         'was finalized.' % gcs_file.filename,
                         httplib.OK)
      if content:
        start, end = content_range
        if len(content) != (end - start + 1):
          raise ValueError('Invalid content range %d-%d' % content_range,
                           httplib.REQUESTED_RANGE_NOT_SATISFIABLE)

        if start > gcs_file.next_offset:
          raise ValueError('Expect start offset %s, got %s' %
                           (gcs_file.next_offset, start),
                           httplib.REQUESTED_RANGE_NOT_SATISFIABLE)
        elif end < gcs_file.next_offset:
          return
        else:
          # Skip bytes we already have from an overlapping retry.
          content = content[gcs_file.next_offset - start:]
          start = gcs_file.next_offset
          blobkey = '%s-%d-%d' % (token, start, end)
          self.blob_storage.StoreBlob(blobkey, StringIO.StringIO(content))
          new_content = _AE_GCSPartialFile_(
              parent=gcs_file,
              key_name='%020d' % start,
              partial_content=blobkey,
              start=start,
              end=end + 1)
          new_content.put()
          gcs_file.next_offset = end + 1
          gcs_file.put()
      if length is not None and length != gcs_file.next_offset:
        raise ValueError(
            'Got finalization request with wrong file length. '
            'Expecting %s, got %s' % (gcs_file.next_offset, length),
            httplib.REQUESTED_RANGE_NOT_SATISFIABLE)
      elif length is not None:
        return self._end_creation(token, _upload_filename)
    finally:
      namespace_manager.set_namespace(ns)
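
  # Minimal sketch of the full resumable flow (assumes the `stub` and
  # `token` from the sketches above; both range bounds are inclusive, per
  # the XML API). The final PUT passes the total file length, which
  # triggers _end_creation:
  #
  #   stub.put_continue_creation(token, 'hello ', (0, 5))
  #   stub.put_continue_creation(token, 'world', (6, 10), length=11)
  #   stub.get_object('/bucket/file')  # 'hello world'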

  @db.non_transactional
  def put_copy(self, src, dst):
    """Copy file from src to dst.

    Metadata is copied.

    Args:
      src: /bucket/filename. This file must exist.
      dst: /bucket/filename.
    """
    common.validate_file_path(src)
    common.validate_file_path(dst)

    ns = namespace_manager.get_namespace()
    try:
      namespace_manager.set_namespace('')
      src_blobkey = self._filename_to_blobkey(src)
      source = _AE_GCSFileInfo_.get_by_key_name(src_blobkey)
      token = self._filename_to_blobkey(dst)
      new_file = _AE_GCSFileInfo_(key_name=token,
                                  filename=dst,
                                  finalized=True)
      new_file.options = source.options
      new_file.etag = source.etag
      new_file.size = source.size
      new_file.creation = datetime.datetime.utcnow()
      new_file.put()
    finally:
      namespace_manager.set_namespace(ns)

    local_file = self.blob_storage.OpenBlob(src_blobkey)
    self.blob_storage.StoreBlob(token, local_file)
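
  # Minimal sketch (assumes '/bucket/file' was finalized as above): the
  # copy is finalized immediately and inherits the source's options, etag
  # and size; only `creation` is refreshed.
  #
  #   stub.put_copy('/bucket/file', '/bucket/file_copy')
  #   stub.head_object('/bucket/file_copy').etag  # same etag as the source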

  @db.non_transactional
  def _end_creation(self, token, _upload_filename):
    """End object upload.

    Saves the file content to blobstore, and saves the blobinfo and the
    _AE_GCSFileInfo_.

    Args:
      token: upload token returned by post_start_creation.

    Returns:
      _AE_GCSFileInfo_ entity for this file.

    Raises:
      ValueError: if token is invalid, or the file was corrupted during
        upload.
    """
    gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)
    if not gcs_file:
      raise ValueError('Invalid token')
    if gcs_file.finalized:
      return gcs_file

    error_msg, content = self._get_content(gcs_file)
    if error_msg:
      raise ValueError(error_msg)

    gcs_file.etag = hashlib.md5(content).hexdigest()
    gcs_file.creation = datetime.datetime.utcnow()
    gcs_file.size = len(content)

    # Create a __BlobInfo__ entity in the empty namespace so blobstore APIs
    # can also read this GCS object.
    blob_info = datastore.Entity('__BlobInfo__', name=str(token), namespace='')
    blob_info['content_type'] = gcs_file.content_type
    blob_info['creation'] = gcs_file.creation
    blob_info['filename'] = _upload_filename
    blob_info['md5_hash'] = gcs_file.etag
    blob_info['size'] = gcs_file.size
    datastore.Put(blob_info)

    self.blob_storage.StoreBlob(token, StringIO.StringIO(content))

    gcs_file.finalized = True
    gcs_file.next_offset = -1
    gcs_file.put()
    return gcs_file

  @db.transactional(propagation=db.INDEPENDENT)
  def _get_content(self, gcs_file):
    """Aggregate all partial content of the gcs_file.

    Args:
      gcs_file: an instance of _AE_GCSFileInfo_.

    Returns:
      (error_msg, content) tuple. error_msg is set if the file was
      corrupted during upload. Otherwise content is set to the
      aggregation of all partial contents.
    """
    content = ''
    previous_end = 0
    error_msg = ''
    for partial in (_AE_GCSPartialFile_.all(namespace='').ancestor(gcs_file).
                    order('__key__')):
      start = int(partial.key().name())
      if not error_msg:
        if start < previous_end:
          error_msg = 'File is corrupted due to overlapping chunks.'
        elif start > previous_end:
          error_msg = 'File is corrupted due to missing chunks.'
        previous_end = partial.end
        content += self.blob_storage.OpenBlob(partial.partial_content).read()
      self.blob_storage.DeleteBlob(partial.partial_content)
      partial.delete()
    if error_msg:
      gcs_file.delete()
      content = ''
    return error_msg, content
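
  # Note on chunk ordering: partial chunks are keyed by '%020d' % start
  # (e.g. start=5 becomes '00000000000000000005'), so the zero-padded key
  # names sort lexicographically in the same order as their numeric offsets.
  # The __key__ sort in _get_content above relies on this.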

  @db.non_transactional
  def get_bucket(self,
                 bucketpath,
                 prefix,
                 marker,
                 max_keys,
                 delimiter):
    """Get bucket listing with a GET.

    How GCS listbucket works in production:
    GCS tries to return as many items as possible in a single response. If
    there are more items satisfying the user's query and the current request
    took too long (e.g. time was spent skipping files in a subdir) or the
    set of items to return got too big (> max_keys), it returns fast and
    sets IsTruncated and NextMarker for continuation. They serve a redundant
    purpose: if NextMarker is set, IsTruncated is True.

    Note NextMarker is not where the GCS scan left off. It is only valid
    for the exact same type of query the marker was generated from. For
    example, if a marker is generated from a query with a delimiter, the
    marker is the name of a subdir (instead of the last file within the
    subdir). Thus you can't use this marker to issue a query without a
    delimiter.

    Args:
      bucketpath: gcs bucket path of form '/bucket'.
      prefix: prefix to limit listing.
      marker: a str after which to start listing. Exclusive.
      max_keys: max items we scan & return.
      delimiter: delimiter for directory.

    See https://developers.google.com/storage/docs/reference-methods#getbucket
    for details.

    Returns:
      A tuple of (a list of GCSFileStat for files or directories sorted by
      filename, next_marker to use as next marker, is_truncated boolean to
      indicate if there are more results satisfying query).
    """
    common.validate_bucket_path(bucketpath)
    q = _AE_GCSFileInfo_.all(namespace='')
    fully_qualified_prefix = '/'.join([bucketpath, prefix])
    if marker:
      q.filter('filename >', '/'.join([bucketpath, marker]))
    else:
      q.filter('filename >=', fully_qualified_prefix)

    result = set()
    name = None
    first = True
    first_dir = None
    for info in q.run():
      if not info.filename.startswith(fully_qualified_prefix):
        break
      if len(result) == max_keys:
        break

      # Re-fetch by key for a consistent view; query results may be stale.
      info = db.get(info.key())
      if not info:
        continue

      name = info.filename
      if delimiter:
        start_index = name.find(delimiter, len(fully_qualified_prefix))
        if start_index != -1:
          name = name[:start_index + len(delimiter)]
          if marker and (first or name == first_dir):
            first = False
            first_dir = name
          else:
            result.add(common.GCSFileStat(name, st_size=None,
                                          st_ctime=None, etag=None,
                                          is_dir=True))
          continue

      first = False
      result.add(common.GCSFileStat(
          filename=name,
          st_size=info.size,
          st_ctime=calendar.timegm(info.creation.utctimetuple()),
          etag=info.etag))

    def is_truncated():
      """Check if there are more results satisfying the query."""
      if not result:
        return False
      q = _AE_GCSFileInfo_.all(namespace='')
      q.filter('filename >', name)
      info = None

      if delimiter and name.endswith(delimiter):
        for info in q.run():
          if not info.filename.startswith(name):
            break
        if info is not None and info.filename.startswith(name):
          info = None
      else:
        info = q.get()
      if info is None or not info.filename.startswith(fully_qualified_prefix):
        return False
      return True

    result = list(result)
    result.sort()
    truncated = is_truncated()
    next_marker = name if truncated else None

    return result, next_marker, truncated
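
  # Minimal listing sketch (assumes finalized files '/bucket/a' and
  # '/bucket/dir/b'): with a delimiter, everything under 'dir/' collapses
  # into a single directory entry.
  #
  #   stats, next_marker, truncated = stub.get_bucket(
  #       '/bucket', '', None, 1000, '/')
  #   [s.filename for s in stats]  # ['/bucket/a', '/bucket/dir/']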

  @db.non_transactional
  def get_object(self, filename, start=0, end=None):
    """Get file content with a GET.

    Args:
      filename: gcs filename of form '/bucket/filename'.
      start: start offset to request. Inclusive.
      end: end offset to request. Inclusive.

    Returns:
      The segment of file content requested.

    Raises:
      ValueError: if file doesn't exist.
    """
    common.validate_file_path(filename)
    blobkey = self._filename_to_blobkey(filename)
    key = blobstore_stub.BlobstoreServiceStub.ToDatastoreBlobKey(blobkey)
    gcsfileinfo = db.get(key)
    if not gcsfileinfo or not gcsfileinfo.finalized:
      raise ValueError('File does not exist.')
    local_file = self.blob_storage.OpenBlob(blobkey)
    local_file.seek(start)
    if end is not None:
      return local_file.read(end - start + 1)
    else:
      return local_file.read()
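
  # Minimal ranged-read sketch (assumes the finalized '/bucket/file' from
  # the upload sketch above; `end` is inclusive):
  #
  #   stub.get_object('/bucket/file', start=0, end=4)  # 'hello'
  #   stub.get_object('/bucket/file', start=6)         # 'world'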

  @db.non_transactional
  def head_object(self, filename):
    """Get file stat with a HEAD.

    Args:
      filename: gcs filename of form '/bucket/filename'.

    Returns:
      A GCSFileStat object containing file stat. None if file doesn't exist.
    """
    common.validate_file_path(filename)
    blobkey = self._filename_to_blobkey(filename)
    key = blobstore_stub.BlobstoreServiceStub.ToDatastoreBlobKey(blobkey)
    info = db.get(key)
    if info and info.finalized:
      metadata = common.get_metadata(info.options)
      filestat = common.GCSFileStat(
          filename=info.filename,
          st_size=info.size,
          etag=info.etag,
          st_ctime=calendar.timegm(info.creation.utctimetuple()),
          content_type=info.content_type,
          metadata=metadata)
      return filestat
    return None

  @db.non_transactional
  def delete_object(self, filename):
    """Delete file with a DELETE.

    Args:
      filename: gcs filename of form '/bucket/filename'.

    Returns:
      True if file is deleted. False if file doesn't exist.
    """
    common.validate_file_path(filename)
    blobkey = self._filename_to_blobkey(filename)
    key = blobstore_stub.BlobstoreServiceStub.ToDatastoreBlobKey(blobkey)
    gcsfileinfo = db.get(key)
    if not gcsfileinfo:
      return False

    blobstore_stub.BlobstoreServiceStub.DeleteBlob(blobkey, self.blob_storage)
    return True