Skip to content

Commit d1c72c2

Browse files
committed
refactor(langserver): make documents handling synchronous and more threadsafe
1 parent 6d7b3d2 commit d1c72c2

40 files changed

+280
-291
lines changed

packages/core/src/robotcode/core/event.py

+24-34
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,37 @@ def __iter__(self) -> Iterator[Callable[_TParams, _TResult]]:
6969
if c is not None:
7070
yield c
7171

72-
def _notify(self, *__args: _TParams.args, **__kwargs: _TParams.kwargs) -> Iterator[_TResult]:
73-
for method in set(self):
72+
def _notify(
73+
self,
74+
*__args: _TParams.args,
75+
callback_filter: Optional[Callable[[Callable[..., Any]], bool]] = None,
76+
**__kwargs: _TParams.kwargs,
77+
) -> Iterator[_TResult]:
78+
for method in filter(
79+
lambda x: callback_filter(x) if callback_filter is not None else True,
80+
set(self),
81+
):
7482
yield method(*__args, **__kwargs)
7583

7684

7785
class EventIterator(EventResultIteratorBase[_TParams, _TResult]):
78-
def __call__(self, *__args: _TParams.args, **__kwargs: _TParams.kwargs) -> Iterator[_TResult]:
79-
return self._notify(*__args, **__kwargs)
86+
def __call__(
87+
self,
88+
*__args: _TParams.args,
89+
callback_filter: Optional[Callable[[Callable[..., Any]], bool]] = None,
90+
**__kwargs: _TParams.kwargs,
91+
) -> Iterator[_TResult]:
92+
return self._notify(*__args, callback_filter=callback_filter, **__kwargs)
8093

8194

8295
class Event(EventResultIteratorBase[_TParams, _TResult]):
83-
def __call__(self, *__args: _TParams.args, **__kwargs: _TParams.kwargs) -> List[_TResult]:
84-
return list(self._notify(*__args, **__kwargs))
96+
def __call__(
97+
self,
98+
*__args: _TParams.args,
99+
callback_filter: Optional[Callable[[Callable[..., Any]], bool]] = None,
100+
**__kwargs: _TParams.kwargs,
101+
) -> List[_TResult]:
102+
return list(self._notify(*__args, callback_filter=callback_filter, **__kwargs))
85103

86104

87105
_TEvent = TypeVar("_TEvent")
@@ -125,31 +143,3 @@ def __init__(self, _func: Callable[_TParams, _TResult]) -> None:
125143
class event(EventDescriptorBase[_TParams, _TResult, Event[_TParams, _TResult]]): # noqa: N801
126144
def __init__(self, _func: Callable[_TParams, _TResult]) -> None:
127145
super().__init__(_func, Event[_TParams, _TResult])
128-
129-
130-
class ThreadedEventResultIteratorBase(EventResultIteratorBase[_TParams, _TResult]):
131-
def _notify(self, *__args: _TParams.args, **__kwargs: _TParams.kwargs) -> Iterator[_TResult]:
132-
for method in set(self):
133-
yield method(*__args, **__kwargs)
134-
135-
136-
class ThreadedEventIterator(ThreadedEventResultIteratorBase[_TParams, _TResult]):
137-
def __call__(self, *__args: _TParams.args, **__kwargs: _TParams.kwargs) -> Iterator[_TResult]:
138-
return self._notify(*__args, **__kwargs)
139-
140-
141-
class ThreadedEvent(ThreadedEventResultIteratorBase[_TParams, _TResult]):
142-
def __call__(self, *__args: _TParams.args, **__kwargs: _TParams.kwargs) -> List[_TResult]:
143-
return list(self._notify(*__args, **__kwargs))
144-
145-
146-
class threaded_event_iterator( # noqa: N801
147-
EventDescriptorBase[_TParams, _TResult, ThreadedEventIterator[_TParams, _TResult]]
148-
):
149-
def __init__(self, _func: Callable[_TParams, _TResult]) -> None:
150-
super().__init__(_func, ThreadedEventIterator[_TParams, _TResult])
151-
152-
153-
class threaded_event(EventDescriptorBase[_TParams, _TResult, ThreadedEvent[_TParams, _TResult]]): # noqa: N801
154-
def __init__(self, _func: Callable[_TParams, _TResult]) -> None:
155-
super().__init__(_func, ThreadedEvent[_TParams, _TResult])

packages/jsonrpc2/src/robotcode/jsonrpc2/protocol.py

+78-63
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,21 @@ class InvalidProtocolVersionError(JsonRPCParseError):
144144
pass
145145

146146

