import { DestroyRef, inject, Injectable, NgZone } from '@angular/core';
import { BehaviorSubject, exhaustMap } from 'rxjs';
import { PortChangesDelta } from '../models';
import { ServerConnection, SignalRService } from './signalR.service';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

/**
 * Exposes port-change deltas received over a SignalR hub connection as an RxJS stream.
 *
 * At most one connection is kept at a time: subscribing again stops the connection
 * opened by the previous call, and the active connection is stopped when the
 * injection context owning this service is destroyed.
 */
@Injectable()
export class SignalRPortUpdateService {
    private readonly onPortUpdateSubject = new BehaviorSubject<PortChangesDelta | null>(null);

    /** Emits `null` first, then every truthy delta delivered by the hub handler. */
    onPortUpdate$ = this.onPortUpdateSubject.asObservable();

    /** Connection opened by the latest `subscribeToPortUpdate` call; `undefined` when none is active. */
    private connection?: ServerConnection;
    private readonly destroyRef = inject(DestroyRef);

    constructor(private readonly signalRService: SignalRService, private readonly zone: NgZone) {
        // Unsubscribing the start pipeline does not close the connection itself,
        // so stop it explicitly when the service goes away.
        this.destroyRef.onDestroy(() => this.stopConnection());
    }

    /**
     * Opens a connection to `url`, forwards every `methodName` message to
     * `onPortUpdate$`, and once the connection has started sends
     * `'SubscribeToPort'` with `ids`.
     *
     * Any connection opened by an earlier call is stopped first so it does not leak.
     *
     * @param ids Port id or ids passed as the argument of the `'SubscribeToPort'` call.
     * @param url Hub url handed to `SignalRService.createConnection`.
     * @param methodName Name of the hub method whose messages are forwarded.
     */
    subscribeToPortUpdate(ids: string | string[], url = 'portsHub', methodName = 'onPortUpdate'): void {
        this.stopConnection();

        // Keep a local reference so the callbacks below always address the connection
        // created by this call, even if the field is replaced later.
        const connection = this.signalRService.createConnection(url);
        this.connection = connection;

        connection.on(methodName, (data: PortChangesDelta) => {
            // Re-enter the Angular zone before emitting (presumably the hub callback fires outside it):
            // https://github.com/angular/angular/issues/6431#issuecomment-171220101
            this.zone.run(() => {
                if (data) {
                    this.onPortUpdateSubject.next(data);
                }
            });
        });

        connection
            .start()
            .pipe(
                exhaustMap(() => connection.send('SubscribeToPort', ids)),
                // Must be the last operator so that an in-flight `send` is torn down on destroy as well.
                takeUntilDestroyed(this.destroyRef)
            )
            .subscribe();
    }

    /**
     * Stops the active connection, if any. Safe to call repeatedly: the reference is
     * cleared first, so later calls are no-ops until a new connection is opened.
     */
    stopConnection(): void {
        const connection = this.connection;
        if (!connection) {
            return;
        }
        this.connection = undefined;
        connection.stop().subscribe();
    }
}
