3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Imports data over HTTP.
22 --debug Show debugging information. (Optional)
23 --application=<string> Application ID of endpoint (Optional for
25 --auth_domain=<domain> The auth domain to use for logging in and for
26 UserProperties. (Default: gmail.com)
27 --bandwidth_limit=<int> The maximum number of bytes per second for the
28 aggregate transfer of data to/from the server.
29 Bursts may exceed this, but overall transfer rate is
30 restricted to this rate. (Default: 250000)
31 --batch_size=<int> Number of Entity objects to include in each request
32 to/from the URL endpoint. The more data per
33 row/Entity, the smaller the batch size should be.
34 (Default: downloads 100, uploads 10)
35 --config_file=<path> File containing Model and Loader definitions or
36 bulkloader.yaml transforms. (Required unless --dump,
37 --restore, or --create_config are used.)
38 --create_config Write a bulkloader.yaml configuration file to
39 --filename based on the server side datastore state.
40 --db_filename=<path> Specific progress database to write to, or to
41 resume from. If not supplied, then a new database
42 will be started, named:
43 bulkloader-progress-TIMESTAMP.
44 The special filename "skip" may be used to simply
45 skip reading/writing any progress information.
46 --download Export entities to a file.
47 --dry_run Do not execute any remote_api calls.
48 --dump Use zero-configuration dump format.
49 --email=<string> The username to use. Will prompt if omitted.
50 --exporter_opts=<string>
51 A string to pass to the Exporter.initialize method.
52 --filename=<path> Path to the file to import/export. (Required when
53 importing or exporting, not mapping.)
54 --has_header Skip the first row of the input.
55 --http_limit=<int>      The maximum number of HTTP requests per second to
56 send to the server. (Default: 8)
57 --kind=<string> Name of the Entity object kind to put in the
59 --loader_opts=<string> A string to pass to the Loader.initialize method.
60 --log_file=<path> File to write bulkloader logs. If not supplied
61 then a new log file will be created, named:
62 bulkloader-log-TIMESTAMP.
63 --map Map an action across datastore entities.
64 --mapper_opts=<string> A string to pass to the Mapper.Initialize method.
65 --num_threads=<int> Number of threads to use for uploading/downloading
66 entities (Default: 10)
67 --passin Read the login password from stdin.
68 --restore Restore from zero-configuration dump format.
69 --result_db_filename=<path>
70 Result database to write to for downloads.
71 --rps_limit=<int> The maximum number of records per second to
72 transfer to/from the server. (Default: 20)
73 --url=<string> URL endpoint to post to for importing/exporting
75 --namespace=<string> Use specified namespace instead of the default one
76 for all datastore operations.
78 The exit status will be 0 on success, non-zero on import failure.
80 Works with the remote_api mix-in library for google.appengine.ext.remote_api.
81 Please look there for documentation about how to setup the server side.
85 %(arg0)s --url=http://app.appspot.com/remote_api --kind=Model \
86 --filename=data.csv --config_file=loader_config.py
import csv
import getpass
import logging
import sys
import threading
import time
import urllib2

from google.appengine.api import apiproxy_stub_map
from google.appengine.api import datastore
from google.appengine.api import datastore_errors
from google.appengine.api.namespace_manager import namespace_manager
from google.appengine.datastore import datastore_pb
from google.appengine.datastore import entity_pb
from google.appengine.ext import db
from google.appengine.ext import key_range as key_range_module
from google.appengine.ext.bulkload import bulkloader_config
from google.appengine.ext.db import polymodel
from google.appengine.ext.db import stats
from google.appengine.ext.remote_api import remote_api_stub
from google.appengine.ext.remote_api import throttle as remote_api_throttle
from google.appengine.runtime import apiproxy_errors
from google.appengine.tools import adaptive_thread_pool
from google.appengine.tools import appengine_rpc
from google.appengine.tools.requeue import ReQueue
logger = logging.getLogger('google.appengine.tools.bulkloader')

# Shorthand for the KeyRange class used throughout this module.
KeyRange = key_range_module.KeyRange

# Default number of worker threads for uploading/downloading entities.
DEFAULT_THREAD_COUNT = 10

# Default number of entities to include in each upload request.
DEFAULT_BATCH_SIZE = 10

# Default number of entities to request in each download batch.
DEFAULT_DOWNLOAD_BATCH_SIZE = 100

# Default size of the work queues feeding the thread pool.
DEFAULT_QUEUE_SIZE = DEFAULT_THREAD_COUNT * 10

# Sentinel placed on a queue to tell a consumer thread to exit.
_THREAD_SHOULD_EXIT = '_THREAD_SHOULD_EXIT'

# Progress-database states for upload work items.
# NOTE(review): numeric values reconstructed from the state-namer functions
# below (upload and download states share values) -- confirm against the
# original SDK source.
STATE_READ = 0
STATE_SENDING = 1
STATE_SENT = 2
STATE_NOT_SENT = 3

# Progress-database states for download/map work items.
STATE_GETTING = 1
STATE_GOT = 2
STATE_ERROR = 3

# Progress-database marker: all input data up to this point was consumed.
DATA_CONSUMED_TO_HERE = 'DATA_CONSUMED_TO_HERE'

# Initial retry backoff, in seconds.
INITIAL_BACKOFF = 1.0

# Default cap on aggregate transfer rate, in bytes per second.
DEFAULT_BANDWIDTH_LIMIT = 250000

# Default cap on records transferred per second.
DEFAULT_RPS_LIMIT = 20

# Default cap on HTTP requests per second.
DEFAULT_REQUEST_LIMIT = 8

# A transfer faster than this suggests the thread pool may grow.
MAXIMUM_INCREASE_DURATION = 5.0
# A transfer faster than this (but above the increase threshold) holds
# the thread pool at its current size.
MAXIMUM_HOLD_DURATION = 12.0

AUTH_FAILED_MESSAGE = ('Authentication Failed: Incorrect credentials or '
                       'unsupported authentication type (e.g. OpenId).')
def ImportStateMessage(state):
  """Converts a numeric state identifier to a status message."""
  return ({
      STATE_READ: 'Batch read from file.',
      STATE_SENDING: 'Sending batch to server.',
      STATE_SENT: 'Batch successfully sent.',
      STATE_NOT_SENT: 'Error while sending batch.'
  }[state])
def ExportStateMessage(state):
  """Converts a numeric state identifier to a status message."""
  return ({
      STATE_READ: 'Batch read from file.',
      STATE_GETTING: 'Fetching batch from server',
      STATE_GOT: 'Batch successfully fetched.',
      STATE_ERROR: 'Error while fetching batch'
  }[state])
def MapStateMessage(state):
  """Converts a numeric state identifier to a status message."""
  return ({
      STATE_READ: 'Batch read from file.',
      STATE_GETTING: 'Querying for batch from server',
      STATE_GOT: 'Batch successfully fetched.',
      STATE_ERROR: 'Error while fetching or mapping.'
  }[state])
def ExportStateName(state):
  """Converts a numeric state identifier to a string."""
  # NOTE(review): the READ/GOT entries were dropped by extraction and are
  # reconstructed here -- confirm against the original SDK source.
  return ({STATE_READ: 'READ',
           STATE_GETTING: 'GETTING',
           STATE_GOT: 'GOT',
           STATE_ERROR: 'NOT_GOT'}[state])
def ImportStateName(state):
  """Converts a numeric state identifier to a string."""
  # NOTE(review): the READ/SENT entries were dropped by extraction and are
  # reconstructed here -- confirm against the original SDK source.
  return ({STATE_READ: 'READ',
           STATE_GETTING: 'SENDING',
           STATE_SENT: 'SENT',
           STATE_NOT_SENT: 'NOT_SENT'}[state])
class Error(Exception):
  """Base-class for exceptions in this module."""


class MissingPropertyError(Error):
  """An expected field is missing from an entity, and no default was given."""


class FatalServerError(Error):
  """An unrecoverable error occurred while posting data to the server."""


class ResumeError(Error):
  """Error while trying to resume a partial upload."""


class ConfigurationError(Error):
  """Error in configuration options."""


class AuthenticationError(Error):
  """Error while trying to authenticate with the server."""


class FileNotFoundError(Error):
  """A filename passed in by the user refers to a non-existent input file."""


class FileNotReadableError(Error):
  """A filename passed in by the user refers to a non-readable input file."""


class FileExistsError(Error):
  """A filename passed in by the user refers to an existing output file."""


class FileNotWritableError(Error):
  """A filename passed in by the user refers to a non-writable output file."""


class BadStateError(Error):
  """A work item in an unexpected state was encountered."""


class KeyRangeError(Error):
  """An error during construction of a KeyRangeItem."""


class KindStatError(Error):
  """Unable to find kind stats for an all-kinds download."""


class FieldSizeLimitError(Error):
  """The csv module tried to read a field larger than the size limit."""

  def __init__(self, limit):
    # The message doubles as user-facing guidance for raising the csv limit.
    self.message = """
A field in your CSV input file has exceeded the current limit of %d.

You can raise this limit by adding the following lines to your config file:

import csv
csv.field_size_limit(new_limit)

where new_limit is number larger than the size in bytes of the largest
field in your CSV.
""" % limit
    Error.__init__(self, self.message)


class NameClashError(Error):
  """A name clash occurred while trying to alias old method names."""

  def __init__(self, old_name, new_name, klass):
    Error.__init__(self, old_name, new_name, klass)
    self.old_name = old_name
    self.new_name = new_name
    self.klass = klass
def GetCSVGeneratorFactory(kind, csv_filename, batch_size, csv_has_header,
                           openfile=open, create_csv_reader=csv.reader):
  """Return a factory that creates a CSV-based UploadWorkItem generator.

  Args:
    kind: The kind of the entities being uploaded.
    csv_filename: File on disk containing CSV data.
    batch_size: Maximum number of CSV rows to stash into an UploadWorkItem.
    csv_has_header: Whether to skip the first row of the CSV.
    openfile: Used for dependency injection.
    create_csv_reader: Used for dependency injection.

  Returns:
    A callable (accepting the Progress Queue and Progress Generators
    as input) which creates the UploadWorkItem generator.
  """
  loader = Loader.RegisteredLoader(kind)
  # Inject the file/reader factories into the loader's private attributes
  # (name-mangled as _Loader__*) so generate_records uses them.
  loader._Loader__openfile = openfile
  loader._Loader__create_csv_reader = create_csv_reader
  record_generator = loader.generate_records(csv_filename)

  def CreateGenerator(request_manager, progress_queue, progress_generator,
                      unused_kinds):
    """Initialize a UploadWorkItem generator.

    Args:
      request_manager: A RequestManager instance.
      progress_queue: A ProgressQueue instance to send progress information.
      progress_generator: A generator of progress information or None.
      unused_kinds: The kinds being generated (ignored in this method).

    Returns:
      An UploadWorkItemGenerator instance.
    """
    return UploadWorkItemGenerator(request_manager,
                                   progress_queue,
                                   progress_generator,
                                   record_generator,
                                   csv_has_header,
                                   batch_size)

  return CreateGenerator
class UploadWorkItemGenerator(object):
  """Reads rows from a row generator and generates UploadWorkItems."""

  def __init__(self,
               request_manager,
               progress_queue,
               progress_generator,
               record_generator,
               skip_first,
               batch_size):
    """Initialize a WorkItemGenerator.

    Args:
      request_manager: A RequestManager instance with which to associate
        WorkItems.
      progress_queue: A progress queue with which to associate WorkItems.
      progress_generator: A generator of progress information.
      record_generator: A generator of data records.
      skip_first: Whether to skip the first data record.
      batch_size: The number of data records per WorkItem.
    """
    self.request_manager = request_manager
    self.progress_queue = progress_queue
    self.progress_generator = progress_generator
    self.reader = record_generator
    self.skip_first = skip_first
    self.batch_size = batch_size
    # Line numbers are 1-based keys identifying rows in the input file.
    self.line_number = 1
    # Number of columns seen in the first row; used for consistency checks.
    self.column_count = None
    # Rows buffered by _ReadRows, as (line_number, row) pairs.
    self.read_rows = []

  def _AdvanceTo(self, line):
    """Advance the reader to the given line.

    Args:
      line: A line number to advance to.
    """
    while self.line_number < line:
      # Consume and discard rows already recorded in the progress database.
      self.reader.next()
      self.line_number += 1

  def _ReadRows(self, key_start, key_end):
    """Attempts to read and encode rows [key_start, key_end].

    The encoded rows are stored in self.read_rows.

    Args:
      key_start: The starting line number.
      key_end: The ending line number.

    Raises:
      StopIteration: if the reader runs out of rows
      ResumeError: if there are an inconsistent number of columns.
    """
    assert self.line_number == key_start
    self.read_rows = []
    while self.line_number <= key_end:
      row = self.reader.next()
      if self.column_count is None:
        self.column_count = len(row)
      self.read_rows.append((self.line_number, row))
      self.line_number += 1

  def _MakeItem(self, key_start, key_end, rows, progress_key=None):
    """Makes a UploadWorkItem containing the given rows, with the given keys.

    Args:
      key_start: The start key for the UploadWorkItem.
      key_end: The end key for the UploadWorkItem.
      rows: A list of the rows for the UploadWorkItem.
      progress_key: The progress key for the UploadWorkItem

    Returns:
      An UploadWorkItem instance for the given batch.
    """
    assert rows

    item = UploadWorkItem(self.request_manager, self.progress_queue, rows,
                          key_start, key_end, progress_key=progress_key)

    return item

  def Batches(self):
    """Reads from the record_generator and generates UploadWorkItems.

    Yields:
      Instances of class UploadWorkItem

    Raises:
      ResumeError: If the progress database and data file indicate a different
        number of rows.
    """
    if self.skip_first:
      logger.info('Skipping header line.')
      try:
        self.reader.next()
      except StopIteration:
        # Nothing but the header; there is no work to do.
        return

    exhausted = False

    self.line_number = 1
    self.column_count = None

    logger.info('Starting import; maximum %d entities per post',
                self.batch_size)

    state = None
    if self.progress_generator:
      # First, replay incomplete batches recorded in the progress database.
      for progress_key, state, kind, key_start, key_end in (
          self.progress_generator):
        if key_start:
          try:
            self._AdvanceTo(key_start)
            self._ReadRows(key_start, key_end)
            yield self._MakeItem(key_start,
                                 key_end,
                                 self.read_rows,
                                 progress_key=progress_key)
          except StopIteration:
            logger.error('Mismatch between data file and progress database')
            raise ResumeError(
                'Mismatch between data file and progress database')
        elif state == DATA_CONSUMED_TO_HERE:
          try:
            self._AdvanceTo(key_end + 1)
          except StopIteration:
            # The file ends before the recorded consumption point; there is
            # no fresh data left to upload.
            state = None

    # Then upload any data beyond what the progress database recorded.
    if self.progress_generator is None or state == DATA_CONSUMED_TO_HERE:
      while not exhausted:
        key_start = self.line_number
        key_end = self.line_number + self.batch_size - 1
        try:
          self._ReadRows(key_start, key_end)
        except StopIteration:
          exhausted = True
          # The reader ran dry mid-batch; clamp the range to what was read.
          key_end = self.line_number - 1
        if key_start <= key_end:
          yield self._MakeItem(key_start, key_end, self.read_rows)
class CSVGenerator(object):
  """Reads a CSV file and generates data records."""

  def __init__(self, csv_filename, openfile=open,
               create_csv_reader=csv.reader):
    """Initializes a CSV generator.

    Args:
      csv_filename: File on disk containing CSV data.
      openfile: Used for dependency injection of 'open'.
      create_csv_reader: Used for dependency injection of 'csv.reader'.
    """
    self.csv_filename = csv_filename
    self.openfile = openfile
    self.create_csv_reader = create_csv_reader

  def Records(self):
    """Reads the CSV data file and generates row records.

    Yields:
      Lists of strings, one per CSV row.

    Raises:
      FieldSizeLimitError: If a field exceeds the csv module's size limit.
    """
    csv_file = self.openfile(self.csv_filename, 'rb')
    reader = self.create_csv_reader(csv_file, skipinitialspace=True)
    try:
      for record in reader:
        yield record
    except csv.Error as e:
      # Translate the csv module's generic error into this module's
      # FieldSizeLimitError, which carries remediation advice.
      if e.args and e.args[0].startswith('field larger than field limit'):
        raise FieldSizeLimitError(csv.field_size_limit())
      else:
        raise
class KeyRangeItemGenerator(object):
  """Generates ranges of keys to download.

  Reads progress information from the progress database and creates
  KeyRangeItem objects corresponding to incompletely downloaded parts of an
  export.
  """

  def __init__(self, request_manager, kinds, progress_queue, progress_generator,
               key_range_item_factory):
    """Initialize the KeyRangeItemGenerator.

    Args:
      request_manager: A RequestManager instance.
      kinds: The kind of entities being transferred, or a list of kinds.
      progress_queue: A queue used for tracking progress information.
      progress_generator: A generator of prior progress information, or None
        if there is no prior status.
      key_range_item_factory: A factory to produce KeyRangeItems.
    """
    self.request_manager = request_manager
    # Normalize a single kind string into a one-element list.
    if isinstance(kinds, basestring):
      self.kinds = [kinds]
    else:
      self.kinds = kinds
    self.progress_queue = progress_queue
    self.progress_generator = progress_generator
    self.key_range_item_factory = key_range_item_factory

  def Batches(self):
    """Iterate through saved progress information.

    Yields:
      KeyRangeItem instances corresponding to undownloaded key ranges.
    """
    if self.progress_generator is not None:
      # Resume: re-issue only the ranges that did not complete last run.
      for progress_key, state, kind, key_start, key_end in (
          self.progress_generator):
        if state is not None and state != STATE_GOT and key_start is not None:
          key_start = ParseKey(key_start)
          key_end = ParseKey(key_end)

          key_range = KeyRange(key_start=key_start,
                               key_end=key_end)

          result = self.key_range_item_factory(self.request_manager,
                                               self.progress_queue,
                                               kind,
                                               key_range,
                                               progress_key=progress_key,
                                               state=STATE_READ)
          yield result
    else:
      # Fresh run: one unbounded range per kind.
      for kind in self.kinds:
        key_range = KeyRange()
        yield self.key_range_item_factory(self.request_manager,
                                          self.progress_queue,
                                          kind,
                                          key_range)
class DownloadResult(object):
  """Holds the result of an entity download."""

  def __init__(self, continued, direction, keys, entities):
    """Initialize the DownloadResult.

    Args:
      continued: Whether the requested range has more data beyond this batch.
      direction: KeyRange.ASC or KeyRange.DESC, the scan direction.
      keys: The keys fetched, in scan order.
      entities: The entities fetched, parallel to keys.
    """
    self.continued = continued
    self.direction = direction
    self.keys = keys
    self.entities = entities
    self.count = len(keys)
    assert self.count == len(entities)
    assert direction in (key_range_module.KeyRange.ASC,
                         key_range_module.KeyRange.DESC)
    # key_start/key_end are always in ascending key order regardless of
    # the scan direction.
    if self.count > 0:
      if direction == key_range_module.KeyRange.ASC:
        self.key_start = keys[0]
        self.key_end = keys[-1]
      else:
        self.key_start = keys[-1]
        self.key_end = keys[0]

  def Entities(self):
    """Returns the list of entities for this result in key order."""
    if self.direction == key_range_module.KeyRange.ASC:
      return list(self.entities)
    else:
      # A descending scan fetched entities in reverse key order.
      result = list(self.entities)
      result.reverse()
      return result

  def __str__(self):
    return 'continued = %s\n%s' % (
        str(self.continued), '\n'.join(str(self.entities)))
class _WorkItem(adaptive_thread_pool.WorkItem):
  """Holds a description of a unit of upload or download work."""

  def __init__(self, progress_queue, key_start, key_end, state_namer,
               state=STATE_READ, progress_key=None):
    """Initialize the _WorkItem instance.

    Args:
      progress_queue: A queue used for tracking progress information.
      key_start: The start key of the work item.
      key_end: The end key of the work item.
      state_namer: Function to describe work item states.
      state: The initial state of the work item.
      progress_key: If this WorkItem represents state from a prior run,
        then this will be the key within the progress database.
    """
    adaptive_thread_pool.WorkItem.__init__(self,
                                           '[%s-%s]' % (key_start, key_end))
    self.progress_queue = progress_queue
    self.state_namer = state_namer
    self.state = state
    self.progress_key = progress_key
    # Used by the progress thread to acknowledge a state transition.
    self.progress_event = threading.Event()
    self.key_start = key_start
    self.key_end = key_end
    self.error = None
    self.traceback = None

  def _TransferItem(self, thread_pool):
    # Subclasses perform the actual upload/download here.
    raise NotImplementedError()

  def SetError(self):
    """Sets the error and traceback information for this thread.

    This must be called from an exception handler.
    """
    if not self.error:
      exc_info = sys.exc_info()
      self.error = exc_info[1]
      self.traceback = exc_info[2]

  def PerformWork(self, thread_pool):
    """Perform the work of this work item and report the results.

    Args:
      thread_pool: An AdaptiveThreadPool instance.

    Returns:
      A tuple (status, instruction) of the work status and an instruction
      for the ThreadGate.
    """
    status = adaptive_thread_pool.WorkItem.FAILURE
    instruction = adaptive_thread_pool.ThreadGate.DECREASE

    # NOTE(review): the try/finally scaffolding below was reconstructed from
    # the surviving handler bodies -- confirm against the original SDK source.
    try:
      self.MarkAsTransferring()

      try:
        transfer_time = self._TransferItem(thread_pool)
        if transfer_time is None:
          # A None transfer time signals "retry later" (e.g. mapper retry).
          status = adaptive_thread_pool.WorkItem.RETRY
          instruction = adaptive_thread_pool.ThreadGate.HOLD
        else:
          logger.debug('[%s] %s Transferred %d entities in %0.1f seconds',
                       threading.currentThread().getName(), self, self.count,
                       transfer_time)
          sys.stdout.write('.')
          sys.stdout.flush()
          status = adaptive_thread_pool.WorkItem.SUCCESS
          # Fast transfers allow more concurrency; slow ones hold or shrink.
          if transfer_time <= MAXIMUM_INCREASE_DURATION:
            instruction = adaptive_thread_pool.ThreadGate.INCREASE
          elif transfer_time <= MAXIMUM_HOLD_DURATION:
            instruction = adaptive_thread_pool.ThreadGate.HOLD
      except (db.InternalError, db.NotSavedError, db.Timeout,
              db.TransactionFailedError,
              apiproxy_errors.OverQuotaError,
              apiproxy_errors.DeadlineExceededError,
              apiproxy_errors.ApplicationError) as e:
        # Transient datastore errors: retry the item.
        status = adaptive_thread_pool.WorkItem.RETRY
        logger.exception('Retrying on non-fatal datastore error: %s', e)
      except urllib2.HTTPError as e:
        http_status = e.code
        if http_status >= 500 and http_status < 600:
          # Server-side errors are considered transient.
          status = adaptive_thread_pool.WorkItem.RETRY
          logger.exception('Retrying on non-fatal HTTP error: %d %s',
                           http_status, e.msg)
        else:
          self.SetError()
          status = adaptive_thread_pool.WorkItem.FAILURE
      except urllib2.URLError as e:
        if IsURLErrorFatal(e):
          self.SetError()
          status = adaptive_thread_pool.WorkItem.FAILURE
        else:
          status = adaptive_thread_pool.WorkItem.RETRY
          logger.exception('Retrying on non-fatal URL error: %s', e.reason)

    finally:
      if status == adaptive_thread_pool.WorkItem.SUCCESS:
        self.MarkAsTransferred()
      else:
        self.MarkAsError()

    return (status, instruction)

  def _AssertInState(self, *states):
    """Raises an Error if the state of this range is not in states."""
    if not self.state in states:
      raise BadStateError('%s:%s not in %s' %
                          (str(self),
                           self.state_namer(self.state),
                           map(self.state_namer, states)))

  def _AssertProgressKey(self):
    """Raises an Error if the progress key is None."""
    if self.progress_key is None:
      raise BadStateError('%s: Progress key is missing' % str(self))

  def MarkAsRead(self):
    """Mark this _WorkItem as read, updating the progress database."""
    self._AssertInState(STATE_READ)
    self._StateTransition(STATE_READ, blocking=True)

  def MarkAsTransferring(self):
    """Mark this _WorkItem as transferring, updating the progress database."""
    self._AssertInState(STATE_READ, STATE_ERROR)
    self._AssertProgressKey()
    self._StateTransition(STATE_GETTING, blocking=True)

  def MarkAsTransferred(self):
    """Mark this _WorkItem as transferred, updating the progress database."""
    raise NotImplementedError()

  def MarkAsError(self):
    """Mark this _WorkItem as failed, updating the progress database."""
    self._AssertInState(STATE_GETTING)
    self._AssertProgressKey()
    self._StateTransition(STATE_ERROR, blocking=True)

  def _StateTransition(self, new_state, blocking=False):
    """Transition the work item to a new state, storing progress information.

    Args:
      new_state: The state to transition to.
      blocking: Whether to block for the progress thread to acknowledge the
        transition.
    """
    assert not self.progress_event.isSet()

    self.state = new_state

    # Hand this item to the progress thread for persistence.
    self.progress_queue.put(self)

    if blocking:
      # Wait for the progress thread to signal that it recorded the state.
      self.progress_event.wait()

      self.progress_event.clear()
class UploadWorkItem(_WorkItem):
  """Holds a unit of uploading work.

  A UploadWorkItem represents a number of entities that need to be uploaded to
  Google App Engine. These entities are encoded in the "content" field of
  the UploadWorkItem, and will be POST'd as-is to the server.

  The entities are identified by a range of numeric keys, inclusively. In
  the case of a resumption of an upload, or a replay to correct errors,
  these keys must be able to identify the same set of entities.

  Note that keys specify a range. The entities do not have to sequentially
  fill the entire range, they must simply bound a range of valid keys.
  """

  def __init__(self, request_manager, progress_queue, rows, key_start, key_end,
               progress_key=None):
    """Initialize the UploadWorkItem instance.

    Args:
      request_manager: A RequestManager instance.
      progress_queue: A queue used for tracking progress information.
      rows: A list of pairs of a line number and a list of column values.
      key_start: The (numeric) starting key, inclusive.
      key_end: The (numeric) ending key, inclusive.
      progress_key: If this UploadWorkItem represents state from a prior run,
        then this will be the key within the progress database.
    """
    _WorkItem.__init__(self, progress_queue, key_start, key_end,
                       ImportStateName, state=STATE_READ,
                       progress_key=progress_key)

    assert isinstance(key_start, (int, long))
    assert isinstance(key_end, (int, long))
    assert key_start <= key_end

    self.request_manager = request_manager
    self.rows = rows
    # Encoded lazily on first transfer so retries reuse the same payload.
    self.content = None
    self.count = len(rows)

  def __str__(self):
    return '[%s-%s]' % (self.key_start, self.key_end)

  def _TransferItem(self, thread_pool, get_time=time.time):
    """Transfers the entities associated with an item.

    Args:
      thread_pool: An AdaptiveThreadPool instance.
      get_time: Used for dependency injection.

    Returns:
      The elapsed transfer time, in seconds.
    """
    t = get_time()
    if not self.content:
      self.content = self.request_manager.EncodeContent(self.rows)
    self.request_manager.PostEntities(self.content)
    return get_time() - t

  def MarkAsTransferred(self):
    """Mark this UploadWorkItem as successfully-sent to the server."""

    self._AssertInState(STATE_SENDING)
    self._AssertProgressKey()

    self._StateTransition(STATE_SENT, blocking=False)
def GetImplementationClass(kind_or_class_key):
  """Returns the implementation class for a given kind or class key.

  Args:
    kind_or_class_key: A kind string or a tuple of kind strings.

  Returns:
    A db.Model subclass for the given kind or class key.

  Raises:
    db.KindError: If no implementation is registered for the class key.
  """
  if isinstance(kind_or_class_key, tuple):
    # Tuples are PolyModel class keys; look them up in polymodel's registry.
    try:
      implementation_class = polymodel._class_map[kind_or_class_key]
    except KeyError:
      raise db.KindError('No implementation for class \'%s\'' %
                         kind_or_class_key)
  else:
    implementation_class = db.class_for_kind(kind_or_class_key)
  return implementation_class
def KeyLEQ(key1, key2):
  """Compare two keys for less-than-or-equal-to.

  All keys with numeric ids come before all keys with names. None represents
  an unbounded end-point so it is both greater and less than any other key.

  Args:
    key1: An int or datastore.Key instance.
    key2: An int or datastore.Key instance.

  Returns:
    True if key1 <= key2, treating None as an unbounded end-point.
  """
  if key1 is None or key2 is None:
    return True
  return key1 <= key2
class KeyRangeItem(_WorkItem):
  """Represents an item of work that scans over a key range.

  A KeyRangeItem object represents holds a KeyRange
  and has an associated state: STATE_READ, STATE_GETTING, STATE_GOT,
  and STATE_ERROR.

  - STATE_READ indicates the range ready to be downloaded by a worker thread.
  - STATE_GETTING indicates the range is currently being downloaded.
  - STATE_GOT indicates that the range was successfully downloaded
  - STATE_ERROR indicates that an error occurred during the last download
    attempt

  KeyRangeItems not in the STATE_GOT state are stored in the progress database.
  When a piece of KeyRangeItem work is downloaded, the download may cover only
  a portion of the range.  In this case, the old KeyRangeItem is removed from
  the progress database and ranges covering the undownloaded range are
  generated and stored as STATE_READ in the export progress database.
  """

  def __init__(self,
               request_manager,
               progress_queue,
               kind,
               key_range,
               progress_key=None,
               state=STATE_READ,
               first=False):
    """Initialize a KeyRangeItem object.

    Args:
      request_manager: A RequestManager instance.
      progress_queue: A queue used for tracking progress information.
      kind: The kind of entities for this range.
      key_range: A KeyRange instance for this work item.
      progress_key: The key for this range within the progress database.
      state: The initial state of this range.
      first: boolean, default False, whether this is the first WorkItem
        of its kind.
    """
    _WorkItem.__init__(self, progress_queue, key_range.key_start,
                       key_range.key_end, ExportStateName, state=state,
                       progress_key=progress_key)
    assert KeyLEQ(key_range.key_start, key_range.key_end), (
        '%s not less than %s' %
        (repr(key_range.key_start), repr(key_range.key_end)))
    self.request_manager = request_manager
    self.kind = kind
    self.key_range = key_range
    self.download_result = None
    self.count = 0
    self.key_start = key_range.key_start
    self.key_end = key_range.key_end
    self.first = first

  def __str__(self):
    return '%s-%s' % (self.kind, self.key_range)

  def __repr__(self):
    return self.__str__()

  def MarkAsTransferred(self):
    """Mark this KeyRangeItem as transferred, updating the progress database."""
    # State transitions are handled by Process, which knows whether the
    # range completed or must be split; nothing to do here.
    pass

  def Process(self, download_result, thread_pool, batch_size,
              new_state=STATE_GOT):
    """Mark this KeyRangeItem as success, updating the progress database.

    Process will split this KeyRangeItem based on the content of
    download_result and adds the unfinished ranges to the work queue.

    Args:
      download_result: A DownloadResult instance.
      thread_pool: An AdaptiveThreadPool instance.
      batch_size: The number of entities to transfer per request.
      new_state: The state to transition the completed range to.
    """
    self._AssertInState(STATE_GETTING)
    self._AssertProgressKey()

    self.download_result = download_result
    self.count = len(download_result.keys)
    if download_result.continued:
      # Only part of the range was fetched: record the finished prefix and
      # requeue the remainder.
      self._FinishedRange()._StateTransition(new_state, blocking=True)
      self._AddUnfinishedRanges(thread_pool, batch_size)
    else:
      self._StateTransition(new_state, blocking=True)

  def _FinishedRange(self):
    """Returns the range completed by the download_result.

    Returns:
      A KeyRangeItem representing a completed range.
    """
    assert self.download_result is not None

    if self.key_range.direction == key_range_module.KeyRange.ASC:
      key_start = self.key_range.key_start
      if self.download_result.continued:
        key_end = self.download_result.key_end
      else:
        key_end = self.key_range.key_end
    else:
      key_end = self.key_range.key_end
      if self.download_result.continued:
        key_start = self.download_result.key_start
      else:
        key_start = self.key_range.key_start

    key_range = KeyRange(key_start=key_start,
                         key_end=key_end,
                         direction=self.key_range.direction)

    result = self.__class__(self.request_manager,
                            self.progress_queue,
                            self.kind,
                            key_range,
                            progress_key=self.progress_key,
                            state=self.state)

    result.download_result = self.download_result
    result.count = self.count
    return result

  def _SplitAndAddRanges(self, thread_pool, batch_size):
    """Split the key range [key_start, key_end] into a list of ranges."""
    # The unfinished remainder excludes the endpoint already downloaded.
    if self.download_result.direction == key_range_module.KeyRange.ASC:
      key_range = KeyRange(
          key_start=self.download_result.key_end,
          key_end=self.key_range.key_end,
          include_start=False)
    else:
      key_range = KeyRange(
          key_start=self.key_range.key_start,
          key_end=self.download_result.key_start,
          include_end=False)

    if thread_pool.QueuedItemCount() > 2 * thread_pool.num_threads():
      # Enough work is queued already; avoid flooding the queue with splits.
      ranges = [key_range]
    else:
      ranges = key_range.split_range(batch_size=batch_size)

    for key_range in ranges:
      key_range_item = self.__class__(self.request_manager,
                                      self.progress_queue,
                                      self.kind,
                                      key_range)
      key_range_item.MarkAsRead()
      thread_pool.SubmitItem(key_range_item, block=True)

  def _AddUnfinishedRanges(self, thread_pool, batch_size):
    """Adds incomplete KeyRanges to the thread_pool.

    Args:
      thread_pool: An AdaptiveThreadPool instance.
      batch_size: The number of entities to transfer per request.

    Returns:
      A list of KeyRanges representing incomplete datastore key ranges.

    Raises:
      KeyRangeError: if this key range has already been completely transferred.
    """
    assert self.download_result is not None
    if self.download_result.continued:
      self._SplitAndAddRanges(thread_pool, batch_size)
    else:
      raise KeyRangeError('No unfinished part of key range.')
class DownloadItem(KeyRangeItem):
  """A KeyRangeItem for downloading key ranges."""

  def _TransferItem(self, thread_pool, get_time=time.time):
    """Transfers the entities associated with an item."""
    t = get_time()
    download_result = self.request_manager.GetEntities(
        self, retry_parallel=self.first)
    transfer_time = get_time() - t
    self.Process(download_result, thread_pool,
                 self.request_manager.batch_size)
    return transfer_time
class MapperItem(KeyRangeItem):
  """A KeyRangeItem for mapping over key ranges."""

  def _TransferItem(self, thread_pool, get_time=time.time):
    """Fetches a batch and applies the user's Mapper to it."""
    t = get_time()
    mapper = self.request_manager.GetMapper(self.kind)

    download_result = self.request_manager.GetEntities(
        self, keys_only=mapper.map_over_keys_only(), retry_parallel=self.first)
    transfer_time = get_time() - t
    # NOTE(review): the retry handling around batch_apply was dropped by
    # extraction and is reconstructed here (returning None signals a retry
    # to PerformWork) -- confirm against the original SDK source.
    try:
      mapper.batch_apply(download_result.Entities())
    except MapperRetry:
      return None

    self.Process(download_result, thread_pool,
                 self.request_manager.batch_size)
    return transfer_time
def ConvertKeys(keys):
  """Convert a list of keys to a list of keys with the app_id of the caller.

  Args:
    keys: A list of datastore Entity Keys.

  Returns:
    A new list of keys in the same order as the input with app_id set to the
    default app_id in the calling context. Whichever input keys were already
    of this app_id are copied by reference.
  """
  def ChangeApp(key, app_id):
    if key.app() == app_id:
      return key
    # Rebuild the key path under the caller's app_id, keeping the namespace.
    return datastore.Key.from_path(namespace=key.namespace(),
                                   _app=app_id, *key.to_path())

  # Derive the caller's default app_id from a throwaway key.
  app_id = datastore.Key.from_path('kind', 'name').app()
  return [ChangeApp(key, app_id) for key in keys]
def ReserveKeys(keys):
  """Reserve all ids in the paths of the given keys.

  Args:
    keys: A list of keys with ids in their paths, for which the corresponding
      id sequences should be advanced to prevent id collisions.
  """
  converted_keys = ConvertKeys(keys)
  datastore._GetConnection()._reserve_keys(converted_keys)
1217 def _AuthFunction(host
, email
, passin
, raw_input_fn
, password_input_fn
):
1218 """Internal method shared between RequestManager and _GetRemoteAppId.
1221 host: Hostname to present to the user.
1222 email: Existing email address to use; if none, will prompt the user.
1223 passin: Value of the --passin command line flag. If true, will get the
1224 password using raw_input_fn insetad of password_input_fn.
1225 raw_input_fn: Method to get a string, typically raw_input.
1226 password_input_fn: Method to get a string, typically getpass.
1229 Pair, (email, password).
1232 print 'Please enter login credentials for %s' % host
1233 email
= raw_input_fn('Email: ')
1236 password_prompt
= 'Password for %s: ' % email
1238 password
= raw_input_fn(password_prompt
)
1240 password
= password_input_fn(password_prompt
)
1244 return email
, password
1247 class RequestManager(object):
1248 """A class which wraps a connection to the server."""
1262 throttle_class
=None):
1263 """Initialize a RequestManager object.
1266 app_id: String containing the application id for requests.
1267 host_port: String containing the "host:port" pair; the port is optional.
1268 url_path: partial URL (path) to post entity data to.
1269 kind: Kind of the Entity records being posted.
1270 throttle: A Throttle instance.
1271 batch_size: The number of entities to transfer per request.
1272 secure: Use SSL when communicating with server.
1273 email: If not none, the username to log in with.
1274 passin: If True, the password will be read from standard in.
1275 server: An existing AbstractRpcServer to reuse.
1276 throttle_class: A class to use instead of the default
1277 ThrottledHttpRpcServer.
1279 self
.app_id
= app_id
1280 self
.host_port
= host_port
1281 self
.host
= host_port
.split(':')[0]
1282 if url_path
and url_path
[0] != '/':
1283 url_path
= '/' + url_path
1284 self
.url_path
= url_path
1286 self
.throttle
= throttle
1287 self
.batch_size
= batch_size
1288 self
.secure
= secure
1289 self
.authenticated
= False
1290 self
.auth_called
= False
1291 self
.parallel_download
= True
1293 self
.passin
= passin
1295 self
.dry_run
= dry_run
1298 logger
.info('Running in dry run mode, skipping remote_api setup')
1301 logger
.debug('Configuring remote_api. url_path = %s, '
1302 'servername = %s' % (url_path
, host_port
))
1304 throttled_rpc_server_factory
= (
1305 remote_api_throttle
.ThrottledHttpRpcServerFactory(
1306 self
.throttle
, throttle_class
=throttle_class
))
1309 remote_api_stub
.ConfigureRemoteApiFromServer(server
, url_path
, app_id
)
1311 remote_api_stub
.ConfigureRemoteApi(
1315 servername
=host_port
,
1316 rpc_server_factory
=throttled_rpc_server_factory
,
1319 remote_api_throttle
.ThrottleRemoteDatastore(self
.throttle
)
1320 logger
.debug('Bulkloader using app_id: %s', os
.environ
['APPLICATION_ID'])
1322 def Authenticate(self
):
1323 """Invoke authentication if necessary."""
1324 logger
.info('Connecting to %s%s', self
.host_port
, self
.url_path
)
1326 self
.authenticated
= True
1329 remote_api_stub
.MaybeInvokeAuthentication()
1330 self
.authenticated
= True
1332 def AuthFunction(self
,
1333 raw_input_fn
=raw_input,
1334 password_input_fn
=getpass
.getpass
):
1335 """Prompts the user for a username and password.
1337 Caches the results the first time it is called and returns the
1338 same result every subsequent time.
1341 raw_input_fn: Used for dependency injection.
1342 password_input_fn: Used for dependency injection.
1345 A pair of the username and password.
1347 self
.auth_called
= True
1348 return _AuthFunction(self
.host
, self
.email
, self
.passin
,
1349 raw_input_fn
, password_input_fn
)
1351 def ReserveKeys(self
, keys
):
1352 """Reserve all ids in the paths of the given keys.
1355 keys: A list of keys with ids in their paths, for which the corresponding
1356 id sequences should be advanced to prevent id collisions.
1362 def GetSchemaKinds(self
):
1363 """Returns the list of kinds for this app.
1365 There can be 3 possible cases using namespaces:
1366 a.) No namespace specified and Datastore has only default namespace ->
1367 Query GlobalStat and KindStat.
1368 b.) No namespace specified but Datastore has multiple namespace ->
1369 Query NamespaceGlobalStat and NamespaceKindStat.
1370 c.) Namespace specified and Datastore has multiple namespaces ->
1371 Query NamespaceGlobalStat and NamespaceKindStat.
1378 if (namespace_manager
.get_namespace() or
1379 stats
.NamespaceStat
.all().count() > 1):
1383 global_kind
= stats
.NamespaceGlobalStat
1385 global_kind
= stats
.GlobalStat
1387 kinds_kind
= stats
.NamespaceKindStat
if namespaces
else stats
.KindStat
1389 global_stat
= global_kind
.all().get()
1391 raise KindStatError()
1392 timestamp
= global_stat
.timestamp
1393 kind_stat
= kinds_kind
.all().filter(
1394 "timestamp =", timestamp
).fetch(1000)
1395 kind_list
= [stat
.kind_name
for stat
in kind_stat
1396 if stat
.kind_name
and not stat
.kind_name
.startswith('__')]
1397 return list(set(kind_list
))
1399 def EncodeContent(self
, rows
, loader
=None):
1400 """Encodes row data to the wire format.
1403 rows: A list of pairs of a line number and a list of column values.
1404 loader: Used for dependency injection.
1407 A list of datastore.Entity instances.
1410 ConfigurationError: if no loader is defined for self.kind
1414 loader
= Loader
.RegisteredLoader(self
.kind
)
1416 logger
.error('No Loader defined for kind %s.' % self
.kind
)
1417 raise ConfigurationError('No Loader defined for kind %s.' % self
.kind
)
1419 for line_number
, values
in rows
:
1420 key
= loader
.generate_key(line_number
, values
)
1421 if isinstance(key
, datastore
.Key
):
1422 parent
= key
.parent()
1426 entity
= loader
.create_entity(values
, key_name
=key
, parent
=parent
)
1428 def ToEntity(entity
):
1429 if isinstance(entity
, db
.Model
):
1430 return entity
._populate
_entity
()
1437 if isinstance(entity
, list):
1438 entities
.extend(map(ToEntity
, entity
))
1440 entities
.append(ToEntity(entity
))
1444 def PostEntities(self
, entities
):
1445 """Posts Entity records to a remote endpoint over HTTP.
1448 entities: A list of datastore entities.
1452 datastore
.Put(entities
)
1454 def _QueryForPbs(self
, query
):
1455 """Perform the given query and return a list of entity_pb's."""
1457 query_pb
= query
._ToPb
(limit
=self
.batch_size
, count
=self
.batch_size
)
1458 result_pb
= datastore_pb
.QueryResult()
1459 apiproxy_stub_map
.MakeSyncCall('datastore_v3', 'RunQuery', query_pb
,
1461 results
= result_pb
.result_list()
1463 while result_pb
.more_results():
1464 next_pb
= datastore_pb
.NextRequest()
1465 next_pb
.set_count(self
.batch_size
- len(results
))
1466 next_pb
.mutable_cursor().CopyFrom(result_pb
.cursor())
1467 result_pb
= datastore_pb
.QueryResult()
1468 apiproxy_stub_map
.MakeSyncCall('datastore_v3', 'Next', next_pb
,
1470 results
+= result_pb
.result_list()
1473 except apiproxy_errors
.ApplicationError
, e
:
1474 raise datastore
._ToDatastoreError
(e
)
1477 self
, key_range_item
, key_factory
=datastore
.Key
, keys_only
=False,
1478 retry_parallel
=False):
1479 """Gets Entity records from a remote endpoint over HTTP.
1482 key_range_item: Range of keys to get.
1483 key_factory: Used for dependency injection.
1484 keys_only: bool, default False, only get keys values
1485 retry_parallel: bool, default False, to try a parallel download despite
1486 past parallel download failures.
1488 A DownloadResult instance.
1491 ConfigurationError: if no Exporter is defined for key_range_item.kind
1495 kind
= key_range_item
.kind
1497 self
.parallel_download
= True
1499 if self
.parallel_download
:
1500 query
= key_range_item
.key_range
.make_directed_datastore_query(
1501 kind
, keys_only
=keys_only
)
1503 results
= self
._QueryForPbs
(query
)
1504 except datastore_errors
.NeedIndexError
:
1506 logger
.info('%s: No descending index on __key__, '
1507 'performing serial download', kind
)
1508 self
.parallel_download
= False
1510 if not self
.parallel_download
:
1513 key_range_item
.key_range
.direction
= key_range_module
.KeyRange
.ASC
1514 query
= key_range_item
.key_range
.make_ascending_datastore_query(
1515 kind
, keys_only
=keys_only
)
1516 results
= self
._QueryForPbs
(query
)
1520 for entity
in results
:
1524 key
._Key
__reference
= entity
.key()
1525 entities
.append(entity
)
1528 continued
= (size
== self
.batch_size
)
1529 key_range_item
.count
= size
1531 return DownloadResult(continued
, key_range_item
.key_range
.direction
,
1534 def GetMapper(self
, kind
):
1535 """Returns a mapper for the registered kind.
1541 ConfigurationError: if no Mapper is defined for kind
1545 self
.mapper
= Mapper
.RegisteredMapper(kind
)
1547 logger
.error('No Mapper defined for kind %s.' % kind
)
1548 raise ConfigurationError('No Mapper defined for kind %s.' % kind
)
def InterruptibleSleep(sleep_time):
  """Puts thread to sleep, checking this threads exit_flag twice a second.

  Args:
    sleep_time: Time to sleep.
  """
  epsilon = .0001
  current_thread = threading.currentThread()
  total_slept = 0.0
  while total_slept < sleep_time - epsilon:
    # Nap in increments of at most half a second so exit_flag is
    # observed promptly even for long sleeps.
    nap = min(sleep_time - total_slept, 0.5)
    time.sleep(nap)
    total_slept += nap
    if current_thread.exit_flag:
      return
1570 class _ThreadBase(threading
.Thread
):
1571 """Provide some basic features for the threads used in the uploader.
1573 This abstract base class is used to provide some common features:
1575 * Flag to ask thread to exit as soon as possible.
1576 * Record exit/error status for the primary thread to pick up.
1577 * Capture exceptions and record them for pickup.
1578 * Some basic logging of thread start/stop.
1579 * All threads are "daemon" threads.
1580 * Friendly names for presenting to users.
1582 Concrete sub-classes must implement PerformWork().
1584 Either self.NAME should be set or GetFriendlyName() be overridden to
1585 return a human-friendly name for this thread.
1587 The run() method starts the thread and prints start/exit messages.
1589 self.exit_flag is intended to signal that this thread should exit
1590 when it gets the chance. PerformWork() should check self.exit_flag
1591 whenever it has the opportunity to exit gracefully.
1595 threading
.Thread
.__init
__(self
)
1600 self
.setDaemon(True)
1602 self
.exit_flag
= False
1604 self
.traceback
= None
1607 """Perform the work of the thread."""
1608 logger
.debug('[%s] %s: started', self
.getName(), self
.__class
__.__name
__)
1614 logger
.exception('[%s] %s:', self
.getName(), self
.__class
__.__name
__)
1616 logger
.debug('[%s] %s: exiting', self
.getName(), self
.__class
__.__name
__)
1619 """Sets the error and traceback information for this thread.
1621 This must be called from an exception handler.
1624 exc_info
= sys
.exc_info()
1625 self
.error
= exc_info
[1]
1626 self
.traceback
= exc_info
[2]
1628 def PerformWork(self
):
1629 """Perform the thread-specific work."""
1630 raise NotImplementedError()
1632 def CheckError(self
):
1633 """If an error is present, then log it."""
1635 logger
.error('Error in %s: %s', self
.GetFriendlyName(), self
.error
)
1637 logger
.debug(''.join(traceback
.format_exception(self
.error
.__class
__,
1641 def GetFriendlyName(self
):
1642 """Returns a human-friendly description of the thread."""
1643 if hasattr(self
, 'NAME'):
1645 return 'unknown thread'
1648 non_fatal_error_codes
= set([errno
.EAGAIN
,
1653 errno
.EHOSTUNREACH
])
def IsURLErrorFatal(error):
  """Returns False if the given URLError may be from a transient failure.

  Args:
    error: A urllib2.URLError instance.
  """
  assert isinstance(error, urllib2.URLError)
  if not hasattr(error, 'reason'):
    return True
  reason = error.reason
  # Non-integer reasons (e.g. SSL errors) are treated as fatal.
  if not isinstance(reason[0], int):
    return True
  return reason[0] not in non_fatal_error_codes
1670 class DataSourceThread(_ThreadBase
):
1671 """A thread which reads WorkItems and pushes them into queue.
1673 This thread will read/consume WorkItems from a generator (produced by
1674 the generator factory). These WorkItems will then be pushed into the
1675 thread_pool. Note that reading will block if/when the thread_pool becomes
1676 full. Information on content consumed from the generator will be pushed
1677 into the progress_queue.
1680 NAME
= 'data source thread'
1687 workitem_generator_factory
,
1688 progress_generator_factory
):
1689 """Initialize the DataSourceThread instance.
1692 request_manager: A RequestManager instance.
1693 kinds: The kinds of entities being transferred.
1694 thread_pool: An AdaptiveThreadPool instance.
1695 progress_queue: A queue used for tracking progress information.
1696 workitem_generator_factory: A factory that creates a WorkItem generator
1697 progress_generator_factory: A factory that creates a generator which
1698 produces prior progress status, or None if there is no prior status
1701 _ThreadBase
.__init
__(self
)
1703 self
.request_manager
= request_manager
1705 self
.thread_pool
= thread_pool
1706 self
.progress_queue
= progress_queue
1707 self
.workitem_generator_factory
= workitem_generator_factory
1708 self
.progress_generator_factory
= progress_generator_factory
1710 self
.entity_count
= 0
1712 def PerformWork(self
):
1713 """Performs the work of a DataSourceThread."""
1716 if self
.progress_generator_factory
:
1717 progress_gen
= self
.progress_generator_factory()
1721 content_gen
= self
.workitem_generator_factory(self
.request_manager
,
1722 self
.progress_queue
,
1728 self
.read_all
= False
1730 for item
in content_gen
.Batches():
1741 while not self
.exit_flag
:
1744 self
.thread_pool
.SubmitItem(item
, block
=True, timeout
=1.0)
1745 self
.entity_count
+= item
.count
1754 if not self
.exit_flag
:
1755 self
.read_all
= True
1756 self
.read_count
= content_gen
.row_count
1757 self
.xfer_count
= content_gen
.xfer_count
1762 def _RunningInThread(thread
):
1763 """Return True if we are running within the specified thread."""
1764 return threading
.currentThread().getName() == thread
.getName()
1767 class _Database(object):
1768 """Base class for database connections in this module.
1770 The table is created by a primary thread (the python main thread)
1771 but all future lookups and updates are performed by a secondary
1775 SIGNATURE_TABLE_NAME
= 'bulkloader_database_signature'
1782 commit_periodicity
=100):
1783 """Initialize the _Database instance.
1786 db_filename: The sqlite3 file to use for the database.
1787 create_table: A string containing the SQL table creation command.
1788 signature: A string identifying the important invocation options,
1789 used to make sure we are not using an old database.
1790 index: An optional string to create an index for the database.
1791 commit_periodicity: Number of operations between database commits.
1795 self
.db_filename
= db_filename
1799 logger
.info('Opening database: %s', db_filename
)
1800 self
.primary_conn
= sqlite3
.connect(db_filename
, isolation_level
=None)
1801 self
.primary_thread
= threading
.currentThread()
1804 self
.secondary_conn
= None
1805 self
.secondary_thread
= None
1807 self
.operation_count
= 0
1808 self
.commit_periodicity
= commit_periodicity
1813 self
.primary_conn
.execute(create_table
)
1814 except sqlite3
.OperationalError
, e
:
1816 if 'already exists' not in e
.message
:
1821 self
.primary_conn
.execute(index
)
1822 except sqlite3
.OperationalError
, e
:
1824 if 'already exists' not in e
.message
:
1827 self
.existing_table
= False
1828 signature_cursor
= self
.primary_conn
.cursor()
1829 create_signature
= """
1831 value TEXT not null)
1832 """ % _Database
.SIGNATURE_TABLE_NAME
1834 self
.primary_conn
.execute(create_signature
)
1835 self
.primary_conn
.cursor().execute(
1836 'insert into %s (value) values (?)' % _Database
.SIGNATURE_TABLE_NAME
,
1838 except sqlite3
.OperationalError
, e
:
1839 if 'already exists' not in e
.message
:
1840 logger
.exception('Exception creating table:')
1843 self
.existing_table
= True
1844 signature_cursor
.execute(
1845 'select * from %s' % _Database
.SIGNATURE_TABLE_NAME
)
1846 (result
,) = signature_cursor
.fetchone()
1847 if result
and result
!= signature
:
1848 logger
.error('Database signature mismatch:\n\n'
1854 raise ResumeError('Database signature mismatch: %s != %s' % (
def ThreadComplete(self):
  """Finalize any operations the secondary thread has performed.

  The database aggregates lots of operations into a single commit, and
  this method is used to commit any pending operations as the thread
  is about to shut down.
  """
  if not self.secondary_conn:
    return
  self._MaybeCommit(force_commit=True)
1867 def _MaybeCommit(self
, force_commit
=False):
1868 """Periodically commit changes into the SQLite database.
1870 Committing every operation is quite expensive, and slows down the
1871 operation of the script. Thus, we only commit after every N operations,
1872 as determined by the self.commit_periodicity value. Optionally, the
1873 caller can force a commit.
1876 force_commit: Pass True in order for a commit to occur regardless
1877 of the current operation count.
1879 self
.operation_count
+= 1
1880 if force_commit
or (self
.operation_count
% self
.commit_periodicity
) == 0:
1881 self
.secondary_conn
.commit()
1883 def _OpenSecondaryConnection(self
):
1884 """Possibly open a database connection for the secondary thread.
1886 If the connection is not open (for the calling thread, which is assumed
1887 to be the unique secondary thread), then open it. We also open a couple
1888 cursors for later use (and reuse).
1890 if self
.secondary_conn
:
1893 assert not _RunningInThread(self
.primary_thread
)
1895 self
.secondary_thread
= threading
.currentThread()
1903 self
.secondary_conn
= sqlite3
.connect(self
.db_filename
)
1906 self
.insert_cursor
= self
.secondary_conn
.cursor()
1907 self
.update_cursor
= self
.secondary_conn
.cursor()
1911 zero_matcher
= re
.compile(r
'\x00')
1913 zero_one_matcher
= re
.compile(r
'\x00\x01')
1917 """Returns a string to represent a key, preserving ordering.
1919 Unlike datastore.Key.__str__(), we have the property:
1921 key1 < key2 ==> KeyStr(key1) < KeyStr(key2)
1923 The key string is constructed from the key path as follows:
1924 (1) Strings are prepended with ':' and numeric id's are padded to
1926 (2) Any null characters (u'\0') present are replaced with u'\0\1'
1927 (3) The sequence u'\0\0' is used to separate each component of the path.
1929 (1) assures that names and ids compare properly, while (2) and (3) enforce
1930 the part-by-part comparison of pieces of the path.
1933 key: A datastore.Key instance.
1936 A string representation of the key, which preserves ordering.
1938 assert isinstance(key
, datastore
.Key
)
1939 path
= key
.to_path()
1943 if isinstance(part
, (int, long)):
1946 part
= '%020d' % part
1951 out_path
.append(zero_matcher
.sub(u
'\0\1', part
))
1953 out_str
= u
'\0\0'.join(out_path
)
1958 def StrKey(key_str
):
1959 """The inverse of the KeyStr function.
1962 key_str: A string in the range of KeyStr.
1965 A datastore.Key instance k, such that KeyStr(k) == key_str.
1967 parts
= key_str
.split(u
'\0\0')
1968 for i
in xrange(len(parts
)):
1969 if parts
[i
][0] == ':':
1972 part
= zero_one_matcher
.sub(u
'\0', part
)
1975 parts
[i
] = int(parts
[i
])
1976 return datastore
.Key
.from_path(*parts
)
1979 class ResultDatabase(_Database
):
1980 """Persistently record all the entities downloaded during an export.
1982 The entities are held in the database by their unique datastore key
1983 in order to avoid duplication if an export is restarted.
1986 def __init__(self
, db_filename
, signature
, commit_periodicity
=1,
1988 """Initialize a ResultDatabase object.
1991 db_filename: The name of the SQLite database to use.
1992 signature: A string identifying the important invocation options,
1993 used to make sure we are not using an old database.
1994 commit_periodicity: How many operations to perform between commits.
1995 exporter: Exporter instance; if exporter.calculate_sort_key_from_entity
1996 is true then exporter.sort_key_from_entity(entity) will be called.
1998 self
.complete
= False
1999 create_table
= ('create table result (\n'
2000 'id BLOB primary key,\n'
2001 'value BLOB not null,\n'
2004 _Database
.__init
__(self
,
2008 commit_periodicity
=commit_periodicity
)
2009 if self
.existing_table
:
2010 cursor
= self
.primary_conn
.cursor()
2011 cursor
.execute('select count(*) from result')
2012 self
.existing_count
= int(cursor
.fetchone()[0])
2014 self
.existing_count
= 0
2015 self
.count
= self
.existing_count
2016 if exporter
and getattr(exporter
, 'calculate_sort_key_from_entity', False):
2017 self
.sort_key_from_entity
= exporter
.sort_key_from_entity
2019 self
.sort_key_from_entity
= None
2021 def _StoreEntity(self
, entity_id
, entity
):
2022 """Store an entity in the result database.
2025 entity_id: A datastore.Key for the entity.
2026 entity: The entity to store.
2029 True if this entities is not already present in the result database.
2031 assert _RunningInThread(self
.secondary_thread
)
2032 assert isinstance(entity_id
, datastore
.Key
), (
2033 'expected a datastore.Key, got a %s' % entity_id
.__class
__.__name
__)
2035 key_str
= buffer(KeyStr(entity_id
).encode('utf-8'))
2036 self
.insert_cursor
.execute(
2037 'select count(*) from result where id = ?', (key_str
,))
2039 already_present
= self
.insert_cursor
.fetchone()[0]
2043 self
.insert_cursor
.execute('delete from result where id = ?',
2047 if self
.sort_key_from_entity
:
2048 sort_key
= self
.sort_key_from_entity(datastore
.Entity
._FromPb
(entity
))
2051 value
= entity
.Encode()
2052 self
.insert_cursor
.execute(
2053 'insert into result (id, value, sort_key) values (?, ?, ?)',
2054 (key_str
, buffer(value
), sort_key
))
2057 def StoreEntities(self
, keys
, entities
):
2058 """Store a group of entities in the result database.
2061 keys: A list of entity keys.
2062 entities: A list of entities.
2065 The number of new entities stored in the result database.
2067 self
._OpenSecondaryConnection
()
2070 for entity_id
, entity
in zip(keys
,
2072 if self
._StoreEntity
(entity_id
, entity
):
2074 logger
.debug('%s insert: delta=%.3f',
2077 logger
.debug('Entities transferred total: %s', self
.count
)
2081 def ResultsComplete(self
):
2082 """Marks the result database as containing complete results."""
2083 self
.complete
= True
2085 def AllEntities(self
):
2086 """Yields all pairs of (id, value) from the result table."""
2088 conn
= sqlite3
.connect(self
.db_filename
, isolation_level
=None)
2089 cursor
= conn
.cursor()
2095 'select id, value from result order by sort_key, id')
2097 for unused_entity_id
, entity
in cursor
:
2098 entity_proto
= entity_pb
.EntityProto(contents
=entity
)
2099 yield datastore
.Entity
._FromPb
(entity_proto
)
2102 class _ProgressDatabase(_Database
):
2103 """Persistently record all progress information during an upload.
2105 This class wraps a very simple SQLite database which records each of
2106 the relevant details from a chunk of work. If the loader is
2107 resumed, then data is replayed out of the database.
2115 commit_periodicity
=100):
2116 """Initialize the ProgressDatabase instance.
2119 db_filename: The name of the SQLite database to use.
2120 sql_type: A string of the SQL type to use for entity keys.
2121 py_type: The python type of entity keys.
2122 signature: A string identifying the important invocation options,
2123 used to make sure we are not using an old database.
2124 commit_periodicity: How many operations to perform between commits.
2126 self
.prior_key_end
= None
2133 create_table
= ('create table progress (\n'
2134 'id integer primary key autoincrement,\n'
2135 'state integer not null,\n'
2136 'kind text not null,\n'
2139 % (sql_type
, sql_type
))
2140 self
.py_type
= py_type
2142 index
= 'create index i_state on progress (state)'
2143 _Database
.__init
__(self
,
2148 commit_periodicity
=commit_periodicity
)
2150 def UseProgressData(self
):
2151 """Returns True if the database has progress information.
2153 Note there are two basic cases for progress information:
2154 1) All saved records indicate a successful upload. In this case, we
2155 need to skip everything transmitted so far and then send the rest.
2156 2) Some records for incomplete transfer are present. These need to be
2157 sent again, and then we resume sending after all the successful
2161 True: if the database has progress information.
2164 ResumeError: if there is an error retrieving rows from the database.
2166 assert _RunningInThread(self
.primary_thread
)
2170 cursor
= self
.primary_conn
.cursor()
2171 cursor
.execute('select count(*) from progress')
2172 row
= cursor
.fetchone()
2174 raise ResumeError('Cannot retrieve progress information from database.')
2179 def StoreKeys(self
, kind
, key_start
, key_end
):
2180 """Record a new progress record, returning a key for later updates.
2182 The specified progress information will be persisted into the database.
2183 A unique key will be returned that identifies this progress state. The
2184 key is later used to (quickly) update this record.
2186 For the progress resumption to proceed properly, calls to StoreKeys
2187 MUST specify monotonically increasing key ranges. This will result in
2188 a database whereby the ID, KEY_START, and KEY_END rows are all
2189 increasing (rather than having ranges out of order).
2191 NOTE: the above precondition is NOT tested by this method (since it
2192 would imply an additional table read or two on each invocation).
2195 kind: The kind for the WorkItem
2196 key_start: The starting key of the WorkItem (inclusive)
2197 key_end: The end key of the WorkItem (inclusive)
2200 A string to later be used as a unique key to update this state.
2202 self
._OpenSecondaryConnection
()
2204 assert _RunningInThread(self
.secondary_thread
)
2205 assert (not key_start
) or isinstance(key_start
, self
.py_type
), (
2206 '%s is a %s, %s expected %s' % (key_start
,
2207 key_start
.__class
__,
2208 self
.__class
__.__name
__,
2210 assert (not key_end
) or isinstance(key_end
, self
.py_type
), (
2211 '%s is a %s, %s expected %s' % (key_end
,
2213 self
.__class
__.__name
__,
2215 assert KeyLEQ(key_start
, key_end
), '%s not less than %s' % (
2216 repr(key_start
), repr(key_end
))
2218 self
.insert_cursor
.execute(
2219 'insert into progress (state, kind, key_start, key_end)'
2220 ' values (?, ?, ?, ?)',
2221 (STATE_READ
, unicode(kind
), unicode(key_start
), unicode(key_end
)))
2223 progress_key
= self
.insert_cursor
.lastrowid
2229 def UpdateState(self
, key
, new_state
):
2230 """Update a specified progress record with new information.
2233 key: The key for this progress record, returned from StoreKeys
2234 new_state: The new state to associate with this progress record.
2236 self
._OpenSecondaryConnection
()
2238 assert _RunningInThread(self
.secondary_thread
)
2239 assert isinstance(new_state
, int)
2241 self
.update_cursor
.execute('update progress set state=? where id=?',
2246 def DeleteKey(self
, progress_key
):
2247 """Delete the entities with the given key from the result database."""
2248 self
._OpenSecondaryConnection
()
2250 assert _RunningInThread(self
.secondary_thread
)
2253 self
.insert_cursor
.execute(
2254 'delete from progress where rowid = ?', (progress_key
,))
2256 logger
.debug('delete: delta=%.3f', time
.time() - t
)
2260 def GetProgressStatusGenerator(self
):
2261 """Get a generator which yields progress information.
2263 The returned generator will yield a series of 5-tuples that specify
2264 progress information about a prior run of the uploader. The 5-tuples
2265 have the following values:
2267 progress_key: The unique key to later update this record with new
2268 progress information.
2269 state: The last state saved for this progress record.
2270 kind: The datastore kind of the items for uploading.
2271 key_start: The starting key of the items for uploading (inclusive).
2272 key_end: The ending key of the items for uploading (inclusive).
2274 After all incompletely-transferred records are provided, then one
2275 more 5-tuple will be generated:
2278 DATA_CONSUMED_TO_HERE: A unique string value indicating this record
2282 key_end: An integer value specifying the last data source key that
2283 was handled by the previous run of the uploader.
2285 The caller should begin uploading records which occur after key_end.
2288 Five-tuples of (progress_key, state, kind, key_start, key_end)
2293 conn
= sqlite3
.connect(self
.db_filename
, isolation_level
=None)
2294 cursor
= conn
.cursor()
2298 cursor
.execute('select max(key_end) from progress')
2300 result
= cursor
.fetchone()
2301 if result
is not None:
2304 logger
.debug('No rows in progress database.')
2307 self
.prior_key_end
= key_end
2312 'select id, state, kind, key_start, key_end from progress'
2319 rows
= cursor
.fetchall()
2324 progress_key
, state
, kind
, key_start
, key_end
= row
2326 yield progress_key
, state
, kind
, key_start
, key_end
2329 yield None, DATA_CONSUMED_TO_HERE
, None, None, key_end
def ProgressDatabase(db_filename, signature):
  """Returns a database to store upload progress information.

  Args:
    db_filename: The name of the SQLite database to use.
    signature: A string identifying the important invocation options,
      used to make sure we are not using an old database.

  Returns:
    A _ProgressDatabase instance keyed by INTEGER (int) entity keys.
  """
  upload_progress_db = _ProgressDatabase(db_filename, 'INTEGER', int, signature)
  return upload_progress_db
2337 class ExportProgressDatabase(_ProgressDatabase
):
2338 """A database to store download progress information."""
2340 def __init__(self
, db_filename
, signature
):
2341 """Initialize an ExportProgressDatabase."""
2342 _ProgressDatabase
.__init
__(self
,
2347 commit_periodicity
=1)
2349 def UseProgressData(self
):
2350 """Check if the progress database contains progress data.
2353 True: if the database contains progress data.
2359 return self
.existing_table
class StubProgressDatabase(object):
  """A stub implementation of ProgressDatabase which does nothing."""

  def UseProgressData(self):
    """Whether the stub database has progress information (it doesn't)."""
    return False

  def StoreKeys(self, unused_kind, unused_key_start, unused_key_end):
    """Pretend to store a key in the stub database."""
    return 'fake-key'

  def UpdateState(self, unused_key, unused_new_state):
    """Pretend to update the state of a progress item."""

  def ThreadComplete(self):
    """Finalize operations on the stub database (i.e. do nothing)."""

  def DeleteKey(self, unused_key):
    """Delete the operations with a given key (but, do nothing.)"""
class _ProgressThreadBase(_ThreadBase):
  """A thread which records progress information for the upload process.

  The progress information is stored into the provided progress database.
  This class is not responsible for replaying a prior run's progress
  information out of the database. Separate mechanisms must be used to
  resume a prior upload attempt.
  """

  NAME = 'progress tracking thread'

  def __init__(self, progress_queue, progress_db):
    """Initialize the ProgressTrackerThread instance.

    Args:
      progress_queue: A Queue used for tracking progress information.
      progress_db: The database for tracking progress information; should
        be an instance of ProgressDatabase.
    """
    _ThreadBase.__init__(self)

    self.progress_queue = progress_queue
    self.db = progress_db
    self.entities_transferred = 0

  def EntitiesTransferred(self):
    """Return the total number of unique entities transferred."""
    return self.entities_transferred

  def UpdateProgress(self, item):
    """Updates the progress information for the given item.

    Subclasses must implement this.

    Args:
      item: A work item whose new state will be recorded.
    """
    raise NotImplementedError()

  def WorkFinished(self):
    """Performs final actions after the entity transfer is complete.

    Subclasses must implement this.
    """
    raise NotImplementedError()

  def PerformWork(self):
    """Performs the work of a ProgressTrackerThread."""
    while not self.exit_flag:
      try:
        item = self.progress_queue.get(block=True, timeout=1.0)
      except Queue.Empty:
        # Wake up periodically so exit_flag is re-checked.
        continue
      if item == _THREAD_SHOULD_EXIT:
        break

      if item.state == STATE_READ and item.progress_key is None:
        # First sighting of this item: create its progress record.
        # NOTE(review): argument reconstruction — original interior lines
        # were elided; assumed to be (kind, key_start, key_end). Confirm.
        item.progress_key = self.db.StoreKeys(item.kind,
                                              item.key_start,
                                              item.key_end)
      else:
        # Any later state change must refer to an existing record.
        assert item.progress_key is not None
        self.UpdateProgress(item)

      # Let the producer of this item know its progress was recorded.
      item.progress_event.set()

      self.progress_queue.task_done()

    self.db.ThreadComplete()
class ProgressTrackerThread(_ProgressThreadBase):
  """A thread which records progress information for the upload process.

  The progress information is stored into the provided progress database.
  This class is not responsible for replaying a prior run's progress
  information out of the database. Separate mechanisms must be used to
  resume a prior upload attempt.
  """

  NAME = 'progress tracking thread'

  def __init__(self, progress_queue, progress_db):
    """Initialize the ProgressTrackerThread instance.

    Args:
      progress_queue: A Queue used for tracking progress information.
      progress_db: The database for tracking progress information; should
        be an instance of ProgressDatabase.
    """
    _ProgressThreadBase.__init__(self, progress_queue, progress_db)

  def UpdateProgress(self, item):
    """Update the state of the given WorkItem.

    Args:
      item: A WorkItem instance.
    """
    self.db.UpdateState(item.progress_key, item.state)
    if item.state == STATE_SENT:
      # Only items confirmed as sent count toward the transfer total.
      self.entities_transferred += item.count

  def WorkFinished(self):
    """Performs final actions after the entity transfer is complete."""
    pass
class ExportProgressThread(_ProgressThreadBase):
  """A thread to record progress information and write record data for exports.

  The progress information is stored into a provided progress database.
  Exported results are stored in the result database and dumped to an output
  file at the end of the download.
  """

  def __init__(self, exporter, progress_queue, progress_db, result_db):
    """Initialize the ExportProgressThread instance.

    Args:
      exporter: An Exporter instance for the download.
      progress_queue: A Queue used for tracking progress information.
      progress_db: The database for tracking progress information; should
        be an instance of ProgressDatabase.
      result_db: The database for holding exported entities; should be an
        instance of ResultDatabase.
    """
    _ProgressThreadBase.__init__(self, progress_queue, progress_db)

    self.exporter = exporter
    self.existing_count = result_db.existing_count
    self.result_db = result_db

  def EntitiesTransferred(self):
    """Return the total number of unique entities transferred."""
    return self.result_db.count

  def WorkFinished(self):
    """Write the contents of the result database."""
    self.exporter.output_entities(self.result_db.AllEntities())

  def UpdateProgress(self, item):
    """Update the state of the given KeyRangeItem.

    Args:
      item: A KeyRange instance.
    """
    if item.state == STATE_GOT:
      count = self.result_db.StoreEntities(item.download_result.keys,
                                           item.download_result.entities)
      # A finished range needs no resume record; drop it.
      self.db.DeleteKey(item.progress_key)
      self.entities_transferred += count
    else:
      self.db.UpdateState(item.progress_key, item.state)
class MapperProgressThread(_ProgressThreadBase):
  """A thread to record progress information for maps over the datastore."""

  def __init__(self, mapper, progress_queue, progress_db):
    """Initialize the MapperProgressThread instance.

    Args:
      mapper: A Mapper object for this map run.
      progress_queue: A Queue used for tracking progress information.
      progress_db: The database for tracking progress information; should
        be an instance of ProgressDatabase.
    """
    _ProgressThreadBase.__init__(self, progress_queue, progress_db)

    self.mapper = mapper

  def EntitiesTransferred(self):
    """Return the total number of unique entities transferred."""
    return self.entities_transferred

  def WorkFinished(self):
    """Perform actions after map is complete."""
    pass

  def UpdateProgress(self, item):
    """Update the state of the given KeyRangeItem.

    Args:
      item: A KeyRange instance.
    """
    if item.state == STATE_GOT:
      self.entities_transferred += item.count
      # The range is fully mapped; its progress record is no longer needed.
      self.db.DeleteKey(item.progress_key)
    else:
      self.db.UpdateState(item.progress_key, item.state)
def ParseKey(key_string):
  """Turn a key stored in the database into a Key or None.

  Args:
    key_string: The string representation of a Key.

  Returns:
    A datastore.Key instance or None
  """
  if not key_string:
    return None
  if key_string == 'None':
    # Progress rows store the literal string 'None' for absent keys.
    return None
  return datastore.Key(encoded=key_string)
def Validate(value, typ):
  """Checks that value is non-empty and of the right type.

  Args:
    value: any value
    typ: a type or tuple of types

  Raises:
    ValueError: if value is None or empty.
    TypeError: if it's not the given type.
  """
  if not value:
    raise ValueError('Value should not be empty; received %s.' % value)
  if not isinstance(value, typ):
    raise TypeError('Expected a %s, but received %s (a %s).' %
                    (typ, value, value.__class__))
def CheckFile(filename):
  """Check that the given file exists and can be opened for reading.

  Args:
    filename: The name of the file.

  Raises:
    FileNotFoundError: if the given filename is not found
    FileNotReadableError: if the given filename is not readable.
  """
  if not os.path.exists(filename):
    raise FileNotFoundError('%s: file not found' % filename)
  if not os.access(filename, os.R_OK):
    raise FileNotReadableError('%s: file not readable' % filename)
class Loader(object):
  """A base class for creating datastore entities from input data.

  To add a handler for bulk loading a new entity kind into your datastore,
  write a subclass of this class that calls Loader.__init__ from your
  class's __init__.

  If you need to run extra code to convert entities from the input
  data, create new properties, or otherwise modify the entities before
  they're inserted, override handle_entity.

  See the create_entity method for the creation of entities from the
  (parsed) input data.
  """

  # Registry of Loader instances keyed by entity kind.
  __loaders = {}
  kind = None
  __properties = None

  def __init__(self, kind, properties):
    """Constructor.

    Populates this Loader's kind and properties map.

    Args:
      kind: a string containing the entity kind that this loader handles

      properties: list of (name, converter) tuples.

        This is used to automatically convert the input columns into
        properties. The converter should be a function that takes one
        argument, a string value from the input file, and returns a
        correctly typed property value that should be inserted. The
        tuples in this list should match the columns in your input file,
        in order.

        For example:
          [('name', str),
           ('id_number', int),
           ('email', datastore_types.Email),
           ('user', users.User),
           ('birthdate', lambda x: datetime.datetime.fromtimestamp(float(x))),
           ('description', datastore_types.Text),
           ]
    """
    Validate(kind, (basestring, tuple))
    self.kind = kind
    # Stored as instance attributes so tests can inject fakes.
    self.__openfile = open
    self.__create_csv_reader = csv.reader

    # Fail fast if no model class is registered for this kind.
    GetImplementationClass(kind)

    Validate(properties, list)
    for name, fn in properties:
      Validate(name, basestring)
      assert callable(fn), (
          'Conversion function %s for property %s is not callable.' % (fn,
                                                                       name))

    self.__properties = properties

  @staticmethod
  def RegisterLoader(loader):
    """Register loader and the Loader instance for its kind.

    Args:
      loader: A Loader instance.
    """
    Loader.__loaders[loader.kind] = loader

  def get_keys_to_reserve(self):
    """Returns keys with ids in their paths to be reserved.

    Returns:
      A list of keys used to advance the id sequences associated with
      each id to prevent collisions with future ids.
    """
    return []

  def alias_old_names(self):
    """Aliases method names so that Loaders defined with old names work."""
    aliases = (
        ('CreateEntity', 'create_entity'),
        ('HandleEntity', 'handle_entity'),
        ('GenerateKey', 'generate_key'),
        )
    for old_name, new_name in aliases:
      # Make the old CamelCase name available on the base class.
      setattr(Loader, old_name, getattr(Loader, new_name))
      # If the subclass overrides the old name only, route the new name to it;
      # overriding both is ambiguous and is rejected.
      if hasattr(self.__class__, old_name) and not (
          getattr(self.__class__, old_name).im_func ==
          getattr(Loader, new_name).im_func):
        if hasattr(self.__class__, new_name) and not (
            getattr(self.__class__, new_name).im_func ==
            getattr(Loader, new_name).im_func):
          raise NameClashError(old_name, new_name, self.__class__)
        setattr(self, new_name, getattr(self, old_name))

  def create_entity(self, values, key_name=None, parent=None):
    """Creates a entity from a list of property values.

    Args:
      values: list/tuple of str
      key_name: if provided, the name for the (single) resulting entity
      parent: A datastore.Key instance for the parent, or None

    Returns:
      list of db.Model

      The returned entities are populated with the property values from the
      argument, converted to native types using the properties map given in
      the constructor, and passed through handle_entity. They're ready to be
      inserted.

    Raises:
      AssertionError: if the number of values doesn't match the number
        of properties in the properties map.
      ValueError: if any element of values is None or empty.
      TypeError: if values is not a list or tuple.
    """
    Validate(values, (list, tuple))
    assert len(values) == len(self.__properties), (
        'Expected %d columns, found %d.' %
        (len(self.__properties), len(values)))

    model_class = GetImplementationClass(self.kind)

    properties = {
        'key_name': key_name,
        'parent': parent,
        }
    for (name, converter), val in zip(self.__properties, values):
      if converter is bool and val.lower() in ('0', 'false', 'no'):
        # bool('false') would be True; map common falsey spellings to False.
        val = False
      properties[name] = converter(val)

    entity = model_class(**properties)
    entities = self.handle_entity(entity)

    if entities:
      if not isinstance(entities, (list, tuple)):
        entities = [entities]

      for entity in entities:
        if not isinstance(entity, db.Model):
          raise TypeError('Expected a db.Model, received %s (a %s).' %
                          (entity, entity.__class__))

    return entities

  def generate_key(self, i, values):
    """Generates a key_name to be used in creating the underlying object.

    The default implementation returns None.

    This method can be overridden to control the key generation for
    uploaded entities. The value returned should be None (to use a
    server generated numeric key), or a string which neither starts
    with a digit nor has the form __*__ (see
    https://developers.google.com/appengine/docs/python/datastore/entities),
    or a datastore.Key instance.

    If you generate your own string keys, keep in mind:

    1. The key name for each entity must be unique.
    2. If an entity of the same kind and key already exists in the
       datastore, it will be overwritten.

    Args:
      i: Number corresponding to this object (assume it's run in a loop,
        this is your current count.
      values: list/tuple of str.

    Returns:
      A string to be used as the key_name for an entity.
    """
    return None

  def handle_entity(self, entity):
    """Subclasses can override this to add custom entity conversion code.

    This is called for each entity, after its properties are populated
    from the input but before it is stored. Subclasses can override
    this to add custom entity handling code.

    The entity to be inserted should be returned. If multiple entities
    should be inserted, return a list of entities. If no entities
    should be inserted, return None or [].

    Args:
      entity: db.Model

    Returns:
      db.Model or list of db.Model
    """
    return entity

  def initialize(self, filename, loader_opts):
    """Performs initialization and validation of the input file.

    This implementation checks that the input file exists and can be
    opened for reading.

    Args:
      filename: The string given as the --filename flag argument.
      loader_opts: The string given as the --loader_opts flag argument.
    """
    CheckFile(filename)

  def finalize(self):
    """Performs finalization actions after the upload completes."""
    pass

  def generate_records(self, filename):
    """Subclasses can override this to add custom data input code.

    This method must yield fixed-length lists of strings.

    The default implementation uses csv.reader to read CSV rows
    from filename.

    Args:
      filename: The string input for the --filename option.

    Returns:
      An iterator of fixed-length lists of strings.
    """
    csv_generator = CSVGenerator(filename, openfile=self.__openfile,
                                 create_csv_reader=self.__create_csv_reader
                                ).Records()
    return csv_generator

  @staticmethod
  def RegisteredLoaders():
    """Returns a dict of the Loader instances that have been created."""
    return dict(Loader.__loaders)

  @staticmethod
  def RegisteredLoader(kind):
    """Returns the loader instance for the given kind if it exists."""
    return Loader.__loaders[kind]
class RestoreThread(_ThreadBase):
  """A thread to read saved entity_pbs from sqlite3."""
  NAME = 'RestoreThread'
  # Sentinel pushed onto the queue when every row has been produced.
  _ENTITIES_DONE = 'Entities Done'

  def __init__(self, queue, filename):
    """Initialize the RestoreThread.

    Args:
      queue: A queue that receives raw entity protobuffer strings.
      filename: Path of the sqlite3 result database to read from.
    """
    _ThreadBase.__init__(self)
    self.queue = queue
    self.filename = filename

  def PerformWork(self):
    """Stream every stored entity value onto the queue, then the sentinel."""
    db_conn = sqlite3.connect(self.filename)
    cursor = db_conn.cursor()
    cursor.execute('select id, value from result')
    for entity_id, value in cursor:
      self.queue.put(value, block=True)
    self.queue.put(RestoreThread._ENTITIES_DONE, block=True)
class RestoreLoader(Loader):
  """A Loader which imports protobuffers from a file."""

  def __init__(self, kind, app_id):
    """Initialize the RestoreLoader.

    Args:
      kind: The entity kind this loader restores.
      app_id: The application id entities should be rewritten to.
    """
    self.kind = kind
    self.app_id = app_id

    # Entities are restored into the currently active namespace.
    self.namespace = namespace_manager.get_namespace()

  def initialize(self, filename, loader_opts):
    """Validate the input file and start the background reader thread."""
    CheckFile(filename)
    self.queue = Queue.Queue(1000)
    restore_thread = RestoreThread(self.queue, filename)
    restore_thread.start()
    # First pass: scan every record for numeric-id keys to reserve.
    self.keys_to_reserve = self._find_keys_to_reserve(
        self.generate_records(filename))
    # Second pass feeds the actual upload; restart the reader thread.
    restore_thread = RestoreThread(self.queue, filename)
    restore_thread.start()

  def get_keys_to_reserve(self):
    """Returns keys with ids in their paths to be reserved.

    Returns:
      A list of keys used to advance the id sequences associated with
      each id to prevent collisions with future ids.
    """
    return self.keys_to_reserve

  def _find_keys_to_reserve(self, record_generator):
    """Find all entity keys with ids in their paths.

    Args:
      record_generator: A generator of entity_encoding strings.

    Returns:
      A list of keys to reserve.
    """
    keys_to_reserve = []
    for values in record_generator:
      entity = self.create_entity(values)
      key = entity.key()
      for id_or_name in key.to_path()[1::2]:
        if isinstance(id_or_name, (int, long)):
          keys_to_reserve.append(key)
          break
    return keys_to_reserve

  def generate_records(self, filename):
    """Yield datastore.Entity objects read from the restore queue."""
    while True:
      record = self.queue.get(block=True)
      # Identity comparison: the sentinel object itself, not an equal string.
      if id(record) == id(RestoreThread._ENTITIES_DONE):
        break
      entity_proto = entity_pb.EntityProto(contents=str(record))
      fixed_entity_proto = self._translate_entity_proto(entity_proto)
      yield datastore.Entity._FromPb(fixed_entity_proto)

  def create_entity(self, values, key_name=None, parent=None):
    """Records are already entities; return them unchanged."""
    return values

  def rewrite_reference_proto(self, entity_namespace, reference_proto):
    """Transform the Reference protobuffer which underlies keys and references.

    Args:
      entity_namespace: The 'before' namespace of the entity that has this
        reference property. If this value does not match the reference
        properties current namespace, then the reference property namespace
        will not be modified.
      reference_proto: A Onestore Reference proto
    """
    reference_proto.set_app(self.app_id)
    if entity_namespace != reference_proto.name_space():
      # Cross-namespace reference: leave its namespace untouched.
      return

    if self.namespace:
      reference_proto.set_name_space(self.namespace)
    else:
      reference_proto.clear_name_space()

  def _translate_entity_proto(self, entity_proto):
    """Transform the ReferenceProperties of the given entity to fix app_id."""
    entity_key = entity_proto.mutable_key()
    entity_key.set_app(self.app_id)
    original_entity_namespace = entity_key.name_space()
    if self.namespace:
      entity_key.set_name_space(self.namespace)
    else:
      entity_key.clear_name_space()

    for prop in entity_proto.property_list():
      prop_value = prop.mutable_value()
      if prop_value.has_referencevalue():
        self.rewrite_reference_proto(original_entity_namespace,
                                     prop_value.mutable_referencevalue())

    for prop in entity_proto.raw_property_list():
      prop_value = prop.mutable_value()
      if prop_value.has_referencevalue():
        self.rewrite_reference_proto(original_entity_namespace,
                                     prop_value.mutable_referencevalue())

    return entity_proto
class Exporter(object):
  """A base class for serializing datastore entities.

  To add a handler for exporting an entity kind from your datastore,
  write a subclass of this class that calls Exporter.__init__ from your
  class's __init__.

  If you need to run extra code to convert entities from the input
  data, create new properties, or otherwise modify the entities before
  they're inserted, override handle_entity.

  See the output_entities method for the writing of data from entities.
  """

  # Registry of Exporter instances keyed by entity kind.
  __exporters = {}
  kind = None
  __properties = None
  calculate_sort_key_from_entity = False

  def __init__(self, kind, properties):
    """Constructor.

    Populates this Exporters's kind and properties map.

    Args:
      kind: a string containing the entity kind that this exporter handles

      properties: list of (name, converter, default) tuples.

        This is used to automatically convert the entities to strings.
        The converter should be a function that takes one argument, a property
        value of the appropriate type, and returns a str or unicode. The
        default is a string to be used if the property is not present, or
        None to fail with an error if the property is missing.

        For example:
          [('name', str, None),
           ('id_number', str, None),
           ('email', str, ''),
           ('user', str, None),
           ('birthdate',
            lambda x: str(datetime.datetime.fromtimestamp(float(x))),
            None),
           ('description', str, ''),
           ]
    """
    Validate(kind, basestring)
    self.kind = kind

    # Fail fast if no model class is registered for this kind.
    GetImplementationClass(kind)

    Validate(properties, list)
    for name, fn, default in properties:
      Validate(name, basestring)
      assert callable(fn), (
          'Conversion function %s for property %s is not callable.' % (
              fn, name))
      if default:
        Validate(default, basestring)

    self.__properties = properties

  @staticmethod
  def RegisterExporter(exporter):
    """Register exporter and the Exporter instance for its kind.

    Args:
      exporter: A Exporter instance.
    """
    Exporter.__exporters[exporter.kind] = exporter

  def __ExtractProperties(self, entity):
    """Converts an entity into a list of string values.

    Args:
      entity: An entity to extract the properties from.

    Returns:
      A list of the properties of the entity.

    Raises:
      MissingPropertyError: if an expected field on the entity is missing.
    """
    encoding = []
    for name, fn, default in self.__properties:
      try:
        encoding.append(fn(entity[name]))
      except KeyError:
        if name == '__key__':
          # The key is not a regular property; fetch it explicitly.
          encoding.append(fn(entity.key()))
        elif default is None:
          raise MissingPropertyError(name)
        else:
          encoding.append(default)
    return encoding

  def __EncodeEntity(self, entity):
    """Convert the given entity into CSV string.

    Args:
      entity: The entity to encode.

    Returns:
      A CSV string.
    """
    output = StringIO.StringIO()
    writer = csv.writer(output)
    writer.writerow(self.__ExtractProperties(entity))
    return output.getvalue()

  def __SerializeEntity(self, entity):
    """Creates a string representation of an entity.

    Args:
      entity: The entity to serialize.

    Returns:
      A serialized representation of an entity.
    """
    encoding = self.__EncodeEntity(entity)
    if not isinstance(encoding, unicode):
      encoding = unicode(encoding, 'utf-8')
    encoding = encoding.encode('utf-8')
    return encoding

  def output_entities(self, entity_generator):
    """Outputs the downloaded entities.

    This implementation writes CSV.

    Args:
      entity_generator: A generator that yields the downloaded entities
        in key order.
    """
    CheckOutputFile(self.output_filename)
    output_file = open(self.output_filename, 'w')
    logger.debug('Export complete, writing to file')
    try:
      output_file.writelines(self.__SerializeEntity(entity)
                             for entity in entity_generator)
    finally:
      # Bug fix: the file handle was never closed, which can lose buffered
      # data and leaks the descriptor. Always close it.
      output_file.close()

  def initialize(self, filename, exporter_opts):
    """Performs initialization and validation of the output file.

    This implementation checks that the input file exists and can be
    opened for writing.

    Args:
      filename: The string given as the --filename flag argument.
      exporter_opts: The string given as the --exporter_opts flag argument.
    """
    CheckOutputFile(filename)
    self.output_filename = filename

  def finalize(self):
    """Performs finalization actions after the download completes."""
    pass

  def sort_key_from_entity(self, entity):
    """A value to alter sorting of entities in output_entities entity_generator.

    Will only be called if calculate_sort_key_from_entity is true.

    Args:
      entity: A datastore.Entity.

    Returns:
      A value to store in the intermediate sqlite table. The table will later
      be sorted by this value then by the datastore key, so the sort_key need
      not be unique.
    """
    return ''

  @staticmethod
  def RegisteredExporters():
    """Returns a dictionary of the exporter instances that have been created."""
    return dict(Exporter.__exporters)

  @staticmethod
  def RegisteredExporter(kind):
    """Returns an exporter instance for the given kind if it exists."""
    return Exporter.__exporters[kind]
class DumpExporter(Exporter):
  """An exporter which dumps protobuffers to a file."""

  def __init__(self, kind, result_db_filename):
    """Initialize the DumpExporter.

    Args:
      kind: The entity kind this exporter handles.
      result_db_filename: Path of the intermediate sqlite result database.
    """
    self.kind = kind
    self.result_db_filename = result_db_filename

  def output_entities(self, entity_generator):
    """Copy the intermediate result database directly to the output file."""
    # The result db is itself the dump format; no per-entity serialization.
    shutil.copyfile(self.result_db_filename, self.output_filename)
class MapperRetry(Error):
  """An exception that indicates a non-fatal error during mapping."""
3206 class Mapper(object):
3207 """A base class for serializing datastore entities.
3209 To add a handler for exporting an entity kind from your datastore,
3210 write a subclass of this class that calls Mapper.__init__ from your
3213 You need to implement to batch_apply or apply method on your subclass
3214 for the map to do anything.
3220 def __init__(self
, kind
):
3223 Populates this Mappers's kind.
3226 kind: a string containing the entity kind that this mapper handles
3228 Validate(kind
, basestring
)
3232 GetImplementationClass(kind
)
3235 def RegisterMapper(mapper
):
3236 """Register mapper and the Mapper instance for its kind.
3239 mapper: A Mapper instance.
3241 Mapper
.__mappers
[mapper
.kind
] = mapper
3243 def initialize(self
, mapper_opts
):
3244 """Performs initialization.
3247 mapper_opts: The string given as the --mapper_opts flag argument.
3252 """Performs finalization actions after the download completes."""
3255 def apply(self
, entity
):
3256 print 'Default map function doing nothing to %s' % entity
3258 def batch_apply(self
, entities
):
3259 for entity
in entities
:
3262 def map_over_keys_only(self
):
3263 """Return whether this mapper should iterate over only keys or not.
3265 Override this method in subclasses to return True values.
3273 def RegisteredMappers():
3274 """Returns a dictionary of the mapper instances that have been created."""
3276 return dict(Mapper
.__mappers
)
3279 def RegisteredMapper(kind
):
3280 """Returns an mapper instance for the given kind if it exists."""
3281 return Mapper
.__mappers
[kind
]
class QueueJoinThread(threading.Thread):
  """A thread that joins a queue and exits.

  Queue joins do not have a timeout. To simulate a queue join with
  timeout, run this thread and join it with a timeout.
  """

  def __init__(self, queue):
    """Initialize a QueueJoinThread.

    Args:
      queue: The queue for this thread to join.
    """
    threading.Thread.__init__(self)
    # Daemonize so a hung join cannot keep the process alive.
    self.setDaemon(True)
    assert isinstance(queue, (Queue.Queue, ReQueue))
    self.queue = queue

  def run(self):
    """Perform the queue join in this thread."""
    self.queue.join()
def InterruptibleQueueJoin(queue,
                           thread_local,
                           thread_pool,
                           queue_join_thread_factory=QueueJoinThread,
                           check_workers=True):
  """Repeatedly joins the given ReQueue or Queue.Queue with short timeout.

  Between each timeout on the join, worker threads are checked.

  Args:
    queue: A Queue.Queue or ReQueue instance.
    thread_local: A threading.local instance which indicates interrupts.
    thread_pool: An AdaptiveThreadPool instance.
    queue_join_thread_factory: Used for dependency injection.
    check_workers: Whether to interrupt the join on worker death.

  Returns:
    True unless the queue join is interrupted by SIGINT or worker death.
  """
  thread = queue_join_thread_factory(queue)
  thread.start()
  while True:
    thread.join(timeout=.5)
    if not thread.isAlive():
      # The join thread finished: the queue is fully processed.
      return True
    if thread_local.shut_down:
      logger.debug('Queue join interrupted')
      return False
    if check_workers:
      for worker_thread in thread_pool.Threads():
        if not worker_thread.isAlive():
          # A dead worker can never drain the queue; give up.
          return False
def ShutdownThreads(data_source_thread, thread_pool):
  """Shuts down the worker and data source threads.

  Args:
    data_source_thread: A running DataSourceThread instance.
    thread_pool: An AdaptiveThreadPool instance with workers registered.
  """
  logger.info('An error occurred. Shutting down...')

  # Ask the producer to stop before tearing down the consumers.
  data_source_thread.exit_flag = True

  thread_pool.Shutdown()

  # Bounded join: the data source thread may be blocked on a full queue.
  data_source_thread.join(timeout=3.0)
  if data_source_thread.isAlive():
    logger.warn('%s hung while trying to exit',
                data_source_thread.GetFriendlyName())
3370 class BulkTransporterApp(object):
3371 """Class to wrap bulk transport application functionality."""
3375 input_generator_factory
,
3378 progresstrackerthread_factory
,
3379 max_queue_size
=DEFAULT_QUEUE_SIZE
,
3380 request_manager_factory
=RequestManager
,
3381 datasourcethread_factory
=DataSourceThread
,
3382 progress_queue_factory
=Queue
.Queue
,
3383 thread_pool_factory
=adaptive_thread_pool
.AdaptiveThreadPool
,
3385 """Instantiate a BulkTransporterApp.
3387 Uploads or downloads data to or from application using HTTP requests.
3388 When run, the class will spin up a number of threads to read entities
3389 from the data source, pass those to a number of worker threads
3390 for sending to the application, and track all of the progress in a
3391 small database in case an error or pause/termination requires a
3392 restart/resumption of the upload process.
3395 arg_dict: Dictionary of command line options.
3396 input_generator_factory: A factory that creates a WorkItem generator.
3397 throttle: A Throttle instance.
3398 progress_db: The database to use for replaying/recording progress.
3399 progresstrackerthread_factory: Used for dependency injection.
3400 max_queue_size: Maximum size of the queues before they should block.
3401 request_manager_factory: Used for dependency injection.
3402 datasourcethread_factory: Used for dependency injection.
3403 progress_queue_factory: Used for dependency injection.
3404 thread_pool_factory: Used for dependency injection.
3405 server: An existing AbstractRpcServer to reuse.
3407 self
.app_id
= arg_dict
['application']
3408 self
.post_url
= arg_dict
['url']
3409 self
.kind
= arg_dict
['kind']
3410 self
.batch_size
= arg_dict
['batch_size']
3411 self
.input_generator_factory
= input_generator_factory
3412 self
.num_threads
= arg_dict
['num_threads']
3413 self
.email
= arg_dict
['email']
3414 self
.passin
= arg_dict
['passin']
3415 self
.dry_run
= arg_dict
['dry_run']
3416 self
.throttle_class
= arg_dict
['throttle_class']
3417 self
.throttle
= throttle
3418 self
.progress_db
= progress_db
3419 self
.progresstrackerthread_factory
= progresstrackerthread_factory
3420 self
.max_queue_size
= max_queue_size
3421 self
.request_manager_factory
= request_manager_factory
3422 self
.datasourcethread_factory
= datasourcethread_factory
3423 self
.progress_queue_factory
= progress_queue_factory
3424 self
.thread_pool_factory
= thread_pool_factory
3425 self
.server
= server
3427 self
.host_port
, self
.url_path
,
3428 unused_query
, unused_fragment
) = urlparse
.urlsplit(self
.post_url
)
3429 self
.secure
= (scheme
== 'https')
def RunPostAuthentication(self):
  """Method that gets called after authentication.

  Returns:
    A list of entity kinds to operate on.
  """
  if isinstance(self.kind, basestring):
    # Normalize a single kind to a one-element list.
    return [self.kind]
  return self.kind
3438 """Perform the work of the BulkTransporterApp.
3441 AuthenticationError: If authentication is required and fails.
3444 Error code suitable for sys.exit, e.g. 0 on success, 1 on failure.
3447 thread_pool
= self
.thread_pool_factory(
3448 self
.num_threads
, queue_size
=self
.max_queue_size
)
3450 progress_queue
= self
.progress_queue_factory(self
.max_queue_size
)
3451 self
.request_manager
= self
.request_manager_factory(self
.app_id
,
3462 self
.throttle_class
)
3466 self
.request_manager
.Authenticate()
3467 except Exception, e
:
3471 if not isinstance(e
, urllib2
.HTTPError
) or (
3472 e
.code
!= 302 and e
.code
!= 401):
3473 logger
.exception('Exception during authentication')
3474 raise AuthenticationError()
3475 if (self
.request_manager
.auth_called
and
3476 not self
.request_manager
.authenticated
):
3480 raise AuthenticationError('Authentication failed')
3482 kinds
= self
.RunPostAuthentication()
3484 for thread
in thread_pool
.Threads():
3485 self
.throttle
.Register(thread
)
3487 self
.progress_thread
= self
.progresstrackerthread_factory(
3488 progress_queue
, self
.progress_db
)
3490 if self
.progress_db
.UseProgressData():
3491 logger
.debug('Restarting upload using progress database')
3492 progress_generator_factory
= self
.progress_db
.GetProgressStatusGenerator
3494 progress_generator_factory
= None
3496 self
.data_source_thread
= (
3497 self
.datasourcethread_factory(self
.request_manager
,
3501 self
.input_generator_factory
,
3502 progress_generator_factory
))
3506 self
.throttle
.Register(self
.data_source_thread
)
3509 thread_local
= threading
.local()
3510 thread_local
.shut_down
= False
3512 def Interrupt(unused_signum
, unused_frame
):
3513 """Shutdown gracefully in response to a signal."""
3514 thread_local
.shut_down
= True
3517 signal
.signal(signal
.SIGINT
, Interrupt
)
3520 self
.progress_thread
.start()
3521 self
.data_source_thread
.start()
3525 while not thread_local
.shut_down
:
3531 self
.data_source_thread
.join(timeout
=0.25)
3534 if self
.data_source_thread
.isAlive():
3537 for thread
in list(thread_pool
.Threads()) + [self
.progress_thread
]:
3538 if not thread
.isAlive():
3539 logger
.info('Unexpected thread death: %s', thread
.getName())
3540 thread_local
.shut_down
= True
3548 logger
.debug('Waiting for %s...', msg
)
3550 if isinstance(ob
, threading
.Thread
):
3551 ob
.join(timeout
=3.0)
3553 logger
.debug('Joining %s failed', ob
)
3555 logger
.debug('... done.')
3556 elif isinstance(ob
, (Queue
.Queue
, ReQueue
)):
3557 if not InterruptibleQueueJoin(ob
, thread_local
, thread_pool
):
3558 ShutdownThreads(self
.data_source_thread
, thread_pool
)
3561 logger
.debug('... done.')
3564 if self
.data_source_thread
.error
or thread_local
.shut_down
:
3565 ShutdownThreads(self
.data_source_thread
, thread_pool
)
3568 _Join(thread_pool
.requeue
, 'worker threads to finish')
3570 thread_pool
.Shutdown()
3572 thread_pool
.JoinThreads()
3573 thread_pool
.CheckErrors()
3580 if self
.progress_thread
.isAlive():
3581 InterruptibleQueueJoin(progress_queue
, thread_local
, thread_pool
,
3582 check_workers
=False)
3584 logger
.warn('Progress thread exited prematurely')
3588 progress_queue
.put(_THREAD_SHOULD_EXIT
)
3589 _Join(self
.progress_thread
, 'progress_thread to terminate')
3590 self
.progress_thread
.CheckError()
3591 if not thread_local
.shut_down
:
3592 self
.progress_thread
.WorkFinished()
3595 self
.data_source_thread
.CheckError()
3597 return self
.ReportStatus()
def ReportStatus(self):
  """Display a message reporting the final status of the transfer.

  Subclasses must implement this.
  """
  raise NotImplementedError()
class BulkUploaderApp(BulkTransporterApp):
  """Class to encapsulate bulk uploader functionality."""

  def __init__(self, *args, **kwargs):
    BulkTransporterApp.__init__(self, *args, **kwargs)

  def RunPostAuthentication(self):
    """Reserve ids used by the loader so new ids cannot collide."""
    loader = Loader.RegisteredLoader(self.kind)
    self.request_manager.ReserveKeys(loader.get_keys_to_reserve())
    return [self.kind]

  def ReportStatus(self):
    """Display a message reporting the final status of the transfer."""
    # Combine plain and HTTPS upload byte counts.
    total_up, duration = self.throttle.TotalTransferred(
        remote_api_throttle.BANDWIDTH_UP)
    s_total_up, unused_duration = self.throttle.TotalTransferred(
        remote_api_throttle.HTTPS_BANDWIDTH_UP)
    total_up += s_total_up
    total = total_up
    logger.info('%d entities total, %d previously transferred',
                self.data_source_thread.read_count,
                self.data_source_thread.xfer_count)
    transfer_count = self.progress_thread.EntitiesTransferred()
    logger.info('%d entities (%d bytes) transferred in %.1f seconds',
                transfer_count, total, duration)
    if (self.data_source_thread.read_all and
        transfer_count +
        self.data_source_thread.xfer_count >=
        self.data_source_thread.read_count):
      logger.info('All entities successfully transferred')
      return 0
    else:
      logger.info('Some entities not successfully transferred')
      return 1
class BulkDownloaderApp(BulkTransporterApp):
  """Class to encapsulate bulk downloader functionality."""

  def __init__(self, *args, **kwargs):
    """Passes all arguments through to BulkTransporterApp."""
    BulkTransporterApp.__init__(self, *args, **kwargs)

  def RunPostAuthentication(self):
    """Returns the list of kinds to download (all server kinds if unset)."""
    if not self.kind:
      return self.request_manager.GetSchemaKinds()
    elif isinstance(self.kind, basestring):
      return [self.kind]
    else:
      return self.kind

  def ReportStatus(self):
    """Display a message reporting the final status of the transfer.

    Returns:
      0 (the download is reported as successful).
    """

    total_down, duration = self.throttle.TotalTransferred(
        remote_api_throttle.BANDWIDTH_DOWN)
    s_total_down, unused_duration = self.throttle.TotalTransferred(
        remote_api_throttle.HTTPS_BANDWIDTH_DOWN)

    total_down += s_total_down

    existing_count = self.progress_thread.existing_count
    xfer_count = self.progress_thread.EntitiesTransferred()
    logger.info('Have %d entities, %d previously transferred',
                xfer_count, existing_count)
    logger.info('%d entities (%d bytes) transferred in %.1f seconds',
                xfer_count, total_down, duration)
    return 0
class BulkMapperApp(BulkTransporterApp):
  """Class to encapsulate bulk map functionality."""

  def __init__(self, *args, **kwargs):
    """Passes all arguments through to BulkTransporterApp."""
    BulkTransporterApp.__init__(self, *args, **kwargs)

  def ReportStatus(self):
    """Display a message reporting the final status of the transfer.

    Returns:
      0 (the map run is reported as successful).
    """

    total_down, duration = self.throttle.TotalTransferred(
        remote_api_throttle.BANDWIDTH_DOWN)
    s_total_down, unused_duration = self.throttle.TotalTransferred(
        remote_api_throttle.HTTPS_BANDWIDTH_DOWN)

    total_down += s_total_down

    xfer_count = self.progress_thread.EntitiesTransferred()
    logger.info('The following may be inaccurate if any mapper tasks '
                'encountered errors and had to be retried.')
    logger.info('Applied mapper to %s entities.',
                xfer_count)
    logger.info('%s entities (%s bytes) transferred in %.1f seconds',
                xfer_count, total_down, duration)
    return 0
3701 def PrintUsageExit(code
):
3702 """Prints usage information and exits with a status code.
3705 code: Status code to pass to sys.exit() after displaying usage information.
3707 print __doc__
% {'arg0': sys
.argv
[0]}
# Sentinel marking options the user is required to supply on the command line.
REQUIRED_OPTION = object()

# Flag classification: determines how each --flag value is parsed below.
BOOL_ARGS = ('create_config', 'debug', 'download', 'dry_run', 'dump',
             'has_header', 'map', 'passin', 'restore')
INT_ARGS = ('bandwidth_limit', 'batch_size', 'http_limit', 'num_threads',
            'rps_limit')
FILENAME_ARGS = ('config_file', 'db_filename', 'filename', 'log_file',
                 'result_db_filename')
STRING_ARGS = ('application', 'auth_domain', 'email', 'exporter_opts',
               'kind', 'loader_opts', 'mapper_opts', 'namespace', 'url')

# Old flag spellings mapped to their current replacements.
DEPRECATED_OPTIONS = {'csv_has_header': 'has_header', 'app_id': 'application'}

# getopt long-option spec: bool flags take no value; others take '='-values.
FLAG_SPEC = (['csv_has_header', 'help', 'app_id='] +
             list(BOOL_ARGS) +
             [arg + '=' for arg in INT_ARGS + FILENAME_ARGS + STRING_ARGS])
def ParseArguments(argv, die_fn=lambda: PrintUsageExit(1)):
  """Parses command-line arguments.

  Prints out a help message if -h or --help is supplied.

  Args:
    argv: List of command-line arguments.
    die_fn: Function to invoke to end the program.

  Returns:
    A dictionary containing the value of command-line options.
  """
  opts, unused_args = getopt.getopt(
      argv[1:],
      'h',
      FLAG_SPEC)

  # Seed every option with its default before reading the parsed flags.
  arg_dict = {}

  arg_dict['url'] = REQUIRED_OPTION
  arg_dict['filename'] = None
  arg_dict['config_file'] = None
  arg_dict['kind'] = None

  arg_dict['batch_size'] = None
  arg_dict['num_threads'] = DEFAULT_THREAD_COUNT
  arg_dict['bandwidth_limit'] = DEFAULT_BANDWIDTH_LIMIT
  arg_dict['rps_limit'] = DEFAULT_RPS_LIMIT
  arg_dict['http_limit'] = DEFAULT_REQUEST_LIMIT

  arg_dict['application'] = ''
  arg_dict['auth_domain'] = 'gmail.com'
  arg_dict['create_config'] = False
  arg_dict['db_filename'] = None
  arg_dict['debug'] = False
  arg_dict['download'] = False
  arg_dict['dry_run'] = False
  arg_dict['dump'] = False
  arg_dict['email'] = None
  arg_dict['exporter_opts'] = None
  arg_dict['has_header'] = False
  arg_dict['loader_opts'] = None
  arg_dict['log_file'] = None
  arg_dict['map'] = False
  arg_dict['mapper_opts'] = None
  arg_dict['namespace'] = ''
  arg_dict['passin'] = False
  arg_dict['restore'] = False
  arg_dict['result_db_filename'] = None
  arg_dict['throttle_class'] = None

  def ExpandFilename(filename):
    """Expand shell variables and ~usernames in filename."""
    return os.path.expandvars(os.path.expanduser(filename))

  for option, value in opts:
    if option in ('-h', '--help'):
      PrintUsageExit(0)
    if not option.startswith('--'):
      # Only long options are recognized beyond -h/--help.
      continue
    option = option[2:]

    # Map deprecated spellings onto their replacements, with a warning.
    if option in DEPRECATED_OPTIONS:
      print >> sys.stderr, ('--%s is deprecated, please use --%s.' %
                            (option, DEPRECATED_OPTIONS[option]))
      option = DEPRECATED_OPTIONS[option]

    if option in BOOL_ARGS:
      arg_dict[option] = True
    elif option in INT_ARGS:
      arg_dict[option] = int(value)
    elif option in FILENAME_ARGS:
      arg_dict[option] = ExpandFilename(value)
    elif option in STRING_ARGS:
      arg_dict[option] = value

  return ProcessArguments(arg_dict, die_fn=die_fn)
def ThrottleLayout(bandwidth_limit, http_limit, rps_limit):
  """Return a dictionary indicating the throttle options.

  Args:
    bandwidth_limit: Maximum bytes/second for both upload and download.
    http_limit: Maximum HTTP(S) requests/second.
    rps_limit: Maximum entities fetched/modified per second.

  Returns:
    A throttle-name -> limit mapping based on remote_api_throttle.NO_LIMITS
    with the caller-supplied limits applied.
  """
  overrides = {
      remote_api_throttle.BANDWIDTH_UP: bandwidth_limit,
      remote_api_throttle.BANDWIDTH_DOWN: bandwidth_limit,
      remote_api_throttle.REQUESTS: http_limit,
      remote_api_throttle.HTTPS_BANDWIDTH_UP: bandwidth_limit,
      remote_api_throttle.HTTPS_BANDWIDTH_DOWN: bandwidth_limit,
      remote_api_throttle.HTTPS_REQUESTS: http_limit,
      remote_api_throttle.ENTITIES_FETCHED: rps_limit,
      remote_api_throttle.ENTITIES_MODIFIED: rps_limit,
  }
  bulkloader_limits = dict(remote_api_throttle.NO_LIMITS)
  bulkloader_limits.update(overrides)
  return bulkloader_limits
def CheckOutputFile(filename):
  """Check that the given file does not exist and can be opened for writing.

  Args:
    filename: The name of the file.

  Raises:
    FileExistsError: if the given filename already exists.
    FileNotWritableError: if the given filename's directory is not writable.
  """
  full_path = os.path.abspath(filename)
  if os.path.exists(full_path):
    raise FileExistsError('%s: output file exists' % filename)
  parent_dir = os.path.dirname(full_path)
  if not os.access(parent_dir, os.W_OK):
    raise FileNotWritableError(
        '%s: not writable' % parent_dir)
def LoadYamlConfig(config_file_name):
  """Loads a config file and registers any Loader classes present.

  Used for the second generation Yaml configuration file.

  Args:
    config_file_name: The name of the configuration file.
  """
  (loaders, exporters) = bulkloader_config.load_config(config_file_name,
                                                       reserve_keys=ReserveKeys)
  for loader_class in loaders:
    Loader.RegisterLoader(loader_class())
  for exporter_class in exporters:
    Exporter.RegisterExporter(exporter_class())
3858 def LoadConfig(config_file_name
, exit_fn
=sys
.exit
):
3859 """Loads a config file and registers any Loader classes present.
3861 Used for a legacy Python configuration file.
3864 config_file_name: The name of the configuration file.
3865 exit_fn: Used for dependency injection.
3867 if config_file_name
:
3868 config_file
= open(config_file_name
, 'r')
3874 bulkloader_config
= imp
.load_module(
3875 'bulkloader_config', config_file
, config_file_name
,
3876 ('', 'r', imp
.PY_SOURCE
))
3877 sys
.modules
['bulkloader_config'] = bulkloader_config
3881 if hasattr(bulkloader_config
, 'loaders'):
3882 for cls
in bulkloader_config
.loaders
:
3883 Loader
.RegisterLoader(cls())
3885 if hasattr(bulkloader_config
, 'exporters'):
3886 for cls
in bulkloader_config
.exporters
:
3887 Exporter
.RegisterExporter(cls())
3889 if hasattr(bulkloader_config
, 'mappers'):
3890 for cls
in bulkloader_config
.mappers
:
3891 Mapper
.RegisterMapper(cls())
3893 except NameError, e
:
3896 m
= re
.search(r
"[^']*'([^']*)'.*", str(e
))
3897 if m
.groups() and m
.group(1) == 'Loader':
3898 print >> sys
.stderr
, """
3899 The config file format has changed and you appear to be using an old-style
3900 config file. Please make the following changes:
3902 1. At the top of the file, add this:
3904 from google.appengine.tools.bulkloader import Loader
3906 2. For each of your Loader subclasses add the following at the end of the
3907 __init__ definitioion:
3909 self.alias_old_names()
3911 3. At the bottom of the file, add this:
3913 loaders = [MyLoader1,...,MyLoaderN]
3915 Where MyLoader1,...,MyLoaderN are the Loader subclasses you want the bulkloader
3921 except Exception, e
:
3925 if isinstance(e
, NameClashError
) or 'bulkloader_config' in vars() and (
3926 hasattr(bulkloader_config
, 'bulkloader') and
3927 isinstance(e
, bulkloader_config
.bulkloader
.NameClashError
)):
3928 print >> sys
.stderr
, (
3929 'Found both %s and %s while aliasing old names on %s.' %
3930 (e
.old_name
, e
.new_name
, e
.klass
))
def GetArgument(kwargs, name, die_fn):
  """Get the value of the key name in kwargs, or die with die_fn.

  Args:
    kwargs: A dictionary containing the options for the bulkloader.
    name: The name of a bulkloader option.
    die_fn: The function to call to exit the program.

  Returns:
    The value of kwargs[name] if name is in kwargs; otherwise die_fn is
    invoked (which is expected not to return).
  """
  if name in kwargs:
    return kwargs[name]
  else:
    print >> sys.stderr, '%s argument required' % name
    die_fn()
3953 def _MakeSignature(app_id
=None,
3960 result_db_filename
=None,
3963 """Returns a string that identifies the important options for the database."""
3966 result_db_line
= 'result_db: %s' % result_db_filename
3980 """ % (app_id
, url
, kind
, download
, perform_map
, dump
, restore
, db_filename
,
3981 has_header
, result_db_line
)
3984 def ProcessArguments(arg_dict
,
3985 die_fn
=lambda: sys
.exit(1)):
3986 """Processes non command-line input arguments.
3989 arg_dict: Dictionary containing the values of bulkloader options.
3990 die_fn: Function to call in case of an error during argument processing.
3993 A dictionary of bulkloader options.
3997 unused_application
= GetArgument(arg_dict
, 'application', die_fn
)
3998 url
= GetArgument(arg_dict
, 'url', die_fn
)
3999 dump
= GetArgument(arg_dict
, 'dump', die_fn
)
4000 restore
= GetArgument(arg_dict
, 'restore', die_fn
)
4001 create_config
= GetArgument(arg_dict
, 'create_config', die_fn
)
4002 filename
= GetArgument(arg_dict
, 'filename', die_fn
)
4003 batch_size
= GetArgument(arg_dict
, 'batch_size', die_fn
)
4004 kind
= GetArgument(arg_dict
, 'kind', die_fn
)
4005 db_filename
= GetArgument(arg_dict
, 'db_filename', die_fn
)
4006 config_file
= GetArgument(arg_dict
, 'config_file', die_fn
)
4007 result_db_filename
= GetArgument(arg_dict
, 'result_db_filename', die_fn
)
4008 download
= GetArgument(arg_dict
, 'download', die_fn
)
4009 log_file
= GetArgument(arg_dict
, 'log_file', die_fn
)
4010 perform_map
= GetArgument(arg_dict
, 'map', die_fn
)
4011 namespace
= GetArgument(arg_dict
, 'namespace', die_fn
)
4015 if batch_size
is None:
4016 if download
or perform_map
or dump
or create_config
:
4017 arg_dict
['batch_size'] = DEFAULT_DOWNLOAD_BATCH_SIZE
4019 arg_dict
['batch_size'] = DEFAULT_BATCH_SIZE
4020 elif batch_size
<= 0:
4021 errors
.append('batch_size must be at least 1')
4023 if db_filename
is None:
4024 arg_dict
['db_filename'] = time
.strftime(
4025 'bulkloader-progress-%Y%m%d.%H%M%S.sql3')
4027 if result_db_filename
is None:
4028 arg_dict
['result_db_filename'] = time
.strftime(
4029 'bulkloader-results-%Y%m%d.%H%M%S.sql3')
4031 if log_file
is None:
4032 arg_dict
['log_file'] = time
.strftime('bulkloader-log-%Y%m%d.%H%M%S')
4034 required
= '%s argument required'
4037 if config_file
is None and not dump
and not restore
and not create_config
:
4038 errors
.append('One of --config_file, --dump, --restore, or --create_config '
4041 if url
is REQUIRED_OPTION
:
4042 errors
.append(required
% 'url')
4044 if not filename
and not perform_map
:
4045 errors
.append(required
% 'filename')
4048 if download
or perform_map
:
4049 errors
.append('kind argument required for this operation')
4050 elif not dump
and not restore
and not create_config
:
4052 'kind argument required unless --dump, --restore or --create_config '
4057 namespace_manager
.validate_namespace(namespace
)
4058 except namespace_manager
.BadValueError
, msg
:
4059 errors
.append('namespace parameter %s' % msg
)
4062 POSSIBLE_COMMANDS
= ('create_config', 'download', 'dump', 'map', 'restore')
4064 for command
in POSSIBLE_COMMANDS
:
4065 if arg_dict
[command
]:
4066 commands
.append(command
)
4067 if len(commands
) > 1:
4068 errors
.append('%s are mutually exclusive.' % ' and '.join(commands
))
4074 print >> sys
.stderr
, '\n'.join(errors
)
def _GetRemoteAppId(url, throttle, email, passin,
                    raw_input_fn=raw_input, password_input_fn=getpass.getpass,
                    throttle_class=None):
  """Get the App ID from the remote server.

  Args:
    url: The URL of the remote_api endpoint.
    throttle: A Throttle instance to register with the RPC server factory.
    email: The user's email address, or None to prompt.
    passin: True to read the password from stdin.
    raw_input_fn: Used for dependency injection.
    password_input_fn: Used for dependency injection.
    throttle_class: A class to pass to ThrottledHttpRpcServerFactory.

  Returns:
    A (app_id, server) tuple from remote_api_stub.GetRemoteAppId.
  """
  scheme, host_port, url_path, _, _ = urlparse.urlsplit(url)

  secure = (scheme == 'https')

  throttled_rpc_server_factory = (
      remote_api_throttle.ThrottledHttpRpcServerFactory(
          throttle, throttle_class=throttle_class))

  def AuthFunction():
    return _AuthFunction(host_port, email, passin, raw_input_fn,
                         password_input_fn)

  app_id, server = remote_api_stub.GetRemoteAppId(
      host_port, url_path, AuthFunction,
      rpc_server_factory=throttled_rpc_server_factory, secure=secure)

  return app_id, server
def ParseKind(kind):
  """Parse a kind argument, turning '(a,b,c)' into the tuple ('a', 'b', 'c').

  Any other value (including None or '') is returned unchanged.
  """
  if kind and kind[0] == '(' and kind[-1] == ')':
    return tuple(kind[1:-1].split(','))
  else:
    return kind
def _PerformBulkload(arg_dict,
                     check_file=CheckFile,
                     check_output_file=CheckOutputFile):
  """Runs the bulkloader, given the command line options.

  Args:
    arg_dict: Dictionary of bulkloader options.
    check_file: Used for dependency injection.
    check_output_file: Used for dependency injection.

  Returns:
    An exit code.

  Raises:
    ConfigurationError: if inconsistent options are passed.
  """
  app_id = arg_dict['application']
  url = arg_dict['url']
  filename = arg_dict['filename']
  batch_size = arg_dict['batch_size']
  kind = arg_dict['kind']
  num_threads = arg_dict['num_threads']
  bandwidth_limit = arg_dict['bandwidth_limit']
  rps_limit = arg_dict['rps_limit']
  http_limit = arg_dict['http_limit']
  db_filename = arg_dict['db_filename']
  config_file = arg_dict['config_file']
  auth_domain = arg_dict['auth_domain']
  has_header = arg_dict['has_header']
  download = arg_dict['download']
  result_db_filename = arg_dict['result_db_filename']
  loader_opts = arg_dict['loader_opts']
  exporter_opts = arg_dict['exporter_opts']
  mapper_opts = arg_dict['mapper_opts']
  email = arg_dict['email']
  passin = arg_dict['passin']
  perform_map = arg_dict['map']
  dump = arg_dict['dump']
  restore = arg_dict['restore']
  create_config = arg_dict['create_config']
  namespace = arg_dict['namespace']
  dry_run = arg_dict['dry_run']
  throttle_class = arg_dict['throttle_class']

  if namespace:
    namespace_manager.set_namespace(namespace)
  os.environ['AUTH_DOMAIN'] = auth_domain

  kind = ParseKind(kind)

  if not dump and not restore and not create_config:
    # A config file is required for every mode except dump/restore/wizard.
    check_file(config_file)

  if download or dump or create_config:
    # The output file must not already exist.
    check_output_file(filename)
  elif not perform_map:
    check_file(filename)

  throttle_layout = ThrottleLayout(bandwidth_limit, http_limit, rps_limit)
  logger.info('Throttling transfers:')
  logger.info('Bandwidth: %s bytes/second', bandwidth_limit)
  logger.info('HTTP connections: %s/second', http_limit)
  logger.info('Entities inserted/fetched/modified: %s/second', rps_limit)
  logger.info('Batch Size: %s', batch_size)

  throttle = remote_api_throttle.Throttle(layout=throttle_layout)

  throttle.Register(threading.currentThread())
  threading.currentThread().exit_flag = False

  server = None
  if dry_run:
    # Dry runs never contact the server, so the app ID must be given.
    if not app_id:
      raise ConfigurationError('Must specify application ID in dry run mode.')
  else:
    app_id, server = _GetRemoteAppId(url, throttle, email, passin,
                                     throttle_class=throttle_class)

    arg_dict['application'] = app_id

  if dump:
    Exporter.RegisterExporter(DumpExporter(kind, result_db_filename))
  elif restore:
    Loader.RegisterLoader(RestoreLoader(kind, app_id))
  elif create_config:
    # Configuration generation downloads datastore stats via a fixed kind.
    kind = '__Stat_PropertyType_PropertyName_Kind__'
    arg_dict['kind'] = kind

    # Locate the wizard yaml relative to the SDK root.
    root_dir = os.path.dirname(os.path.abspath(__file__))
    if os.path.basename(root_dir) == 'tools':
      root_dir = os.path.dirname(os.path.dirname(os.path.dirname(root_dir)))

    LoadYamlConfig(os.path.join(
        root_dir, os.path.normpath(
            'google/appengine/ext/bulkload/bulkloader_wizard.yaml')))
  elif (config_file and
        (config_file.endswith('.yaml') or config_file.endswith('.yml'))):
    LoadYamlConfig(config_file)
  elif config_file:
    LoadConfig(config_file)

  os.environ['APPLICATION_ID'] = app_id

  signature = _MakeSignature(app_id=app_id,
                             url=url,
                             kind=kind,
                             db_filename=db_filename,
                             download=download,
                             perform_map=perform_map,
                             has_header=has_header,
                             result_db_filename=result_db_filename,
                             dump=dump,
                             restore=restore)

  max_queue_size = max(DEFAULT_QUEUE_SIZE, 3 * num_threads + 5)

  upload = not (download or dump or restore or perform_map or create_config)

  if db_filename == 'skip':
    progress_db = StubProgressDatabase()
  elif upload or restore:
    progress_db = ProgressDatabase(db_filename, signature)
  else:
    progress_db = ExportProgressDatabase(db_filename, signature)

  return_code = 1

  if upload or restore:
    loader = Loader.RegisteredLoader(kind)
    try:
      loader.initialize(filename, loader_opts)

      workitem_generator_factory = GetCSVGeneratorFactory(
          kind, filename, batch_size, has_header)

      app = BulkUploaderApp(arg_dict,
                            workitem_generator_factory,
                            throttle,
                            progress_db,
                            ProgressTrackerThread,
                            max_queue_size,
                            RequestManager,
                            DataSourceThread,
                            Queue.Queue,
                            server=server)
      try:
        return_code = app.Run()
      except AuthenticationError:
        logger.error(AUTH_FAILED_MESSAGE)
    finally:
      loader.finalize()
  elif download or dump or create_config:
    exporter = Exporter.RegisteredExporter(kind)
    result_db = ResultDatabase(result_db_filename, signature, exporter=exporter)
    try:
      exporter.initialize(filename, exporter_opts)

      def KeyRangeGeneratorFactory(request_manager, progress_queue,
                                   progress_gen, kinds):
        logger.info('Downloading kinds: %s', kinds)
        return KeyRangeItemGenerator(request_manager, kinds, progress_queue,
                                     progress_gen, DownloadItem)

      def ExportProgressThreadFactory(progress_queue, progress_db):
        return ExportProgressThread(exporter,
                                    progress_queue,
                                    progress_db,
                                    result_db)

      app = BulkDownloaderApp(arg_dict,
                              KeyRangeGeneratorFactory,
                              throttle,
                              progress_db,
                              ExportProgressThreadFactory,
                              0,
                              RequestManager,
                              DataSourceThread,
                              Queue.Queue,
                              server=server)
      try:
        return_code = app.Run()
      except AuthenticationError:
        logger.error(AUTH_FAILED_MESSAGE)
      except KindStatError:
        logger.error('Unable to download kind stats for all-kinds download.')
        logger.error('Kind stats are generated periodically by the appserver')
        logger.error('Kind stats are not available on dev_appserver.')
    finally:
      exporter.finalize()
  elif perform_map:
    mapper = Mapper.RegisteredMapper(kind)
    try:
      mapper.initialize(mapper_opts)

      def KeyRangeGeneratorFactory(request_manager, progress_queue,
                                   progress_gen, kinds):
        return KeyRangeItemGenerator(request_manager, kinds, progress_queue,
                                     progress_gen, MapperItem)

      def MapperProgressThreadFactory(progress_queue, progress_db):
        return MapperProgressThread(mapper,
                                    progress_queue,
                                    progress_db)

      app = BulkMapperApp(arg_dict,
                          KeyRangeGeneratorFactory,
                          throttle,
                          progress_db,
                          MapperProgressThreadFactory,
                          0,
                          RequestManager,
                          DataSourceThread,
                          Queue.Queue,
                          server=server)
      try:
        return_code = app.Run()
      except AuthenticationError:
        logger.error(AUTH_FAILED_MESSAGE)
    finally:
      mapper.finalize()

  return return_code
def SetupLogging(arg_dict):
  """Sets up logging for the bulkloader.

  Args:
    arg_dict: Dictionary mapping flag names to their arguments.
  """
  # Renamed from 'format' to avoid shadowing the builtin.
  log_format = '[%(levelname)-8s %(asctime)s %(filename)s] %(message)s'
  debug = arg_dict['debug']
  log_file = arg_dict['log_file']

  logger.setLevel(logging.DEBUG)

  # Handlers are attached explicitly below; do not double-log via the root.
  logger.propagate = False

  # The log file always captures everything at DEBUG level.
  file_handler = logging.FileHandler(log_file, 'w')
  file_handler.setLevel(logging.DEBUG)
  file_formatter = logging.Formatter(log_format)
  file_handler.setFormatter(file_formatter)
  logger.addHandler(file_handler)

  # The console shows INFO by default, DEBUG with --debug.
  console = logging.StreamHandler()
  level = logging.INFO
  if debug:
    level = logging.DEBUG
  console.setLevel(level)
  console_format = '[%(levelname)-8s] %(message)s'
  formatter = logging.Formatter(console_format)
  console.setFormatter(formatter)
  logger.addHandler(console)

  logger.info('Logging to %s', log_file)

  remote_api_throttle.logger.setLevel(level)
  remote_api_throttle.logger.addHandler(file_handler)
  remote_api_throttle.logger.addHandler(console)

  # RPC-level logging is too chatty below WARN.
  appengine_rpc.logger.setLevel(logging.WARN)

  adaptive_thread_pool.logger.setLevel(logging.DEBUG)
  adaptive_thread_pool.logger.addHandler(console)
  adaptive_thread_pool.logger.addHandler(file_handler)
  adaptive_thread_pool.logger.propagate = False
4393 """Sets up and runs the bulkloader, given the options as keyword arguments.
4396 arg_dict: Dictionary of bulkloader options
4401 arg_dict
= ProcessArguments(arg_dict
)
4403 SetupLogging(arg_dict
)
4405 return _PerformBulkload(arg_dict
)
4409 """Runs the importer from the command line."""
4412 arg_dict
= ParseArguments(argv
)
4415 errors
= ['%s argument required' % key
4416 for (key
, value
) in arg_dict
.iteritems()
4417 if value
is REQUIRED_OPTION
]
4419 print >> sys
.stderr
, '\n'.join(errors
)
4422 SetupLogging(arg_dict
)
4423 return _PerformBulkload(arg_dict
)
# Script entry point: exit with the bulkloader's return code.
if __name__ == '__main__':
  sys.exit(main(sys.argv))