fix a bug for user with nick prifex
[mcbot.git] / sakuratown / mcbot.py
blob9657c521347efd7016bc23f3639409861045e23a
1 #!/usr/bin/env python3
3 import sys
4 import signal
5 import json
6 import fn
7 import re
8 import time
9 import math
10 import traceback
12 from minecraft import authentication
13 from minecraft.exceptions import YggdrasilError
14 from minecraft.networking.connection import Connection
15 from minecraft.networking.packets import Packet, clientbound, serverbound
17 from minecraft.compat import input
19 from pprint import pprint as pp
21 _color_wipe_pattern = re.compile('§[0-9a-fklmnor]', re.U|re.I|re.M|re.S)
22 _color_wipe = lambda s: re.sub(_color_wipe_pattern, '', s)
23 _nick_prefix_wipe = lambda s: re.sub(r'\[[^]]+\]|\s+', '', s)
25 _cmd_split = re.compile(r'\s+', re.U|re.I|re.M|re.S)
27 STATES = {
28 'INIT': 0,
29 'CONNECTED': 1,
30 'AUTHED': 2,
31 'AUTHFAIL': 3,
34 todayfmt = lambda : time.strftime('%Y%m%d', time.localtime())
36 class TimeoutExpired(Exception):
37 pass
39 def chat_strip(obj):
40 if 'extra' not in obj:
41 return (_color_wipe(obj.get('text', repr(obj))), )
42 else:
43 return fn.iters.flatten((_color_wipe(obj.get('text', repr(obj))), ) + tuple(chat_strip(o) for o in obj['extra']))
45 def input_with_timeout(prompt, timeout):
46 def alarm_handler(signum, frame):
47 raise TimeoutExpired
49 signal.signal(signal.SIGALRM, alarm_handler)
50 signal.alarm(timeout) # produce SIGALRM in `timeout` seconds
52 try:
53 return input(prompt)
54 finally:
55 signal.alarm(0) # cancel alarm
57 class CoreBot(object):
58 ignored_types = {
59 Packet,
61 ##clientbound.play.ChatMessagePacket,
62 clientbound.play.PlayerListItemPacket,
63 clientbound.play.BlockChangePacket,
64 clientbound.play.MultiBlockChangePacket,
65 clientbound.play.KeepAlivePacket,
66 clientbound.play.PlayerPositionAndLookPacket,
67 clientbound.play.EntityVelocityPacket,
68 clientbound.play.SpawnObjectPacket,
70 serverbound.play.KeepAlivePacket,
71 serverbound.play.TeleportConfirmPacket,
74 def __init__(self, username, passwd, address, port=25565, password=None, offline=True, dump_packets=False, **kwargs):
75 self.username = username
76 self.passwd = passwd
77 self.keepon = False
79 if offline:
80 print("Connecting in offline mode...")
81 self.connection = Connection(address, port, username=username, **kwargs)
82 else:
83 auth_token = authentication.AuthenticationToken()
85 try:
86 auth_token.authenticate(username, password)
87 except YggdrasilError as e:
88 print(e)
89 sys.exit()
91 print("Logged in as %s..." % auth_token.username)
92 self.connection = Connection(address, port, auth_token=auth_token)
94 if dump_packets:
95 self.connection.register_packet_listener(self.handle_incoming, Packet, early=True)
96 self.connection.register_packet_listener(self.handle_outgoing, Packet, outgoing=True)
98 self.connection.register_packet_listener(self.handle_join_game, clientbound.play.JoinGamePacket)
99 self.connection.register_packet_listener(self.handle_chat_message, clientbound.play.ChatMessagePacket)
100 self.connection.register_packet_listener(self.handle_player_list_item, clientbound.play.PlayerListItemPacket)
101 self.connection.register_packet_listener(self.handle_spawn_object, clientbound.play.SpawnObjectPacket)
102 self.connection.register_packet_listener(self.handle_player_position_and_look, clientbound.play.PlayerPositionAndLookPacket)
104 self.players = dict()
106 self.objects = dict()
108 self.state = STATES['INIT']
109 self.pos_look = clientbound.play.PlayerPositionAndLookPacket.PositionAndLook()
111 def handle_player_position_and_look(self, packet):
112 packet.apply(self.pos_look)
114 def handle_incoming(self, packet):
115 ptype = type(packet)
116 if any(ptype is p for p in self.ignored_types):
117 return
119 print('--> %s' % packet, file=sys.stderr)
121 def handle_outgoing(self, packet):
122 ptype = type(packet)
123 if any(ptype is p for p in self.ignored_types):
124 return
126 print('<-- %s' % packet, file=sys.stderr)
128 def handle_join_game(self, join_game_packet):
129 print('Connected.', join_game_packet, file=sys.stderr)
130 self.state = STATES['CONNECTED']
132 def handle_chat_message(self, packet):
133 pos = packet.field_string('position')
134 jdata = json.loads(packet.json_data)
136 if pos == 'SYSTEM':
137 translate = jdata.get('translate', '')
138 if translate.startswith('death.'):
139 return ## ignore some notifications
140 if translate in ('chat.type.advancement.task', ):
141 return
143 gmsg = [m for m in chat_strip(jdata) if m]
144 msg = ''.join(gmsg)
145 ##print(json.dumps([m for m in chat_strip(jdata) if m], indent=2))
147 if pos == 'SYSTEM' and ('请求你传送到他那里' in msg or '请求传送到你这里' in msg):
148 self.handle_tp_request(gmsg, msg)
149 return
151 if self.state != STATES['AUTHED']:
152 self.handle_login_state(pos, msg)
153 return
155 else:
156 if len(gmsg) == 4 and '悄悄地对你说' in gmsg[1]:
157 self.handle_priv_message(gmsg[0].strip(), gmsg[3].strip())
158 return
160 print('Msg [%s](%s): %s'%(self.state, pos, msg))
162 def handle_player_list_item(self, packet):
163 subcls = {
164 clientbound.play.PlayerListItemPacket.AddPlayerAction: 'add-player',
165 clientbound.play.PlayerListItemPacket.UpdateGameModeAction: 'update-gamemode',
166 clientbound.play.PlayerListItemPacket.UpdateLatencyAction: 'update-latency',
167 clientbound.play.PlayerListItemPacket.UpdateDisplayNameAction: 'update-displayname',
168 clientbound.play.PlayerListItemPacket.RemovePlayerAction: 'remove-player'
171 for action in packet.actions:
172 typ = subcls.get(type(action), None)
174 if typ == 'add-player':
175 uuid = action.uuid
176 name = ''.join(chat_strip(json.loads(action.display_name))) if action.display_name else None
177 if not name: continue
178 if ' BTLP ' in action.name: continue ## ignore
179 if uuid in self.players: continue
180 info = {k:getattr(action, k) for k in ('name', 'display_name', 'ping')}
181 self.players[uuid] = info
183 elif typ == 'update-latency':
184 uuid = action.uuid
185 info = self.players.get(uuid, None)
186 if info is None: continue
187 info['ping'] = action.ping
189 elif typ == 'update-displayname':
190 uuid = action.uuid
191 if uuid not in self.players: continue
193 name = ''.join(chat_strip(json.loads(action.display_name))) if action.display_name else None
194 if not name: continue
196 info = self.players[uuid]
197 info['display_name'] = name
199 elif typ == 'remove-player':
200 uuid = action.uuid
201 if uuid not in self.players: continue
202 del self.players[uuid]
204 elif typ == 'update-gamemode':
205 pass ## nonesense, unless you want to wait for admin's fault
207 else:
208 info = {k:getattr(action, k) for k in action.__slots__} if type(action.__slots__) is tuple else {action.__slots__:getattr(action, action.__slots__)}
209 info.update({'type_name': typ})
210 print(info, file=sys.stderr)
212 def handle_spawn_object(self, packet):
213 fields = ('entity_id', 'x', 'y', 'z', 'type', 'data', )
214 uuid = packet.object_uuid
215 info = {field:getattr(packet, field) for field in fields}
216 info.update({'uuid': uuid})
218 if uuid in self.objects:
219 del self.objects[uuid]
221 self.objects[uuid] = info
223 print(info, file=sys.stderr)
225 def send_msg(self, msg):
226 packet = serverbound.play.ChatPacket()
227 packet.message = msg
228 self.connection.write_packet(packet)
230 def connect(self):
231 self.connection.connect()
233 def loop(self):
234 self.keepon = True
235 while self.keepon:
236 try:
237 text = input_with_timeout(">> ", 30)
238 if text.startswith('.'):
239 text = text[1:]
240 parts = re.split(r'\s+', text)
241 cmd, args = parts[0], parts[1:]
243 func = getattr(self, 'command_{}'.format(cmd), lambda *aargs: print('unrecognized command => .{}'.format(text)))
244 try:
245 func(*args)
246 except:
247 traceback.print_exc()
249 continue
251 if not text: continue
252 self.send_msg(text)
253 except TimeoutExpired:
254 continue
255 except KeyboardInterrupt:
256 self.connection.disconnect()
257 self.keepon = False
259 class SakuraBot(CoreBot):
260 def __init__(self, **kwargs):
261 kwargs.update({
262 'address': 'sakuratown.cn',
263 'port': 25565,
264 'handle_exception': self.handle_exception,
265 'handle_exit': self.handle_exit,
267 self.admins = set()
268 self.username = kwargs['username']
269 super().__init__(**kwargs)
271 def handle_login_state(self, pos, msg):
272 if pos == 'SYSTEM' and self.state == STATES['CONNECTED']:
273 if '请按回车输入' in msg and '进行登录' in msg:
274 packet = serverbound.play.ChatPacket()
275 packet.message = '/l {}'.format(self.passwd)
276 self.connection.write_packet(packet)
278 if pos == 'CHAT' and self.state == STATES['CONNECTED']:
279 if '您已成功登录樱花镇服务器' in msg:
280 self.state = STATES['AUTHED']
282 def handle_tp_request(self, gmsg, msg):
283 idxs = [idx for idx, m in enumerate(gmsg) if '传送' in m][0]
284 from_user = _nick_prefix_wipe(''.join(gmsg[:idxs]))
286 if from_user in self.admins:
287 self.send_msg('/tpaccept')
288 else:
289 self.send_msg('/tpdeny')
291 def handle_priv_message(self, from_user, msg):
292 if not msg.startswith('.'):
293 print('PrivMsg <{}>: {}'.format(from_user, msg))
294 return
296 args = _cmd_split.split(msg)
297 cmd = args[0][1:]
298 params = args[1:]
300 if cmd == 'auth':
301 if params[0] == '{}{}'.format(self.username, todayfmt()):
302 self.admins.add(from_user)
303 self.send_msg('/msg {} {}'.format(from_user, '校验通过 你可以控制我啦'))
304 else:
305 self.send_msg('/msg {} {}'.format(from_user, '嘿嘿'))
306 else:
307 if from_user not in self.admins:
308 self.send_msg('/msg {} {}'.format(from_user, '嘿嘿'))
309 else:
310 func = getattr(self, 'command_{}'.format(cmd), lambda *aargs: print('unrecognized command => .{}'.format(text)))
311 try:
312 func(*params, from_user=from_user)
313 except:
314 traceback.print_exc()
315 self.send_msg('/msg {} {}'.format(from_user, '出了点小问题啊 大佬'))
317 def handle_exception(self, *args, **kwargs):
318 print("!IMPORT: Exception", args, kwargs)
319 traceback.print_exc()
321 def handle_exit(self, *args, **kwargs):
322 self.connection.disconnect()
323 self.keepon = False
325 def command_admins(self, *args, from_user=None):
326 sleep_int = 0.001 if from_user is None else 0.2
327 sendback = print if from_user is None else lambda m: self.send_msg('/msg {} {}'.format(from_user, m))
328 for u in self.admins:
329 sendback(u)
330 time.sleep(sleep_int)
332 def command_players(self, *args, from_user=None):
333 infos = ["{name}:{ping}".format(**i) for i in sorted(self.players.values(), key=lambda v: v['ping'])]
334 pp(infos)
336 def command_say(self, *args, from_user=None):
337 self.send_msg(' '.join(args))
338 if from_user is not None:
339 self.send_msg('/msg {} 执行成功'.format(self.from_user))
341 def command_download_maps(self, *args, from_user=None):
342 packet = serverbound.play.ClientSettingsPacket()
343 packet.locale = '简体中文'
344 packet.view_distance = 5
345 packet.chat_mode = 0,
346 packet.chat_colors = False,
348 self.connection.write_packet(packet)
350 def command_walk(self, *args, from_user=None):
351 steps = int(args[0])
352 steps = 100 if steps > 100 else steps
354 for i in range(steps):
355 self.pos_look.yaw = (self.pos_look.yaw + 1.0) % 360.0
356 # Move forward 0.1m.
357 self.pos_look.x -= 0.1 * math.sin(self.pos_look.yaw * math.pi / 180.0)
358 self.pos_look.z += 0.1 * math.cos(self.pos_look.yaw * math.pi / 180.0)
360 self.connection.write_packet(serverbound.play.PositionAndLookPacket(
361 x = self.pos_look.x,
362 feet_y = self.pos_look.y,
363 z = self.pos_look.z,
364 yaw = self.pos_look.yaw,
365 pitch = self.pos_look.pitch,
366 on_ground = True))
368 time.sleep(0.5)