App Engine Python SDK version 1.8.9
python/google/appengine/ext/datastore_admin/backup_handler.py
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 """Handler for data backup operation.
24 The generic datastore admin console transfers control to ConfirmBackupHandler
25 after the selection of entities. ConfirmBackupHandler asks the user to confirm
26 the choice and to enter a backup name, then transfers control to
27 DoBackupHandler. DoBackupHandler starts the backup mappers and displays a
28 confirmation page.
30 This module also contains the actual mapper code for backing up the data.
31 """
33 from __future__ import with_statement
37 import cStringIO
38 import datetime
39 import itertools
40 import logging
41 import os
42 import random
43 import re
44 import time
45 import urllib
46 import xml.dom.minidom
49 from google.appengine.datastore import entity_pb
50 from google.appengine.api import apiproxy_stub_map
51 from google.appengine.api import app_identity
52 from google.appengine.api import blobstore as blobstore_api
53 from google.appengine.api import capabilities
54 from google.appengine.api import datastore
55 from google.appengine.api import datastore_types
56 from google.appengine.api import files
57 from google.appengine.api import taskqueue
58 from google.appengine.api import urlfetch
59 from google.appengine.api.files import records
60 from google.appengine.api.taskqueue import taskqueue_service_pb
61 from google.appengine.datastore import datastore_query
62 from google.appengine.datastore import datastore_rpc
63 from google.appengine.ext import blobstore
64 from google.appengine.ext import db
65 from google.appengine.ext import deferred
66 from google.appengine.ext import webapp
67 from google.appengine.ext.datastore_admin import backup_pb2
68 from google.appengine.ext.datastore_admin import config
69 from google.appengine.ext.datastore_admin import utils
70 from google.appengine.ext.mapreduce import context
71 from google.appengine.ext.mapreduce import datastore_range_iterators as db_iters
72 from google.appengine.ext.mapreduce import input_readers
73 from google.appengine.ext.mapreduce import json_util
74 from google.appengine.ext.mapreduce import operation as op
75 from google.appengine.ext.mapreduce import output_writers
76 from google.appengine.runtime import apiproxy_errors
78 try:
80 from google.appengine.ext.datastore_admin import services_client
82 except ImportError:
84 pass
87 XSRF_ACTION = 'backup'
88 BUCKET_PATTERN = (r'^([a-zA-Z0-9]+([\-_]+[a-zA-Z0-9]+)*)'
89 r'(\.([a-zA-Z0-9]+([\-_]+[a-zA-Z0-9]+)*))*$')
90 MAX_BUCKET_LEN = 222
91 MIN_BUCKET_LEN = 3
92 MAX_BUCKET_SEGMENT_LEN = 63
93 NUM_KINDS_DEFERRED_THRESHOLD = 10
94 MAX_BLOBS_PER_DELETE = 500
95 TEST_WRITE_FILENAME_PREFIX = 'datastore_backup_write_test'
96 MAX_KEYS_LIST_SIZE = 100
97 MAX_TEST_FILENAME_TRIES = 10
99 MEANING_TO_PRIMITIVE_TYPE = {
100 entity_pb.Property.GD_WHEN: backup_pb2.EntitySchema.DATE_TIME,
101 entity_pb.Property.GD_RATING: backup_pb2.EntitySchema.RATING,
102 entity_pb.Property.ATOM_LINK: backup_pb2.EntitySchema.LINK,
103 entity_pb.Property.ATOM_CATEGORY: backup_pb2.EntitySchema.CATEGORY,
104 entity_pb.Property.GD_PHONENUMBER: backup_pb2.EntitySchema.PHONE_NUMBER,
105 entity_pb.Property.GD_POSTALADDRESS: backup_pb2.EntitySchema.POSTAL_ADDRESS,
106 entity_pb.Property.GD_EMAIL: backup_pb2.EntitySchema.EMAIL,
107 entity_pb.Property.GD_IM: backup_pb2.EntitySchema.IM_HANDLE,
108 entity_pb.Property.BLOBKEY: backup_pb2.EntitySchema.BLOB_KEY,
109 entity_pb.Property.TEXT: backup_pb2.EntitySchema.TEXT,
110 entity_pb.Property.BLOB: backup_pb2.EntitySchema.BLOB,
111 entity_pb.Property.BYTESTRING: backup_pb2.EntitySchema.SHORT_BLOB
112 }
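# For example, a property whose meaning is GD_WHEN (a datetime value) is
# reported in the exported schema as EntitySchema.DATE_TIME; meanings that are
# not listed here fall back to the value-based detection in
# EntityTypeInfo.__get_property_type_info below.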
115 class ConfirmBackupHandler(webapp.RequestHandler):
116 """Handler to deal with requests from the admin console to backup data."""
118 SUFFIX = 'confirm_backup'
120 @classmethod
121 def Render(cls, handler):
122 """Rendering method that can be called by main.py.
124 Args:
125 handler: the webapp.RequestHandler invoking the method
126 """
127 kinds = handler.request.get_all('kind')
128 sizes_known, size_total, remainder = utils.ParseKindsAndSizes(kinds)
129 notreadonly_warning = capabilities.CapabilitySet(
130 'datastore_v3', capabilities=['write']).is_enabled()
131 blob_warning = bool(blobstore.BlobInfo.all().count(1))
132 template_params = {
133 'run_as_a_service': handler.request.get('run_as_a_service'),
134 'form_target': DoBackupHandler.SUFFIX,
135 'kind_list': kinds,
136 'remainder': remainder,
137 'sizes_known': sizes_known,
138 'size_total': size_total,
139 'queues': None,
140 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
141 'namespaces': get_namespaces(handler.request.get('namespace', None)),
142 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
143 'notreadonly_warning': notreadonly_warning,
144 'blob_warning': blob_warning,
145 'backup_name': 'datastore_backup_%s' % time.strftime('%Y_%m_%d')
146 }
147 utils.RenderToResponse(handler, 'confirm_backup.html', template_params)
150 def get_namespaces(selected_namespace):
151 namespaces = [('--All--', '*', selected_namespace is None)]
152 for ns in datastore.Query('__namespace__', keys_only=True).Run():
153 ns_name = ns.name() or ''
154 namespaces.append((ns_name or '--Default--',
155 ns_name,
156 ns_name == selected_namespace))
157 return namespaces
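# Illustrative example (assuming the default namespace and a namespace 'ns1'
# exist): get_namespaces('ns1') returns
# [('--All--', '*', False), ('--Default--', '', False), ('ns1', 'ns1', True)],
# i.e. (display name, form value, selected) tuples for the confirmation page.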
160 class ConfirmDeleteBackupHandler(webapp.RequestHandler):
161 """Handler to confirm admin console requests to delete a backup copy."""
163 SUFFIX = 'confirm_delete_backup'
165 @classmethod
166 def Render(cls, handler):
167 """Rendering method that can be called by main.py.
169 Args:
170 handler: the webapp.RequestHandler invoking the method
171 """
172 requested_backup_ids = handler.request.get_all('backup_id')
173 backups = []
174 gs_warning = False
175 if requested_backup_ids:
176 for backup in db.get(requested_backup_ids):
177 if backup:
178 backups.append(backup)
179 gs_warning |= backup.filesystem == files.GS_FILESYSTEM
180 template_params = {
181 'form_target': DoBackupDeleteHandler.SUFFIX,
182 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
183 'backups': backups,
184 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
185 'gs_warning': gs_warning,
186 'run_as_a_service': handler.request.get('run_as_a_service'),
187 }
188 utils.RenderToResponse(handler, 'confirm_delete_backup.html',
189 template_params)
192 class ConfirmAbortBackupHandler(webapp.RequestHandler):
193 """Handler to confirm admin console requests to abort a backup copy."""
195 SUFFIX = 'confirm_abort_backup'
197 @classmethod
198 def Render(cls, handler):
199 """Rendering method that can be called by main.py.
201 Args:
202 handler: the webapp.RequestHandler invoking the method
203 """
204 requested_backup_ids = handler.request.get_all('backup_id')
205 backups = []
206 if requested_backup_ids:
207 for backup in db.get(requested_backup_ids):
208 if backup:
209 backups.append(backup)
210 template_params = {
211 'form_target': DoBackupAbortHandler.SUFFIX,
212 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
213 'backups': backups,
214 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
215 'run_as_a_service': handler.request.get('run_as_a_service'),
216 }
217 utils.RenderToResponse(handler, 'confirm_abort_backup.html',
218 template_params)
221 class ConfirmRestoreFromBackupHandler(webapp.RequestHandler):
222 """Handler to confirm admin console requests to restore from backup."""
224 SUFFIX = 'confirm_restore_from_backup'
226 @classmethod
227 def Render(cls, handler, default_backup_id=None,
228 default_delete_backup_after_restore=False):
229 """Rendering method that can be called by main.py.
231 Args:
232 handler: the webapp.RequestHandler invoking the method
233 default_backup_id: default value for handler.request
234 default_delete_backup_after_restore: default value for handler.request
235 """
236 backup_id = handler.request.get('backup_id', default_backup_id)
237 backup = db.get(backup_id) if backup_id else None
238 notreadonly_warning = capabilities.CapabilitySet(
239 'datastore_v3', capabilities=['write']).is_enabled()
240 original_app_warning = backup.original_app
241 if os.getenv('APPLICATION_ID') == original_app_warning:
242 original_app_warning = None
243 template_params = {
244 'form_target': DoBackupRestoreHandler.SUFFIX,
245 'queues': None,
246 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
247 'backup': backup,
248 'delete_backup_after_restore': handler.request.get(
249 'delete_backup_after_restore', default_delete_backup_after_restore),
250 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
251 'notreadonly_warning': notreadonly_warning,
252 'original_app_warning': original_app_warning,
253 'run_as_a_service': handler.request.get('run_as_a_service'),
254 }
255 utils.RenderToResponse(handler, 'confirm_restore_from_backup.html',
256 template_params)
259 class ConfirmBackupImportHandler(webapp.RequestHandler):
260 """Handler to import backup information."""
262 SUFFIX = 'backup_information'
264 @classmethod
265 def Render(cls, handler):
266 """Rendering method that can be called by main.py.
268 Args:
269 handler: the webapp.RequestHandler invoking the method
270 """
271 gs_handle = handler.request.get('gs_handle')
272 error = None if gs_handle else 'Google Cloud Storage path is missing'
273 other_backup_info_files = []
274 selected_backup_info_file = None
275 backup_info_specified = False
276 if not error:
277 try:
278 gs_handle = gs_handle.rstrip()
279 bucket_name, prefix = parse_gs_handle(gs_handle)
280 validate_gs_bucket_name(bucket_name)
281 if not is_accessible_bucket_name(bucket_name):
282 raise BackupValidationError(
283 'Bucket "%s" is not accessible' % bucket_name)
284 if prefix.endswith('.backup_info'):
285 prefix = prefix[0:prefix.rfind('/')]
286 backup_info_specified = True
287 elif prefix and not prefix.endswith('/'):
288 prefix += '/'
289 for backup_info_file in list_bucket_files(bucket_name, prefix):
290 backup_info_path = '/gs/%s/%s' % (bucket_name, backup_info_file)
291 if backup_info_specified and backup_info_path == gs_handle:
292 selected_backup_info_file = backup_info_path
293 elif (backup_info_file.endswith('.backup_info')
294 and backup_info_file.count('.') == 1):
295 other_backup_info_files.append(backup_info_path)
296 except Exception, ex:
297 error = 'Failed to read bucket: %s' % ex.message
298 logging.exception(ex.message)
299 template_params = {
300 'error': error,
301 'form_target': DoBackupImportHandler.SUFFIX,
302 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
303 'selected_backup_info_file': selected_backup_info_file,
304 'other_backup_info_files': other_backup_info_files,
305 'backup_info_specified': backup_info_specified,
306 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
307 'run_as_a_service': handler.request.get('run_as_a_service'),
308 }
309 utils.RenderToResponse(handler, 'confirm_backup_import.html',
310 template_params)
313 class BackupInformationHandler(webapp.RequestHandler):
314 """Handler to display backup information."""
316 SUFFIX = 'backup_information'
318 @classmethod
319 def Render(cls, handler):
320 """Rendering method that can be called by main.py.
322 Args:
323 handler: the webapp.RequestHandler invoking the method
324 """
325 backup_ids = handler.request.get_all('backup_id')
326 template_params = {
327 'backups': db.get(backup_ids),
328 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
329 'run_as_a_service': handler.request.get('run_as_a_service'),
330 }
331 utils.RenderToResponse(handler, 'backup_information.html', template_params)
334 class BaseDoHandler(webapp.RequestHandler):
335 """Base class for all Do*Handlers."""
337 MAPREDUCE_DETAIL = config.MAPREDUCE_PATH + '/detail?mapreduce_id='
339 def get(self):
340 """Handler for get requests to datastore_admin backup operations.
342 Status of executed jobs is displayed.
343 """
344 jobs = self.request.get_all('job')
345 remote_job = self.request.get('remote_job')
346 tasks = self.request.get_all('task')
347 error = self.request.get('error', '')
348 xsrf_error = self.request.get('xsrf_error', '')
349 template_params = {
350 'job_list': jobs,
351 'remote_job': remote_job,
352 'task_list': tasks,
353 'mapreduce_detail': self.MAPREDUCE_DETAIL,
354 'error': error,
355 'xsrf_error': xsrf_error,
356 'datastore_admin_home': utils.GenerateHomeUrl(self.request),
357 }
358 utils.RenderToResponse(self, self._get_html_page, template_params)
360 @property
361 def _get_html_page(self):
362 """Return the name of the HTML page for HTTP/GET requests."""
363 raise NotImplementedError
365 @property
366 def _get_post_html_page(self):
367 """Return the name of the HTML page for HTTP/POST requests."""
368 raise NotImplementedError
370 def _ProcessPostRequest(self):
371 """Process the HTTP/POST request and return the result as parametrs."""
372 raise NotImplementedError
374 def _GetBasicMapperParams(self):
375 namespace = self.request.get('namespace', None)
376 if namespace == '*':
377 namespace = None
378 return {'namespace': namespace}
380 def SendRedirect(self, path=None, params=()):
381 """Send a redirect response."""
383 run_as_a_service = self.request.get('run_as_a_service')
384 if run_as_a_service:
385 params = list(params)
386 params.append(('run_as_a_service', True))
387 dest = config.BASE_PATH
388 if path:
389 dest = '%s/%s' % (dest, path)
390 if params:
391 dest = '%s?%s' % (dest, urllib.urlencode(params))
392 self.redirect(dest)
394 def post(self):
395 """Handler for post requests to datastore_admin/backup.do.
397 Redirects to the get handler after processing the request.
398 """
399 token = self.request.get('xsrf_token')
401 if not utils.ValidateXsrfToken(token, XSRF_ACTION):
402 parameters = [('xsrf_error', '1')]
403 else:
404 try:
405 parameters = self._ProcessPostRequest()
408 except Exception, e:
409 error = self._HandleException(e)
410 parameters = [('error', error)]
412 self.SendRedirect(self._get_post_html_page, parameters)
414 def _HandleException(self, e):
415 """Make exception handling overridable by tests.
417 Args:
418 e: The exception to handle.
420 Returns:
421 The exception error string.
422 """
423 logging.exception(e.message)
424 return '%s: %s' % (type(e), e.message)
427 class BackupValidationError(utils.Error):
428 """Raised upon backup request validation."""
431 def _perform_backup(run_as_a_service, kinds, selected_namespace,
432 filesystem, gs_bucket_name, backup,
433 queue, mapper_params, max_jobs):
434 """Triggers backup mapper jobs.
436 Args:
437 run_as_a_service: True if backup should be done via admin-jobs
438 kinds: a sequence of kind names
439 selected_namespace: The selected namespace or None for all
440 filesystem: files.BLOBSTORE_FILESYSTEM or files.GS_FILESYSTEM
441 or None to default to blobstore
442 gs_bucket_name: the GS file system bucket in which to store the backup
443 when using the GS file system, and otherwise ignored
444 backup: the backup name
445 queue: the task queue for the backup task
446 mapper_params: the mapper parameters
447 max_jobs: if backup needs more jobs than this, defer them
449 Returns:
450 The job or task ids.
452 Raises:
453 BackupValidationError: On validation error.
454 Exception: On other error.
455 """
456 BACKUP_COMPLETE_HANDLER = __name__ + '.BackupCompleteHandler'
457 BACKUP_HANDLER = __name__ + '.BackupEntity.map'
458 INPUT_READER = __name__ + '.DatastoreEntityProtoInputReader'
459 OUTPUT_WRITER = output_writers.__name__ + '.FileRecordsOutputWriter'
461 if run_as_a_service:
462 if not gs_bucket_name:
463 raise BackupValidationError('Bucket name missing.')
464 gs_bucket_name = validate_and_canonicalize_gs_bucket(gs_bucket_name)
465 datastore_admin_service = services_client.DatastoreAdminClient()
466 description = 'Remote backup job: %s' % backup
467 remote_job_id = datastore_admin_service.create_backup(
468 description, backup, gs_bucket_name, selected_namespace, kinds)
469 return [('remote_job', remote_job_id)]
471 queue = queue or os.environ.get('HTTP_X_APPENGINE_QUEUENAME', 'default')
472 if queue[0] == '_':
474 queue = 'default'
475 if not filesystem:
476 filesystem = files.BLOBSTORE_FILESYSTEM
477 if filesystem == files.GS_FILESYSTEM:
479 if not gs_bucket_name:
480 raise BackupValidationError('Bucket name missing.')
481 gs_bucket_name = validate_and_canonicalize_gs_bucket(gs_bucket_name)
482 elif filesystem == files.BLOBSTORE_FILESYSTEM:
483 pass
484 else:
485 raise BackupValidationError('Unknown filesystem "%s".' % filesystem)
487 job_name = 'datastore_backup_%s_%%(kind)s' % re.sub(r'[^\w]', '_', backup)
488 try:
489 job_operation = utils.StartOperation('Backup: %s' % backup)
490 backup_info = BackupInformation(parent=job_operation)
491 backup_info.filesystem = filesystem
492 backup_info.name = backup
493 backup_info.kinds = kinds
494 if selected_namespace is not None:
495 backup_info.namespaces = [selected_namespace]
496 backup_info.put(force_writes=True)
497 mapreduce_params = {
498 'done_callback_handler': BACKUP_COMPLETE_HANDLER,
499 'backup_info_pk': str(backup_info.key()),
500 'force_ops_writes': True,
501 }
502 mapper_params = dict(mapper_params)
503 mapper_params['filesystem'] = filesystem
504 if filesystem == files.GS_FILESYSTEM:
505 mapper_params['gs_bucket_name'] = gs_bucket_name
506 if len(kinds) <= max_jobs:
507 return [('job', job) for job in _run_map_jobs(
508 job_operation.key(), backup_info.key(), kinds, job_name,
509 BACKUP_HANDLER, INPUT_READER, OUTPUT_WRITER,
510 mapper_params, mapreduce_params, queue)]
511 else:
512 retry_options = taskqueue.TaskRetryOptions(task_retry_limit=1)
513 deferred_task = deferred.defer(_run_map_jobs_deferred,
514 backup, job_operation.key(),
515 backup_info.key(), kinds, job_name,
516 BACKUP_HANDLER, INPUT_READER,
517 OUTPUT_WRITER, mapper_params,
518 mapreduce_params, queue, _queue=queue,
519 _url=config.DEFERRED_PATH,
520 _retry_options=retry_options)
521 return [('task', deferred_task.name)]
522 except Exception:
523 logging.exception('Failed to start a datastore backup job[s] for "%s".',
524 backup)
525 if backup_info:
526 delete_backup_info(backup_info)
527 if job_operation:
528 job_operation.status = utils.DatastoreAdminOperation.STATUS_FAILED
529 job_operation.put(force_writes=True)
530 raise
533 class BackupLinkHandler(webapp.RequestHandler):
534 """Handler to deal with requests to the backup link to backup data."""
536 SUFFIX = 'backup.create'
538 def get(self):
539 """Handler for get requests to datastore_admin/backup.create."""
540 self.post()
542 def post(self):
543 """Handler for post requests to datastore_admin/backup.create."""
544 try:
549 if ('X-AppEngine-TaskName' not in self.request.headers and
550 'X-AppEngine-Cron' not in self.request.headers):
551 logging.critical('Scheduled backups must be started via task queue or '
552 'cron.')
553 self.response.set_status(403)
554 return
556 backup_prefix = self.request.get('name')
557 if not backup_prefix:
558 if self.request.headers.get('X-AppEngine-Cron'):
559 backup_prefix = 'cron-'
560 else:
561 backup_prefix = 'link-'
562 backup_prefix_with_date = backup_prefix + time.strftime('%Y_%m_%d')
563 backup_name = backup_prefix_with_date
564 backup_suffix_counter = 1
565 while BackupInformation.name_exists(backup_name):
566 backup_suffix_counter += 1
567 backup_name = backup_prefix_with_date + '-' + str(backup_suffix_counter)
568 kinds = self.request.get_all('kind')
569 if not kinds:
570 self.errorResponse('Backup must include at least one kind.')
571 return
572 for kind in kinds:
573 if not utils.IsKindNameVisible(kind):
574 self.errorResponse('Invalid kind %s.' % kind)
575 return
576 namespace = self.request.get('namespace', None)
577 if namespace == '*':
578 namespace = None
579 mapper_params = {'namespace': namespace}
580 _perform_backup(self.request.get('run_as_a_service', False),
581 kinds,
582 namespace,
583 self.request.get('filesystem'),
584 self.request.get('gs_bucket_name'),
585 backup_name,
586 self.request.get('queue'),
587 mapper_params,
588 1000000)
589 except Exception, e:
590 self.errorResponse(e.message)
592 def errorResponse(self, message):
593 logging.error('Could not create backup via link: %s', message)
594 self.response.set_status(400, message)
597 class DatastoreEntityProtoInputReader(input_readers.RawDatastoreInputReader):
598 """An input reader which yields datastore entity proto for a kind."""
600 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityProtoIterator
603 class DoBackupHandler(BaseDoHandler):
604 """Handler to deal with requests from the admin console to backup data."""
606 SUFFIX = 'backup.do'
607 _get_html_page = 'do_backup.html'
608 _get_post_html_page = SUFFIX
610 def _ProcessPostRequest(self):
611 """Triggers backup mapper jobs and returns their ids."""
612 try:
613 backup = self.request.get('backup_name').strip()
614 if not backup:
615 raise BackupValidationError('Unspecified backup name.')
616 if BackupInformation.name_exists(backup):
617 raise BackupValidationError('Backup "%s" already exists.' % backup)
618 mapper_params = self._GetBasicMapperParams()
619 backup_result = _perform_backup(self.request.get('run_as_a_service',
620 False),
621 self.request.get_all('kind'),
622 mapper_params.get('namespace'),
623 self.request.get('filesystem'),
624 self.request.get('gs_bucket_name'),
625 backup,
626 self.request.get('queue'),
627 mapper_params,
628 10)
629 return backup_result
630 except Exception, e:
631 logging.exception(e.message)
632 return [('error', e.message)]
635 def _run_map_jobs_deferred(backup_name, job_operation_key, backup_info_key,
636 kinds, job_name, backup_handler, input_reader,
637 output_writer, mapper_params, mapreduce_params,
638 queue):
639 backup_info = BackupInformation.get(backup_info_key)
640 if backup_info:
641 try:
642 _run_map_jobs(job_operation_key, backup_info_key, kinds, job_name,
643 backup_handler, input_reader, output_writer, mapper_params,
644 mapreduce_params, queue)
645 except BaseException:
646 logging.exception('Failed to start a datastore backup job[s] for "%s".',
647 backup_name)
648 delete_backup_info(backup_info)
649 else:
650 logging.info('Missing backup info, can not start backup jobs for "%s"',
651 backup_name)
654 def _run_map_jobs(job_operation_key, backup_info_key, kinds, job_name,
655 backup_handler, input_reader, output_writer, mapper_params,
656 mapreduce_params, queue):
657 """Creates backup/restore MR jobs for the given operation.
659 Args:
660 job_operation_key: a key of utils.DatastoreAdminOperation entity.
661 backup_info_key: a key of BackupInformation entity.
662 kinds: a list of kinds to run the M/R for.
663 job_name: the M/R job name prefix.
664 backup_handler: M/R job completion handler.
665 input_reader: M/R input reader.
666 output_writer: M/R output writer.
667 mapper_params: custom parameters to pass to mapper.
668 mapreduce_params: dictionary parameters relevant to the whole job.
669 queue: the name of the queue that will be used by the M/R.
671 Returns:
672 Ids of all started mapper jobs as list of strings.
673 """
674 backup_info = BackupInformation.get(backup_info_key)
675 if not backup_info:
676 return []
677 jobs = utils.RunMapForKinds(
678 job_operation_key,
679 kinds,
680 job_name,
681 backup_handler,
682 input_reader,
683 output_writer,
684 mapper_params,
685 mapreduce_params,
686 queue_name=queue)
687 backup_info.active_jobs = jobs
688 backup_info.put(force_writes=True)
689 return jobs
692 def get_backup_files(backup_info, selected_kinds=None):
693 """Returns the backup filenames for selected kinds or all if None/Empty."""
694 if backup_info.blob_files:
696 return backup_info.blob_files
697 else:
698 kinds_backup_files = backup_info.get_kind_backup_files(selected_kinds)
699 return list(itertools.chain(*(
700 kind_backup_files.files for kind_backup_files in kinds_backup_files)))
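# Illustrative example: for a per-kind backup that recorded KindBackupFiles for
# 'Greeting' and 'Guestbook', get_backup_files(backup_info, ['Greeting'])
# returns only the filenames recorded for the 'Greeting' kind.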
703 def delete_backup_files(filesystem, backup_files):
704 if backup_files:
708 if filesystem == files.BLOBSTORE_FILESYSTEM:
711 blob_keys = []
712 for fname in backup_files:
713 blob_key = files.blobstore.get_blob_key(fname)
714 if blob_key:
715 blob_keys.append(blob_key)
716 if len(blob_keys) == MAX_BLOBS_PER_DELETE:
717 blobstore_api.delete(blob_keys)
718 blob_keys = []
719 if blob_keys:
720 blobstore_api.delete(blob_keys)
723 def delete_backup_info(backup_info, delete_files=True):
724 """Deletes a backup including its associated files and other metadata."""
725 if backup_info.blob_files:
726 delete_backup_files(backup_info.filesystem, backup_info.blob_files)
727 backup_info.delete(force_writes=True)
728 else:
729 kinds_backup_files = tuple(backup_info.get_kind_backup_files())
730 if delete_files:
731 delete_backup_files(backup_info.filesystem, itertools.chain(*(
732 kind_backup_files.files for kind_backup_files in kinds_backup_files)))
733 db.delete(kinds_backup_files + (backup_info,), force_writes=True)
736 class DoBackupDeleteHandler(BaseDoHandler):
737 """Handler to deal with datastore admin requests to delete backup data."""
739 SUFFIX = 'backup_delete.do'
741 def get(self):
742 self.post()
744 def post(self):
745 """Handler for post requests to datastore_admin/backup_delete.do.
747 Deletes are executed and user is redirected to the base-path handler.
748 """
749 backup_ids = self.request.get_all('backup_id')
750 token = self.request.get('xsrf_token')
751 params = ()
752 if backup_ids and utils.ValidateXsrfToken(token, XSRF_ACTION):
753 try:
754 for backup_info in db.get(backup_ids):
755 if backup_info:
756 delete_backup_info(backup_info)
757 except Exception, e:
758 logging.exception('Failed to delete datastore backup.')
759 params = [('error', e.message)]
761 self.SendRedirect(params=params)
764 class DoBackupAbortHandler(BaseDoHandler):
765 """Handler to deal with datastore admin requests to abort pending backups."""
767 SUFFIX = 'backup_abort.do'
769 def get(self):
770 self.post()
772 def post(self):
773 """Handler for post requests to datastore_admin/backup_abort.do.
775 Abort is executed and user is redirected to the base-path handler.
776 """
777 backup_ids = self.request.get_all('backup_id')
778 token = self.request.get('xsrf_token')
779 params = ()
780 if backup_ids and utils.ValidateXsrfToken(token, XSRF_ACTION):
781 try:
782 for backup_info in db.get(backup_ids):
783 if backup_info:
784 operation = backup_info.parent()
785 if operation.parent_key():
786 job_id = str(operation.parent_key())
787 datastore_admin_service = services_client.DatastoreAdminClient()
788 datastore_admin_service.abort_backup(job_id)
789 else:
790 utils.AbortAdminOperation(operation.key())
791 delete_backup_info(backup_info)
792 except Exception, e:
793 logging.exception('Failed to abort pending datastore backup.')
794 params = [('error', e.message)]
796 self.SendRedirect(params=params)
799 class DoBackupRestoreHandler(BaseDoHandler):
800 """Handler to restore backup data.
802 Deals with requests from the admin console.
803 """
804 SUFFIX = 'backup_restore.do'
805 BACKUP_RESTORE_HANDLER = __name__ + '.RestoreEntity.map'
806 RESTORE_COMPLETE_HANDLER = __name__ + '.RestoreCompleteHandler'
808 INPUT_READER = input_readers.__name__ + '.RecordsReader'
809 _get_html_page = 'do_restore_from_backup.html'
810 _get_post_html_page = SUFFIX
812 def _ProcessPostRequest(self):
813 """Triggers backup restore mapper jobs and returns their ids."""
814 backup_id = self.request.get('backup_id')
815 if not backup_id:
816 return [('error', 'Unspecified Backup.')]
818 backup = db.get(db.Key(backup_id))
819 if not backup:
820 return [('error', 'Invalid Backup id.')]
822 if backup.gs_handle:
823 if not is_readable_gs_handle(backup.gs_handle):
824 return [('error', 'Backup not readable')]
826 kinds = set(self.request.get_all('kind'))
827 if not (backup.blob_files or kinds):
828 return [('error', 'No kinds were selected')]
829 backup_kinds = set(backup.kinds)
830 difference = kinds.difference(backup_kinds)
831 if difference:
832 return [('error', 'Backup does not have kind[s] %s' %
833 ', '.join(difference))]
835 if self.request.get('run_as_a_service', False):
836 if backup.filesystem != files.GS_FILESYSTEM:
837 return [('error',
838 'Restore as a service is only available for GS backups')]
839 datastore_admin_service = services_client.DatastoreAdminClient()
840 description = 'Remote restore job: %s' % backup.name
841 remote_job_id = datastore_admin_service.restore_from_backup(
842 description, backup_id, list(kinds))
843 return [('remote_job', remote_job_id)]
845 queue = self.request.get('queue')
846 job_name = 'datastore_backup_restore_%s' % re.sub(r'[^\w]', '_',
847 backup.name)
848 job_operation = None
849 try:
850 operation_name = 'Restoring %s from backup: %s' % (
851 ', '.join(kinds) if kinds else 'all', backup.name)
852 job_operation = utils.StartOperation(operation_name)
853 mapper_params = self._GetBasicMapperParams()
856 kinds = list(kinds) if len(backup_kinds) != len(kinds) else []
857 mapper_params['files'] = get_backup_files(backup, kinds)
858 mapper_params['kind_filter'] = kinds
859 mapper_params['original_app'] = backup.original_app
860 mapreduce_params = {
861 'backup_name': backup.name,
862 'force_ops_writes': True,
863 }
864 shard_count = min(max(utils.MAPREDUCE_MIN_SHARDS,
865 len(mapper_params['files'])),
866 utils.MAPREDUCE_MAX_SHARDS)
867 job = utils.StartMap(job_operation.key(), job_name,
868 self.BACKUP_RESTORE_HANDLER, self.INPUT_READER, None,
869 mapper_params, mapreduce_params, queue_name=queue,
870 shard_count=shard_count)
871 return [('job', job)]
872 except Exception:
873 logging.exception('Failed to start a restore from backup job "%s".',
874 job_name)
875 if job_operation:
876 job_operation.status = utils.DatastoreAdminOperation.STATUS_FAILED
877 job_operation.put(force_writes=True)
878 raise
881 class DoBackupImportHandler(BaseDoHandler):
882 """Handler to deal with datastore admin requests to import backup info."""
884 SUFFIX = 'import_backup.do'
886 def get(self):
887 self.post()
889 def post(self):
890 """Handler for post requests to datastore_admin/import_backup.do.
892 Import is executed and user is redirected to the base-path handler.
893 """
894 gs_handle = self.request.get('gs_handle')
895 token = self.request.get('xsrf_token')
896 error = None
897 if gs_handle and utils.ValidateXsrfToken(token, XSRF_ACTION):
898 try:
899 bucket_name, path = parse_gs_handle(gs_handle)
900 file_content = get_gs_object(bucket_name, path)
901 entities = parse_backup_info_file(file_content)
902 original_backup_info = entities.next()
903 entity = datastore.Entity(BackupInformation.kind())
904 entity.update(original_backup_info)
905 backup_info = BackupInformation.from_entity(entity)
906 if original_backup_info.key().app() != os.getenv('APPLICATION_ID'):
907 backup_info.original_app = original_backup_info.key().app()
909 def tx():
910 backup_info.put(force_writes=True)
911 kind_files_models = []
912 for entity in entities:
913 kind_files = backup_info.create_kind_backup_files(
914 entity.key().name(), entity['files'])
915 kind_files_models.append(kind_files)
916 db.put(kind_files_models, force_writes=True)
917 db.run_in_transaction(tx)
918 backup_id = str(backup_info.key())
919 except Exception, e:
920 logging.exception('Failed to Import datastore backup information.')
921 error = e.message
923 if error:
924 self.SendRedirect(params=[('error', error)])
925 elif self.request.get('Restore'):
926 ConfirmRestoreFromBackupHandler.Render(
927 self, default_backup_id=backup_id,
928 default_delete_backup_after_restore=True)
929 else:
930 self.SendRedirect()
934 class BackupInformation(db.Model):
935 """An entity to keep information on a datastore backup."""
937 name = db.StringProperty()
938 kinds = db.StringListProperty()
939 namespaces = db.StringListProperty()
940 filesystem = db.StringProperty(default=files.BLOBSTORE_FILESYSTEM)
941 start_time = db.DateTimeProperty(auto_now_add=True)
942 active_jobs = db.StringListProperty()
943 completed_jobs = db.StringListProperty()
944 complete_time = db.DateTimeProperty(default=None)
945 blob_files = db.StringListProperty()
946 original_app = db.StringProperty(default=None)
947 gs_handle = db.TextProperty(default=None)
949 @classmethod
950 def kind(cls):
951 return utils.BACKUP_INFORMATION_KIND
953 @classmethod
954 def name_exists(cls, backup_name):
955 query = BackupInformation.all(keys_only=True)
956 query.filter('name =', backup_name)
957 return query.get() is not None
959 def create_kind_backup_files_key(self, kind):
960 return db.Key.from_path(KindBackupFiles.kind(), kind, parent=self.key())
962 def create_kind_backup_files(self, kind, kind_files):
963 return KindBackupFiles(key=self.create_kind_backup_files_key(kind),
964 files=kind_files)
966 def get_kind_backup_files(self, kinds=None):
967 if kinds:
968 return db.get([self.create_kind_backup_files_key(kind) for kind in kinds])
969 else:
970 return KindBackupFiles.all().ancestor(self).run()
974 class KindBackupFiles(db.Model):
975 """An entity to keep files information per kind for a backup.
977 A key for this model should be created using kind as a name and the associated
978 BackupInformation as a parent.
979 """
980 files = db.StringListProperty(indexed=False)
982 @property
983 def backup_kind(self):
984 return self.key().name()
986 @classmethod
987 def kind(cls):
988 return utils.BACKUP_INFORMATION_FILES_KIND
991 def BackupCompleteHandler(operation, job_id, mapreduce_state):
992 """Updates BackupInformation record for a completed mapper job."""
993 mapreduce_spec = mapreduce_state.mapreduce_spec
994 filenames = mapreduce_spec.mapper.output_writer_class().get_filenames(
995 mapreduce_state)
996 _perform_backup_complete(operation,
997 job_id,
998 mapreduce_spec.mapper.params['entity_kind'],
999 mapreduce_spec.params['backup_info_pk'],
1000 mapreduce_spec.mapper.params.get('gs_bucket_name'),
1001 filenames,
1002 mapreduce_spec.params.get('done_callback_queue'))
1005 @db.transactional
1006 def _perform_backup_complete(
1007 operation, job_id, kind, backup_info_pk, gs_bucket_name, filenames, queue):
1008 backup_info = BackupInformation.get(backup_info_pk)
1009 if backup_info:
1010 if job_id in backup_info.active_jobs:
1011 backup_info.active_jobs.remove(job_id)
1012 backup_info.completed_jobs = list(
1013 set(backup_info.completed_jobs + [job_id]))
1016 if backup_info.filesystem == files.BLOBSTORE_FILESYSTEM:
1017 filenames = drop_empty_files(filenames)
1018 kind_backup_files = backup_info.get_kind_backup_files([kind])[0]
1019 if kind_backup_files:
1020 kind_backup_files.files = list(set(kind_backup_files.files + filenames))
1021 else:
1022 kind_backup_files = backup_info.create_kind_backup_files(kind, filenames)
1023 db.put((backup_info, kind_backup_files), force_writes=True)
1024 if operation.status == utils.DatastoreAdminOperation.STATUS_COMPLETED:
1025 deferred.defer(finalize_backup_info, backup_info.key(),
1026 gs_bucket_name,
1027 _url=config.DEFERRED_PATH,
1028 _queue=queue,
1029 _transactional=True)
1030 else:
1031 logging.warn('BackupInfo was not found for %s', backup_info_pk)
1034 def finalize_backup_info(backup_info_pk, gs_bucket):
1035 """Finalize the state of BackupInformation and creates info file for GS."""
1037 def get_backup_info():
1038 return BackupInformation.get(backup_info_pk)
1040 backup_info = db.run_in_transaction(get_backup_info)
1041 if backup_info:
1042 complete_time = datetime.datetime.now()
1043 backup_info.complete_time = complete_time
1044 gs_handle = None
1045 if backup_info.filesystem == files.GS_FILESYSTEM:
1051 gs_handle = BackupInfoWriter(gs_bucket).write(backup_info)[0]
1053 def set_backup_info_with_finalize_info():
1054 backup_info = get_backup_info()
1055 backup_info.complete_time = complete_time
1056 backup_info.gs_handle = gs_handle
1057 backup_info.put(force_writes=True)
1058 db.run_in_transaction(set_backup_info_with_finalize_info)
1059 logging.info('Backup %s completed', backup_info.name)
1060 else:
1061 logging.warn('Backup %s could not be found', backup_info_pk)
1064 def parse_backup_info_file(content):
1065 """Returns entities iterator from a backup_info file content."""
1066 reader = records.RecordsReader(cStringIO.StringIO(content))
1067 version = reader.read()
1068 if version != '1':
1069 raise IOError('Unsupported version')
1070 return (datastore.Entity.FromPb(record) for record in reader)
1073 @db.non_transactional
1074 def drop_empty_files(filenames):
1075 """Deletes empty files and returns filenames minus the deleted ones."""
1076 non_empty_filenames = []
1077 empty_file_keys = []
1078 blobs_info = blobstore.BlobInfo.get(
1079 [files.blobstore.get_blob_key(fn) for fn in filenames])
1080 for filename, blob_info in itertools.izip(filenames, blobs_info):
1081 if blob_info:
1082 if blob_info.size > 0:
1083 non_empty_filenames.append(filename)
1084 else:
1085 empty_file_keys.append(blob_info.key())
1086 blobstore_api.delete(empty_file_keys)
1087 return non_empty_filenames
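# Illustrative example: if a backup shard produced files ['f1', 'f2'] and the
# blob behind 'f2' has size 0, drop_empty_files(['f1', 'f2']) deletes the empty
# blob and returns ['f1'].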
1090 class BackupInfoWriter(object):
1091 """A class for writing Datastore backup metadata files."""
1093 def __init__(self, gs_bucket):
1094 """Construct a BackupInfoWriter.
1096 Args:
1097 gs_bucket: Required string for the target GS bucket.
1098 """
1099 self.__gs_bucket = gs_bucket
1101 def write(self, backup_info):
1102 """Write the metadata files for the given backup_info.
1104 As a side effect, updates the backup_info in-memory entity object with the
1105 gs_handle to the Backup info filename. This is not saved to the datastore.
1107 Args:
1108 backup_info: Required BackupInformation.
1110 Returns:
1111 A list with Backup info filename followed by Kind info filenames.
1112 """
1113 fn = self._write_backup_info(backup_info)
1114 return [fn] + self._write_kind_info(backup_info)
1116 def _generate_filename(self, backup_info, suffix):
1117 key_str = str(backup_info.key()).replace('/', '_')
1118 return '/gs/%s/%s%s' % (self.__gs_bucket, key_str, suffix)
1120 def _write_backup_info(self, backup_info):
1121 """Writes a backup_info_file.
1123 Args:
1124 backup_info: Required BackupInformation.
1126 Returns:
1127 Backup info filename.
1128 """
1129 filename = self._generate_filename(backup_info, '.backup_info')
1130 backup_info.gs_handle = filename
1131 info_file = files.open(files.gs.create(filename), 'a', exclusive_lock=True)
1132 try:
1133 with records.RecordsWriter(info_file) as writer:
1135 writer.write('1')
1137 writer.write(db.model_to_protobuf(backup_info).SerializeToString())
1139 for kind_files in backup_info.get_kind_backup_files():
1140 writer.write(db.model_to_protobuf(kind_files).SerializeToString())
1141 finally:
1142 info_file.close(finalize=True)
1143 return filename
1145 def _write_kind_info(self, backup_info):
1146 """Writes type information schema for each kind in backup_info.
1148 Args:
1149 backup_info: Required BackupInformation.
1151 Returns:
1152 A list with all created filenames.
1153 """
1154 def get_backup_files_tx():
1155 kind_backup_files_list = []
1157 for kind_backup_files in backup_info.get_kind_backup_files():
1158 kind_backup_files_list.append(kind_backup_files)
1159 return kind_backup_files_list
1161 kind_backup_files_list = db.run_in_transaction(get_backup_files_tx)
1162 filenames = []
1163 for kind_backup_files in kind_backup_files_list:
1164 backup = self._create_kind_backup(backup_info, kind_backup_files)
1165 filename = self._generate_filename(
1166 backup_info, '.%s.backup_info' % kind_backup_files.backup_kind)
1167 self._write_kind_backup_info_file(filename, backup)
1168 filenames.append(filename)
1169 return filenames
1171 def _create_kind_backup(self, backup_info, kind_backup_files):
1172 """Creates and populate a backup_pb2.Backup."""
1173 backup = backup_pb2.Backup()
1174 backup.backup_info.backup_name = backup_info.name
1175 backup.backup_info.start_timestamp = datastore_types.DatetimeToTimestamp(
1176 backup_info.start_time)
1177 backup.backup_info.end_timestamp = datastore_types.DatetimeToTimestamp(
1178 backup_info.complete_time)
1179 kind = kind_backup_files.backup_kind
1180 kind_info = backup.kind_info.add()
1181 kind_info.kind = kind
1182 kind_info.entity_schema.kind = kind
1183 kind_info.file.extend(kind_backup_files.files)
1184 entity_type_info = EntityTypeInfo(kind=kind)
1185 for sharded_aggregation in SchemaAggregationResult.load(
1186 backup_info.key(), kind):
1187 if sharded_aggregation.is_partial:
1188 kind_info.is_partial = True
1189 if sharded_aggregation.entity_type_info:
1190 entity_type_info.merge(sharded_aggregation.entity_type_info)
1191 entity_type_info.populate_entity_schema(kind_info.entity_schema)
1192 return backup
1194 @classmethod
1195 def _write_kind_backup_info_file(cls, filename, backup):
1196 """Writes a kind backup_info.
1198 Args:
1199 filename: The name of the file to be created as string.
1200 backup: apphosting.ext.datastore_admin.Backup proto.
1201 """
1202 f = files.open(files.gs.create(filename), 'a', exclusive_lock=True)
1203 try:
1204 f.write(backup.SerializeToString())
1205 finally:
1206 f.close(finalize=True)
1209 class PropertyTypeInfo(json_util.JsonMixin):
1210 """Type information for an entity property."""
1212 def __init__(self, name, is_repeated=False, primitive_types=None,
1213 embedded_entities=None):
1214 """Construct a PropertyTypeInfo instance.
1216 Args:
1217 name: The name of the property as a string.
1218 is_repeated: A boolean that indicates if the property is repeated.
1219 primitive_types: Optional list of PrimitiveType integer values.
1220 embedded_entities: Optional list of EntityTypeInfo.
1221 """
1222 self.__name = name
1223 self.__is_repeated = is_repeated
1224 self.__primitive_types = set(primitive_types) if primitive_types else set()
1225 self.__embedded_entities = {}
1226 for entity in embedded_entities or ():
1227 if entity.kind in self.__embedded_entities:
1228 self.__embedded_entities[entity.kind].merge(entity)
1229 else:
1230 self.__embedded_entities[entity.kind] = entity
1232 @property
1233 def name(self):
1234 return self.__name
1236 @property
1237 def is_repeated(self):
1238 return self.__is_repeated
1240 @property
1241 def primitive_types(self):
1242 return self.__primitive_types
1244 def embedded_entities_kind_iter(self):
1245 return self.__embedded_entities.iterkeys()
1247 def get_embedded_entity(self, kind):
1248 return self.__embedded_entities.get(kind)
1250 def merge(self, other):
1251 """Merge a PropertyTypeInfo with this instance.
1253 Args:
1254 other: Required PropertyTypeInfo to merge.
1256 Returns:
1257 True if anything was changed. False otherwise.
1259 Raises:
1260 ValueError: if property names do not match.
1261 TypeError: if other is not an instance of PropertyTypeInfo.
1262 """
1263 if not isinstance(other, PropertyTypeInfo):
1264 raise TypeError('Expected PropertyTypeInfo, was %r' % (other,))
1266 if other.__name != self.__name:
1267 raise ValueError('Property names mismatch (%s, %s)' %
1268 (self.__name, other.__name))
1269 changed = False
1270 if other.__is_repeated and not self.__is_repeated:
1271 self.__is_repeated = True
1272 changed = True
1273 if not other.__primitive_types.issubset(self.__primitive_types):
1274 self.__primitive_types = self.__primitive_types.union(
1275 other.__primitive_types)
1276 changed = True
1277 for kind, other_embedded_entity in other.__embedded_entities.iteritems():
1278 embedded_entity = self.__embedded_entities.get(kind)
1279 if embedded_entity:
1280 changed = embedded_entity.merge(other_embedded_entity) or changed
1281 else:
1282 self.__embedded_entities[kind] = other_embedded_entity
1283 changed = True
1284 return changed
1286 def populate_entity_schema_field(self, entity_schema):
1287 """Add an populate a Field to the given entity_schema.
1289 Args:
1290 entity_schema: apphosting.ext.datastore_admin.EntitySchema proto.
1291 """
1292 if not (self.__primitive_types or self.__embedded_entities):
1293 return
1295 field = entity_schema.field.add()
1296 field.name = self.__name
1297 field_type = field.type.add()
1298 field_type.is_list = self.__is_repeated
1299 field_type.primitive_type.extend(self.__primitive_types)
1300 for embedded_entity in self.__embedded_entities.itervalues():
1301 embedded_entity_schema = field_type.embedded_schema.add()
1302 embedded_entity.populate_entity_schema(embedded_entity_schema)
1304 def to_json(self):
1305 json = dict()
1306 json['name'] = self.__name
1307 json['is_repeated'] = self.__is_repeated
1308 json['primitive_types'] = list(self.__primitive_types)
1309 json['embedded_entities'] = [e.to_json() for e in
1310 self.__embedded_entities.itervalues()]
1311 return json
1313 @classmethod
1314 def from_json(cls, json):
1315 return cls(json['name'], json['is_repeated'], json.get('primitive_types'),
1316 [EntityTypeInfo.from_json(entity_json) for entity_json
1317 in json.get('embedded_entities')])
1320 class EntityTypeInfo(json_util.JsonMixin):
1321 """Type information for an entity."""
1323 def __init__(self, kind=None, properties=None):
1324 """Construct an EntityTypeInfo instance.
1326 Args:
1327 kind: An optional kind name as string.
1328 properties: An optional list of PropertyTypeInfo.
1329 """
1330 self.__kind = kind
1331 self.__properties = {}
1332 for property_type_info in properties or ():
1333 if property_type_info.name in self.__properties:
1334 self.__properties[property_type_info.name].merge(property_type_info)
1335 else:
1336 self.__properties[property_type_info.name] = property_type_info
1338 @property
1339 def kind(self):
1340 return self.__kind
1342 def properties_name_iter(self):
1343 return self.__properties.iterkeys()
1345 def get_property(self, name):
1346 return self.__properties.get(name)
1348 def merge(self, other):
1349 """Merge an EntityTypeInfo with this instance.
1351 Args:
1352 other: Required EntityTypeInfo to merge.
1354 Returns:
1355 True if anything was changed. False otherwise.
1357 Raises:
1358 ValueError: if kinds do not match.
1359 TypeError: if other is not an instance of EntityTypeInfo.
1360 """
1361 if not isinstance(other, EntityTypeInfo):
1362 raise TypeError('Expected EntityTypeInfo, was %r' % (other,))
1364 if other.__kind != self.__kind:
1365 raise ValueError('Kinds mismatch (%s, %s)' % (self.__kind, other.__kind))
1366 changed = False
1367 for name, other_property in other.__properties.iteritems():
1368 self_property = self.__properties.get(name)
1369 if self_property:
1370 changed = self_property.merge(other_property) or changed
1371 else:
1372 self.__properties[name] = other_property
1373 changed = True
1374 return changed
1376 def populate_entity_schema(self, entity_schema):
1377 """Populates the given entity_schema with values from this instance.
1379 Args:
1380 entity_schema: apphosting.ext.datastore_admin.EntitySchema proto.
1381 """
1382 if self.__kind:
1383 entity_schema.kind = self.__kind
1384 for property_type_info in self.__properties.itervalues():
1385 property_type_info.populate_entity_schema_field(entity_schema)
1387 def to_json(self):
1388 return {
1389 'kind': self.__kind,
1390 'properties': [p.to_json() for p in self.__properties.itervalues()]
1391 }
1393 @classmethod
1394 def from_json(cls, json):
1395 kind = json.get('kind')
1396 properties_json = json.get('properties')
1397 if properties_json:
1398 return cls(kind, [PropertyTypeInfo.from_json(p) for p in properties_json])
1399 else:
1400 return cls(kind)
1402 @classmethod
1403 def create_from_entity_proto(cls, entity_proto):
1404 """Creates and populates an EntityTypeInfo from an EntityProto."""
1405 properties = [cls.__get_property_type_info(property_proto) for
1406 property_proto in itertools.chain(
1407 entity_proto.property_list(),
1408 entity_proto.raw_property_list())]
1409 kind = utils.get_kind_from_entity_pb(entity_proto)
1410 return cls(kind, properties)
1412 @classmethod
1413 def __get_property_type_info(cls, property_proto):
1414 """Returns the type mapping for the provided property."""
1415 name = property_proto.name()
1416 is_repeated = bool(property_proto.multiple())
1417 primitive_type = None
1418 entity_type = None
1419 if property_proto.has_meaning():
1420 primitive_type = MEANING_TO_PRIMITIVE_TYPE.get(property_proto.meaning())
1421 if primitive_type is None:
1422 value = property_proto.value()
1423 if value.has_int64value():
1424 primitive_type = backup_pb2.EntitySchema.INTEGER
1425 elif value.has_booleanvalue():
1426 primitive_type = backup_pb2.EntitySchema.BOOLEAN
1427 elif value.has_stringvalue():
1428 if property_proto.meaning() == entity_pb.Property.ENTITY_PROTO:
1429 entity_proto = entity_pb.EntityProto()
1430 try:
1431 entity_proto.ParsePartialFromString(value.stringvalue())
1432 except Exception:
1434 pass
1435 else:
1436 entity_type = EntityTypeInfo.create_from_entity_proto(entity_proto)
1437 else:
1438 primitive_type = backup_pb2.EntitySchema.STRING
1439 elif value.has_doublevalue():
1440 primitive_type = backup_pb2.EntitySchema.FLOAT
1441 elif value.has_pointvalue():
1442 primitive_type = backup_pb2.EntitySchema.GEO_POINT
1443 elif value.has_uservalue():
1444 primitive_type = backup_pb2.EntitySchema.USER
1445 elif value.has_referencevalue():
1446 primitive_type = backup_pb2.EntitySchema.REFERENCE
1447 return PropertyTypeInfo(
1448 name, is_repeated,
1449 (primitive_type,) if primitive_type is not None else None,
1450 (entity_type,) if entity_type else None)
1453 class SchemaAggregationResult(db.Model):
1454 """Persistent aggregated type information for a kind.
1456 An instance can be retrieved via the load method or created
1457 using the create method. An instance aggregates all type information
1458 for all seen embedded_entities via the merge method and is persisted when
1459 needed using the model's put method.
1460 """
1462 entity_type_info = json_util.JsonProperty(
1463 EntityTypeInfo, default=EntityTypeInfo(), indexed=False)
1464 is_partial = db.BooleanProperty(default=False)
1466 def merge(self, other):
1467 """Merge a SchemaAggregationResult or an EntityTypeInfo with this instance.
1469 Args:
1470 other: Required SchemaAggregationResult or EntityTypeInfo to merge.
1472 Returns:
1473 True if anything was changed. False otherwise.
1474 """
1475 if self.is_partial:
1476 return False
1477 if isinstance(other, SchemaAggregationResult):
1478 other = other.entity_type_info
1479 return self.entity_type_info.merge(other)
1481 @classmethod
1482 def _get_parent_key(cls, backup_id, kind_name):
1483 return datastore_types.Key.from_path('Kind', kind_name, parent=backup_id)
1485 @classmethod
1486 def create(cls, backup_id, kind_name, shard_id):
1487 """Create SchemaAggregationResult instance.
1489 Args:
1490 backup_id: Required BackupInformation Key.
1491 kind_name: Required kind name as string.
1492 shard_id: Required shard id as string.
1494 Returns:
1495 A new SchemaAggregationResult instance.
1496 """
1497 parent = cls._get_parent_key(backup_id, kind_name)
1498 return SchemaAggregationResult(
1499 key_name=shard_id, parent=parent,
1500 entity_type_info=EntityTypeInfo(kind=kind_name))
1502 @classmethod
1503 def load(cls, backup_id, kind_name, shard_id=None):
1504 """Retrieve SchemaAggregationResult from the Datastore.
1506 Args:
1507 backup_id: Required BackupInformation Key.
1508 kind_name: Required kind name as string.
1509 shard_id: Optional shard id as string.
1511 Returns:
1512 SchemaAggregationResult iterator or an entity if shard_id is not None.
1513 """
1514 parent = cls._get_parent_key(backup_id, kind_name)
1515 if shard_id:
1516 key = datastore_types.Key.from_path(cls.kind(), shard_id, parent=parent)
1517 return SchemaAggregationResult.get(key)
1518 else:
1519 return db.Query(cls).ancestor(parent).run()
1521 @classmethod
1522 def kind(cls):
1523 return utils.BACKUP_INFORMATION_KIND_TYPE_INFO
1527 class SchemaAggregationPool(object):
1528 """An MR pool to aggregation type information per kind."""
1530 def __init__(self, backup_id, kind, shard_id):
1531 """Construct SchemaAggregationPool instance.
1533 Args:
1534 backup_id: Required BackupInformation Key.
1535 kind: Required kind name as string.
1536 shard_id: Required shard id as string.
1537 """
1538 self.__backup_id = backup_id
1539 self.__kind = kind
1540 self.__shard_id = shard_id
1541 self.__aggregation = SchemaAggregationResult.load(backup_id, kind, shard_id)
1542 if not self.__aggregation:
1543 self.__aggregation = SchemaAggregationResult.create(backup_id, kind,
1544 shard_id)
1545 self.__needs_save = True
1546 else:
1547 self.__needs_save = False
1549 def merge(self, entity_type_info):
1550 """Merge EntityTypeInfo into aggregated type information."""
1551 if self.__aggregation.merge(entity_type_info):
1552 self.__needs_save = True
1554 def flush(self):
1555 """Save aggregated type information to the datastore if changed."""
1556 if self.__needs_save:
1558 def update_aggregation_tx():
1559 aggregation = SchemaAggregationResult.load(
1560 self.__backup_id, self.__kind, self.__shard_id)
1561 if aggregation:
1562 if aggregation.merge(self.__aggregation):
1563 aggregation.put(force_writes=True)
1564 self.__aggregation = aggregation
1565 else:
1566 self.__aggregation.put(force_writes=True)
1568 def mark_aggregation_as_partial_tx():
1569 aggregation = SchemaAggregationResult.load(
1570 self.__backup_id, self.__kind, self.__shard_id)
1571 if aggregation is None:
1572 aggregation = SchemaAggregationResult.create(
1573 self.__backup_id, self.__kind, self.__shard_id)
1574 aggregation.is_partial = True
1575 aggregation.put(force_writes=True)
1576 self.__aggregation = aggregation
1578 try:
1579 db.run_in_transaction(update_aggregation_tx)
1580 except apiproxy_errors.RequestTooLargeError:
1581 db.run_in_transaction(mark_aggregation_as_partial_tx)
1582 self.__needs_save = False
1585 class AggregateSchema(op.Operation):
1586 """An MR Operation to aggregation type information for a kind.
1588 This operation will register an MR pool, SchemaAggregationPool, if
1589 one is not already registered and will invoke the pool's merge operation
1590 per entity. The pool is responsible for keeping a persistent state of
1591 type aggregation using the sharded db model, SchemaAggregationResult.
1592 """
1594 def __init__(self, entity_proto):
1595 self.__entity_info = EntityTypeInfo.create_from_entity_proto(entity_proto)
1597 def __call__(self, ctx):
1598 pool = ctx.get_pool('schema_aggregation_pool')
1599 if not pool:
1600 backup_id = datastore_types.Key(
1601 context.get().mapreduce_spec.params['backup_info_pk'])
1602 pool = SchemaAggregationPool(
1603 backup_id, self.__entity_info.kind, ctx.shard_id)
1604 ctx.register_pool('schema_aggregation_pool', pool)
1605 pool.merge(self.__entity_info)
1608 class BackupEntity(object):
1609 """A class which dumps the entity to the writer."""
1611 def map(self, entity_proto):
1612 """Backup entity map handler.
1614 Args:
1615 entity_proto: An instance of entity_pb.EntityProto.
1617 Yields:
1618 A serialized entity_pb.EntityProto as a string.
1619 """
1620 yield entity_proto.SerializeToString()
1621 yield AggregateSchema(entity_proto)
1624 class RestoreEntity(object):
1625 """A class which restore the entity to datastore."""
1627 def __init__(self):
1628 self.initialized = False
1629 self.kind_filter = None
1630 self.app_id = None
1632 def initialize(self):
1633 if self.initialized:
1634 return
1635 mapper_params = context.get().mapreduce_spec.mapper.params
1636 kind_filter = mapper_params.get('kind_filter')
1637 self.kind_filter = set(kind_filter) if kind_filter else None
1638 original_app = mapper_params.get('original_app')
1639 if original_app and os.getenv('APPLICATION_ID') != original_app:
1640 self.app_id = os.getenv('APPLICATION_ID')
1641 self.initialized = True
1643 def map(self, record):
1644 """Restore entity map handler.
1646 Args:
1647 record: A serialized entity_pb.EntityProto.
1649 Yields:
1650 An operation.db.Put for the mapped entity.
1651 """
1652 self.initialize()
1653 pb = entity_pb.EntityProto(contents=record)
1654 if self.app_id:
1655 utils.FixKeys(pb, self.app_id)
1656 entity = datastore.Entity.FromPb(pb)
1657 if not self.kind_filter or entity.kind() in self.kind_filter:
1658 yield op.db.Put(entity)
1659 if self.app_id:
1660 yield utils.ReserveKey(entity.key())
1663 def validate_gs_bucket_name(bucket_name):
1664 """Validate the format of the given bucket_name.
1666 Validation rules are based on:
1667 https://developers.google.com/storage/docs/bucketnaming#requirements
1669 Args:
1670 bucket_name: The bucket name to validate.
1672 Raises:
1673 BackupValidationError: If the bucket name is invalid.
1674 """
1675 if len(bucket_name) > MAX_BUCKET_LEN:
1676 raise BackupValidationError(
1677 'Bucket name length should not be longer than %d' % MAX_BUCKET_LEN)
1678 if len(bucket_name) < MIN_BUCKET_LEN:
1679 raise BackupValidationError(
1680 'Bucket name length should not be shorter than %d' % MIN_BUCKET_LEN)
1681 if bucket_name.lower().startswith('goog'):
1682 raise BackupValidationError(
1683 'Bucket name should not start with a "goog" prefix')
1684 bucket_elements = bucket_name.split('.')
1685 for bucket_element in bucket_elements:
1686 if len(bucket_element) > MAX_BUCKET_SEGMENT_LEN:
1687 raise BackupValidationError(
1688 'Segment length of bucket name should not be longer than %d' %
1689 MAX_BUCKET_SEGMENT_LEN)
1690 if not re.match(BUCKET_PATTERN, bucket_name):
1691 raise BackupValidationError('Invalid bucket name "%s"' % bucket_name)
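# Illustrative examples: validate_gs_bucket_name('my_backups-2014') passes,
# while 'ab' (shorter than MIN_BUCKET_LEN) or 'goog-backups' (reserved "goog"
# prefix) raise BackupValidationError.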
1694 def is_accessible_bucket_name(bucket_name):
1695 """Returns True if the application has access to the specified bucket."""
1696 scope = config.GoogleApiScope('devstorage.read_write')
1697 bucket_url = config.GsBucketURL(bucket_name)
1698 auth_token, _ = app_identity.get_access_token(scope)
1699 result = urlfetch.fetch(bucket_url, method=urlfetch.HEAD, headers={
1700 'Authorization': 'OAuth %s' % auth_token,
1701 'x-goog-api-version': '2'})
1702 return result and result.status_code == 200
1705 def verify_bucket_writable(bucket_name):
1706 """Verify the application can write to the specified bucket.
1708 Args:
1709 bucket_name: The bucket to verify.
1711 Raises:
1712 BackupValidationError: If the bucket is not writable.
1713 """
1714 path = '/gs/%s' % bucket_name
1715 try:
1716 file_names = files.gs.listdir(path,
1717 {'prefix': TEST_WRITE_FILENAME_PREFIX,
1718 'max_keys': MAX_KEYS_LIST_SIZE})
1719 except (files.InvalidParameterError, files.PermissionDeniedError):
1720 raise BackupValidationError('Bucket "%s" not accessible' % bucket_name)
1721 except files.InvalidFileNameError:
1722 raise BackupValidationError('Bucket "%s" does not exist' % bucket_name)
1723 file_name = '%s/%s.tmp' % (path, TEST_WRITE_FILENAME_PREFIX)
1724 file_name_try = 0
1725 while True:
1726 if file_name_try >= MAX_TEST_FILENAME_TRIES:
1729 return
1730 if file_name not in file_names:
1731 break
1732 gen = random.randint(0, 9999)
1733 file_name = '%s/%s_%s.tmp' % (path, TEST_WRITE_FILENAME_PREFIX, gen)
1734 file_name_try += 1
1735 try:
1736 test_file = files.open(files.gs.create(file_name), 'a', exclusive_lock=True)
1737 try:
1738 test_file.write('test')
1739 finally:
1740 test_file.close(finalize=True)
1741 except files.PermissionDeniedError:
1742 raise BackupValidationError('Bucket "%s" is not writable' % bucket_name)
1743 try:
1744 files.delete(file_name)
1745 except (files.InvalidArgumentError, files.InvalidFileNameError, IOError):
1746 logging.warn('Failed to delete test file %s', file_name)
1749 def is_readable_gs_handle(gs_handle):
1750 """Return True if the application can read the specified gs_handle."""
1751 try:
1752 with files.open(gs_handle) as bak_file:
1753 bak_file.read(1)
1754 except files.PermissionDeniedError:
1755 return False
1756 return True
1760 def parse_gs_handle(gs_handle):
1761 """Splits [/gs/]?bucket_name[/folder]*[/file]? to (bucket_name, path | '')."""
1762 if gs_handle.startswith('/'):
1763 filesystem = gs_handle[1:].split('/', 1)[0]
1764 if filesystem == 'gs':
1765 gs_handle = gs_handle[4:]
1766 else:
1767 raise BackupValidationError('Unsupported filesystem: %s' % filesystem)
1768 tokens = gs_handle.split('/', 1)
1769 return (tokens[0], '') if len(tokens) == 1 else tuple(tokens)
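# Illustrative examples: parse_gs_handle('/gs/my_bucket/daily/x.backup_info')
# returns ('my_bucket', 'daily/x.backup_info'), parse_gs_handle('my_bucket')
# returns ('my_bucket', ''), and a handle starting with another filesystem,
# e.g. '/blobstore/foo', raises BackupValidationError.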
1772 def validate_and_canonicalize_gs_bucket(gs_bucket_name):
1773 bucket_name, path = parse_gs_handle(gs_bucket_name)
1774 gs_bucket_name = ('%s/%s' % (bucket_name, path)).rstrip('/')
1775 validate_gs_bucket_name(bucket_name)
1776 verify_bucket_writable(bucket_name)
1777 return gs_bucket_name
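# Illustrative example: validate_and_canonicalize_gs_bucket('/gs/my_bucket/backups/')
# validates and write-checks bucket 'my_bucket' and returns the canonical
# handle 'my_bucket/backups'.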
1780 def list_bucket_files(bucket_name, prefix, max_keys=1000):
1781 """Returns a listing of of a bucket that matches the given prefix."""
1782 scope = config.GoogleApiScope('devstorage.read_only')
1783 bucket_url = config.GsBucketURL(bucket_name)
1784 url = bucket_url + '?'
1785 query = [('max-keys', max_keys)]
1786 if prefix:
1787 query.append(('prefix', prefix))
1788 url += urllib.urlencode(query)
1789 auth_token, _ = app_identity.get_access_token(scope)
1790 result = urlfetch.fetch(url, method=urlfetch.GET, headers={
1791 'Authorization': 'OAuth %s' % auth_token,
1792 'x-goog-api-version': '2'})
1793 if result and result.status_code == 200:
1794 doc = xml.dom.minidom.parseString(result.content)
1795 return [node.childNodes[0].data for node in doc.getElementsByTagName('Key')]
1796 raise BackupValidationError('Request to Google Cloud Storage failed')
1799 def get_gs_object(bucket_name, path):
1800 """Returns a listing of of a bucket that matches the given prefix."""
1801 scope = config.GoogleApiScope('devstorage.read_only')
1802 bucket_url = config.GsBucketURL(bucket_name)
1803 url = bucket_url + path
1804 auth_token, _ = app_identity.get_access_token(scope)
1805 result = urlfetch.fetch(url, method=urlfetch.GET, headers={
1806 'Authorization': 'OAuth %s' % auth_token,
1807 'x-goog-api-version': '2'})
1808 if result and result.status_code == 200:
1809 return result.content
1810 if result and result.status_code == 403:
1811 raise BackupValidationError(
1812 'Requested path %s is not accessible/access denied' % url)
1813 if result and result.status_code == 404:
1814 raise BackupValidationError('Requested path %s was not found' % url)
1815 raise BackupValidationError('Error encountered accessing requested path %s' %
1816 url)
1821 def get_queue_names(app_id=None, max_rows=100):
1822 """Returns a list with all non-special queue names for app_id."""
1823 rpc = apiproxy_stub_map.UserRPC('taskqueue')
1824 request = taskqueue_service_pb.TaskQueueFetchQueuesRequest()
1825 response = taskqueue_service_pb.TaskQueueFetchQueuesResponse()
1826 if app_id:
1827 request.set_app_id(app_id)
1828 request.set_max_rows(max_rows)
1829 queues = ['default']
1830 try:
1831 rpc.make_call('FetchQueues', request, response)
1832 rpc.check_success()
1834 for queue in response.queue_list():
1835 if (queue.mode() == taskqueue_service_pb.TaskQueueMode.PUSH and
1836 not queue.queue_name().startswith('__') and
1837 queue.queue_name() != 'default'):
1838 queues.append(queue.queue_name())
1839 except Exception:
1840 logging.exception('Failed to get queue names.')
1841 return queues
1844 def handlers_list(base_path):
1845 return [
1846 (r'%s/%s' % (base_path, BackupLinkHandler.SUFFIX),
1847 BackupLinkHandler),
1848 (r'%s/%s' % (base_path, ConfirmBackupHandler.SUFFIX),
1849 ConfirmBackupHandler),
1850 (r'%s/%s' % (base_path, DoBackupHandler.SUFFIX), DoBackupHandler),
1851 (r'%s/%s' % (base_path, DoBackupRestoreHandler.SUFFIX),
1852 DoBackupRestoreHandler),
1853 (r'%s/%s' % (base_path, DoBackupDeleteHandler.SUFFIX),
1854 DoBackupDeleteHandler),
1855 (r'%s/%s' % (base_path, DoBackupAbortHandler.SUFFIX),
1856 DoBackupAbortHandler),
1857 (r'%s/%s' % (base_path, DoBackupImportHandler.SUFFIX),
1858 DoBackupImportHandler),