1 from datetime
import datetime
2 from optparse
import make_option
3 from abc
import abstractmethod
4 from collections
import Counter
6 from django
.core
.management
.base
import BaseCommand
8 from couchdbkit
.exceptions
import ResourceNotFound
9 from couchdbkit
import Consumer
11 from mygpo
.utils
import progress
12 from mygpo
.db
.couchdb
import get_main_database
13 from mygpo
.maintenance
.models
import CommandStatus
, CommandRunStatus
class ChangesCommand(BaseCommand):
    """ Base class for commands that operate on the CouchDB _changes feed

    A run looks up (or creates) the CommandStatus document identified by
    ``status_id``, determines the sequence number to start from, consumes
    the changes feed from there and passes every change to
    :meth:`handle_obj`. Start/end sequence numbers, timestamps and the
    per-action counters of the run are recorded in a CommandRunStatus.

    Subclasses have to implement :meth:`handle_obj`; they may override
    :meth:`get_query_params` and :meth:`get_db`.
    """

    option_list = BaseCommand.option_list + (
        make_option('--since', action='store', type=int, dest='since',
                    default=0, help="Where to start the operation"),

        make_option('--continue', action='store_true', dest='continue',
                    default=False, help="Continue from last sequence number"),

        make_option('--silent', action='store_true', dest='silent',
                    default=False, help="Don't display any progress output"),
    )

    def __init__(self, status_id, command_name):
        """ Remember under which id and name the command status is stored

        status_id:    ``_id`` of the CommandStatus document of this command
        command_name: value stored in the ``command`` field when a new
                      CommandStatus has to be created
        """
        self.status_id = status_id
        self.command_name = command_name
        super(ChangesCommand, self).__init__()

    def handle(self, *args, **options):
        """ Entry point of the command: process the feed, record the run

        options: the parsed command line options ('since', 'continue' and
                 'silent' are read here or in :meth:`get_since`)
        """
        self.db = self.get_db()

        status = self.get_cmd_status()
        since = self.get_since(status, options)
        # counts actions by name; filled by handle_obj(), shown as progress
        self.actions = Counter()

        # create unfinished command run status
        run_status = CommandRunStatus()
        run_status.timestamp_started = datetime.utcnow()
        run_status.start_seq = since
        # add it to existing one (if any)
        status.runs.append(run_status)
        # NOTE(review): persisting assumes CommandStatus offers couchdbkit's
        # Document.save() -- confirm against mygpo.maintenance.models
        status.save()

        if options['silent']:
            # "disable" print_status
            self.print_status = lambda *args, **kwargs: None

        try:
            self.process(self.db, since)

        finally:
            # the run is finalized even if consuming the feed is interrupted
            # finish command run status
            total = self.db.info()['update_seq']
            run_status.timestamp_finished = datetime.utcnow()
            run_status.end_seq = total
            run_status.status_counter = dict(self.actions)
            # and overwrite existing one (we could keep a longer log here)
            status.runs = [run_status]
            status.save()

    def callback(self, line):
        """ Called for every line of the changes feed

        Hands the change to :meth:`handle_obj` and updates the progress
        output afterwards.
        """
        # NOTE(review): assumes each line is a dict with the keys 'seq' and
        # 'doc' (the latter due to include_docs in get_query_params()) --
        # confirm against the Consumer in use
        seq = line['seq']
        doc = line['doc']

        self.handle_obj(seq, doc, self.actions)
        self.print_status(seq, self.actions)

    def print_status(self, seq, actions):
        """ Print the progress and the action counters

        seq:     sequence number of the change that has just been handled
        actions: mapping of action name to count
        """
        calls = getattr(self, 'counter', 0)

        # asking the database for its update_seq on every change would be
        # wasteful, so the total is only refreshed every 1000 calls
        if calls % 1000 == 0:
            self.total = self.db.info()['update_seq']
        self.counter = calls + 1

        status_str = ', '.join('%s: %d' % item for item in actions.items())
        progress(seq, self.total, status_str)

    # without an ABCMeta metaclass the decorator is not enforced and only
    # marks the method as one that subclasses are expected to implement
    @abstractmethod
    def handle_obj(self, seq, obj, actions):
        """ Process a single change; has to be implemented by subclasses

        seq:     sequence number of the change
        obj:     the object taken from the change
        actions: Counter in which the performed actions are recorded
        """
        raise NotImplementedError('subclasses have to implement handle_obj()')

    def process(self, db, since):
        """ Consume the changes of db, starting at sequence number since

        Every received line is passed to :meth:`callback`.
        """
        consumer = Consumer(db)
        params = self.get_query_params()
        consumer.wait(self.callback, since=since, heartbeat=10000, **params)

    def get_cmd_status(self):
        """ Return the CommandStatus of this command, creating it if needed

        A newly created status is initialized with the command name and the
        status id, but is not saved here.
        """
        try:
            status = CommandStatus.get(self.status_id)

        except ResourceNotFound:
            status = CommandStatus()
            status.command = self.command_name
            status._id = self.status_id

        return status

    def get_query_params(self):
        """ Return the additional parameters for querying the changes feed """
        return dict(include_docs=True)

    # handle() invokes this through the instance with (status, options) only,
    # which requires a static method
    @staticmethod
    def get_since(status, options):
        """ Return the sequence number at which processing should start

        With --continue this is ``status.last_seq``, otherwise the value
        given by --since.
        """
        if options['continue']:
            return status.last_seq

        return options['since']

    def get_db(self):
        """ Return the database whose changes feed is processed """
        return get_main_database()