3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 """Handler for data backup operation.
24 Generic datastore admin console transfers control to ConfirmBackupHandler
after selection of entities. The ConfirmBackupHandler confirms the user's
choice, accepts a backup name, and transfers control to
DoBackupHandler. DoBackupHandler starts backup mappers and displays confirmation
This module also contains the actual mapper code for backing up the data.
33 from __future__
import with_statement
46 import xml
.dom
.minidom
49 from google
.appengine
.datastore
import entity_pb
50 from google
.appengine
.api
import apiproxy_stub_map
51 from google
.appengine
.api
import app_identity
52 from google
.appengine
.api
import blobstore
as blobstore_api
53 from google
.appengine
.api
import capabilities
54 from google
.appengine
.api
import datastore
55 from google
.appengine
.api
import datastore_types
56 from google
.appengine
.api
import files
57 from google
.appengine
.api
import taskqueue
58 from google
.appengine
.api
import urlfetch
59 from google
.appengine
.api
.files
import records
60 from google
.appengine
.api
.taskqueue
import taskqueue_service_pb
61 from google
.appengine
.datastore
import datastore_query
62 from google
.appengine
.datastore
import datastore_rpc
63 from google
.appengine
.ext
import blobstore
64 from google
.appengine
.ext
import db
65 from google
.appengine
.ext
import deferred
66 from google
.appengine
.ext
import webapp
67 from google
.appengine
.ext
.datastore_admin
import backup_pb2
68 from google
.appengine
.ext
.datastore_admin
import config
69 from google
.appengine
.ext
.datastore_admin
import utils
70 from google
.appengine
.ext
.mapreduce
import context
71 from google
.appengine
.ext
.mapreduce
import datastore_range_iterators
as db_iters
72 from google
.appengine
.ext
.mapreduce
import input_readers
73 from google
.appengine
.ext
.mapreduce
import json_util
74 from google
.appengine
.ext
.mapreduce
import operation
as op
75 from google
.appengine
.ext
.mapreduce
import output_writers
76 from google
.appengine
.runtime
import apiproxy_errors
80 from google
.appengine
.ext
.datastore_admin
import services_client
87 XSRF_ACTION
= 'backup'
88 BUCKET_PATTERN
= (r
'^([a-zA-Z0-9]+([\-_]+[a-zA-Z0-9]+)*)'
89 r
'(\.([a-zA-Z0-9]+([\-_]+[a-zA-Z0-9]+)*))*$')
92 MAX_BUCKET_SEGMENT_LEN
= 63
93 NUM_KINDS_DEFERRED_THRESHOLD
= 10
94 MAX_BLOBS_PER_DELETE
= 500
95 TEST_WRITE_FILENAME_PREFIX
= 'datastore_backup_write_test'
96 MAX_KEYS_LIST_SIZE
= 100
97 MAX_TEST_FILENAME_TRIES
= 10
99 MEANING_TO_PRIMITIVE_TYPE
= {
100 entity_pb
.Property
.GD_WHEN
: backup_pb2
.EntitySchema
.DATE_TIME
,
101 entity_pb
.Property
.GD_RATING
: backup_pb2
.EntitySchema
.RATING
,
102 entity_pb
.Property
.ATOM_LINK
: backup_pb2
.EntitySchema
.LINK
,
103 entity_pb
.Property
.ATOM_CATEGORY
: backup_pb2
.EntitySchema
.CATEGORY
,
104 entity_pb
.Property
.GD_PHONENUMBER
: backup_pb2
.EntitySchema
.PHONE_NUMBER
,
105 entity_pb
.Property
.GD_POSTALADDRESS
: backup_pb2
.EntitySchema
.POSTAL_ADDRESS
,
106 entity_pb
.Property
.GD_EMAIL
: backup_pb2
.EntitySchema
.EMAIL
,
107 entity_pb
.Property
.GD_IM
: backup_pb2
.EntitySchema
.IM_HANDLE
,
108 entity_pb
.Property
.BLOBKEY
: backup_pb2
.EntitySchema
.BLOB_KEY
,
109 entity_pb
.Property
.TEXT
: backup_pb2
.EntitySchema
.TEXT
,
110 entity_pb
.Property
.BLOB
: backup_pb2
.EntitySchema
.BLOB
,
111 entity_pb
.Property
.BYTESTRING
: backup_pb2
.EntitySchema
.SHORT_BLOB
115 class ConfirmBackupHandler(webapp
.RequestHandler
):
116 """Handler to deal with requests from the admin console to backup data."""
118 SUFFIX
= 'confirm_backup'
121 def Render(cls
, handler
):
122 """Rendering method that can be called by main.py.
125 handler: the webapp.RequestHandler invoking the method
127 kinds
= handler
.request
.get_all('kind')
128 sizes_known
, size_total
, remainder
= utils
.ParseKindsAndSizes(kinds
)
129 notreadonly_warning
= capabilities
.CapabilitySet(
130 'datastore_v3', capabilities
=['write']).is_enabled()
131 blob_warning
= bool(blobstore
.BlobInfo
.all().count(1))
133 'run_as_a_service': handler
.request
.get('run_as_a_service'),
134 'form_target': DoBackupHandler
.SUFFIX
,
136 'remainder': remainder
,
137 'sizes_known': sizes_known
,
138 'size_total': size_total
,
140 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
141 'namespaces': get_namespaces(handler
.request
.get('namespace', None)),
142 'xsrf_token': utils
.CreateXsrfToken(XSRF_ACTION
),
143 'notreadonly_warning': notreadonly_warning
,
144 'blob_warning': blob_warning
,
145 'backup_name': 'datastore_backup_%s' % time
.strftime('%Y_%m_%d')
147 utils
.RenderToResponse(handler
, 'confirm_backup.html', template_params
)
150 def get_namespaces(selected_namespace
):
151 namespaces
= [('--All--', '*', selected_namespace
is None)]
152 for ns
in datastore
.Query('__namespace__', keys_only
=True).Run():
153 ns_name
= ns
.name() or ''
154 namespaces
.append((ns_name
or '--Default--',
156 ns_name
== selected_namespace
))
160 class ConfirmDeleteBackupHandler(webapp
.RequestHandler
):
161 """Handler to confirm admin console requests to delete a backup copy."""
163 SUFFIX
= 'confirm_delete_backup'
166 def Render(cls
, handler
):
167 """Rendering method that can be called by main.py.
170 handler: the webapp.RequestHandler invoking the method
172 requested_backup_ids
= handler
.request
.get_all('backup_id')
175 if requested_backup_ids
:
176 for backup
in db
.get(requested_backup_ids
):
178 backups
.append(backup
)
179 gs_warning |
= backup
.filesystem
== files
.GS_FILESYSTEM
181 'form_target': DoBackupDeleteHandler
.SUFFIX
,
182 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
184 'xsrf_token': utils
.CreateXsrfToken(XSRF_ACTION
),
185 'gs_warning': gs_warning
,
186 'run_as_a_service': handler
.request
.get('run_as_a_service'),
188 utils
.RenderToResponse(handler
, 'confirm_delete_backup.html',
192 class ConfirmAbortBackupHandler(webapp
.RequestHandler
):
193 """Handler to confirm admin console requests to abort a backup copy."""
195 SUFFIX
= 'confirm_abort_backup'
198 def Render(cls
, handler
):
199 """Rendering method that can be called by main.py.
202 handler: the webapp.RequestHandler invoking the method
204 requested_backup_ids
= handler
.request
.get_all('backup_id')
206 if requested_backup_ids
:
207 for backup
in db
.get(requested_backup_ids
):
209 backups
.append(backup
)
211 'form_target': DoBackupAbortHandler
.SUFFIX
,
212 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
214 'xsrf_token': utils
.CreateXsrfToken(XSRF_ACTION
),
215 'run_as_a_service': handler
.request
.get('run_as_a_service'),
217 utils
.RenderToResponse(handler
, 'confirm_abort_backup.html',
221 class ConfirmRestoreFromBackupHandler(webapp
.RequestHandler
):
222 """Handler to confirm admin console requests to restore from backup."""
224 SUFFIX
= 'confirm_restore_from_backup'
227 def Render(cls
, handler
, default_backup_id
=None,
228 default_delete_backup_after_restore
=False):
229 """Rendering method that can be called by main.py.
232 handler: the webapp.RequestHandler invoking the method
233 default_backup_id: default value for handler.request
234 default_delete_backup_after_restore: default value for handler.request
236 backup_id
= handler
.request
.get('backup_id', default_backup_id
)
237 backup
= db
.get(backup_id
) if backup_id
else None
238 notreadonly_warning
= capabilities
.CapabilitySet(
239 'datastore_v3', capabilities
=['write']).is_enabled()
240 original_app_warning
= backup
.original_app
241 if os
.getenv('APPLICATION_ID') == original_app_warning
:
242 original_app_warning
= None
244 'form_target': DoBackupRestoreHandler
.SUFFIX
,
246 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
248 'delete_backup_after_restore': handler
.request
.get(
249 'delete_backup_after_restore', default_delete_backup_after_restore
),
250 'xsrf_token': utils
.CreateXsrfToken(XSRF_ACTION
),
251 'notreadonly_warning': notreadonly_warning
,
252 'original_app_warning': original_app_warning
,
253 'run_as_a_service': handler
.request
.get('run_as_a_service'),
255 utils
.RenderToResponse(handler
, 'confirm_restore_from_backup.html',
259 class ConfirmBackupImportHandler(webapp
.RequestHandler
):
260 """Handler to import backup information."""
262 SUFFIX
= 'backup_information'
265 def Render(cls
, handler
):
266 """Rendering method that can be called by main.py.
269 handler: the webapp.RequestHandler invoking the method
271 gs_handle
= handler
.request
.get('gs_handle')
272 error
= None if gs_handle
else 'Google Cloud Storage path is missing'
273 other_backup_info_files
= []
274 selected_backup_info_file
= None
275 backup_info_specified
= False
278 gs_handle
= gs_handle
.rstrip()
279 bucket_name
, prefix
= parse_gs_handle(gs_handle
)
280 validate_gs_bucket_name(bucket_name
)
281 if not is_accessible_bucket_name(bucket_name
):
282 raise BackupValidationError(
283 'Bucket "%s" is not accessible' % bucket_name
)
284 if prefix
.endswith('.backup_info'):
285 prefix
= prefix
[0:prefix
.rfind('/')]
286 backup_info_specified
= True
287 elif prefix
and not prefix
.endswith('/'):
289 for backup_info_file
in list_bucket_files(bucket_name
, prefix
):
290 backup_info_path
= '/gs/%s/%s' % (bucket_name
, backup_info_file
)
291 if backup_info_specified
and backup_info_path
== gs_handle
:
292 selected_backup_info_file
= backup_info_path
293 elif (backup_info_file
.endswith('.backup_info')
294 and backup_info_file
.count('.') == 1):
295 other_backup_info_files
.append(backup_info_path
)
296 except Exception, ex
:
297 error
= 'Failed to read bucket: %s' % ex
.message
298 logging
.exception(ex
.message
)
301 'form_target': DoBackupImportHandler
.SUFFIX
,
302 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
303 'selected_backup_info_file': selected_backup_info_file
,
304 'other_backup_info_files': other_backup_info_files
,
305 'backup_info_specified': backup_info_specified
,
306 'xsrf_token': utils
.CreateXsrfToken(XSRF_ACTION
),
307 'run_as_a_service': handler
.request
.get('run_as_a_service'),
309 utils
.RenderToResponse(handler
, 'confirm_backup_import.html',
313 class BackupInformationHandler(webapp
.RequestHandler
):
314 """Handler to display backup information."""
316 SUFFIX
= 'backup_information'
319 def Render(cls
, handler
):
320 """Rendering method that can be called by main.py.
323 handler: the webapp.RequestHandler invoking the method
325 backup_ids
= handler
.request
.get_all('backup_id')
327 'backups': db
.get(backup_ids
),
328 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
329 'run_as_a_service': handler
.request
.get('run_as_a_service'),
331 utils
.RenderToResponse(handler
, 'backup_information.html', template_params
)
334 class BaseDoHandler(webapp
.RequestHandler
):
335 """Base class for all Do*Handlers."""
337 MAPREDUCE_DETAIL
= config
.MAPREDUCE_PATH
+ '/detail?mapreduce_id='
340 """Handler for get requests to datastore_admin backup operations.
342 Status of executed jobs is displayed.
344 jobs
= self
.request
.get_all('job')
345 remote_job
= self
.request
.get('remote_job')
346 tasks
= self
.request
.get_all('task')
347 error
= self
.request
.get('error', '')
348 xsrf_error
= self
.request
.get('xsrf_error', '')
351 'remote_job': remote_job
,
353 'mapreduce_detail': self
.MAPREDUCE_DETAIL
,
355 'xsrf_error': xsrf_error
,
356 'datastore_admin_home': utils
.GenerateHomeUrl(self
.request
),
358 utils
.RenderToResponse(self
, self
._get
_html
_page
, template_params
)
def _get_html_page(self):
  """Return the name of the HTML page for HTTP/GET requests."""
  # Abstract hook: concrete Do*Handler subclasses supply the template used
  # to render GET status (e.g. DoBackupHandler uses 'do_backup.html').
  raise NotImplementedError
def _get_post_html_page(self):
  """Return the name of the HTML page for HTTP/POST requests."""
  # Abstract hook: subclasses set this to their SUFFIX so the POST redirect
  # lands back on the handler's own page (see DoBackupHandler).
  raise NotImplementedError
def _ProcessPostRequest(self):
  """Process the HTTP/POST request and return the result as parameters.

  Returns:
    A list of (name, value) tuples that post() encodes into the redirect
    URL query string (e.g. [('job', ...)], [('error', ...)]).
  """
  raise NotImplementedError
374 def _GetBasicMapperParams(self
):
375 namespace
= self
.request
.get('namespace', None)
378 return {'namespace': namespace
}
380 def SendRedirect(self
, path
=None, params
=()):
381 """Send a redirect response."""
383 run_as_a_service
= self
.request
.get('run_as_a_service')
385 params
= list(params
)
386 params
.append(('run_as_a_service', True))
387 dest
= config
.BASE_PATH
389 dest
= '%s/%s' % (dest
, path
)
391 dest
= '%s?%s' % (dest
, urllib
.urlencode(params
))
395 """Handler for post requests to datastore_admin/backup.do.
397 Redirects to the get handler after processing the request.
399 token
= self
.request
.get('xsrf_token')
401 if not utils
.ValidateXsrfToken(token
, XSRF_ACTION
):
402 parameters
= [('xsrf_error', '1')]
405 parameters
= self
._ProcessPostRequest
()
409 error
= self
._HandleException
(e
)
410 parameters
= [('error', error
)]
412 self
.SendRedirect(self
._get
_post
_html
_page
, parameters
)
def _HandleException(self, e):
  """Make exception handling overridable by tests.

  Args:
    e: The exception to handle.

  Returns:
    The exception error string.
  """
  # Log the full traceback, then return a short '<type>: <message>' string
  # that post() forwards to the redirect as an 'error' parameter.
  logging.exception(e.message)
  return '%s: %s' % (type(e), e.message)
class BackupValidationError(utils.Error):
  """Raised upon backup request validation.

  Examples from this module: a missing or inaccessible GS bucket name, an
  unknown filesystem value, or a backup name that already exists.
  """
431 def _perform_backup(run_as_a_service
, kinds
, selected_namespace
,
432 filesystem
, gs_bucket_name
, backup
,
433 queue
, mapper_params
, max_jobs
):
434 """Triggers backup mapper jobs.
437 run_as_a_service: True if backup should be done via admin-jobs
438 kinds: a sequence of kind names
439 selected_namespace: The selected namespace or None for all
440 filesystem: files.BLOBSTORE_FILESYSTEM or files.GS_FILESYSTEM
441 or None to default to blobstore
442 gs_bucket_name: the GS file system bucket in which to store the backup
443 when using the GS file system, and otherwise ignored
444 backup: the backup name
445 queue: the task queue for the backup task
446 mapper_params: the mapper parameters
447 max_jobs: if backup needs more jobs than this, defer them
453 BackupValidationError: On validation error.
454 Exception: On other error.
456 BACKUP_COMPLETE_HANDLER
= __name__
+ '.BackupCompleteHandler'
457 BACKUP_HANDLER
= __name__
+ '.BackupEntity.map'
458 INPUT_READER
= __name__
+ '.DatastoreEntityProtoInputReader'
459 OUTPUT_WRITER
= output_writers
.__name
__ + '.FileRecordsOutputWriter'
462 if not gs_bucket_name
:
463 raise BackupValidationError('Bucket name missing.')
464 gs_bucket_name
= validate_and_canonicalize_gs_bucket(gs_bucket_name
)
465 datastore_admin_service
= services_client
.DatastoreAdminClient()
466 description
= 'Remote backup job: %s' % backup
467 remote_job_id
= datastore_admin_service
.create_backup(
468 description
, backup
, gs_bucket_name
, selected_namespace
, kinds
)
469 return [('remote_job', remote_job_id
)]
471 queue
= queue
or os
.environ
.get('HTTP_X_APPENGINE_QUEUENAME', 'default')
476 filesystem
= files
.BLOBSTORE_FILESYSTEM
477 if filesystem
== files
.GS_FILESYSTEM
:
479 if not gs_bucket_name
:
480 raise BackupValidationError('Bucket name missing.')
481 gs_bucket_name
= validate_and_canonicalize_gs_bucket(gs_bucket_name
)
482 elif filesystem
== files
.BLOBSTORE_FILESYSTEM
:
485 raise BackupValidationError('Unknown filesystem "%s".' % filesystem
)
487 job_name
= 'datastore_backup_%s_%%(kind)s' % re
.sub(r
'[^\w]', '_', backup
)
489 job_operation
= utils
.StartOperation('Backup: %s' % backup
)
490 backup_info
= BackupInformation(parent
=job_operation
)
491 backup_info
.filesystem
= filesystem
492 backup_info
.name
= backup
493 backup_info
.kinds
= kinds
494 if selected_namespace
is not None:
495 backup_info
.namespaces
= [selected_namespace
]
496 backup_info
.put(force_writes
=True)
498 'done_callback_handler': BACKUP_COMPLETE_HANDLER
,
499 'backup_info_pk': str(backup_info
.key()),
500 'force_ops_writes': True,
502 mapper_params
= dict(mapper_params
)
503 mapper_params
['filesystem'] = filesystem
504 if filesystem
== files
.GS_FILESYSTEM
:
505 mapper_params
['gs_bucket_name'] = gs_bucket_name
506 if len(kinds
) <= max_jobs
:
507 return [('job', job
) for job
in _run_map_jobs(
508 job_operation
.key(), backup_info
.key(), kinds
, job_name
,
509 BACKUP_HANDLER
, INPUT_READER
, OUTPUT_WRITER
,
510 mapper_params
, mapreduce_params
, queue
)]
512 retry_options
= taskqueue
.TaskRetryOptions(task_retry_limit
=1)
513 deferred_task
= deferred
.defer(_run_map_jobs_deferred
,
514 backup
, job_operation
.key(),
515 backup_info
.key(), kinds
, job_name
,
516 BACKUP_HANDLER
, INPUT_READER
,
517 OUTPUT_WRITER
, mapper_params
,
518 mapreduce_params
, queue
, _queue
=queue
,
519 _url
=config
.DEFERRED_PATH
,
520 _retry_options
=retry_options
)
521 return [('task', deferred_task
.name
)]
523 logging
.exception('Failed to start a datastore backup job[s] for "%s".',
526 delete_backup_info(backup_info
)
528 job_operation
.status
= utils
.DatastoreAdminOperation
.STATUS_FAILED
529 job_operation
.put(force_writes
=True)
533 class BackupLinkHandler(webapp
.RequestHandler
):
534 """Handler to deal with requests to the backup link to backup data."""
536 SUFFIX
= 'backup.create'
539 """Handler for get requests to datastore_admin/backup.create."""
543 """Handler for post requests to datastore_admin/backup.create."""
549 if ('X-AppEngine-TaskName' not in self
.request
.headers
and
550 'X-AppEngine-Cron' not in self
.request
.headers
):
551 logging
.critical('Scheduled backups must be started via task queue or '
553 self
.response
.set_status(403)
556 backup_prefix
= self
.request
.get('name')
557 if not backup_prefix
:
558 if self
.request
.headers
.get('X-AppEngine-Cron'):
559 backup_prefix
= 'cron-'
561 backup_prefix
= 'link-'
562 backup_prefix_with_date
= backup_prefix
+ time
.strftime('%Y_%m_%d')
563 backup_name
= backup_prefix_with_date
564 backup_suffix_counter
= 1
565 while BackupInformation
.name_exists(backup_name
):
566 backup_suffix_counter
+= 1
567 backup_name
= backup_prefix_with_date
+ '-' + str(backup_suffix_counter
)
568 kinds
= self
.request
.get_all('kind')
570 self
.errorResponse('Backup must include at least one kind.')
573 if not utils
.IsKindNameVisible(kind
):
574 self
.errorResponse('Invalid kind %s.' % kind
)
576 namespace
= self
.request
.get('namespace', None)
579 mapper_params
= {'namespace': namespace
}
580 _perform_backup(self
.request
.get('run_as_a_service', False),
583 self
.request
.get('filesystem'),
584 self
.request
.get('gs_bucket_name'),
586 self
.request
.get('queue'),
590 self
.errorResponse(e
.message
)
def errorResponse(self, message):
  """Log the failure and answer the request with HTTP 400 and *message*."""
  logging.error('Could not create backup via link: %s', message)
  self.response.set_status(400, message)
class DatastoreEntityProtoInputReader(input_readers.RawDatastoreInputReader):
  """An input reader which yields datastore entity proto for a kind."""

  # Swap in an iterator that yields raw entity protos (rather than model
  # instances); this class is referenced by name as the backup job's
  # INPUT_READER in _perform_backup.
  _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityProtoIterator
603 class DoBackupHandler(BaseDoHandler
):
604 """Handler to deal with requests from the admin console to backup data."""
607 _get_html_page
= 'do_backup.html'
608 _get_post_html_page
= SUFFIX
610 def _ProcessPostRequest(self
):
611 """Triggers backup mapper jobs and returns their ids."""
613 backup
= self
.request
.get('backup_name').strip()
615 raise BackupValidationError('Unspecified backup name.')
616 if BackupInformation
.name_exists(backup
):
617 raise BackupValidationError('Backup "%s" already exists.' % backup
)
618 mapper_params
= self
._GetBasicMapperParams
()
619 backup_result
= _perform_backup(self
.request
.get('run_as_a_service',
621 self
.request
.get_all('kind'),
622 mapper_params
.get('namespace'),
623 self
.request
.get('filesystem'),
624 self
.request
.get('gs_bucket_name'),
626 self
.request
.get('queue'),
631 logging
.exception(e
.message
)
632 return [('error', e
.message
)]
635 def _run_map_jobs_deferred(backup_name
, job_operation_key
, backup_info_key
,
636 kinds
, job_name
, backup_handler
, input_reader
,
637 output_writer
, mapper_params
, mapreduce_params
,
639 backup_info
= BackupInformation
.get(backup_info_key
)
642 _run_map_jobs(job_operation_key
, backup_info_key
, kinds
, job_name
,
643 backup_handler
, input_reader
, output_writer
, mapper_params
,
644 mapreduce_params
, queue
)
645 except BaseException
:
646 logging
.exception('Failed to start a datastore backup job[s] for "%s".',
648 delete_backup_info(backup_info
)
650 logging
.info('Missing backup info, can not start backup jobs for "%s"',
654 def _run_map_jobs(job_operation_key
, backup_info_key
, kinds
, job_name
,
655 backup_handler
, input_reader
, output_writer
, mapper_params
,
656 mapreduce_params
, queue
):
657 """Creates backup/restore MR jobs for the given operation.
660 job_operation_key: a key of utils.DatastoreAdminOperation entity.
661 backup_info_key: a key of BackupInformation entity.
662 kinds: a list of kinds to run the M/R for.
663 job_name: the M/R job name prefix.
664 backup_handler: M/R job completion handler.
665 input_reader: M/R input reader.
666 output_writer: M/R output writer.
667 mapper_params: custom parameters to pass to mapper.
668 mapreduce_params: dictionary parameters relevant to the whole job.
669 queue: the name of the queue that will be used by the M/R.
672 Ids of all started mapper jobs as list of strings.
674 backup_info
= BackupInformation
.get(backup_info_key
)
677 jobs
= utils
.RunMapForKinds(
687 backup_info
.active_jobs
= jobs
688 backup_info
.put(force_writes
=True)
692 def get_backup_files(backup_info
, selected_kinds
=None):
693 """Returns the backup filenames for selected kinds or all if None/Empty."""
694 if backup_info
.blob_files
:
696 return backup_info
.blob_files
698 kinds_backup_files
= backup_info
.get_kind_backup_files(selected_kinds
)
699 return list(itertools
.chain(*(
700 kind_backup_files
.files
for kind_backup_files
in kinds_backup_files
)))
703 def delete_backup_files(filesystem
, backup_files
):
708 if filesystem
== files
.BLOBSTORE_FILESYSTEM
:
712 for fname
in backup_files
:
713 blob_key
= files
.blobstore
.get_blob_key(fname
)
715 blob_keys
.append(blob_key
)
716 if len(blob_keys
) == MAX_BLOBS_PER_DELETE
:
717 blobstore_api
.delete(blob_keys
)
720 blobstore_api
.delete(blob_keys
)
723 def delete_backup_info(backup_info
, delete_files
=True):
724 """Deletes a backup including its associated files and other metadata."""
725 if backup_info
.blob_files
:
726 delete_backup_files(backup_info
.filesystem
, backup_info
.blob_files
)
727 backup_info
.delete(force_writes
=True)
729 kinds_backup_files
= tuple(backup_info
.get_kind_backup_files())
731 delete_backup_files(backup_info
.filesystem
, itertools
.chain(*(
732 kind_backup_files
.files
for kind_backup_files
in kinds_backup_files
)))
733 db
.delete(kinds_backup_files
+ (backup_info
,), force_writes
=True)
736 class DoBackupDeleteHandler(BaseDoHandler
):
737 """Handler to deal with datastore admin requests to delete backup data."""
739 SUFFIX
= 'backup_delete.do'
745 """Handler for post requests to datastore_admin/backup_delete.do.
747 Deletes are executed and user is redirected to the base-path handler.
749 backup_ids
= self
.request
.get_all('backup_id')
750 token
= self
.request
.get('xsrf_token')
752 if backup_ids
and utils
.ValidateXsrfToken(token
, XSRF_ACTION
):
754 for backup_info
in db
.get(backup_ids
):
756 delete_backup_info(backup_info
)
758 logging
.exception('Failed to delete datastore backup.')
759 params
= [('error', e
.message
)]
761 self
.SendRedirect(params
=params
)
764 class DoBackupAbortHandler(BaseDoHandler
):
765 """Handler to deal with datastore admin requests to abort pending backups."""
767 SUFFIX
= 'backup_abort.do'
773 """Handler for post requests to datastore_admin/backup_abort.do.
775 Abort is executed and user is redirected to the base-path handler.
777 backup_ids
= self
.request
.get_all('backup_id')
778 token
= self
.request
.get('xsrf_token')
780 if backup_ids
and utils
.ValidateXsrfToken(token
, XSRF_ACTION
):
782 for backup_info
in db
.get(backup_ids
):
784 operation
= backup_info
.parent()
785 if operation
.parent_key():
786 job_id
= str(operation
.parent_key())
787 datastore_admin_service
= services_client
.DatastoreAdminClient()
788 datastore_admin_service
.abort_backup(job_id
)
790 utils
.AbortAdminOperation(operation
.key())
791 delete_backup_info(backup_info
)
793 logging
.exception('Failed to abort pending datastore backup.')
794 params
= [('error', e
.message
)]
796 self
.SendRedirect(params
=params
)
799 class DoBackupRestoreHandler(BaseDoHandler
):
800 """Handler to restore backup data.
802 Deals with requests from the admin console.
804 SUFFIX
= 'backup_restore.do'
805 BACKUP_RESTORE_HANDLER
= __name__
+ '.RestoreEntity.map'
806 RESTORE_COMPLETE_HANDLER
= __name__
+ '.RestoreCompleteHandler'
808 INPUT_READER
= input_readers
.__name
__ + '.RecordsReader'
809 _get_html_page
= 'do_restore_from_backup.html'
810 _get_post_html_page
= SUFFIX
812 def _ProcessPostRequest(self
):
813 """Triggers backup restore mapper jobs and returns their ids."""
814 backup_id
= self
.request
.get('backup_id')
816 return [('error', 'Unspecified Backup.')]
818 backup
= db
.get(db
.Key(backup_id
))
820 return [('error', 'Invalid Backup id.')]
823 if not is_readable_gs_handle(backup
.gs_handle
):
824 return [('error', 'Backup not readable')]
826 kinds
= set(self
.request
.get_all('kind'))
827 if not (backup
.blob_files
or kinds
):
828 return [('error', 'No kinds were selected')]
829 backup_kinds
= set(backup
.kinds
)
830 difference
= kinds
.difference(backup_kinds
)
832 return [('error', 'Backup does not have kind[s] %s' %
833 ', '.join(difference
))]
835 if self
.request
.get('run_as_a_service', False):
836 if backup
.filesystem
!= files
.GS_FILESYSTEM
:
838 'Restore as a service is only available for GS backups')]
839 datastore_admin_service
= services_client
.DatastoreAdminClient()
840 description
= 'Remote restore job: %s' % backup
.name
841 remote_job_id
= datastore_admin_service
.restore_from_backup(
842 description
, backup_id
, list(kinds
))
843 return [('remote_job', remote_job_id
)]
845 queue
= self
.request
.get('queue')
846 job_name
= 'datastore_backup_restore_%s' % re
.sub(r
'[^\w]', '_',
850 operation_name
= 'Restoring %s from backup: %s' % (
851 ', '.join(kinds
) if kinds
else 'all', backup
.name
)
852 job_operation
= utils
.StartOperation(operation_name
)
853 mapper_params
= self
._GetBasicMapperParams
()
856 kinds
= list(kinds
) if len(backup_kinds
) != len(kinds
) else []
857 mapper_params
['files'] = get_backup_files(backup
, kinds
)
858 mapper_params
['kind_filter'] = kinds
859 mapper_params
['original_app'] = backup
.original_app
861 'backup_name': backup
.name
,
862 'force_ops_writes': True,
864 shard_count
= min(max(utils
.MAPREDUCE_MIN_SHARDS
,
865 len(mapper_params
['files'])),
866 utils
.MAPREDUCE_MAX_SHARDS
)
867 job
= utils
.StartMap(job_operation
.key(), job_name
,
868 self
.BACKUP_RESTORE_HANDLER
, self
.INPUT_READER
, None,
869 mapper_params
, mapreduce_params
, queue_name
=queue
,
870 shard_count
=shard_count
)
871 return [('job', job
)]
873 logging
.exception('Failed to start a restore from backup job "%s".',
876 job_operation
.status
= utils
.DatastoreAdminOperation
.STATUS_FAILED
877 job_operation
.put(force_writes
=True)
881 class DoBackupImportHandler(BaseDoHandler
):
882 """Handler to deal with datastore admin requests to import backup info."""
884 SUFFIX
= 'import_backup.do'
890 """Handler for post requests to datastore_admin/import_backup.do.
892 Import is executed and user is redirected to the base-path handler.
894 gs_handle
= self
.request
.get('gs_handle')
895 token
= self
.request
.get('xsrf_token')
897 if gs_handle
and utils
.ValidateXsrfToken(token
, XSRF_ACTION
):
899 bucket_name
, path
= parse_gs_handle(gs_handle
)
900 file_content
= get_gs_object(bucket_name
, path
)
901 entities
= parse_backup_info_file(file_content
)
902 original_backup_info
= entities
.next()
903 entity
= datastore
.Entity(BackupInformation
.kind())
904 entity
.update(original_backup_info
)
905 backup_info
= BackupInformation
.from_entity(entity
)
906 if original_backup_info
.key().app() != os
.getenv('APPLICATION_ID'):
907 backup_info
.original_app
= original_backup_info
.key().app()
910 backup_info
.put(force_writes
=True)
911 kind_files_models
= []
912 for entity
in entities
:
913 kind_files
= backup_info
.create_kind_backup_files(
914 entity
.key().name(), entity
['files'])
915 kind_files_models
.append(kind_files
)
916 db
.put(kind_files_models
, force_writes
=True)
917 db
.run_in_transaction(tx
)
918 backup_id
= str(backup_info
.key())
920 logging
.exception('Failed to Import datastore backup information.')
924 self
.SendRedirect(params
=[('error', error
)])
925 elif self
.request
.get('Restore'):
926 ConfirmRestoreFromBackupHandler
.Render(
927 self
, default_backup_id
=backup_id
,
928 default_delete_backup_after_restore
=True)
934 class BackupInformation(db
.Model
):
935 """An entity to keep information on a datastore backup."""
937 name
= db
.StringProperty()
938 kinds
= db
.StringListProperty()
939 namespaces
= db
.StringListProperty()
940 filesystem
= db
.StringProperty(default
=files
.BLOBSTORE_FILESYSTEM
)
941 start_time
= db
.DateTimeProperty(auto_now_add
=True)
942 active_jobs
= db
.StringListProperty()
943 completed_jobs
= db
.StringListProperty()
944 complete_time
= db
.DateTimeProperty(default
=None)
945 blob_files
= db
.StringListProperty()
946 original_app
= db
.StringProperty(default
=None)
947 gs_handle
= db
.TextProperty(default
=None)
951 return utils
.BACKUP_INFORMATION_KIND
def name_exists(cls, backup_name):
  """Return True if a BackupInformation entity with this name exists."""
  existing_key = (BackupInformation.all(keys_only=True)
                  .filter('name =', backup_name)
                  .get())
  return existing_key is not None
, kind
):
960 return db
.Key
.from_path(KindBackupFiles
.kind(), kind
, parent
=self
.key())
962 def create_kind_backup_files(self
, kind
, kind_files
):
963 return KindBackupFiles(key
=self
.create_kind_backup_files_key(kind
),
966 def get_kind_backup_files(self
, kinds
=None):
968 return db
.get([self
.create_kind_backup_files_key(kind
) for kind
in kinds
])
970 return KindBackupFiles
.all().ancestor(self
).run()
974 class KindBackupFiles(db
.Model
):
975 """An entity to keep files information per kind for a backup.
977 A key for this model should created using kind as a name and the associated
978 BackupInformation as a parent.
980 files
= db
.StringListProperty(indexed
=False)
def backup_kind(self):
  """Name of the datastore kind whose backup files this entity records."""
  # The key name is the kind (see BackupInformation.create_kind_backup_files_key).
  return self.key().name()
988 return utils
.BACKUP_INFORMATION_FILES_KIND
991 def BackupCompleteHandler(operation
, job_id
, mapreduce_state
):
992 """Updates BackupInformation record for a completed mapper job."""
993 mapreduce_spec
= mapreduce_state
.mapreduce_spec
994 filenames
= mapreduce_spec
.mapper
.output_writer_class().get_filenames(
996 _perform_backup_complete(operation
,
998 mapreduce_spec
.mapper
.params
['entity_kind'],
999 mapreduce_spec
.params
['backup_info_pk'],
1000 mapreduce_spec
.mapper
.params
.get('gs_bucket_name'),
1002 mapreduce_spec
.params
.get('done_callback_queue'))
def _perform_backup_complete(
    operation, job_id, kind, backup_info_pk, gs_bucket_name, filenames, queue):
  """Record one kind's completed backup job on its BackupInformation.

  Moves job_id from active to completed jobs, merges the produced filenames
  into the kind's KindBackupFiles, and — when the whole operation has
  completed — defers finalize_backup_info transactionally.

  Args:
    operation: The owning DatastoreAdminOperation (status is inspected).
    job_id: Completed mapper job id as string.
    kind: Backed-up kind name as string.
    backup_info_pk: BackupInformation key (string/Key accepted by .get).
    gs_bucket_name: Optional GS bucket name passed on to finalization.
    filenames: Output filenames produced by the job.
    queue: Optional task queue name for the deferred finalization.
  """
  backup_info = BackupInformation.get(backup_info_pk)
  if backup_info:
    if job_id in backup_info.active_jobs:
      backup_info.active_jobs.remove(job_id)
      # set() dedupes in case of task retries delivering the same job twice.
      backup_info.completed_jobs = list(
          set(backup_info.completed_jobs + [job_id]))
    if backup_info.filesystem == files.BLOBSTORE_FILESYSTEM:
      # Blobstore leaves zero-byte shard files behind; prune them.
      filenames = drop_empty_files(filenames)
    kind_backup_files = backup_info.get_kind_backup_files([kind])[0]
    if kind_backup_files:
      kind_backup_files.files = list(set(kind_backup_files.files + filenames))
    else:
      kind_backup_files = backup_info.create_kind_backup_files(kind, filenames)
    db.put((backup_info, kind_backup_files), force_writes=True)
    if operation.status == utils.DatastoreAdminOperation.STATUS_COMPLETED:
      # Transactional defer: finalization only runs if this put commits.
      deferred.defer(finalize_backup_info, backup_info.key(),
                     gs_bucket_name,
                     _url=config.DEFERRED_PATH,
                     _queue=queue,
                     _transactional=True)
  else:
    logging.warn('BackupInfo was not found for %s', backup_info_pk)
def finalize_backup_info(backup_info_pk, gs_bucket):
  """Finalize the state of BackupInformation and creates info file for GS.

  Args:
    backup_info_pk: BackupInformation key.
    gs_bucket: GS bucket name used for writing metadata files (only used
      when the backup's filesystem is GS).
  """

  def get_backup_info():
    return BackupInformation.get(backup_info_pk)

  backup_info = db.run_in_transaction(get_backup_info)
  if backup_info:
    complete_time = datetime.datetime.now()
    backup_info.complete_time = complete_time
    gs_handle = None
    if backup_info.filesystem == files.GS_FILESYSTEM:
      # Metadata files are written outside the transaction (file I/O is not
      # transactional); only the resulting handle is committed below.
      gs_handle = BackupInfoWriter(gs_bucket).write(backup_info)[0]

    def set_backup_info_with_finalize_info():
      # Re-read inside the transaction to avoid clobbering concurrent updates.
      backup_info = get_backup_info()
      backup_info.complete_time = complete_time
      backup_info.gs_handle = gs_handle
      backup_info.put(force_writes=True)
    db.run_in_transaction(set_backup_info_with_finalize_info)
    logging.info('Backup %s completed', backup_info.name)
  else:
    logging.warn('Backup %s could not be found', backup_info_pk)
def parse_backup_info_file(content):
  """Returns entities iterator from a backup_info file content.

  Args:
    content: Raw bytes of a backup_info file (leveldb records format).

  Returns:
    A generator over datastore.Entity objects decoded from the records.

  Raises:
    IOError: If the leading version record is not the supported version.
  """
  reader = records.RecordsReader(cStringIO.StringIO(content))
  version = reader.read()
  # NOTE(review): version guard reconstructed from garbled source; the writer
  # emits '1' as the first record — confirm against BackupInfoWriter.
  if version != '1':
    raise IOError('Unsupported version')
  return (datastore.Entity.FromPb(record) for record in reader)
@db.non_transactional
def drop_empty_files(filenames):
  """Deletes empty files and returns filenames minus the deleted ones.

  Args:
    filenames: Blobstore-backed filenames produced by a backup job.

  Returns:
    The subset of filenames whose blobs have size > 0; empty blobs are
    deleted from blobstore as a side effect.
  """
  non_empty_filenames = []
  empty_file_keys = []
  blobs_info = blobstore.BlobInfo.get(
      [files.blobstore.get_blob_key(fn) for fn in filenames])
  for filename, blob_info in itertools.izip(filenames, blobs_info):
    # blob_info can be None when the blob record is missing; skip those.
    if blob_info:
      if blob_info.size > 0:
        non_empty_filenames.append(filename)
      else:
        empty_file_keys.append(blob_info.key())
  blobstore_api.delete(empty_file_keys)
  return non_empty_filenames
class BackupInfoWriter(object):
  """A class for writing Datastore backup metadata files."""

  def __init__(self, gs_bucket):
    """Construct a BackupInfoWriter.

    Args:
      gs_bucket: Required string for the target GS bucket.
    """
    self.__gs_bucket = gs_bucket

  def write(self, backup_info):
    """Write the metadata files for the given backup_info.

    As a side effect, updates the backup_info in-memory entity object with the
    gs_handle to the Backup info filename. This is not saved to the datastore.

    Args:
      backup_info: Required BackupInformation.

    Returns:
      A list with Backup info filename followed by Kind info filenames.
    """
    fn = self._write_backup_info(backup_info)
    return [fn] + self._write_kind_info(backup_info)

  def _generate_filename(self, backup_info, suffix):
    # '/' is illegal inside a GS object-name segment; flatten the key path.
    key_str = str(backup_info.key()).replace('/', '_')
    return '/gs/%s/%s%s' % (self.__gs_bucket, key_str, suffix)

  def _write_backup_info(self, backup_info):
    """Writes a backup_info_file.

    Args:
      backup_info: Required BackupInformation.

    Returns:
      Backup info filename.
    """
    filename = self._generate_filename(backup_info, '.backup_info')
    backup_info.gs_handle = filename
    info_file = files.open(files.gs.create(filename), 'a', exclusive_lock=True)
    try:
      with records.RecordsWriter(info_file) as writer:
        # Record 1: file format version (must match parse_backup_info_file).
        writer.write('1')
        # Record 2: the BackupInformation entity itself.
        writer.write(db.model_to_protobuf(backup_info).SerializeToString())
        # Records 3..n: one KindBackupFiles entity per backed-up kind.
        for kind_files in backup_info.get_kind_backup_files():
          writer.write(db.model_to_protobuf(kind_files).SerializeToString())
    finally:
      info_file.close(finalize=True)
    return filename

  def _write_kind_info(self, backup_info):
    """Writes type information schema for each kind in backup_info.

    Args:
      backup_info: Required BackupInformation.

    Returns:
      A list with all created filenames.
    """
    def get_backup_files_tx():
      # Materialize inside a transaction for a consistent snapshot.
      kind_backup_files_list = []
      for kind_backup_files in backup_info.get_kind_backup_files():
        kind_backup_files_list.append(kind_backup_files)
      return kind_backup_files_list

    kind_backup_files_list = db.run_in_transaction(get_backup_files_tx)
    filenames = []
    for kind_backup_files in kind_backup_files_list:
      backup = self._create_kind_backup(backup_info, kind_backup_files)
      filename = self._generate_filename(
          backup_info, '.%s.backup_info' % kind_backup_files.backup_kind)
      self._write_kind_backup_info_file(filename, backup)
      filenames.append(filename)
    return filenames

  def _create_kind_backup(self, backup_info, kind_backup_files):
    """Creates and populate a backup_pb2.Backup."""
    backup = backup_pb2.Backup()
    backup.backup_info.backup_name = backup_info.name
    backup.backup_info.start_timestamp = datastore_types.DatetimeToTimestamp(
        backup_info.start_time)
    backup.backup_info.end_timestamp = datastore_types.DatetimeToTimestamp(
        backup_info.complete_time)
    kind = kind_backup_files.backup_kind
    kind_info = backup.kind_info.add()
    kind_info.kind = kind
    kind_info.entity_schema.kind = kind
    kind_info.file.extend(kind_backup_files.files)
    # Fold together the per-shard schema aggregations for this kind.
    entity_type_info = EntityTypeInfo(kind=kind)
    for sharded_aggregation in SchemaAggregationResult.load(
        backup_info.key(), kind):
      if sharded_aggregation.is_partial:
        kind_info.is_partial = True
      if sharded_aggregation.entity_type_info:
        entity_type_info.merge(sharded_aggregation.entity_type_info)
    entity_type_info.populate_entity_schema(kind_info.entity_schema)
    return backup

  @classmethod
  def _write_kind_backup_info_file(cls, filename, backup):
    """Writes a kind backup_info.

    Args:
      filename: The name of the file to be created as string.
      backup: apphosting.ext.datastore_admin.Backup proto.
    """
    f = files.open(files.gs.create(filename), 'a', exclusive_lock=True)
    try:
      f.write(backup.SerializeToString())
    finally:
      f.close(finalize=True)
class PropertyTypeInfo(json_util.JsonMixin):
  """Type information for an entity property."""

  def __init__(self, name, is_repeated=False, primitive_types=None,
               embedded_entities=None):
    """Construct a PropertyTypeInfo instance.

    Args:
      name: The name of the property as a string.
      is_repeated: A boolean that indicates if the property is repeated.
      primitive_types: Optional list of PrimitiveType integer values.
      embedded_entities: Optional list of EntityTypeInfo.
    """
    self.__name = name
    self.__is_repeated = is_repeated
    self.__primitive_types = set(primitive_types) if primitive_types else set()
    self.__embedded_entities = {}
    for entity in embedded_entities or ():
      # Same embedded kind seen more than once: merge instead of overwrite.
      if entity.kind in self.__embedded_entities:
        self.__embedded_entities[entity.kind].merge(entity)
      else:
        self.__embedded_entities[entity.kind] = entity

  @property
  def name(self):
    return self.__name

  @property
  def is_repeated(self):
    return self.__is_repeated

  @property
  def primitive_types(self):
    return self.__primitive_types

  def embedded_entities_kind_iter(self):
    return self.__embedded_entities.iterkeys()

  def get_embedded_entity(self, kind):
    return self.__embedded_entities.get(kind)

  def merge(self, other):
    """Merge a PropertyTypeInfo with this instance.

    Args:
      other: Required PropertyTypeInfo to merge.

    Returns:
      True if anything was changed. False otherwise.

    Raises:
      ValueError: if property names do not match.
      TypeError: if other is not instance of PropertyTypeInfo.
    """
    if not isinstance(other, PropertyTypeInfo):
      raise TypeError('Expected PropertyTypeInfo, was %r' % (other,))
    if other.__name != self.__name:
      raise ValueError('Property names mismatch (%s, %s)' %
                       (self.__name, other.__name))
    changed = False
    if other.__is_repeated and not self.__is_repeated:
      self.__is_repeated = True
      changed = True
    if not other.__primitive_types.issubset(self.__primitive_types):
      self.__primitive_types = self.__primitive_types.union(
          other.__primitive_types)
      changed = True
    for kind, other_embedded_entity in other.__embedded_entities.iteritems():
      embedded_entity = self.__embedded_entities.get(kind)
      if embedded_entity:
        changed = embedded_entity.merge(other_embedded_entity) or changed
      else:
        self.__embedded_entities[kind] = other_embedded_entity
        changed = True
    return changed

  def populate_entity_schema_field(self, entity_schema):
    """Add and populate a Field to the given entity_schema.

    Args:
      entity_schema: apphosting.ext.datastore_admin.EntitySchema proto.
    """
    # Nothing to report for a property with no known type information.
    if not (self.__primitive_types or self.__embedded_entities):
      return
    field = entity_schema.field.add()
    field.name = self.__name
    field_type = field.type.add()
    field_type.is_list = self.__is_repeated
    field_type.primitive_type.extend(self.__primitive_types)
    for embedded_entity in self.__embedded_entities.itervalues():
      embedded_entity_schema = field_type.embedded_schema.add()
      embedded_entity.populate_entity_schema(embedded_entity_schema)

  def to_json(self):
    """Serialize this instance to a JSON-compatible dict."""
    json = dict()
    json['name'] = self.__name
    json['is_repeated'] = self.__is_repeated
    json['primitive_types'] = list(self.__primitive_types)
    json['embedded_entities'] = [e.to_json() for e in
                                 self.__embedded_entities.itervalues()]
    return json

  @classmethod
  def from_json(cls, json):
    """Reconstruct a PropertyTypeInfo from a to_json() dict."""
    return cls(json['name'], json['is_repeated'], json.get('primitive_types'),
               [EntityTypeInfo.from_json(entity_json) for entity_json
                in json.get('embedded_entities')])
class EntityTypeInfo(json_util.JsonMixin):
  """Type information for an entity."""

  def __init__(self, kind=None, properties=None):
    """Construct an EntityTypeInfo instance.

    Args:
      kind: An optional kind name as string.
      properties: An optional list of PropertyTypeInfo.
    """
    self.__kind = kind
    self.__properties = {}
    for property_type_info in properties or ():
      # Duplicate property names (e.g. mixed-type values): merge type info.
      if property_type_info.name in self.__properties:
        self.__properties[property_type_info.name].merge(property_type_info)
      else:
        self.__properties[property_type_info.name] = property_type_info

  @property
  def kind(self):
    return self.__kind

  def properties_name_iter(self):
    return self.__properties.iterkeys()

  def get_property(self, name):
    return self.__properties.get(name)

  def merge(self, other):
    """Merge an EntityTypeInfo with this instance.

    Args:
      other: Required EntityTypeInfo to merge.

    Returns:
      True if anything was changed. False otherwise.

    Raises:
      ValueError: if kinds do not match.
      TypeError: if other is not instance of EntityTypeInfo.
    """
    if not isinstance(other, EntityTypeInfo):
      raise TypeError('Expected EntityTypeInfo, was %r' % (other,))
    if other.__kind != self.__kind:
      raise ValueError('Kinds mismatch (%s, %s)' % (self.__kind, other.__kind))
    changed = False
    for name, other_property in other.__properties.iteritems():
      self_property = self.__properties.get(name)
      if self_property:
        changed = self_property.merge(other_property) or changed
      else:
        self.__properties[name] = other_property
        changed = True
    return changed

  def populate_entity_schema(self, entity_schema):
    """Populates the given entity_schema with values from this instance.

    Args:
      entity_schema: apphosting.ext.datastore_admin.EntitySchema proto.
    """
    if self.__kind:
      entity_schema.kind = self.__kind
    for property_type_info in self.__properties.itervalues():
      property_type_info.populate_entity_schema_field(entity_schema)

  def to_json(self):
    """Serialize this instance to a JSON-compatible dict."""
    return {
        'kind': self.__kind,
        'properties': [p.to_json() for p in self.__properties.itervalues()]
    }

  @classmethod
  def from_json(cls, json):
    """Reconstruct an EntityTypeInfo from a to_json() dict."""
    kind = json.get('kind')
    properties_json = json.get('properties')
    if properties_json:
      return cls(kind, [PropertyTypeInfo.from_json(p) for p in properties_json])
    return cls(kind)

  @classmethod
  def create_from_entity_proto(cls, entity_proto):
    """Creates and populates an EntityTypeInfo from an EntityProto."""
    properties = [cls.__get_property_type_info(property_proto) for
                  property_proto in itertools.chain(
                      entity_proto.property_list(),
                      entity_proto.raw_property_list())]
    kind = utils.get_kind_from_entity_pb(entity_proto)
    return cls(kind, properties)

  @classmethod
  def __get_property_type_info(cls, property_proto):
    """Returns the type mapping for the provided property."""
    name = property_proto.name()
    is_repeated = bool(property_proto.multiple())
    primitive_type = None
    entity_type = None
    if property_proto.has_meaning():
      primitive_type = MEANING_TO_PRIMITIVE_TYPE.get(property_proto.meaning())
    if primitive_type is None:
      value = property_proto.value()
      if value.has_int64value():
        primitive_type = backup_pb2.EntitySchema.INTEGER
      elif value.has_booleanvalue():
        primitive_type = backup_pb2.EntitySchema.BOOLEAN
      elif value.has_stringvalue():
        if property_proto.meaning() == entity_pb.Property.ENTITY_PROTO:
          # Embedded entity: recurse into its schema; ignore parse failures
          # (best-effort schema extraction must not abort the backup).
          entity_proto = entity_pb.EntityProto()
          try:
            entity_proto.ParsePartialFromString(value.stringvalue())
          except Exception:
            pass
          else:
            entity_type = EntityTypeInfo.create_from_entity_proto(entity_proto)
        else:
          primitive_type = backup_pb2.EntitySchema.STRING
      elif value.has_doublevalue():
        primitive_type = backup_pb2.EntitySchema.FLOAT
      elif value.has_pointvalue():
        primitive_type = backup_pb2.EntitySchema.GEO_POINT
      elif value.has_uservalue():
        primitive_type = backup_pb2.EntitySchema.USER
      elif value.has_referencevalue():
        primitive_type = backup_pb2.EntitySchema.REFERENCE
    return PropertyTypeInfo(
        name, is_repeated,
        (primitive_type,) if primitive_type is not None else None,
        (entity_type,) if entity_type else None)
class SchemaAggregationResult(db.Model):
  """Persistent aggregated type information for a kind.

  An instance can be retrieved via the load method or created
  using the create method. An instance aggregates all type information
  for all seen embedded_entities via the merge method and persisted when needed
  using the model put method.
  """

  entity_type_info = json_util.JsonProperty(
      EntityTypeInfo, default=EntityTypeInfo(), indexed=False)
  # True when the aggregation grew too large to persist completely.
  is_partial = db.BooleanProperty(default=False)

  def merge(self, other):
    """Merge a SchemaAggregationResult or an EntityTypeInfo with this instance.

    Args:
      other: Required SchemaAggregationResult or EntityTypeInfo to merge.

    Returns:
      True if anything was changed. False otherwise.
    """
    if isinstance(other, SchemaAggregationResult):
      other = other.entity_type_info
    return self.entity_type_info.merge(other)

  @classmethod
  def _get_parent_key(cls, backup_id, kind_name):
    # Synthetic 'Kind' parent groups all shards of one kind under one backup.
    return datastore_types.Key.from_path('Kind', kind_name, parent=backup_id)

  @classmethod
  def create(cls, backup_id, kind_name, shard_id):
    """Create SchemaAggregationResult instance.

    Args:
      backup_id: Required BackupInformation Key.
      kind_name: Required kind name as string.
      shard_id: Required shard id as string.

    Returns:
      A new SchemaAggregationResult instance.
    """
    parent = cls._get_parent_key(backup_id, kind_name)
    return SchemaAggregationResult(
        key_name=shard_id, parent=parent,
        entity_type_info=EntityTypeInfo(kind=kind_name))

  @classmethod
  def load(cls, backup_id, kind_name, shard_id=None):
    """Retrieve SchemaAggregationResult from the Datastore.

    Args:
      backup_id: Required BackupInformation Key.
      kind_name: Required kind name as string.
      shard_id: Optional shard id as string.

    Returns:
      SchemaAggregationResult iterator or an entity if shard_id not None.
    """
    parent = cls._get_parent_key(backup_id, kind_name)
    if shard_id:
      key = datastore_types.Key.from_path(cls.kind(), shard_id, parent=parent)
      return SchemaAggregationResult.get(key)
    return db.Query(cls).ancestor(parent).run()

  @classmethod
  def kind(cls):
    """Return the datastore kind used to store these entities."""
    return utils.BACKUP_INFORMATION_KIND_TYPE_INFO
class SchemaAggregationPool(object):
  """An MR pool to aggregation type information per kind."""

  def __init__(self, backup_id, kind, shard_id):
    """Construct SchemaAggregationPool instance.

    Args:
      backup_id: Required BackupInformation Key.
      kind: Required kind name as string.
      shard_id: Required shard id as string.
    """
    self.__backup_id = backup_id
    self.__kind = kind
    self.__shard_id = shard_id
    self.__aggregation = SchemaAggregationResult.load(backup_id, kind, shard_id)
    if not self.__aggregation:
      # Fresh shard: create (unsaved) aggregation; it must be flushed.
      self.__aggregation = SchemaAggregationResult.create(backup_id, kind,
                                                          shard_id)
      self.__needs_save = True
    else:
      self.__needs_save = False

  def merge(self, entity_type_info):
    """Merge EntityTypeInfo into aggregated type information."""
    if self.__aggregation.merge(entity_type_info):
      self.__needs_save = True

  def flush(self):
    """Save aggregated type information to the datastore if changed."""
    if self.__needs_save:

      def update_aggregation_tx():
        # Merge into the latest persisted state inside a transaction so
        # concurrent flushes of the same shard don't lose data.
        aggregation = SchemaAggregationResult.load(
            self.__backup_id, self.__kind, self.__shard_id)
        if aggregation:
          if aggregation.merge(self.__aggregation):
            aggregation.put(force_writes=True)
          self.__aggregation = aggregation
        else:
          self.__aggregation.put(force_writes=True)

      def mark_aggregation_as_partial_tx():
        # Fallback when the aggregated schema exceeds the entity size limit:
        # keep whatever fits and flag the result as partial.
        aggregation = SchemaAggregationResult.load(
            self.__backup_id, self.__kind, self.__shard_id)
        if aggregation is None:
          aggregation = SchemaAggregationResult.create(
              self.__backup_id, self.__kind, self.__shard_id)
        aggregation.is_partial = True
        aggregation.put(force_writes=True)
        self.__aggregation = aggregation

      try:
        db.run_in_transaction(update_aggregation_tx)
      except apiproxy_errors.RequestTooLargeError:
        db.run_in_transaction(mark_aggregation_as_partial_tx)
      self.__needs_save = False
class AggregateSchema(op.Operation):
  """An MR Operation to aggregation type information for a kind.

  This operation will register an MR pool, SchemaAggregationPool, if
  one is not already registered and will invoke the pool's merge operation
  per entity. The pool is responsible for keeping a persistent state of
  type aggregation using the sharded db model, SchemaAggregationResult.
  """

  def __init__(self, entity_proto):
    self.__entity_info = EntityTypeInfo.create_from_entity_proto(entity_proto)

  def __call__(self, ctx):
    pool = ctx.get_pool('schema_aggregation_pool')
    if not pool:
      # Lazily create and register the pool on first use in this shard.
      backup_id = datastore_types.Key(
          context.get().mapreduce_spec.params['backup_info_pk'])
      pool = SchemaAggregationPool(
          backup_id, self.__entity_info.kind, ctx.shard_id)
      ctx.register_pool('schema_aggregation_pool', pool)
    pool.merge(self.__entity_info)
class BackupEntity(object):
  """A class which dumps the entity to the writer."""

  def map(self, entity_proto):
    """Backup entity map handler.

    Args:
      entity_proto: An instance of entity_pb.EntityProto.

    Yields:
      A serialized entity_pb.EntityProto as a string, followed by an
      AggregateSchema operation for schema aggregation.
    """
    yield entity_proto.SerializeToString()
    yield AggregateSchema(entity_proto)
class RestoreEntity(object):
  """A class which restores the entity to datastore."""

  def __init__(self):
    self.initialized = False
    self.kind_filter = None
    # Set only when restoring into a different app than the backup's origin.
    self.app_id = None

  def initialize(self):
    """Lazily read kind_filter/original_app from the mapper params once."""
    if self.initialized:
      return
    mapper_params = context.get().mapreduce_spec.mapper.params
    kind_filter = mapper_params.get('kind_filter')
    self.kind_filter = set(kind_filter) if kind_filter else None
    original_app = mapper_params.get('original_app')
    if original_app and os.getenv('APPLICATION_ID') != original_app:
      self.app_id = os.getenv('APPLICATION_ID')
    self.initialized = True

  def map(self, record):
    """Restore entity map handler.

    Args:
      record: A serialized entity_pb.EntityProto.

    Yields:
      A operation.db.Put for the mapped entity, and a key reservation when
      restoring across apps.
    """
    self.initialize()
    pb = entity_pb.EntityProto(contents=record)
    if self.app_id:
      # Rewrite keys to the destination app before restoring.
      utils.FixKeys(pb, self.app_id)
    entity = datastore.Entity.FromPb(pb)
    if not self.kind_filter or entity.kind() in self.kind_filter:
      yield op.db.Put(entity)
      if self.app_id:
        yield utils.ReserveKey(entity.key())
def validate_gs_bucket_name(bucket_name):
  """Validate the format of the given bucket_name.

  Validation rules are based:
  https://developers.google.com/storage/docs/bucketnaming#requirements

  Args:
    bucket_name: The bucket name to validate.

  Raises:
    BackupValidationError: If the bucket name is invalid.
  """
  if len(bucket_name) > MAX_BUCKET_LEN:
    raise BackupValidationError(
        'Bucket name length should not be longer than %d' % MAX_BUCKET_LEN)
  if len(bucket_name) < MIN_BUCKET_LEN:
    raise BackupValidationError(
        'Bucket name length should be longer than %d' % MIN_BUCKET_LEN)
  if bucket_name.lower().startswith('goog'):
    raise BackupValidationError(
        'Bucket name should not start with a "goog" prefix')
  # Dotted names: each dot-separated segment has its own length limit.
  bucket_elements = bucket_name.split('.')
  for bucket_element in bucket_elements:
    if len(bucket_element) > MAX_BUCKET_SEGMENT_LEN:
      raise BackupValidationError(
          'Segment length of bucket name should not be longer than %d' %
          MAX_BUCKET_SEGMENT_LEN)
  if not re.match(BUCKET_PATTERN, bucket_name):
    raise BackupValidationError('Invalid bucket name "%s"' % bucket_name)
def is_accessible_bucket_name(bucket_name):
  """Returns True if the application has access to the specified bucket.

  Performs an authenticated HEAD request against the bucket URL; any
  non-200 response (or fetch failure) is treated as inaccessible.
  """
  scope = config.GoogleApiScope('devstorage.read_write')
  bucket_url = config.GsBucketURL(bucket_name)
  auth_token, _ = app_identity.get_access_token(scope)
  result = urlfetch.fetch(bucket_url, method=urlfetch.HEAD, headers={
      'Authorization': 'OAuth %s' % auth_token,
      'x-goog-api-version': '2'})
  return result and result.status_code == 200
def verify_bucket_writable(bucket_name):
  """Verify the application can write to the specified bucket.

  Writes (and then best-effort deletes) a small probe file in the bucket.

  Args:
    bucket_name: The bucket to verify.

  Raises:
    BackupValidationError: If the bucket is not writable.
  """
  path = '/gs/%s' % bucket_name
  try:
    file_names = files.gs.listdir(path,
                                  {'prefix': TEST_WRITE_FILENAME_PREFIX,
                                   'max_keys': MAX_KEYS_LIST_SIZE})
  except (files.InvalidParameterError, files.PermissionDeniedError):
    raise BackupValidationError('Bucket "%s" not accessible' % bucket_name)
  except files.InvalidFileNameError:
    raise BackupValidationError('Bucket "%s" does not exist' % bucket_name)
  file_name = '%s/%s.tmp' % (path, TEST_WRITE_FILENAME_PREFIX)
  # NOTE(review): retry loop reconstructed from garbled source — pick a probe
  # name not already present, giving up after MAX_TEST_FILENAME_TRIES.
  file_name_try = 0
  while True:
    if file_name_try >= MAX_TEST_FILENAME_TRIES:
      # Could not find a free probe name; assume writable rather than fail.
      return
    if file_name not in file_names:
      break
    gen = random.randint(0, 9999)
    file_name = '%s/%s_%s.tmp' % (path, TEST_WRITE_FILENAME_PREFIX, gen)
    file_name_try += 1
  try:
    test_file = files.open(files.gs.create(file_name), 'a',
                           exclusive_lock=True)
    try:
      test_file.write('test')
    finally:
      test_file.close(finalize=True)
  except files.PermissionDeniedError:
    raise BackupValidationError('Bucket "%s" is not writable' % bucket_name)
  try:
    files.delete(file_name)
  except (files.InvalidArgumentError, files.InvalidFileNameError, IOError):
    # Cleanup is best-effort; a leftover probe file is harmless.
    logging.warn('Failed to delete test file %s', file_name)
def is_readable_gs_handle(gs_handle):
  """Return True if the application can read the specified gs_handle."""
  try:
    with files.open(gs_handle) as bak_file:
      # A one-byte read suffices to prove read access.
      bak_file.read(1)
  except files.PermissionDeniedError:
    return False
  return True
def parse_gs_handle(gs_handle):
  """Splits [/gs/]?bucket_name[/folder]*[/file]? to (bucket_name, path | '').

  Args:
    gs_handle: A GS handle, with or without the leading '/gs/' prefix.

  Returns:
    A (bucket_name, path) tuple; path is '' when no object path is present.

  Raises:
    BackupValidationError: If the handle names a filesystem other than gs.
  """
  if gs_handle.startswith('/'):
    filesystem = gs_handle[1:].split('/', 1)[0]
    if filesystem == 'gs':
      gs_handle = gs_handle[4:]
    else:
      raise BackupValidationError('Unsupported filesystem: %s' % filesystem)
  tokens = gs_handle.split('/', 1)
  return (tokens[0], '') if len(tokens) == 1 else tuple(tokens)
def validate_and_canonicalize_gs_bucket(gs_bucket_name):
  """Validate a GS bucket handle and return its canonical 'bucket/path' form.

  Args:
    gs_bucket_name: Bucket handle, optionally with '/gs/' prefix and path.

  Returns:
    'bucket' or 'bucket/path' with any trailing '/' stripped.

  Raises:
    BackupValidationError: If the bucket name is invalid or not writable.
  """
  bucket_name, path = parse_gs_handle(gs_bucket_name)
  gs_bucket_name = ('%s/%s' % (bucket_name, path)).rstrip('/')
  validate_gs_bucket_name(bucket_name)
  verify_bucket_writable(bucket_name)
  return gs_bucket_name
def list_bucket_files(bucket_name, prefix, max_keys=1000):
  """Returns a listing of a bucket that matches the given prefix.

  Args:
    bucket_name: GS bucket to list.
    prefix: Key prefix filter; falsy means no prefix filter.
    max_keys: Maximum number of keys to return.

  Returns:
    List of object key strings parsed from the XML listing.

  Raises:
    BackupValidationError: If the GS request does not return HTTP 200.
  """
  scope = config.GoogleApiScope('devstorage.read_only')
  bucket_url = config.GsBucketURL(bucket_name)
  url = bucket_url + '?'
  query = [('max-keys', max_keys)]
  if prefix:
    query.append(('prefix', prefix))
  url += urllib.urlencode(query)
  auth_token, _ = app_identity.get_access_token(scope)
  result = urlfetch.fetch(url, method=urlfetch.GET, headers={
      'Authorization': 'OAuth %s' % auth_token,
      'x-goog-api-version': '2'})
  if result and result.status_code == 200:
    doc = xml.dom.minidom.parseString(result.content)
    return [node.childNodes[0].data for node in doc.getElementsByTagName('Key')]
  raise BackupValidationError('Request to Google Cloud Storage failed')
def get_gs_object(bucket_name, path):
  """Returns the content of a GS object at bucket_name/path.

  Args:
    bucket_name: GS bucket to read from.
    path: Object path within the bucket.

  Returns:
    The object content as a string.

  Raises:
    BackupValidationError: On access denied (403), not found (404), or any
      other non-200 response.
  """
  scope = config.GoogleApiScope('devstorage.read_only')
  bucket_url = config.GsBucketURL(bucket_name)
  url = bucket_url + path
  auth_token, _ = app_identity.get_access_token(scope)
  result = urlfetch.fetch(url, method=urlfetch.GET, headers={
      'Authorization': 'OAuth %s' % auth_token,
      'x-goog-api-version': '2'})
  if result and result.status_code == 200:
    return result.content
  if result and result.status_code == 403:
    raise BackupValidationError(
        'Requested path %s is not accessible/access denied' % url)
  if result and result.status_code == 404:
    raise BackupValidationError('Requested path %s was not found' % url)
  raise BackupValidationError('Error encountered accessing requested path %s' %
                              url)
def get_queue_names(app_id=None, max_rows=100):
  """Returns a list with all non-special queue names for app_id.

  Args:
    app_id: Optional application id; defaults to the current app.
    max_rows: Maximum number of queues to fetch.

  Returns:
    ['default'] plus every PUSH queue whose name is not reserved
    (does not start with '__') and is not 'default'. On RPC failure the
    error is logged and whatever was collected so far is returned.
  """
  rpc = apiproxy_stub_map.UserRPC('taskqueue')
  request = taskqueue_service_pb.TaskQueueFetchQueuesRequest()
  response = taskqueue_service_pb.TaskQueueFetchQueuesResponse()
  if app_id:
    request.set_app_id(app_id)
  request.set_max_rows(max_rows)
  queues = ['default']
  try:
    rpc.make_call('FetchQueues', request, response)
    rpc.check_success()
    for queue in response.queue_list():
      if (queue.mode() == taskqueue_service_pb.TaskQueueMode.PUSH and
          not queue.queue_name().startswith('__') and
          queue.queue_name() != 'default'):
        queues.append(queue.queue_name())
  except Exception:
    # Best-effort: queue listing is advisory, never fatal.
    logging.exception('Failed to get queue names.')
  return queues
1844 def handlers_list(base_path
):
1846 (r
'%s/%s' % (base_path
, BackupLinkHandler
.SUFFIX
),
1848 (r
'%s/%s' % (base_path
, ConfirmBackupHandler
.SUFFIX
),
1849 ConfirmBackupHandler
),
1850 (r
'%s/%s' % (base_path
, DoBackupHandler
.SUFFIX
), DoBackupHandler
),
1851 (r
'%s/%s' % (base_path
, DoBackupRestoreHandler
.SUFFIX
),
1852 DoBackupRestoreHandler
),
1853 (r
'%s/%s' % (base_path
, DoBackupDeleteHandler
.SUFFIX
),
1854 DoBackupDeleteHandler
),
1855 (r
'%s/%s' % (base_path
, DoBackupAbortHandler
.SUFFIX
),
1856 DoBackupAbortHandler
),
1857 (r
'%s/%s' % (base_path
, DoBackupImportHandler
.SUFFIX
),
1858 DoBackupImportHandler
),