create "touch-couchdb-views" mgmt cmd
[mygpo.git] / mygpo / maintenance / management / changescmd.py
blobd591ba81f148582fe6b400c0cfc946ba5ad89b23
1 from datetime import datetime
2 from optparse import make_option
3 from abc import abstractmethod
5 from django.core.management.base import BaseCommand
7 from couchdbkit.exceptions import ResourceNotFound
8 from couchdbkit import Consumer
10 from mygpo.utils import progress
11 from mygpo.maintenance.models import CommandStatus, CommandRunStatus
13 try:
14 from collections import Counter
15 except ImportError:
16 from mygpo.counter import Counter
20 class ChangesCommand(BaseCommand):
21 """ base class for commands that operate on the CouchDB _changes feed """
23 option_list = BaseCommand.option_list + (
24 make_option('--since', action='store', type=int, dest='since',
25 default=0, help="Where to start the operation"),
27 make_option('--continue', action='store_true', dest='continue',
28 default=False, help="Continue from last sequence number"),
30 make_option('--silent', action='store_true', dest='silent',
31 default=False, help="Don't display any progress output"),
35 def __init__(self, status_id, command_name):
36 self.status_id = status_id
37 self.command_name = command_name
40 def handle(self, *args, **options):
42 self.db = self.get_db()
44 status = self.get_cmd_status()
45 since = self.get_since(status, options)
46 self.actions = Counter()
49 # create unfinished command run status
50 run_status = CommandRunStatus()
51 run_status.timestamp_started = datetime.utcnow()
52 run_status.start_seq = since
53 # add it to existing one (if any)
54 status.runs.append(run_status)
55 status.save()
57 if options['silent']:
58 # "disable" print_status
59 self.print_status = lambda *args, **kwargs: None
61 try:
62 self.process(self.db, since)
64 except:
65 import traceback
66 traceback.print_exc()
68 finally:
69 # finish command run status
70 total = self.db.info()['update_seq']
71 run_status.timestamp_finished = datetime.utcnow()
72 run_status.end_seq = total
73 run_status.status_counter = dict(self.actions)
74 # and overwrite existing one (we could keep a longer log here)
75 status.runs = [run_status]
76 status.save()
79 def callback(self, line):
80 seq = line['seq']
81 doc = line['doc']
83 self.handle_obj(seq, doc, self.actions)
84 self.print_status(seq, self.actions)
87 def print_status(self, seq, actions):
88 counter = getattr(self, 'counter', 0)
89 if counter % 1000 == 0:
90 self.total = self.db.info()['update_seq']
91 self.counter = counter + 1
93 status_str = ', '.join('%s: %d' % x for x in self.actions.items())
94 progress(seq, self.total, status_str)
97 @abstractmethod
98 def handle_obj(seq, obj, actions):
99 raise NotImplemented
102 @abstractmethod
103 def process(self, db, since):
104 consumer = Consumer(db)
105 params = self.get_query_params()
106 consumer.wait(self.callback, since=since, heartbeat=10000, **params)
109 def get_cmd_status(self):
110 try:
111 status = CommandStatus.get(self.status_id)
112 except ResourceNotFound:
113 status = CommandStatus()
114 status.command = self.command_name
115 status._id = self.status_id
117 return status
120 def get_query_params(self):
121 return dict(include_docs=True)
124 @staticmethod
125 def get_since(status, options):
126 if options['continue']:
127 return status.last_seq
128 else:
129 return options['since']
132 @abstractmethod
133 def get_db(self):
134 raise NotImplemented