147-
class RpcMethodEntry(NamedTuple):
147+
@dataclass
148+
class RpcMethodEntry:
148149
name: str
149150
method: Callable[..., Any]
150151
param_type: Optional[Type[Any]]
151152
cancelable: bool
152153

154+
_is_coroutine: Optional[bool] = field(default=None, init=False)
155+
156+
@property
157+
def is_coroutine(self) -> bool:
158+
if self._is_coroutine is None:
159+
self._is_coroutine = inspect.iscoroutinefunction(self.method)
160+
return self._is_coroutine
161+
153162

154163
@runtime_checkable
155164
class RpcMethod(Protocol):
@@ -622,7 +631,7 @@ async def handle_error(self, message: JsonRPCError) -> None:
622631
JsonRPCErrorException(message.error.code, message.error.message, message.error.data)
623632
)
624633
else:
625-
self.__logger.warning(lambda: f"Response for {message} is already done.")
634+
self.__logger.warning(lambda: f"Error Response for {message} is already done.")
626635

627636
except (SystemExit, KeyboardInterrupt):
628637
raise
@@ -707,53 +716,57 @@ async def handle_request(self, message: JsonRPCRequest) -> None:
707716

708717
params = self._convert_params(e.method, e.param_type, message.params)
709718

710-
if (
711-
isinstance(e.method, HasThreaded)
712-
and e.method.__threaded__
713-
or inspect.ismethod(e.method)
714-
and isinstance(e.method.__func__, HasThreaded)
715-
and e.method.__func__.__threaded__
716-
):
717-
task = run_coroutine_in_thread(
718-
ensure_coroutine(cast(Callable[..., Any], e.method)), *params[0], **params[1]
719-
)
719+
if not e.is_coroutine:
720+
self.send_response(message.id, e.method(*params[0], **params[1]))
720721
else:
721-
task = create_sub_task(
722-
ensure_coroutine(e.method)(*params[0], **params[1]),
723-
name=message.method,
724-
)
722+
if (
723+
isinstance(e.method, HasThreaded)
724+
and e.method.__threaded__
725+
or inspect.ismethod(e.method)
726+
and isinstance(e.method.__func__, HasThreaded)
727+
and e.method.__func__.__threaded__
728+
):
729+
task = run_coroutine_in_thread(
730+
ensure_coroutine(cast(Callable[..., Any], e.method)), *params[0], **params[1]
731+
)
732+
else:
733+
task = create_sub_task(
734+
ensure_coroutine(e.method)(*params[0], **params[1]),
735+
name=message.method,
736+
)
725737

726-
with self._received_request_lock:
727-
self._received_request[message.id] = ReceivedRequestEntry(task, message, e.cancelable)
728-
729-
def done(t: asyncio.Future[Any]) -> None:
730-
try:
731-
if not t.cancelled():
732-
ex = t.exception()
733-
if ex is not None:
734-
self.__logger.exception(ex, exc_info=ex)
735-
raise JsonRPCErrorException(JsonRPCErrors.INTERNAL_ERROR, f"{type(ex).__name__}: {ex}") from ex
736-
737-
self.send_response(message.id, t.result())
738-
except asyncio.CancelledError:
739-
self.__logger.debug(lambda: f"request message {message!r} canceled")
740-
self.send_error(JsonRPCErrors.REQUEST_CANCELLED, "Request canceled.", id=message.id)
741-
except (SystemExit, KeyboardInterrupt):
742-
raise
743-
except JsonRPCErrorException as e:
744-
self.send_error(e.code, e.message or f"{type(e).__name__}: {e}", id=message.id, data=e.data)
745-
except BaseException as e:
746-
self.__logger.exception(e)
747-
self.send_error(JsonRPCErrors.INTERNAL_ERROR, f"{type(e).__name__}: {e}", id=message.id)
748-
finally:
749-
with self._received_request_lock:
750-
self._received_request.pop(message.id, None)
751-
752-
task.add_done_callback(done)
753-
754-
await task
738+
with self._received_request_lock:
739+
self._received_request[message.id] = ReceivedRequestEntry(task, message, e.cancelable)
740+
741+
def done(t: asyncio.Future[Any]) -> None:
742+
try:
743+
if not t.cancelled():
744+
ex = t.exception()
745+
if ex is not None:
746+
self.__logger.exception(ex, exc_info=ex)
747+
raise JsonRPCErrorException(
748+
JsonRPCErrors.INTERNAL_ERROR, f"{type(ex).__name__}: {ex}"
749+
) from ex
750+
751+
self.send_response(message.id, t.result())
752+
except asyncio.CancelledError:
753+
self.__logger.debug(lambda: f"request message {message!r} canceled")
754+
self.send_error(JsonRPCErrors.REQUEST_CANCELLED, "Request canceled.", id=message.id)
755+
except (SystemExit, KeyboardInterrupt):
756+
raise
757+
except JsonRPCErrorException as e:
758+
self.send_error(e.code, e.message or f"{type(e).__name__}: {e}", id=message.id, data=e.data)
759+
except BaseException as e:
760+
self.__logger.exception(e)
761+
self.send_error(JsonRPCErrors.INTERNAL_ERROR, f"{type(e).__name__}: {e}", id=message.id)
762+
finally:
763+
with self._received_request_lock:
764+
self._received_request.pop(message.id, None)
765+
766+
task.add_done_callback(done)
767+
768+
await task
755769

