Skip to content

Discovery cleanup #1181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions arduino-ide-extension/src/node/arduino-ide-backend-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,10 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => {
bind(ILogger)
.toDynamicValue((ctx) => {
const parentLogger = ctx.container.get<ILogger>(ILogger);
return parentLogger.child('discovery-log'); // TODO: revert
return parentLogger.child('discovery');
})
.inSingletonScope()
.whenTargetNamed('discovery-log'); // TODO: revert
.whenTargetNamed('discovery');

// Logger for the CLI config service. From the CLI config (FS path aware), we make a URI-aware app config.
bind(ILogger)
Expand Down
112 changes: 54 additions & 58 deletions arduino-ide-extension/src/node/board-discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ import { Emitter, Event } from '@theia/core/lib/common/event';
import { DisposableCollection } from '@theia/core/lib/common/disposable';
import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol';
import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb';
import { v4 } from 'uuid';
import { ServiceError } from './service-error';
import { BackendApplicationContribution } from '@theia/core/lib/node';
import { BackendApplicationContribution } from '@theia/core/lib/node/backend-application';
import { Deferred } from '@theia/core/lib/common/promise-util';

type Duplex = ClientDuplexStream<BoardListWatchRequest, BoardListWatchResponse>;
interface StreamWrapper extends Disposable {
interface DisposableDuplex extends Disposable {
readonly stream: Duplex;
readonly uuid: string; // For logging only
}

/**
Expand All @@ -41,15 +39,15 @@ export class BoardDiscovery
implements BackendApplicationContribution
{
@inject(ILogger)
@named('discovery-log')
@named('discovery')
private readonly logger: ILogger;

@inject(NotificationServiceServer)
private readonly notificationService: NotificationServiceServer;

private watching: Deferred<void> | undefined;
private stopping: Deferred<void> | undefined;
private wrapper: StreamWrapper | undefined;
private duplex: DisposableDuplex | undefined;
private readonly onStreamDidEndEmitter = new Emitter<void>(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization.
private readonly onStreamDidCancelEmitter = new Emitter<void>(); // when the watcher is canceled by the IDE2
private readonly toDisposeOnStopWatch = new DisposableCollection();
Expand All @@ -66,59 +64,50 @@ export class BoardDiscovery
* }
* ```
*/
private _availablePorts: AvailablePorts = {};
get availablePorts(): AvailablePorts {
return this._availablePorts;
private _state: AvailablePorts = {};
get state(): AvailablePorts {
return this._state;
}

onStart(): void {
this.start();
this.onClientDidRefresh(() => this.restart());
}

private async restart(): Promise<void> {
this.logger.info('restarting before stop');
await this.stop();
this.logger.info('restarting after stop');
return this.start();
this.onClientWillRefresh(() => this.stop());
this.onClientDidRefresh(() => this.start());
}

onStop(): void {
this.stop();
}

async stop(restart = false): Promise<void> {
this.logger.info('stop');
async stop(): Promise<void> {
this.logger.debug('stop');
if (this.stopping) {
this.logger.info('stop already stopping');
this.logger.debug('stop already stopping');
return this.stopping.promise;
}
if (!this.watching) {
return;
}
this.stopping = new Deferred();
this.logger.info('>>> Stopping boards watcher...');
this.logger.debug('>>> Stopping boards watcher...');
return new Promise<void>((resolve, reject) => {
const timeout = this.createTimeout(10_000, reject);
const toDispose = new DisposableCollection();
const waitForEvent = (event: Event<unknown>) =>
const waitForEvent = (event: Event<unknown>, name: string) =>
event(() => {
this.logger.info('stop received event: either end or cancel');
this.logger.debug(`stop received event: ${name}`);
toDispose.dispose();
this.stopping?.resolve();
this.stopping = undefined;
this.logger.info('stop stopped');
this.logger.debug('cancelled boards watcher');
resolve();
if (restart) {
this.start();
}
});
toDispose.pushAll([
timeout,
waitForEvent(this.onStreamDidEndEmitter.event),
waitForEvent(this.onStreamDidCancelEmitter.event),
waitForEvent(this.onStreamDidEndEmitter.event, 'end'),
waitForEvent(this.onStreamDidCancelEmitter.event, 'cancel'),
]);
this.logger.info('Canceling boards watcher...');
this.logger.debug('Canceling boards watcher...');
this.toDisposeOnStopWatch.dispose();
});
}
Expand Down Expand Up @@ -154,22 +143,22 @@ export class BoardDiscovery
});
}

