3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 """Handler for data backup operation.
24 Generic datastore admin console transfers control to ConfirmBackupHandler
25 after selection of entities. The ConfirmBackupHandler confirms with user
26 his choice, enters a backup name and transfers control to
27 DoBackupHandler. DoBackupHandler starts backup mappers and displays confirmation
30 This module also contains actual mapper code for backing data over.
33 from __future__
import with_statement
46 import xml
.dom
.minidom
49 from google
.appengine
.datastore
import entity_pb
50 from google
.appengine
.api
import apiproxy_stub_map
51 from google
.appengine
.api
import app_identity
52 from google
.appengine
.api
import blobstore
as blobstore_api
53 from google
.appengine
.api
import capabilities
54 from google
.appengine
.api
import datastore
55 from google
.appengine
.api
import datastore_types
56 from google
.appengine
.api
import files
57 from google
.appengine
.api
import taskqueue
58 from google
.appengine
.api
import urlfetch
59 from google
.appengine
.api
.files
import records
60 from google
.appengine
.api
.taskqueue
import taskqueue_service_pb
61 from google
.appengine
.datastore
import datastore_query
62 from google
.appengine
.datastore
import datastore_rpc
63 from google
.appengine
.ext
import blobstore
64 from google
.appengine
.ext
import db
65 from google
.appengine
.ext
import deferred
66 from google
.appengine
.ext
import webapp
67 from google
.appengine
.ext
.datastore_admin
import backup_pb2
68 from google
.appengine
.ext
.datastore_admin
import config
69 from google
.appengine
.ext
.datastore_admin
import utils
70 from google
.appengine
.runtime
import apiproxy_errors
75 from google
.appengine
.ext
.mapreduce
import context
76 from google
.appengine
.ext
.mapreduce
import datastore_range_iterators
as db_iters
77 from google
.appengine
.ext
.mapreduce
import input_readers
78 from google
.appengine
.ext
.mapreduce
import json_util
79 from google
.appengine
.ext
.mapreduce
import operation
as op
80 from google
.appengine
.ext
.mapreduce
import output_writers
83 from google
.appengine
._internal
.mapreduce
import context
84 from google
.appengine
._internal
.mapreduce
import datastore_range_iterators
as db_iters
85 from google
.appengine
._internal
.mapreduce
import input_readers
86 from google
.appengine
._internal
.mapreduce
import json_util
87 from google
.appengine
._internal
.mapreduce
import operation
as op
88 from google
.appengine
._internal
.mapreduce
import output_writers
92 from google
.appengine
.ext
.datastore_admin
import services_client
# Action name used when minting/validating XSRF tokens for backup operations.
XSRF_ACTION = 'backup'

# A GS bucket name: dot-separated segments of alphanumerics, where segments
# may contain interior runs of '-' and '_' but must start/end alphanumeric.
BUCKET_PATTERN = (r'^([a-zA-Z0-9]+([\-_]+[a-zA-Z0-9]+)*)'
                  r'(\.([a-zA-Z0-9]+([\-_]+[a-zA-Z0-9]+)*))*$')

# Maximum length of a single dot-separated bucket-name segment.
MAX_BUCKET_SEGMENT_LEN = 63
# Backups covering more kinds than this are started via a deferred task.
NUM_KINDS_DEFERRED_THRESHOLD = 10
# Blobstore deletes are batched at most this many keys per call.
MAX_BLOBS_PER_DELETE = 500
# Filename prefix used when probing that a bucket is writable.
TEST_WRITE_FILENAME_PREFIX = 'datastore_backup_write_test'
MAX_KEYS_LIST_SIZE = 100
MAX_TEST_FILENAME_TRIES = 10
# Maps datastore property "meaning" values to the primitive type tags used
# in the backup schema proto (backup_pb2.EntitySchema).
MEANING_TO_PRIMITIVE_TYPE = {
    entity_pb.Property.GD_WHEN: backup_pb2.EntitySchema.DATE_TIME,
    entity_pb.Property.GD_RATING: backup_pb2.EntitySchema.RATING,
    entity_pb.Property.ATOM_LINK: backup_pb2.EntitySchema.LINK,
    entity_pb.Property.ATOM_CATEGORY: backup_pb2.EntitySchema.CATEGORY,
    entity_pb.Property.GD_PHONENUMBER: backup_pb2.EntitySchema.PHONE_NUMBER,
    entity_pb.Property.GD_POSTALADDRESS: backup_pb2.EntitySchema.POSTAL_ADDRESS,
    entity_pb.Property.GD_EMAIL: backup_pb2.EntitySchema.EMAIL,
    entity_pb.Property.GD_IM: backup_pb2.EntitySchema.IM_HANDLE,
    entity_pb.Property.BLOBKEY: backup_pb2.EntitySchema.BLOB_KEY,
    entity_pb.Property.TEXT: backup_pb2.EntitySchema.TEXT,
    entity_pb.Property.BLOB: backup_pb2.EntitySchema.BLOB,
    entity_pb.Property.BYTESTRING: backup_pb2.EntitySchema.SHORT_BLOB,
}
126 class ConfirmBackupHandler(webapp
.RequestHandler
):
127 """Handler to deal with requests from the admin console to backup data."""
129 SUFFIX
= 'confirm_backup'
132 def Render(cls
, handler
):
133 """Rendering method that can be called by main.py.
136 handler: the webapp.RequestHandler invoking the method
138 kinds
= handler
.request
.get_all('kind')
139 sizes_known
, size_total
, remainder
= utils
.ParseKindsAndSizes(kinds
)
140 notreadonly_warning
= capabilities
.CapabilitySet(
141 'datastore_v3', capabilities
=['write']).is_enabled()
142 blob_warning
= bool(blobstore
.BlobInfo
.all().count(1))
144 'run_as_a_service': handler
.request
.get('run_as_a_service'),
145 'form_target': DoBackupHandler
.SUFFIX
,
147 'remainder': remainder
,
148 'sizes_known': sizes_known
,
149 'size_total': size_total
,
151 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
152 'namespaces': get_namespaces(handler
.request
.get('namespace', None)),
153 'xsrf_token': utils
.CreateXsrfToken(XSRF_ACTION
),
154 'notreadonly_warning': notreadonly_warning
,
155 'blob_warning': blob_warning
,
156 'backup_name': 'datastore_backup_%s' % time
.strftime('%Y_%m_%d')
158 utils
.RenderToResponse(handler
, 'confirm_backup.html', template_params
)
161 def get_namespaces(selected_namespace
):
162 namespaces
= [('--All--', '*', selected_namespace
is None)]
163 for ns
in datastore
.Query('__namespace__', keys_only
=True).Run():
164 ns_name
= ns
.name() or ''
165 namespaces
.append((ns_name
or '--Default--',
167 ns_name
== selected_namespace
))
171 class ConfirmDeleteBackupHandler(webapp
.RequestHandler
):
172 """Handler to confirm admin console requests to delete a backup copy."""
174 SUFFIX
= 'confirm_delete_backup'
177 def Render(cls
, handler
):
178 """Rendering method that can be called by main.py.
181 handler: the webapp.RequestHandler invoking the method
183 requested_backup_ids
= handler
.request
.get_all('backup_id')
186 if requested_backup_ids
:
187 for backup
in db
.get(requested_backup_ids
):
189 backups
.append(backup
)
190 gs_warning |
= backup
.filesystem
== files
.GS_FILESYSTEM
192 'form_target': DoBackupDeleteHandler
.SUFFIX
,
193 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
195 'xsrf_token': utils
.CreateXsrfToken(XSRF_ACTION
),
196 'gs_warning': gs_warning
,
197 'run_as_a_service': handler
.request
.get('run_as_a_service'),
199 utils
.RenderToResponse(handler
, 'confirm_delete_backup.html',
203 class ConfirmAbortBackupHandler(webapp
.RequestHandler
):
204 """Handler to confirm admin console requests to abort a backup copy."""
206 SUFFIX
= 'confirm_abort_backup'
209 def Render(cls
, handler
):
210 """Rendering method that can be called by main.py.
213 handler: the webapp.RequestHandler invoking the method
215 requested_backup_ids
= handler
.request
.get_all('backup_id')
217 if requested_backup_ids
:
218 for backup
in db
.get(requested_backup_ids
):
220 backups
.append(backup
)
222 'form_target': DoBackupAbortHandler
.SUFFIX
,
223 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
225 'xsrf_token': utils
.CreateXsrfToken(XSRF_ACTION
),
226 'run_as_a_service': handler
.request
.get('run_as_a_service'),
228 utils
.RenderToResponse(handler
, 'confirm_abort_backup.html',
232 class ConfirmRestoreFromBackupHandler(webapp
.RequestHandler
):
233 """Handler to confirm admin console requests to restore from backup."""
235 SUFFIX
= 'confirm_restore_from_backup'
238 def Render(cls
, handler
, default_backup_id
=None,
239 default_delete_backup_after_restore
=False):
240 """Rendering method that can be called by main.py.
243 handler: the webapp.RequestHandler invoking the method
244 default_backup_id: default value for handler.request
245 default_delete_backup_after_restore: default value for handler.request
247 backup_id
= handler
.request
.get('backup_id', default_backup_id
)
248 backup
= db
.get(backup_id
) if backup_id
else None
249 notreadonly_warning
= capabilities
.CapabilitySet(
250 'datastore_v3', capabilities
=['write']).is_enabled()
251 original_app_warning
= backup
.original_app
252 if os
.getenv('APPLICATION_ID') == original_app_warning
:
253 original_app_warning
= None
255 'form_target': DoBackupRestoreHandler
.SUFFIX
,
257 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
259 'delete_backup_after_restore': handler
.request
.get(
260 'delete_backup_after_restore', default_delete_backup_after_restore
),
261 'xsrf_token': utils
.CreateXsrfToken(XSRF_ACTION
),
262 'notreadonly_warning': notreadonly_warning
,
263 'original_app_warning': original_app_warning
,
264 'run_as_a_service': handler
.request
.get('run_as_a_service'),
266 utils
.RenderToResponse(handler
, 'confirm_restore_from_backup.html',
270 class ConfirmBackupImportHandler(webapp
.RequestHandler
):
271 """Handler to import backup information."""
273 SUFFIX
= 'backup_information'
276 def Render(cls
, handler
):
277 """Rendering method that can be called by main.py.
280 handler: the webapp.RequestHandler invoking the method
282 gs_handle
= handler
.request
.get('gs_handle')
283 error
= None if gs_handle
else 'Google Cloud Storage path is missing'
284 other_backup_info_files
= []
285 selected_backup_info_file
= None
286 backup_info_specified
= False
289 gs_handle
= gs_handle
.rstrip()
290 bucket_name
, prefix
= parse_gs_handle(gs_handle
)
291 validate_gs_bucket_name(bucket_name
)
292 if not is_accessible_bucket_name(bucket_name
):
293 raise BackupValidationError(
294 'Bucket "%s" is not accessible' % bucket_name
)
295 if prefix
.endswith('.backup_info'):
296 prefix
= prefix
[0:prefix
.rfind('/')]
297 backup_info_specified
= True
298 elif prefix
and not prefix
.endswith('/'):
300 for backup_info_file
in list_bucket_files(bucket_name
, prefix
):
301 backup_info_path
= '/gs/%s/%s' % (bucket_name
, backup_info_file
)
302 if backup_info_specified
and backup_info_path
== gs_handle
:
303 selected_backup_info_file
= backup_info_path
304 elif (backup_info_file
.endswith('.backup_info')
305 and backup_info_file
.count('.') == 1):
306 other_backup_info_files
.append(backup_info_path
)
307 except Exception, ex
:
308 error
= 'Failed to read bucket: %s' % ex
.message
309 logging
.exception(ex
.message
)
312 'form_target': DoBackupImportHandler
.SUFFIX
,
313 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
314 'selected_backup_info_file': selected_backup_info_file
,
315 'other_backup_info_files': other_backup_info_files
,
316 'backup_info_specified': backup_info_specified
,
317 'xsrf_token': utils
.CreateXsrfToken(XSRF_ACTION
),
318 'run_as_a_service': handler
.request
.get('run_as_a_service'),
320 utils
.RenderToResponse(handler
, 'confirm_backup_import.html',
324 class BackupInformationHandler(webapp
.RequestHandler
):
325 """Handler to display backup information."""
327 SUFFIX
= 'backup_information'
330 def Render(cls
, handler
):
331 """Rendering method that can be called by main.py.
334 handler: the webapp.RequestHandler invoking the method
336 backup_ids
= handler
.request
.get_all('backup_id')
338 'backups': db
.get(backup_ids
),
339 'datastore_admin_home': utils
.GenerateHomeUrl(handler
.request
),
340 'run_as_a_service': handler
.request
.get('run_as_a_service'),
342 utils
.RenderToResponse(handler
, 'backup_information.html', template_params
)
345 class BaseDoHandler(webapp
.RequestHandler
):
346 """Base class for all Do*Handlers."""
348 MAPREDUCE_DETAIL
= config
.MAPREDUCE_PATH
+ '/detail?mapreduce_id='
351 """Handler for get requests to datastore_admin backup operations.
353 Status of executed jobs is displayed.
355 jobs
= self
.request
.get_all('job')
356 remote_job
= self
.request
.get('remote_job')
357 tasks
= self
.request
.get_all('task')
358 error
= self
.request
.get('error', '')
359 xsrf_error
= self
.request
.get('xsrf_error', '')
362 'remote_job': remote_job
,
364 'mapreduce_detail': self
.MAPREDUCE_DETAIL
,
366 'xsrf_error': xsrf_error
,
367 'datastore_admin_home': utils
.GenerateHomeUrl(self
.request
),
369 utils
.RenderToResponse(self
, self
._get
_html
_page
, template_params
)
  # Abstract hook: BaseDoHandler.get renders the template this returns.
  def _get_html_page(self):
    """Return the name of the HTML page for HTTP/GET requests."""
    raise NotImplementedError
  # Abstract hook: the post flow redirects to this page after processing.
  def _get_post_html_page(self):
    """Return the name of the HTML page for HTTP/POST requests."""
    raise NotImplementedError
  # Abstract hook: subclasses (e.g. DoBackupHandler) do the real work here;
  # the returned pairs become query parameters of the redirect.
  def _ProcessPostRequest(self):
    """Process the HTTP/POST request and return the result as parameters."""
    raise NotImplementedError
385 def _GetBasicMapperParams(self
):
386 namespace
= self
.request
.get('namespace', None)
389 return {'namespace': namespace
}
391 def SendRedirect(self
, path
=None, params
=()):
392 """Send a redirect response."""
394 run_as_a_service
= self
.request
.get('run_as_a_service')
396 params
= list(params
)
397 params
.append(('run_as_a_service', True))
398 dest
= config
.BASE_PATH
400 dest
= '%s/%s' % (dest
, path
)
402 dest
= '%s?%s' % (dest
, urllib
.urlencode(params
))
406 """Handler for post requests to datastore_admin/backup.do.
408 Redirects to the get handler after processing the request.
410 token
= self
.request
.get('xsrf_token')
412 if not utils
.ValidateXsrfToken(token
, XSRF_ACTION
):
413 parameters
= [('xsrf_error', '1')]
416 parameters
= self
._ProcessPostRequest
()
420 error
= self
._HandleException
(e
)
421 parameters
= [('error', error
)]
423 self
.SendRedirect(self
._get
_post
_html
_page
, parameters
)
425 def _HandleException(self
, e
):
426 """Make exception handling overridable by tests.
429 e: The exception to handle.
432 The exception error string.
434 logging
.exception(e
.message
)
435 return '%s: %s' % (type(e
), e
.message
)
class BackupValidationError(utils.Error):
  """Raised when a backup request fails validation."""
442 def _perform_backup(run_as_a_service
, kinds
, selected_namespace
,
443 filesystem
, gs_bucket_name
, backup
,
444 queue
, mapper_params
, max_jobs
):
445 """Triggers backup mapper jobs.
448 run_as_a_service: True if backup should be done via admin-jobs
449 kinds: a sequence of kind names
450 selected_namespace: The selected namespace or None for all
451 filesystem: files.BLOBSTORE_FILESYSTEM or files.GS_FILESYSTEM
452 or None to default to blobstore
453 gs_bucket_name: the GS file system bucket in which to store the backup
454 when using the GS file system, and otherwise ignored
455 backup: the backup name
456 queue: the task queue for the backup task
457 mapper_params: the mapper parameters
458 max_jobs: if backup needs more jobs than this, defer them
464 BackupValidationError: On validation error.
465 Exception: On other error.
467 BACKUP_COMPLETE_HANDLER
= __name__
+ '.BackupCompleteHandler'
468 BACKUP_HANDLER
= __name__
+ '.BackupEntity.map'
469 INPUT_READER
= __name__
+ '.DatastoreEntityProtoInputReader'
470 OUTPUT_WRITER
= output_writers
.__name
__ + '.FileRecordsOutputWriter'
473 if not gs_bucket_name
:
474 raise BackupValidationError('Bucket name missing.')
475 gs_bucket_name
= validate_and_canonicalize_gs_bucket(gs_bucket_name
)
476 datastore_admin_service
= services_client
.DatastoreAdminClient()
477 description
= 'Remote backup job: %s' % backup
478 remote_job_id
= datastore_admin_service
.create_backup(
479 description
, backup
, gs_bucket_name
, selected_namespace
, kinds
)
480 return [('remote_job', remote_job_id
)]
482 queue
= queue
or os
.environ
.get('HTTP_X_APPENGINE_QUEUENAME', 'default')
487 filesystem
= files
.BLOBSTORE_FILESYSTEM
488 if filesystem
== files
.GS_FILESYSTEM
:
490 if not gs_bucket_name
:
491 raise BackupValidationError('Bucket name missing.')
492 gs_bucket_name
= validate_and_canonicalize_gs_bucket(gs_bucket_name
)
493 elif filesystem
== files
.BLOBSTORE_FILESYSTEM
:
496 raise BackupValidationError('Unknown filesystem "%s".' % filesystem
)
501 job_name
= 'datastore_backup_%s_%%(kind)s' % re
.sub(r
'[^\w]', '_', backup
)
503 job_operation
= utils
.StartOperation('Backup: %s' % backup
)
504 backup_info
= BackupInformation(parent
=job_operation
)
505 backup_info
.filesystem
= filesystem
506 backup_info
.name
= backup
507 backup_info
.kinds
= kinds
508 if selected_namespace
is not None:
509 backup_info
.namespaces
= [selected_namespace
]
510 backup_info
.put(force_writes
=True)
512 'done_callback_handler': BACKUP_COMPLETE_HANDLER
,
513 'backup_info_pk': str(backup_info
.key()),
514 'force_ops_writes': True,
516 mapper_params
= dict(mapper_params
)
517 mapper_params
['filesystem'] = filesystem
518 if filesystem
== files
.GS_FILESYSTEM
:
519 mapper_params
['gs_bucket_name'] = gs_bucket_name
520 if len(kinds
) <= max_jobs
:
521 return [('job', job
) for job
in _run_map_jobs(
522 job_operation
.key(), backup_info
.key(), kinds
, job_name
,
523 BACKUP_HANDLER
, INPUT_READER
, OUTPUT_WRITER
,
524 mapper_params
, mapreduce_params
, queue
)]
526 retry_options
= taskqueue
.TaskRetryOptions(task_retry_limit
=1)
527 deferred_task
= deferred
.defer(_run_map_jobs_deferred
,
528 backup
, job_operation
.key(),
529 backup_info
.key(), kinds
, job_name
,
530 BACKUP_HANDLER
, INPUT_READER
,
531 OUTPUT_WRITER
, mapper_params
,
532 mapreduce_params
, queue
, _queue
=queue
,
533 _url
=config
.DEFERRED_PATH
,
534 _retry_options
=retry_options
)
535 return [('task', deferred_task
.name
)]
537 logging
.exception('Failed to start a datastore backup job[s] for "%s".',
540 delete_backup_info(backup_info
)
542 job_operation
.status
= utils
.DatastoreAdminOperation
.STATUS_FAILED
543 job_operation
.put(force_writes
=True)
547 class BackupLinkHandler(webapp
.RequestHandler
):
548 """Handler to deal with requests to the backup link to backup data."""
550 SUFFIX
= 'backup.create'
553 """Handler for get requests to datastore_admin/backup.create."""
557 """Handler for post requests to datastore_admin/backup.create."""
563 if ('X-AppEngine-TaskName' not in self
.request
.headers
and
564 'X-AppEngine-Cron' not in self
.request
.headers
):
565 logging
.critical('Scheduled backups must be started via task queue or '
567 self
.response
.set_status(403)
570 backup_prefix
= self
.request
.get('name')
571 if not backup_prefix
:
572 if self
.request
.headers
.get('X-AppEngine-Cron'):
573 backup_prefix
= 'cron-'
575 backup_prefix
= 'link-'
576 backup_prefix_with_date
= backup_prefix
+ time
.strftime('%Y_%m_%d')
577 backup_name
= backup_prefix_with_date
578 backup_suffix_counter
= 1
579 while BackupInformation
.name_exists(backup_name
):
580 backup_suffix_counter
+= 1
581 backup_name
= backup_prefix_with_date
+ '-' + str(backup_suffix_counter
)
582 kinds
= self
.request
.get_all('kind')
584 self
.errorResponse('Backup must include at least one kind.')
587 if not utils
.IsKindNameVisible(kind
):
588 self
.errorResponse('Invalid kind %s.' % kind
)
590 namespace
= self
.request
.get('namespace', None)
593 mapper_params
= {'namespace': namespace
}
594 _perform_backup(self
.request
.get('run_as_a_service', False),
597 self
.request
.get('filesystem'),
598 self
.request
.get('gs_bucket_name'),
600 self
.request
.get('queue'),
604 self
.errorResponse(e
.message
)
606 def errorResponse(self
, message
):
607 logging
.error('Could not create backup via link: %s', message
)
608 self
.response
.set_status(400, message
)
class DatastoreEntityProtoInputReader(input_readers.RawDatastoreInputReader):
  """An input reader which yields datastore entity protos for a kind."""

  # Key ranges are iterated with the entity-proto iterator (see db_iters).
  _KEY_RANGE_ITER_CLS = db_iters.KeyRangeEntityProtoIterator
617 class DoBackupHandler(BaseDoHandler
):
618 """Handler to deal with requests from the admin console to backup data."""
621 _get_html_page
= 'do_backup.html'
622 _get_post_html_page
= SUFFIX
624 def _ProcessPostRequest(self
):
625 """Triggers backup mapper jobs and returns their ids."""
627 backup
= self
.request
.get('backup_name').strip()
629 raise BackupValidationError('Unspecified backup name.')
630 if BackupInformation
.name_exists(backup
):
631 raise BackupValidationError('Backup "%s" already exists.' % backup
)
632 mapper_params
= self
._GetBasicMapperParams
()
633 backup_result
= _perform_backup(self
.request
.get('run_as_a_service',
635 self
.request
.get_all('kind'),
636 mapper_params
.get('namespace'),
637 self
.request
.get('filesystem'),
638 self
.request
.get('gs_bucket_name'),
640 self
.request
.get('queue'),
645 logging
.exception(e
.message
)
646 return [('error', e
.message
)]
649 def _run_map_jobs_deferred(backup_name
, job_operation_key
, backup_info_key
,
650 kinds
, job_name
, backup_handler
, input_reader
,
651 output_writer
, mapper_params
, mapreduce_params
,
653 backup_info
= BackupInformation
.get(backup_info_key
)
656 _run_map_jobs(job_operation_key
, backup_info_key
, kinds
, job_name
,
657 backup_handler
, input_reader
, output_writer
, mapper_params
,
658 mapreduce_params
, queue
)
659 except BaseException
:
660 logging
.exception('Failed to start a datastore backup job[s] for "%s".',
662 delete_backup_info(backup_info
)
664 logging
.info('Missing backup info, can not start backup jobs for "%s"',
668 def _run_map_jobs(job_operation_key
, backup_info_key
, kinds
, job_name
,
669 backup_handler
, input_reader
, output_writer
, mapper_params
,
670 mapreduce_params
, queue
):
671 """Creates backup/restore MR jobs for the given operation.
674 job_operation_key: a key of utils.DatastoreAdminOperation entity.
675 backup_info_key: a key of BackupInformation entity.
676 kinds: a list of kinds to run the M/R for.
677 job_name: the M/R job name prefix.
678 backup_handler: M/R job completion handler.
679 input_reader: M/R input reader.
680 output_writer: M/R output writer.
681 mapper_params: custom parameters to pass to mapper.
682 mapreduce_params: dictionary parameters relevant to the whole job.
683 queue: the name of the queue that will be used by the M/R.
686 Ids of all started mapper jobs as list of strings.
688 backup_info
= BackupInformation
.get(backup_info_key
)
691 jobs
= utils
.RunMapForKinds(
701 backup_info
.active_jobs
= jobs
702 backup_info
.put(force_writes
=True)
def get_backup_files(backup_info, selected_kinds=None):
  """Returns the backup filenames for selected kinds or all if None/Empty."""
  # Backups with a flat file list expose it directly.
  if backup_info.blob_files:
    return backup_info.blob_files
  # Otherwise files are recorded per kind; flatten the per-kind lists.
  per_kind = backup_info.get_kind_backup_files(selected_kinds)
  return list(itertools.chain.from_iterable(
      kind_files.files for kind_files in per_kind))
717 def delete_backup_files(filesystem
, backup_files
):
722 if filesystem
== files
.BLOBSTORE_FILESYSTEM
:
726 for fname
in backup_files
:
727 blob_key
= files
.blobstore
.get_blob_key(fname
)
729 blob_keys
.append(blob_key
)
730 if len(blob_keys
) == MAX_BLOBS_PER_DELETE
:
731 blobstore_api
.delete(blob_keys
)
734 blobstore_api
.delete(blob_keys
)
737 def delete_backup_info(backup_info
, delete_files
=True):
738 """Deletes a backup including its associated files and other metadata."""
739 if backup_info
.blob_files
:
740 delete_backup_files(backup_info
.filesystem
, backup_info
.blob_files
)
741 backup_info
.delete(force_writes
=True)
743 kinds_backup_files
= tuple(backup_info
.get_kind_backup_files())
745 delete_backup_files(backup_info
.filesystem
, itertools
.chain(*(
746 kind_backup_files
.files
for kind_backup_files
in kinds_backup_files
)))
747 db
.delete(kinds_backup_files
+ (backup_info
,), force_writes
=True)
750 class DoBackupDeleteHandler(BaseDoHandler
):
751 """Handler to deal with datastore admin requests to delete backup data."""
753 SUFFIX
= 'backup_delete.do'
759 """Handler for post requests to datastore_admin/backup_delete.do.
761 Deletes are executed and user is redirected to the base-path handler.
763 backup_ids
= self
.request
.get_all('backup_id')
764 token
= self
.request
.get('xsrf_token')
766 if backup_ids
and utils
.ValidateXsrfToken(token
, XSRF_ACTION
):
768 for backup_info
in db
.get(backup_ids
):
770 delete_backup_info(backup_info
)
772 logging
.exception('Failed to delete datastore backup.')
773 params
= [('error', e
.message
)]
775 self
.SendRedirect(params
=params
)
778 class DoBackupAbortHandler(BaseDoHandler
):
779 """Handler to deal with datastore admin requests to abort pending backups."""
781 SUFFIX
= 'backup_abort.do'
787 """Handler for post requests to datastore_admin/backup_abort.do.
789 Abort is executed and user is redirected to the base-path handler.
791 backup_ids
= self
.request
.get_all('backup_id')
792 token
= self
.request
.get('xsrf_token')
794 if backup_ids
and utils
.ValidateXsrfToken(token
, XSRF_ACTION
):
796 for backup_info
in db
.get(backup_ids
):
798 operation
= backup_info
.parent()
799 if operation
.parent_key():
800 job_id
= str(operation
.parent_key())
801 datastore_admin_service
= services_client
.DatastoreAdminClient()
802 datastore_admin_service
.abort_backup(job_id
)
804 utils
.AbortAdminOperation(operation
.key())
805 delete_backup_info(backup_info
)
807 logging
.exception('Failed to abort pending datastore backup.')
808 params
= [('error', e
.message
)]
810 self
.SendRedirect(params
=params
)
813 class DoBackupRestoreHandler(BaseDoHandler
):
814 """Handler to restore backup data.
816 Deals with requests from the admin console.
818 SUFFIX
= 'backup_restore.do'
819 BACKUP_RESTORE_HANDLER
= __name__
+ '.RestoreEntity.map'
820 RESTORE_COMPLETE_HANDLER
= __name__
+ '.RestoreCompleteHandler'
822 INPUT_READER
= input_readers
.__name
__ + '.RecordsReader'
823 _get_html_page
= 'do_restore_from_backup.html'
824 _get_post_html_page
= SUFFIX
826 def _ProcessPostRequest(self
):
827 """Triggers backup restore mapper jobs and returns their ids."""
828 backup_id
= self
.request
.get('backup_id')
830 return [('error', 'Unspecified Backup.')]
832 backup
= db
.get(db
.Key(backup_id
))
834 return [('error', 'Invalid Backup id.')]
837 if not is_readable_gs_handle(backup
.gs_handle
):
838 return [('error', 'Backup not readable')]
840 kinds
= set(self
.request
.get_all('kind'))
841 if not (backup
.blob_files
or kinds
):
842 return [('error', 'No kinds were selected')]
843 backup_kinds
= set(backup
.kinds
)
844 difference
= kinds
.difference(backup_kinds
)
846 return [('error', 'Backup does not have kind[s] %s' %
847 ', '.join(difference
))]
849 if self
.request
.get('run_as_a_service', False):
850 if backup
.filesystem
!= files
.GS_FILESYSTEM
:
852 'Restore as a service is only available for GS backups')]
853 datastore_admin_service
= services_client
.DatastoreAdminClient()
854 description
= 'Remote restore job: %s' % backup
.name
855 remote_job_id
= datastore_admin_service
.restore_from_backup(
856 description
, backup_id
, list(kinds
))
857 return [('remote_job', remote_job_id
)]
859 queue
= self
.request
.get('queue')
860 job_name
= 'datastore_backup_restore_%s' % re
.sub(r
'[^\w]', '_',
864 operation_name
= 'Restoring %s from backup: %s' % (
865 ', '.join(kinds
) if kinds
else 'all', backup
.name
)
866 job_operation
= utils
.StartOperation(operation_name
)
867 mapper_params
= self
._GetBasicMapperParams
()
870 kinds
= list(kinds
) if len(backup_kinds
) != len(kinds
) else []
871 mapper_params
['files'] = get_backup_files(backup
, kinds
)
872 mapper_params
['kind_filter'] = kinds
873 mapper_params
['original_app'] = backup
.original_app
875 'backup_name': backup
.name
,
876 'force_ops_writes': True,
878 shard_count
= min(max(utils
.MAPREDUCE_MIN_SHARDS
,
879 len(mapper_params
['files'])),
880 utils
.MAPREDUCE_MAX_SHARDS
)
881 job
= utils
.StartMap(job_operation
.key(), job_name
,
882 self
.BACKUP_RESTORE_HANDLER
, self
.INPUT_READER
, None,
883 mapper_params
, mapreduce_params
, queue_name
=queue
,
884 shard_count
=shard_count
)
885 return [('job', job
)]
887 logging
.exception('Failed to start a restore from backup job "%s".',
890 job_operation
.status
= utils
.DatastoreAdminOperation
.STATUS_FAILED
891 job_operation
.put(force_writes
=True)
895 class DoBackupImportHandler(BaseDoHandler
):
896 """Handler to deal with datastore admin requests to import backup info."""
898 SUFFIX
= 'import_backup.do'
904 """Handler for post requests to datastore_admin/import_backup.do.
906 Import is executed and user is redirected to the base-path handler.
908 gs_handle
= self
.request
.get('gs_handle')
909 token
= self
.request
.get('xsrf_token')
911 if gs_handle
and utils
.ValidateXsrfToken(token
, XSRF_ACTION
):
913 bucket_name
, path
= parse_gs_handle(gs_handle
)
914 file_content
= get_gs_object(bucket_name
, path
)
915 entities
= parse_backup_info_file(file_content
)
916 original_backup_info
= entities
.next()
917 entity
= datastore
.Entity(BackupInformation
.kind())
918 entity
.update(original_backup_info
)
919 backup_info
= BackupInformation
.from_entity(entity
)
920 if original_backup_info
.key().app() != os
.getenv('APPLICATION_ID'):
921 backup_info
.original_app
= original_backup_info
.key().app()
924 backup_info
.put(force_writes
=True)
925 kind_files_models
= []
926 for entity
in entities
:
927 kind_files
= backup_info
.create_kind_backup_files(
928 entity
.key().name(), entity
['files'])
929 kind_files_models
.append(kind_files
)
930 db
.put(kind_files_models
, force_writes
=True)
931 db
.run_in_transaction(tx
)
932 backup_id
= str(backup_info
.key())
934 logging
.exception('Failed to Import datastore backup information.')
938 self
.SendRedirect(params
=[('error', error
)])
939 elif self
.request
.get('Restore'):
940 ConfirmRestoreFromBackupHandler
.Render(
941 self
, default_backup_id
=backup_id
,
942 default_delete_backup_after_restore
=True)
class BackupInformation(db.Model):
  """An entity to keep information on a datastore backup."""

  # User-chosen backup name; uniqueness is checked via name_exists().
  name = db.StringProperty()
  # Entity kinds included in this backup.
  kinds = db.StringListProperty()
  # Namespaces covered; a single-element list when one namespace was selected.
  namespaces = db.StringListProperty()
  # files.BLOBSTORE_FILESYSTEM or files.GS_FILESYSTEM.
  filesystem = db.StringProperty(default=files.BLOBSTORE_FILESYSTEM)
  start_time = db.DateTimeProperty(auto_now_add=True)
  # Mapper job ids still running; jobs move to completed_jobs as they finish.
  active_jobs = db.StringListProperty()
  completed_jobs = db.StringListProperty()
  # Set once the backup finishes (see finalize_backup_info).
  complete_time = db.DateTimeProperty(default=None)
  # Flat list of backup files (used instead of per-kind KindBackupFiles).
  blob_files = db.StringListProperty()
  # App id of the originating app when imported from a different application.
  original_app = db.StringProperty(default=None)
  # GS path of the backup info file, for GS-based backups.
  gs_handle = db.TextProperty(default=None)
  destination = db.StringProperty()
966 return utils
.BACKUP_INFORMATION_KIND
969 def name_exists(cls
, backup_name
):
970 query
= BackupInformation
.all(keys_only
=True)
971 query
.filter('name =', backup_name
)
972 return query
.get() is not None
974 def create_kind_backup_files_key(self
, kind
):
975 return db
.Key
.from_path(KindBackupFiles
.kind(), kind
, parent
=self
.key())
977 def create_kind_backup_files(self
, kind
, kind_files
):
978 return KindBackupFiles(key
=self
.create_kind_backup_files_key(kind
),
981 def get_kind_backup_files(self
, kinds
=None):
983 return db
.get([self
.create_kind_backup_files_key(kind
) for kind
in kinds
])
985 return KindBackupFiles
.all().ancestor(self
).run()
989 class KindBackupFiles(db
.Model
):
990 """An entity to keep files information per kind for a backup.
992 A key for this model should created using kind as a name and the associated
993 BackupInformation as a parent.
995 files
= db
.StringListProperty(indexed
=False)
998 def backup_kind(self
):
999 return self
.key().name()
1003 return utils
.BACKUP_INFORMATION_FILES_KIND
1006 def BackupCompleteHandler(operation
, job_id
, mapreduce_state
):
1007 """Updates BackupInformation record for a completed mapper job."""
1008 mapreduce_spec
= mapreduce_state
.mapreduce_spec
1009 filenames
= mapreduce_spec
.mapper
.output_writer_class().get_filenames(
1011 _perform_backup_complete(operation
,
1013 mapreduce_spec
.mapper
.params
['entity_kind'],
1014 mapreduce_spec
.params
['backup_info_pk'],
1015 mapreduce_spec
.mapper
.params
.get('gs_bucket_name'),
1017 mapreduce_spec
.params
.get('done_callback_queue'))
1021 def _perform_backup_complete(
1022 operation
, job_id
, kind
, backup_info_pk
, gs_bucket_name
, filenames
, queue
):
1023 backup_info
= BackupInformation
.get(backup_info_pk
)
1025 if job_id
in backup_info
.active_jobs
:
1026 backup_info
.active_jobs
.remove(job_id
)
1027 backup_info
.completed_jobs
= list(
1028 set(backup_info
.completed_jobs
+ [job_id
]))
1031 if backup_info
.filesystem
== files
.BLOBSTORE_FILESYSTEM
:
1032 filenames
= drop_empty_files(filenames
)
1033 kind_backup_files
= backup_info
.get_kind_backup_files([kind
])[0]
1034 if kind_backup_files
:
1035 kind_backup_files
.files
= list(set(kind_backup_files
.files
+ filenames
))
1037 kind_backup_files
= backup_info
.create_kind_backup_files(kind
, filenames
)
1038 db
.put((backup_info
, kind_backup_files
), force_writes
=True)
1039 if operation
.status
== utils
.DatastoreAdminOperation
.STATUS_COMPLETED
:
1040 deferred
.defer(finalize_backup_info
, backup_info
.key(),
1042 _url
=config
.DEFERRED_PATH
,
1044 _transactional
=True)
1046 logging
.warn('BackupInfo was not found for %s', backup_info_pk
)
1049 def finalize_backup_info(backup_info_pk
, gs_bucket
):
1050 """Finalize the state of BackupInformation and creates info file for GS."""
1052 def get_backup_info():
1053 return BackupInformation
.get(backup_info_pk
)
1055 backup_info
= db
.run_in_transaction(get_backup_info
)
1057 complete_time
= datetime
.datetime
.now()
1058 backup_info
.complete_time
= complete_time
1060 if backup_info
.filesystem
== files
.GS_FILESYSTEM
:
1066 gs_handle
= BackupInfoWriter(gs_bucket
).write(backup_info
)[0]
1068 def set_backup_info_with_finalize_info():
1069 backup_info
= get_backup_info()
1070 backup_info
.complete_time
= complete_time
1071 backup_info
.gs_handle
= gs_handle
1072 backup_info
.put(force_writes
=True)
1073 db
.run_in_transaction(set_backup_info_with_finalize_info
)
1074 logging
.info('Backup %s completed', backup_info
.name
)
1076 logging
.warn('Backup %s could not be found', backup_info_pk
)
1079 def parse_backup_info_file(content
):
1080 """Returns entities iterator from a backup_info file content."""
1081 reader
= records
.RecordsReader(cStringIO
.StringIO(content
))
1082 version
= reader
.read()
1084 raise IOError('Unsupported version')
1085 return (datastore
.Entity
.FromPb(record
) for record
in reader
)
@db.non_transactional
def drop_empty_files(filenames):
  """Deletes empty files and returns filenames minus the deleted ones."""
  remaining_filenames = []
  keys_to_delete = []
  # Resolve every backup output file to its BlobInfo in one batch get;
  # entries may be None when no blob exists for a filename.
  blobs_info = blobstore.BlobInfo.get(
      [files.blobstore.get_blob_key(fn) for fn in filenames])
  for filename, blob_info in itertools.izip(filenames, blobs_info):
    if blob_info:
      if blob_info.size > 0:
        remaining_filenames.append(filename)
      else:
        keys_to_delete.append(blob_info.key())
  blobstore_api.delete(keys_to_delete)
  return remaining_filenames
1105 class BackupInfoWriter(object):
1106 """A class for writing Datastore backup metadata files."""
  def __init__(self, gs_bucket):
    """Construct a BackupInfoWriter.

    Args:
      gs_bucket: Required string for the target GS bucket.
    """
    self.__gs_bucket = gs_bucket
1116 def write(self
, backup_info
):
1117 """Write the metadata files for the given backup_info.
1119 As a side effect, updates the backup_info in-memory entity object with the
1120 gs_handle to the Backup info filename. This is not saved to the datastore.
1123 backup_info: Required BackupInformation.
1126 A list with Backup info filename followed by Kind info filenames.
1128 fn
= self
._write
_backup
_info
(backup_info
)
1129 return [fn
] + self
._write
_kind
_info
(backup_info
)
1131 def _generate_filename(self
, backup_info
, suffix
):
1132 key_str
= str(backup_info
.key()).replace('/', '_')
1133 return '/gs/%s/%s%s' % (self
.__gs
_bucket
, key_str
, suffix
)
1135 def _write_backup_info(self
, backup_info
):
1136 """Writes a backup_info_file.
1139 backup_info: Required BackupInformation.
1142 Backup info filename.
1144 filename
= self
._generate
_filename
(backup_info
, '.backup_info')
1145 backup_info
.gs_handle
= filename
1146 info_file
= files
.open(files
.gs
.create(filename
), 'a', exclusive_lock
=True)
1148 with records
.RecordsWriter(info_file
) as writer
:
1152 writer
.write(db
.model_to_protobuf(backup_info
).SerializeToString())
1154 for kind_files
in backup_info
.get_kind_backup_files():
1155 writer
.write(db
.model_to_protobuf(kind_files
).SerializeToString())
1157 info_file
.close(finalize
=True)
1160 def _write_kind_info(self
, backup_info
):
1161 """Writes type information schema for each kind in backup_info.
1164 backup_info: Required BackupInformation.
1167 A list with all created filenames.
1169 def get_backup_files_tx():
1170 kind_backup_files_list
= []
1172 for kind_backup_files
in backup_info
.get_kind_backup_files():
1173 kind_backup_files_list
.append(kind_backup_files
)
1174 return kind_backup_files_list
1176 kind_backup_files_list
= db
.run_in_transaction(get_backup_files_tx
)
1178 for kind_backup_files
in kind_backup_files_list
:
1179 backup
= self
._create
_kind
_backup
(backup_info
, kind_backup_files
)
1180 filename
= self
._generate
_filename
(
1181 backup_info
, '.%s.backup_info' % kind_backup_files
.backup_kind
)
1182 self
._write
_kind
_backup
_info
_file
(filename
, backup
)
1183 filenames
.append(filename
)
1186 def _create_kind_backup(self
, backup_info
, kind_backup_files
):
1187 """Creates and populate a backup_pb2.Backup."""
1188 backup
= backup_pb2
.Backup()
1189 backup
.backup_info
.backup_name
= backup_info
.name
1190 backup
.backup_info
.start_timestamp
= datastore_types
.DatetimeToTimestamp(
1191 backup_info
.start_time
)
1192 backup
.backup_info
.end_timestamp
= datastore_types
.DatetimeToTimestamp(
1193 backup_info
.complete_time
)
1194 kind
= kind_backup_files
.backup_kind
1195 kind_info
= backup
.kind_info
.add()
1196 kind_info
.kind
= kind
1197 kind_info
.entity_schema
.kind
= kind
1198 kind_info
.file.extend(kind_backup_files
.files
)
1199 entity_type_info
= EntityTypeInfo(kind
=kind
)
1200 for sharded_aggregation
in SchemaAggregationResult
.load(
1201 backup_info
.key(), kind
):
1202 if sharded_aggregation
.is_partial
:
1203 kind_info
.is_partial
= True
1204 if sharded_aggregation
.entity_type_info
:
1205 entity_type_info
.merge(sharded_aggregation
.entity_type_info
)
1206 entity_type_info
.populate_entity_schema(kind_info
.entity_schema
)
1210 def _write_kind_backup_info_file(cls
, filename
, backup
):
1211 """Writes a kind backup_info.
1214 filename: The name of the file to be created as string.
1215 backup: apphosting.ext.datastore_admin.Backup proto.
1217 f
= files
.open(files
.gs
.create(filename
), 'a', exclusive_lock
=True)
1219 f
.write(backup
.SerializeToString())
1221 f
.close(finalize
=True)
1224 class PropertyTypeInfo(json_util
.JsonMixin
):
1225 """Type information for an entity property."""
1227 def __init__(self
, name
, is_repeated
=False, primitive_types
=None,
1228 embedded_entities
=None):
1229 """Construct a PropertyTypeInfo instance.
1232 name: The name of the property as a string.
1233 is_repeated: A boolean that indicates if the property is repeated.
1234 primitive_types: Optional list of PrimitiveType integer values.
1235 embedded_entities: Optional list of EntityTypeInfo.
1238 self
.__is
_repeated
= is_repeated
1239 self
.__primitive
_types
= set(primitive_types
) if primitive_types
else set()
1240 self
.__embedded
_entities
= {}
1241 for entity
in embedded_entities
or ():
1242 if entity
.kind
in self
.__embedded
_entities
:
1243 self
.__embedded
_entities
[entity
.kind
].merge(entity
)
1245 self
.__embedded
_entities
[entity
.kind
] = entity
  def is_repeated(self):
    # True if the property was flagged repeated (may hold multiple values).
    return self.__is_repeated
  def primitive_types(self):
    # Set of PrimitiveType integer values observed for this property.
    return self.__primitive_types
  def embedded_entities_kind_iter(self):
    # Iterator over the kinds of embedded entities seen for this property.
    return self.__embedded_entities.iterkeys()
  def get_embedded_entity(self, kind):
    # Returns the EntityTypeInfo for the given kind, or None if not seen.
    return self.__embedded_entities.get(kind)
1265 def merge(self
, other
):
1266 """Merge a PropertyTypeInfo with this instance.
1269 other: Required PropertyTypeInfo to merge.
1272 True if anything was changed. False otherwise.
1275 ValueError: if property names do not match.
1276 TypeError: if other is not instance of PropertyTypeInfo.
1278 if not isinstance(other
, PropertyTypeInfo
):
1279 raise TypeError('Expected PropertyTypeInfo, was %r' % (other
,))
1281 if other
.__name
!= self
.__name
:
1282 raise ValueError('Property names mismatch (%s, %s)' %
1283 (self
.__name
, other
.__name
))
1285 if other
.__is
_repeated
and not self
.__is
_repeated
:
1286 self
.__is
_repeated
= True
1288 if not other
.__primitive
_types
.issubset(self
.__primitive
_types
):
1289 self
.__primitive
_types
= self
.__primitive
_types
.union(
1290 other
.__primitive
_types
)
1292 for kind
, other_embedded_entity
in other
.__embedded
_entities
.iteritems():
1293 embedded_entity
= self
.__embedded
_entities
.get(kind
)
1295 changed
= embedded_entity
.merge(other_embedded_entity
) or changed
1297 self
.__embedded
_entities
[kind
] = other_embedded_entity
1301 def populate_entity_schema_field(self
, entity_schema
):
1302 """Add an populate a Field to the given entity_schema.
1305 entity_schema: apphosting.ext.datastore_admin.EntitySchema proto.
1307 if not (self
.__primitive
_types
or self
.__embedded
_entities
):
1310 field
= entity_schema
.field
.add()
1311 field
.name
= self
.__name
1312 field_type
= field
.type.add()
1313 field_type
.is_list
= self
.__is
_repeated
1314 field_type
.primitive_type
.extend(self
.__primitive
_types
)
1315 for embedded_entity
in self
.__embedded
_entities
.itervalues():
1316 embedded_entity_schema
= field_type
.embedded_schema
.add()
1317 embedded_entity
.populate_entity_schema(embedded_entity_schema
)
1321 json
['name'] = self
.__name
1322 json
['is_repeated'] = self
.__is
_repeated
1323 json
['primitive_types'] = list(self
.__primitive
_types
)
1324 json
['embedded_entities'] = [e
.to_json() for e
in
1325 self
.__embedded
_entities
.itervalues()]
1329 def from_json(cls
, json
):
1330 return cls(json
['name'], json
['is_repeated'], json
.get('primitive_types'),
1331 [EntityTypeInfo
.from_json(entity_json
) for entity_json
1332 in json
.get('embedded_entities')])
1335 class EntityTypeInfo(json_util
.JsonMixin
):
1336 """Type information for an entity."""
1338 def __init__(self
, kind
=None, properties
=None):
1339 """Construct an EntityTypeInfo instance.
1342 kind: An optional kind name as string.
1343 properties: An optional list of PropertyTypeInfo.
1346 self
.__properties
= {}
1347 for property_type_info
in properties
or ():
1348 if property_type_info
.name
in self
.__properties
:
1349 self
.__properties
[property_type_info
.name
].merge(property_type_info
)
1351 self
.__properties
[property_type_info
.name
] = property_type_info
  def properties_name_iter(self):
    # Iterator over the names of the properties recorded for this entity.
    return self.__properties.iterkeys()
  def get_property(self, name):
    # Returns the PropertyTypeInfo for the given name, or None if unknown.
    return self.__properties.get(name)
1363 def merge(self
, other
):
1364 """Merge an EntityTypeInfo with this instance.
1367 other: Required EntityTypeInfo to merge.
1370 True if anything was changed. False otherwise.
1373 ValueError: if kinds do not match.
1374 TypeError: if other is not instance of EntityTypeInfo.
1376 if not isinstance(other
, EntityTypeInfo
):
1377 raise TypeError('Expected EntityTypeInfo, was %r' % (other
,))
1379 if other
.__kind
!= self
.__kind
:
1380 raise ValueError('Kinds mismatch (%s, %s)' % (self
.__kind
, other
.__kind
))
1382 for name
, other_property
in other
.__properties
.iteritems():
1383 self_property
= self
.__properties
.get(name
)
1385 changed
= self_property
.merge(other_property
) or changed
1387 self
.__properties
[name
] = other_property
1391 def populate_entity_schema(self
, entity_schema
):
1392 """Populates the given entity_schema with values from this instance.
1395 entity_schema: apphosting.ext.datastore_admin.EntitySchema proto.
1398 entity_schema
.kind
= self
.__kind
1399 for property_type_info
in self
.__properties
.itervalues():
1400 property_type_info
.populate_entity_schema_field(entity_schema
)
1404 'kind': self
.__kind
,
1405 'properties': [p
.to_json() for p
in self
.__properties
.itervalues()]
1409 def from_json(cls
, json
):
1410 kind
= json
.get('kind')
1411 properties_json
= json
.get('properties')
1413 return cls(kind
, [PropertyTypeInfo
.from_json(p
) for p
in properties_json
])
1418 def create_from_entity_proto(cls
, entity_proto
):
1419 """Creates and populates an EntityTypeInfo from an EntityProto."""
1420 properties
= [cls
.__get
_property
_type
_info
(property_proto
) for
1421 property_proto
in itertools
.chain(
1422 entity_proto
.property_list(),
1423 entity_proto
.raw_property_list())]
1424 kind
= utils
.get_kind_from_entity_pb(entity_proto
)
1425 return cls(kind
, properties
)
1428 def __get_property_type_info(cls
, property_proto
):
1429 """Returns the type mapping for the provided property."""
1430 name
= property_proto
.name()
1431 is_repeated
= bool(property_proto
.multiple())
1432 primitive_type
= None
1434 if property_proto
.has_meaning():
1435 primitive_type
= MEANING_TO_PRIMITIVE_TYPE
.get(property_proto
.meaning())
1436 if primitive_type
is None:
1437 value
= property_proto
.value()
1438 if value
.has_int64value():
1439 primitive_type
= backup_pb2
.EntitySchema
.INTEGER
1440 elif value
.has_booleanvalue():
1441 primitive_type
= backup_pb2
.EntitySchema
.BOOLEAN
1442 elif value
.has_stringvalue():
1443 if property_proto
.meaning() == entity_pb
.Property
.ENTITY_PROTO
:
1444 entity_proto
= entity_pb
.EntityProto()
1446 entity_proto
.ParsePartialFromString(value
.stringvalue())
1451 entity_type
= EntityTypeInfo
.create_from_entity_proto(entity_proto
)
1453 primitive_type
= backup_pb2
.EntitySchema
.STRING
1454 elif value
.has_doublevalue():
1455 primitive_type
= backup_pb2
.EntitySchema
.FLOAT
1456 elif value
.has_pointvalue():
1457 primitive_type
= backup_pb2
.EntitySchema
.GEO_POINT
1458 elif value
.has_uservalue():
1459 primitive_type
= backup_pb2
.EntitySchema
.USER
1460 elif value
.has_referencevalue():
1461 primitive_type
= backup_pb2
.EntitySchema
.REFERENCE
1462 return PropertyTypeInfo(
1464 (primitive_type
,) if primitive_type
is not None else None,
1465 (entity_type
,) if entity_type
else None)
1468 class SchemaAggregationResult(db
.Model
):
1469 """Persistent aggregated type information for a kind.
1471 An instance can be retrieved via the load method or created
1472 using the create method. An instance aggregates all type information
1473 for all seen embedded_entities via the merge method and persisted when needed
1474 using the model put method.
1477 entity_type_info
= json_util
.JsonProperty(
1478 EntityTypeInfo
, default
=EntityTypeInfo(), indexed
=False)
1479 is_partial
= db
.BooleanProperty(default
=False)
1481 def merge(self
, other
):
1482 """Merge a SchemaAggregationResult or an EntityTypeInfo with this instance.
1485 other: Required SchemaAggregationResult or EntityTypeInfo to merge.
1488 True if anything was changed. False otherwise.
1492 if isinstance(other
, SchemaAggregationResult
):
1493 other
= other
.entity_type_info
1494 return self
.entity_type_info
.merge(other
)
  def _get_parent_key(cls, backup_id, kind_name):
    # Parent key scoping aggregation results to (backup, kind); per-shard
    # results are stored as children keyed by shard_id (see create/load).
    return datastore_types.Key.from_path('Kind', kind_name, parent=backup_id)
1501 def create(cls
, backup_id
, kind_name
, shard_id
):
1502 """Create SchemaAggregationResult instance.
1505 backup_id: Required BackupInformation Key.
1506 kind_name: Required kind name as string.
1507 shard_id: Required shard id as string.
1510 A new SchemaAggregationResult instance.
1512 parent
= cls
._get
_parent
_key
(backup_id
, kind_name
)
1513 return SchemaAggregationResult(
1514 key_name
=shard_id
, parent
=parent
,
1515 entity_type_info
=EntityTypeInfo(kind
=kind_name
))
1518 def load(cls
, backup_id
, kind_name
, shard_id
=None):
1519 """Retrieve SchemaAggregationResult from the Datastore.
1522 backup_id: Required BackupInformation Key.
1523 kind_name: Required kind name as string.
1524 shard_id: Optional shard id as string.
1527 SchemaAggregationResult iterator or an entity if shard_id not None.
1529 parent
= cls
._get
_parent
_key
(backup_id
, kind_name
)
1531 key
= datastore_types
.Key
.from_path(cls
.kind(), shard_id
, parent
=parent
)
1532 return SchemaAggregationResult
.get(key
)
1534 return db
.Query(cls
).ancestor(parent
).run()
1538 return utils
.BACKUP_INFORMATION_KIND_TYPE_INFO
1542 class SchemaAggregationPool(object):
1543 """An MR pool to aggregation type information per kind."""
1545 def __init__(self
, backup_id
, kind
, shard_id
):
1546 """Construct SchemaAggregationPool instance.
1549 backup_id: Required BackupInformation Key.
1550 kind: Required kind name as string.
1551 shard_id: Required shard id as string.
1553 self
.__backup
_id
= backup_id
1555 self
.__shard
_id
= shard_id
1556 self
.__aggregation
= SchemaAggregationResult
.load(backup_id
, kind
, shard_id
)
1557 if not self
.__aggregation
:
1558 self
.__aggregation
= SchemaAggregationResult
.create(backup_id
, kind
,
1560 self
.__needs
_save
= True
1562 self
.__needs
_save
= False
  def merge(self, entity_type_info):
    """Merge EntityTypeInfo into aggregated type information."""
    # Only flag a pending save when the merge actually changed something.
    if self.__aggregation.merge(entity_type_info):
      self.__needs_save = True
1570 """Save aggregated type information to the datastore if changed."""
1571 if self
.__needs
_save
:
1573 def update_aggregation_tx():
1574 aggregation
= SchemaAggregationResult
.load(
1575 self
.__backup
_id
, self
.__kind
, self
.__shard
_id
)
1577 if aggregation
.merge(self
.__aggregation
):
1578 aggregation
.put(force_writes
=True)
1579 self
.__aggregation
= aggregation
1581 self
.__aggregation
.put(force_writes
=True)
1583 def mark_aggregation_as_partial_tx():
1584 aggregation
= SchemaAggregationResult
.load(
1585 self
.__backup
_id
, self
.__kind
, self
.__shard
_id
)
1586 if aggregation
is None:
1587 aggregation
= SchemaAggregationResult
.create(
1588 self
.__backup
_id
, self
.__kind
, self
.__shard
_id
)
1589 aggregation
.is_partial
= True
1590 aggregation
.put(force_writes
=True)
1591 self
.__aggregation
= aggregation
1594 db
.run_in_transaction(update_aggregation_tx
)
1595 except apiproxy_errors
.RequestTooLargeError
:
1596 db
.run_in_transaction(mark_aggregation_as_partial_tx
)
1597 self
.__needs
_save
= False
1600 class AggregateSchema(op
.Operation
):
1601 """An MR Operation to aggregation type information for a kind.
1603 This operation will register an MR pool, SchemaAggregationPool, if
1604 one is not already registered and will invoke the pool's merge operation
1605 per entity. The pool is responsible for keeping a persistent state of
1606 type aggregation using the sharded db model, SchemaAggregationResult.
  def __init__(self, entity_proto):
    # Capture the entity's type information once at construction; it is
    # merged into the shard's SchemaAggregationPool when the op is invoked.
    self.__entity_info = EntityTypeInfo.create_from_entity_proto(entity_proto)
1612 def __call__(self
, ctx
):
1613 pool
= ctx
.get_pool('schema_aggregation_pool')
1615 backup_id
= datastore_types
.Key(
1616 context
.get().mapreduce_spec
.params
['backup_info_pk'])
1617 pool
= SchemaAggregationPool(
1618 backup_id
, self
.__entity
_info
.kind
, ctx
.shard_id
)
1619 ctx
.register_pool('schema_aggregation_pool', pool
)
1620 pool
.merge(self
.__entity
_info
)
class BackupEntity(object):
  """A class which dumps the entity to the writer."""

  def map(self, entity_proto):
    """Backup entity map handler.

    Args:
      entity_proto: An instance of entity_pb.EntityProto.

    Yields:
      A serialized entity_pb.EntityProto as a string, followed by an
      AggregateSchema operation recording the entity's type information.
    """
    yield entity_proto.SerializeToString()
    yield AggregateSchema(entity_proto)
1639 class RestoreEntity(object):
1640 """A class which restore the entity to datastore."""
1643 self
.initialized
= False
1644 self
.kind_filter
= None
1648 def initialize(self
):
1649 """Initialize a restore mapper instance."""
1650 if self
.initialized
:
1652 mapper_params
= get_mapper_params_from_context()
1653 kind_filter
= mapper_params
.get('kind_filter')
1654 self
.kind_filter
= set(kind_filter
) if kind_filter
else None
1655 original_app
= mapper_params
.get('original_app')
1656 target_app
= os
.getenv('APPLICATION_ID')
1657 if original_app
and target_app
!= original_app
:
1658 self
.app_id
= target_app
1659 self
.initialized
= True
1661 def map(self
, record
):
1662 """Restore entity map handler.
1665 record: A serialized entity_pb.EntityProto.
1668 A operation.db.Put for the mapped entity
1671 pb
= entity_pb
.EntityProto(contents
=record
)
1673 utils
.FixKeys(pb
, self
.app_id
)
1677 if not self
.kind_filter
or (
1678 utils
.get_kind_from_entity_pb(pb
) in self
.kind_filter
):
1682 yield utils
.ReserveKey(datastore_types
.Key
._FromPb
(pb
.key()))
def get_mapper_params_from_context():
  """Get mapper params from MR context. Split out for ease of testing."""
  mapreduce_spec = context.get().mapreduce_spec
  return mapreduce_spec.mapper.params
def validate_gs_bucket_name(bucket_name):
  """Validate the format of the given bucket_name.

  Validation rules are based:
  https://developers.google.com/storage/docs/bucketnaming#requirements

  Args:
    bucket_name: The bucket name to validate.

  Raises:
    BackupValidationError: If the bucket name is invalid.
  """
  name_length = len(bucket_name)
  if name_length > MAX_BUCKET_LEN:
    raise BackupValidationError(
        'Bucket name length should not be longer than %d' % MAX_BUCKET_LEN)
  if name_length < MIN_BUCKET_LEN:
    raise BackupValidationError(
        'Bucket name length should be longer than %d' % MIN_BUCKET_LEN)
  if bucket_name.lower().startswith('goog'):
    raise BackupValidationError(
        'Bucket name should not start with a "goog" prefix')
  # Dot-separated segments each have their own length limit.
  for segment in bucket_name.split('.'):
    if len(segment) > MAX_BUCKET_SEGMENT_LEN:
      raise BackupValidationError(
          'Segment length of bucket name should not be longer than %d' %
          MAX_BUCKET_SEGMENT_LEN)
  if not re.match(BUCKET_PATTERN, bucket_name):
    raise BackupValidationError('Invalid bucket name "%s"' % bucket_name)
def is_accessible_bucket_name(bucket_name):
  """Returns True if the application has access to the specified bucket.

  Args:
    bucket_name: The GS bucket to probe, as a string.

  Returns:
    True if a HEAD request to the bucket URL returns HTTP 200,
    False otherwise.
  """
  scope = config.GoogleApiScope('devstorage.read_write')
  bucket_url = config.GsBucketURL(bucket_name)
  auth_token, _ = app_identity.get_access_token(scope)
  result = urlfetch.fetch(bucket_url, method=urlfetch.HEAD, headers={
      'Authorization': 'OAuth %s' % auth_token,
      'x-goog-api-version': '2'})
  # bool() so the documented contract holds: without it the function
  # returned None (not False) whenever `result` was falsy.
  return bool(result and result.status_code == 200)
1732 def verify_bucket_writable(bucket_name
):
1733 """Verify the application can write to the specified bucket.
1736 bucket_name: The bucket to verify.
1739 BackupValidationError: If the bucket is not writable.
1741 path
= '/gs/%s' % bucket_name
1743 file_names
= files
.gs
.listdir(path
,
1744 {'prefix': TEST_WRITE_FILENAME_PREFIX
,
1745 'max_keys': MAX_KEYS_LIST_SIZE
})
1746 except (files
.InvalidParameterError
, files
.PermissionDeniedError
):
1747 raise BackupValidationError('Bucket "%s" not accessible' % bucket_name
)
1748 except files
.InvalidFileNameError
:
1749 raise BackupValidationError('Bucket "%s" does not exist' % bucket_name
)
1750 file_name
= '%s/%s.tmp' % (path
, TEST_WRITE_FILENAME_PREFIX
)
1753 if file_name_try
>= MAX_TEST_FILENAME_TRIES
:
1757 if file_name
not in file_names
:
1759 gen
= random
.randint(0, 9999)
1760 file_name
= '%s/%s_%s.tmp' % (path
, TEST_WRITE_FILENAME_PREFIX
, gen
)
1763 test_file
= files
.open(files
.gs
.create(file_name
), 'a', exclusive_lock
=True)
1765 test_file
.write('test')
1767 test_file
.close(finalize
=True)
1768 except files
.PermissionDeniedError
:
1769 raise BackupValidationError('Bucket "%s" is not writable' % bucket_name
)
1771 files
.delete(file_name
)
1772 except (files
.InvalidArgumentError
, files
.InvalidFileNameError
, IOError):
1773 logging
.warn('Failed to delete test file %s', file_name
)
1776 def is_readable_gs_handle(gs_handle
):
1777 """Return True if the application can read the specified gs_handle."""
1779 with files
.open(gs_handle
) as bak_file
:
1781 except files
.PermissionDeniedError
:
def parse_gs_handle(gs_handle):
  """Splits [/gs/]?bucket_name[/folder]*[/file]? to (bucket_name, path | '')."""
  if gs_handle.startswith('/'):
    filesystem = gs_handle[1:].split('/', 1)[0]
    if filesystem != 'gs':
      raise BackupValidationError('Unsupported filesystem: %s' % filesystem)
    # Strip the leading '/gs/' prefix before splitting bucket from path.
    gs_handle = gs_handle[4:]
  tokens = gs_handle.split('/', 1)
  if len(tokens) == 1:
    return (tokens[0], '')
  return tuple(tokens)
def validate_and_canonicalize_gs_bucket(gs_bucket_name):
  """Validates a GS bucket handle and returns it as 'bucket[/path]'.

  Raises:
    BackupValidationError: If the bucket name is invalid or not writable
      (propagated from validate_gs_bucket_name/verify_bucket_writable).
  """
  bucket_name, path = parse_gs_handle(gs_bucket_name)
  canonical = '%s/%s' % (bucket_name, path)
  gs_bucket_name = canonical.rstrip('/')
  validate_gs_bucket_name(bucket_name)
  verify_bucket_writable(bucket_name)
  return gs_bucket_name
def list_bucket_files(bucket_name, prefix, max_keys=1000):
  """Returns a listing of a bucket that matches the given prefix.

  Args:
    bucket_name: The GS bucket to list.
    prefix: Key prefix to filter by; skipped when falsy.
    max_keys: Maximum number of keys to request.

  Raises:
    BackupValidationError: If the GS request does not return HTTP 200.
  """
  scope = config.GoogleApiScope('devstorage.read_only')
  bucket_url = config.GsBucketURL(bucket_name)
  query = [('max-keys', max_keys)]
  if prefix:
    query.append(('prefix', prefix))
  url = bucket_url + '?' + urllib.urlencode(query)
  auth_token, _ = app_identity.get_access_token(scope)
  result = urlfetch.fetch(url, method=urlfetch.GET, headers={
      'Authorization': 'OAuth %s' % auth_token,
      'x-goog-api-version': '2'})
  if result and result.status_code == 200:
    # Pull every <Key> element out of the XML bucket listing.
    doc = xml.dom.minidom.parseString(result.content)
    return [node.childNodes[0].data
            for node in doc.getElementsByTagName('Key')]
  raise BackupValidationError('Request to Google Cloud Storage failed')
1826 def get_gs_object(bucket_name
, path
):
1827 """Returns a listing of of a bucket that matches the given prefix."""
1828 scope
= config
.GoogleApiScope('devstorage.read_only')
1829 bucket_url
= config
.GsBucketURL(bucket_name
)
1830 url
= bucket_url
+ path
1831 auth_token
, _
= app_identity
.get_access_token(scope
)
1832 result
= urlfetch
.fetch(url
, method
=urlfetch
.GET
, headers
={
1833 'Authorization': 'OAuth %s' % auth_token
,
1834 'x-goog-api-version': '2'})
1835 if result
and result
.status_code
== 200:
1836 return result
.content
1837 if result
and result
.status_code
== 403:
1838 raise BackupValidationError(
1839 'Requested path %s is not accessible/access denied' % url
)
1840 if result
and result
.status_code
== 404:
1841 raise BackupValidationError('Requested path %s was not found' % url
)
1842 raise BackupValidationError('Error encountered accessing requested path %s' %
1848 def get_queue_names(app_id
=None, max_rows
=100):
1849 """Returns a list with all non-special queue names for app_id."""
1850 rpc
= apiproxy_stub_map
.UserRPC('taskqueue')
1851 request
= taskqueue_service_pb
.TaskQueueFetchQueuesRequest()
1852 response
= taskqueue_service_pb
.TaskQueueFetchQueuesResponse()
1854 request
.set_app_id(app_id
)
1855 request
.set_max_rows(max_rows
)
1856 queues
= ['default']
1858 rpc
.make_call('FetchQueues', request
, response
)
1861 for queue
in response
.queue_list():
1862 if (queue
.mode() == taskqueue_service_pb
.TaskQueueMode
.PUSH
and
1863 not queue
.queue_name().startswith('__') and
1864 queue
.queue_name() != 'default'):
1865 queues
.append(queue
.queue_name())
1867 logging
.exception('Failed to get queue names.')
1871 def handlers_list(base_path
):
1873 (r
'%s/%s' % (base_path
, BackupLinkHandler
.SUFFIX
),
1875 (r
'%s/%s' % (base_path
, ConfirmBackupHandler
.SUFFIX
),
1876 ConfirmBackupHandler
),
1877 (r
'%s/%s' % (base_path
, DoBackupHandler
.SUFFIX
), DoBackupHandler
),
1878 (r
'%s/%s' % (base_path
, DoBackupRestoreHandler
.SUFFIX
),
1879 DoBackupRestoreHandler
),
1880 (r
'%s/%s' % (base_path
, DoBackupDeleteHandler
.SUFFIX
),
1881 DoBackupDeleteHandler
),
1882 (r
'%s/%s' % (base_path
, DoBackupAbortHandler
.SUFFIX
),
1883 DoBackupAbortHandler
),
1884 (r
'%s/%s' % (base_path
, DoBackupImportHandler
.SUFFIX
),
1885 DoBackupImportHandler
),