App Engine Python SDK version 1.9.9
[gae.git] / python / google / appengine / tools / bulkloader.py
blob07ae88993a0afa61511d39660ef8becdff8241bf
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Imports data over HTTP.
19 Usage:
20 %(arg0)s [flags]
22 --debug Show debugging information. (Optional)
23 --application=<string> Application ID of endpoint (Optional for
24 *.appspot.com)
25 --auth_domain=<domain> The auth domain to use for logging in and for
26 UserProperties. (Default: gmail.com)
27 --bandwidth_limit=<int> The maximum number of bytes per second for the
28 aggregate transfer of data to/from the server.
29 Bursts may exceed this, but overall transfer rate is
30 restricted to this rate. (Default: 250000)
31 --batch_size=<int> Number of Entity objects to include in each request
32 to/from the URL endpoint. The more data per
33 row/Entity, the smaller the batch size should be.
34 (Default: downloads 100, uploads 10)
35 --config_file=<path> File containing Model and Loader definitions or
36 bulkloader.yaml transforms. (Required unless --dump,
37 --restore, or --create_config are used.)
38 --create_config Write a bulkloader.yaml configuration file to
39 --filename based on the server side datastore state.
40 --db_filename=<path> Specific progress database to write to, or to
41 resume from. If not supplied, then a new database
42 will be started, named:
43 bulkloader-progress-TIMESTAMP.
44 The special filename "skip" may be used to simply
45 skip reading/writing any progress information.
46 --download Export entities to a file.
47 --dry_run Do not execute any remote_api calls.
48 --dump Use zero-configuration dump format.
49 --email=<string> The username to use. Will prompt if omitted.
50 --exporter_opts=<string>
51 A string to pass to the Exporter.initialize method.
52 --filename=<path> Path to the file to import/export. (Required when
53 importing or exporting, not mapping.)
54 --has_header Skip the first row of the input.
55 --http_limit=<int> The maximum numer of HTTP requests per second to
56 send to the server. (Default: 8)
57 --kind=<string> Name of the Entity object kind to put in the
58 datastore. (Required)
59 --loader_opts=<string> A string to pass to the Loader.initialize method.
60 --log_file=<path> File to write bulkloader logs. If not supplied
61 then a new log file will be created, named:
62 bulkloader-log-TIMESTAMP.
63 --map Map an action across datastore entities.
64 --mapper_opts=<string> A string to pass to the Mapper.Initialize method.
65 --num_threads=<int> Number of threads to use for uploading/downloading
66 entities (Default: 10)
67 --passin Read the login password from stdin.
68 --restore Restore from zero-configuration dump format.
69 --result_db_filename=<path>
70 Result database to write to for downloads.
71 --rps_limit=<int> The maximum number of records per second to
72 transfer to/from the server. (Default: 20)
73 --url=<string> URL endpoint to post to for importing/exporting
74 data. (Required)
75 --namespace=<string> Use specified namespace instead of the default one
76 for all datastore operations.
78 The exit status will be 0 on success, non-zero on import failure.
80 Works with the remote_api mix-in library for google.appengine.ext.remote_api.
81 Please look there for documentation about how to setup the server side.
83 Example:
85 %(arg0)s --url=http://app.appspot.com/remote_api --kind=Model \
86 --filename=data.csv --config_file=loader_config.py
88 """
97 import csv
98 import errno
99 import getopt
100 import getpass
101 import imp
102 import logging
103 import os
104 import Queue
105 import re
106 import shutil
107 import signal
108 import StringIO
109 import sys
110 import threading
111 import time
112 import traceback
113 import urllib2
114 import urlparse
116 from google.appengine.datastore import entity_pb
118 from google.appengine.api import apiproxy_stub_map
119 from google.appengine.api import datastore
120 from google.appengine.api import datastore_errors
121 from google.appengine.api.namespace_manager import namespace_manager
122 from google.appengine.datastore import datastore_pb
123 from google.appengine.ext import db
124 from google.appengine.ext import key_range as key_range_module
125 from google.appengine.ext.bulkload import bulkloader_config
126 from google.appengine.ext.db import polymodel
127 from google.appengine.ext.db import stats
128 from google.appengine.ext.remote_api import remote_api_stub
129 from google.appengine.ext.remote_api import throttle as remote_api_throttle
130 from google.appengine.runtime import apiproxy_errors
131 from google.appengine.tools import adaptive_thread_pool
132 from google.appengine.tools import appengine_rpc
133 from google.appengine.tools.requeue import ReQueue
139 try:
140 import sqlite3
141 except ImportError:
142 pass
144 logger = logging.getLogger('google.appengine.tools.bulkloader')
146 KeyRange = key_range_module.KeyRange
149 DEFAULT_THREAD_COUNT = 10
152 DEFAULT_BATCH_SIZE = 10
155 DEFAULT_DOWNLOAD_BATCH_SIZE = 100
158 DEFAULT_QUEUE_SIZE = DEFAULT_THREAD_COUNT * 10
161 _THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'
164 STATE_READ = 0
165 STATE_SENDING = 1
166 STATE_SENT = 2
167 STATE_NOT_SENT = 3
171 STATE_GETTING = 1
172 STATE_GOT = 2
173 STATE_ERROR = 3
177 DATA_CONSUMED_TO_HERE = 'DATA_CONSUMED_TO_HERE'
182 INITIAL_BACKOFF = 1.0
185 BACKOFF_FACTOR = 2.0
192 DEFAULT_BANDWIDTH_LIMIT = 250000
195 DEFAULT_RPS_LIMIT = 20
198 DEFAULT_REQUEST_LIMIT = 8
207 MAXIMUM_INCREASE_DURATION = 5.0
208 MAXIMUM_HOLD_DURATION = 12.0
210 AUTH_FAILED_MESSAGE = ('Authentication Failed: Incorrect credentials or '
211 'unsupported authentication type (e.g. OpenId).')
214 def ImportStateMessage(state):
215 """Converts a numeric state identifier to a status message."""
216 return ({
217 STATE_READ: 'Batch read from file.',
218 STATE_SENDING: 'Sending batch to server.',
219 STATE_SENT: 'Batch successfully sent.',
220 STATE_NOT_SENT: 'Error while sending batch.'
221 }[state])
224 def ExportStateMessage(state):
225 """Converts a numeric state identifier to a status message."""
226 return ({
227 STATE_READ: 'Batch read from file.',
228 STATE_GETTING: 'Fetching batch from server',
229 STATE_GOT: 'Batch successfully fetched.',
230 STATE_ERROR: 'Error while fetching batch'
231 }[state])
234 def MapStateMessage(state):
235 """Converts a numeric state identifier to a status message."""
236 return ({
237 STATE_READ: 'Batch read from file.',
238 STATE_GETTING: 'Querying for batch from server',
239 STATE_GOT: 'Batch successfully fetched.',
240 STATE_ERROR: 'Error while fetching or mapping.'
241 }[state])
244 def ExportStateName(state):
245 """Converts a numeric state identifier to a string."""
246 return ({
247 STATE_READ: 'READ',
248 STATE_GETTING: 'GETTING',
249 STATE_GOT: 'GOT',
250 STATE_ERROR: 'NOT_GOT'
251 }[state])
254 def ImportStateName(state):
255 """Converts a numeric state identifier to a string."""
256 return ({
257 STATE_READ: 'READ',
258 STATE_GETTING: 'SENDING',
259 STATE_GOT: 'SENT',
260 STATE_NOT_SENT: 'NOT_SENT'
261 }[state])
264 class Error(Exception):
265 """Base-class for exceptions in this module."""
268 class MissingPropertyError(Error):
269 """An expected field is missing from an entity, and no default was given."""
272 class FatalServerError(Error):
273 """An unrecoverable error occurred while posting data to the server."""
276 class ResumeError(Error):
277 """Error while trying to resume a partial upload."""
280 class ConfigurationError(Error):
281 """Error in configuration options."""
284 class AuthenticationError(Error):
285 """Error while trying to authenticate with the server."""
288 class FileNotFoundError(Error):
289 """A filename passed in by the user refers to a non-existent input file."""
292 class FileNotReadableError(Error):
293 """A filename passed in by the user refers to a non-readable input file."""
296 class FileExistsError(Error):
297 """A filename passed in by the user refers to an existing output file."""
300 class FileNotWritableError(Error):
301 """A filename passed in by the user refers to a non-writable output file."""
304 class BadStateError(Error):
305 """A work item in an unexpected state was encountered."""
308 class KeyRangeError(Error):
309 """An error during construction of a KeyRangeItem."""
312 class KindStatError(Error):
313 """Unable to find kind stats for an all-kinds download."""
316 class FieldSizeLimitError(Error):
317 """The csv module tried to read a field larger than the size limit."""
319 def __init__(self, limit):
320 self.message = """
321 A field in your CSV input file has exceeded the current limit of %d.
323 You can raise this limit by adding the following lines to your config file:
325 import csv
326 csv.field_size_limit(new_limit)
328 where new_limit is number larger than the size in bytes of the largest
329 field in your CSV.
330 """ % limit
331 Error.__init__(self, self.message)
334 class NameClashError(Error):
335 """A name clash occurred while trying to alias old method names."""
337 def __init__(self, old_name, new_name, klass):
338 Error.__init__(self, old_name, new_name, klass)
339 self.old_name = old_name
340 self.new_name = new_name
341 self.klass = klass
344 def GetCSVGeneratorFactory(kind, csv_filename, batch_size, csv_has_header,
345 openfile=open, create_csv_reader=csv.reader):
346 """Return a factory that creates a CSV-based UploadWorkItem generator.
348 Args:
349 kind: The kind of the entities being uploaded.
350 csv_filename: File on disk containing CSV data.
351 batch_size: Maximum number of CSV rows to stash into an UploadWorkItem.
352 csv_has_header: Whether to skip the first row of the CSV.
353 openfile: Used for dependency injection.
354 create_csv_reader: Used for dependency injection.
356 Returns:
357 A callable (accepting the Progress Queue and Progress Generators
358 as input) which creates the UploadWorkItem generator.
360 loader = Loader.RegisteredLoader(kind)
361 loader._Loader__openfile = openfile
362 loader._Loader__create_csv_reader = create_csv_reader
363 record_generator = loader.generate_records(csv_filename)
365 def CreateGenerator(request_manager, progress_queue, progress_generator,
366 unused_kinds):
367 """Initialize a UploadWorkItem generator.
369 Args:
370 request_manager: A RequestManager instance.
371 progress_queue: A ProgressQueue instance to send progress information.
372 progress_generator: A generator of progress information or None.
373 unused_kinds: The kinds being generated (ignored in this method).
375 Returns:
376 An UploadWorkItemGenerator instance.
378 return UploadWorkItemGenerator(request_manager,
379 progress_queue,
380 progress_generator,
381 record_generator,
382 csv_has_header,
383 batch_size)
385 return CreateGenerator
388 class UploadWorkItemGenerator(object):
389 """Reads rows from a row generator and generates UploadWorkItems."""
391 def __init__(self,
392 request_manager,
393 progress_queue,
394 progress_generator,
395 record_generator,
396 skip_first,
397 batch_size):
398 """Initialize a WorkItemGenerator.
400 Args:
401 request_manager: A RequestManager instance with which to associate
402 WorkItems.
403 progress_queue: A progress queue with which to associate WorkItems.
404 progress_generator: A generator of progress information.
405 record_generator: A generator of data records.
406 skip_first: Whether to skip the first data record.
407 batch_size: The number of data records per WorkItem.
409 self.request_manager = request_manager
410 self.progress_queue = progress_queue
411 self.progress_generator = progress_generator
412 self.reader = record_generator
413 self.skip_first = skip_first
414 self.batch_size = batch_size
415 self.line_number = 1
416 self.column_count = None
417 self.read_rows = []
418 self.row_count = 0
419 self.xfer_count = 0
421 def _AdvanceTo(self, line):
422 """Advance the reader to the given line.
424 Args:
425 line: A line number to advance to.
427 while self.line_number < line:
428 self.reader.next()
429 self.line_number += 1
430 self.row_count += 1
431 self.xfer_count += 1
433 def _ReadRows(self, key_start, key_end):
434 """Attempts to read and encode rows [key_start, key_end].
436 The encoded rows are stored in self.read_rows.
438 Args:
439 key_start: The starting line number.
440 key_end: The ending line number.
442 Raises:
443 StopIteration: if the reader runs out of rows
444 ResumeError: if there are an inconsistent number of columns.
446 assert self.line_number == key_start
450 self.read_rows = []
451 while self.line_number <= key_end:
452 row = self.reader.next()
453 self.row_count += 1
454 if self.column_count is None:
455 self.column_count = len(row)
456 self.read_rows.append((self.line_number, row))
457 self.line_number += 1
459 def _MakeItem(self, key_start, key_end, rows, progress_key=None):
460 """Makes a UploadWorkItem containing the given rows, with the given keys.
462 Args:
463 key_start: The start key for the UploadWorkItem.
464 key_end: The end key for the UploadWorkItem.
465 rows: A list of the rows for the UploadWorkItem.
466 progress_key: The progress key for the UploadWorkItem
468 Returns:
469 An UploadWorkItem instance for the given batch.
471 assert rows
473 item = UploadWorkItem(self.request_manager, self.progress_queue, rows,
474 key_start, key_end, progress_key=progress_key)
476 return item
478 def Batches(self):
479 """Reads from the record_generator and generates UploadWorkItems.
481 Yields:
482 Instances of class UploadWorkItem
484 Raises:
485 ResumeError: If the progress database and data file indicate a different
486 number of rows.
488 if self.skip_first:
491 logger.info('Skipping header line.')
492 try:
493 self.reader.next()
494 except StopIteration:
497 return
499 exhausted = False
501 self.line_number = 1
502 self.column_count = None
504 logger.info('Starting import; maximum %d entities per post',
505 self.batch_size)
507 state = None
510 if self.progress_generator:
511 for progress_key, state, kind, key_start, key_end in (
512 self.progress_generator):
513 if key_start:
514 try:
515 self._AdvanceTo(key_start)
516 self._ReadRows(key_start, key_end)
517 yield self._MakeItem(key_start,
518 key_end,
519 self.read_rows,
520 progress_key=progress_key)
521 except StopIteration:
524 logger.error('Mismatch between data file and progress database')
525 raise ResumeError(
526 'Mismatch between data file and progress database')
527 elif state == DATA_CONSUMED_TO_HERE:
528 try:
529 self._AdvanceTo(key_end + 1)
530 except StopIteration:
533 state = None
535 if self.progress_generator is None or state == DATA_CONSUMED_TO_HERE:
538 while not exhausted:
539 key_start = self.line_number
540 key_end = self.line_number + self.batch_size - 1
541 try:
542 self._ReadRows(key_start, key_end)
543 except StopIteration:
544 exhausted = True
547 key_end = self.line_number - 1
548 if key_start <= key_end:
549 yield self._MakeItem(key_start, key_end, self.read_rows)
552 class CSVGenerator(object):
553 """Reads a CSV file and generates data records."""
555 def __init__(self,
556 csv_filename,
557 openfile=open,
558 create_csv_reader=csv.reader):
559 """Initializes a CSV generator.
561 Args:
562 csv_filename: File on disk containing CSV data.
563 openfile: Used for dependency injection of 'open'.
564 create_csv_reader: Used for dependency injection of 'csv.reader'.
566 self.csv_filename = csv_filename
567 self.openfile = openfile
568 self.create_csv_reader = create_csv_reader
570 def Records(self):
571 """Reads the CSV data file and generates row records.
573 Yields:
574 Lists of strings
576 Raises:
577 ResumeError: If the progress database and data file indicate a different
578 number of rows.
580 csv_file = self.openfile(self.csv_filename, 'rb')
581 reader = self.create_csv_reader(csv_file, skipinitialspace=True)
582 try:
584 for record in reader:
585 yield record
586 except csv.Error, e:
587 if e.args and e.args[0].startswith('field larger than field limit'):
588 raise FieldSizeLimitError(csv.field_size_limit())
589 else:
590 raise
593 class KeyRangeItemGenerator(object):
594 """Generates ranges of keys to download.
596 Reads progress information from the progress database and creates
597 KeyRangeItem objects corresponding to incompletely downloaded parts of an
598 export.
601 def __init__(self, request_manager, kinds, progress_queue, progress_generator,
602 key_range_item_factory):
603 """Initialize the KeyRangeItemGenerator.
605 Args:
606 request_manager: A RequestManager instance.
607 kinds: The kind of entities being transferred, or a list of kinds.
608 progress_queue: A queue used for tracking progress information.
609 progress_generator: A generator of prior progress information, or None
610 if there is no prior status.
611 key_range_item_factory: A factory to produce KeyRangeItems.
613 self.request_manager = request_manager
614 if isinstance(kinds, basestring):
615 self.kinds = [kinds]
616 else:
617 self.kinds = kinds
618 self.row_count = 0
619 self.xfer_count = 0
620 self.progress_queue = progress_queue
621 self.progress_generator = progress_generator
622 self.key_range_item_factory = key_range_item_factory
624 def Batches(self):
625 """Iterate through saved progress information.
627 Yields:
628 KeyRangeItem instances corresponding to undownloaded key ranges.
630 if self.progress_generator is not None:
631 for progress_key, state, kind, key_start, key_end in (
632 self.progress_generator):
633 if state is not None and state != STATE_GOT and key_start is not None:
634 key_start = ParseKey(key_start)
635 key_end = ParseKey(key_end)
637 key_range = KeyRange(key_start=key_start,
638 key_end=key_end)
640 result = self.key_range_item_factory(self.request_manager,
641 self.progress_queue,
642 kind,
643 key_range,
644 progress_key=progress_key,
645 state=STATE_READ)
646 yield result
647 else:
649 for kind in self.kinds:
650 key_range = KeyRange()
651 yield self.key_range_item_factory(self.request_manager,
652 self.progress_queue,
653 kind,
654 key_range)
657 class DownloadResult(object):
658 """Holds the result of an entity download."""
660 def __init__(self, continued, direction, keys, entities):
661 self.continued = continued
662 self.direction = direction
663 self.keys = keys
664 self.entities = entities
665 self.count = len(keys)
666 assert self.count == len(entities)
667 assert direction in (key_range_module.KeyRange.ASC,
668 key_range_module.KeyRange.DESC)
669 if self.count > 0:
670 if direction == key_range_module.KeyRange.ASC:
671 self.key_start = keys[0]
672 self.key_end = keys[-1]
673 else:
674 self.key_start = keys[-1]
675 self.key_end = keys[0]
677 def Entities(self):
678 """Returns the list of entities for this result in key order."""
679 if self.direction == key_range_module.KeyRange.ASC:
680 return list(self.entities)
681 else:
682 result = list(self.entities)
683 result.reverse()
684 return result
686 def __str__(self):
687 return 'continued = %s\n%s' % (
688 str(self.continued), '\n'.join(str(self.entities)))
691 class _WorkItem(adaptive_thread_pool.WorkItem):
692 """Holds a description of a unit of upload or download work."""
694 def __init__(self, progress_queue, key_start, key_end, state_namer,
695 state=STATE_READ, progress_key=None):
696 """Initialize the _WorkItem instance.
698 Args:
699 progress_queue: A queue used for tracking progress information.
700 key_start: The start key of the work item.
701 key_end: The end key of the work item.
702 state_namer: Function to describe work item states.
703 state: The initial state of the work item.
704 progress_key: If this WorkItem represents state from a prior run,
705 then this will be the key within the progress database.
707 adaptive_thread_pool.WorkItem.__init__(self,
708 '[%s-%s]' % (key_start, key_end))
709 self.progress_queue = progress_queue
710 self.state_namer = state_namer
711 self.state = state
712 self.progress_key = progress_key
713 self.progress_event = threading.Event()
714 self.key_start = key_start
715 self.key_end = key_end
716 self.error = None
717 self.traceback = None
718 self.kind = None
720 def _TransferItem(self, thread_pool):
721 raise NotImplementedError()
723 def SetError(self):
724 """Sets the error and traceback information for this thread.
726 This must be called from an exception handler.
728 if not self.error:
729 exc_info = sys.exc_info()
730 self.error = exc_info[1]
731 self.traceback = exc_info[2]
733 def PerformWork(self, thread_pool):
734 """Perform the work of this work item and report the results.
736 Args:
737 thread_pool: An AdaptiveThreadPool instance.
739 Returns:
740 A tuple (status, instruction) of the work status and an instruction
741 for the ThreadGate.
743 status = adaptive_thread_pool.WorkItem.FAILURE
744 instruction = adaptive_thread_pool.ThreadGate.DECREASE
747 try:
748 self.MarkAsTransferring()
752 try:
753 transfer_time = self._TransferItem(thread_pool)
754 if transfer_time is None:
755 status = adaptive_thread_pool.WorkItem.RETRY
756 instruction = adaptive_thread_pool.ThreadGate.HOLD
757 else:
758 logger.debug('[%s] %s Transferred %d entities in %0.1f seconds',
759 threading.currentThread().getName(), self, self.count,
760 transfer_time)
761 sys.stdout.write('.')
762 sys.stdout.flush()
763 status = adaptive_thread_pool.WorkItem.SUCCESS
764 if transfer_time <= MAXIMUM_INCREASE_DURATION:
765 instruction = adaptive_thread_pool.ThreadGate.INCREASE
766 elif transfer_time <= MAXIMUM_HOLD_DURATION:
767 instruction = adaptive_thread_pool.ThreadGate.HOLD
768 except (db.InternalError, db.NotSavedError, db.Timeout,
769 db.TransactionFailedError,
770 apiproxy_errors.OverQuotaError,
771 apiproxy_errors.DeadlineExceededError,
772 apiproxy_errors.ApplicationError), e:
774 status = adaptive_thread_pool.WorkItem.RETRY
775 logger.exception('Retrying on non-fatal datastore error: %s', e)
776 except urllib2.HTTPError, e:
777 http_status = e.code
778 if http_status >= 500 and http_status < 600:
780 status = adaptive_thread_pool.WorkItem.RETRY
781 logger.exception('Retrying on non-fatal HTTP error: %d %s',
782 http_status, e.msg)
783 else:
784 self.SetError()
785 status = adaptive_thread_pool.WorkItem.FAILURE
786 except urllib2.URLError, e:
787 if IsURLErrorFatal(e):
788 self.SetError()
789 status = adaptive_thread_pool.WorkItem.FAILURE
790 else:
791 status = adaptive_thread_pool.WorkItem.RETRY
792 logger.exception('Retrying on non-fatal URL error: %s', e.reason)
794 finally:
795 if status == adaptive_thread_pool.WorkItem.SUCCESS:
796 self.MarkAsTransferred()
797 else:
798 self.MarkAsError()
800 return (status, instruction)
802 def _AssertInState(self, *states):
803 """Raises an Error if the state of this range is not in states."""
804 if not self.state in states:
805 raise BadStateError('%s:%s not in %s' %
806 (str(self),
807 self.state_namer(self.state),
808 map(self.state_namer, states)))
810 def _AssertProgressKey(self):
811 """Raises an Error if the progress key is None."""
812 if self.progress_key is None:
813 raise BadStateError('%s: Progress key is missing' % str(self))
815 def MarkAsRead(self):
816 """Mark this _WorkItem as read, updating the progress database."""
817 self._AssertInState(STATE_READ)
818 self._StateTransition(STATE_READ, blocking=True)
820 def MarkAsTransferring(self):
821 """Mark this _WorkItem as transferring, updating the progress database."""
822 self._AssertInState(STATE_READ, STATE_ERROR)
823 self._AssertProgressKey()
824 self._StateTransition(STATE_GETTING, blocking=True)
826 def MarkAsTransferred(self):
827 """Mark this _WorkItem as transferred, updating the progress database."""
828 raise NotImplementedError()
830 def MarkAsError(self):
831 """Mark this _WorkItem as failed, updating the progress database."""
832 self._AssertInState(STATE_GETTING)
833 self._AssertProgressKey()
834 self._StateTransition(STATE_ERROR, blocking=True)
836 def _StateTransition(self, new_state, blocking=False):
837 """Transition the work item to a new state, storing progress information.
839 Args:
840 new_state: The state to transition to.
841 blocking: Whether to block for the progress thread to acknowledge the
842 transition.
845 assert not self.progress_event.isSet()
848 self.state = new_state
851 self.progress_queue.put(self)
853 if blocking:
857 self.progress_event.wait()
860 self.progress_event.clear()
868 class UploadWorkItem(_WorkItem):
869 """Holds a unit of uploading work.
871 A UploadWorkItem represents a number of entities that need to be uploaded to
872 Google App Engine. These entities are encoded in the "content" field of
873 the UploadWorkItem, and will be POST'd as-is to the server.
875 The entities are identified by a range of numeric keys, inclusively. In
876 the case of a resumption of an upload, or a replay to correct errors,
877 these keys must be able to identify the same set of entities.
879 Note that keys specify a range. The entities do not have to sequentially
880 fill the entire range, they must simply bound a range of valid keys.
883 def __init__(self, request_manager, progress_queue, rows, key_start, key_end,
884 progress_key=None):
885 """Initialize the UploadWorkItem instance.
887 Args:
888 request_manager: A RequestManager instance.
889 progress_queue: A queue used for tracking progress information.
890 rows: A list of pairs of a line number and a list of column values.
891 key_start: The (numeric) starting key, inclusive.
892 key_end: The (numeric) ending key, inclusive.
893 progress_key: If this UploadWorkItem represents state from a prior run,
894 then this will be the key within the progress database.
896 _WorkItem.__init__(self, progress_queue, key_start, key_end,
897 ImportStateName, state=STATE_READ,
898 progress_key=progress_key)
901 assert isinstance(key_start, (int, long))
902 assert isinstance(key_end, (int, long))
903 assert key_start <= key_end
905 self.request_manager = request_manager
906 self.rows = rows
907 self.content = None
908 self.count = len(rows)
910 def __str__(self):
911 return '[%s-%s]' % (self.key_start, self.key_end)
913 def _TransferItem(self, thread_pool, get_time=time.time):
914 """Transfers the entities associated with an item.
916 Args:
917 thread_pool: An AdaptiveThreadPool instance.
918 get_time: Used for dependency injection.
920 t = get_time()
921 if not self.content:
922 self.content = self.request_manager.EncodeContent(self.rows)
923 try:
924 self.request_manager.PostEntities(self.content)
925 except:
926 raise
927 return get_time() - t
929 def MarkAsTransferred(self):
930 """Mark this UploadWorkItem as sucessfully-sent to the server."""
932 self._AssertInState(STATE_SENDING)
933 self._AssertProgressKey()
935 self._StateTransition(STATE_SENT, blocking=False)
938 def GetImplementationClass(kind_or_class_key):
939 """Returns the implementation class for a given kind or class key.
941 Args:
942 kind_or_class_key: A kind string or a tuple of kind strings.
944 Return:
945 A db.Model subclass for the given kind or class key.
947 if isinstance(kind_or_class_key, tuple):
948 try:
949 implementation_class = polymodel._class_map[kind_or_class_key]
950 except KeyError:
951 raise db.KindError('No implementation for class \'%s\'' %
952 kind_or_class_key)
953 else:
954 implementation_class = db.class_for_kind(kind_or_class_key)
955 return implementation_class
958 def KeyLEQ(key1, key2):
959 """Compare two keys for less-than-or-equal-to.
961 All keys with numeric ids come before all keys with names. None represents
962 an unbounded end-point so it is both greater and less than any other key.
964 Args:
965 key1: An int or datastore.Key instance.
966 key2: An int or datastore.Key instance.
968 Returns:
969 True if key1 <= key2
971 if key1 is None or key2 is None:
972 return True
973 return key1 <= key2
976 class KeyRangeItem(_WorkItem):
977 """Represents an item of work that scans over a key range.
979 A KeyRangeItem object represents holds a KeyRange
980 and has an associated state: STATE_READ, STATE_GETTING, STATE_GOT,
981 and STATE_ERROR.
983 - STATE_READ indicates the range ready to be downloaded by a worker thread.
984 - STATE_GETTING indicates the range is currently being downloaded.
985 - STATE_GOT indicates that the range was successfully downloaded
986 - STATE_ERROR indicates that an error occurred during the last download
987 attempt
989 KeyRangeItems not in the STATE_GOT state are stored in the progress database.
990 When a piece of KeyRangeItem work is downloaded, the download may cover only
991 a portion of the range. In this case, the old KeyRangeItem is removed from
992 the progress database and ranges covering the undownloaded range are
993 generated and stored as STATE_READ in the export progress database.
996 def __init__(self,
997 request_manager,
998 progress_queue,
999 kind,
1000 key_range,
1001 progress_key=None,
1002 state=STATE_READ,
1003 first=False):
1004 """Initialize a KeyRangeItem object.
1006 Args:
1007 request_manager: A RequestManager instance.
1008 progress_queue: A queue used for tracking progress information.
1009 kind: The kind of entities for this range.
1010 key_range: A KeyRange instance for this work item.
1011 progress_key: The key for this range within the progress database.
1012 state: The initial state of this range.
1013 first: boolean, default False, whether this is the first WorkItem
1014 of its kind.
1016 _WorkItem.__init__(self, progress_queue, key_range.key_start,
1017 key_range.key_end, ExportStateName, state=state,
1018 progress_key=progress_key)
1019 assert KeyLEQ(key_range.key_start, key_range.key_end), (
1020 '%s not less than %s' %
1021 (repr(key_range.key_start), repr(key_range.key_end)))
1022 self.request_manager = request_manager
1023 self.kind = kind
1024 self.key_range = key_range
1025 self.download_result = None
1026 self.count = 0
1027 self.key_start = key_range.key_start
1028 self.key_end = key_range.key_end
1029 self.first = first
1031 def __str__(self):
1032 return '%s-%s' % (self.kind, self.key_range)
1034 def __repr__(self):
1035 return self.__str__()
1037 def MarkAsTransferred(self):
1038 """Mark this KeyRangeItem as transferred, updating the progress database."""
1041 pass
1043 def Process(self, download_result, thread_pool, batch_size,
1044 new_state=STATE_GOT):
1045 """Mark this KeyRangeItem as success, updating the progress database.
1047 Process will split this KeyRangeItem based on the content of
1048 download_result and adds the unfinished ranges to the work queue.
1050 Args:
1051 download_result: A DownloadResult instance.
1052 thread_pool: An AdaptiveThreadPool instance.
1053 batch_size: The number of entities to transfer per request.
1054 new_state: The state to transition the completed range to.
1056 self._AssertInState(STATE_GETTING)
1057 self._AssertProgressKey()
1059 self.download_result = download_result
1060 self.count = len(download_result.keys)
1061 if download_result.continued:
1062 self._FinishedRange()._StateTransition(new_state, blocking=True)
1063 self._AddUnfinishedRanges(thread_pool, batch_size)
1064 else:
1065 self._StateTransition(new_state, blocking=True)
1067 def _FinishedRange(self):
1068 """Returns the range completed by the download_result.
1070 Returns:
1071 A KeyRangeItem representing a completed range.
1073 assert self.download_result is not None
1075 if self.key_range.direction == key_range_module.KeyRange.ASC:
1076 key_start = self.key_range.key_start
1077 if self.download_result.continued:
1078 key_end = self.download_result.key_end
1079 else:
1080 key_end = self.key_range.key_end
1081 else:
1082 key_end = self.key_range.key_end
1083 if self.download_result.continued:
1084 key_start = self.download_result.key_start
1085 else:
1086 key_start = self.key_range.key_start
1088 key_range = KeyRange(key_start=key_start,
1089 key_end=key_end,
1090 direction=self.key_range.direction)
1092 result = self.__class__(self.request_manager,
1093 self.progress_queue,
1094 self.kind,
1095 key_range,
1096 progress_key=self.progress_key,
1097 state=self.state)
1099 result.download_result = self.download_result
1100 result.count = self.count
1101 return result
1103 def _SplitAndAddRanges(self, thread_pool, batch_size):
1104 """Split the key range [key_start, key_end] into a list of ranges."""
1106 if self.download_result.direction == key_range_module.KeyRange.ASC:
1107 key_range = KeyRange(
1108 key_start=self.download_result.key_end,
1109 key_end=self.key_range.key_end,
1110 include_start=False)
1111 else:
1112 key_range = KeyRange(
1113 key_start=self.key_range.key_start,
1114 key_end=self.download_result.key_start,
1115 include_end=False)
1117 if thread_pool.QueuedItemCount() > 2 * thread_pool.num_threads():
1119 ranges = [key_range]
1120 else:
1122 ranges = key_range.split_range(batch_size=batch_size)
1124 for key_range in ranges:
1125 key_range_item = self.__class__(self.request_manager,
1126 self.progress_queue,
1127 self.kind,
1128 key_range)
1129 key_range_item.MarkAsRead()
1130 thread_pool.SubmitItem(key_range_item, block=True)
1132 def _AddUnfinishedRanges(self, thread_pool, batch_size):
1133 """Adds incomplete KeyRanges to the thread_pool.
1135 Args:
1136 thread_pool: An AdaptiveThreadPool instance.
1137 batch_size: The number of entities to transfer per request.
1139 Returns:
1140 A list of KeyRanges representing incomplete datastore key ranges.
1142 Raises:
1143 KeyRangeError: if this key range has already been completely transferred.
1145 assert self.download_result is not None
1146 if self.download_result.continued:
1147 self._SplitAndAddRanges(thread_pool, batch_size)
1148 else:
1149 raise KeyRangeError('No unfinished part of key range.')
1152 class DownloadItem(KeyRangeItem):
1153 """A KeyRangeItem for downloading key ranges."""
1155 def _TransferItem(self, thread_pool, get_time=time.time):
1156 """Transfers the entities associated with an item."""
1157 t = get_time()
1158 download_result = self.request_manager.GetEntities(
1159 self, retry_parallel=self.first)
1160 transfer_time = get_time() - t
1161 self.Process(download_result, thread_pool,
1162 self.request_manager.batch_size)
1163 return transfer_time
1166 class MapperItem(KeyRangeItem):
1167 """A KeyRangeItem for mapping over key ranges."""
1169 def _TransferItem(self, thread_pool, get_time=time.time):
1170 t = get_time()
1171 mapper = self.request_manager.GetMapper(self.kind)
1173 download_result = self.request_manager.GetEntities(
1174 self, keys_only=mapper.map_over_keys_only(), retry_parallel=self.first)
1175 transfer_time = get_time() - t
1176 try:
1177 mapper.batch_apply(download_result.Entities())
1178 except MapperRetry:
1179 return None
1180 self.Process(download_result, thread_pool,
1181 self.request_manager.batch_size)
1182 return transfer_time
1185 def ConvertKeys(keys):
1186 """Convert a list of keys to a list of keys with the app_id of the caller.
1188 Args:
1189 keys: A list of datastore Entity Keys.
1191 Returns:
1192 A new list of keys in the same order as the input with app_id set to the
1193 default app_id in the calling context. Whichever input keys were already
1194 of this app_id are copied by reference.
1196 def ChangeApp(key, app_id):
1197 if key.app() == app_id:
1198 return key
1199 return datastore.Key.from_path(namespace=key.namespace(),
1200 _app=app_id, *key.to_path())
1202 app_id = datastore.Key.from_path('kind', 'name').app()
1203 return [ChangeApp(key, app_id) for key in keys]
1206 def ReserveKeys(keys):
1207 """Reserve all ids in the paths of the given keys.
1209 Args:
1210 keys: A list of keys with ids in their paths, for which the corresponding
1211 id sequences should be advanced to prevent id collisions.
1214 datastore._GetConnection()._reserve_keys(ConvertKeys(keys))
1217 def _AuthFunction(host, email, passin, raw_input_fn, password_input_fn):
1218 """Internal method shared between RequestManager and _GetRemoteAppId.
1220 Args:
1221 host: Hostname to present to the user.
1222 email: Existing email address to use; if none, will prompt the user.
1223 passin: Value of the --passin command line flag. If true, will get the
1224 password using raw_input_fn insetad of password_input_fn.
1225 raw_input_fn: Method to get a string, typically raw_input.
1226 password_input_fn: Method to get a string, typically getpass.
1228 Returns:
1229 Pair, (email, password).
1231 if not email:
1232 print 'Please enter login credentials for %s' % host
1233 email = raw_input_fn('Email: ')
1235 if email:
1236 password_prompt = 'Password for %s: ' % email
1237 if passin:
1238 password = raw_input_fn(password_prompt)
1239 else:
1240 password = password_input_fn(password_prompt)
1241 else:
1242 password = None
1244 return email, password
1247 class RequestManager(object):
1248 """A class which wraps a connection to the server."""
1250 def __init__(self,
1251 app_id,
1252 host_port,
1253 url_path,
1254 kind,
1255 throttle,
1256 batch_size,
1257 secure,
1258 email,
1259 passin,
1260 dry_run=False,
1261 server=None,
1262 throttle_class=None):
1263 """Initialize a RequestManager object.
1265 Args:
1266 app_id: String containing the application id for requests.
1267 host_port: String containing the "host:port" pair; the port is optional.
1268 url_path: partial URL (path) to post entity data to.
1269 kind: Kind of the Entity records being posted.
1270 throttle: A Throttle instance.
1271 batch_size: The number of entities to transfer per request.
1272 secure: Use SSL when communicating with server.
1273 email: If not none, the username to log in with.
1274 passin: If True, the password will be read from standard in.
1275 server: An existing AbstractRpcServer to reuse.
1276 throttle_class: A class to use instead of the default
1277 ThrottledHttpRpcServer.
1279 self.app_id = app_id
1280 self.host_port = host_port
1281 self.host = host_port.split(':')[0]
1282 if url_path and url_path[0] != '/':
1283 url_path = '/' + url_path
1284 self.url_path = url_path
1285 self.kind = kind
1286 self.throttle = throttle
1287 self.batch_size = batch_size
1288 self.secure = secure
1289 self.authenticated = False
1290 self.auth_called = False
1291 self.parallel_download = True
1292 self.email = email
1293 self.passin = passin
1294 self.mapper = None
1295 self.dry_run = dry_run
1297 if self.dry_run:
1298 logger.info('Running in dry run mode, skipping remote_api setup')
1299 return
1301 logger.debug('Configuring remote_api. url_path = %s, '
1302 'servername = %s' % (url_path, host_port))
1304 throttled_rpc_server_factory = (
1305 remote_api_throttle.ThrottledHttpRpcServerFactory(
1306 self.throttle, throttle_class=throttle_class))
1308 if server:
1309 remote_api_stub.ConfigureRemoteApiFromServer(server, url_path, app_id)
1310 else:
1311 remote_api_stub.ConfigureRemoteApi(
1312 app_id,
1313 url_path,
1314 self.AuthFunction,
1315 servername=host_port,
1316 rpc_server_factory=throttled_rpc_server_factory,
1317 secure=self.secure)
1319 remote_api_throttle.ThrottleRemoteDatastore(self.throttle)
1320 logger.debug('Bulkloader using app_id: %s', os.environ['APPLICATION_ID'])
1322 def Authenticate(self):
1323 """Invoke authentication if necessary."""
1324 logger.info('Connecting to %s%s', self.host_port, self.url_path)
1325 if self.dry_run:
1326 self.authenticated = True
1327 return
1329 remote_api_stub.MaybeInvokeAuthentication()
1330 self.authenticated = True
1332 def AuthFunction(self,
1333 raw_input_fn=raw_input,
1334 password_input_fn=getpass.getpass):
1335 """Prompts the user for a username and password.
1337 Caches the results the first time it is called and returns the
1338 same result every subsequent time.
1340 Args:
1341 raw_input_fn: Used for dependency injection.
1342 password_input_fn: Used for dependency injection.
1344 Returns:
1345 A pair of the username and password.
1347 self.auth_called = True
1348 return _AuthFunction(self.host, self.email, self.passin,
1349 raw_input_fn, password_input_fn)
1351 def ReserveKeys(self, keys):
1352 """Reserve all ids in the paths of the given keys.
1354 Args:
1355 keys: A list of keys with ids in their paths, for which the corresponding
1356 id sequences should be advanced to prevent id collisions.
1358 if self.dry_run:
1359 return
1360 ReserveKeys(keys)
1362 def GetSchemaKinds(self):
1363 """Returns the list of kinds for this app.
1365 There can be 3 possible cases using namespaces:
1366 a.) No namespace specified and Datastore has only default namespace ->
1367 Query GlobalStat and KindStat.
1368 b.) No namespace specified but Datastore has multiple namespace ->
1369 Query NamespaceGlobalStat and NamespaceKindStat.
1370 c.) Namespace specified and Datastore has multiple namespaces ->
1371 Query NamespaceGlobalStat and NamespaceKindStat.
1373 Returns:
1374 A list of kinds.
1376 namespaces = False
1378 if (namespace_manager.get_namespace() or
1379 stats.NamespaceStat.all().count() > 1):
1380 namespaces = True
1382 if namespaces:
1383 global_kind = stats.NamespaceGlobalStat
1384 else:
1385 global_kind = stats.GlobalStat
1387 kinds_kind = stats.NamespaceKindStat if namespaces else stats.KindStat
1389 global_stat = global_kind.all().get()
1390 if not global_stat:
1391 raise KindStatError()
1392 timestamp = global_stat.timestamp
1393 kind_stat = kinds_kind.all().filter(
1394 "timestamp =", timestamp).fetch(1000)
1395 kind_list = [stat.kind_name for stat in kind_stat
1396 if stat.kind_name and not stat.kind_name.startswith('__')]
1397 return list(set(kind_list))
1399 def EncodeContent(self, rows, loader=None):
1400 """Encodes row data to the wire format.
1402 Args:
1403 rows: A list of pairs of a line number and a list of column values.
1404 loader: Used for dependency injection.
1406 Returns:
1407 A list of datastore.Entity instances.
1409 Raises:
1410 ConfigurationError: if no loader is defined for self.kind
1412 if not loader:
1413 try:
1414 loader = Loader.RegisteredLoader(self.kind)
1415 except KeyError:
1416 logger.error('No Loader defined for kind %s.' % self.kind)
1417 raise ConfigurationError('No Loader defined for kind %s.' % self.kind)
1418 entities = []
1419 for line_number, values in rows:
1420 key = loader.generate_key(line_number, values)
1421 if isinstance(key, datastore.Key):
1422 parent = key.parent()
1423 key = key.name()
1424 else:
1425 parent = None
1426 entity = loader.create_entity(values, key_name=key, parent=parent)
1428 def ToEntity(entity):
1429 if isinstance(entity, db.Model):
1430 return entity._populate_entity()
1431 else:
1432 return entity
1434 if not entity:
1436 continue
1437 if isinstance(entity, list):
1438 entities.extend(map(ToEntity, entity))
1439 elif entity:
1440 entities.append(ToEntity(entity))
1442 return entities
1444 def PostEntities(self, entities):
1445 """Posts Entity records to a remote endpoint over HTTP.
1447 Args:
1448 entities: A list of datastore entities.
1450 if self.dry_run:
1451 return
1452 datastore.Put(entities)
1454 def _QueryForPbs(self, query):
1455 """Perform the given query and return a list of entity_pb's."""
1456 try:
1457 query_pb = query._ToPb(limit=self.batch_size, count=self.batch_size)
1458 result_pb = datastore_pb.QueryResult()
1459 apiproxy_stub_map.MakeSyncCall('datastore_v3', 'RunQuery', query_pb,
1460 result_pb)
1461 results = result_pb.result_list()
1463 while result_pb.more_results():
1464 next_pb = datastore_pb.NextRequest()
1465 next_pb.set_count(self.batch_size - len(results))
1466 next_pb.mutable_cursor().CopyFrom(result_pb.cursor())
1467 result_pb = datastore_pb.QueryResult()
1468 apiproxy_stub_map.MakeSyncCall('datastore_v3', 'Next', next_pb,
1469 result_pb)
1470 results += result_pb.result_list()
1472 return results
1473 except apiproxy_errors.ApplicationError, e:
1474 raise datastore._ToDatastoreError(e)
1476 def GetEntities(
1477 self, key_range_item, key_factory=datastore.Key, keys_only=False,
1478 retry_parallel=False):
1479 """Gets Entity records from a remote endpoint over HTTP.
1481 Args:
1482 key_range_item: Range of keys to get.
1483 key_factory: Used for dependency injection.
1484 keys_only: bool, default False, only get keys values
1485 retry_parallel: bool, default False, to try a parallel download despite
1486 past parallel download failures.
1487 Returns:
1488 A DownloadResult instance.
1490 Raises:
1491 ConfigurationError: if no Exporter is defined for key_range_item.kind
1493 keys = []
1494 entities = []
1495 kind = key_range_item.kind
1496 if retry_parallel:
1497 self.parallel_download = True
1499 if self.parallel_download:
1500 query = key_range_item.key_range.make_directed_datastore_query(
1501 kind, keys_only=keys_only)
1502 try:
1503 results = self._QueryForPbs(query)
1504 except datastore_errors.NeedIndexError:
1506 logger.info('%s: No descending index on __key__, '
1507 'performing serial download', kind)
1508 self.parallel_download = False
1510 if not self.parallel_download:
1513 key_range_item.key_range.direction = key_range_module.KeyRange.ASC
1514 query = key_range_item.key_range.make_ascending_datastore_query(
1515 kind, keys_only=keys_only)
1516 results = self._QueryForPbs(query)
1518 size = len(results)
1520 for entity in results:
1523 key = key_factory()
1524 key._Key__reference = entity.key()
1525 entities.append(entity)
1526 keys.append(key)
1528 continued = (size == self.batch_size)
1529 key_range_item.count = size
1531 return DownloadResult(continued, key_range_item.key_range.direction,
1532 keys, entities)
1534 def GetMapper(self, kind):
1535 """Returns a mapper for the registered kind.
1537 Returns:
1538 A Mapper instance.
1540 Raises:
1541 ConfigurationError: if no Mapper is defined for kind
1543 if not self.mapper:
1544 try:
1545 self.mapper = Mapper.RegisteredMapper(kind)
1546 except KeyError:
1547 logger.error('No Mapper defined for kind %s.' % kind)
1548 raise ConfigurationError('No Mapper defined for kind %s.' % kind)
1549 return self.mapper
1552 def InterruptibleSleep(sleep_time):
1553 """Puts thread to sleep, checking this threads exit_flag twice a second.
1555 Args:
1556 sleep_time: Time to sleep.
1558 slept = 0.0
1559 epsilon = .0001
1560 thread = threading.currentThread()
1561 while slept < sleep_time - epsilon:
1562 remaining = sleep_time - slept
1563 this_sleep_time = min(remaining, 0.5)
1564 time.sleep(this_sleep_time)
1565 slept += this_sleep_time
1566 if thread.exit_flag:
1567 return
1570 class _ThreadBase(threading.Thread):
1571 """Provide some basic features for the threads used in the uploader.
1573 This abstract base class is used to provide some common features:
1575 * Flag to ask thread to exit as soon as possible.
1576 * Record exit/error status for the primary thread to pick up.
1577 * Capture exceptions and record them for pickup.
1578 * Some basic logging of thread start/stop.
1579 * All threads are "daemon" threads.
1580 * Friendly names for presenting to users.
1582 Concrete sub-classes must implement PerformWork().
1584 Either self.NAME should be set or GetFriendlyName() be overridden to
1585 return a human-friendly name for this thread.
1587 The run() method starts the thread and prints start/exit messages.
1589 self.exit_flag is intended to signal that this thread should exit
1590 when it gets the chance. PerformWork() should check self.exit_flag
1591 whenever it has the opportunity to exit gracefully.
1594 def __init__(self):
1595 threading.Thread.__init__(self)
1600 self.setDaemon(True)
1602 self.exit_flag = False
1603 self.error = None
1604 self.traceback = None
1606 def run(self):
1607 """Perform the work of the thread."""
1608 logger.debug('[%s] %s: started', self.getName(), self.__class__.__name__)
1610 try:
1611 self.PerformWork()
1612 except:
1613 self.SetError()
1614 logger.exception('[%s] %s:', self.getName(), self.__class__.__name__)
1616 logger.debug('[%s] %s: exiting', self.getName(), self.__class__.__name__)
1618 def SetError(self):
1619 """Sets the error and traceback information for this thread.
1621 This must be called from an exception handler.
1623 if not self.error:
1624 exc_info = sys.exc_info()
1625 self.error = exc_info[1]
1626 self.traceback = exc_info[2]
1628 def PerformWork(self):
1629 """Perform the thread-specific work."""
1630 raise NotImplementedError()
1632 def CheckError(self):
1633 """If an error is present, then log it."""
1634 if self.error:
1635 logger.error('Error in %s: %s', self.GetFriendlyName(), self.error)
1636 if self.traceback:
1637 logger.debug(''.join(traceback.format_exception(self.error.__class__,
1638 self.error,
1639 self.traceback)))
1641 def GetFriendlyName(self):
1642 """Returns a human-friendly description of the thread."""
1643 if hasattr(self, 'NAME'):
1644 return self.NAME
1645 return 'unknown thread'
1648 non_fatal_error_codes = set([errno.EAGAIN,
1649 errno.ENETUNREACH,
1650 errno.ENETRESET,
1651 errno.ECONNRESET,
1652 errno.ETIMEDOUT,
1653 errno.EHOSTUNREACH])
1656 def IsURLErrorFatal(error):
1657 """Returns False if the given URLError may be from a transient failure.
1659 Args:
1660 error: A urllib2.URLError instance.
1662 assert isinstance(error, urllib2.URLError)
1663 if not hasattr(error, 'reason'):
1664 return True
1665 if not isinstance(error.reason[0], int):
1666 return True
1667 return error.reason[0] not in non_fatal_error_codes
1670 class DataSourceThread(_ThreadBase):
1671 """A thread which reads WorkItems and pushes them into queue.
1673 This thread will read/consume WorkItems from a generator (produced by
1674 the generator factory). These WorkItems will then be pushed into the
1675 thread_pool. Note that reading will block if/when the thread_pool becomes
1676 full. Information on content consumed from the generator will be pushed
1677 into the progress_queue.
1680 NAME = 'data source thread'
1682 def __init__(self,
1683 request_manager,
1684 kinds,
1685 thread_pool,
1686 progress_queue,
1687 workitem_generator_factory,
1688 progress_generator_factory):
1689 """Initialize the DataSourceThread instance.
1691 Args:
1692 request_manager: A RequestManager instance.
1693 kinds: The kinds of entities being transferred.
1694 thread_pool: An AdaptiveThreadPool instance.
1695 progress_queue: A queue used for tracking progress information.
1696 workitem_generator_factory: A factory that creates a WorkItem generator
1697 progress_generator_factory: A factory that creates a generator which
1698 produces prior progress status, or None if there is no prior status
1699 to use.
1701 _ThreadBase.__init__(self)
1703 self.request_manager = request_manager
1704 self.kinds = kinds
1705 self.thread_pool = thread_pool
1706 self.progress_queue = progress_queue
1707 self.workitem_generator_factory = workitem_generator_factory
1708 self.progress_generator_factory = progress_generator_factory
1710 self.entity_count = 0
1712 def PerformWork(self):
1713 """Performs the work of a DataSourceThread."""
1716 if self.progress_generator_factory:
1717 progress_gen = self.progress_generator_factory()
1718 else:
1719 progress_gen = None
1721 content_gen = self.workitem_generator_factory(self.request_manager,
1722 self.progress_queue,
1723 progress_gen,
1724 self.kinds)
1726 self.xfer_count = 0
1727 self.read_count = 0
1728 self.read_all = False
1730 for item in content_gen.Batches():
1736 item.MarkAsRead()
1741 while not self.exit_flag:
1742 try:
1744 self.thread_pool.SubmitItem(item, block=True, timeout=1.0)
1745 self.entity_count += item.count
1746 break
1747 except Queue.Full:
1748 pass
1751 if self.exit_flag:
1752 break
1754 if not self.exit_flag:
1755 self.read_all = True
1756 self.read_count = content_gen.row_count
1757 self.xfer_count = content_gen.xfer_count
1762 def _RunningInThread(thread):
1763 """Return True if we are running within the specified thread."""
1764 return threading.currentThread().getName() == thread.getName()
1767 class _Database(object):
1768 """Base class for database connections in this module.
1770 The table is created by a primary thread (the python main thread)
1771 but all future lookups and updates are performed by a secondary
1772 thread.
1775 SIGNATURE_TABLE_NAME = 'bulkloader_database_signature'
1777 def __init__(self,
1778 db_filename,
1779 create_table,
1780 signature,
1781 index=None,
1782 commit_periodicity=100):
1783 """Initialize the _Database instance.
1785 Args:
1786 db_filename: The sqlite3 file to use for the database.
1787 create_table: A string containing the SQL table creation command.
1788 signature: A string identifying the important invocation options,
1789 used to make sure we are not using an old database.
1790 index: An optional string to create an index for the database.
1791 commit_periodicity: Number of operations between database commits.
1795 self.db_filename = db_filename
1799 logger.info('Opening database: %s', db_filename)
1800 self.primary_conn = sqlite3.connect(db_filename, isolation_level=None)
1801 self.primary_thread = threading.currentThread()
1804 self.secondary_conn = None
1805 self.secondary_thread = None
1807 self.operation_count = 0
1808 self.commit_periodicity = commit_periodicity
1812 try:
1813 self.primary_conn.execute(create_table)
1814 except sqlite3.OperationalError, e:
1816 if 'already exists' not in e.message:
1817 raise
1819 if index:
1820 try:
1821 self.primary_conn.execute(index)
1822 except sqlite3.OperationalError, e:
1824 if 'already exists' not in e.message:
1825 raise
1827 self.existing_table = False
1828 signature_cursor = self.primary_conn.cursor()
1829 create_signature = """
1830 create table %s (
1831 value TEXT not null)
1832 """ % _Database.SIGNATURE_TABLE_NAME
1833 try:
1834 self.primary_conn.execute(create_signature)
1835 self.primary_conn.cursor().execute(
1836 'insert into %s (value) values (?)' % _Database.SIGNATURE_TABLE_NAME,
1837 (signature,))
1838 except sqlite3.OperationalError, e:
1839 if 'already exists' not in e.message:
1840 logger.exception('Exception creating table:')
1841 raise
1842 else:
1843 self.existing_table = True
1844 signature_cursor.execute(
1845 'select * from %s' % _Database.SIGNATURE_TABLE_NAME)
1846 (result,) = signature_cursor.fetchone()
1847 if result and result != signature:
1848 logger.error('Database signature mismatch:\n\n'
1849 'Found:\n'
1850 '%s\n\n'
1851 'Expecting:\n'
1852 '%s\n',
1853 result, signature)
1854 raise ResumeError('Database signature mismatch: %s != %s' % (
1855 signature, result))
1857 def ThreadComplete(self):
1858 """Finalize any operations the secondary thread has performed.
1860 The database aggregates lots of operations into a single commit, and
1861 this method is used to commit any pending operations as the thread
1862 is about to shut down.
1864 if self.secondary_conn:
1865 self._MaybeCommit(force_commit=True)
1867 def _MaybeCommit(self, force_commit=False):
1868 """Periodically commit changes into the SQLite database.
1870 Committing every operation is quite expensive, and slows down the
1871 operation of the script. Thus, we only commit after every N operations,
1872 as determined by the self.commit_periodicity value. Optionally, the
1873 caller can force a commit.
1875 Args:
1876 force_commit: Pass True in order for a commit to occur regardless
1877 of the current operation count.
1879 self.operation_count += 1
1880 if force_commit or (self.operation_count % self.commit_periodicity) == 0:
1881 self.secondary_conn.commit()
1883 def _OpenSecondaryConnection(self):
1884 """Possibly open a database connection for the secondary thread.
1886 If the connection is not open (for the calling thread, which is assumed
1887 to be the unique secondary thread), then open it. We also open a couple
1888 cursors for later use (and reuse).
1890 if self.secondary_conn:
1891 return
1893 assert not _RunningInThread(self.primary_thread)
1895 self.secondary_thread = threading.currentThread()
1903 self.secondary_conn = sqlite3.connect(self.db_filename)
1906 self.insert_cursor = self.secondary_conn.cursor()
1907 self.update_cursor = self.secondary_conn.cursor()
1911 zero_matcher = re.compile(r'\x00')
1913 zero_one_matcher = re.compile(r'\x00\x01')
1916 def KeyStr(key):
1917 """Returns a string to represent a key, preserving ordering.
1919 Unlike datastore.Key.__str__(), we have the property:
1921 key1 < key2 ==> KeyStr(key1) < KeyStr(key2)
1923 The key string is constructed from the key path as follows:
1924 (1) Strings are prepended with ':' and numeric id's are padded to
1925 20 digits.
1926 (2) Any null characters (u'\0') present are replaced with u'\0\1'
1927 (3) The sequence u'\0\0' is used to separate each component of the path.
1929 (1) assures that names and ids compare properly, while (2) and (3) enforce
1930 the part-by-part comparison of pieces of the path.
1932 Args:
1933 key: A datastore.Key instance.
1935 Returns:
1936 A string representation of the key, which preserves ordering.
1938 assert isinstance(key, datastore.Key)
1939 path = key.to_path()
1941 out_path = []
1942 for part in path:
1943 if isinstance(part, (int, long)):
1946 part = '%020d' % part
1947 else:
1949 part = ':%s' % part
1951 out_path.append(zero_matcher.sub(u'\0\1', part))
1953 out_str = u'\0\0'.join(out_path)
1955 return out_str
1958 def StrKey(key_str):
1959 """The inverse of the KeyStr function.
1961 Args:
1962 key_str: A string in the range of KeyStr.
1964 Returns:
1965 A datastore.Key instance k, such that KeyStr(k) == key_str.
1967 parts = key_str.split(u'\0\0')
1968 for i in xrange(len(parts)):
1969 if parts[i][0] == ':':
1971 part = parts[i][1:]
1972 part = zero_one_matcher.sub(u'\0', part)
1973 parts[i] = part
1974 else:
1975 parts[i] = int(parts[i])
1976 return datastore.Key.from_path(*parts)
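# Illustrative sketch (added for documentation, not called by the bulkloader):
# demonstrates the round trip and ordering property described in the
# KeyStr/StrKey docstrings above. Assumes an environment where
# datastore.Key.from_path can resolve a default application id (e.g. under
# remote_api); the kind 'MyKind' and the ids used here are hypothetical.
def _example_key_str_round_trip():
  key_small = datastore.Key.from_path('MyKind', 7)
  key_large = datastore.Key.from_path('MyKind', 123456)
  # Numeric ids are zero-padded to 20 digits, so comparing the encoded
  # strings lexicographically matches comparing the ids numerically.
  assert KeyStr(key_small) < KeyStr(key_large)
  # StrKey is the exact inverse of KeyStr.
  assert StrKey(KeyStr(key_small)) == key_small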
1979 class ResultDatabase(_Database):
1980 """Persistently record all the entities downloaded during an export.
1982 The entities are held in the database by their unique datastore key
1983 in order to avoid duplication if an export is restarted.
1986 def __init__(self, db_filename, signature, commit_periodicity=1,
1987 exporter=None):
1988 """Initialize a ResultDatabase object.
1990 Args:
1991 db_filename: The name of the SQLite database to use.
1992 signature: A string identifying the important invocation options,
1993 used to make sure we are not using an old database.
1994 commit_periodicity: How many operations to perform between commits.
1995 exporter: Exporter instance; if exporter.calculate_sort_key_from_entity
1996 is true then exporter.sort_key_from_entity(entity) will be called.
1998 self.complete = False
1999 create_table = ('create table result (\n'
2000 'id BLOB primary key,\n'
2001 'value BLOB not null,\n'
2002 'sort_key BLOB)')
2004 _Database.__init__(self,
2005 db_filename,
2006 create_table,
2007 signature,
2008 commit_periodicity=commit_periodicity)
2009 if self.existing_table:
2010 cursor = self.primary_conn.cursor()
2011 cursor.execute('select count(*) from result')
2012 self.existing_count = int(cursor.fetchone()[0])
2013 else:
2014 self.existing_count = 0
2015 self.count = self.existing_count
2016 if exporter and getattr(exporter, 'calculate_sort_key_from_entity', False):
2017 self.sort_key_from_entity = exporter.sort_key_from_entity
2018 else:
2019 self.sort_key_from_entity = None
2021 def _StoreEntity(self, entity_id, entity):
2022 """Store an entity in the result database.
2024 Args:
2025 entity_id: A datastore.Key for the entity.
2026 entity: The entity to store.
2028 Returns:
2029 True if this entity is not already present in the result database.
2031 assert _RunningInThread(self.secondary_thread)
2032 assert isinstance(entity_id, datastore.Key), (
2033 'expected a datastore.Key, got a %s' % entity_id.__class__.__name__)
2035 key_str = buffer(KeyStr(entity_id).encode('utf-8'))
2036 self.insert_cursor.execute(
2037 'select count(*) from result where id = ?', (key_str,))
2039 already_present = self.insert_cursor.fetchone()[0]
2040 result = True
2041 if already_present:
2042 result = False
2043 self.insert_cursor.execute('delete from result where id = ?',
2044 (key_str,))
2045 else:
2046 self.count += 1
2047 if self.sort_key_from_entity:
2048 sort_key = self.sort_key_from_entity(datastore.Entity._FromPb(entity))
2049 else:
2050 sort_key = ''
2051 value = entity.Encode()
2052 self.insert_cursor.execute(
2053 'insert into result (id, value, sort_key) values (?, ?, ?)',
2054 (key_str, buffer(value), sort_key))
2055 return result
2057 def StoreEntities(self, keys, entities):
2058 """Store a group of entities in the result database.
2060 Args:
2061 keys: A list of entity keys.
2062 entities: A list of entities.
2064 Returns:
2065 The number of new entities stored in the result database.
2067 self._OpenSecondaryConnection()
2068 t = time.time()
2069 count = 0
2070 for entity_id, entity in zip(keys,
2071 entities):
2072 if self._StoreEntity(entity_id, entity):
2073 count += 1
2074 logger.debug('%s insert: delta=%.3f',
2075 self.db_filename,
2076 time.time() - t)
2077 logger.debug('Entities transferred total: %s', self.count)
2078 self._MaybeCommit()
2079 return count
2081 def ResultsComplete(self):
2082 """Marks the result database as containing complete results."""
2083 self.complete = True
2085 def AllEntities(self):
2086 """Yields all pairs of (id, value) from the result table."""
2088 conn = sqlite3.connect(self.db_filename, isolation_level=None)
2089 cursor = conn.cursor()
2094 cursor.execute(
2095 'select id, value from result order by sort_key, id')
2097 for unused_entity_id, entity in cursor:
2098 entity_proto = entity_pb.EntityProto(contents=entity)
2099 yield datastore.Entity._FromPb(entity_proto)
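# Illustrative sketch (added for documentation): how a resumed export can
# inspect a ResultDatabase left over from an earlier run. The filename and
# signature strings are hypothetical; in the real tool they are derived from
# the command-line options.
def _example_inspect_result_db():
  result_db = ResultDatabase('bulkloader-results-example.sql3',
                             signature='example-signature')
  if result_db.existing_count:
    # Entities stored by the earlier run stay in the table; keys that
    # StoreEntities has already seen are not counted again, so a restarted
    # export does not inflate the total.
    for entity in result_db.AllEntities():
      pass  # e.g. re-serialize previously downloaded entities.
  return result_db.existing_count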
2102 class _ProgressDatabase(_Database):
2103 """Persistently record all progress information during an upload.
2105 This class wraps a very simple SQLite database which records each of
2106 the relevant details from a chunk of work. If the loader is
2107 resumed, then data is replayed out of the database.
2110 def __init__(self,
2111 db_filename,
2112 sql_type,
2113 py_type,
2114 signature,
2115 commit_periodicity=100):
2116 """Initialize the ProgressDatabase instance.
2118 Args:
2119 db_filename: The name of the SQLite database to use.
2120 sql_type: A string of the SQL type to use for entity keys.
2121 py_type: The python type of entity keys.
2122 signature: A string identifying the important invocation options,
2123 used to make sure we are not using an old database.
2124 commit_periodicity: How many operations to perform between commits.
2126 self.prior_key_end = None
2133 create_table = ('create table progress (\n'
2134 'id integer primary key autoincrement,\n'
2135 'state integer not null,\n'
2136 'kind text not null,\n'
2137 'key_start %s,\n'
2138 'key_end %s)'
2139 % (sql_type, sql_type))
2140 self.py_type = py_type
2142 index = 'create index i_state on progress (state)'
2143 _Database.__init__(self,
2144 db_filename,
2145 create_table,
2146 signature,
2147 index=index,
2148 commit_periodicity=commit_periodicity)
2150 def UseProgressData(self):
2151 """Returns True if the database has progress information.
2153 Note there are two basic cases for progress information:
2154 1) All saved records indicate a successful upload. In this case, we
2155 need to skip everything transmitted so far and then send the rest.
2156 2) Some records for incomplete transfer are present. These need to be
2157 sent again, and then we resume sending after all the successful
2158 data.
2160 Returns:
2161 True: if the database has progress information.
2163 Raises:
2164 ResumeError: if there is an error retrieving rows from the database.
2166 assert _RunningInThread(self.primary_thread)
2170 cursor = self.primary_conn.cursor()
2171 cursor.execute('select count(*) from progress')
2172 row = cursor.fetchone()
2173 if row is None:
2174 raise ResumeError('Cannot retrieve progress information from database.')
2177 return row[0] != 0
2179 def StoreKeys(self, kind, key_start, key_end):
2180 """Record a new progress record, returning a key for later updates.
2182 The specified progress information will be persisted into the database.
2183 A unique key will be returned that identifies this progress state. The
2184 key is later used to (quickly) update this record.
2186 For the progress resumption to proceed properly, calls to StoreKeys
2187 MUST specify monotonically increasing key ranges. This will result in
2188 a database whereby the ID, KEY_START, and KEY_END rows are all
2189 increasing (rather than having ranges out of order).
2191 NOTE: the above precondition is NOT tested by this method (since it
2192 would imply an additional table read or two on each invocation).
2194 Args:
2195 kind: The kind for the WorkItem
2196 key_start: The starting key of the WorkItem (inclusive)
2197 key_end: The end key of the WorkItem (inclusive)
2199 Returns:
2200 A string to later be used as a unique key to update this state.
2202 self._OpenSecondaryConnection()
2204 assert _RunningInThread(self.secondary_thread)
2205 assert (not key_start) or isinstance(key_start, self.py_type), (
2206 '%s is a %s, %s expected %s' % (key_start,
2207 key_start.__class__,
2208 self.__class__.__name__,
2209 self.py_type))
2210 assert (not key_end) or isinstance(key_end, self.py_type), (
2211 '%s is a %s, %s expected %s' % (key_end,
2212 key_end.__class__,
2213 self.__class__.__name__,
2214 self.py_type))
2215 assert KeyLEQ(key_start, key_end), '%s not less than %s' % (
2216 repr(key_start), repr(key_end))
2218 self.insert_cursor.execute(
2219 'insert into progress (state, kind, key_start, key_end)'
2220 ' values (?, ?, ?, ?)',
2221 (STATE_READ, unicode(kind), unicode(key_start), unicode(key_end)))
2223 progress_key = self.insert_cursor.lastrowid
2225 self._MaybeCommit()
2227 return progress_key
2229 def UpdateState(self, key, new_state):
2230 """Update a specified progress record with new information.
2232 Args:
2233 key: The key for this progress record, returned from StoreKeys
2234 new_state: The new state to associate with this progress record.
2236 self._OpenSecondaryConnection()
2238 assert _RunningInThread(self.secondary_thread)
2239 assert isinstance(new_state, int)
2241 self.update_cursor.execute('update progress set state=? where id=?',
2242 (new_state, key))
2244 self._MaybeCommit()
2246 def DeleteKey(self, progress_key):
2247 """Delete the entities with the given key from the result database."""
2248 self._OpenSecondaryConnection()
2250 assert _RunningInThread(self.secondary_thread)
2252 t = time.time()
2253 self.insert_cursor.execute(
2254 'delete from progress where rowid = ?', (progress_key,))
2256 logger.debug('delete: delta=%.3f', time.time() - t)
2258 self._MaybeCommit()
2260 def GetProgressStatusGenerator(self):
2261 """Get a generator which yields progress information.
2263 The returned generator will yield a series of 5-tuples that specify
2264 progress information about a prior run of the uploader. The 5-tuples
2265 have the following values:
2267 progress_key: The unique key to later update this record with new
2268 progress information.
2269 state: The last state saved for this progress record.
2270 kind: The datastore kind of the items for uploading.
2271 key_start: The starting key of the items for uploading (inclusive).
2272 key_end: The ending key of the items for uploading (inclusive).
2274 After all incompletely-transferred records are provided, then one
2275 more 5-tuple will be generated:
2277 None
2278 DATA_CONSUMED_TO_HERE: A unique string value indicating this record
2279 is being provided.
2280 None
2281 None
2282 key_end: An integer value specifying the last data source key that
2283 was handled by the previous run of the uploader.
2285 The caller should begin uploading records which occur after key_end.
2287 Yields:
2288 Five-tuples of (progress_key, state, kind, key_start, key_end)
2293 conn = sqlite3.connect(self.db_filename, isolation_level=None)
2294 cursor = conn.cursor()
2298 cursor.execute('select max(key_end) from progress')
2300 result = cursor.fetchone()
2301 if result is not None:
2302 key_end = result[0]
2303 else:
2304 logger.debug('No rows in progress database.')
2305 return
2307 self.prior_key_end = key_end
2311 cursor.execute(
2312 'select id, state, kind, key_start, key_end from progress'
2313 ' where state != ?'
2314 ' order by id',
2315 (STATE_SENT,))
2319 rows = cursor.fetchall()
2321 for row in rows:
2322 if row is None:
2323 break
2324 progress_key, state, kind, key_start, key_end = row
2326 yield progress_key, state, kind, key_start, key_end
2329 yield None, DATA_CONSUMED_TO_HERE, None, None, key_end
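# Illustrative sketch (added for documentation): how the 5-tuples yielded by
# GetProgressStatusGenerator above can be replayed when resuming. The
# progress_db argument is assumed to be a ProgressDatabase left over from an
# interrupted upload; normally the DataSourceThread performs this replay.
def _example_replay_progress(progress_db):
  unfinished = []
  resume_after = None
  for (progress_key, state, kind,
       key_start, key_end) in progress_db.GetProgressStatusGenerator():
    if state == DATA_CONSUMED_TO_HERE:
      # Final tuple: everything up to key_end was handled by the prior run.
      resume_after = key_end
    else:
      # This range was recorded but never confirmed sent; redo it first.
      unfinished.append((progress_key, kind, key_start, key_end))
  return unfinished, resume_after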
2332 def ProgressDatabase(db_filename, signature):
2333 """Returns a database to store upload progress information."""
2334 return _ProgressDatabase(db_filename, 'INTEGER', int, signature)
2337 class ExportProgressDatabase(_ProgressDatabase):
2338 """A database to store download progress information."""
2340 def __init__(self, db_filename, signature):
2341 """Initialize an ExportProgressDatabase."""
2342 _ProgressDatabase.__init__(self,
2343 db_filename,
2344 'TEXT',
2345 datastore.Key,
2346 signature,
2347 commit_periodicity=1)
2349 def UseProgressData(self):
2350 """Check if the progress database contains progress data.
2352 Returns:
2353 True: if the database contains progress data.
2359 return self.existing_table
2362 class StubProgressDatabase(object):
2363 """A stub implementation of ProgressDatabase which does nothing."""
2365 def UseProgressData(self):
2366 """Whether the stub database has progress information (it doesn't)."""
2367 return False
2369 def StoreKeys(self, unused_kind, unused_key_start, unused_key_end):
2370 """Pretend to store a key in the stub database."""
2371 return 'fake-key'
2373 def UpdateState(self, unused_key, unused_new_state):
2374 """Pretend to update the state of a progress item."""
2375 pass
2377 def ThreadComplete(self):
2378 """Finalize operations on the stub database (i.e. do nothing)."""
2379 pass
2381 def DeleteKey(self, unused_key):
2382 """Delete the operations with a given key (but, do nothing.)"""
2383 pass
2386 class _ProgressThreadBase(_ThreadBase):
2387 """A thread which records progress information for the upload process.
2389 The progress information is stored into the provided progress database.
2390 This class is not responsible for replaying a prior run's progress
2391 information out of the database. Separate mechanisms must be used to
2392 resume a prior upload attempt.
2395 NAME = 'progress tracking thread'
2397 def __init__(self, progress_queue, progress_db):
2398 """Initialize the ProgressTrackerThread instance.
2400 Args:
2401 progress_queue: A Queue used for tracking progress information.
2402 progress_db: The database for tracking progress information; should
2403 be an instance of ProgressDatabase.
2405 _ThreadBase.__init__(self)
2407 self.progress_queue = progress_queue
2408 self.db = progress_db
2409 self.entities_transferred = 0
2411 def EntitiesTransferred(self):
2412 """Return the total number of unique entities transferred."""
2413 return self.entities_transferred
2415 def UpdateProgress(self, item):
2416 """Updates the progress information for the given item.
2418 Args:
2419 item: A work item whose new state will be recorded
2421 raise NotImplementedError()
2423 def WorkFinished(self):
2424 """Performs final actions after the entity transfer is complete."""
2425 raise NotImplementedError()
2427 def PerformWork(self):
2428 """Performs the work of a ProgressTrackerThread."""
2429 while not self.exit_flag:
2430 try:
2431 item = self.progress_queue.get(block=True, timeout=1.0)
2432 except Queue.Empty:
2434 continue
2435 if item == _THREAD_SHOULD_EXIT:
2436 break
2438 if item.state == STATE_READ and item.progress_key is None:
2441 item.progress_key = self.db.StoreKeys(item.kind,
2442 item.key_start,
2443 item.key_end)
2444 else:
2448 assert item.progress_key is not None
2449 self.UpdateProgress(item)
2452 item.progress_event.set()
2454 self.progress_queue.task_done()
2456 self.db.ThreadComplete()
2461 class ProgressTrackerThread(_ProgressThreadBase):
2462 """A thread which records progress information for the upload process.
2464 The progress information is stored into the provided progress database.
2465 This class is not responsible for replaying a prior run's progress
2466 information out of the database. Separate mechanisms must be used to
2467 resume a prior upload attempt.
2469 NAME = 'progress tracking thread'
2471 def __init__(self, progress_queue, progress_db):
2472 """Initialize the ProgressTrackerThread instance.
2474 Args:
2475 progress_queue: A Queue used for tracking progress information.
2476 progress_db: The database for tracking progress information; should
2477 be an instance of ProgressDatabase.
2479 _ProgressThreadBase.__init__(self, progress_queue, progress_db)
2481 def UpdateProgress(self, item):
2482 """Update the state of the given WorkItem.
2484 Args:
2485 item: A WorkItem instance.
2487 self.db.UpdateState(item.progress_key, item.state)
2488 if item.state == STATE_SENT:
2489 self.entities_transferred += item.count
2491 def WorkFinished(self):
2492 """Performs final actions after the entity transfer is complete."""
2493 pass
2496 class ExportProgressThread(_ProgressThreadBase):
2497 """A thread to record progress information and write record data for exports.
2499 The progress information is stored into a provided progress database.
2500 Exported results are stored in the result database and dumped to an output
2501 file at the end of the download.
2504 def __init__(self, exporter, progress_queue, progress_db, result_db):
2505 """Initialize the ExportProgressThread instance.
2507 Args:
2508 exporter: An Exporter instance for the download.
2509 progress_queue: A Queue used for tracking progress information.
2510 progress_db: The database for tracking progress information; should
2511 be an instance of ProgressDatabase.
2512 result_db: The database for holding exported entities; should be an
2513 instance of ResultDatabase.
2515 _ProgressThreadBase.__init__(self, progress_queue, progress_db)
2517 self.exporter = exporter
2518 self.existing_count = result_db.existing_count
2519 self.result_db = result_db
2521 def EntitiesTransferred(self):
2522 """Return the total number of unique entities transferred."""
2523 return self.result_db.count
2525 def WorkFinished(self):
2526 """Write the contents of the result database."""
2527 self.exporter.output_entities(self.result_db.AllEntities())
2529 def UpdateProgress(self, item):
2530 """Update the state of the given KeyRangeItem.
2532 Args:
2533 item: A KeyRangeItem instance.
2535 if item.state == STATE_GOT:
2536 count = self.result_db.StoreEntities(item.download_result.keys,
2537 item.download_result.entities)
2539 self.db.DeleteKey(item.progress_key)
2540 self.entities_transferred += count
2541 else:
2542 self.db.UpdateState(item.progress_key, item.state)
2545 class MapperProgressThread(_ProgressThreadBase):
2546 """A thread to record progress information for maps over the datastore."""
2548 def __init__(self, mapper, progress_queue, progress_db):
2549 """Initialize the MapperProgressThread instance.
2551 Args:
2552 mapper: A Mapper object for this map run.
2553 progress_queue: A Queue used for tracking progress information.
2554 progress_db: The database for tracking progress information; should
2555 be an instance of ProgressDatabase.
2557 _ProgressThreadBase.__init__(self, progress_queue, progress_db)
2559 self.mapper = mapper
2561 def EntitiesTransferred(self):
2562 """Return the total number of unique entities transferred."""
2563 return self.entities_transferred
2565 def WorkFinished(self):
2566 """Perform actions after map is complete."""
2567 pass
2569 def UpdateProgress(self, item):
2570 """Update the state of the given KeyRangeItem.
2572 Args:
2573 item: A KeyRangeItem instance.
2575 if item.state == STATE_GOT:
2576 self.entities_transferred += item.count
2578 self.db.DeleteKey(item.progress_key)
2579 else:
2580 self.db.UpdateState(item.progress_key, item.state)
2583 def ParseKey(key_string):
2584 """Turn a key stored in the database into a Key or None.
2586 Args:
2587 key_string: The string representation of a Key.
2589 Returns:
2590 A datastore.Key instance or None
2592 if not key_string:
2593 return None
2594 if key_string == 'None':
2595 return None
2596 return datastore.Key(encoded=key_string)
2599 def Validate(value, typ):
2600 """Checks that value is non-empty and of the right type.
2602 Args:
2603 value: any value
2604 typ: a type or tuple of types
2606 Raises:
2607 ValueError: if value is None or empty.
2608 TypeError: if it's not the given type.
2610 if not value:
2611 raise ValueError('Value should not be empty; received %s.' % value)
2612 elif not isinstance(value, typ):
2613 raise TypeError('Expected a %s, but received %s (a %s).' %
2614 (typ, value, value.__class__))
2617 def CheckFile(filename):
2618 """Check that the given file exists and can be opened for reading.
2620 Args:
2621 filename: The name of the file.
2623 Raises:
2624 FileNotFoundError: if the given filename is not found
2625 FileNotReadableError: if the given filename is not readable.
2627 if not os.path.exists(filename):
2628 raise FileNotFoundError('%s: file not found' % filename)
2629 elif not os.access(filename, os.R_OK):
2630 raise FileNotReadableError('%s: file not readable' % filename)
2633 class Loader(object):
2634 """A base class for creating datastore entities from input data.
2636 To add a handler for bulk loading a new entity kind into your datastore,
2637 write a subclass of this class that calls Loader.__init__ from your
2638 class's __init__.
2640 If you need to run extra code to convert entities from the input
2641 data, create new properties, or otherwise modify the entities before
2642 they're inserted, override handle_entity.
2644 See the create_entity method for the creation of entities from the
2645 (parsed) input data.
2648 __loaders = {}
2649 kind = None
2650 __properties = None
2652 def __init__(self, kind, properties):
2653 """Constructor.
2655 Populates this Loader's kind and properties map.
2657 Args:
2658 kind: a string containing the entity kind that this loader handles
2660 properties: list of (name, converter) tuples.
2662 This is used to automatically convert the input columns into
2663 properties. The converter should be a function that takes one
2664 argument, a string value from the input file, and returns a
2665 correctly typed property value that should be inserted. The
2666 tuples in this list should match the columns in your input file,
2667 in order.
2669 For example:
2670 [('name', str),
2671 ('id_number', int),
2672 ('email', datastore_types.Email),
2673 ('user', users.User),
2674 ('birthdate', lambda x: datetime.datetime.fromtimestamp(float(x))),
2675 ('description', datastore_types.Text),
2678 Validate(kind, (basestring, tuple))
2679 self.kind = kind
2680 self.__openfile = open
2681 self.__create_csv_reader = csv.reader
2684 GetImplementationClass(kind)
2686 Validate(properties, list)
2687 for name, fn in properties:
2688 Validate(name, basestring)
2689 assert callable(fn), (
2690 'Conversion function %s for property %s is not callable.' % (fn, name))
2692 self.__properties = properties
2694 @staticmethod
2695 def RegisterLoader(loader):
2696 """Register loader and the Loader instance for its kind.
2698 Args:
2699 loader: A Loader instance.
2701 Loader.__loaders[loader.kind] = loader
2703 def get_keys_to_reserve(self):
2704 """Returns keys with ids in their paths to be reserved.
2706 Returns:
2707 A list of keys used to advance the id sequences associated with
2708 each id to prevent collisions with future ids.
2710 return []
2712 def alias_old_names(self):
2713 """Aliases method names so that Loaders defined with old names work."""
2714 aliases = (
2715 ('CreateEntity', 'create_entity'),
2716 ('HandleEntity', 'handle_entity'),
2717 ('GenerateKey', 'generate_key'),
2719 for old_name, new_name in aliases:
2722 setattr(Loader, old_name, getattr(Loader, new_name))
2725 if hasattr(self.__class__, old_name) and not (
2726 getattr(self.__class__, old_name).im_func ==
2727 getattr(Loader, new_name).im_func):
2728 if hasattr(self.__class__, new_name) and not (
2729 getattr(self.__class__, new_name).im_func ==
2730 getattr(Loader, new_name).im_func):
2732 raise NameClashError(old_name, new_name, self.__class__)
2734 setattr(self, new_name, getattr(self, old_name))
2736 def create_entity(self, values, key_name=None, parent=None):
2737 """Creates a entity from a list of property values.
2739 Args:
2740 values: list/tuple of str
2741 key_name: if provided, the name for the (single) resulting entity
2742 parent: A datastore.Key instance for the parent, or None
2744 Returns:
2745 list of db.Model
2747 The returned entities are populated with the property values from the
2748 argument, converted to native types using the properties map given in
2749 the constructor, and passed through handle_entity. They're ready to be
2750 inserted.
2752 Raises:
2753 AssertionError: if the number of values doesn't match the number
2754 of properties in the properties map.
2755 ValueError: if any element of values is None or empty.
2756 TypeError: if values is not a list or tuple.
2758 Validate(values, (list, tuple))
2759 assert len(values) == len(self.__properties), (
2760 'Expected %d columns, found %d.' %
2761 (len(self.__properties), len(values)))
2763 model_class = GetImplementationClass(self.kind)
2765 properties = {
2766 'key_name': key_name,
2767 'parent': parent,
2769 for (name, converter), val in zip(self.__properties, values):
2770 if converter is bool and val.lower() in ('0', 'false', 'no'):
2771 val = False
2772 properties[name] = converter(val)
2774 entity = model_class(**properties)
2775 entities = self.handle_entity(entity)
2777 if entities:
2778 if not isinstance(entities, (list, tuple)):
2779 entities = [entities]
2781 for entity in entities:
2782 if not isinstance(entity, db.Model):
2783 raise TypeError('Expected a db.Model, received %s (a %s).' %
2784 (entity, entity.__class__))
2786 return entities
2788 def generate_key(self, i, values):
2789 """Generates a key_name to be used in creating the underlying object.
2791 The default implementation returns None.
2793 This method can be overridden to control the key generation for
2794 uploaded entities. The value returned should be None (to use a
2795 server generated numeric key), or a string which neither starts
2796 with a digit nor has the form __*__ (see
2797 https://developers.google.com/appengine/docs/python/datastore/entities),
2798 or a datastore.Key instance.
2800 If you generate your own string keys, keep in mind:
2802 1. The key name for each entity must be unique.
2803 2. If an entity of the same kind and key already exists in the
2804 datastore, it will be overwritten.
2806 Args:
2807 i: Number corresponding to this object (assume it's run in a loop,
2808 this is your current count).
2809 values: list/tuple of str.
2811 Returns:
2812 A string to be used as the key_name for an entity.
2814 return None
2816 def handle_entity(self, entity):
2817 """Subclasses can override this to add custom entity conversion code.
2819 This is called for each entity, after its properties are populated
2820 from the input but before it is stored. Subclasses can override
2821 this to add custom entity handling code.
2823 The entity to be inserted should be returned. If multiple entities
2824 should be inserted, return a list of entities. If no entities
2825 should be inserted, return None or [].
2827 Args:
2828 entity: db.Model
2830 Returns:
2831 db.Model or list of db.Model
2833 return entity
2835 def initialize(self, filename, loader_opts):
2836 """Performs initialization and validation of the input file.
2838 This implementation checks that the input file exists and can be
2839 opened for reading.
2841 Args:
2842 filename: The string given as the --filename flag argument.
2843 loader_opts: The string given as the --loader_opts flag argument.
2845 CheckFile(filename)
2847 def finalize(self):
2848 """Performs finalization actions after the upload completes."""
2849 pass
2851 def generate_records(self, filename):
2852 """Subclasses can override this to add custom data input code.
2854 This method must yield fixed-length lists of strings.
2856 The default implementation uses csv.reader to read CSV rows
2857 from filename.
2859 Args:
2860 filename: The string input for the --filename option.
2862 Yields:
2863 Lists of strings.
2865 csv_generator = CSVGenerator(filename, openfile=self.__openfile,
2866 create_csv_reader=self.__create_csv_reader
2867 ).Records()
2868 return csv_generator
2870 @staticmethod
2871 def RegisteredLoaders():
2872 """Returns a dict of the Loader instances that have been created."""
2874 return dict(Loader.__loaders)
2876 @staticmethod
2877 def RegisteredLoader(kind):
2878 """Returns the loader instance for the given kind if it exists."""
2879 return Loader.__loaders[kind]
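# Illustrative sketch (added for documentation): a minimal Loader subclass in
# the style described by the Loader docstring above. The Greeting model, kind
# name, and CSV column layout are hypothetical; in practice the model lives in
# your application code and the loader in your bulkloader config file. The db
# module is already imported at the top of this file.
class Greeting(db.Model):
  """Hypothetical model whose kind matches the example loader below."""
  author = db.StringProperty()
  message = db.TextProperty()
  rating = db.IntegerProperty()


class GreetingLoader(Loader):
  """Loads Greeting entities from CSV rows of (author, message, rating)."""

  def __init__(self):
    # One (name, converter) tuple per CSV column, in column order.
    Loader.__init__(self, 'Greeting',
                    [('author', str),
                     ('message', db.Text),
                     ('rating', int),
                    ])

  def generate_key(self, i, values):
    # Returning None lets the datastore assign numeric ids automatically.
    return None


# A config file would typically expose this as:  loaders = [GreetingLoader]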
2882 class RestoreThread(_ThreadBase):
2883 """A thread to read saved entity_pbs from sqlite3."""
2884 NAME = 'RestoreThread'
2885 _ENTITIES_DONE = 'Entities Done'
2887 def __init__(self, queue, filename):
2888 _ThreadBase.__init__(self)
2889 self.queue = queue
2890 self.filename = filename
2892 def PerformWork(self):
2893 db_conn = sqlite3.connect(self.filename)
2894 cursor = db_conn.cursor()
2895 cursor.execute('select id, value from result')
2896 for entity_id, value in cursor:
2897 self.queue.put(value, block=True)
2898 self.queue.put(RestoreThread._ENTITIES_DONE, block=True)
2901 class RestoreLoader(Loader):
2902 """A Loader which imports protobuffers from a file."""
2904 def __init__(self, kind, app_id):
2905 self.kind = kind
2906 self.app_id = app_id
2911 self.namespace = namespace_manager.get_namespace()
2913 def initialize(self, filename, loader_opts):
2914 CheckFile(filename)
2915 self.queue = Queue.Queue(1000)
2916 restore_thread = RestoreThread(self.queue, filename)
2917 restore_thread.start()
2918 self.keys_to_reserve = self._find_keys_to_reserve(
2919 self.generate_records(filename))
2920 restore_thread = RestoreThread(self.queue, filename)
2921 restore_thread.start()
2923 def get_keys_to_reserve(self):
2924 """Returns keys with ids in their paths to be reserved.
2926 Returns:
2927 A list of keys used to advance the id sequences associated with
2928 each id to prevent collisions with future ids.
2930 return self.keys_to_reserve
2932 def _find_keys_to_reserve(self, record_generator):
2933 """Find all entity keys with ids in their paths.
2935 Args:
2936 record_generator: A generator of entity_encoding strings.
2938 Returns:
2939 A list of keys to reserve.
2941 keys_to_reserve = []
2942 for values in record_generator:
2943 entity = self.create_entity(values)
2944 key = entity.key()
2945 for id_or_name in key.to_path()[1::2]:
2946 if isinstance(id_or_name, (int, long)):
2947 keys_to_reserve.append(key)
2948 break
2949 return keys_to_reserve
2951 def generate_records(self, filename):
2952 while True:
2953 record = self.queue.get(block=True)
2954 if id(record) == id(RestoreThread._ENTITIES_DONE):
2955 break
2956 entity_proto = entity_pb.EntityProto(contents=str(record))
2957 fixed_entity_proto = self._translate_entity_proto(entity_proto)
2958 yield datastore.Entity._FromPb(fixed_entity_proto)
2960 def create_entity(self, values, key_name=None, parent=None):
2961 return values
2963 def rewrite_reference_proto(self, entity_namespace, reference_proto):
2964 """Transform the Reference protobuffer which underlies keys and references.
2966 Args:
2967 entity_namespace: The 'before' namespace of the entity that has this
2968 reference property. If this value does not match the reference
2969 property's current namespace, then the reference property namespace will
2970 not be modified.
2971 reference_proto: A Onestore Reference proto
2973 reference_proto.set_app(self.app_id)
2974 if entity_namespace != reference_proto.name_space():
2975 return
2977 if self.namespace:
2978 reference_proto.set_name_space(self.namespace)
2979 else:
2980 reference_proto.clear_name_space()
2982 def _translate_entity_proto(self, entity_proto):
2983 """Transform the ReferenceProperties of the given entity to fix app_id."""
2984 entity_key = entity_proto.mutable_key()
2985 entity_key.set_app(self.app_id)
2986 original_entity_namespace = entity_key.name_space()
2987 if self.namespace:
2988 entity_key.set_name_space(self.namespace)
2989 else:
2990 entity_key.clear_name_space()
2992 for prop in entity_proto.property_list():
2993 prop_value = prop.mutable_value()
2994 if prop_value.has_referencevalue():
2995 self.rewrite_reference_proto(original_entity_namespace,
2996 prop_value.mutable_referencevalue())
2998 for prop in entity_proto.raw_property_list():
2999 prop_value = prop.mutable_value()
3000 if prop_value.has_referencevalue():
3001 self.rewrite_reference_proto(original_entity_namespace,
3002 prop_value.mutable_referencevalue())
3004 return entity_proto
3007 class Exporter(object):
3008 """A base class for serializing datastore entities.
3010 To add a handler for exporting an entity kind from your datastore,
3011 write a subclass of this class that calls Exporter.__init__ from your
3012 class's __init__.
3014 If you need to run extra code to convert entity properties to strings,
3015 compute derived values, or otherwise control how entities are written
3016 out, override the relevant methods or supply converter functions.
3018 See the output_entities method for the writing of data from entities.
3021 __exporters = {}
3022 kind = None
3023 __properties = None
3024 calculate_sort_key_from_entity = False
3026 def __init__(self, kind, properties):
3027 """Constructor.
3029 Populates this Exporter's kind and properties map.
3031 Args:
3032 kind: a string containing the entity kind that this exporter handles
3034 properties: list of (name, converter, default) tuples.
3036 This is used to automatically convert the entities to strings.
3037 The converter should be a function that takes one argument, a property
3038 value of the appropriate type, and returns a str or unicode. The default
3039 is a string to be used if the property is not present, or None to fail
3040 with an error if the property is missing.
3042 For example:
3043 [('name', str, None),
3044 ('id_number', str, None),
3045 ('email', str, ''),
3046 ('user', str, None),
3047 ('birthdate',
3048 lambda x: str(datetime.datetime.fromtimestamp(float(x))),
3049 None),
3050 ('description', str, ''),
3053 Validate(kind, basestring)
3054 self.kind = kind
3057 GetImplementationClass(kind)
3059 Validate(properties, list)
3060 for name, fn, default in properties:
3061 Validate(name, basestring)
3062 assert callable(fn), (
3063 'Conversion function %s for property %s is not callable.' % (
3064 fn, name))
3065 if default:
3066 Validate(default, basestring)
3068 self.__properties = properties
3070 @staticmethod
3071 def RegisterExporter(exporter):
3072 """Register exporter and the Exporter instance for its kind.
3074 Args:
3075 exporter: An Exporter instance.
3077 Exporter.__exporters[exporter.kind] = exporter
3079 def __ExtractProperties(self, entity):
3080 """Converts an entity into a list of string values.
3082 Args:
3083 entity: An entity to extract the properties from.
3085 Returns:
3086 A list of the properties of the entity.
3088 Raises:
3089 MissingPropertyError: if an expected field on the entity is missing.
3091 encoding = []
3092 for name, fn, default in self.__properties:
3093 try:
3094 encoding.append(fn(entity[name]))
3095 except KeyError:
3096 if name == '__key__':
3097 encoding.append(fn(entity.key()))
3098 elif default is None:
3099 raise MissingPropertyError(name)
3100 else:
3101 encoding.append(default)
3102 return encoding
3104 def __EncodeEntity(self, entity):
3105 """Convert the given entity into CSV string.
3107 Args:
3108 entity: The entity to encode.
3110 Returns:
3111 A CSV string.
3113 output = StringIO.StringIO()
3114 writer = csv.writer(output)
3115 writer.writerow(self.__ExtractProperties(entity))
3116 return output.getvalue()
3118 def __SerializeEntity(self, entity):
3119 """Creates a string representation of an entity.
3121 Args:
3122 entity: The entity to serialize.
3124 Returns:
3125 A serialized representation of an entity.
3127 encoding = self.__EncodeEntity(entity)
3128 if not isinstance(encoding, unicode):
3129 encoding = unicode(encoding, 'utf-8')
3130 encoding = encoding.encode('utf-8')
3131 return encoding
3133 def output_entities(self, entity_generator):
3134 """Outputs the downloaded entities.
3136 This implementation writes CSV.
3138 Args:
3139 entity_generator: A generator that yields the downloaded entities
3140 in key order.
3142 CheckOutputFile(self.output_filename)
3143 output_file = open(self.output_filename, 'w')
3144 logger.debug('Export complete, writing to file')
3145 output_file.writelines(self.__SerializeEntity(entity)
3146 for entity in entity_generator)
3148 def initialize(self, filename, exporter_opts):
3149 """Performs initialization and validation of the output file.
3151 This implementation checks that the output file does not already exist
3152 and that its directory is writable.
3154 Args:
3155 filename: The string given as the --filename flag argument.
3156 exporter_opts: The string given as the --exporter_opts flag argument.
3158 CheckOutputFile(filename)
3159 self.output_filename = filename
3161 def finalize(self):
3162 """Performs finalization actions after the download completes."""
3163 pass
3165 def sort_key_from_entity(self, entity):
3166 """A value to alter sorting of entities in output_entities entity_generator.
3168 Will only be called if calculate_sort_key_from_entity is true.
3169 Args:
3170 entity: A datastore.Entity.
3171 Returns:
3172 A value to store in the intermediate sqlite table. The table will later
3173 be sorted by this value then by the datastore key, so the sort_key need
3174 not be unique.
3176 return ''
3178 @staticmethod
3179 def RegisteredExporters():
3180 """Returns a dictionary of the exporter instances that have been created."""
3182 return dict(Exporter.__exporters)
3184 @staticmethod
3185 def RegisteredExporter(kind):
3186 """Returns an exporter instance for the given kind if it exists."""
3187 return Exporter.__exporters[kind]
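# Illustrative sketch (added for documentation): a minimal Exporter subclass
# matching the hypothetical Greeting kind from the loader sketch above. The
# (name, converter, default) tuples mirror the form shown in the Exporter
# docstring; a real exporter belongs in your bulkloader config file.
class GreetingExporter(Exporter):
  """Writes Greeting entities as CSV rows of (author, message, rating)."""

  def __init__(self):
    # A default of None makes the property required; a string default is
    # written when the property is missing from a downloaded entity.
    Exporter.__init__(self, 'Greeting',
                      [('author', str, None),
                       ('message', str, ''),
                       ('rating', str, '0'),
                      ])


# A config file would typically expose this as:  exporters = [GreetingExporter]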
3190 class DumpExporter(Exporter):
3191 """An exporter which dumps protobuffers to a file."""
3193 def __init__(self, kind, result_db_filename):
3194 self.kind = kind
3195 self.result_db_filename = result_db_filename
3197 def output_entities(self, entity_generator):
3199 shutil.copyfile(self.result_db_filename, self.output_filename)
3202 class MapperRetry(Error):
3203 """An exception that indicates a non-fatal error during mapping."""
3206 class Mapper(object):
3207 """A base class for serializing datastore entities.
3209 To add a handler for exporting an entity kind from your datastore,
3210 write a subclass of this class that calls Mapper.__init__ from your
3211 class's __init__.
3213 You need to implement to batch_apply or apply method on your subclass
3214 for the map to do anything.
3217 __mappers = {}
3218 kind = None
3220 def __init__(self, kind):
3221 """Constructor.
3223 Populates this Mapper's kind.
3225 Args:
3226 kind: a string containing the entity kind that this mapper handles
3228 Validate(kind, basestring)
3229 self.kind = kind
3232 GetImplementationClass(kind)
3234 @staticmethod
3235 def RegisterMapper(mapper):
3236 """Register mapper and the Mapper instance for its kind.
3238 Args:
3239 mapper: A Mapper instance.
3241 Mapper.__mappers[mapper.kind] = mapper
3243 def initialize(self, mapper_opts):
3244 """Performs initialization.
3246 Args:
3247 mapper_opts: The string given as the --mapper_opts flag argument.
3249 pass
3251 def finalize(self):
3252 """Performs finalization actions after the download completes."""
3253 pass
3255 def apply(self, entity):
3256 print 'Default map function doing nothing to %s' % entity
3258 def batch_apply(self, entities):
3259 for entity in entities:
3260 self.apply(entity)
3262 def map_over_keys_only(self):
3263 """Return whether this mapper should iterate over only keys or not.
3265 Override this method in subclasses to return True if only keys are needed.
3267 Returns:
3268 True or False
3270 return False
3272 @staticmethod
3273 def RegisteredMappers():
3274 """Returns a dictionary of the mapper instances that have been created."""
3276 return dict(Mapper.__mappers)
3278 @staticmethod
3279 def RegisteredMapper(kind):
3280 """Returns an mapper instance for the given kind if it exists."""
3281 return Mapper.__mappers[kind]
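# Illustrative sketch (added for documentation): a minimal Mapper subclass for
# the hypothetical Greeting kind used in the sketches above. A real mapper
# belongs in your bulkloader config file and is run with the --map flag.
class GreetingMapper(Mapper):
  """Logs each downloaded Greeting entity without modifying it."""

  def __init__(self):
    Mapper.__init__(self, 'Greeting')

  def apply(self, entity):
    # Called once per entity; raise MapperRetry for a transient failure that
    # should be attempted again.
    logger.info('Saw Greeting with key %s', entity.key())


# A config file would typically expose this as:  mappers = [GreetingMapper]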
3284 class QueueJoinThread(threading.Thread):
3285 """A thread that joins a queue and exits.
3287 Queue joins do not have a timeout. To simulate a queue join with
3288 timeout, run this thread and join it with a timeout.
3291 def __init__(self, queue):
3292 """Initialize a QueueJoinThread.
3294 Args:
3295 queue: The queue for this thread to join.
3297 threading.Thread.__init__(self)
3298 self.setDaemon(True)
3299 assert isinstance(queue, (Queue.Queue, ReQueue))
3300 self.queue = queue
3302 def run(self):
3303 """Perform the queue join in this thread."""
3304 self.queue.join()
3307 def InterruptibleQueueJoin(queue,
3308 thread_local,
3309 thread_pool,
3310 queue_join_thread_factory=QueueJoinThread,
3311 check_workers=True):
3312 """Repeatedly joins the given ReQueue or Queue.Queue with short timeout.
3314 Between each timeout on the join, worker threads are checked.
3316 Args:
3317 queue: A Queue.Queue or ReQueue instance.
3318 thread_local: A threading.local instance which indicates interrupts.
3319 thread_pool: An AdaptiveThreadPool instance.
3320 queue_join_thread_factory: Used for dependency injection.
3321 check_workers: Whether to interrupt the join on worker death.
3323 Returns:
3324 True unless the queue join is interrupted by SIGINT or worker death.
3326 thread = queue_join_thread_factory(queue)
3327 thread.start()
3328 while True:
3329 thread.join(timeout=.5)
3330 if not thread.isAlive():
3331 return True
3332 if thread_local.shut_down:
3333 logger.debug('Queue join interrupted')
3334 return False
3335 if check_workers:
3336 for worker_thread in thread_pool.Threads():
3337 if not worker_thread.isAlive():
3338 return False
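# Illustrative sketch (added for documentation): a hedged example of draining
# a work queue the way BulkTransporterApp.Run does further below, stopping
# early on SIGINT or worker death. All four arguments are assumed to be the
# objects that Run wires together (the queue, the interrupt flag holder, the
# thread pool, and the DataSourceThread).
def _example_drain_queue(queue, thread_local, thread_pool, data_source_thread):
  # InterruptibleQueueJoin returns True only if every queued item completed.
  if not InterruptibleQueueJoin(queue, thread_local, thread_pool):
    # Interrupted, or a worker died: stop producing work and shut down.
    ShutdownThreads(data_source_thread, thread_pool)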
3341 def ShutdownThreads(data_source_thread, thread_pool):
3342 """Shuts down the worker and data source threads.
3344 Args:
3345 data_source_thread: A running DataSourceThread instance.
3346 thread_pool: An AdaptiveThreadPool instance with workers registered.
3348 logger.info('An error occurred. Shutting down...')
3351 data_source_thread.exit_flag = True
3353 thread_pool.Shutdown()
3360 data_source_thread.join(timeout=3.0)
3361 if data_source_thread.isAlive():
3366 logger.warn('%s hung while trying to exit',
3367 data_source_thread.GetFriendlyName())
3370 class BulkTransporterApp(object):
3371 """Class to wrap bulk transport application functionality."""
3373 def __init__(self,
3374 arg_dict,
3375 input_generator_factory,
3376 throttle,
3377 progress_db,
3378 progresstrackerthread_factory,
3379 max_queue_size=DEFAULT_QUEUE_SIZE,
3380 request_manager_factory=RequestManager,
3381 datasourcethread_factory=DataSourceThread,
3382 progress_queue_factory=Queue.Queue,
3383 thread_pool_factory=adaptive_thread_pool.AdaptiveThreadPool,
3384 server=None):
3385 """Instantiate a BulkTransporterApp.
3387 Uploads or downloads data to or from application using HTTP requests.
3388 When run, the class will spin up a number of threads to read entities
3389 from the data source, pass those to a number of worker threads
3390 for sending to the application, and track all of the progress in a
3391 small database in case an error or pause/termination requires a
3392 restart/resumption of the upload process.
3394 Args:
3395 arg_dict: Dictionary of command line options.
3396 input_generator_factory: A factory that creates a WorkItem generator.
3397 throttle: A Throttle instance.
3398 progress_db: The database to use for replaying/recording progress.
3399 progresstrackerthread_factory: Used for dependency injection.
3400 max_queue_size: Maximum size of the queues before they should block.
3401 request_manager_factory: Used for dependency injection.
3402 datasourcethread_factory: Used for dependency injection.
3403 progress_queue_factory: Used for dependency injection.
3404 thread_pool_factory: Used for dependency injection.
3405 server: An existing AbstractRpcServer to reuse.
3407 self.app_id = arg_dict['application']
3408 self.post_url = arg_dict['url']
3409 self.kind = arg_dict['kind']
3410 self.batch_size = arg_dict['batch_size']
3411 self.input_generator_factory = input_generator_factory
3412 self.num_threads = arg_dict['num_threads']
3413 self.email = arg_dict['email']
3414 self.passin = arg_dict['passin']
3415 self.dry_run = arg_dict['dry_run']
3416 self.throttle_class = arg_dict['throttle_class']
3417 self.throttle = throttle
3418 self.progress_db = progress_db
3419 self.progresstrackerthread_factory = progresstrackerthread_factory
3420 self.max_queue_size = max_queue_size
3421 self.request_manager_factory = request_manager_factory
3422 self.datasourcethread_factory = datasourcethread_factory
3423 self.progress_queue_factory = progress_queue_factory
3424 self.thread_pool_factory = thread_pool_factory
3425 self.server = server
3426 (scheme,
3427 self.host_port, self.url_path,
3428 unused_query, unused_fragment) = urlparse.urlsplit(self.post_url)
3429 self.secure = (scheme == 'https')
3431 def RunPostAuthentication(self):
3432 """Method that gets called after authentication."""
3433 if isinstance(self.kind, basestring):
3434 return [self.kind]
3435 return self.kind
3437 def Run(self):
3438 """Perform the work of the BulkTransporterApp.
3440 Raises:
3441 AuthenticationError: If authentication is required and fails.
3443 Returns:
3444 Error code suitable for sys.exit, e.g. 0 on success, 1 on failure.
3446 self.error = False
3447 thread_pool = self.thread_pool_factory(
3448 self.num_threads, queue_size=self.max_queue_size)
3450 progress_queue = self.progress_queue_factory(self.max_queue_size)
3451 self.request_manager = self.request_manager_factory(self.app_id,
3452 self.host_port,
3453 self.url_path,
3454 self.kind,
3455 self.throttle,
3456 self.batch_size,
3457 self.secure,
3458 self.email,
3459 self.passin,
3460 self.dry_run,
3461 self.server,
3462 self.throttle_class)
3463 try:
3466 self.request_manager.Authenticate()
3467 except Exception, e:
3468 self.error = True
3471 if not isinstance(e, urllib2.HTTPError) or (
3472 e.code != 302 and e.code != 401):
3473 logger.exception('Exception during authentication')
3474 raise AuthenticationError()
3475 if (self.request_manager.auth_called and
3476 not self.request_manager.authenticated):
3479 self.error = True
3480 raise AuthenticationError('Authentication failed')
3482 kinds = self.RunPostAuthentication()
3484 for thread in thread_pool.Threads():
3485 self.throttle.Register(thread)
3487 self.progress_thread = self.progresstrackerthread_factory(
3488 progress_queue, self.progress_db)
3490 if self.progress_db.UseProgressData():
3491 logger.debug('Restarting upload using progress database')
3492 progress_generator_factory = self.progress_db.GetProgressStatusGenerator
3493 else:
3494 progress_generator_factory = None
3496 self.data_source_thread = (
3497 self.datasourcethread_factory(self.request_manager,
3498 kinds,
3499 thread_pool,
3500 progress_queue,
3501 self.input_generator_factory,
3502 progress_generator_factory))
3506 self.throttle.Register(self.data_source_thread)
3509 thread_local = threading.local()
3510 thread_local.shut_down = False
3512 def Interrupt(unused_signum, unused_frame):
3513 """Shutdown gracefully in response to a signal."""
3514 thread_local.shut_down = True
3515 self.error = True
3517 signal.signal(signal.SIGINT, Interrupt)
3520 self.progress_thread.start()
3521 self.data_source_thread.start()
3525 while not thread_local.shut_down:
3531 self.data_source_thread.join(timeout=0.25)
3534 if self.data_source_thread.isAlive():
3537 for thread in list(thread_pool.Threads()) + [self.progress_thread]:
3538 if not thread.isAlive():
3539 logger.info('Unexpected thread death: %s', thread.getName())
3540 thread_local.shut_down = True
3541 self.error = True
3542 break
3543 else:
3545 break
3547 def _Join(ob, msg):
3548 logger.debug('Waiting for %s...', msg)
3550 if isinstance(ob, threading.Thread):
3551 ob.join(timeout=3.0)
3552 if ob.isAlive():
3553 logger.debug('Joining %s failed', ob)
3554 else:
3555 logger.debug('... done.')
3556 elif isinstance(ob, (Queue.Queue, ReQueue)):
3557 if not InterruptibleQueueJoin(ob, thread_local, thread_pool):
3558 ShutdownThreads(self.data_source_thread, thread_pool)
3559 else:
3560 ob.join()
3561 logger.debug('... done.')
3564 if self.data_source_thread.error or thread_local.shut_down:
3565 ShutdownThreads(self.data_source_thread, thread_pool)
3566 else:
3568 _Join(thread_pool.requeue, 'worker threads to finish')
3570 thread_pool.Shutdown()
3572 thread_pool.JoinThreads()
3573 thread_pool.CheckErrors()
3574 print ''
3580 if self.progress_thread.isAlive():
3581 InterruptibleQueueJoin(progress_queue, thread_local, thread_pool,
3582 check_workers=False)
3583 else:
3584 logger.warn('Progress thread exited prematurely')
3588 progress_queue.put(_THREAD_SHOULD_EXIT)
3589 _Join(self.progress_thread, 'progress_thread to terminate')
3590 self.progress_thread.CheckError()
3591 if not thread_local.shut_down:
3592 self.progress_thread.WorkFinished()
3595 self.data_source_thread.CheckError()
3597 return self.ReportStatus()
3599 def ReportStatus(self):
3600 """Display a message reporting the final status of the transfer."""
3601 raise NotImplementedError()
3604 class BulkUploaderApp(BulkTransporterApp):
3605 """Class to encapsulate bulk uploader functionality."""
3607 def __init__(self, *args, **kwargs):
3608 BulkTransporterApp.__init__(self, *args, **kwargs)
3610 def RunPostAuthentication(self):
3611 loader = Loader.RegisteredLoader(self.kind)
3612 self.request_manager.ReserveKeys(loader.get_keys_to_reserve())
3613 return [self.kind]
3615 def ReportStatus(self):
3616 """Display a message reporting the final status of the transfer."""
3617 total_up, duration = self.throttle.TotalTransferred(
3618 remote_api_throttle.BANDWIDTH_UP)
3619 s_total_up, unused_duration = self.throttle.TotalTransferred(
3620 remote_api_throttle.HTTPS_BANDWIDTH_UP)
3621 total_up += s_total_up
3622 total = total_up
3623 logger.info('%d entities total, %d previously transferred',
3624 self.data_source_thread.read_count,
3625 self.data_source_thread.xfer_count)
3626 transfer_count = self.progress_thread.EntitiesTransferred()
3627 logger.info('%d entities (%d bytes) transferred in %.1f seconds',
3628 transfer_count, total, duration)
3629 if (self.data_source_thread.read_all and
3630 transfer_count +
3631 self.data_source_thread.xfer_count >=
3632 self.data_source_thread.read_count):
3633 logger.info('All entities successfully transferred')
3634 return 0
3635 else:
3636 logger.info('Some entities not successfully transferred')
3637 return 1
3640 class BulkDownloaderApp(BulkTransporterApp):
3641 """Class to encapsulate bulk downloader functionality."""
3643 def __init__(self, *args, **kwargs):
3644 BulkTransporterApp.__init__(self, *args, **kwargs)
3646 def RunPostAuthentication(self):
3647 if not self.kind:
3648 return self.request_manager.GetSchemaKinds()
3649 elif isinstance(self.kind, basestring):
3650 return [self.kind]
3651 else:
3652 return self.kind
3654 def ReportStatus(self):
3655 """Display a message reporting the final status of the transfer."""
3656 total_down, duration = self.throttle.TotalTransferred(
3657 remote_api_throttle.BANDWIDTH_DOWN)
3658 s_total_down, unused_duration = self.throttle.TotalTransferred(
3659 remote_api_throttle.HTTPS_BANDWIDTH_DOWN)
3660 total_down += s_total_down
3661 total = total_down
3662 existing_count = self.progress_thread.existing_count
3663 xfer_count = self.progress_thread.EntitiesTransferred()
3664 logger.info('Have %d entities, %d previously transferred',
3665 xfer_count, existing_count)
3666 logger.info('%d entities (%d bytes) transferred in %.1f seconds',
3667 xfer_count, total, duration)
3668 if self.error:
3669 return 1
3670 else:
3671 return 0
3674 class BulkMapperApp(BulkTransporterApp):
3675 """Class to encapsulate bulk map functionality."""
3677 def __init__(self, *args, **kwargs):
3678 BulkTransporterApp.__init__(self, *args, **kwargs)
3680 def ReportStatus(self):
3681 """Display a message reporting the final status of the transfer."""
3682 total_down, duration = self.throttle.TotalTransferred(
3683 remote_api_throttle.BANDWIDTH_DOWN)
3684 s_total_down, unused_duration = self.throttle.TotalTransferred(
3685 remote_api_throttle.HTTPS_BANDWIDTH_DOWN)
3686 total_down += s_total_down
3687 total = total_down
3688 xfer_count = self.progress_thread.EntitiesTransferred()
3689 logger.info('The following may be inaccurate if any mapper tasks '
3690 'encountered errors and had to be retried.')
3691 logger.info('Applied mapper to %s entities.',
3692 xfer_count)
3693 logger.info('%s entities (%s bytes) transferred in %.1f seconds',
3694 xfer_count, total, duration)
3695 if self.error:
3696 return 1
3697 else:
3698 return 0
3701 def PrintUsageExit(code):
3702 """Prints usage information and exits with a status code.
3704 Args:
3705 code: Status code to pass to sys.exit() after displaying usage information.
3707 print __doc__ % {'arg0': sys.argv[0]}
3708 sys.stdout.flush()
3709 sys.stderr.flush()
3710 sys.exit(code)
3713 REQUIRED_OPTION = object()
3716 BOOL_ARGS = ('create_config', 'debug', 'download', 'dry_run', 'dump',
3717 'has_header', 'map', 'passin', 'restore')
3718 INT_ARGS = ('bandwidth_limit', 'batch_size', 'http_limit', 'num_threads',
3719 'rps_limit')
3720 FILENAME_ARGS = ('config_file', 'db_filename', 'filename', 'log_file',
3721 'result_db_filename')
3722 STRING_ARGS = ('application', 'auth_domain', 'email', 'exporter_opts',
3723 'kind', 'loader_opts', 'mapper_opts', 'namespace', 'url')
3724 DEPRECATED_OPTIONS = {'csv_has_header': 'has_header', 'app_id': 'application'}
3725 FLAG_SPEC = (['csv_has_header', 'help', 'app_id='] +
3726 list(BOOL_ARGS) +
3727 [arg + '=' for arg in INT_ARGS + FILENAME_ARGS + STRING_ARGS])
3729 def ParseArguments(argv, die_fn=lambda: PrintUsageExit(1)):
3730 """Parses command-line arguments.
3732 Prints out a help message if -h or --help is supplied.
3734 Args:
3735 argv: List of command-line arguments.
3736 die_fn: Function to invoke to end the program.
3738 Returns:
3739 A dictionary containing the value of command-line options.
3741 opts, unused_args = getopt.getopt(
3742 argv[1:],
3743 'h',
3744 FLAG_SPEC)
3746 arg_dict = {}
3748 arg_dict['url'] = REQUIRED_OPTION
3749 arg_dict['filename'] = None
3750 arg_dict['config_file'] = None
3751 arg_dict['kind'] = None
3753 arg_dict['batch_size'] = None
3754 arg_dict['num_threads'] = DEFAULT_THREAD_COUNT
3755 arg_dict['bandwidth_limit'] = DEFAULT_BANDWIDTH_LIMIT
3756 arg_dict['rps_limit'] = DEFAULT_RPS_LIMIT
3757 arg_dict['http_limit'] = DEFAULT_REQUEST_LIMIT
3759 arg_dict['application'] = ''
3760 arg_dict['auth_domain'] = 'gmail.com'
3761 arg_dict['create_config'] = False
3762 arg_dict['db_filename'] = None
3763 arg_dict['debug'] = False
3764 arg_dict['download'] = False
3765 arg_dict['dry_run'] = False
3766 arg_dict['dump'] = False
3767 arg_dict['email'] = None
3768 arg_dict['exporter_opts'] = None
3769 arg_dict['has_header'] = False
3770 arg_dict['loader_opts'] = None
3771 arg_dict['log_file'] = None
3772 arg_dict['map'] = False
3773 arg_dict['mapper_opts'] = None
3774 arg_dict['namespace'] = ''
3775 arg_dict['passin'] = False
3776 arg_dict['restore'] = False
3777 arg_dict['result_db_filename'] = None
3778 arg_dict['throttle_class'] = None
3780 def ExpandFilename(filename):
3781 """Expand shell variables and ~usernames in filename."""
3782 return os.path.expandvars(os.path.expanduser(filename))
3784 for option, value in opts:
3785 if option in ('-h', '--help'):
3786 PrintUsageExit(0)
3787 if not option.startswith('--'):
3789 continue
3790 option = option[2:]
3791 if option in DEPRECATED_OPTIONS:
3792 print >> sys.stderr, ('--%s is deprecated, please use --%s.' %
3793 (option, DEPRECATED_OPTIONS[option]))
3794 option = DEPRECATED_OPTIONS[option]
3796 if option in BOOL_ARGS:
3797 arg_dict[option] = True
3798 elif option in INT_ARGS:
3799 arg_dict[option] = int(value)
3800 elif option in FILENAME_ARGS:
3801 arg_dict[option] = ExpandFilename(value)
3802 elif option in STRING_ARGS:
3803 arg_dict[option] = value
3805 return ProcessArguments(arg_dict, die_fn=die_fn)
3808 def ThrottleLayout(bandwidth_limit, http_limit, rps_limit):
3809 """Return a dictionary indicating the throttle options."""
3810 bulkloader_limits = dict(remote_api_throttle.NO_LIMITS)
3811 bulkloader_limits.update({
3812 remote_api_throttle.BANDWIDTH_UP: bandwidth_limit,
3813 remote_api_throttle.BANDWIDTH_DOWN: bandwidth_limit,
3814 remote_api_throttle.REQUESTS: http_limit,
3815 remote_api_throttle.HTTPS_BANDWIDTH_UP: bandwidth_limit,
3816 remote_api_throttle.HTTPS_BANDWIDTH_DOWN: bandwidth_limit,
3817 remote_api_throttle.HTTPS_REQUESTS: http_limit,
3818 remote_api_throttle.ENTITIES_FETCHED: rps_limit,
3819 remote_api_throttle.ENTITIES_MODIFIED: rps_limit,
3821 return bulkloader_limits
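# Illustrative sketch (added for documentation): ThrottleLayout simply fans
# the three command-line limits out to the remote_api_throttle limit names.
# The numbers below are hypothetical flag values, not defaults.
def _example_throttle_layout():
  limits = ThrottleLayout(bandwidth_limit=100000, http_limit=5, rps_limit=10)
  # Plain and HTTPS traffic share the same bytes-per-second ceiling.
  assert limits[remote_api_throttle.BANDWIDTH_UP] == 100000
  assert limits[remote_api_throttle.HTTPS_REQUESTS] == 5
  assert limits[remote_api_throttle.ENTITIES_FETCHED] == 10
  return limits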
3824 def CheckOutputFile(filename):
3825 """Check that the given file does not exist and can be opened for writing.
3827 Args:
3828 filename: The name of the file.
3830 Raises:
3831 FileExistsError: if the given filename already exists.
3832 FileNotWritableError: if the directory of the given filename is not writable.
3834 full_path = os.path.abspath(filename)
3835 if os.path.exists(full_path):
3836 raise FileExistsError('%s: output file exists' % filename)
3837 elif not os.access(os.path.dirname(full_path), os.W_OK):
3838 raise FileNotWritableError(
3839 '%s: not writable' % os.path.dirname(full_path))
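# Illustrative behaviour sketch; the file names are placeholders:
#
#   CheckOutputFile('new-dump.sql3')       # passes if the file does not exist
#   CheckOutputFile('existing-dump.sql3')  # raises FileExistsError
#   CheckOutputFile('/readonly/dump.sql3') # raises FileNotWritableError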
3842 def LoadYamlConfig(config_file_name):
3843 """Loads a config file and registers any Loader classes present.
3845 Used for the second-generation YAML configuration file.
3847 Args:
3848 config_file_name: The name of the configuration file.
3849 """
3850 (loaders, exporters) = bulkloader_config.load_config(config_file_name,
3851 reserve_keys=ReserveKeys)
3852 for cls in loaders:
3853 Loader.RegisterLoader(cls())
3854 for cls in exporters:
3855 Exporter.RegisterExporter(cls())
3858 def LoadConfig(config_file_name, exit_fn=sys.exit):
3859 """Loads a config file and registers any Loader classes present.
3861 Used for a legacy Python configuration file.
3863 Args:
3864 config_file_name: The name of the configuration file.
3865 exit_fn: Used for dependency injection.
3866 """
3867 if config_file_name:
3868 config_file = open(config_file_name, 'r')
3870 try:
3874 bulkloader_config = imp.load_module(
3875 'bulkloader_config', config_file, config_file_name,
3876 ('', 'r', imp.PY_SOURCE))
3877 sys.modules['bulkloader_config'] = bulkloader_config
3881 if hasattr(bulkloader_config, 'loaders'):
3882 for cls in bulkloader_config.loaders:
3883 Loader.RegisterLoader(cls())
3885 if hasattr(bulkloader_config, 'exporters'):
3886 for cls in bulkloader_config.exporters:
3887 Exporter.RegisterExporter(cls())
3889 if hasattr(bulkloader_config, 'mappers'):
3890 for cls in bulkloader_config.mappers:
3891 Mapper.RegisterMapper(cls())
3893 except NameError, e:
3896 m = re.search(r"[^']*'([^']*)'.*", str(e))
3897 if m and m.group(1) == 'Loader':
3898 print >> sys.stderr, """
3899 The config file format has changed and you appear to be using an old-style
3900 config file. Please make the following changes:
3902 1. At the top of the file, add this:
3904 from google.appengine.tools.bulkloader import Loader
3906 2. For each of your Loader subclasses add the following at the end of the
3907 __init__ definition:
3909 self.alias_old_names()
3911 3. At the bottom of the file, add this:
3913 loaders = [MyLoader1,...,MyLoaderN]
3915 Where MyLoader1,...,MyLoaderN are the Loader subclasses you want the bulkloader
3916 to have access to.
3918 exit_fn(1)
3919 else:
3920 raise
3921 except Exception, e:
3925 if isinstance(e, NameClashError) or 'bulkloader_config' in vars() and (
3926 hasattr(bulkloader_config, 'bulkloader') and
3927 isinstance(e, bulkloader_config.bulkloader.NameClashError)):
3928 print >> sys.stderr, (
3929 'Found both %s and %s while aliasing old names on %s.' %
3930 (e.old_name, e.new_name, e.klass))
3931 exit_fn(1)
3932 else:
3933 raise
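# Illustrative sketch of a minimal new-style Python config file accepted by
# LoadConfig, following the migration steps printed above; AlbumLoader and its
# property list are invented for illustration:
#
#   from google.appengine.tools.bulkloader import Loader
#
#   class AlbumLoader(Loader):
#     def __init__(self):
#       Loader.__init__(self, 'Album',
#                       [('title', str),
#                        ('year', int)])
#       self.alias_old_names()
#
#   loaders = [AlbumLoader]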
3935 def GetArgument(kwargs, name, die_fn):
3936 """Get the value of the key name in kwargs, or die with die_fn.
3938 Args:
3939 kwargs: A dictionary containing the options for the bulkloader.
3940 name: The name of a bulkloader option.
3941 die_fn: The function to call to exit the program.
3943 Returns:
3944 The value of kwargs[name] if name is in kwargs.
3945 """
3946 if name in kwargs:
3947 return kwargs[name]
3948 else:
3949 print >> sys.stderr, '%s argument required' % name
3950 die_fn()
3953 def _MakeSignature(app_id=None,
3954 url=None,
3955 kind=None,
3956 db_filename=None,
3957 perform_map=None,
3958 download=None,
3959 has_header=None,
3960 result_db_filename=None,
3961 dump=None,
3962 restore=None):
3963 """Returns a string that identifies the important options for the database."""
3965 if download:
3966 result_db_line = 'result_db: %s' % result_db_filename
3967 else:
3968 result_db_line = ''
3969 return u"""
3970 app_id: %s
3971 url: %s
3972 kind: %s
3973 download: %s
3974 map: %s
3975 dump: %s
3976 restore: %s
3977 progress_db: %s
3978 has_header: %s
3979 %s
3980 """ % (app_id, url, kind, download, perform_map, dump, restore, db_filename,
3981 has_header, result_db_line)
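# Illustrative rendering of the signature for a plain upload (all values are
# placeholders). The string is stored with the progress database created in
# _PerformBulkload below, so that a resumed run can detect that it was started
# with different options:
#
#   app_id: myapp
#   url: http://myapp.appspot.com/_ah/remote_api
#   kind: Greeting
#   download: False
#   map: False
#   dump: False
#   restore: False
#   progress_db: bulkloader-progress-20140101.120000.sql3
#   has_header: True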
3984 def ProcessArguments(arg_dict,
3985 die_fn=lambda: sys.exit(1)):
3986 """Processes non command-line input arguments.
3988 Args:
3989 arg_dict: Dictionary containing the values of bulkloader options.
3990 die_fn: Function to call in case of an error during argument processing.
3992 Returns:
3993 A dictionary of bulkloader options.
3994 """
3997 unused_application = GetArgument(arg_dict, 'application', die_fn)
3998 url = GetArgument(arg_dict, 'url', die_fn)
3999 dump = GetArgument(arg_dict, 'dump', die_fn)
4000 restore = GetArgument(arg_dict, 'restore', die_fn)
4001 create_config = GetArgument(arg_dict, 'create_config', die_fn)
4002 filename = GetArgument(arg_dict, 'filename', die_fn)
4003 batch_size = GetArgument(arg_dict, 'batch_size', die_fn)
4004 kind = GetArgument(arg_dict, 'kind', die_fn)
4005 db_filename = GetArgument(arg_dict, 'db_filename', die_fn)
4006 config_file = GetArgument(arg_dict, 'config_file', die_fn)
4007 result_db_filename = GetArgument(arg_dict, 'result_db_filename', die_fn)
4008 download = GetArgument(arg_dict, 'download', die_fn)
4009 log_file = GetArgument(arg_dict, 'log_file', die_fn)
4010 perform_map = GetArgument(arg_dict, 'map', die_fn)
4011 namespace = GetArgument(arg_dict, 'namespace', die_fn)
4013 errors = []
4015 if batch_size is None:
4016 if download or perform_map or dump or create_config:
4017 arg_dict['batch_size'] = DEFAULT_DOWNLOAD_BATCH_SIZE
4018 else:
4019 arg_dict['batch_size'] = DEFAULT_BATCH_SIZE
4020 elif batch_size <= 0:
4021 errors.append('batch_size must be at least 1')
4023 if db_filename is None:
4024 arg_dict['db_filename'] = time.strftime(
4025 'bulkloader-progress-%Y%m%d.%H%M%S.sql3')
4027 if result_db_filename is None:
4028 arg_dict['result_db_filename'] = time.strftime(
4029 'bulkloader-results-%Y%m%d.%H%M%S.sql3')
4031 if log_file is None:
4032 arg_dict['log_file'] = time.strftime('bulkloader-log-%Y%m%d.%H%M%S')
4034 required = '%s argument required'
4037 if config_file is None and not dump and not restore and not create_config:
4038 errors.append('One of --config_file, --dump, --restore, or --create_config '
4039 'is required')
4041 if url is REQUIRED_OPTION:
4042 errors.append(required % 'url')
4044 if not filename and not perform_map:
4045 errors.append(required % 'filename')
4047 if not kind:
4048 if download or perform_map:
4049 errors.append('kind argument required for this operation')
4050 elif not dump and not restore and not create_config:
4051 errors.append(
4052 'kind argument required unless --dump, --restore or --create_config '
4053 'specified')
4055 if namespace:
4056 try:
4057 namespace_manager.validate_namespace(namespace)
4058 except namespace_manager.BadValueError, msg:
4059 errors.append('namespace parameter %s' % msg)
4062 POSSIBLE_COMMANDS = ('create_config', 'download', 'dump', 'map', 'restore')
4063 commands = []
4064 for command in POSSIBLE_COMMANDS:
4065 if arg_dict[command]:
4066 commands.append(command)
4067 if len(commands) > 1:
4068 errors.append('%s are mutually exclusive.' % ' and '.join(commands))
4073 if errors:
4074 print >> sys.stderr, '\n'.join(errors)
4075 die_fn()
4077 return arg_dict
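# Illustrative sketch of the command validation above (flag values are
# placeholders):
#
#   ParseArguments(['bulkloader.py', '--url=http://host/_ah/remote_api',
#                   '--filename=data', '--dump', '--restore'])
#   # prints "dump and restore are mutually exclusive." to stderr and invokes
#   # die_fn(), since at most one command may be selected per run.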
4080 def _GetRemoteAppId(url, throttle, email, passin,
4081 raw_input_fn=raw_input, password_input_fn=getpass.getpass,
4082 throttle_class=None):
4083 """Get the App ID from the remote server."""
4084 scheme, host_port, url_path, _, _ = urlparse.urlsplit(url)
4086 secure = (scheme == 'https')
4088 throttled_rpc_server_factory = (
4089 remote_api_throttle.ThrottledHttpRpcServerFactory(
4090 throttle, throttle_class=throttle_class))
4092 def AuthFunction():
4093 return _AuthFunction(host_port, email, passin, raw_input_fn,
4094 password_input_fn)
4096 app_id, server = remote_api_stub.GetRemoteAppId(
4097 host_port, url_path, AuthFunction,
4098 rpc_server_factory=throttled_rpc_server_factory, secure=secure)
4100 return app_id, server
4103 def ParseKind(kind):
4104 if kind and kind[0] == '(' and kind[-1] == ')':
4105 return tuple(kind[1:-1].split(','))
4106 else:
4107 return kind
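# For example:
#   ParseKind('Greeting')            returns 'Greeting'
#   ParseKind('(Greeting,Feedback)') returns ('Greeting', 'Feedback')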
4110 def _PerformBulkload(arg_dict,
4111 check_file=CheckFile,
4112 check_output_file=CheckOutputFile):
4113 """Runs the bulkloader, given the command line options.
4115 Args:
4116 arg_dict: Dictionary of bulkloader options.
4117 check_file: Used for dependency injection.
4118 check_output_file: Used for dependency injection.
4120 Returns:
4121 An exit code.
4123 Raises:
4124 ConfigurationError: if inconsistent options are passed.
4125 """
4126 app_id = arg_dict['application']
4127 url = arg_dict['url']
4128 filename = arg_dict['filename']
4129 batch_size = arg_dict['batch_size']
4130 kind = arg_dict['kind']
4131 num_threads = arg_dict['num_threads']
4132 bandwidth_limit = arg_dict['bandwidth_limit']
4133 rps_limit = arg_dict['rps_limit']
4134 http_limit = arg_dict['http_limit']
4135 db_filename = arg_dict['db_filename']
4136 config_file = arg_dict['config_file']
4137 auth_domain = arg_dict['auth_domain']
4138 has_header = arg_dict['has_header']
4139 download = arg_dict['download']
4140 result_db_filename = arg_dict['result_db_filename']
4141 loader_opts = arg_dict['loader_opts']
4142 exporter_opts = arg_dict['exporter_opts']
4143 mapper_opts = arg_dict['mapper_opts']
4144 email = arg_dict['email']
4145 passin = arg_dict['passin']
4146 perform_map = arg_dict['map']
4147 dump = arg_dict['dump']
4148 restore = arg_dict['restore']
4149 create_config = arg_dict['create_config']
4150 namespace = arg_dict['namespace']
4151 dry_run = arg_dict['dry_run']
4152 throttle_class = arg_dict['throttle_class']
4154 if namespace:
4155 namespace_manager.set_namespace(namespace)
4156 os.environ['AUTH_DOMAIN'] = auth_domain
4158 kind = ParseKind(kind)
4160 if not dump and not restore and not create_config:
4162 check_file(config_file)
4164 if download or dump or create_config:
4166 check_output_file(filename)
4167 elif not perform_map:
4168 check_file(filename)
4171 throttle_layout = ThrottleLayout(bandwidth_limit, http_limit, rps_limit)
4172 logger.info('Throttling transfers:')
4173 logger.info('Bandwidth: %s bytes/second', bandwidth_limit)
4174 logger.info('HTTP connections: %s/second', http_limit)
4175 logger.info('Entities inserted/fetched/modified: %s/second', rps_limit)
4176 logger.info('Batch Size: %s', batch_size)
4178 throttle = remote_api_throttle.Throttle(layout=throttle_layout)
4181 throttle.Register(threading.currentThread())
4182 threading.currentThread().exit_flag = False
4185 server = None
4186 if not app_id:
4187 if dry_run:
4189 raise ConfigurationError('Must specify application ID in dry run mode.')
4190 app_id, server = _GetRemoteAppId(url, throttle, email, passin,
4191 throttle_class=throttle_class)
4193 arg_dict['application'] = app_id
4195 if dump:
4196 Exporter.RegisterExporter(DumpExporter(kind, result_db_filename))
4197 elif restore:
4198 Loader.RegisterLoader(RestoreLoader(kind, app_id))
4199 elif create_config:
4201 kind = '__Stat_PropertyType_PropertyName_Kind__'
4202 arg_dict['kind'] = kind
4205 root_dir = os.path.dirname(os.path.abspath(__file__))
4206 if os.path.basename(root_dir) == 'tools':
4207 root_dir = os.path.dirname(os.path.dirname(os.path.dirname(root_dir)))
4209 LoadYamlConfig(os.path.join(
4210 root_dir, os.path.normpath(
4211 'google/appengine/ext/bulkload/bulkloader_wizard.yaml')))
4212 elif (config_file and
4213 (config_file.endswith('.yaml') or config_file.endswith('.yml'))):
4214 LoadYamlConfig(config_file)
4215 else:
4216 LoadConfig(config_file)
4218 os.environ['APPLICATION_ID'] = app_id
4220 signature = _MakeSignature(app_id=app_id,
4221 url=url,
4222 kind=kind,
4223 db_filename=db_filename,
4224 download=download,
4225 perform_map=perform_map,
4226 has_header=has_header,
4227 result_db_filename=result_db_filename,
4228 dump=dump,
4229 restore=restore)
4233 max_queue_size = max(DEFAULT_QUEUE_SIZE, 3 * num_threads + 5)
4235 upload = not (download or dump or restore or perform_map or create_config)
4237 if db_filename == 'skip':
4238 progress_db = StubProgressDatabase()
4239 elif upload or restore:
4240 progress_db = ProgressDatabase(db_filename, signature)
4241 else:
4242 progress_db = ExportProgressDatabase(db_filename, signature)
4244 return_code = 1
4246 if upload or restore:
4247 loader = Loader.RegisteredLoader(kind)
4248 try:
4249 loader.initialize(filename, loader_opts)
4252 workitem_generator_factory = GetCSVGeneratorFactory(
4253 kind, filename, batch_size, has_header)
4255 app = BulkUploaderApp(arg_dict,
4256 workitem_generator_factory,
4257 throttle,
4258 progress_db,
4259 ProgressTrackerThread,
4260 max_queue_size,
4261 RequestManager,
4262 DataSourceThread,
4263 Queue.Queue,
4264 server=server)
4265 try:
4266 return_code = app.Run()
4267 except AuthenticationError:
4268 logger.error(AUTH_FAILED_MESSAGE)
4269 finally:
4270 loader.finalize()
4271 elif download or dump or create_config:
4272 exporter = Exporter.RegisteredExporter(kind)
4273 result_db = ResultDatabase(result_db_filename, signature, exporter=exporter)
4274 try:
4275 exporter.initialize(filename, exporter_opts)
4277 def KeyRangeGeneratorFactory(request_manager, progress_queue,
4278 progress_gen, kinds):
4279 logger.info('Downloading kinds: %s', kinds)
4280 return KeyRangeItemGenerator(request_manager, kinds, progress_queue,
4281 progress_gen, DownloadItem)
4283 def ExportProgressThreadFactory(progress_queue, progress_db):
4284 return ExportProgressThread(exporter,
4285 progress_queue,
4286 progress_db,
4287 result_db)
4289 app = BulkDownloaderApp(arg_dict,
4290 KeyRangeGeneratorFactory,
4291 throttle,
4292 progress_db,
4293 ExportProgressThreadFactory,
4294 0,
4295 RequestManager,
4296 DataSourceThread,
4297 Queue.Queue,
4298 server=server)
4299 try:
4300 return_code = app.Run()
4301 except AuthenticationError:
4302 logger.error(AUTH_FAILED_MESSAGE)
4303 except KindStatError:
4304 logger.error('Unable to download kind stats for all-kinds download.')
4305 logger.error('Kind stats are generated periodically by the appserver')
4306 logger.error('Kind stats are not available on dev_appserver.')
4307 finally:
4308 exporter.finalize()
4309 elif perform_map:
4310 mapper = Mapper.RegisteredMapper(kind)
4311 try:
4312 mapper.initialize(mapper_opts)
4314 def KeyRangeGeneratorFactory(request_manager, progress_queue,
4315 progress_gen, kinds):
4316 return KeyRangeItemGenerator(request_manager, kinds, progress_queue,
4317 progress_gen, MapperItem)
4319 def MapperProgressThreadFactory(progress_queue, progress_db):
4320 return MapperProgressThread(mapper,
4321 progress_queue,
4322 progress_db)
4324 app = BulkMapperApp(arg_dict,
4325 KeyRangeGeneratorFactory,
4326 throttle,
4327 progress_db,
4328 MapperProgressThreadFactory,
4329 0,
4330 RequestManager,
4331 DataSourceThread,
4332 Queue.Queue,
4333 server=server)
4334 try:
4335 return_code = app.Run()
4336 except AuthenticationError:
4337 logger.error(AUTH_FAILED_MESSAGE)
4338 finally:
4339 mapper.finalize()
4340 return return_code
4343 def SetupLogging(arg_dict):
4344 """Sets up logging for the bulkloader.
4346 Args:
4347 arg_dict: Dictionary mapping flag names to their arguments.
4348 """
4349 format = '[%(levelname)-8s %(asctime)s %(filename)s] %(message)s'
4350 debug = arg_dict['debug']
4351 log_file = arg_dict['log_file']
4353 logger.setLevel(logging.DEBUG)
4356 logger.propagate = False
4359 file_handler = logging.FileHandler(log_file, 'w')
4360 file_handler.setLevel(logging.DEBUG)
4361 file_formatter = logging.Formatter(format)
4362 file_handler.setFormatter(file_formatter)
4363 logger.addHandler(file_handler)
4366 console = logging.StreamHandler()
4367 level = logging.INFO
4368 if debug:
4369 level = logging.DEBUG
4370 console.setLevel(level)
4371 console_format = '[%(levelname)-8s] %(message)s'
4372 formatter = logging.Formatter(console_format)
4373 console.setFormatter(formatter)
4374 logger.addHandler(console)
4376 logger.info('Logging to %s', log_file)
4378 remote_api_throttle.logger.setLevel(level)
4379 remote_api_throttle.logger.addHandler(file_handler)
4380 remote_api_throttle.logger.addHandler(console)
4384 appengine_rpc.logger.setLevel(logging.WARN)
4386 adaptive_thread_pool.logger.setLevel(logging.DEBUG)
4387 adaptive_thread_pool.logger.addHandler(console)
4388 adaptive_thread_pool.logger.addHandler(file_handler)
4389 adaptive_thread_pool.logger.propagate = False
4392 def Run(arg_dict):
4393 """Sets up and runs the bulkloader, given the options as keyword arguments.
4395 Args:
4396 arg_dict: Dictionary of bulkloader options
4398 Returns:
4399 An exit code.
4400 """
4401 arg_dict = ProcessArguments(arg_dict)
4403 SetupLogging(arg_dict)
4405 return _PerformBulkload(arg_dict)
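# Illustrative sketch of driving the bulkloader programmatically (flag values
# are placeholders); Run() expects the same fully populated dictionary that
# ParseArguments() produces, so a caller can reuse it and adjust options:
#
#   args = ParseArguments(['bulkloader.py',
#                          '--url=http://localhost:8080/_ah/remote_api',
#                          '--kind=Greeting',
#                          '--filename=greetings.csv',
#                          '--config_file=bulkloader.yaml'])
#   args['num_threads'] = 20
#   sys.exit(Run(args))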
4408 def main(argv):
4409 """Runs the importer from the command line."""
4412 arg_dict = ParseArguments(argv)
4415 errors = ['%s argument required' % key
4416 for (key, value) in arg_dict.iteritems()
4417 if value is REQUIRED_OPTION]
4418 if errors:
4419 print >> sys.stderr, '\n'.join(errors)
4420 PrintUsageExit(1)
4422 SetupLogging(arg_dict)
4423 return _PerformBulkload(arg_dict)
4426 if __name__ == '__main__':
4427 sys.exit(main(sys.argv))