private async createWrapper(
private async createDuplex(
client: ArduinoCoreServiceClient
): Promise<StreamWrapper> {
if (this.wrapper) {
): Promise<DisposableDuplex> {
if (this.duplex) {
throw new Error(`Duplex was already set.`);
}
const stream = client
.boardListWatch()
.on('end', () => {
this.logger.info('received end');
this.logger.debug('received end'); // after the core client re-initialization, the CLI kills the watcher process.
this.onStreamDidEndEmitter.fire();
})
.on('error', (error) => {
this.logger.info('error received');
this.logger.debug('error received');
if (ServiceError.isCancel(error)) {
this.logger.info('cancel error received!');
this.logger.debug('cancel error received!');
this.onStreamDidCancelEmitter.fire();
} else {
this.logger.error(
Expand All @@ -181,14 +170,11 @@ export class BoardDiscovery
});
const wrapper = {
stream,
uuid: v4(),
dispose: () => {
this.logger.info('disposing requesting cancel');
// Cancelling the stream will kill the discovery `builtin:mdns-discovery process`.
// The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI.
this.logger.debug('disposing requesting cancel');
stream.cancel();
this.logger.info('disposing canceled');
this.wrapper = undefined;
this.logger.debug('disposing canceled');
this.duplex = undefined;
},
};
this.toDisposeOnStopWatch.pushAll([
Expand All @@ -197,6 +183,16 @@ export class BoardDiscovery
this.watching?.reject(new Error(`Stopping watcher.`));
this.watching = undefined;
}),
Disposable.create(() => {
const oldState = deepClone(this._state);
const boards = this.getAttachedBoards(oldState);
const ports = this.getAvailablePorts(oldState);
this._state = {};
this.notificationService.notifyAttachedBoardsDidChange({
newState: { boards: [], ports: [] },
oldState: { boards, ports },
});
}),
]);
return wrapper;
}
Expand All @@ -214,24 +210,24 @@ export class BoardDiscovery
}

async start(): Promise<void> {
this.logger.info('start');
this.logger.debug('start');
if (this.stopping) {
this.logger.info('start is stopping wait');
this.logger.debug('start is stopping wait');
await this.stopping.promise;
this.logger.info('start stopped');
this.logger.debug('start stopped');
}
if (this.watching) {
this.logger.info('start already watching');
this.logger.debug('start already watching');
return this.watching.promise;
}
this.watching = new Deferred();
this.logger.info('start new deferred');
this.logger.debug('start new deferred');
const { client, instance } = await this.coreClient;
const wrapper = await this.createWrapper(client);
const wrapper = await this.createDuplex(client);
wrapper.stream.on('data', async (resp: BoardListWatchResponse) => {
this.logger.info('onData', this.toJson(resp));
this.logger.debug('onData', this.toJson(resp));
if (resp.getEventType() === 'quit') {
this.logger.info('quit received');
this.logger.debug('quit received');
this.stop();
return;
}
Expand All @@ -251,8 +247,8 @@ export class BoardDiscovery
throw new Error(`Unexpected event type: '${resp.getEventType()}'`);
}

const oldState = deepClone(this._availablePorts);
const newState = deepClone(this._availablePorts);
const oldState = deepClone(this._state);
const newState = deepClone(this._state);

const address = (detectedPort as any).getPort().getAddress();
const protocol = (detectedPort as any).getPort().getProtocol();
Expand Down Expand Up @@ -320,21 +316,21 @@ export class BoardDiscovery
},
};

this._availablePorts = newState;
this._state = newState;
this.notificationService.notifyAttachedBoardsDidChange(event);
}
});
this.logger.info('start request start watch');
this.logger.debug('start request start watch');
await this.requestStartWatch(
new BoardListWatchRequest().setInstance(instance),
wrapper.stream
);
this.logger.info('start requested start watch');
this.logger.debug('start requested start watch');
this.watching.resolve();
this.logger.info('start resolved watching');
this.logger.debug('start resolved watching');
}

getAttachedBoards(state: AvailablePorts = this.availablePorts): Board[] {
getAttachedBoards(state: AvailablePorts = this.state): Board[] {
const attachedBoards: Board[] = [];
for (const portID of Object.keys(state)) {
const [, boards] = state[portID];
Expand All @@ -343,7 +339,7 @@ export class BoardDiscovery
return attachedBoards;
}

getAvailablePorts(state: AvailablePorts = this.availablePorts): Port[] {
getAvailablePorts(state: AvailablePorts = this.state): Port[] {
const availablePorts: Port[] = [];
for (const portID of Object.keys(state)) {
const [port] = state[portID];
Expand Down
2 changes: 1 addition & 1 deletion arduino-ide-extension/src/node/boards-service-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class BoardsServiceImpl
protected readonly boardDiscovery: BoardDiscovery;

async getState(): Promise<AvailablePorts> {
return this.boardDiscovery.availablePorts;
return this.boardDiscovery.state;
}

async getAttachedBoards(): Promise<Board[]> {
Expand Down
10 changes: 10 additions & 0 deletions arduino-ide-extension/src/node/core-client-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class CoreClientProvider {
private readonly onClientReady = this.onClientReadyEmitter.event;
private readonly onClientDidRefreshEmitter =
new Emitter<CoreClientProvider.Client>();
private readonly onClientWillRefreshEmitter = new Emitter<void>();

@postConstruct()
protected init(): void {
Expand Down Expand Up @@ -94,6 +95,10 @@ export class CoreClientProvider {
return this.onClientDidRefreshEmitter.event;
}

get onClientWillRefresh(): Event<void> {
return this.onClientWillRefreshEmitter.event;
}

/**
* Encapsulates both the gRPC core client creation (`CreateRequest`) and initialization (`InitRequest`).
*/
Expand Down Expand Up @@ -253,6 +258,7 @@ export class CoreClientProvider {
private async refreshIndexes(): Promise<void> {
const client = this._client;
if (client) {
this.onClientWillRefreshEmitter.fire();
const progressHandler = this.createProgressHandler();
try {
await this.updateIndexes(client, progressHandler);
Expand Down Expand Up @@ -415,6 +421,10 @@ export abstract class CoreClientAware {
protected get onClientDidRefresh(): Event<CoreClientProvider.Client> {
return this.coreClientProvider.onClientDidRefresh;
}

protected get onClientWillRefresh(): Event<void> {
return this.coreClientProvider.onClientWillRefresh;
}
}

class IndexUpdateRequiredBeforeInitError extends Error {
Expand Down