756-
@__logger.call
757770
def cancel_request(self, id: Union[int, str, None]) -> None:
758771
with self._received_request_lock:
759772
entry = self._received_request.get(id, None)
@@ -762,8 +775,7 @@ def cancel_request(self, id: Union[int, str, None]) -> None:
762775
self.__logger.debug(lambda: f"try to cancel request {entry.request if entry is not None else ''}")
763776
entry.future.cancel()
764777

765-
@__logger.call
766-
async def cancel_all_received_request(self) -> None:
778+
def cancel_all_received_request(self) -> None:
767779
for entry in self._received_request.values():
768780
if entry is not None and entry.cancelable and entry.future is not None and not entry.future.cancelled():
769781
entry.future.cancel()
@@ -778,23 +790,26 @@ async def handle_notification(self, message: JsonRPCNotification) -> None:
778790
try:
779791
params = self._convert_params(e.method, e.param_type, message.params)
780792

781-
if (
782-
isinstance(e.method, HasThreaded)
783-
and e.method.__threaded__
784-
or inspect.ismethod(e.method)
785-
and isinstance(e.method.__func__, HasThreaded)
786-
and e.method.__func__.__threaded__
787-
):
788-
task = run_coroutine_in_thread(
789-
ensure_coroutine(cast(Callable[..., Any], e.method)), *params[0], **params[1]
790-
)
793+
if not e.is_coroutine:
794+
e.method(*params[0], **params[1])
791795
else:
792-
task = create_sub_task(
793-
ensure_coroutine(e.method)(*params[0], **params[1]),
794-
name=message.method,
795-
)
796+
if (
797+
isinstance(e.method, HasThreaded)
798+
and e.method.__threaded__
799+
or inspect.ismethod(e.method)
800+
and isinstance(e.method.__func__, HasThreaded)
801+
and e.method.__func__.__threaded__
802+
):
803+
task = run_coroutine_in_thread(
804+
ensure_coroutine(cast(Callable[..., Any], e.method)), *params[0], **params[1]
805+
)
806+
else:
807+
task = create_sub_task(
808+
ensure_coroutine(e.method)(*params[0], **params[1]),
809+
name=message.method,
810+
)
796811

797-
await task
812+
await task
798813
except asyncio.CancelledError:
799814
pass
800815
except (SystemExit, KeyboardInterrupt):

packages/language_server/src/robotcode/language_server/common/parts/code_action.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ async def _text_document_code_action(
7171
) -> Optional[List[Union[Command, CodeAction]]]:
7272
results: List[Union[Command, CodeAction]] = []
7373

74-
document = await self.parent.documents.get(text_document.uri)
74+
document = self.parent.documents.get(text_document.uri)
7575
if document is None:
7676
return None
7777

packages/language_server/src/robotcode/language_server/common/parts/code_lens.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ async def _text_document_code_lens(
4949
self, text_document: TextDocumentIdentifier, *args: Any, **kwargs: Any
5050
) -> Optional[List[CodeLens]]:
5151
results: List[CodeLens] = []
52-
document = await self.parent.documents.get(text_document.uri)
52+
document = self.parent.documents.get(text_document.uri)
5353
if document is None:
5454
return None
5555

