Exposed peer_info via tord. Implemented redering of peer_info from tord via torc.
[tore.git] / tord / tord.py
blob88c7f92cfa07aca52fce1629f7c6b60ef67a7af4
1 import libtorrent as lt
2 from os import listdir, path
3 from pyro import PyroServer
4 from common import *
5 from shutil import move
6 from time import sleep
8 DEFAULT_PREFS = {
9 "dht": True,
10 "lsd": True,
11 "upnp": True,
12 "utpex": True,
13 "natpmp": False,
14 'session': (8, 5, True),
15 "encryption": (1,1,2,True),
16 "max_upload_speed": 60,
17 "max_download_speed": -1,
18 "max_connections_global": -1,
19 "max_upload_slots_global": -1,
20 "listen_ports": (10000, 10100),
21 "max_connections_per_torrent": -1,
22 "max_upload_slots_per_torrent": -1,
23 "path_watched": get_default_dir("downloads/watched"),
24 "path_current": get_default_dir("downloads/current/"),
25 "path_resume": get_default_dir("downloads/resume/"),
26 "path_finished": get_default_dir("downloads/finished"),
27 "path_incomplete": get_default_dir("downloads/incomplete"),
30 class Watched:
31 @test
32 def __init__(self, dir, func):
33 try:
34 from pyinotify import WatchManager, ThreadedNotifier, EventsCodes, ProcessEvent
35 class Action(ProcessEvent):
36 def __init__(self, func):
37 self.func = func
39 def process_IN_CLOSE_WRITE(self, event):
40 self.func(event.path, event.name)
41 except: return
42 self.wm = WatchManager()
43 mask = EventsCodes.IN_CLOSE_WRITE | EventsCodes.IN_DELETE
44 self.notifier = ThreadedNotifier(self.wm, Action(func))
45 self.notifier.start()
46 self.wdd = self.wm.add_watch(dir, mask, rec=True)
48 def watched_shutdown(self):
49 try: self.notifier.stop()
50 except: pass
52 class Alerts:
53 def __init__(self, general_cb):
54 log.debug("Alerts initialized..")
55 self.alert_handlers = {}
56 self.general_cb = general_cb
58 @test
59 def alert_register(self, alert_type, handler):
60 log.debug("Registering handler for alert %s", alert_type)
61 if alert_type not in self.alert_handlers.keys():
62 self.alert_handlers[alert_type] = []
63 self.alert_handlers[alert_type].append(handler)
65 @test
66 def alert_deregister(self, alert_type, handler):
67 log.debug("Deregistering handler for alert %s", alert_type)
68 self.alert_handlers[alert_type].remove(handler)
70 def alert_handle(self, alert_type, alert):
71 log.info("%s: %s" % (alert_type, alert))
72 if alert_type in self.alert_handlers.keys():
73 for handler in self.alert_handlers[alert_type]:
74 handler(alert_type, alert)
75 return
76 self.general_cb(alert_type, alert)
78 class Torrent:
79 def __init__(self, config, torrent_dir, defaults):
80 self.config = config
81 self.resume = None
82 self.files_id, self.files_path, self.files_size, self.files_offset = [], [], [], []
84 p = path.join(torrent_dir, defaults['tfile'])
85 handle = open(p,'rb')
86 data = lt.bdecode(handle.read())
88 self.info = lt.torrent_info(data)
89 self.hash = str(self.info.info_hash())
90 self.name = self.info.name()
92 defaults['priorities'] = defaults['priorities'] * self.info.num_files()
93 self.fastresume_read(defaults)
94 self['tpath'] = torrent_dir
96 self.generate_files()
98 def __getitem__(self, key):
99 try: return self.resume[key]
100 except: return None
102 def __setitem__(self, key, value):
103 self.resume[key] = value
105 @test
106 def fastresume_read(self, default):
107 log.info("Reading resume: %s", self.hash)
108 try:
109 resume_path = path.join(self.config['path_resume'], self.hash)
110 resume_handle = open(resume_path, "rb")
111 self.resume = lt.bdecode(resume_handle.read())
112 except:
113 log.error("Reading resume: %s", self.hash)
114 self.resume = default
116 @test
117 def fastresume_write(self):
118 log.info("Writing resume: %s", self.hash)
120 if not (self.handle.is_valid() and self.handle.has_metadata()):
121 log.error("Writing resume: %s", self.hash)
123 try:
124 resume = self.handle.write_resume_data()
125 resume['paused'] = self.handle.status().paused
126 resume['managed'] = self.handle.is_auto_managed()
127 resume['spath'] = self.handle.save_path()
128 resume['tpath'] = self['tpath']
129 resume['tfile'] = self['tfile']
130 resume['priorities'] = self['priorities']
132 resume_data = lt.bencode(resume)
133 resume_path = path.join(self.config['path_resume'], self.hash)
134 open(resume_path, "wb").write(resume_data)
135 except:
136 log.error("Writing resume: %s", self.hash)
138 @test
139 def fix_download(self):
140 s = self.handle.status()
141 if s.progress == 1 and self['total_downloaded'] < s.total_wanted:
142 self['total_downloaded'] = s.total_wanted
144 @test
145 def get_status(self, id_field, fields):
146 s = self.handle.status()
148 def ratio(up, down):
149 if down == 0: return 0
150 return float(up) / down
152 def eta(size, done, speed):
153 if (size - done) == 0: return 0
154 if speed == 0: return 0
155 return (size - done) / speed
157 def state():
158 p = self.handle.queue_position()
159 m = self.handle.is_auto_managed()
160 return p, m, int(s.state), s.paused
162 peers = []
163 def peers_stat(s, ps):
164 if not ps: ps = self.handle.get_peer_info()
165 return [getattr(p,s) for p in ps]
167 values = {
168 'eta': eta(s.total_wanted, s.total_done, s.download_payload_rate),
169 'ratio': ratio(s.all_time_upload, s.all_time_download),
170 'name': self.name,
171 'torrent_hash': self.hash,
172 'files_path': self.files_path,
173 'files_size': self.files_size,
174 'files_id': self.files_id,
175 'files_priority': self['priorities'],
178 funcs = {
179 'files_progress': self.handle.file_progress,
180 'peers_flags': lambda: peers_stat('flags', peers),
181 'peers_source_flags': lambda: peers_stat('source', peers),
182 'peers_read_state': lambda: peers_stat('read_state', peers),
183 'peers_write_state': lambda: peers_stat('write_state', peers),
184 'peers_upload_queue_length': lambda: peers_stat('upload_queue_length', peers),
185 'peers_download_queue_length': lambda: peers_stat('download_queue_length', peers),
186 'peers_up_speed': lambda: peers_stat('up_speed', peers),
187 'peers_down_speed': lambda: peers_stat('down_speed', peers),
188 'peers_payload_up_speed': lambda: peers_stat('payload_up_speed', peers),
189 'peers_payload_down_speed': lambda: peers_stat('payload_down_speed', peers),
190 'peers_ip': lambda: peers_stat('ip', peers),
191 'state': state
194 data = [get_stat(s, f, values, funcs) for f in fields]
195 id = get_stat(s, id_field, values, funcs)
196 return (id, data)
198 @test
199 def set_state(self, attr, msg):
200 try: getattr(self.handle, attr)(*msg)
201 except AttributeError: getattr(self, attr)(*msg)
202 except: log.error("Not a valid state change method " + attr)
204 @test
205 def toggle_pause(self):
206 if self.handle.status().paused: self.handle.resume()
207 else: self.handle.pause()
209 @test
210 def toggle_managed(self, manage=None):
211 if manage == None:
212 if self.handle.is_auto_managed(): manage = False
213 else: manage = True
214 self.handle.auto_managed(manage)
216 def prioritize(self, file=None, delta=0):
217 if file is not None:
218 self['priorities'][file] = (self['priorities'][file] + delta) % 10
219 self.handle.prioritize_files(self['priorities'])
221 def move(self, sp=None, tp=None):
222 if sp and sp != self.handle.save_path():
223 self.handle.move_storage(sp)
225 if tp and tp != self['tpath']:
226 move(path.join(self['tpath'], self['tfile']), path.join(tp, self['tfile']))
227 self['tpath'] = tp
229 self.fastresume_write()
231 def recheck(self):
232 self.handle.force_recheck()
233 self.fastresume_write()
235 def generate_files(self):
236 for id, f in enumerate(self.info.files()):
237 self.files_id += [(self.hash, id)]
238 self.files_path += [f.path]
239 self.files_size += [f.size]
240 self.files_offset += [f.offset]
242 def checked_cb(self):
243 if self.handle.status().progress == 1: self.finished_cb()
244 else: self.move(self.config["path_current"], self.config['path_watched'])
246 def finished_cb(self):
247 self.move(self.config["path_finished"])
249 class Core(Alerts, Watched):
250 def __init__(self):
251 self.torrents = {}
252 self.config = Config("tord.conf", DEFAULT_PREFS)
253 self.config["path_removed"] = "/mnt/data/downloads/removed"
255 self.session = lt.session()
256 self.session.set_severity_level(lt.alert.severity_levels.info)
258 def speed(x):
259 if x > 0: return x*1000
260 return x
262 self.config["lsd"] = True
263 self.config.register_set_function("utpex", lambda k,v: self.session.add_extension(lt.create_ut_pex_plugin))
264 self.config.register_set_function("listen_ports", lambda k,v: self.session.listen_on(*v))
265 self.config.register_set_function("max_upload_speed", lambda k,v: self.session.set_upload_rate_limit(speed(v)))
266 self.config.register_set_function("max_download_speed", lambda k,v: self.session.set_download_rate_limit(speed(v)))
267 self.config.register_set_function("max_connections_global", lambda k,v: self.session.set_max_connections(v))
268 self.config.register_set_function("max_upload_slots_global", lambda k,v: self.session.set_max_uploads(v))
269 self.config.register_set_function("dht", toggle(self.session.start_dht, self.session.stop_dht, (None,)))
270 self.config.register_set_function("lsd", toggle(self.session.start_lsd, self.session.stop_lsd))
271 self.config.register_set_function("upnp", toggle(self.session.start_upnp, self.session.stop_upnp))
272 self.config.register_set_function("natpmp", toggle(self.session.start_natpmp, self.session.stop_natpmp))
273 self.config.register_set_function("encryption", self.set_encryption)
274 self.config.register_set_function("session", self.set_session)
276 Alerts.__init__(self, self.alert_cb())
277 Watched.__init__(self, self.config['path_watched'], self.add)
278 self.alert_register("torrent_paused_alert", self.alert_cb(lambda x: "update"))
279 self.alert_register("torrent_resumed_alert", self.alert_cb(lambda x: "update"))
280 self.alert_register("torrent_finished_alert", lambda t,a: self.set_torrents_state(a[0], 'finished_cb', publish='update'))
281 self.alert_register("torrent_checked_alert", lambda t,a: self.set_torrents_state(a[0], 'checked_cb', publish='update'))
283 def go(self):
284 log.info("Strarting alert checking loop")
285 self.add(self.config['path_watched'])
286 while self.looping:
287 self.check_alerts()
288 sleep(0.1)
290 def shutdown(self):
291 log.info("Shutting down")
292 self.watched_shutdown()
293 for t in self.torrents.values(): t.fastresume_write()
295 def check_alerts(self):
296 alert = self.session.pop_alert()
297 while alert:
298 alert_type = str(type(alert)).split("'")[1].split(".")[1]
299 try: id = str(alert.handle.info_hash())
300 except: id = None
301 if id: self.alert_handle(alert_type, ([id], alert.msg()))
302 else: self.alert_handle(alert_type, (str(alert), alert.msg()))
303 alert = self.session.pop_alert()
305 @test
306 def add(self, torrent_dir, torrent_file = None):
307 def add(torrent_dir, torrent_file):
308 log.info("Adding torrent: %s", torrent_file)
310 default = {
311 'spath': self.config["path_current"],
312 'tfile': torrent_file,
313 'paused': False,
314 'managed': True,
315 'priorities': [1],
318 torrent = Torrent(self.config, torrent_dir, default)
319 #resume = torrent.resume
320 #if len(torrent.resume) == len(default): resume = None
321 resume = ""
322 try:
323 resume = open(path.join(self.config['path_resume'], torrent.hash)).read()
324 except:
325 log.error("reading resume")
327 param = {
328 'ti': torrent.info,
329 'duplicate_is_error': True,
330 'resume_data': resume,
331 'save_path': torrent['spath'],
332 'paused': torrent['paused'],
333 'auto_managed': torrent['managed'],
334 'storage_mode': lt.storage_mode_t(1)
336 try:
337 torrent.handle = self.session.add_torrent(param)
338 self.torrents[torrent.hash] = torrent
340 torrent.handle.set_max_connections(60)
341 torrent.prioritize()
342 torrent.fix_download()
343 except:
344 log.error("Adding torrent: %s", torrent_file)
346 ''' add a directory '''
347 if not torrent_file:
348 log.info("adding torrents in directory: %s", torrent_dir)
349 for f in listdir(torrent_dir):
350 if f[-8:] == '.torrent': add(torrent_dir, f)
351 return
353 ''' add a single file '''
354 add(torrent_dir, torrent_file)
356 def remove(self, torrent_hash, force = False):
357 torrent = self.torrents[torrent_hash]
358 log.info("Removing torrent: %s", torrent.hash)
360 if torrent.handle.status().progress != 1 and not force:
361 log.error("Removing torrent: %s", torrent.hash)
362 return
364 if torrent.handle.status().progress != 1: dir = self.config['path_incomplete']
365 else: dir = self.config['path_removed']
367 torrent.move(dir, dir)
369 self.session.remove_torrent(torrent.handle, 0)
370 self.publish(('remove', ([torrent_hash], 'remove')))
371 del self.torrents[torrent.hash]
373 @test
374 def get_torrents_status(self, torrents, id_field, fields):
375 return [self.torrents[t].get_status(id_field, fields) for t in torrents or self.torrents.keys()]
377 def get_session_status(self, fields):
378 s = self.session.status()
379 values = {
380 "download_limit": int(self.session.download_rate_limit()),
381 "upload_limit": int(self.session.upload_rate_limit()),
382 "dht": (self.config["dht"] and s.dht_nodes) or -1
384 return [get_stat(s, f, values) for f in fields]
386 @test
387 def set_torrents_state(self, torrents, attr, msg=(), publish=False, all=False):
388 torrents = torrents or self.torrents.keys()
389 for t in torrents: self.torrents[t].set_state(attr, msg)
390 if publish and all: torrents = None
391 self.publish((publish, (torrents, attr)))
393 def set_encryption(self, key, value):
394 o, i, l, r = value
395 e = lt.pe_settings()
396 e.out_enc_policy = lt.enc_policy(o)
397 e.in_enc_policy = lt.enc_policy(i)
398 e.allowed_enc_level = lt.enc_level(l)
399 e.prefer_rc4 = r
400 self.session.set_pe_settings(e)
402 def set_session(self, key, value):
403 s = lt.session_settings()
404 s.active_downloads, s.active_seeds, s.lazy_bitfields = value
405 s.connection_speed = 1
406 self.session.set_settings(s)
408 def alert_cb(self, subject_fn=lambda x:x):
409 return lambda t,a: self.publish((subject_fn(t), a))
411 def main():
412 PyroServer(Core, "tord")