App Engine Python SDK version 1.8.9
python/google/appengine/ext/cloudstorage/cloudstorage_stub.py
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
17 """Stub for Google storage."""
22 import calendar
23 import datetime
24 import hashlib
25 import httplib
26 import StringIO
28 from google.appengine.api import datastore
29 from google.appengine.api import namespace_manager
30 from google.appengine.api.blobstore import blobstore_stub
31 from google.appengine.ext import db
32 from google.appengine.ext.cloudstorage import common
35 _GCS_DEFAULT_CONTENT_TYPE = 'binary/octet-stream'


class _AE_GCSFileInfo_(db.Model):
  """Store GCS specific info.

  GCS allows users to define arbitrary metadata via headers such as
  x-goog-meta-foo: bar. These headers are returned when the user does a GET
  or HEAD on the object.

  Key name is blobkey.
  """

  filename = db.StringProperty(required=True)
  finalized = db.BooleanProperty(required=True)

  raw_options = db.StringListProperty()

  size = db.IntegerProperty()

  next_offset = db.IntegerProperty(default=0)

  creation = db.DateTimeProperty()

  content_type = db.StringProperty(default=_GCS_DEFAULT_CONTENT_TYPE)
  etag = db.ByteStringProperty()

  def get_options(self):
    return dict(o.split(':', 1) for o in self.raw_options)

  def set_options(self, options_dict):
    self.raw_options = [
        '%s:%s' % (k.lower(), v) for k, v in options_dict.iteritems()]
    if 'content-type' in options_dict:
      self.content_type = options_dict['content-type']

  options = property(get_options, set_options)

  @classmethod
  def kind(cls):
    return blobstore_stub._GS_INFO_KIND
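
# Illustrative sketch (not part of the original module): the options property
# round-trips user request headers through raw_options. The entity values here
# are only for demonstration.
#
#   info = _AE_GCSFileInfo_(key_name='k', filename='/b/f', finalized=False)
#   info.options = {'content-type': 'text/plain', 'x-goog-meta-a': 'b'}
#   info.raw_options  # ['content-type:text/plain', 'x-goog-meta-a:b'],
#                     # order may vary
#   info.options      # {'content-type': 'text/plain', 'x-goog-meta-a': 'b'}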


class _AE_GCSPartialFile_(db.Model):
  """Store partial content for uploading files."""

  # Chunk byte range: start is inclusive, end is exclusive
  # (put_continue_creation stores end + 1). Both are set when a chunk
  # is uploaded.
  start = db.IntegerProperty(required=True)

  end = db.IntegerProperty(required=True)

  partial_content = db.TextProperty(required=True)


class CloudStorageStub(object):
  """Google Cloud Storage stub implementation.

  We use the blobstore stub to store files. All metadata is stored
  in _AE_GCSFileInfo_.

  Note: this Google Cloud Storage stub is designed to work with
  apphosting.ext.cloudstorage.storage_api.py.
  It only implements the parts of GCS that storage_api.py uses, and its
  interface maps to the GCS XML API.
  """

  def __init__(self, blob_storage):
    """Initialize.

    Args:
      blob_storage:
        apphosting.api.blobstore.blobstore_stub.BlobStorage instance
    """
    self.blob_storage = blob_storage
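
  # Illustrative sketch (not part of the original module): constructing the
  # stub for local testing. FileBlobStorage is one BlobStorage implementation
  # in the SDK; the exact import path and constructor arguments shown here are
  # assumptions that may vary by SDK version.
  #
  #   from google.appengine.api.blobstore import file_blob_storage
  #   storage = file_blob_storage.FileBlobStorage('/tmp/blobs', 'dev~myapp')
  #   stub = CloudStorageStub(storage)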

  def _filename_to_blobkey(self, filename):
    """Get blobkey for filename.

    Args:
      filename: gcs filename of form /bucket/filename.

    Returns:
      blobinfo's datastore's key name, aka, blobkey.
    """
    common.validate_file_path(filename)
    return blobstore_stub.BlobstoreServiceStub.CreateEncodedGoogleStorageKey(
        filename[1:])

  @db.non_transactional
  def post_start_creation(self, filename, options):
    """Start object creation with a POST.

    This implements the resumable upload XML API.

    The only major limitation of the current implementation is that we don't
    support multiple upload sessions for the same GCS file. The previous
    _AE_GCSFileInfo_ (which represents either a finalized file or
    an upload session) is removed when a new upload session is
    created.

    Args:
      filename: gcs filename of form /bucket/filename.
      options: a dict containing all user specified request headers.
        e.g. {'content-type': 'foo', 'x-goog-meta-bar': 'bar'}.

    Returns:
      a token (blobkey) used for continuing upload.
    """
    ns = namespace_manager.get_namespace()
    try:
      namespace_manager.set_namespace('')
      common.validate_file_path(filename)
      token = self._filename_to_blobkey(filename)
      gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)
      self._cleanup_old_file(gcs_file)
      new_file = _AE_GCSFileInfo_(key_name=token,
                                  filename=filename,
                                  finalized=False)
      new_file.options = options
      new_file.put()
      return token
    finally:
      namespace_manager.set_namespace(ns)
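
  # Illustrative sketch (assumes `stub` is a CloudStorageStub as constructed
  # above): start a resumable upload and keep the returned token for the
  # subsequent PUTs.
  #
  #   token = stub.post_start_creation(
  #       '/some-bucket/some-file', {'content-type': 'text/plain'})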

  @db.non_transactional
  def _cleanup_old_file(self, gcs_file):
    """Clean up the old version of a file.

    The old version may or may not be finalized yet. Either way,
    when the user tries to create a file that already exists, we delete the
    old version first.

    Args:
      gcs_file: an instance of _AE_GCSFileInfo_.
    """
    if gcs_file:
      if gcs_file.finalized:
        blobkey = gcs_file.key().name()
        self.blob_storage.DeleteBlob(blobkey)
      else:
        db.delete(_AE_GCSPartialFile_.all().ancestor(gcs_file))
      gcs_file.delete()

  @db.non_transactional
  def put_empty(self, token):
    """Empty put is used to query upload progress.

    The file must not have finished uploading.

    Args:
      token: upload token returned by post_start_creation.

    Returns:
      last offset uploaded. -1 if none has been uploaded.

    Raises:
      ValueError: if token matches no in-progress uploads.
    """
    ns = namespace_manager.get_namespace()
    try:
      namespace_manager.set_namespace('')
      gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)
      if not gcs_file:
        raise ValueError('Invalid token', httplib.BAD_REQUEST)
      return gcs_file.next_offset - 1
    finally:
      namespace_manager.set_namespace(ns)
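
  # Illustrative sketch (assumes `token` came from post_start_creation): an
  # empty PUT reports how far the upload has progressed.
  #
  #   stub.put_empty(token)  # -1 before any bytes have been uploaded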

  @db.non_transactional
  def put_continue_creation(self, token, content, content_range,
                            length=None,
                            _upload_filename=None):
    """Continue object upload with PUTs.

    This implements the resumable upload XML API.

    Args:
      token: upload token returned by post_start_creation.
      content: object content. None if no content was provided with this
        PUT request.
      content_range: a (start, end) tuple specifying the content range of this
        chunk. Both are inclusive according to the XML API. None if content
        is None.
      length: file length, if this is the last chunk of file content.
      _upload_filename: internal use. Might be removed any time! This is
        used by blobstore to pass in the upload filename from the user.

    Returns:
      _AE_GCSFileInfo entity for this file if the file is finalized.

    Raises:
      ValueError: if something is invalid. The exception.args is a tuple of
      (msg, http status code).
    """
    ns = namespace_manager.get_namespace()
    try:
      namespace_manager.set_namespace('')
      gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)
      if not gcs_file:
        raise ValueError('Invalid token', httplib.BAD_REQUEST)
      if gcs_file.next_offset == -1:
        raise ValueError('Received more uploads after file %s '
                         'was finalized.' % gcs_file.filename,
                         httplib.OK)
      if content:
        start, end = content_range
        if len(content) != (end - start + 1):
          raise ValueError('Invalid content range %d-%d' % content_range,
                           httplib.REQUESTED_RANGE_NOT_SATISFIABLE)

        if start > gcs_file.next_offset:
          raise ValueError('Expect start offset %s, got %s' %
                           (gcs_file.next_offset, start),
                           httplib.REQUESTED_RANGE_NOT_SATISFIABLE)
        elif end < gcs_file.next_offset:
          return
        else:

          content = content[gcs_file.next_offset - start:]
          start = gcs_file.next_offset
          blobkey = '%s-%d-%d' % (token, start, end)
          self.blob_storage.StoreBlob(blobkey, StringIO.StringIO(content))
          new_content = _AE_GCSPartialFile_(
              parent=gcs_file,
              key_name='%020d' % start,
              partial_content=blobkey,
              start=start,
              end=end + 1)
          new_content.put()
          gcs_file.next_offset = end + 1
          gcs_file.put()
      if length is not None and length != gcs_file.next_offset:
        raise ValueError(
            'Got finalization request with wrong file length. '
            'Expecting %s, got %s' % (gcs_file.next_offset, length),
            httplib.REQUESTED_RANGE_NOT_SATISFIABLE)
      elif length is not None:
        return self._end_creation(token, _upload_filename)
    finally:
      namespace_manager.set_namespace(ns)
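
  # Illustrative sketch (continuing the example above): upload 'hello world'
  # in two chunks. content_range bounds are inclusive, and `length` is passed
  # only with the final chunk to trigger finalization.
  #
  #   stub.put_continue_creation(token, 'hello ', (0, 5))
  #   info = stub.put_continue_creation(token, 'world', (6, 10), length=11)
  #   info.finalized  # True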

  @db.non_transactional
  def put_copy(self, src, dst, options):
    """Copy file from src to dst.

    Metadata is copied.

    Args:
      src: /bucket/filename. This file must exist.
      dst: /bucket/filename.
      options: a dict containing all user specified request headers.
        e.g. {'content-type': 'foo', 'x-goog-meta-bar': 'bar'}. If None,
        old metadata is copied.
    """
    common.validate_file_path(src)
    common.validate_file_path(dst)

    ns = namespace_manager.get_namespace()
    try:
      namespace_manager.set_namespace('')
      src_blobkey = self._filename_to_blobkey(src)
      source = _AE_GCSFileInfo_.get_by_key_name(src_blobkey)
      token = self._filename_to_blobkey(dst)
      new_file = _AE_GCSFileInfo_(key_name=token,
                                  filename=dst,
                                  finalized=True)
      if options:
        new_file.options = options
      else:
        new_file.options = source.options
      new_file.etag = source.etag
      new_file.size = source.size
      new_file.creation = source.creation
      new_file.put()
    finally:
      namespace_manager.set_namespace(ns)

    if src_blobkey != token:

      local_file = self.blob_storage.OpenBlob(src_blobkey)
      self.blob_storage.StoreBlob(token, local_file)
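
  # Illustrative sketch (assumes the source file exists and is finalized):
  # copy an object, reusing its metadata by passing options=None.
  #
  #   stub.put_copy('/some-bucket/some-file', '/some-bucket/copy', None)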

  @db.non_transactional
  def _end_creation(self, token, _upload_filename):
    """End object upload.

    Args:
      token: upload token returned by post_start_creation.

    Returns:
      _AE_GCSFileInfo entity for this file.

    Raises:
      ValueError: if token is invalid, or if the file was corrupted during
      upload.

    Save file content to blobstore. Save blobinfo and _AE_GCSFileInfo.
    """
    gcs_file = _AE_GCSFileInfo_.get_by_key_name(token)
    if not gcs_file:
      raise ValueError('Invalid token')
    if gcs_file.finalized:
      return gcs_file

    error_msg, content = self._get_content(gcs_file)
    if error_msg:
      raise ValueError(error_msg)

    gcs_file.etag = hashlib.md5(content).hexdigest()
    gcs_file.creation = datetime.datetime.utcnow()
    gcs_file.size = len(content)

    blob_info = datastore.Entity('__BlobInfo__', name=str(token), namespace='')
    blob_info['content_type'] = gcs_file.content_type
    blob_info['creation'] = gcs_file.creation
    blob_info['filename'] = _upload_filename
    blob_info['md5_hash'] = gcs_file.etag
    blob_info['size'] = gcs_file.size
    datastore.Put(blob_info)

    self.blob_storage.StoreBlob(token, StringIO.StringIO(content))

    gcs_file.finalized = True

    gcs_file.next_offset = -1
    gcs_file.put()
    return gcs_file

  @db.transactional(propagation=db.INDEPENDENT)
  def _get_content(self, gcs_file):
    """Aggregate all partial content of the gcs_file.

    Args:
      gcs_file: an instance of _AE_GCSFileInfo_.

    Returns:
      (error_msg, content) tuple. error_msg is set if the file is
      corrupted during upload. Otherwise content is set to the
      aggregation of all partial contents.
    """
    content = ''
    previous_end = 0
    error_msg = ''
    for partial in (_AE_GCSPartialFile_.all(namespace='').ancestor(gcs_file).
                    order('__key__')):
      start = int(partial.key().name())
      if not error_msg:
        if start < previous_end:
          error_msg = 'File is corrupted due to overlapping chunks.'
        elif start > previous_end:
          error_msg = 'File is corrupted due to missing chunks.'
        previous_end = partial.end
        content += self.blob_storage.OpenBlob(partial.partial_content).read()
      self.blob_storage.DeleteBlob(partial.partial_content)
      partial.delete()
    if error_msg:
      gcs_file.delete()
      content = ''
    return error_msg, content

  @db.non_transactional
  def get_bucket(self,
                 bucketpath,
                 prefix,
                 marker,
                 max_keys,
                 delimiter):
    """Get bucket listing with a GET.

    How GCS listbucket works in production:
    GCS tries to return as many items as possible in a single response. If
    there are more items satisfying the user's query and the current request
    took too long (e.g. time spent skipping files in a subdir) or the items
    to return get too big (> max_keys), it returns fast and sets IsTruncated
    and NextMarker for continuation. They serve a redundant purpose: if
    NextMarker is set, IsTruncated is True.

    Note NextMarker is not where the GCS scan left off. It is
    only valid for the exact same type of query the marker was generated from.
    For example, if a marker is generated from a query with a delimiter, the
    marker is the name of a subdir (instead of the last file within the
    subdir). Thus you can't use this marker to issue a query without a
    delimiter.

    Args:
      bucketpath: gcs bucket path of form '/bucket'
      prefix: prefix to limit listing.
      marker: a str after which to start listing. Exclusive.
      max_keys: max items we scan & return.
      delimiter: delimiter for directory.

    See https://developers.google.com/storage/docs/reference-methods#getbucket
    for details.

    Returns:
      A tuple of (a list of GCSFileStat for files or directories sorted by
      filename, next_marker to use as next marker, is_truncated boolean to
      indicate if there are more results satisfying query).
    """
    common.validate_bucket_path(bucketpath)
    q = _AE_GCSFileInfo_.all(namespace='')
    fully_qualified_prefix = '/'.join([bucketpath, prefix])
    if marker:
      q.filter('filename >', '/'.join([bucketpath, marker]))
    else:
      q.filter('filename >=', fully_qualified_prefix)

    result = set()
    name = None
    first = True
    first_dir = None
    for info in q.run():

      if not info.filename.startswith(fully_qualified_prefix):
        break
      if len(result) == max_keys:
        break

      info = db.get(info.key())
      if not info:
        continue

      name = info.filename
      if delimiter:

        start_index = name.find(delimiter, len(fully_qualified_prefix))
        if start_index != -1:
          name = name[:start_index + len(delimiter)]

          if marker and (first or name == first_dir):
            first = False
            first_dir = name
          else:
            result.add(common.GCSFileStat(name, st_size=None,
                                          st_ctime=None, etag=None,
                                          is_dir=True))
          continue

      if info.finalized:
        first = False
        result.add(common.GCSFileStat(
            filename=name,
            st_size=info.size,
            st_ctime=calendar.timegm(info.creation.utctimetuple()),
            etag=info.etag))

    def is_truncated():
      """Check if there are more results satisfying the query."""
      if not result:
        return False
      q = _AE_GCSFileInfo_.all(namespace='')
      q.filter('filename >', name)
      info = None

      if delimiter and name.endswith(delimiter):

        for info in q.run():
          if not info.filename.startswith(name):
            break
        if info is not None and info.filename.startswith(name):
          info = None
      else:
        info = q.get()
      if info is None or not info.filename.startswith(fully_qualified_prefix):
        return False
      return True

    result = list(result)
    result.sort()
    truncated = is_truncated()
    next_marker = name if truncated else None

    return result, next_marker, truncated
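
  # Illustrative sketch (assumes `stub` holds objects under /some-bucket):
  # list up to 100 entries under prefix 'photos/', collapsing subdirectories
  # with the '/' delimiter.
  #
  #   stats, next_marker, truncated = stub.get_bucket(
  #       '/some-bucket', 'photos/', None, 100, '/')
  #   [s.filename for s in stats]  # file paths plus subdir names ending in '/'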

  @db.non_transactional
  def get_object(self, filename, start=0, end=None):
    """Get file content with a GET.

    Args:
      filename: gcs filename of form '/bucket/filename'.
      start: start offset to request. Inclusive.
      end: end offset to request. Inclusive.

    Returns:
      The segment of file content requested.

    Raises:
      ValueError: if file doesn't exist.
    """
    common.validate_file_path(filename)
    blobkey = self._filename_to_blobkey(filename)
    key = blobstore_stub.BlobstoreServiceStub.ToDatastoreBlobKey(blobkey)
    gcsfileinfo = db.get(key)
    if not gcsfileinfo or not gcsfileinfo.finalized:
      raise ValueError('File does not exist.')
    local_file = self.blob_storage.OpenBlob(blobkey)
    local_file.seek(start)
    if end:
      return local_file.read(end - start + 1)
    else:
      return local_file.read()
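
  # Illustrative sketch (continuing the upload example, where the file holds
  # the 11 bytes 'hello world'): both offsets are inclusive.
  #
  #   stub.get_object('/some-bucket/some-file', start=6, end=10)  # 'world'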

  @db.non_transactional
  def head_object(self, filename):
    """Get file stat with a HEAD.

    Args:
      filename: gcs filename of form '/bucket/filename'

    Returns:
      A GCSFileStat object containing file stat. None if file doesn't exist.
    """
    common.validate_file_path(filename)
    blobkey = self._filename_to_blobkey(filename)
    key = blobstore_stub.BlobstoreServiceStub.ToDatastoreBlobKey(blobkey)
    info = db.get(key)
    if info and info.finalized:
      metadata = common.get_metadata(info.options)
      filestat = common.GCSFileStat(
          filename=info.filename,
          st_size=info.size,
          etag=info.etag,
          st_ctime=calendar.timegm(info.creation.utctimetuple()),
          content_type=info.content_type,
          metadata=metadata)
      return filestat
    return None
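
  # Illustrative sketch: stat a finalized file. Any x-goog-meta-* options come
  # back in the metadata field.
  #
  #   stat = stub.head_object('/some-bucket/some-file')
  #   if stat is not None:
  #     print stat.st_size, stat.etag, stat.content_type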

  @db.non_transactional
  def delete_object(self, filename):
    """Delete file with a DELETE.

    Args:
      filename: gcs filename of form '/bucket/filename'

    Returns:
      True if file is deleted. False if file doesn't exist.
    """
    common.validate_file_path(filename)
    blobkey = self._filename_to_blobkey(filename)
    key = blobstore_stub.BlobstoreServiceStub.ToDatastoreBlobKey(blobkey)
    gcsfileinfo = db.get(key)
    if not gcsfileinfo:
      return False

    blobstore_stub.BlobstoreServiceStub.DeleteBlob(blobkey, self.blob_storage)
    return True
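
# Illustrative end-to-end sketch (not part of the original module; `stub` as
# in the examples above): create, read, and delete an object through the stub.
#
#   token = stub.post_start_creation('/some-bucket/f', {})
#   stub.put_continue_creation(token, 'data', (0, 3), length=4)
#   stub.get_object('/some-bucket/f')     # 'data'
#   stub.delete_object('/some-bucket/f')  # True
#   stub.delete_object('/some-bucket/f')  # False; the file is already gone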