Add position metadata to desugaring
[hiphop-php.git] / hphp / hack / test / integration / lsptestspec.py
blob7e5f3aa6f3d93dc01c40974d887fb4d2ad868648
1 # pyre-strict
2 from __future__ import absolute_import, division, print_function, unicode_literals
4 import copy
5 import difflib
6 import inspect
7 import itertools
8 import os.path
9 import pprint
10 import textwrap
11 from dataclasses import dataclass
12 from typing import (
13 AbstractSet,
14 Iterable,
15 List,
16 Mapping,
17 Optional,
18 Sequence,
19 Tuple,
20 Union,
23 import libcst
24 from libcst.metadata import CodeRange, MetadataWrapper, PositionProvider
25 from lspcommand import LspCommandProcessor, Transcript, TranscriptEntry
26 from utils import (
27 Json,
28 VariableMap,
29 fixup_hhi_json,
30 interpolate_variables,
31 uninterpolate_variables,
35 _MessageSpec = Union[
36 "_RequestSpec",
37 "_DebugRequestSpec",
38 "_NotificationSpec",
39 "_WaitForNotificationSpec",
40 "_WaitForRequestSpec",
41 "_WaitForResponseSpec",
42 "_WaitForHhServerReadySpec",
46 # pyre-fixme[5]: Global expression must be annotated.
47 _LspIdMap = Mapping[_MessageSpec, Json]
49 _Traceback = Sequence[inspect.FrameInfo]
52 @dataclass
53 class _CallSiteInfo:
54 line_num: int
55 traceback: _Traceback
58 class NoResponse:
59 """Indicates that no response should be sent (different from `None` since
60 `None` is a valid JSON value)."""
62 pass
65 def line() -> int:
66 """Get the line number that this function was called at.
68 Previously, we used to do this automatically whenever we called
69 `.request`. However, a recent upgrade of Python breaks that functionality
70 for chained function calls in some cases, and it instead reports the line
71 number of the first function call in the chain. We use `line()` to ensure
72 that we don't have a chained function call and can get the line number
73 accurately.
74 """
75 cf = inspect.currentframe()
76 assert (
77 cf is not None
78 ), "Must be able to get current call frame to produce error messages for test"
79 # pyre-fixme[16]: `Optional` has no attribute `f_lineno`.
80 return cf.f_back.f_lineno
83 class LspTestSpec:
84 """Represents an LSP test to be run, in a declarative fashion.
86 Since `LspTestSpec`s are just values, they can be composed using regular
87 functions. For example, you can make an `initialize_spec` function that
88 returns an `LspTestSpec` with the `initialize` request already sent and
89 checked."""
91 def __init__(self, name: str) -> None:
92 self.name = name
93 self._messages: Sequence["_MessageSpec"] = []
94 self._ignored_notification_methods: AbstractSet[str] = set()
95 # pyre-fixme[11]: Annotation `Json` is not defined as a type.
96 self._ignored_requests: Sequence[Tuple[str, Json]] = []
97 self._ignore_status_diagnostics: bool = False
99 def ignore_notifications(self, *, method: str) -> "LspTestSpec":
100 ignored_notification_methods = set(self._ignored_notification_methods)
101 ignored_notification_methods.add(method)
102 return self._update(ignored_notification_methods=ignored_notification_methods)
104 def ignore_status_diagnostics(self, value: bool) -> "LspTestSpec":
105 return self._update(ignore_status_diagnostics=value)
107 def ignore_requests(
108 self, *, method: str, params: Json, comment: Optional[str] = None
109 ) -> "LspTestSpec":
110 ignored_requests = list(self._ignored_requests)
111 ignored_requests.append((method, params))
112 return self._update(ignored_requests=ignored_requests)
114 def request(
115 self,
116 line: int,
117 method: str,
118 params: Json,
120 result: Json,
121 wait_id: Optional[str] = None,
122 comment: Optional[str] = None,
123 powered_by: Optional[str] = None,
124 ) -> "LspTestSpec":
125 traceback = inspect.stack()
126 assert traceback is not None, "Failed to get traceback info"
128 messages = list(self._messages)
129 if wait_id is not None and any(
130 isinstance(message, _RequestSpec) and message.wait_id == wait_id
131 for message in messages
133 raise ValueError(f"Duplicate wait ID: {wait_id}")
135 messages.append(
136 _RequestSpec(
137 method=method,
138 params=params,
139 result=result,
140 wait_id=wait_id,
141 comment=comment,
142 powered_by=powered_by,
143 call_site_info=_CallSiteInfo(line_num=line, traceback=traceback),
146 return self._update(messages=messages)
148 def debug(self) -> "LspTestSpec":
149 """Issue a `telemetry/rage` request for debugging.
151 The language server has to support the `telemetry/rage` request. Once
152 the response is received, its debugging output is rendered in the test
153 output. This can be useful when trying to debug the internal state of
154 the language server.
156 The test will not pass while there's a `debug()` statement in its spec,
157 so it must be removed before committing the code.
159 messages = list(self._messages)
160 messages.append(_DebugRequestSpec())
161 return self._update(messages=messages)
163 def notification(
164 self, method: str, params: Json, *, comment: Optional[str] = None
165 ) -> "LspTestSpec":
166 messages = list(self._messages)
167 messages.append(
168 _NotificationSpec(method=method, params=params, comment=comment)
170 return self._update(messages=messages)
172 def wait_for_server_request(
173 self,
174 method: str,
175 params: Json,
177 result: Union[Json, NoResponse],
178 comment: Optional[str] = None,
179 ) -> "LspTestSpec":
180 messages = list(self._messages)
181 messages.append(
182 _WaitForRequestSpec(
183 method=method, params=params, result=result, comment=comment
186 return self._update(messages=messages)
188 def wait_for_notification(
189 self, method: str, params: Json, *, comment: Optional[str] = None
190 ) -> "LspTestSpec":
191 messages = list(self._messages)
192 messages.append(
193 _WaitForNotificationSpec(method=method, params=params, comment=comment)
195 return self._update(messages=messages)
197 def wait_for_response(self, wait_id: str) -> "LspTestSpec":
198 messages = list(self._messages)
199 messages.append(_WaitForResponseSpec(wait_id=wait_id))
200 return self._update(messages=messages)
202 def wait_for_hh_server_ready(self) -> "LspTestSpec":
203 messages = list(self._messages)
204 messages.append(_WaitForHhServerReadySpec())
205 return self._update(messages=messages)
207 def start_hh_server(self, comment: str) -> "LspTestSpec":
208 return self.request(
209 line=line(),
210 comment=comment,
211 method="$test/startHhServer",
212 params=None,
213 result=None,
214 powered_by="serverless_ide",
217 def stop_hh_server(self, comment: str) -> "LspTestSpec":
218 return self.request(
219 line=line(),
220 comment=comment,
221 method="$test/stopHhServer",
222 params=None,
223 result=None,
224 powered_by="serverless_ide",
227 def write_to_disk(
228 self,
230 comment: Optional[str] = None,
231 uri: str,
232 contents: Optional[str],
233 notify: bool,
234 wait: Optional[bool] = None,
235 ) -> "LspTestSpec":
236 """Write a file to disk in the middle of the LSP test.
238 If `contents` is `None`, delete the file from disk.
240 If `notify` is `True`, also send a `workspace/didChangeWatchedFiles`
241 notification to the language server corresponding to the file you just
242 changed. The test will then wait for serverless IDE to process the file
243 change before proceeding, unless `wait` is set to `False`.
245 messages = list(self._messages)
246 messages.append(
247 _NotificationSpec(
248 method="$test/writeToDisk",
249 params={"uri": uri, "contents": contents},
250 comment=comment,
253 if notify:
254 messages.append(
255 _NotificationSpec(
256 method="workspace/didChangeWatchedFiles",
257 params={"changes": [{"uri": uri, "type": 2}]},
258 comment=comment,
261 if wait is None:
262 wait = True
263 if wait:
264 messages.append(
265 _WaitForNotificationSpec(
266 comment=(
267 f"Waiting for change to URI {uri} to be processed... "
268 + "(set wait=False on the corresponding `write_to_disk` call " # noqa: B950
269 + "if this is undesirable)"
271 method="telemetry/event",
272 params={
273 "type": 4,
274 "message": "[client-ide] Done processing file changes",
278 return self._update(messages=messages)
280 def run(
281 self, lsp_command_processor: LspCommandProcessor, variables: VariableMap
282 ) -> Tuple[Transcript, Optional[str]]:
283 """Run the test given the LSP command processor.
285 Raises an exception with useful debugging information if the test fails."""
286 (json_commands, lsp_id_map) = self._get_json_commands(variables=variables)
287 transcript = lsp_command_processor.communicate(json_commands=json_commands)
288 errors = list(
289 self._verify_transcript(
290 variables=variables, transcript=transcript, lsp_id_map=lsp_id_map
293 if errors:
294 num_errors = len(errors)
295 error_details = (
296 f"Test case {self.name} failed with {num_errors} errors:\n\n"
298 for i, error in enumerate(errors, 1):
299 error_details += f"Error {i}/{num_errors}:\n"
300 error_details += str(error) + "\n"
301 error_details += """\
302 If you want to examine the raw LSP logs, you can check the `.sent.log` and
303 `.received.log` files that were generated in the template repo for this test."""
304 else:
305 error_details = None
306 return (transcript, error_details)
308 ### Internal. ###
310 def _update(
311 self,
312 messages: Optional[Sequence["_MessageSpec"]] = None,
313 ignored_notification_methods: Optional[AbstractSet[str]] = None,
314 ignored_requests: Optional[Sequence[Tuple[str, Json]]] = None,
315 ignore_status_diagnostics: Optional[bool] = None,
316 ) -> "LspTestSpec":
317 spec = copy.copy(self)
318 if messages is not None:
319 spec._messages = messages
320 if ignored_notification_methods is not None:
321 spec._ignored_notification_methods = ignored_notification_methods
322 if ignored_requests is not None:
323 spec._ignored_requests = ignored_requests
324 if ignore_status_diagnostics:
325 spec._ignore_status_diagnostics = ignore_status_diagnostics
326 return spec
328 def _get_json_commands(
329 self,
330 variables: VariableMap
331 # pyre-fixme[11]: Annotation `_LspIdMap` is not defined as a type.
332 ) -> Tuple[Sequence[Json], "_LspIdMap"]:
333 """Transforms this test spec into something the LSP command processor
334 can interpret."""
335 json_commands = []
336 lsp_id_map = {}
337 current_id = 0
338 for message in self._messages:
339 current_id += 1
340 lsp_id_map[message] = current_id
342 if isinstance(message, _RequestSpec):
343 json_commands.append(
345 "jsonrpc": "2.0",
346 "comment": message.comment,
347 "id": current_id,
348 "method": message.method,
349 "params": interpolate_variables(
350 message.params, variables=variables
355 if message.wait_id is None:
356 # Assume that if no wait ID was explicitly passed, we want
357 # to wait on the response before sending the next message.
358 json_commands.append(
360 "jsonrpc": "2.0",
361 "method": "$test/waitForResponse",
362 "params": {"id": current_id},
365 elif isinstance(message, _DebugRequestSpec):
366 json_commands.append(
368 "jsonrpc": "2.0",
369 "id": current_id,
370 "method": "telemetry/rage",
371 "params": {},
374 elif isinstance(message, _NotificationSpec):
375 json_commands.append(
377 "jsonrpc": "2.0",
378 "comment": message.comment,
379 "method": message.method,
380 "params": interpolate_variables(
381 message.params, variables=variables
385 elif isinstance(message, _WaitForRequestSpec):
386 params = {
387 "method": message.method,
388 "params": interpolate_variables(
389 message.params, variables=variables
392 if not isinstance(message.result, NoResponse):
393 params["result"] = message.result
394 json_commands.append(
396 "jsonrpc": "2.0",
397 "comment": message.comment,
398 "method": "$test/waitForRequest",
399 "params": params,
402 elif isinstance(message, _WaitForNotificationSpec):
403 json_commands.append(
405 "jsonrpc": "2.0",
406 "comment": message.comment,
407 "method": "$test/waitForNotification",
408 "params": {
409 "method": message.method,
410 "params": interpolate_variables(
411 message.params, variables=variables
416 elif isinstance(message, _WaitForResponseSpec):
417 lsp_ids = [
418 lsp_id
419 for previous_message, lsp_id in lsp_id_map.items()
420 if isinstance(previous_message, _RequestSpec)
421 and previous_message.wait_id == message.wait_id
423 assert len(lsp_ids) == 1, (
424 f"Should have had exactly one previous message with wait ID {message.wait_id!r}, " # noqa: B950
425 + "but got {len(lsp_ids)}"
427 [lsp_id] = lsp_ids
429 json_commands.append(
431 "jsonrpc": "2.0",
432 "method": "$test/waitForResponse",
433 "params": {"id": lsp_id},
436 elif isinstance(message, _WaitForHhServerReadySpec):
437 json_commands.append(
439 "jsonrpc": "2.0",
440 "method": "$test/waitForHhServerReady",
441 "params": {},
444 else:
445 raise ValueError(f"unhandled message type {message.__class__.__name__}")
446 return (json_commands, lsp_id_map)
448 def _verify_transcript(
449 self, *, variables: VariableMap, transcript: Transcript, lsp_id_map: "_LspIdMap"
450 ) -> Iterable["_ErrorDescription"]:
451 handled_entries = set()
453 for message in self._messages:
454 lsp_id = lsp_id_map[message]
455 if isinstance(message, _RequestSpec):
456 transcript_id = LspCommandProcessor._client_request_id(lsp_id)
457 handled_entries.add(transcript_id)
458 assert transcript_id in transcript, (
459 f"Expected message with ID {lsp_id!r} "
460 + f"to have an entry in the transcript "
461 + f"under key {transcript_id!r}, "
462 + f"but it was not found. Transcript: {transcript!r}"
464 entry = transcript[transcript_id]
465 error_description = self._verify_request(
466 variables=variables, entry=entry, lsp_id=lsp_id, request=message
468 if error_description is not None:
469 yield error_description
470 elif isinstance(message, _DebugRequestSpec):
471 transcript_id = LspCommandProcessor._client_request_id(lsp_id)
472 handled_entries.add(transcript_id)
473 assert transcript_id in transcript, (
474 f"Expected message with ID {lsp_id!r} "
475 + f"to have an entry in the transcript "
476 + f"under key {transcript_id!r}, "
477 + f"but it was not found. Transcript: {transcript!r}"
479 entry = transcript[transcript_id]
480 error_description = self._render_telemetry_rage(
481 debug_request=message, result=entry.received["result"]
483 yield error_description
484 elif isinstance(message, _NotificationSpec):
485 # Nothing needs to be done here, since we sent the notification
486 # and don't expect a response.
487 pass
488 elif isinstance(
489 message,
491 _WaitForRequestSpec,
492 _WaitForNotificationSpec,
493 _WaitForResponseSpec,
494 _WaitForHhServerReadySpec,
497 # Nothing needs to be done here -- if we failed to wait for the
498 # message, an exception will have been thrown at the
499 # `LspCommandProcessor` layer.
500 pass
501 else:
502 raise ValueError(f"unhandled message type {message.__class__.__name__}")
504 handled_entries |= set(self._find_ignored_transcript_ids(transcript))
505 yield from self._flag_unhandled_messages(
506 handled_entries, variables, transcript, lsp_id_map
509 def _verify_request(
510 self,
512 variables: VariableMap,
513 lsp_id: Json,
514 entry: TranscriptEntry,
515 request: "_RequestSpec",
516 ) -> Optional["_ErrorDescription"]:
517 actual_result = entry.received.get("result")
518 actual_powered_by = entry.received.get("powered_by")
519 if request.comment is not None:
520 request_description = (
521 f"Request with ID {lsp_id!r} (comment: {request.comment!r})"
523 else:
524 request_description = f"Request with ID {lsp_id!r}"
526 # Because of the way hack allocates a different HHI folder for each running
527 # process, let's replace the standard HHI foldername
528 actual_result = fixup_hhi_json(actual_result)
529 expected_result = interpolate_variables(
530 payload=request.result, variables=variables
532 expected_result = fixup_hhi_json(expected_result)
534 if actual_result != expected_result:
535 error_description = self._pretty_print_diff(
536 actual=actual_result, expected=expected_result
538 description = f"""\
539 {request_description} got an incorrect result:
541 {error_description}"""
542 request_context = self._get_context_for_call_site_info(
543 request.call_site_info
545 context = f"""\
546 This was the associated request:
548 {request_context}"""
549 remediation = self._describe_response_for_remediation(
550 variables=variables, request=request, actual_response=entry.received
552 return _ErrorDescription(
553 description=description, context=context, remediation=remediation
555 elif entry.received.get("powered_by") != request.powered_by:
556 description = f"""\
557 {request_description} had an incorrect value for the `powered_by` field
558 (expected {request.powered_by!r}; got {actual_powered_by!r})
560 request_context = self._get_context_for_call_site_info(
561 request.call_site_info
563 context = f"""\
564 This was the associated request:
566 {request_context}"""
567 remediation = self._describe_response_for_remediation(
568 variables=variables, request=request, actual_response=entry.received
570 return _ErrorDescription(
571 description=description, context=context, remediation=remediation
574 def _get_context_for_call_site_info(self, call_site_info: _CallSiteInfo) -> str:
575 # Find the first caller frame that isn't in this source file. The
576 # assumption is that the first such frame is in the test code.
577 caller_frame = next(
578 frame for frame in call_site_info.traceback if frame.filename != __file__
580 source_filename = caller_frame.filename
581 with open(source_filename) as f:
582 source_text = f.read()
585 start_line_num_0idx_incl,
586 end_line_num_0idx_incl,
587 ) = self._find_line_range_for_function_call(
588 file_contents=source_text, line_num_1idx=call_site_info.line_num
590 return self._pretty_print_file_context(
591 file_path=source_filename,
592 file_contents=source_text,
593 start_line_num_0idx_incl=start_line_num_0idx_incl,
594 end_line_num_0idx_incl=end_line_num_0idx_incl,
597 def _find_line_range_for_function_call(
598 self, file_contents: str, line_num_1idx: int
599 ) -> Tuple[int, int]:
600 tree = libcst.parse_module(file_contents)
601 function_call_finder = _FunctionCallFinder()
602 MetadataWrapper(tree).visit(function_call_finder)
603 function_calls_containing_line = [
604 (node, node_range)
605 for node, node_range in function_call_finder.function_calls
606 if node_range.start.line <= line_num_1idx <= node_range.end.line
608 node_range = max(
609 function_calls_containing_line,
610 key=lambda node_with_range: node_with_range[1].end.line
611 - node_with_range[1].start.line,
612 )[1]
613 start_line_num_0idx_incl = node_range.start.line - 1
614 end_line_num_0idx_incl = node_range.end.line - 1
615 return (start_line_num_0idx_incl, end_line_num_0idx_incl)
617 def _pretty_print_file_context(
618 self,
619 file_path: str,
620 file_contents: str,
621 start_line_num_0idx_incl: int,
622 end_line_num_0idx_incl: int,
623 ) -> str:
624 source_lines = file_contents.splitlines(keepends=True)
625 context_lines = source_lines[
626 start_line_num_0idx_incl : end_line_num_0idx_incl + 1
628 context_lines = [
629 # Include the line number in a gutter for display.
630 f"{line_num:>5} | {line_contents}"
631 for line_num, line_contents in enumerate(
632 context_lines, start=start_line_num_0idx_incl + 1
635 file_context = "".join(context_lines)
637 # The full path is likely not useful, since it includes any temporary
638 # directories that Buck introduced.
639 prefix = os.path.commonprefix([file_path, __file__])
640 display_filename = file_path[len(prefix) :]
641 return display_filename + "\n" + file_context
643 def _describe_response_for_remediation(
644 self, variables: VariableMap, request: "_RequestSpec", actual_response: Json
645 ) -> str:
646 method = request.method
647 params = request.params
648 result = uninterpolate_variables(
649 payload=actual_response.get("result"), variables=variables
651 powered_by = actual_response.get("powered_by")
653 request_snippet = f"""\
654 .request(
655 line=line(),"""
656 if request.comment is not None:
657 request_snippet += f"""
658 comment={request.comment!r},"""
659 request_snippet += f"""
660 method={method!r},
661 params={params!r},
662 result={result!r},"""
663 if request.wait_id is not None:
664 request_snippet += f"""
665 wait_id={request.wait_id!r},"""
666 if powered_by is not None:
667 request_snippet += f"""
668 powered_by={powered_by!r},"""
669 request_snippet += f"""
670 )"""
672 remediation = f"""\
673 1) If this was unexpected, then the language server is buggy and should be
674 fixed.
676 2) If this was expected, you can update your request with the following code to
677 make it match:
679 {request_snippet}
681 return remediation
683 def _find_ignored_transcript_ids(self, transcript: Transcript) -> Iterable[str]:
684 for transcript_id, entry in transcript.items():
685 if (
686 entry.received is not None
687 and "id" not in entry.received
688 and entry.received.get("method") in self._ignored_notification_methods
690 yield transcript_id
692 if (
693 entry.received is not None
694 and "id" not in entry.received
695 and self._ignore_status_diagnostics
696 and entry.received["method"] == "textDocument/publishDiagnostics"
697 and entry.received["params"].get("isStatusFB")
699 yield transcript_id
701 if (
702 entry.received is not None
703 and "id" in entry.received
704 and "method" in entry.received
705 and "params" in entry.received
706 and (entry.received["method"], entry.received["params"])
707 in self._ignored_requests
709 yield transcript_id
711 def _flag_unhandled_messages(
712 self,
713 handled_entries: AbstractSet[str],
714 variables: VariableMap,
715 transcript: Transcript,
716 lsp_id_map: _LspIdMap,
717 ) -> Iterable["_ErrorDescription"]:
718 for transcript_id, entry in transcript.items():
719 if transcript_id in handled_entries:
720 continue
722 received = entry.received
723 if received is None:
724 continue
726 if entry.sent is not None:
727 # We received a request and responded to it.
728 continue
730 method = received["method"]
731 params = received["params"]
732 payload = self._pretty_print_snippet(received)
733 if "id" in received:
734 description = f"""\
735 An unexpected request of type {method!r} was sent by the language server.
736 Here is the request payload:
738 {payload}
740 at_nocommit = "@" + "nocommit"
741 remediation = f"""\
742 1) If this was unexpected, then the language server is buggy and should be
743 fixed.
745 2) If all requests of type {method!r} with theses params should be ignored,
746 add this directive anywhere in your test:
748 .{self.ignore_requests.__name__}(method={method!r}, params={params!r})
750 3) To handle this request, add this directive to your test to wait for it and
751 respond to it before proceeding:
753 .{self.wait_for_server_request.__name__}(
754 method={method!r},
755 params={params!r},
756 result={{
757 "{at_nocommit}": "fill in request data here",
761 else:
762 if any(
763 isinstance(message, _WaitForNotificationSpec)
764 and message.method == method
765 and interpolate_variables(
766 payload=message.params, variables=variables
768 == params
769 for message in self._messages
771 # This was a notification we we explicitly waiting for, so skip
772 # it.
773 continue
775 uninterpolated_params = uninterpolate_variables(
776 payload=params, variables=variables
778 description = f"""\
779 An unexpected notification of type {method!r} was sent by the language server.
780 Here is the notification payload:
782 {payload}
784 remediation = f"""\
785 1) If this was unexpected, then the language server is buggy and should be
786 fixed.
788 2) If all notifications of type {method!r} should be ignored, add this directive
789 anywhere in your test:
791 .{self.ignore_notifications.__name__}(method={method!r})
793 3) If this single instance of the notification was expected, add this directive
794 to your test to wait for it before proceeding:
796 .{self.wait_for_notification.__name__}(
797 method={method!r},
798 params={uninterpolated_params!r},
802 previous_request = self._find_previous_request(
803 transcript, lsp_id_map, current_id=transcript_id
805 if previous_request is not None:
806 request_context = self._get_context_for_call_site_info(
807 previous_request.call_site_info
809 else:
810 request_context = "<no previous request was found>"
811 context = f"""\
812 This was the most recent request issued from the language client before it
813 received the notification:
815 {request_context}"""
817 yield _ErrorDescription(
818 description=description, context=context, remediation=remediation
821 def _find_previous_request(
822 self, transcript: Transcript, lsp_id_map: _LspIdMap, current_id: str
823 ) -> Optional["_RequestSpec"]:
824 previous_transcript_entries = itertools.takewhile(
825 lambda kv: kv[0] != current_id, transcript.items()
827 previous_request_entries = [
828 entry.sent
829 for _id, entry in previous_transcript_entries
830 if entry.sent is not None and LspCommandProcessor._is_request(entry.sent)
832 if previous_request_entries:
833 previous_request_lsp_id = previous_request_entries[-1]["id"]
834 else:
835 return None
837 [corresponding_request] = [
838 request
839 for request, lsp_id in lsp_id_map.items()
840 if lsp_id == previous_request_lsp_id
842 assert isinstance(
843 corresponding_request, _RequestSpec
844 ), "We should have identified a client-to-server request at this point"
845 return corresponding_request
847 def _render_telemetry_rage(
848 self, debug_request: "_DebugRequestSpec", result: Json
849 ) -> "_ErrorDescription":
850 sections = []
851 for row in result:
852 title = row["title"]
853 if title is None:
854 title = "<none>"
855 data = row.get("data")
856 sections.append(
857 f"""\
858 ### Section {title} ###
859 {data}
862 sections = textwrap.indent("".join(sections), prefix=" ")
863 description = f"""\
864 Here are the results of issuing a `telemetry/rage` request to the language
865 server:
867 {sections}"""
868 context = """\
869 <none>
871 remediation = """\
872 Remove this `debug` request once you're done debugging.
874 return _ErrorDescription(
875 description=description, context=context, remediation=remediation
878 def _pretty_print_snippet(self, obj: object) -> str:
879 return textwrap.indent(pprint.pformat(obj), prefix=" ")
881 def _pretty_print_diff(self, actual: object, expected: object) -> str:
882 # Similar to the standard library's `unittest` module:
883 # https://github.com/python/cpython/blob/35d9c37e271c35b87d64cc7422600e573f3ee244/Lib/unittest/case.py#L1147-L1149 # noqa B950
884 return (
885 "(+ is expected lines, - is actual lines)\n"
886 + "\n".join(
887 difflib.ndiff(
888 pprint.pformat(actual).splitlines(),
889 pprint.pformat(expected).splitlines(),
892 + "\n"
896 ### Internal. ###
899 class _FunctionCallFinder(libcst.CSTVisitor):
900 """Find function calls and their locations in the given syntax tree.
902 Chained function calls include the entire chain as the callee. For example,
903 the chain `x().y().z()` might include `x().y().z` as the callee and `()` as
904 the function call itself. But in the case of function call chains, we really
905 want just the range covered by the parentheses.
907 However, that's not directly available in `libcst`, so we approximate this
908 by finding the location of `z` and assume that's where the function call
909 starts.
912 METADATA_DEPENDENCIES = (PositionProvider,)
914 def __init__(self) -> None:
915 self.function_calls: List[Tuple[libcst.Call, CodeRange]] = []
917 def visit_Call(self, node: libcst.Call) -> None:
918 node_range = self.get_metadata(PositionProvider, node)
920 start_node = node.func
921 while isinstance(start_node, libcst.Attribute):
922 start_node = start_node.attr
923 start_node_range = self.get_metadata(PositionProvider, start_node)
924 start_position = start_node_range.start
925 end_position = node_range.end
926 node_range = CodeRange(start=start_position, end=end_position)
928 self.function_calls.append((node, node_range))
931 class _RequestSpec:
932 __slots__ = [
933 "method",
934 "params",
935 "result",
936 "wait_id",
937 "comment",
938 "powered_by",
939 "call_site_info",
942 def __init__(
943 self,
945 method: str,
946 params: Json,
947 result: Json,
948 wait_id: Optional[str],
949 comment: Optional[str],
950 powered_by: Optional[str],
951 call_site_info: _CallSiteInfo,
952 ) -> None:
953 # pyre-fixme[4]: Attribute must be annotated.
954 self.method = method
955 # pyre-fixme[4]: Attribute must be annotated.
956 self.params = params
957 # pyre-fixme[4]: Attribute must be annotated.
958 self.result = result
959 # pyre-fixme[4]: Attribute must be annotated.
960 self.wait_id = wait_id
961 # pyre-fixme[4]: Attribute must be annotated.
962 self.comment = comment
963 # pyre-fixme[4]: Attribute must be annotated.
964 self.powered_by = powered_by
965 # pyre-fixme[4]: Attribute must be annotated.
966 self.call_site_info = call_site_info
969 class _DebugRequestSpec:
970 pass
973 class _NotificationSpec:
974 __slots__ = ["method", "params", "comment"]
976 def __init__(self, *, method: str, params: Json, comment: Optional[str]) -> None:
977 # pyre-fixme[4]: Attribute must be annotated.
978 self.method = method
979 # pyre-fixme[4]: Attribute must be annotated.
980 self.params = params
981 # pyre-fixme[4]: Attribute must be annotated.
982 self.comment = comment
985 class _WaitForRequestSpec:
986 __slots__ = ["method", "params", "result", "comment"]
988 def __init__(
989 self,
991 method: str,
992 params: Json,
993 result: Union[Json, NoResponse],
994 comment: Optional[str],
995 ) -> None:
996 # pyre-fixme[4]: Attribute must be annotated.
997 self.method = method
998 # pyre-fixme[4]: Attribute must be annotated.
999 self.params = params
1000 # pyre-fixme[4]: Attribute must be annotated.
1001 self.result = result
1002 # pyre-fixme[4]: Attribute must be annotated.
1003 self.comment = comment
1006 class _WaitForNotificationSpec:
1007 __slots__ = ["method", "params", "comment"]
1009 def __init__(self, *, method: str, params: Json, comment: Optional[str]) -> None:
1010 # pyre-fixme[4]: Attribute must be annotated.
1011 self.method = method
1012 # pyre-fixme[4]: Attribute must be annotated.
1013 self.params = params
1014 # pyre-fixme[4]: Attribute must be annotated.
1015 self.comment = comment
1018 class _WaitForResponseSpec:
1019 __slots__ = ["wait_id"]
1021 def __init__(self, *, wait_id: str) -> None:
1022 # pyre-fixme[4]: Attribute must be annotated.
1023 self.wait_id = wait_id
1026 class _WaitForHhServerReadySpec:
1027 pass
1030 class _ErrorDescription:
1031 def __init__(self, description: str, context: str, remediation: str) -> None:
1032 self.description = description
1033 self.context = context
1034 self.remediation = remediation
1036 def __str__(self) -> str:
1037 result = f"""\
1038 Description: {self.description}
1040 if self.context is not None:
1041 result += f"""\
1042 Context:
1043 {self.context}
1045 result += f"""\
1046 Remediation:
1047 {self.remediation}"""
1048 return result