remove gevent monkey-patch from feed-downloader
[mygpo.git] / mygpo / maintenance / management / changescmd.py
blobed7dc7438f66b21d05a7a4b7b4ac564c5127cf9c
1 from datetime import datetime
2 from optparse import make_option
3 from abc import abstractmethod
4 from collections import Counter
6 from django.core.management.base import BaseCommand
8 from couchdbkit.exceptions import ResourceNotFound
9 from couchdbkit import Consumer
11 from mygpo.utils import progress
12 from mygpo.db.couchdb import get_main_database
13 from mygpo.maintenance.models import CommandStatus, CommandRunStatus
class ChangesCommand(BaseCommand):
    """ base class for commands that operate on the CouchDB _changes feed

    Subclasses are constructed with the id of their CommandStatus document
    and a command name, and must implement handle_obj(), which is invoked
    once for every line received from the changes feed.

    NOTE(review): the @abstractmethod decorators below are only enforced
    if the class hierarchy uses an ABCMeta metaclass -- confirm whether
    BaseCommand does; otherwise they are purely advisory.
    """

    option_list = BaseCommand.option_list + (
        make_option('--since', action='store', type=int, dest='since',
            default=0, help="Where to start the operation"),

        make_option('--continue', action='store_true', dest='continue',
            default=False, help="Continue from last sequence number"),

        make_option('--silent', action='store_true', dest='silent',
            default=False, help="Don't display any progress output"),
    )


    def __init__(self, status_id, command_name):
        """ stores the status document id and the name of the command """
        self.status_id = status_id
        self.command_name = command_name
        super(ChangesCommand, self).__init__()


    def handle(self, *args, **options):
        """ runs process() and records the run in the command status

        A CommandRunStatus is saved before processing starts and is
        completed (end time, end sequence, action counters) in a
        ``finally`` block, so it is also written if process() raises.
        """

        self.db = self.get_db()

        status = self.get_cmd_status()
        since = self.get_since(status, options)
        self.actions = Counter()


        # create unfinished command run status
        run_status = CommandRunStatus()
        run_status.timestamp_started = datetime.utcnow()
        run_status.start_seq = since
        # add it to existing one (if any)
        status.runs.append(run_status)
        status.save()

        if options['silent']:
            # "disable" print_status by shadowing the method on the instance
            self.print_status = lambda *a, **kw: None

        try:
            self.process(self.db, since)

        finally:
            # finish command run status
            total = self.db.info()['update_seq']
            run_status.timestamp_finished = datetime.utcnow()
            run_status.end_seq = total
            run_status.status_counter = dict(self.actions)
            # and overwrite existing one (we could keep a longer log here)
            status.runs = [run_status]
            status.save()


    def callback(self, line):
        """ handles one line of the changes feed

        Reads the 'seq' and 'doc' entries of ``line``, passes them to
        handle_obj() and then reports progress via print_status().
        """
        seq = line['seq']
        doc = line['doc']

        self.handle_obj(seq, doc, self.actions)
        self.print_status(seq, self.actions)


    def print_status(self, seq, actions):
        """ prints the progress, including the per-action counters

        The database's update_seq is only re-read on every 1000th call
        (starting with the first one) and cached in self.total between.
        """
        counter = getattr(self, 'counter', 0)
        if counter % 1000 == 0:
            self.total = self.db.info()['update_seq']
        self.counter = counter + 1

        # use the counters we were given instead of reaching for
        # self.actions; callback() passes self.actions anyway
        status_str = ', '.join('%s: %d' % x for x in actions.items())
        progress(seq, self.total, status_str)


    @abstractmethod
    def handle_obj(self, seq, obj, actions):
        """ processes a single document; to be implemented by subclasses

        Called from callback() with the sequence number, the document
        and the Counter in which performed actions are tallied.
        """
        # NotImplemented is a constant, not an exception -- raising it
        # would result in a TypeError
        raise NotImplementedError


    @abstractmethod
    def process(self, db, since):
        """ consumes the changes feed of db, starting at sequence since

        Every received line is passed to callback(); the query params
        come from get_query_params().
        """
        consumer = Consumer(db)
        params = self.get_query_params()
        consumer.wait(self.callback, since=since, heartbeat=10000, **params)


    def get_cmd_status(self):
        """ returns the stored CommandStatus, or a new one if none exists

        A newly created status gets the command name and the status id
        of this command; it is not saved here.
        """
        try:
            status = CommandStatus.get(self.status_id)
        except ResourceNotFound:
            status = CommandStatus()
            status.command = self.command_name
            status._id = self.status_id

        return status


    def get_query_params(self):
        """ returns the extra params that process() passes to the consumer """
        return dict(include_docs=True)


    @staticmethod
    def get_since(status, options):
        """ returns the sequence number from which to start

        With the 'continue' option set this is status.last_seq,
        otherwise the value of the 'since' option.
        """
        if options['continue']:
            return status.last_seq
        else:
            return options['since']


    @abstractmethod
    def get_db(self):
        """ returns the database to operate on (default: main database) """
        return get_main_database()