1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 """Handler for data backup operation.
24 The generic datastore admin console transfers control to ConfirmBackupHandler
25 after the user selects entities. ConfirmBackupHandler confirms the user's
26 choice, collects a backup name and transfers control to DoBackupHandler.
27 DoBackupHandler starts the backup mappers and displays a confirmation
28 page.
30 This module also contains the actual mapper code that backs up the data.
31 """
33 from __future__ import with_statement
37 import cStringIO
38 import datetime
39 import itertools
40 import logging
41 import os
42 import random
43 import re
44 import time
45 import urllib
46 import xml.dom.minidom
49 from google.appengine.datastore import entity_pb
50 from google.appengine.api import apiproxy_stub_map
51 from google.appengine.api import app_identity
52 from google.appengine.api import blobstore as blobstore_api
53 from google.appengine.api import capabilities
54 from google.appengine.api import datastore
55 from google.appengine.api import datastore_types
56 from google.appengine.api import files
57 from google.appengine.api import taskqueue
58 from google.appengine.api import urlfetch
59 from google.appengine.api.files import records
60 from google.appengine.api.taskqueue import taskqueue_service_pb
61 from google.appengine.datastore import datastore_query
62 from google.appengine.datastore import datastore_rpc
63 from google.appengine.ext import blobstore
64 from google.appengine.ext import db
65 from google.appengine.ext import deferred
66 from google.appengine.ext import webapp
67 from google.appengine.ext.datastore_admin import backup_pb2
68 from google.appengine.ext.datastore_admin import config
69 from google.appengine.ext.datastore_admin import utils
70 from google.appengine.runtime import apiproxy_errors
73 try:
75 from google.appengine.ext.mapreduce import context
76 from google.appengine.ext.mapreduce import datastore_range_iterators as db_iters
77 from google.appengine.ext.mapreduce import input_readers
78 from google.appengine.ext.mapreduce import json_util
79 from google.appengine.ext.mapreduce import operation as op
80 from google.appengine.ext.mapreduce import output_writers
81 except ImportError:
83 from google.appengine._internal.mapreduce import context
84 from google.appengine._internal.mapreduce import datastore_range_iterators as db_iters
85 from google.appengine._internal.mapreduce import input_readers
86 from google.appengine._internal.mapreduce import json_util
87 from google.appengine._internal.mapreduce import operation as op
88 from google.appengine._internal.mapreduce import output_writers
90 try:
92 from google.appengine.ext.datastore_admin import services_client
93 except ImportError:
95 pass
98 XSRF_ACTION = 'backup'
99 BUCKET_PATTERN = (r'^([a-zA-Z0-9]+([\-_]+[a-zA-Z0-9]+)*)'
100 r'(\.([a-zA-Z0-9]+([\-_]+[a-zA-Z0-9]+)*))*$')
101 MAX_BUCKET_LEN = 222
102 MIN_BUCKET_LEN = 3
103 MAX_BUCKET_SEGMENT_LEN = 63
104 NUM_KINDS_DEFERRED_THRESHOLD = 10
105 MAX_BLOBS_PER_DELETE = 500
106 TEST_WRITE_FILENAME_PREFIX = 'datastore_backup_write_test'
107 MAX_KEYS_LIST_SIZE = 100
108 MAX_TEST_FILENAME_TRIES = 10
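# Maps datastore property "meaning" codes (entity_pb.Property) to the primitive
# types declared in the backup schema proto (backup_pb2.EntitySchema).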
110 MEANING_TO_PRIMITIVE_TYPE = {
111 entity_pb.Property.GD_WHEN: backup_pb2.EntitySchema.DATE_TIME,
112 entity_pb.Property.GD_RATING: backup_pb2.EntitySchema.RATING,
113 entity_pb.Property.ATOM_LINK: backup_pb2.EntitySchema.LINK,
114 entity_pb.Property.ATOM_CATEGORY: backup_pb2.EntitySchema.CATEGORY,
115 entity_pb.Property.GD_PHONENUMBER: backup_pb2.EntitySchema.PHONE_NUMBER,
116 entity_pb.Property.GD_POSTALADDRESS: backup_pb2.EntitySchema.POSTAL_ADDRESS,
117 entity_pb.Property.GD_EMAIL: backup_pb2.EntitySchema.EMAIL,
118 entity_pb.Property.GD_IM: backup_pb2.EntitySchema.IM_HANDLE,
119 entity_pb.Property.BLOBKEY: backup_pb2.EntitySchema.BLOB_KEY,
120 entity_pb.Property.TEXT: backup_pb2.EntitySchema.TEXT,
121 entity_pb.Property.BLOB: backup_pb2.EntitySchema.BLOB,
122 entity_pb.Property.BYTESTRING: backup_pb2.EntitySchema.SHORT_BLOB
123 }
126 class ConfirmBackupHandler(webapp.RequestHandler):
127 """Handler to deal with requests from the admin console to backup data."""
129 SUFFIX = 'confirm_backup'
131 @classmethod
132 def Render(cls, handler):
133 """Rendering method that can be called by main.py.
135 Args:
136 handler: the webapp.RequestHandler invoking the method
137 """
138 kinds = handler.request.get_all('kind')
139 sizes_known, size_total, remainder = utils.ParseKindsAndSizes(kinds)
140 notreadonly_warning = capabilities.CapabilitySet(
141 'datastore_v3', capabilities=['write']).is_enabled()
142 blob_warning = bool(blobstore.BlobInfo.all().count(1))
143 template_params = {
144 'run_as_a_service': handler.request.get('run_as_a_service'),
145 'form_target': DoBackupHandler.SUFFIX,
146 'kind_list': kinds,
147 'remainder': remainder,
148 'sizes_known': sizes_known,
149 'size_total': size_total,
150 'queues': None,
151 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
152 'namespaces': get_namespaces(handler.request.get('namespace', None)),
153 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
154 'notreadonly_warning': notreadonly_warning,
155 'blob_warning': blob_warning,
156 'backup_name': 'datastore_backup_%s' % time.strftime('%Y_%m_%d')
157 }
158 utils.RenderToResponse(handler, 'confirm_backup.html', template_params)
161 def get_namespaces(selected_namespace):
162 namespaces = [('--All--', '*', selected_namespace is None)]
163 for ns in datastore.Query('__namespace__', keys_only=True).Run():
164 ns_name = ns.name() or ''
165 namespaces.append((ns_name or '--Default--',
166 ns_name,
167 ns_name == selected_namespace))
168 return namespaces
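# Illustrative return value (assuming a single namespace 'ns1' exists and no
# namespace was selected):
#   [('--All--', '*', True), ('--Default--', '', False), ('ns1', 'ns1', False)]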
171 class ConfirmDeleteBackupHandler(webapp.RequestHandler):
172 """Handler to confirm admin console requests to delete a backup copy."""
174 SUFFIX = 'confirm_delete_backup'
176 @classmethod
177 def Render(cls, handler):
178 """Rendering method that can be called by main.py.
180 Args:
181 handler: the webapp.RequestHandler invoking the method
182 """
183 requested_backup_ids = handler.request.get_all('backup_id')
184 backups = []
185 gs_warning = False
186 if requested_backup_ids:
187 for backup in db.get(requested_backup_ids):
188 if backup:
189 backups.append(backup)
190 gs_warning |= backup.filesystem == files.GS_FILESYSTEM
191 template_params = {
192 'form_target': DoBackupDeleteHandler.SUFFIX,
193 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
194 'backups': backups,
195 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
196 'gs_warning': gs_warning,
197 'run_as_a_service': handler.request.get('run_as_a_service'),
198 }
199 utils.RenderToResponse(handler, 'confirm_delete_backup.html',
200 template_params)
203 class ConfirmAbortBackupHandler(webapp.RequestHandler):
204 """Handler to confirm admin console requests to abort a backup copy."""
206 SUFFIX = 'confirm_abort_backup'
208 @classmethod
209 def Render(cls, handler):
210 """Rendering method that can be called by main.py.
212 Args:
213 handler: the webapp.RequestHandler invoking the method
214 """
215 requested_backup_ids = handler.request.get_all('backup_id')
216 backups = []
217 if requested_backup_ids:
218 for backup in db.get(requested_backup_ids):
219 if backup:
220 backups.append(backup)
221 template_params = {
222 'form_target': DoBackupAbortHandler.SUFFIX,
223 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
224 'backups': backups,
225 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
226 'run_as_a_service': handler.request.get('run_as_a_service'),
227 }
228 utils.RenderToResponse(handler, 'confirm_abort_backup.html',
229 template_params)
232 class ConfirmRestoreFromBackupHandler(webapp.RequestHandler):
233 """Handler to confirm admin console requests to restore from backup."""
235 SUFFIX = 'confirm_restore_from_backup'
237 @classmethod
238 def Render(cls, handler, default_backup_id=None,
239 default_delete_backup_after_restore=False):
240 """Rendering method that can be called by main.py.
242 Args:
243 handler: the webapp.RequestHandler invoking the method
244 default_backup_id: default value for handler.request
245 default_delete_backup_after_restore: default value for handler.request
246 """
247 backup_id = handler.request.get('backup_id', default_backup_id)
248 backup = db.get(backup_id) if backup_id else None
249 notreadonly_warning = capabilities.CapabilitySet(
250 'datastore_v3', capabilities=['write']).is_enabled()
251 original_app_warning = backup.original_app
252 if os.getenv('APPLICATION_ID') == original_app_warning:
253 original_app_warning = None
254 template_params = {
255 'form_target': DoBackupRestoreHandler.SUFFIX,
256 'queues': None,
257 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
258 'backup': backup,
259 'delete_backup_after_restore': handler.request.get(
260 'delete_backup_after_restore', default_delete_backup_after_restore),
261 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
262 'notreadonly_warning': notreadonly_warning,
263 'original_app_warning': original_app_warning,
264 'run_as_a_service': handler.request.get('run_as_a_service'),
265 }
266 utils.RenderToResponse(handler, 'confirm_restore_from_backup.html',
267 template_params)
270 class ConfirmBackupImportHandler(webapp.RequestHandler):
271 """Handler to import backup information."""
273 SUFFIX = 'backup_information'
275 @classmethod
276 def Render(cls, handler):
277 """Rendering method that can be called by main.py.
279 Args:
280 handler: the webapp.RequestHandler invoking the method
281 """
282 gs_handle = handler.request.get('gs_handle')
283 error = None if gs_handle else 'Google Cloud Storage path is missing'
284 other_backup_info_files = []
285 selected_backup_info_file = None
286 backup_info_specified = False
287 if not error:
288 try:
289 gs_handle = gs_handle.rstrip()
290 bucket_name, prefix = parse_gs_handle(gs_handle)
291 validate_gs_bucket_name(bucket_name)
292 if not is_accessible_bucket_name(bucket_name):
293 raise BackupValidationError(
294 'Bucket "%s" is not accessible' % bucket_name)
295 if prefix.endswith('.backup_info'):
296 prefix = prefix[0:prefix.rfind('/')]
297 backup_info_specified = True
298 elif prefix and not prefix.endswith('/'):
299 prefix += '/'
300 for backup_info_file in list_bucket_files(bucket_name, prefix):
301 backup_info_path = '/gs/%s/%s' % (bucket_name, backup_info_file)
302 if backup_info_specified and backup_info_path == gs_handle:
303 selected_backup_info_file = backup_info_path
304 elif (backup_info_file.endswith('.backup_info')
305 and backup_info_file.count('.') == 1):
306 other_backup_info_files.append(backup_info_path)
307 except Exception, ex:
308 error = 'Failed to read bucket: %s' % ex.message
309 logging.exception(ex.message)
310 template_params = {
311 'error': error,
312 'form_target': DoBackupImportHandler.SUFFIX,
313 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
314 'selected_backup_info_file': selected_backup_info_file,
315 'other_backup_info_files': other_backup_info_files,
316 'backup_info_specified': backup_info_specified,
317 'xsrf_token': utils.CreateXsrfToken(XSRF_ACTION),
318 'run_as_a_service': handler.request.get('run_as_a_service'),
319 }
320 utils.RenderToResponse(handler, 'confirm_backup_import.html',
321 template_params)
324 class BackupInformationHandler(webapp.RequestHandler):
325 """Handler to display backup information."""
327 SUFFIX = 'backup_information'
329 @classmethod
330 def Render(cls, handler):
331 """Rendering method that can be called by main.py.
333 Args:
334 handler: the webapp.RequestHandler invoking the method
335 """
336 backup_ids = handler.request.get_all('backup_id')
337 template_params = {
338 'backups': db.get(backup_ids),
339 'datastore_admin_home': utils.GenerateHomeUrl(handler.request),
340 'run_as_a_service': handler.request.get('run_as_a_service'),
341 }
342 utils.RenderToResponse(handler, 'backup_information.html', template_params)
345 class BaseDoHandler(webapp.RequestHandler):
346 """Base class for all Do*Handlers."""
348 MAPREDUCE_DETAIL = config.MAPREDUCE_PATH + '/detail?mapreduce_id='
350 def get(self):
351 """Handler for get requests to datastore_admin backup operations.
353 Status of executed jobs is displayed.
354 """
355 jobs = self.request.get_all('job')
356 remote_job = self.request.get('remote_job')
357 tasks = self.request.get_all('task')
358 error = self.request.get('error', '')
359 xsrf_error = self.request.get('xsrf_error', '')
360 template_params = {
361 'job_list': jobs,
362 'remote_job': remote_job,
363 'task_list': tasks,
364 'mapreduce_detail': self.MAPREDUCE_DETAIL,
365 'error': error,
366 'xsrf_error': xsrf_error,
367 'datastore_admin_home': utils.GenerateHomeUrl(self.request),
368 }
369 utils.RenderToResponse(self, self._get_html_page, template_params)
371 @property
372 def _get_html_page(self):
373 """Return the name of the HTML page for HTTP/GET requests."""
374 raise NotImplementedError
376 @property
377 def _get_post_html_page(self):
378 """Return the name of the HTML page for HTTP/POST requests."""
379 raise NotImplementedError
381 def _ProcessPostRequest(self):
382 """Process the HTTP/POST request and return the result as parametrs."""
383 raise NotImplementedError
385 def _GetBasicMapperParams(self):
386 namespace = self.request.get('namespace', None)
387 if namespace == '*':
388 namespace = None
389 return {'namespace': namespace}
391 def SendRedirect(self, path=None, params=()):
392 """Send a redirect response."""
394 run_as_a_service = self.request.get('run_as_a_service')
395 if run_as_a_service:
396 params = list(params)
397 params.append(('run_as_a_service', True))
398 dest = config.BASE_PATH
399 if path:
400 dest = '%s/%s' % (dest, path)
401 if params:
402 dest = '%s?%s' % (dest, urllib.urlencode(params))
403 self.redirect(dest)
405 def post(self):
406 """Handler for post requests to datastore_admin/backup.do.
408 Redirects to the get handler after processing the request.
409 """
410 token = self.request.get('xsrf_token')
412 if not utils.ValidateXsrfToken(token, XSRF_ACTION):
413 parameters = [('xsrf_error', '1')]
414 else:
415 try:
416 parameters = self._ProcessPostRequest()
419 except Exception, e:
420 error = self._HandleException(e)
421 parameters = [('error', error)]
423 self.SendRedirect(self._get_post_html_page, parameters)
425 def _HandleException(self, e):
426 """Make exception handling overridable by tests.
428 Args:
429 e: The exception to handle.
431 Returns:
432 The exception error string.
433 """
434 logging.exception(e.message)
435 return '%s: %s' % (type(e), e.message)
438 class BackupValidationError(utils.Error):
439 """Raised upon backup request validation."""
442 def _perform_backup(run_as_a_service, kinds, selected_namespace,
443 filesystem, gs_bucket_name, backup,
444 queue, mapper_params, max_jobs):
445 """Triggers backup mapper jobs.
447 Args:
448 run_as_a_service: True if backup should be done via admin-jobs
449 kinds: a sequence of kind names
450 selected_namespace: The selected namespace or None for all
451 filesystem: files.BLOBSTORE_FILESYSTEM or files.GS_FILESYSTEM
452 or None to default to blobstore
453 gs_bucket_name: the GS file system bucket in which to store the backup
454 when using the GS file system, and otherwise ignored
455 backup: the backup name
456 queue: the task queue for the backup task
457 mapper_params: the mapper parameters
458 max_jobs: if backup needs more jobs than this, defer them
460 Returns:
461 The job or task ids.
463 Raises:
464 BackupValidationError: On validation error.
464 Exception: On other error.
465 """
467 BACKUP_COMPLETE_HANDLER = __name__ + '.BackupCompleteHandler'
468 BACKUP_HANDLER = __name__ + '.BackupEntity.map'
469 INPUT_READER = __name__ + '.DatastoreEntityProtoInputReader'
470 OUTPUT_WRITER = output_writers.__name__ + '.FileRecordsOutputWriter'
472 if run_as_a_service:
473 if not gs_bucket_name:
474 raise BackupValidationError('Bucket name missing.')
475 gs_bucket_name = validate_and_canonicalize_gs_bucket(gs_bucket_name)
476 datastore_admin_service = services_client.DatastoreAdminClient()
477 description = 'Remote backup job: %s' % backup
478 remote_job_id = datastore_admin_service.create_backup(
479 description, backup, gs_bucket_name, selected_namespace, kinds)
480 return [('remote_job', remote_job_id)]
482 queue = queue or os.environ.get('HTTP_X_APPENGINE_QUEUENAME', 'default')
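# The request may have arrived on an internal queue (name starting with '_',
# e.g. '__cron'); such names cannot be used to enqueue the backup tasks, so
# fall back to the default queue.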
483 if queue[0] == '_':
485 queue = 'default'
486 if not filesystem:
487 filesystem = files.BLOBSTORE_FILESYSTEM
488 if filesystem == files.GS_FILESYSTEM:
490 if not gs_bucket_name:
491 raise BackupValidationError('Bucket name missing.')
492 gs_bucket_name = validate_and_canonicalize_gs_bucket(gs_bucket_name)
493 elif filesystem == files.BLOBSTORE_FILESYSTEM:
494 pass
495 else:
496 raise BackupValidationError('Unknown filesystem "%s".' % filesystem)
498 backup_info = None
499 job_operation = None
501 job_name = 'datastore_backup_%s_%%(kind)s' % re.sub(r'[^\w]', '_', backup)
502 try:
503 job_operation = utils.StartOperation('Backup: %s' % backup)
504 backup_info = BackupInformation(parent=job_operation)
505 backup_info.filesystem = filesystem
506 backup_info.name = backup
507 backup_info.kinds = kinds
508 if selected_namespace is not None:
509 backup_info.namespaces = [selected_namespace]
510 backup_info.put(force_writes=True)
511 mapreduce_params = {
512 'done_callback_handler': BACKUP_COMPLETE_HANDLER,
513 'backup_info_pk': str(backup_info.key()),
514 'force_ops_writes': True,
515 }
516 mapper_params = dict(mapper_params)
517 mapper_params['filesystem'] = filesystem
518 if filesystem == files.GS_FILESYSTEM:
519 mapper_params['gs_bucket_name'] = gs_bucket_name
520 if len(kinds) <= max_jobs:
521 return [('job', job) for job in _run_map_jobs(
522 job_operation.key(), backup_info.key(), kinds, job_name,
523 BACKUP_HANDLER, INPUT_READER, OUTPUT_WRITER,
524 mapper_params, mapreduce_params, queue)]
525 else:
526 retry_options = taskqueue.TaskRetryOptions(task_retry_limit=1)
527 deferred_task = deferred.defer(_run_map_jobs_deferred,
528 backup, job_operation.key(),
529 backup_info.key(), kinds, job_name,
530 BACKUP_HANDLER, INPUT_READER,
531 OUTPUT_WRITER, mapper_params,
532 mapreduce_params, queue, _queue=queue,
533 _url=config.DEFERRED_PATH,
534 _retry_options=retry_options)
535 return [('task', deferred_task.name)]
536 except Exception:
537 logging.exception('Failed to start a datastore backup job[s] for "%s".',
538 backup)
539 if backup_info:
540 delete_backup_info(backup_info)
541 if job_operation:
542 job_operation.status = utils.DatastoreAdminOperation.STATUS_FAILED
543 job_operation.put(force_writes=True)
544 raise
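# Illustrative call (argument values below are made up; see the Args section
# of the docstring above for their meaning):
#   _perform_backup(run_as_a_service=False,
#                   kinds=['Greeting'],
#                   selected_namespace=None,
#                   filesystem=files.GS_FILESYSTEM,
#                   gs_bucket_name='my-backup-bucket',
#                   backup='nightly',
#                   queue='default',
#                   mapper_params={'namespace': None},
#                   max_jobs=10)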
547 class BackupLinkHandler(webapp.RequestHandler):
548 """Handler to deal with requests to the backup link to backup data."""
550 SUFFIX = 'backup.create'
552 def get(self):
553 """Handler for get requests to datastore_admin/backup.create."""
554 self.post()
556 def post(self):
557 """Handler for post requests to datastore_admin/backup.create."""
558 try:
563 if ('X-AppEngine-TaskName' not in self.request.headers and
564 'X-AppEngine-Cron' not in self.request.headers):
565 logging.critical('Scheduled backups must be started via task queue or '
566 'cron.')
567 self.response.set_status(403)
568 return
570 backup_prefix = self.request.get('name')
571 if not backup_prefix:
572 if self.request.headers.get('X-AppEngine-Cron'):
573 backup_prefix = 'cron-'
574 else:
575 backup_prefix = 'link-'
576 backup_prefix_with_date = backup_prefix + time.strftime('%Y_%m_%d')
577 backup_name = backup_prefix_with_date
578 backup_suffix_counter = 1
579 while BackupInformation.name_exists(backup_name):
580 backup_suffix_counter += 1
581 backup_name = backup_prefix_with_date + '-' + str(backup_suffix_counter)
582 kinds = self.request.get_all('kind')
583 if not kinds:
584 self.errorResponse('Backup must include at least one kind.')
585 return
586 for kind in kinds:
587 if not utils.IsKindNameVisible(kind):
588 self.errorResponse('Invalid kind %s.' % kind)
589 return
590 namespace = self.request.get('namespace', None)
591 if namespace == '*':
592 namespace = None
593 mapper_params = {'namespace': namespace}
594 _perform_backup(self.request.get('run_as_a_service', False),
595 kinds,
596 namespace,
597 self.request.get('filesystem'),
598 self.request.get('gs_bucket_name'),
599 backup_name,
600 self.request.get('queue'),
601 mapper_params,
602 1000000)
603 except Exception, e:
604 self.errorResponse(e.message)
606 def errorResponse(self, message):
607 logging.error('Could not create backup via link: %s', message)
608 self.response.set_status(400, message)
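# Illustrative cron.yaml entry for a scheduled backup that hits this handler.
# The URL prefix, kind and bucket name are assumptions and depend on how
# datastore_admin is mounted for the application:
#
#   cron:
#   - description: daily datastore backup
#     url: /_ah/datastore_admin/backup.create?name=daily&kind=Greeting&filesystem=gs&gs_bucket_name=my-backup-bucket
#     schedule: every 24 hours
#     target: ah-builtin-python-bundle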
611 class DatastoreEntityProtoInputReader(input_readers.RawDatastoreInputReader):
612 """An input reader which yields datastore entity proto for a kind."""
614 _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityProtoIterator
617 class DoBackupHandler(BaseDoHandler):
618 """Handler to deal with requests from the admin console to backup data."""
620 SUFFIX = 'backup.do'
621 _get_html_page = 'do_backup.html'
622 _get_post_html_page = SUFFIX
624 def _ProcessPostRequest(self):
625 """Triggers backup mapper jobs and returns their ids."""
626 try:
627 backup = self.request.get('backup_name').strip()
628 if not backup:
629 raise BackupValidationError('Unspecified backup name.')
630 if BackupInformation.name_exists(backup):
631 raise BackupValidationError('Backup "%s" already exists.' % backup)
632 mapper_params = self._GetBasicMapperParams()
633 backup_result = _perform_backup(self.request.get('run_as_a_service',
634 False),
635 self.request.get_all('kind'),
636 mapper_params.get('namespace'),
637 self.request.get('filesystem'),
638 self.request.get('gs_bucket_name'),
639 backup,
640 self.request.get('queue'),
641 mapper_params,
643 return backup_result
644 except Exception, e:
645 logging.exception(e.message)
646 return [('error', e.message)]
649 def _run_map_jobs_deferred(backup_name, job_operation_key, backup_info_key,
650 kinds, job_name, backup_handler, input_reader,
651 output_writer, mapper_params, mapreduce_params,
652 queue):
653 backup_info = BackupInformation.get(backup_info_key)
654 if backup_info:
655 try:
656 _run_map_jobs(job_operation_key, backup_info_key, kinds, job_name,
657 backup_handler, input_reader, output_writer, mapper_params,
658 mapreduce_params, queue)
659 except BaseException:
660 logging.exception('Failed to start a datastore backup job[s] for "%s".',
661 backup_name)
662 delete_backup_info(backup_info)
663 else:
664 logging.info('Missing backup info, can not start backup jobs for "%s"',
665 backup_name)
668 def _run_map_jobs(job_operation_key, backup_info_key, kinds, job_name,
669 backup_handler, input_reader, output_writer, mapper_params,
670 mapreduce_params, queue):
671 """Creates backup/restore MR jobs for the given operation.
673 Args:
674 job_operation_key: a key of utils.DatastoreAdminOperation entity.
675 backup_info_key: a key of BackupInformation entity.
676 kinds: a list of kinds to run the M/R for.
677 job_name: the M/R job name prefix.
678 backup_handler: M/R job completion handler.
679 input_reader: M/R input reader.
680 output_writer: M/R output writer.
681 mapper_params: custom parameters to pass to mapper.
682 mapreduce_params: dictionary parameters relevant to the whole job.
683 queue: the name of the queue that will be used by the M/R.
685 Returns:
686 Ids of all started mapper jobs as a list of strings.
687 """
688 backup_info = BackupInformation.get(backup_info_key)
689 if not backup_info:
690 return []
691 jobs = utils.RunMapForKinds(
692 job_operation_key,
693 kinds,
694 job_name,
695 backup_handler,
696 input_reader,
697 output_writer,
698 mapper_params,
699 mapreduce_params,
700 queue_name=queue)
701 backup_info.active_jobs = jobs
702 backup_info.put(force_writes=True)
703 return jobs
706 def get_backup_files(backup_info, selected_kinds=None):
707 """Returns the backup filenames for selected kinds or all if None/Empty."""
708 if backup_info.blob_files:
710 return backup_info.blob_files
711 else:
712 kinds_backup_files = backup_info.get_kind_backup_files(selected_kinds)
713 return list(itertools.chain(*(
714 kind_backup_files.files for kind_backup_files in kinds_backup_files)))
717 def delete_backup_files(filesystem, backup_files):
718 if backup_files:
722 if filesystem == files.BLOBSTORE_FILESYSTEM:
725 blob_keys = []
726 for fname in backup_files:
727 blob_key = files.blobstore.get_blob_key(fname)
728 if blob_key:
729 blob_keys.append(blob_key)
730 if len(blob_keys) == MAX_BLOBS_PER_DELETE:
731 blobstore_api.delete(blob_keys)
732 blob_keys = []
733 if blob_keys:
734 blobstore_api.delete(blob_keys)
737 def delete_backup_info(backup_info, delete_files=True):
738 """Deletes a backup including its associated files and other metadata."""
739 if backup_info.blob_files:
740 delete_backup_files(backup_info.filesystem, backup_info.blob_files)
741 backup_info.delete(force_writes=True)
742 else:
743 kinds_backup_files = tuple(backup_info.get_kind_backup_files())
744 if delete_files:
745 delete_backup_files(backup_info.filesystem, itertools.chain(*(
746 kind_backup_files.files for kind_backup_files in kinds_backup_files)))
747 db.delete(kinds_backup_files + (backup_info,), force_writes=True)
750 class DoBackupDeleteHandler(BaseDoHandler):
751 """Handler to deal with datastore admin requests to delete backup data."""
753 SUFFIX = 'backup_delete.do'
755 def get(self):
756 self.post()
758 def post(self):
759 """Handler for post requests to datastore_admin/backup_delete.do.
761 Deletes are executed and the user is redirected to the base-path handler.
762 """
763 backup_ids = self.request.get_all('backup_id')
764 token = self.request.get('xsrf_token')
765 params = ()
766 if backup_ids and utils.ValidateXsrfToken(token, XSRF_ACTION):
767 try:
768 for backup_info in db.get(backup_ids):
769 if backup_info:
770 delete_backup_info(backup_info)
771 except Exception, e:
772 logging.exception('Failed to delete datastore backup.')
773 params = [('error', e.message)]
775 self.SendRedirect(params=params)
778 class DoBackupAbortHandler(BaseDoHandler):
779 """Handler to deal with datastore admin requests to abort pending backups."""
781 SUFFIX = 'backup_abort.do'
783 def get(self):
784 self.post()
786 def post(self):
787 """Handler for post requests to datastore_admin/backup_abort.do.
789 Abort is executed and the user is redirected to the base-path handler.
790 """
791 backup_ids = self.request.get_all('backup_id')
792 token = self.request.get('xsrf_token')
793 params = ()
794 if backup_ids and utils.ValidateXsrfToken(token, XSRF_ACTION):
795 try:
796 for backup_info in db.get(backup_ids):
797 if backup_info:
798 operation = backup_info.parent()
799 if operation.parent_key():
800 job_id = str(operation.parent_key())
801 datastore_admin_service = services_client.DatastoreAdminClient()
802 datastore_admin_service.abort_backup(job_id)
803 else:
804 utils.AbortAdminOperation(operation.key())
805 delete_backup_info(backup_info)
806 except Exception, e:
807 logging.exception('Failed to abort pending datastore backup.')
808 params = [('error', e.message)]
810 self.SendRedirect(params=params)
813 class DoBackupRestoreHandler(BaseDoHandler):
814 """Handler to restore backup data.
816 Deals with requests from the admin console.
817 """
818 SUFFIX = 'backup_restore.do'
819 BACKUP_RESTORE_HANDLER = __name__ + '.RestoreEntity.map'
820 RESTORE_COMPLETE_HANDLER = __name__ + '.RestoreCompleteHandler'
822 INPUT_READER = input_readers.__name__ + '.RecordsReader'
823 _get_html_page = 'do_restore_from_backup.html'
824 _get_post_html_page = SUFFIX
826 def _ProcessPostRequest(self):
827 """Triggers backup restore mapper jobs and returns their ids."""
828 backup_id = self.request.get('backup_id')
829 if not backup_id:
830 return [('error', 'Unspecified Backup.')]
832 backup = db.get(db.Key(backup_id))
833 if not backup:
834 return [('error', 'Invalid Backup id.')]
836 if backup.gs_handle:
837 if not is_readable_gs_handle(backup.gs_handle):
838 return [('error', 'Backup not readable')]
840 kinds = set(self.request.get_all('kind'))
841 if not (backup.blob_files or kinds):
842 return [('error', 'No kinds were selected')]
843 backup_kinds = set(backup.kinds)
844 difference = kinds.difference(backup_kinds)
845 if difference:
846 return [('error', 'Backup does not have kind[s] %s' %
847 ', '.join(difference))]
849 if self.request.get('run_as_a_service', False):
850 if backup.filesystem != files.GS_FILESYSTEM:
851 return [('error',
852 'Restore as a service is only available for GS backups')]
853 datastore_admin_service = services_client.DatastoreAdminClient()
854 description = 'Remote restore job: %s' % backup.name
855 remote_job_id = datastore_admin_service.restore_from_backup(
856 description, backup_id, list(kinds))
857 return [('remote_job', remote_job_id)]
859 queue = self.request.get('queue')
860 job_name = 'datastore_backup_restore_%s' % re.sub(r'[^\w]', '_',
861 backup.name)
862 job_operation = None
863 try:
864 operation_name = 'Restoring %s from backup: %s' % (
865 ', '.join(kinds) if kinds else 'all', backup.name)
866 job_operation = utils.StartOperation(operation_name)
867 mapper_params = self._GetBasicMapperParams()
870 kinds = list(kinds) if len(backup_kinds) != len(kinds) else []
871 mapper_params['files'] = get_backup_files(backup, kinds)
872 mapper_params['kind_filter'] = kinds
873 mapper_params['original_app'] = backup.original_app
874 mapreduce_params = {
875 'backup_name': backup.name,
876 'force_ops_writes': True,
877 }
878 shard_count = min(max(utils.MAPREDUCE_MIN_SHARDS,
879 len(mapper_params['files'])),
880 utils.MAPREDUCE_MAX_SHARDS)
881 job = utils.StartMap(job_operation.key(), job_name,
882 self.BACKUP_RESTORE_HANDLER, self.INPUT_READER, None,
883 mapper_params, mapreduce_params, queue_name=queue,
884 shard_count=shard_count)
885 return [('job', job)]
886 except Exception:
887 logging.exception('Failed to start a restore from backup job "%s".',
888 job_name)
889 if job_operation:
890 job_operation.status = utils.DatastoreAdminOperation.STATUS_FAILED
891 job_operation.put(force_writes=True)
892 raise
895 class DoBackupImportHandler(BaseDoHandler):
896 """Handler to deal with datastore admin requests to import backup info."""
898 SUFFIX = 'import_backup.do'
900 def get(self):
901 self.post()
903 def post(self):
904 """Handler for post requests to datastore_admin/import_backup.do.
906 Import is executed and the user is redirected to the base-path handler.
907 """
908 gs_handle = self.request.get('gs_handle')
909 token = self.request.get('xsrf_token')
910 error = None
911 if gs_handle and utils.ValidateXsrfToken(token, XSRF_ACTION):
912 try:
913 bucket_name, path = parse_gs_handle(gs_handle)
914 file_content = get_gs_object(bucket_name, path)
915 entities = parse_backup_info_file(file_content)
916 original_backup_info = entities.next()
917 entity = datastore.Entity(BackupInformation.kind())
918 entity.update(original_backup_info)
919 backup_info = BackupInformation.from_entity(entity)
920 if original_backup_info.key().app() != os.getenv('APPLICATION_ID'):
921 backup_info.original_app = original_backup_info.key().app()
923 def tx():
924 backup_info.put(force_writes=True)
925 kind_files_models = []
926 for entity in entities:
927 kind_files = backup_info.create_kind_backup_files(
928 entity.key().name(), entity['files'])
929 kind_files_models.append(kind_files)
930 db.put(kind_files_models, force_writes=True)
931 db.run_in_transaction(tx)
932 backup_id = str(backup_info.key())
933 except Exception, e:
934 logging.exception('Failed to Import datastore backup information.')
935 error = e.message
937 if error:
938 self.SendRedirect(params=[('error', error)])
939 elif self.request.get('Restore'):
940 ConfirmRestoreFromBackupHandler.Render(
941 self, default_backup_id=backup_id,
942 default_delete_backup_after_restore=True)
943 else:
944 self.SendRedirect()
948 class BackupInformation(db.Model):
949 """An entity to keep information on a datastore backup."""
951 name = db.StringProperty()
952 kinds = db.StringListProperty()
953 namespaces = db.StringListProperty()
954 filesystem = db.StringProperty(default=files.BLOBSTORE_FILESYSTEM)
955 start_time = db.DateTimeProperty(auto_now_add=True)
956 active_jobs = db.StringListProperty()
957 completed_jobs = db.StringListProperty()
958 complete_time = db.DateTimeProperty(default=None)
959 blob_files = db.StringListProperty()
960 original_app = db.StringProperty(default=None)
961 gs_handle = db.TextProperty(default=None)
962 destination = db.StringProperty()
964 @classmethod
965 def kind(cls):
966 return utils.BACKUP_INFORMATION_KIND
968 @classmethod
969 def name_exists(cls, backup_name):
970 query = BackupInformation.all(keys_only=True)
971 query.filter('name =', backup_name)
972 return query.get() is not None
974 def create_kind_backup_files_key(self, kind):
975 return db.Key.from_path(KindBackupFiles.kind(), kind, parent=self.key())
977 def create_kind_backup_files(self, kind, kind_files):
978 return KindBackupFiles(key=self.create_kind_backup_files_key(kind),
979 files=kind_files)
981 def get_kind_backup_files(self, kinds=None):
982 if kinds:
983 return db.get([self.create_kind_backup_files_key(kind) for kind in kinds])
984 else:
985 return KindBackupFiles.all().ancestor(self).run()
989 class KindBackupFiles(db.Model):
990 """An entity to keep files information per kind for a backup.
992 A key for this model should be created using the kind as a name and the
993 associated BackupInformation as a parent.
994 """
995 files = db.StringListProperty(indexed=False)
997 @property
998 def backup_kind(self):
999 return self.key().name()
1001 @classmethod
1002 def kind(cls):
1003 return utils.BACKUP_INFORMATION_FILES_KIND
1006 def BackupCompleteHandler(operation, job_id, mapreduce_state):
1007 """Updates BackupInformation record for a completed mapper job."""
1008 mapreduce_spec = mapreduce_state.mapreduce_spec
1009 filenames = mapreduce_spec.mapper.output_writer_class().get_filenames(
1010 mapreduce_state)
1011 _perform_backup_complete(operation,
1012 job_id,
1013 mapreduce_spec.mapper.params['entity_kind'],
1014 mapreduce_spec.params['backup_info_pk'],
1015 mapreduce_spec.mapper.params.get('gs_bucket_name'),
1016 filenames,
1017 mapreduce_spec.params.get('done_callback_queue'))
1020 @db.transactional
1021 def _perform_backup_complete(
1022 operation, job_id, kind, backup_info_pk, gs_bucket_name, filenames, queue):
1023 backup_info = BackupInformation.get(backup_info_pk)
1024 if backup_info:
1025 if job_id in backup_info.active_jobs:
1026 backup_info.active_jobs.remove(job_id)
1027 backup_info.completed_jobs = list(
1028 set(backup_info.completed_jobs + [job_id]))
1031 if backup_info.filesystem == files.BLOBSTORE_FILESYSTEM:
1032 filenames = drop_empty_files(filenames)
1033 kind_backup_files = backup_info.get_kind_backup_files([kind])[0]
1034 if kind_backup_files:
1035 kind_backup_files.files = list(set(kind_backup_files.files + filenames))
1036 else:
1037 kind_backup_files = backup_info.create_kind_backup_files(kind, filenames)
1038 db.put((backup_info, kind_backup_files), force_writes=True)
1039 if operation.status == utils.DatastoreAdminOperation.STATUS_COMPLETED:
1040 deferred.defer(finalize_backup_info, backup_info.key(),
1041 gs_bucket_name,
1042 _url=config.DEFERRED_PATH,
1043 _queue=queue,
1044 _transactional=True)
1045 else:
1046 logging.warn('BackupInfo was not found for %s', backup_info_pk)
1049 def finalize_backup_info(backup_info_pk, gs_bucket):
1050 """Finalize the state of BackupInformation and creates info file for GS."""
1052 def get_backup_info():
1053 return BackupInformation.get(backup_info_pk)
1055 backup_info = db.run_in_transaction(get_backup_info)
1056 if backup_info:
1057 complete_time = datetime.datetime.now()
1058 backup_info.complete_time = complete_time
1059 gs_handle = None
1060 if backup_info.filesystem == files.GS_FILESYSTEM:
1066 gs_handle = BackupInfoWriter(gs_bucket).write(backup_info)[0]
1068 def set_backup_info_with_finalize_info():
1069 backup_info = get_backup_info()
1070 backup_info.complete_time = complete_time
1071 backup_info.gs_handle = gs_handle
1072 backup_info.put(force_writes=True)
1073 db.run_in_transaction(set_backup_info_with_finalize_info)
1074 logging.info('Backup %s completed', backup_info.name)
1075 else:
1076 logging.warn('Backup %s could not be found', backup_info_pk)
1079 def parse_backup_info_file(content):
1080 """Returns entities iterator from a backup_info file content."""
1081 reader = records.RecordsReader(cStringIO.StringIO(content))
1082 version = reader.read()
1083 if version != '1':
1084 raise IOError('Unsupported version')
1085 return (datastore.Entity.FromPb(record) for record in reader)
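# Minimal usage sketch (assumes `content` holds the raw bytes of a
# *.backup_info file, e.g. read from Google Cloud Storage):
#   entities = parse_backup_info_file(content)
#   backup_info_entity = entities.next()   # the BackupInformation record
#   for kind_files_entity in entities:     # one record per backed-up kind
#     print kind_files_entity['files']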
1088 @db.non_transactional
1089 def drop_empty_files(filenames):
1090 """Deletes empty files and returns filenames minus the deleted ones."""
1091 non_empty_filenames = []
1092 empty_file_keys = []
1093 blobs_info = blobstore.BlobInfo.get(
1094 [files.blobstore.get_blob_key(fn) for fn in filenames])
1095 for filename, blob_info in itertools.izip(filenames, blobs_info):
1096 if blob_info:
1097 if blob_info.size > 0:
1098 non_empty_filenames.append(filename)
1099 else:
1100 empty_file_keys.append(blob_info.key())
1101 blobstore_api.delete(empty_file_keys)
1102 return non_empty_filenames
1105 class BackupInfoWriter(object):
1106 """A class for writing Datastore backup metadata files."""
1108 def __init__(self, gs_bucket):
1109 """Construct a BackupInfoWriter.
1111 Args:
1112 gs_bucket: Required string for the target GS bucket.
1113 """
1114 self.__gs_bucket = gs_bucket
1116 def write(self, backup_info):
1117 """Write the metadata files for the given backup_info.
1119 As a side effect, sets the in-memory backup_info entity's gs_handle to the
1120 backup info filename. This change is not saved to the datastore.
1122 Args:
1123 backup_info: Required BackupInformation.
1125 Returns:
1126 A list with the backup info filename followed by the kind info filenames.
1127 """
1128 fn = self._write_backup_info(backup_info)
1129 return [fn] + self._write_kind_info(backup_info)
1131 def _generate_filename(self, backup_info, suffix):
1132 key_str = str(backup_info.key()).replace('/', '_')
1133 return '/gs/%s/%s%s' % (self.__gs_bucket, key_str, suffix)
1135 def _write_backup_info(self, backup_info):
1136 """Writes a backup_info_file.
1138 Args:
1139 backup_info: Required BackupInformation.
1141 Returns:
1142 Backup info filename.
1143 """
1144 filename = self._generate_filename(backup_info, '.backup_info')
1145 backup_info.gs_handle = filename
1146 info_file = files.open(files.gs.create(filename), 'a', exclusive_lock=True)
1147 try:
1148 with records.RecordsWriter(info_file) as writer:
1150 writer.write('1')
1152 writer.write(db.model_to_protobuf(backup_info).SerializeToString())
1154 for kind_files in backup_info.get_kind_backup_files():
1155 writer.write(db.model_to_protobuf(kind_files).SerializeToString())
1156 finally:
1157 info_file.close(finalize=True)
1158 return filename
1160 def _write_kind_info(self, backup_info):
1161 """Writes type information schema for each kind in backup_info.
1163 Args:
1164 backup_info: Required BackupInformation.
1166 Returns:
1167 A list with all created filenames.
1168 """
1169 def get_backup_files_tx():
1170 kind_backup_files_list = []
1172 for kind_backup_files in backup_info.get_kind_backup_files():
1173 kind_backup_files_list.append(kind_backup_files)
1174 return kind_backup_files_list
1176 kind_backup_files_list = db.run_in_transaction(get_backup_files_tx)
1177 filenames = []
1178 for kind_backup_files in kind_backup_files_list:
1179 backup = self._create_kind_backup(backup_info, kind_backup_files)
1180 filename = self._generate_filename(
1181 backup_info, '.%s.backup_info' % kind_backup_files.backup_kind)
1182 self._write_kind_backup_info_file(filename, backup)
1183 filenames.append(filename)
1184 return filenames
1186 def _create_kind_backup(self, backup_info, kind_backup_files):
1187 """Creates and populate a backup_pb2.Backup."""
1188 backup = backup_pb2.Backup()
1189 backup.backup_info.backup_name = backup_info.name
1190 backup.backup_info.start_timestamp = datastore_types.DatetimeToTimestamp(
1191 backup_info.start_time)
1192 backup.backup_info.end_timestamp = datastore_types.DatetimeToTimestamp(
1193 backup_info.complete_time)
1194 kind = kind_backup_files.backup_kind
1195 kind_info = backup.kind_info.add()
1196 kind_info.kind = kind
1197 kind_info.entity_schema.kind = kind
1198 kind_info.file.extend(kind_backup_files.files)
1199 entity_type_info = EntityTypeInfo(kind=kind)
1200 for sharded_aggregation in SchemaAggregationResult.load(
1201 backup_info.key(), kind):
1202 if sharded_aggregation.is_partial:
1203 kind_info.is_partial = True
1204 if sharded_aggregation.entity_type_info:
1205 entity_type_info.merge(sharded_aggregation.entity_type_info)
1206 entity_type_info.populate_entity_schema(kind_info.entity_schema)
1207 return backup
1209 @classmethod
1210 def _write_kind_backup_info_file(cls, filename, backup):
1211 """Writes a kind backup_info.
1213 Args:
1214 filename: The name of the file to be created as string.
1215 backup: apphosting.ext.datastore_admin.Backup proto.
1216 """
1217 f = files.open(files.gs.create(filename), 'a', exclusive_lock=True)
1218 try:
1219 f.write(backup.SerializeToString())
1220 finally:
1221 f.close(finalize=True)
1224 class PropertyTypeInfo(json_util.JsonMixin):
1225 """Type information for an entity property."""
1227 def __init__(self, name, is_repeated=False, primitive_types=None,
1228 embedded_entities=None):
1229 """Construct a PropertyTypeInfo instance.
1231 Args:
1232 name: The name of the property as a string.
1233 is_repeated: A boolean that indicates if the property is repeated.
1234 primitive_types: Optional list of PrimitiveType integer values.
1235 embedded_entities: Optional list of EntityTypeInfo.
1236 """
1237 self.__name = name
1238 self.__is_repeated = is_repeated
1239 self.__primitive_types = set(primitive_types) if primitive_types else set()
1240 self.__embedded_entities = {}
1241 for entity in embedded_entities or ():
1242 if entity.kind in self.__embedded_entities:
1243 self.__embedded_entities[entity.kind].merge(entity)
1244 else:
1245 self.__embedded_entities[entity.kind] = entity
1247 @property
1248 def name(self):
1249 return self.__name
1251 @property
1252 def is_repeated(self):
1253 return self.__is_repeated
1255 @property
1256 def primitive_types(self):
1257 return self.__primitive_types
1259 def embedded_entities_kind_iter(self):
1260 return self.__embedded_entities.iterkeys()
1262 def get_embedded_entity(self, kind):
1263 return self.__embedded_entities.get(kind)
1265 def merge(self, other):
1266 """Merge a PropertyTypeInfo with this instance.
1268 Args:
1269 other: Required PropertyTypeInfo to merge.
1271 Returns:
1272 True if anything was changed. False otherwise.
1274 Raises:
1275 ValueError: if property names do not match.
1276 TypeError: if other is not an instance of PropertyTypeInfo.
1277 """
1278 if not isinstance(other, PropertyTypeInfo):
1279 raise TypeError('Expected PropertyTypeInfo, was %r' % (other,))
1281 if other.__name != self.__name:
1282 raise ValueError('Property names mismatch (%s, %s)' %
1283 (self.__name, other.__name))
1284 changed = False
1285 if other.__is_repeated and not self.__is_repeated:
1286 self.__is_repeated = True
1287 changed = True
1288 if not other.__primitive_types.issubset(self.__primitive_types):
1289 self.__primitive_types = self.__primitive_types.union(
1290 other.__primitive_types)
1291 changed = True
1292 for kind, other_embedded_entity in other.__embedded_entities.iteritems():
1293 embedded_entity = self.__embedded_entities.get(kind)
1294 if embedded_entity:
1295 changed = embedded_entity.merge(other_embedded_entity) or changed
1296 else:
1297 self.__embedded_entities[kind] = other_embedded_entity
1298 changed = True
1299 return changed
1301 def populate_entity_schema_field(self, entity_schema):
1302 """Add an populate a Field to the given entity_schema.
1304 Args:
1305 entity_schema: apphosting.ext.datastore_admin.EntitySchema proto.
1306 """
1307 if not (self.__primitive_types or self.__embedded_entities):
1308 return
1310 field = entity_schema.field.add()
1311 field.name = self.__name
1312 field_type = field.type.add()
1313 field_type.is_list = self.__is_repeated
1314 field_type.primitive_type.extend(self.__primitive_types)
1315 for embedded_entity in self.__embedded_entities.itervalues():
1316 embedded_entity_schema = field_type.embedded_schema.add()
1317 embedded_entity.populate_entity_schema(embedded_entity_schema)
1319 def to_json(self):
1320 json = dict()
1321 json['name'] = self.__name
1322 json['is_repeated'] = self.__is_repeated
1323 json['primitive_types'] = list(self.__primitive_types)
1324 json['embedded_entities'] = [e.to_json() for e in
1325 self.__embedded_entities.itervalues()]
1326 return json
1328 @classmethod
1329 def from_json(cls, json):
1330 return cls(json['name'], json['is_repeated'], json.get('primitive_types'),
1331 [EntityTypeInfo.from_json(entity_json) for entity_json
1332 in json.get('embedded_entities')])
1335 class EntityTypeInfo(json_util.JsonMixin):
1336 """Type information for an entity."""
1338 def __init__(self, kind=None, properties=None):
1339 """Construct an EntityTypeInfo instance.
1341 Args:
1342 kind: An optional kind name as string.
1343 properties: An optional list of PropertyTypeInfo.
1344 """
1345 self.__kind = kind
1346 self.__properties = {}
1347 for property_type_info in properties or ():
1348 if property_type_info.name in self.__properties:
1349 self.__properties[property_type_info.name].merge(property_type_info)
1350 else:
1351 self.__properties[property_type_info.name] = property_type_info
1353 @property
1354 def kind(self):
1355 return self.__kind
1357 def properties_name_iter(self):
1358 return self.__properties.iterkeys()
1360 def get_property(self, name):
1361 return self.__properties.get(name)
1363 def merge(self, other):
1364 """Merge an EntityTypeInfo with this instance.
1366 Args:
1367 other: Required EntityTypeInfo to merge.
1369 Returns:
1370 True if anything was changed. False otherwise.
1372 Raises:
1373 ValueError: if kinds do not match.
1374 TypeError: if other is not an instance of EntityTypeInfo.
1375 """
1376 if not isinstance(other, EntityTypeInfo):
1377 raise TypeError('Expected EntityTypeInfo, was %r' % (other,))
1379 if other.__kind != self.__kind:
1380 raise ValueError('Kinds mismatch (%s, %s)' % (self.__kind, other.__kind))
1381 changed = False
1382 for name, other_property in other.__properties.iteritems():
1383 self_property = self.__properties.get(name)
1384 if self_property:
1385 changed = self_property.merge(other_property) or changed
1386 else:
1387 self.__properties[name] = other_property
1388 changed = True
1389 return changed
1391 def populate_entity_schema(self, entity_schema):
1392 """Populates the given entity_schema with values from this instance.
1394 Args:
1395 entity_schema: apphosting.ext.datastore_admin.EntitySchema proto.
1396 """
1397 if self.__kind:
1398 entity_schema.kind = self.__kind
1399 for property_type_info in self.__properties.itervalues():
1400 property_type_info.populate_entity_schema_field(entity_schema)
1402 def to_json(self):
1403 return {
1404 'kind': self.__kind,
1405 'properties': [p.to_json() for p in self.__properties.itervalues()]
1406 }
1408 @classmethod
1409 def from_json(cls, json):
1410 kind = json.get('kind')
1411 properties_json = json.get('properties')
1412 if properties_json:
1413 return cls(kind, [PropertyTypeInfo.from_json(p) for p in properties_json])
1414 else:
1415 return cls(kind)
1417 @classmethod
1418 def create_from_entity_proto(cls, entity_proto):
1419 """Creates and populates an EntityTypeInfo from an EntityProto."""
1420 properties = [cls.__get_property_type_info(property_proto) for
1421 property_proto in itertools.chain(
1422 entity_proto.property_list(),
1423 entity_proto.raw_property_list())]
1424 kind = utils.get_kind_from_entity_pb(entity_proto)
1425 return cls(kind, properties)
1427 @classmethod
1428 def __get_property_type_info(cls, property_proto):
1429 """Returns the type mapping for the provided property."""
1430 name = property_proto.name()
1431 is_repeated = bool(property_proto.multiple())
1432 primitive_type = None
1433 entity_type = None
1434 if property_proto.has_meaning():
1435 primitive_type = MEANING_TO_PRIMITIVE_TYPE.get(property_proto.meaning())
1436 if primitive_type is None:
1437 value = property_proto.value()
1438 if value.has_int64value():
1439 primitive_type = backup_pb2.EntitySchema.INTEGER
1440 elif value.has_booleanvalue():
1441 primitive_type = backup_pb2.EntitySchema.BOOLEAN
1442 elif value.has_stringvalue():
1443 if property_proto.meaning() == entity_pb.Property.ENTITY_PROTO:
1444 entity_proto = entity_pb.EntityProto()
1445 try:
1446 entity_proto.ParsePartialFromString(value.stringvalue())
1447 except Exception:
1449 pass
1450 else:
1451 entity_type = EntityTypeInfo.create_from_entity_proto(entity_proto)
1452 else:
1453 primitive_type = backup_pb2.EntitySchema.STRING
1454 elif value.has_doublevalue():
1455 primitive_type = backup_pb2.EntitySchema.FLOAT
1456 elif value.has_pointvalue():
1457 primitive_type = backup_pb2.EntitySchema.GEO_POINT
1458 elif value.has_uservalue():
1459 primitive_type = backup_pb2.EntitySchema.USER
1460 elif value.has_referencevalue():
1461 primitive_type = backup_pb2.EntitySchema.REFERENCE
1462 return PropertyTypeInfo(
1463 name, is_repeated,
1464 (primitive_type,) if primitive_type is not None else None,
1465 (entity_type,) if entity_type else None)
1468 class SchemaAggregationResult(db.Model):
1469 """Persistent aggregated type information for a kind.
1471 An instance can be retrieved via the load method or created
1472 using the create method. An instance aggregates all type information for
1473 all seen entities (including embedded entities) via the merge method and is
1474 persisted when needed using the model's put method.
1475 """
1477 entity_type_info = json_util.JsonProperty(
1478 EntityTypeInfo, default=EntityTypeInfo(), indexed=False)
1479 is_partial = db.BooleanProperty(default=False)
1481 def merge(self, other):
1482 """Merge a SchemaAggregationResult or an EntityTypeInfo with this instance.
1484 Args:
1485 other: Required SchemaAggregationResult or EntityTypeInfo to merge.
1487 Returns:
1488 True if anything was changed. False otherwise.
1489 """
1490 if self.is_partial:
1491 return False
1492 if isinstance(other, SchemaAggregationResult):
1493 other = other.entity_type_info
1494 return self.entity_type_info.merge(other)
1496 @classmethod
1497 def _get_parent_key(cls, backup_id, kind_name):
1498 return datastore_types.Key.from_path('Kind', kind_name, parent=backup_id)
1500 @classmethod
1501 def create(cls, backup_id, kind_name, shard_id):
1502 """Create SchemaAggregationResult instance.
1504 Args:
1505 backup_id: Required BackupInformation Key.
1506 kind_name: Required kind name as string.
1507 shard_id: Required shard id as string.
1509 Returns:
1510 A new SchemaAggregationResult instance.
1511 """
1512 parent = cls._get_parent_key(backup_id, kind_name)
1513 return SchemaAggregationResult(
1514 key_name=shard_id, parent=parent,
1515 entity_type_info=EntityTypeInfo(kind=kind_name))
1517 @classmethod
1518 def load(cls, backup_id, kind_name, shard_id=None):
1519 """Retrieve SchemaAggregationResult from the Datastore.
1521 Args:
1522 backup_id: Required BackupInformation Key.
1523 kind_name: Required kind name as string.
1524 shard_id: Optional shard id as string.
1526 Returns:
1527 A SchemaAggregationResult iterator, or a single entity if shard_id is not None.
1528 """
1529 parent = cls._get_parent_key(backup_id, kind_name)
1530 if shard_id:
1531 key = datastore_types.Key.from_path(cls.kind(), shard_id, parent=parent)
1532 return SchemaAggregationResult.get(key)
1533 else:
1534 return db.Query(cls).ancestor(parent).run()
1536 @classmethod
1537 def kind(cls):
1538 return utils.BACKUP_INFORMATION_KIND_TYPE_INFO
1542 class SchemaAggregationPool(object):
1543 """An MR pool to aggregation type information per kind."""
1545 def __init__(self, backup_id, kind, shard_id):
1546 """Construct SchemaAggregationPool instance.
1548 Args:
1549 backup_id: Required BackupInformation Key.
1550 kind: Required kind name as string.
1551 shard_id: Required shard id as string.
1552 """
1553 self.__backup_id = backup_id
1554 self.__kind = kind
1555 self.__shard_id = shard_id
1556 self.__aggregation = SchemaAggregationResult.load(backup_id, kind, shard_id)
1557 if not self.__aggregation:
1558 self.__aggregation = SchemaAggregationResult.create(backup_id, kind,
1559 shard_id)
1560 self.__needs_save = True
1561 else:
1562 self.__needs_save = False
1564 def merge(self, entity_type_info):
1565 """Merge EntityTypeInfo into aggregated type information."""
1566 if self.__aggregation.merge(entity_type_info):
1567 self.__needs_save = True
1569 def flush(self):
1570 """Save aggregated type information to the datastore if changed."""
1571 if self.__needs_save:
1573 def update_aggregation_tx():
1574 aggregation = SchemaAggregationResult.load(
1575 self.__backup_id, self.__kind, self.__shard_id)
1576 if aggregation:
1577 if aggregation.merge(self.__aggregation):
1578 aggregation.put(force_writes=True)
1579 self.__aggregation = aggregation
1580 else:
1581 self.__aggregation.put(force_writes=True)
1583 def mark_aggregation_as_partial_tx():
1584 aggregation = SchemaAggregationResult.load(
1585 self.__backup_id, self.__kind, self.__shard_id)
1586 if aggregation is None:
1587 aggregation = SchemaAggregationResult.create(
1588 self.__backup_id, self.__kind, self.__shard_id)
1589 aggregation.is_partial = True
1590 aggregation.put(force_writes=True)
1591 self.__aggregation = aggregation
1593 try:
1594 db.run_in_transaction(update_aggregation_tx)
1595 except apiproxy_errors.RequestTooLargeError:
1596 db.run_in_transaction(mark_aggregation_as_partial_tx)
1597 self.__needs_save = False
1600 class AggregateSchema(op.Operation):
1601 """An MR Operation to aggregation type information for a kind.
1603 This operation will register an MR pool, SchemaAggregationPool, if
1604 one is not already registered and will invoke the pool's merge operation
1605 per entity. The pool is responsible for keeping a persistent state of
1606 type aggregation using the sharded db model, SchemaAggregationResult.
1607 """
1609 def __init__(self, entity_proto):
1610 self.__entity_info = EntityTypeInfo.create_from_entity_proto(entity_proto)
1612 def __call__(self, ctx):
1613 pool = ctx.get_pool('schema_aggregation_pool')
1614 if not pool:
1615 backup_id = datastore_types.Key(
1616 context.get().mapreduce_spec.params['backup_info_pk'])
1617 pool = SchemaAggregationPool(
1618 backup_id, self.__entity_info.kind, ctx.shard_id)
1619 ctx.register_pool('schema_aggregation_pool', pool)
1620 pool.merge(self.__entity_info)
1623 class BackupEntity(object):
1624 """A class which dumps the entity to the writer."""
1626 def map(self, entity_proto):
1627 """Backup entity map handler.
1629 Args:
1630 entity_proto: An instance of entity_pb.EntityProto.
1632 Yields:
1633 A serialized entity_pb.EntityProto as a string.
1634 """
1635 yield entity_proto.SerializeToString()
1636 yield AggregateSchema(entity_proto)
1639 class RestoreEntity(object):
1640 """A class which restore the entity to datastore."""
1642 def __init__(self):
1643 self.initialized = False
1644 self.kind_filter = None
1646 self.app_id = None
1648 def initialize(self):
1649 """Initialize a restore mapper instance."""
1650 if self.initialized:
1651 return
1652 mapper_params = get_mapper_params_from_context()
1653 kind_filter = mapper_params.get('kind_filter')
1654 self.kind_filter = set(kind_filter) if kind_filter else None
1655 original_app = mapper_params.get('original_app')
1656 target_app = os.getenv('APPLICATION_ID')
1657 if original_app and target_app != original_app:
1658 self.app_id = target_app
1659 self.initialized = True
1661 def map(self, record):
1662 """Restore entity map handler.
1664 Args:
1665 record: A serialized entity_pb.EntityProto.
1667 Yields:
1668 An operation.db.Put for the mapped entity.
1669 """
1670 self.initialize()
1671 pb = entity_pb.EntityProto(contents=record)
1672 if self.app_id:
1673 utils.FixKeys(pb, self.app_id)
1677 if not self.kind_filter or (
1678 utils.get_kind_from_entity_pb(pb) in self.kind_filter):
1679 yield utils.Put(pb)
1680 if self.app_id:
1682 yield utils.ReserveKey(datastore_types.Key._FromPb(pb.key()))
1685 def get_mapper_params_from_context():
1686 """Get mapper params from MR context. Split out for ease of testing."""
1687 return context.get().mapreduce_spec.mapper.params
1690 def validate_gs_bucket_name(bucket_name):
1691 """Validate the format of the given bucket_name.
1693 Validation rules are based on:
1694 https://developers.google.com/storage/docs/bucketnaming#requirements
1696 Args:
1697 bucket_name: The bucket name to validate.
1699 Raises:
1700 BackupValidationError: If the bucket name is invalid.
1701 """
1702 if len(bucket_name) > MAX_BUCKET_LEN:
1703 raise BackupValidationError(
1704 'Bucket name length should not be longer than %d' % MAX_BUCKET_LEN)
1705 if len(bucket_name) < MIN_BUCKET_LEN:
1706 raise BackupValidationError(
1707 'Bucket name length should not be shorter than %d' % MIN_BUCKET_LEN)
1708 if bucket_name.lower().startswith('goog'):
1709 raise BackupValidationError(
1710 'Bucket name should not start with a "goog" prefix')
1711 bucket_elements = bucket_name.split('.')
1712 for bucket_element in bucket_elements:
1713 if len(bucket_element) > MAX_BUCKET_SEGMENT_LEN:
1714 raise BackupValidationError(
1715 'Segment length of bucket name should not be longer than %d' %
1716 MAX_BUCKET_SEGMENT_LEN)
1717 if not re.match(BUCKET_PATTERN, bucket_name):
1718 raise BackupValidationError('Invalid bucket name "%s"' % bucket_name)
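# A minimal usage sketch of the checks above (bucket names here are
# hypothetical); a bad name surfaces as BackupValidationError:
#
#   validate_gs_bucket_name('my-app-backups')    # passes silently
#   try:
#     validate_gs_bucket_name('goog-reserved')   # rejected: "goog" prefix
#   except BackupValidationError as e:
#     logging.info('invalid bucket name: %s', e)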
1721 def is_accessible_bucket_name(bucket_name):
1722 """Returns True if the application has access to the specified bucket."""
1723 scope = config.GoogleApiScope('devstorage.read_write')
1724 bucket_url = config.GsBucketURL(bucket_name)
1725 auth_token, _ = app_identity.get_access_token(scope)
1726 result = urlfetch.fetch(bucket_url, method=urlfetch.HEAD, headers={
1727 'Authorization': 'OAuth %s' % auth_token,
1728 'x-goog-api-version': '2'})
1729 return result and result.status_code == 200
1732 def verify_bucket_writable(bucket_name):
1733 """Verify the application can write to the specified bucket.
1735 Args:
1736 bucket_name: The bucket to verify.
1738 Raises:
1739 BackupValidationError: If the bucket is not writable.
1740 """
1741 path = '/gs/%s' % bucket_name
1742 try:
1743 file_names = files.gs.listdir(path,
1744 {'prefix': TEST_WRITE_FILENAME_PREFIX,
1745 'max_keys': MAX_KEYS_LIST_SIZE})
1746 except (files.InvalidParameterError, files.PermissionDeniedError):
1747 raise BackupValidationError('Bucket "%s" not accessible' % bucket_name)
1748 except files.InvalidFileNameError:
1749 raise BackupValidationError('Bucket "%s" does not exist' % bucket_name)
1750 file_name = '%s/%s.tmp' % (path, TEST_WRITE_FILENAME_PREFIX)
1751 file_name_try = 0
1752 while True:
1753 if file_name_try >= MAX_TEST_FILENAME_TRIES:
1756 return
1757 if file_name not in file_names:
1758 break
1759 gen = random.randint(0, 9999)
1760 file_name = '%s/%s_%s.tmp' % (path, TEST_WRITE_FILENAME_PREFIX, gen)
1761 file_name_try += 1
1762 try:
1763 test_file = files.open(files.gs.create(file_name), 'a', exclusive_lock=True)
1764 try:
1765 test_file.write('test')
1766 finally:
1767 test_file.close(finalize=True)
1768 except files.PermissionDeniedError:
1769 raise BackupValidationError('Bucket "%s" is not writable' % bucket_name)
1770 try:
1771 files.delete(file_name)
1772 except (files.InvalidArgumentError, files.InvalidFileNameError, IOError):
1773 logging.warn('Failed to delete test file %s', file_name)
1776 def is_readable_gs_handle(gs_handle):
1777 """Return True if the application can read the specified gs_handle."""
1778 try:
1779 with files.open(gs_handle) as bak_file:
1780 bak_file.read(1)
1781 except files.PermissionDeniedError:
1782 return False
1783 return True
1787 def parse_gs_handle(gs_handle):
1788 """Splits [/gs/]?bucket_name[/folder]*[/file]? to (bucket_name, path | '')."""
1789 if gs_handle.startswith('/'):
1790 filesystem = gs_handle[1:].split('/', 1)[0]
1791 if filesystem == 'gs':
1792 gs_handle = gs_handle[4:]
1793 else:
1794 raise BackupValidationError('Unsupported filesystem: %s' % filesystem)
1795 tokens = gs_handle.split('/', 1)
1796 return (tokens[0], '') if len(tokens) == 1 else tuple(tokens)
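# Expected behaviour of the parser above, for a few hypothetical handles:
#
#   parse_gs_handle('/gs/my-bucket/backups/2014')  # -> ('my-bucket', 'backups/2014')
#   parse_gs_handle('my-bucket')                   # -> ('my-bucket', '')
#   parse_gs_handle('/blobstore/foo')              # raises BackupValidationError
#                                                  # ('Unsupported filesystem')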
1799 def validate_and_canonicalize_gs_bucket(gs_bucket_name):
1800 bucket_name, path = parse_gs_handle(gs_bucket_name)
1801 gs_bucket_name = ('%s/%s' % (bucket_name, path)).rstrip('/')
1802 validate_gs_bucket_name(bucket_name)
1803 verify_bucket_writable(bucket_name)
1804 return gs_bucket_name
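# Canonicalization strips any '/gs/' prefix and a trailing slash before
# validating the bucket and probing it with a test write, so this sketch
# (with a hypothetical bucket) only succeeds inside an App Engine runtime
# that can write to the bucket:
#
#   validate_and_canonicalize_gs_bucket('/gs/my-app-backups/daily/')
#   # -> 'my-app-backups/daily'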
1807 def list_bucket_files(bucket_name, prefix, max_keys=1000):
1808 """Returns a listing of of a bucket that matches the given prefix."""
1809 scope = config.GoogleApiScope('devstorage.read_only')
1810 bucket_url = config.GsBucketURL(bucket_name)
1811 url = bucket_url + '?'
1812 query = [('max-keys', max_keys)]
1813 if prefix:
1814 query.append(('prefix', prefix))
1815 url += urllib.urlencode(query)
1816 auth_token, _ = app_identity.get_access_token(scope)
1817 result = urlfetch.fetch(url, method=urlfetch.GET, headers={
1818 'Authorization': 'OAuth %s' % auth_token,
1819 'x-goog-api-version': '2'})
1820 if result and result.status_code == 200:
1821 doc = xml.dom.minidom.parseString(result.content)
1822 return [node.childNodes[0].data for node in doc.getElementsByTagName('Key')]
1823 raise BackupValidationError('Request to Google Cloud Storage failed')
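# The GCS XML API returns a listing like the sample below; the minidom code
# above reduces it to the object names. A self-contained sketch of just that
# parsing step (sample document only, not a live response):
#
#   import xml.dom.minidom
#   sample = ('<ListBucketResult>'
#             '<Contents><Key>backup-001.backup_info</Key></Contents>'
#             '<Contents><Key>backup-001.Greeting-0</Key></Contents>'
#             '</ListBucketResult>')
#   doc = xml.dom.minidom.parseString(sample)
#   keys = [n.childNodes[0].data for n in doc.getElementsByTagName('Key')]
#   # keys == [u'backup-001.backup_info', u'backup-001.Greeting-0']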
1826 def get_gs_object(bucket_name, path):
1827 """Returns a listing of of a bucket that matches the given prefix."""
1828 scope = config.GoogleApiScope('devstorage.read_only')
1829 bucket_url = config.GsBucketURL(bucket_name)
1830 url = bucket_url + path
1831 auth_token, _ = app_identity.get_access_token(scope)
1832 result = urlfetch.fetch(url, method=urlfetch.GET, headers={
1833 'Authorization': 'OAuth %s' % auth_token,
1834 'x-goog-api-version': '2'})
1835 if result and result.status_code == 200:
1836 return result.content
1837 if result and result.status_code == 403:
1838 raise BackupValidationError(
1839 'Requested path %s is not accessible/access denied' % url)
1840 if result and result.status_code == 404:
1841 raise BackupValidationError('Requested path %s was not found' % url)
1842 raise BackupValidationError('Error encountered accessing requested path %s' %
1843 url)
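# A short usage sketch (bucket and object names are hypothetical); the path
# is appended to the bucket URL as-is, so it is normally passed with a
# leading slash:
#
#   info_bytes = get_gs_object('my-app-backups', '/daily.backup_info')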
1848 def get_queue_names(app_id=None, max_rows=100):
1849 """Returns a list with all non-special queue names for app_id."""
1850 rpc = apiproxy_stub_map.UserRPC('taskqueue')
1851 request = taskqueue_service_pb.TaskQueueFetchQueuesRequest()
1852 response = taskqueue_service_pb.TaskQueueFetchQueuesResponse()
1853 if app_id:
1854 request.set_app_id(app_id)
1855 request.set_max_rows(max_rows)
1856 queues = ['default']
1857 try:
1858 rpc.make_call('FetchQueues', request, response)
1859 rpc.check_success()
1861 for queue in response.queue_list():
1862 if (queue.mode() == taskqueue_service_pb.TaskQueueMode.PUSH and
1863 not queue.queue_name().startswith('__') and
1864 queue.queue_name() != 'default'):
1865 queues.append(queue.queue_name())
1866 except Exception:
1867 logging.exception('Failed to get queue names.')
1868 return queues
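# Expected shape of the result inside a running app (queue names here are
# hypothetical): 'default' always comes first, pull queues and reserved
# '__...' queues are skipped, and on an RPC failure the function degrades
# to just ['default']:
#
#   get_queue_names()  # e.g. ['default', 'backup-queue', 'mapreduce-workers']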
1871 def handlers_list(base_path):
1872 return [
1873 (r'%s/%s' % (base_path, BackupLinkHandler.SUFFIX),
1874 BackupLinkHandler),
1875 (r'%s/%s' % (base_path, ConfirmBackupHandler.SUFFIX),
1876 ConfirmBackupHandler),
1877 (r'%s/%s' % (base_path, DoBackupHandler.SUFFIX), DoBackupHandler),
1878 (r'%s/%s' % (base_path, DoBackupRestoreHandler.SUFFIX),
1879 DoBackupRestoreHandler),
1880 (r'%s/%s' % (base_path, DoBackupDeleteHandler.SUFFIX),
1881 DoBackupDeleteHandler),
1882 (r'%s/%s' % (base_path, DoBackupAbortHandler.SUFFIX),
1883 DoBackupAbortHandler),
1884 (r'%s/%s' % (base_path, DoBackupImportHandler.SUFFIX),
1885 DoBackupImportHandler),