packages/language_server/src/robotcode/language_server/common/parts/completion.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ async def _text_document_completion(
9797
if context is not None and context.trigger_kind == CompletionTriggerKind.TRIGGER_CHARACTER:
9898
await asyncio.sleep(0.25)
9999

100-
document = await self.parent.documents.get(text_document.uri)
100+
document = self.parent.documents.get(text_document.uri)
101101
if document is None:
102102
return None
103103

packages/language_server/src/robotcode/language_server/common/parts/declaration.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async def _text_document_declaration(
6161
locations: List[Location] = []
6262
location_links: List[LocationLink] = []
6363

64-
document = await self.parent.documents.get(text_document.uri)
64+
document = self.parent.documents.get(text_document.uri)
6565
if document is None:
6666
return None
6767

@@ -85,13 +85,13 @@ async def _text_document_declaration(
8585

8686
if locations:
8787
for location in locations:
88-
doc = await self.parent.documents.get(location.uri)
88+
doc = self.parent.documents.get(location.uri)
8989
if doc is not None:
9090
location.range = doc.range_to_utf16(location.range)
9191

9292
if location_links:
9393
for location_link in location_links:
94-
doc = await self.parent.documents.get(location_link.target_uri)
94+
doc = self.parent.documents.get(location_link.target_uri)
9595
if doc is not None:
9696
location_link.target_range = doc.range_to_utf16(location_link.target_range)
9797
location_link.target_selection_range = doc.range_to_utf16(location_link.target_selection_range)

packages/language_server/src/robotcode/language_server/common/parts/definition.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async def _text_document_definition(
5555
locations: List[Location] = []
5656
location_links: List[LocationLink] = []
5757

58-
document = await self.parent.documents.get(text_document.uri)
58+
document = self.parent.documents.get(text_document.uri)
5959
if document is None:
6060
return None
6161

@@ -81,13 +81,13 @@ async def _text_document_definition(
8181

8282
if locations:
8383
for location in locations:
84-
doc = await self.parent.documents.get(location.uri)
84+
doc = self.parent.documents.get(location.uri)
8585
if doc is not None:
8686
location.range = doc.range_to_utf16(location.range)
8787

8888
if location_links:
8989
for location_link in location_links:
90-
doc = await self.parent.documents.get(location_link.target_uri)
90+
doc = self.parent.documents.get(location_link.target_uri)
9191
if doc is not None:
9292
location_link.target_range = doc.range_to_utf16(location_link.target_range)
9393
location_link.target_selection_range = doc.range_to_utf16(location_link.target_selection_range)

packages/language_server/src/robotcode/language_server/common/parts/diagnostics.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,10 @@ async def force_refresh_document(self, document: TextDocument, refresh: bool = T
281281

282282
@_logger.call
283283
@threaded()
284-
async def on_did_close(self, sender: Any, document: TextDocument) -> None:
284+
def on_did_close(self, sender: Any, document: TextDocument) -> None:
285+
create_sub_task(self._close_diagnostics_for_document(document), loop=self.diagnostics_loop)
286+
287+
async def _close_diagnostics_for_document(self, document: TextDocument) -> None:
285288
if await self.get_diagnostics_mode(document.uri) == DiagnosticsMode.WORKSPACE:
286289
return
287290

@@ -464,7 +467,7 @@ async def _get_diagnostics_for_document(
464467
d.range = document.range_to_utf16(d.range)
465468

466469
for r in d.related_information or []:
467-
doc = await self.parent.documents.get(r.location.uri)
470+
doc = self.parent.documents.get(r.location.uri)
468471
if doc is not None:
469472
r.location.range = doc.range_to_utf16(r.location.range)
470473

@@ -494,7 +497,7 @@ def publish_diagnostics(self, document: TextDocument, diagnostics: List[Diagnost
494497
),
495498
)
496499

497-
async def update_document_diagnostics(self, sender: Any, document: TextDocument) -> None:
500+
def update_document_diagnostics(self, sender: Any, document: TextDocument) -> None:
498501
self.create_document_diagnostics_task(document, True)
499502

500503
@rpc_method(name="textDocument/diagnostic", param_type=DocumentDiagnosticParams)
@@ -513,7 +516,7 @@ async def _text_document_diagnostic(
513516
LSPErrorCodes.SERVER_CANCELLED, "Server not initialized.", DiagnosticServerCancellationData(True)
514517
)
515518

516-
document = await self.parent.documents.get(text_document.uri)
519+
document = self.parent.documents.get(text_document.uri)
517520
if document is None:
518521
raise JsonRPCErrorException(LSPErrorCodes.SERVER_CANCELLED, f"Document {text_document!r} not found.")
519522

packages/language_server/src/robotcode/language_server/common/parts/document_highlight.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async def _text_document_document_highlight(
5050
) -> Optional[List[DocumentHighlight]]:
5151
highlights: List[DocumentHighlight] = []
5252

53-
document = await self.parent.documents.get(text_document.uri)
53+
document = self.parent.documents.get(text_document.uri)
5454
if document is None:
5555
return None
5656

0 commit comments

Comments
 (0)