1 import libtorrent
as lt
2 from os
import listdir
, path
3 from pyro
import PyroServer
5 from shutil
import move
14 'session': (8, 5, True),
15 "encryption": (1,1,2,True),
16 "max_upload_speed": 60,
17 "max_download_speed": -1,
18 "max_connections_global": -1,
19 "max_upload_slots_global": -1,
20 "listen_ports": (10000, 10100),
21 "max_connections_per_torrent": -1,
22 "max_upload_slots_per_torrent": -1,
23 "path_watched": get_default_dir("downloads/watched"),
24 "path_current": get_default_dir("downloads/current/"),
25 "path_resume": get_default_dir("downloads/resume/"),
26 "path_finished": get_default_dir("downloads/finished"),
27 "path_incomplete": get_default_dir("downloads/incomplete"),
32 def __init__(self
, dir, func
):
34 from pyinotify
import WatchManager
, ThreadedNotifier
, EventsCodes
, ProcessEvent
35 class Action(ProcessEvent
):
36 def __init__(self
, func
):
39 def process_IN_CLOSE_WRITE(self
, event
):
40 self
.func(event
.path
, event
.name
)
42 self
.wm
= WatchManager()
43 mask
= EventsCodes
.IN_CLOSE_WRITE | EventsCodes
.IN_DELETE
44 self
.notifier
= ThreadedNotifier(self
.wm
, Action(func
))
46 self
.wdd
= self
.wm
.add_watch(dir, mask
, rec
=True)
48 def watched_shutdown(self
):
49 try: self
.notifier
.stop()
53 def __init__(self
, general_cb
):
54 log
.debug("Alerts initialized..")
55 self
.alert_handlers
= {}
56 self
.general_cb
= general_cb
59 def alert_register(self
, alert_type
, handler
):
60 log
.debug("Registering handler for alert %s", alert_type
)
61 if alert_type
not in self
.alert_handlers
.keys():
62 self
.alert_handlers
[alert_type
] = []
63 self
.alert_handlers
[alert_type
].append(handler
)
66 def alert_deregister(self
, alert_type
, handler
):
67 log
.debug("Deregistering handler for alert %s", alert_type
)
68 self
.alert_handlers
[alert_type
].remove(handler
)
70 def alert_handle(self
, alert_type
, alert
):
71 log
.info("%s: %s" % (alert_type
, alert
))
72 if alert_type
in self
.alert_handlers
.keys():
73 for handler
in self
.alert_handlers
[alert_type
]:
74 handler(alert_type
, alert
)
76 self
.general_cb(alert_type
, alert
)
79 def __init__(self
, config
, torrent_dir
, defaults
):
82 self
.files_id
, self
.files_path
, self
.files_size
, self
.files_offset
= [], [], [], []
84 p
= path
.join(torrent_dir
, defaults
['tfile'])
86 data
= lt
.bdecode(handle
.read())
88 self
.info
= lt
.torrent_info(data
)
89 self
.hash = str(self
.info
.info_hash())
90 self
.name
= self
.info
.name()
92 defaults
['priorities'] = defaults
['priorities'] * self
.info
.num_files()
93 self
.fastresume_read(defaults
)
94 self
['tpath'] = torrent_dir
98 def __getitem__(self
, key
):
99 try: return self
.resume
[key
]
102 def __setitem__(self
, key
, value
):
103 self
.resume
[key
] = value
106 def fastresume_read(self
, default
):
107 log
.info("Reading resume: %s", self
.hash)
109 resume_path
= path
.join(self
.config
['path_resume'], self
.hash)
110 resume_handle
= open(resume_path
, "rb")
111 self
.resume
= lt
.bdecode(resume_handle
.read())
113 log
.error("Reading resume: %s", self
.hash)
114 self
.resume
= default
117 def fastresume_write(self
):
118 log
.info("Writing resume: %s", self
.hash)
120 if not (self
.handle
.is_valid() and self
.handle
.has_metadata()):
121 log
.error("Writing resume: %s", self
.hash)
124 resume
= self
.handle
.write_resume_data()
125 resume
['paused'] = self
.handle
.status().paused
126 resume
['managed'] = self
.handle
.is_auto_managed()
127 resume
['spath'] = self
.handle
.save_path()
128 resume
['tpath'] = self
['tpath']
129 resume
['tfile'] = self
['tfile']
130 resume
['priorities'] = self
['priorities']
132 resume_data
= lt
.bencode(resume
)
133 resume_path
= path
.join(self
.config
['path_resume'], self
.hash)
134 open(resume_path
, "wb").write(resume_data
)
136 log
.error("Writing resume: %s", self
.hash)
139 def fix_download(self
):
140 s
= self
.handle
.status()
141 if s
.progress
== 1 and self
['total_downloaded'] < s
.total_wanted
:
142 self
['total_downloaded'] = s
.total_wanted
145 def get_status(self
, id_field
, fields
):
146 s
= self
.handle
.status()
149 if down
== 0: return 0
150 return float(up
) / down
152 def eta(size
, done
, speed
):
153 if (size
- done
) == 0: return 0
154 if speed
== 0: return 0
155 return (size
- done
) / speed
158 p
= self
.handle
.queue_position()
159 m
= self
.handle
.is_auto_managed()
160 return p
, m
, int(s
.state
), s
.paused
163 def peers_stat(s
, ps
):
164 if not ps
: ps
= self
.handle
.get_peer_info()
165 return [getattr(p
,s
) for p
in ps
]
168 'eta': eta(s
.total_wanted
, s
.total_done
, s
.download_payload_rate
),
169 'ratio': ratio(s
.all_time_upload
, s
.all_time_download
),
171 'torrent_hash': self
.hash,
172 'files_path': self
.files_path
,
173 'files_size': self
.files_size
,
174 'files_id': self
.files_id
,
175 'files_priority': self
['priorities'],
179 'files_progress': self
.handle
.file_progress
,
180 'peers_flags': lambda: peers_stat('flags', peers
),
181 'peers_source_flags': lambda: peers_stat('source', peers
),
182 'peers_read_state': lambda: peers_stat('read_state', peers
),
183 'peers_write_state': lambda: peers_stat('write_state', peers
),
184 'peers_upload_queue_length': lambda: peers_stat('upload_queue_length', peers
),
185 'peers_download_queue_length': lambda: peers_stat('download_queue_length', peers
),
186 'peers_up_speed': lambda: peers_stat('up_speed', peers
),
187 'peers_down_speed': lambda: peers_stat('down_speed', peers
),
188 'peers_payload_up_speed': lambda: peers_stat('payload_up_speed', peers
),
189 'peers_payload_down_speed': lambda: peers_stat('payload_down_speed', peers
),
190 'peers_ip': lambda: peers_stat('ip', peers
),
194 data
= [get_stat(s
, f
, values
, funcs
) for f
in fields
]
195 id = get_stat(s
, id_field
, values
, funcs
)
199 def set_state(self
, attr
, msg
):
200 try: getattr(self
.handle
, attr
)(*msg
)
201 except AttributeError: getattr(self
, attr
)(*msg
)
202 except: log
.error("Not a valid state change method " + attr
)
205 def toggle_pause(self
):
206 if self
.handle
.status().paused
: self
.handle
.resume()
207 else: self
.handle
.pause()
210 def toggle_managed(self
, manage
=None):
212 if self
.handle
.is_auto_managed(): manage
= False
214 self
.handle
.auto_managed(manage
)
216 def prioritize(self
, file=None, delta
=0):
218 self
['priorities'][file] = (self
['priorities'][file] + delta
) % 10
219 self
.handle
.prioritize_files(self
['priorities'])
221 def move(self
, sp
=None, tp
=None):
222 if sp
and sp
!= self
.handle
.save_path():
223 self
.handle
.move_storage(sp
)
225 if tp
and tp
!= self
['tpath']:
226 move(path
.join(self
['tpath'], self
['tfile']), path
.join(tp
, self
['tfile']))
229 self
.fastresume_write()
232 self
.handle
.force_recheck()
233 self
.fastresume_write()
235 def generate_files(self
):
236 for id, f
in enumerate(self
.info
.files()):
237 self
.files_id
+= [(self
.hash, id)]
238 self
.files_path
+= [f
.path
]
239 self
.files_size
+= [f
.size
]
240 self
.files_offset
+= [f
.offset
]
242 def checked_cb(self
):
243 if self
.handle
.status().progress
== 1: self
.finished_cb()
244 else: self
.move(self
.config
["path_current"], self
.config
['path_watched'])
246 def finished_cb(self
):
247 self
.move(self
.config
["path_finished"])
249 class Core(Alerts
, Watched
):
252 self
.config
= Config("tord.conf", DEFAULT_PREFS
)
253 self
.config
["path_removed"] = "/mnt/data/downloads/removed"
255 self
.session
= lt
.session()
256 self
.session
.set_severity_level(lt
.alert
.severity_levels
.info
)
259 if x
> 0: return x
*1000
262 self
.config
["lsd"] = True
263 self
.config
.register_set_function("utpex", lambda k
,v
: self
.session
.add_extension(lt
.create_ut_pex_plugin
))
264 self
.config
.register_set_function("listen_ports", lambda k
,v
: self
.session
.listen_on(*v
))
265 self
.config
.register_set_function("max_upload_speed", lambda k
,v
: self
.session
.set_upload_rate_limit(speed(v
)))
266 self
.config
.register_set_function("max_download_speed", lambda k
,v
: self
.session
.set_download_rate_limit(speed(v
)))
267 self
.config
.register_set_function("max_connections_global", lambda k
,v
: self
.session
.set_max_connections(v
))
268 self
.config
.register_set_function("max_upload_slots_global", lambda k
,v
: self
.session
.set_max_uploads(v
))
269 self
.config
.register_set_function("dht", toggle(self
.session
.start_dht
, self
.session
.stop_dht
, (None,)))
270 self
.config
.register_set_function("lsd", toggle(self
.session
.start_lsd
, self
.session
.stop_lsd
))
271 self
.config
.register_set_function("upnp", toggle(self
.session
.start_upnp
, self
.session
.stop_upnp
))
272 self
.config
.register_set_function("natpmp", toggle(self
.session
.start_natpmp
, self
.session
.stop_natpmp
))
273 self
.config
.register_set_function("encryption", self
.set_encryption
)
274 self
.config
.register_set_function("session", self
.set_session
)
276 Alerts
.__init
__(self
, self
.alert_cb())
277 Watched
.__init
__(self
, self
.config
['path_watched'], self
.add
)
278 self
.alert_register("torrent_paused_alert", self
.alert_cb(lambda x
: "update"))
279 self
.alert_register("torrent_resumed_alert", self
.alert_cb(lambda x
: "update"))
280 self
.alert_register("torrent_finished_alert", lambda t
,a
: self
.set_torrents_state(a
[0], 'finished_cb', publish
='update'))
281 self
.alert_register("torrent_checked_alert", lambda t
,a
: self
.set_torrents_state(a
[0], 'checked_cb', publish
='update'))
284 log
.info("Strarting alert checking loop")
285 self
.add(self
.config
['path_watched'])
291 log
.info("Shutting down")
292 self
.watched_shutdown()
293 for t
in self
.torrents
.values(): t
.fastresume_write()
295 def check_alerts(self
):
296 alert
= self
.session
.pop_alert()
298 alert_type
= str(type(alert
)).split("'")[1].split(".")[1]
299 try: id = str(alert
.handle
.info_hash())
301 if id: self
.alert_handle(alert_type
, ([id], alert
.msg()))
302 else: self
.alert_handle(alert_type
, (str(alert
), alert
.msg()))
303 alert
= self
.session
.pop_alert()
306 def add(self
, torrent_dir
, torrent_file
= None):
307 def add(torrent_dir
, torrent_file
):
308 log
.info("Adding torrent: %s", torrent_file
)
311 'spath': self
.config
["path_current"],
312 'tfile': torrent_file
,
318 torrent
= Torrent(self
.config
, torrent_dir
, default
)
319 #resume = torrent.resume
320 #if len(torrent.resume) == len(default): resume = None
323 resume
= open(path
.join(self
.config
['path_resume'], torrent
.hash)).read()
325 log
.error("reading resume")
329 'duplicate_is_error': True,
330 'resume_data': resume
,
331 'save_path': torrent
['spath'],
332 'paused': torrent
['paused'],
333 'auto_managed': torrent
['managed'],
334 'storage_mode': lt
.storage_mode_t(1)
337 torrent
.handle
= self
.session
.add_torrent(param
)
338 self
.torrents
[torrent
.hash] = torrent
340 torrent
.handle
.set_max_connections(60)
342 torrent
.fix_download()
344 log
.error("Adding torrent: %s", torrent_file
)
346 ''' add a directory '''
348 log
.info("adding torrents in directory: %s", torrent_dir
)
349 for f
in listdir(torrent_dir
):
350 if f
[-8:] == '.torrent': add(torrent_dir
, f
)
353 ''' add a single file '''
354 add(torrent_dir
, torrent_file
)
356 def remove(self
, torrent_hash
, force
= False):
357 torrent
= self
.torrents
[torrent_hash
]
358 log
.info("Removing torrent: %s", torrent
.hash)
360 if torrent
.handle
.status().progress
!= 1 and not force
:
361 log
.error("Removing torrent: %s", torrent
.hash)
364 if torrent
.handle
.status().progress
!= 1: dir = self
.config
['path_incomplete']
365 else: dir = self
.config
['path_removed']
367 torrent
.move(dir, dir)
369 self
.session
.remove_torrent(torrent
.handle
, 0)
370 self
.publish(('remove', ([torrent_hash
], 'remove')))
371 del self
.torrents
[torrent
.hash]
374 def get_torrents_status(self
, torrents
, id_field
, fields
):
375 return [self
.torrents
[t
].get_status(id_field
, fields
) for t
in torrents
or self
.torrents
.keys()]
377 def get_session_status(self
, fields
):
378 s
= self
.session
.status()
380 "download_limit": int(self
.session
.download_rate_limit()),
381 "upload_limit": int(self
.session
.upload_rate_limit()),
382 "dht": (self
.config
["dht"] and s
.dht_nodes
) or -1
384 return [get_stat(s
, f
, values
) for f
in fields
]
387 def set_torrents_state(self
, torrents
, attr
, msg
=(), publish
=False, all
=False):
388 torrents
= torrents
or self
.torrents
.keys()
389 for t
in torrents
: self
.torrents
[t
].set_state(attr
, msg
)
390 if publish
and all
: torrents
= None
391 self
.publish((publish
, (torrents
, attr
)))
393 def set_encryption(self
, key
, value
):
396 e
.out_enc_policy
= lt
.enc_policy(o
)
397 e
.in_enc_policy
= lt
.enc_policy(i
)
398 e
.allowed_enc_level
= lt
.enc_level(l
)
400 self
.session
.set_pe_settings(e
)
402 def set_session(self
, key
, value
):
403 s
= lt
.session_settings()
404 s
.active_downloads
, s
.active_seeds
, s
.lazy_bitfields
= value
405 s
.connection_speed
= 1
406 self
.session
.set_settings(s
)
408 def alert_cb(self
, subject_fn
=lambda x
:x
):
409 return lambda t
,a
: self
.publish((subject_fn(t
), a
))
412 PyroServer(Core
, "tord")