1 from datetime
import datetime
2 from optparse
import make_option
3 from abc
import abstractmethod
5 from django
.core
.management
.base
import BaseCommand
7 from couchdbkit
.exceptions
import ResourceNotFound
8 from couchdbkit
import Consumer
10 from mygpo
.utils
import progress
11 from mygpo
.maintenance
.models
import CommandStatus
, CommandRunStatus
14 from collections
import Counter
16 from mygpo
.counter
import Counter
20 class ChangesCommand(BaseCommand
):
21 """ base class for commands that operate on the CouchDB _changes feed """
23 option_list
= BaseCommand
.option_list
+ (
24 make_option('--since', action
='store', type=int, dest
='since',
25 default
=0, help="Where to start the operation"),
27 make_option('--continue', action
='store_true', dest
='continue',
28 default
=False, help="Continue from last sequence number"),
30 make_option('--silent', action
='store_true', dest
='silent',
31 default
=False, help="Don't display any progress output"),
35 def __init__(self
, status_id
, command_name
):
36 self
.status_id
= status_id
37 self
.command_name
= command_name
40 def handle(self
, *args
, **options
):
42 self
.db
= self
.get_db()
44 status
= self
.get_cmd_status()
45 since
= self
.get_since(status
, options
)
46 self
.actions
= Counter()
49 # create unfinished command run status
50 run_status
= CommandRunStatus()
51 run_status
.timestamp_started
= datetime
.utcnow()
52 run_status
.start_seq
= since
53 # add it to existing one (if any)
54 status
.runs
.append(run_status
)
58 # "disable" print_status
59 self
.print_status
= lambda *args
, **kwargs
: None
62 self
.process(self
.db
, since
)
69 # finish command run status
70 total
= self
.db
.info()['update_seq']
71 run_status
.timestamp_finished
= datetime
.utcnow()
72 run_status
.end_seq
= total
73 run_status
.status_counter
= dict(self
.actions
)
74 # and overwrite existing one (we could keep a longer log here)
75 status
.runs
= [run_status
]
79 def callback(self
, line
):
83 self
.handle_obj(seq
, doc
, self
.actions
)
84 self
.print_status(seq
, self
.actions
)
87 def print_status(self
, seq
, actions
):
88 counter
= getattr(self
, 'counter', 0)
89 if counter
% 1000 == 0:
90 self
.total
= self
.db
.info()['update_seq']
91 self
.counter
= counter
+ 1
93 status_str
= ', '.join('%s: %d' % x
for x
in self
.actions
.items())
94 progress(seq
, self
.total
, status_str
)
98 def handle_obj(seq
, obj
, actions
):
103 def process(self
, db
, since
):
104 consumer
= Consumer(db
)
105 params
= self
.get_query_params()
106 consumer
.wait(self
.callback
, since
=since
, heartbeat
=10000, **params
)
109 def get_cmd_status(self
):
111 status
= CommandStatus
.get(self
.status_id
)
112 except ResourceNotFound
:
113 status
= CommandStatus()
114 status
.command
= self
.command_name
115 status
._id
= self
.status_id
120 def get_query_params(self
):
121 return dict(include_docs
=True)
125 def get_since(status
, options
):
126 if options
['continue']:
127 return status
.last_seq
129 return options
['since']