From 5f3c065408171e9ddce4d5402572577e2c3951c9 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Wed, 20 Jan 2021 00:58:23 +0100 Subject: [PATCH 1/2] Handle progress messages sequentially --- handler/builder.go | 2 +- handler/handler.go | 10 +++------- handler/syncer.go | 8 +++++++- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/handler/builder.go b/handler/builder.go index 580e0d0..657c793 100644 --- a/handler/builder.go +++ b/handler/builder.go @@ -95,7 +95,7 @@ func (handler *InoHandler) rebuildEnvironmentLoop() { }() handler.dataMux.Lock() - handler.initializeWorkbench(nil) + handler.initializeWorkbench(context.Background(), nil) handler.dataMux.Unlock() done <- true close(done) diff --git a/handler/handler.go b/handler/handler.go index 36c9095..193f3a6 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -178,7 +178,7 @@ func (handler *InoHandler) HandleMessageFromIDE(ctx context.Context, conn *jsonr defer handler.dataMux.Unlock() log.Printf("LS --- initializing workbench (running)") - handler.initializeWorkbench(p) + handler.initializeWorkbench(ctx, p) // clangd should be running now... handler.clangdStarted.Broadcast() @@ -410,8 +410,6 @@ func (handler *InoHandler) HandleMessageFromIDE(ctx context.Context, conn *jsonr err = handler.ClangdConn.Notify(ctx, req.Method, params) } else { log.Printf(prefix + "sent to Clang") - ctx, cancel := context.WithTimeout(ctx, 800*time.Millisecond) - defer cancel() result, err = lsp.SendRequest(ctx, handler.ClangdConn, req.Method, params) } if err == nil && handler.buildSketchSymbolsLoad { @@ -450,7 +448,7 @@ func (handler *InoHandler) exit() { os.Exit(1) } -func (handler *InoHandler) initializeWorkbench(params *lsp.InitializeParams) error { +func (handler *InoHandler) initializeWorkbench(ctx context.Context, params *lsp.InitializeParams) error { currCppTextVersion := 0 if params != nil { log.Printf(" --> initialize(%s)\n", params.RootURI) @@ -495,8 +493,6 @@ func (handler *InoHandler) initializeWorkbench(params *lsp.InitializeParams) err }, } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() if err := handler.ClangdConn.Notify(ctx, "textDocument/didChange", syncEvent); err != nil { log.Println(" error reinitilizing clangd:", err) return err @@ -513,7 +509,7 @@ func (handler *InoHandler) initializeWorkbench(params *lsp.InitializeParams) err } clangdStream := jsonrpc2.NewBufferedStream(clangdStdio, jsonrpc2.VSCodeObjectCodec{}) - clangdHandler := jsonrpc2.AsyncHandler(jsonrpc2.HandlerWithError(handler.FromClangd)) + clangdHandler := AsyncHandler{jsonrpc2.HandlerWithError(handler.FromClangd)} handler.ClangdConn = jsonrpc2.NewConn(context.Background(), clangdStream, clangdHandler) // Send initialization command to clangd diff --git a/handler/syncer.go b/handler/syncer.go index a8d4cbb..0611017 100644 --- a/handler/syncer.go +++ b/handler/syncer.go @@ -13,5 +13,11 @@ type AsyncHandler struct { // Handle handles a request or notification func (ah AsyncHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { - go ah.handler.Handle(ctx, conn, req) + switch req.Method { + case // Request that should not be parallelized + "$/progress": + ah.handler.Handle(ctx, conn, req) + default: // By default process all requests in parallel + go ah.handler.Handle(ctx, conn, req) + } } From ce6e1cdc0af0acb61a0a1d4804747c49807612a3 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Wed, 20 Jan 2021 18:32:23 +0100 Subject: [PATCH 2/2] Dramatically improved progress handling The progress requests from clangd are now cached and sent back to the IDE with a moderate pace. If a burst of progress reports is received by clangd only the latest one is proxied to the IDE. --- handler/builder.go | 30 ++----- handler/handler.go | 65 ++++++++++++--- handler/progress.go | 192 +++++++++++++++++++++++++++++++++++++++++++ handler/syncer.go | 2 +- lsp/protocol_test.go | 12 ++- lsp/service.go | 55 +++++++++---- 6 files changed, 298 insertions(+), 58 deletions(-) create mode 100644 handler/progress.go diff --git a/handler/builder.go b/handler/builder.go index 657c793..e42a4d3 100644 --- a/handler/builder.go +++ b/handler/builder.go @@ -54,24 +54,12 @@ func (handler *InoHandler) rebuildEnvironmentLoop() { // Regenerate preprocessed sketch! done := make(chan bool) go func() { - { - // Request a new progress token - req := &lsp.WorkDoneProgressCreateParams{Token: "arduinoLanguageServerRebuild"} - var resp lsp.WorkDoneProgressCreateResult - if err := handler.StdioConn.Call(context.Background(), "window/workDoneProgress/create", req, &resp, nil); err != nil { - log.Printf(" !!! could not create report progress: %s", err) - <-done - return - } - } - req := &lsp.ProgressParams{Token: "arduinoLanguageServerRebuild"} - req.Value = lsp.WorkDoneProgressBegin{ + handler.progressHandler.Create("arduinoLanguageServerRebuild") + handler.progressHandler.Begin("arduinoLanguageServerRebuild", &lsp.WorkDoneProgressBegin{ Title: "Building sketch", - } - if err := handler.StdioConn.Notify(context.Background(), "$/progress", req, nil); err != nil { - log.Printf(" !!! could not report progress: %s", err) - } + }) + count := 0 dots := []string{".", "..", "..."} for { @@ -79,16 +67,10 @@ func (handler *InoHandler) rebuildEnvironmentLoop() { case <-time.After(time.Millisecond * 400): msg := "compiling" + dots[count%3] count++ - req.Value = lsp.WorkDoneProgressReport{Message: &msg} - if err := handler.StdioConn.Notify(context.Background(), "$/progress", req, nil); err != nil { - log.Printf(" !!! could not report progress: %s", err) - } + handler.progressHandler.Report("arduinoLanguageServerRebuild", &lsp.WorkDoneProgressReport{Message: &msg}) case <-done: msg := "done" - req.Value = lsp.WorkDoneProgressEnd{Message: &msg} - if err := handler.StdioConn.Notify(context.Background(), "$/progress", req, nil); err != nil { - log.Printf(" !!! could not report progress: %s", err) - } + handler.progressHandler.End("arduinoLanguageServerRebuild", &lsp.WorkDoneProgressEnd{Message: &msg}) return } } diff --git a/handler/handler.go b/handler/handler.go index 193f3a6..c49da3b 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -47,6 +47,7 @@ type InoHandler struct { stdioNotificationCount int64 clangdNotificationCount int64 + progressHandler *ProgressProxyHandler clangdStarted *sync.Cond dataMux sync.RWMutex @@ -83,6 +84,9 @@ func NewInoHandler(stdio io.ReadWriteCloser, board lsp.Board) *InoHandler { stdStream := jsonrpc2.NewBufferedStream(stdio, jsonrpc2.VSCodeObjectCodec{}) var stdHandler jsonrpc2.Handler = jsonrpc2.HandlerWithError(handler.HandleMessageFromIDE) handler.StdioConn = jsonrpc2.NewConn(context.Background(), stdStream, stdHandler) + + handler.progressHandler = NewProgressProxy(handler.StdioConn) + if enableLogging { log.Println("Initial board configuration:", board) } @@ -1443,20 +1447,55 @@ func (handler *InoHandler) FromClangd(ctx context.Context, connection *jsonrpc2. } defer log.Printf(prefix + "(done)") - log.Printf(prefix + "(queued)") - switch req.Method { - case // No locking required - "$/progress", - "window/workDoneProgress/create": - case // Read lock - "textDocument/publishDiagnostics", - "workspace/applyEdit": - handler.dataMux.RLock() - defer handler.dataMux.RUnlock() - default: // Default to read lock - handler.dataMux.RLock() - defer handler.dataMux.RUnlock() + if req.Method == "window/workDoneProgress/create" { + params := lsp.WorkDoneProgressCreateParams{} + if err := json.Unmarshal(*req.Params, ¶ms); err != nil { + log.Printf(prefix+"error decoding window/workDoneProgress/create: %v", err) + return nil, err + } + handler.progressHandler.Create(params.Token) + return &lsp.WorkDoneProgressCreateResult{}, nil + } + + if req.Method == "$/progress" { + // data may be of many different types... + log.Printf(prefix + "decoding progress...") + params := lsp.ProgressParams{} + if err := json.Unmarshal(*req.Params, ¶ms); err != nil { + log.Printf(prefix+"error decoding progress: %v", err) + return nil, err + } + id := params.Token + + var begin lsp.WorkDoneProgressBegin + if err := json.Unmarshal(*params.Value, &begin); err == nil { + log.Printf(prefix+"begin %s %v", id, begin) + handler.progressHandler.Begin(id, &begin) + return nil, nil + } + + var report lsp.WorkDoneProgressReport + if err := json.Unmarshal(*params.Value, &report); err == nil { + log.Printf(prefix+"report %s %v", id, report) + handler.progressHandler.Report(id, &report) + return nil, nil + } + + var end lsp.WorkDoneProgressEnd + if err := json.Unmarshal(*params.Value, &end); err == nil { + log.Printf(prefix+"end %s %v", id, end) + handler.progressHandler.End(id, &end) + return nil, nil + } + + log.Printf(prefix + "error unsupported $/progress: " + string(*params.Value)) + return nil, errors.New("unsupported $/progress: " + string(*params.Value)) } + + // Default to read lock + log.Printf(prefix + "(queued)") + handler.dataMux.RLock() + defer handler.dataMux.RUnlock() log.Printf(prefix + "(running)") params, err := lsp.ReadParams(req.Method, req.Params) diff --git a/handler/progress.go b/handler/progress.go new file mode 100644 index 0000000..471a3c6 --- /dev/null +++ b/handler/progress.go @@ -0,0 +1,192 @@ +package handler + +import ( + "context" + "log" + "sync" + + "github.com/bcmi-labs/arduino-language-server/lsp" + "github.com/bcmi-labs/arduino-language-server/streams" + "github.com/sourcegraph/jsonrpc2" +) + +type ProgressProxyHandler struct { + conn *jsonrpc2.Conn + mux sync.Mutex + actionRequiredCond *sync.Cond + proxies map[string]*progressProxy +} + +type progressProxyStatus int + +const ( + progressProxyNew progressProxyStatus = iota + progressProxyCreated + progressProxyBegin + progressProxyReport + progressProxyEnd +) + +type progressProxy struct { + currentStatus progressProxyStatus + requiredStatus progressProxyStatus + beginReq *lsp.WorkDoneProgressBegin + reportReq *lsp.WorkDoneProgressReport + endReq *lsp.WorkDoneProgressEnd +} + +func NewProgressProxy(conn *jsonrpc2.Conn) *ProgressProxyHandler { + res := &ProgressProxyHandler{ + conn: conn, + proxies: map[string]*progressProxy{}, + } + res.actionRequiredCond = sync.NewCond(&res.mux) + go res.handlerLoop() + return res +} + +func (p *ProgressProxyHandler) handlerLoop() { + defer streams.CatchAndLogPanic() + + p.mux.Lock() + defer p.mux.Unlock() + + for { + p.actionRequiredCond.Wait() + + for id, proxy := range p.proxies { + for proxy.currentStatus != proxy.requiredStatus { + p.handleProxy(id, proxy) + } + } + + // Cleanup ended proxies + for id, proxy := range p.proxies { + if proxy.currentStatus == progressProxyEnd { + delete(p.proxies, id) + } + } + } +} + +func (p *ProgressProxyHandler) handleProxy(id string, proxy *progressProxy) { + ctx := context.Background() + switch proxy.currentStatus { + case progressProxyNew: + p.mux.Unlock() + var res lsp.WorkDoneProgressCreateResult + err := p.conn.Call(ctx, "window/workDoneProgress/create", &lsp.WorkDoneProgressCreateParams{Token: id}, &res) + p.mux.Lock() + + if err != nil { + log.Printf("ProgressHandler: error creating token %s: %v", id, err) + } else { + proxy.currentStatus = progressProxyCreated + } + + case progressProxyCreated: + p.mux.Unlock() + err := p.conn.Notify(ctx, "$/progress", lsp.ProgressParams{ + Token: id, + Value: lsp.Raw(proxy.beginReq), + }) + p.mux.Lock() + + proxy.beginReq = nil + if err != nil { + log.Printf("ProgressHandler: error sending begin req token %s: %v", id, err) + } else { + proxy.currentStatus = progressProxyBegin + } + + case progressProxyBegin: + if proxy.requiredStatus == progressProxyReport { + p.mux.Unlock() + err := p.conn.Notify(ctx, "$/progress", &lsp.ProgressParams{ + Token: id, + Value: lsp.Raw(proxy.reportReq)}) + p.mux.Lock() + + proxy.reportReq = nil + if err != nil { + log.Printf("ProgressHandler: error sending begin req token %s: %v", id, err) + } else { + proxy.requiredStatus = progressProxyBegin + } + + } else if proxy.requiredStatus == progressProxyEnd { + p.mux.Unlock() + err := p.conn.Notify(ctx, "$/progress", &lsp.ProgressParams{ + Token: id, + Value: lsp.Raw(proxy.endReq), + }) + p.mux.Lock() + + proxy.endReq = nil + if err != nil { + log.Printf("ProgressHandler: error sending begin req token %s: %v", id, err) + } else { + proxy.currentStatus = progressProxyEnd + } + + } + } +} + +func (p *ProgressProxyHandler) Create(id string) { + p.mux.Lock() + defer p.mux.Unlock() + + if _, opened := p.proxies[id]; opened { + // Already created + return + } + + p.proxies[id] = &progressProxy{ + currentStatus: progressProxyNew, + requiredStatus: progressProxyCreated, + } + p.actionRequiredCond.Broadcast() +} + +func (p *ProgressProxyHandler) Begin(id string, req *lsp.WorkDoneProgressBegin) { + p.mux.Lock() + defer p.mux.Unlock() + + proxy, ok := p.proxies[id] + if !ok { + return + } + + proxy.beginReq = req + proxy.requiredStatus = progressProxyBegin + p.actionRequiredCond.Broadcast() +} + +func (p *ProgressProxyHandler) Report(id string, req *lsp.WorkDoneProgressReport) { + p.mux.Lock() + defer p.mux.Unlock() + + proxy, ok := p.proxies[id] + if !ok { + return + } + + proxy.reportReq = req + proxy.requiredStatus = progressProxyReport + p.actionRequiredCond.Broadcast() +} + +func (p *ProgressProxyHandler) End(id string, req *lsp.WorkDoneProgressEnd) { + p.mux.Lock() + defer p.mux.Unlock() + + proxy, ok := p.proxies[id] + if !ok { + return + } + + proxy.endReq = req + proxy.requiredStatus = progressProxyEnd + p.actionRequiredCond.Broadcast() +} diff --git a/handler/syncer.go b/handler/syncer.go index 0611017..0a60cdc 100644 --- a/handler/syncer.go +++ b/handler/syncer.go @@ -15,7 +15,7 @@ type AsyncHandler struct { func (ah AsyncHandler) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { switch req.Method { case // Request that should not be parallelized - "$/progress": + "window/workDoneProgress/create", "$/progress": ah.handler.Handle(ctx, conn, req) default: // By default process all requests in parallel go ah.handler.Handle(ctx, conn, req) diff --git a/lsp/protocol_test.go b/lsp/protocol_test.go index e1095e2..87f552a 100644 --- a/lsp/protocol_test.go +++ b/lsp/protocol_test.go @@ -68,14 +68,22 @@ func TestDocumentSymbolParse(t *testing.T) { func TestVariousMessages(t *testing.T) { x := &ProgressParams{ Token: "token", - Value: WorkDoneProgressBegin{ + Value: Raw(WorkDoneProgressBegin{ Title: "some work", - }, + }), } data, err := json.Marshal(&x) require.NoError(t, err) require.JSONEq(t, `{"token":"token", "value":{"kind":"begin","title":"some work"}}`, string(data)) + var begin WorkDoneProgressBegin + err = json.Unmarshal([]byte(`{"kind":"begin","title":"some work"}`), &begin) + require.NoError(t, err) + + var report WorkDoneProgressReport + err = json.Unmarshal([]byte(`{"kind":"report","message":"28/29","percentage":96.551724137931032}`), &report) + require.NoError(t, err) + msg := `{ "capabilities":{ "codeActionProvider":{ diff --git a/lsp/service.go b/lsp/service.go index c4de03d..de2e2a9 100644 --- a/lsp/service.go +++ b/lsp/service.go @@ -1058,8 +1058,8 @@ type SemanticHighlightingToken struct { } type ProgressParams struct { - Token string `json:"token"` - Value interface{} `json:"value"` + Token string `json:"token"` + Value *json.RawMessage `json:"value"` } type WorkDoneProgressCreateParams struct { @@ -1088,6 +1088,15 @@ type WorkDoneProgressBegin struct { Percentage *int `json:"percentage,omitempty"` } +func Raw(v json.Marshaler) *json.RawMessage { + data, err := v.MarshalJSON() + if err != nil { + panic("error marshaling: " + err.Error()) + } + dataRaw := json.RawMessage(data) + return &dataRaw +} + // MarshalJSON implements json.Marshaler. func (v WorkDoneProgressBegin) MarshalJSON() ([]byte, error) { return json.Marshal(struct { @@ -1102,8 +1111,11 @@ func (v WorkDoneProgressBegin) MarshalJSON() ([]byte, error) { // UnmarshalJSON implements json.Unmarshaler. func (v *WorkDoneProgressBegin) UnmarshalJSON(data []byte) error { type ProgressBegin struct { - WorkDoneProgressBegin - Kind string `json:"kind"` + Title string `json:"title"` + Cancellable *bool `json:"cancellable,omitempty"` + Message *string `json:"message,omitempty"` + Percentage *int `json:"percentage,omitempty"` + Kind string `json:"kind"` } var x ProgressBegin if err := json.Unmarshal(data, &x); err != nil { @@ -1112,31 +1124,36 @@ func (v *WorkDoneProgressBegin) UnmarshalJSON(data []byte) error { if x.Kind != "begin" { return errors.New(`expected kind == "begin"`) } - *v = x.WorkDoneProgressBegin + (*v).Title = x.Title + (*v).Cancellable = x.Cancellable + (*v).Message = x.Message + (*v).Percentage = x.Percentage return nil } type WorkDoneProgressReport struct { - Cancellable *bool `json:"cancellable,omitempty"` - Message *string `json:"message,omitempty"` - Percentage *int `json:"percentage,omitempty"` + Cancellable *bool `json:"cancellable,omitempty"` + Message *string `json:"message,omitempty"` + Percentage *float64 `json:"percentage,omitempty"` } // MarshalJSON implements json.Marshaler. func (v WorkDoneProgressReport) MarshalJSON() ([]byte, error) { return json.Marshal(struct { - Cancellable *bool `json:"cancellable,omitempty"` - Message *string `json:"message,omitempty"` - Percentage *int `json:"percentage,omitempty"` - Kind string `json:"kind"` + Cancellable *bool `json:"cancellable,omitempty"` + Message *string `json:"message,omitempty"` + Percentage *float64 `json:"percentage,omitempty"` + Kind string `json:"kind"` }{v.Cancellable, v.Message, v.Percentage, "report"}) } // UnmarshalJSON implements json.Unmarshaler. func (v *WorkDoneProgressReport) UnmarshalJSON(data []byte) error { type ProgressReport struct { - WorkDoneProgressReport - Kind string `json:"kind"` + Cancellable *bool `json:"cancellable,omitempty"` + Message *string `json:"message,omitempty"` + Percentage *float64 `json:"percentage,omitempty"` + Kind string `json:"kind"` } var x ProgressReport if err := json.Unmarshal(data, &x); err != nil { @@ -1145,7 +1162,9 @@ func (v *WorkDoneProgressReport) UnmarshalJSON(data []byte) error { if x.Kind != "report" { return errors.New(`expected kind == "report"`) } - *v = x.WorkDoneProgressReport + (*v).Cancellable = x.Cancellable + (*v).Message = x.Message + (*v).Percentage = x.Percentage return nil } @@ -1164,8 +1183,8 @@ func (v WorkDoneProgressEnd) MarshalJSON() ([]byte, error) { // UnmarshalJSON implements json.Unmarshaler. func (v *WorkDoneProgressEnd) UnmarshalJSON(data []byte) error { type ProgressEnd struct { - WorkDoneProgressEnd - Kind string `json:"kind"` + Message *string `json:"message,omitempty"` + Kind string `json:"kind"` } var x ProgressEnd if err := json.Unmarshal(data, &x); err != nil { @@ -1174,6 +1193,6 @@ func (v *WorkDoneProgressEnd) UnmarshalJSON(data []byte) error { if x.Kind != "end" { return errors.New(`expected kind == "end"`) } - *v = x.WorkDoneProgressEnd + (*v).Message = x.Message return nil }