import { Injectable, NgZone, OnDestroy } from '@angular/core';
import { BehaviorSubject, Subject, exhaustMap, takeUntil } from 'rxjs';
import { PortChangesDelta } from '../models';
import { ServerConnection, SignalRService } from './signalR.service';

/**
 * Relays port change notifications received over a server connection to
 * Angular consumers through {@link SignalRPortUpdateService.onPortUpdate$}.
 *
 * The service owns at most one connection at a time. The connection is stopped
 * when {@link SignalRPortUpdateService.stopConnection} is called, when a new
 * subscription replaces it, and when the service itself is destroyed.
 */
@Injectable()
export class SignalRPortUpdateService implements OnDestroy {
    private readonly onPortUpdateSubject = new BehaviorSubject<PortChangesDelta | null>(null);

    /**
     * Stream of port deltas pushed by the server. Emits `null` until the first
     * delta arrives and completes when the service is destroyed.
     */
    onPortUpdate$ = this.onPortUpdateSubject.asObservable();

    /** Currently owned connection; `undefined` when none has been created or it was stopped. */
    private connection?: ServerConnection;

    /** Emits whenever the owned connection is stopped, cancelling its pending start/subscribe pipeline. */
    private readonly connectionStopped$ = new Subject<void>();

    /** Emits once when the service is destroyed. */
    private readonly destroy$ = new Subject<void>();

    constructor(private signalRService: SignalRService, private zone: NgZone) {}

    /**
     * Releases everything the service owns: stops the connection (if any),
     * cancels pending pipelines and completes the public stream.
     */
    ngOnDestroy(): void {
        // Without this the connection would stay open after the service is gone.
        this.stopConnection();

        this.destroy$.next();
        this.destroy$.complete();
        this.connectionStopped$.complete();
        this.onPortUpdateSubject.complete();
    }

    /**
     * Opens a connection to `url`, forwards every `methodName` message to
     * {@link SignalRPortUpdateService.onPortUpdate$} and, once the connection has
     * started, sends `SubscribeToPort` with the given ids.
     *
     * Calling this again stops the previously opened connection first, so only
     * one connection feeds the stream at any time.
     *
     * @param ids Port id or list of port ids passed to `SubscribeToPort`.
     * @param url Hub URL handed to `SignalRService.createConnection`.
     * @param methodName Name of the server-to-client message to listen for.
     */
    subscribeToPortUpdate(ids: string | string[], url = 'portsHub', methodName='onPortUpdate'): void {
        // Drop any previous connection; otherwise it would leak and keep emitting.
        this.stopConnection();

        // Work with a local reference so the callbacks below always address the
        // connection created by *this* call, even if the field changes later.
        const connection = this.signalRService.createConnection(url);
        this.connection = connection;

        connection.on(methodName, (data: PortChangesDelta) => {
            // Ignore empty payloads before entering the zone to avoid a pointless
            // change-detection pass.
            if (!data) {
                return;
            }
            // https://github.com/angular/angular/issues/6431#issuecomment-171220101
            this.zone.run(() => this.onPortUpdateSubject.next(data));
        });

        connection
            .start()
            .pipe(
                exhaustMap(() => connection.send('SubscribeToPort', ids)),
                // takeUntil goes last so that an in-flight send is cancelled as well.
                takeUntil(this.connectionStopped$),
                takeUntil(this.destroy$)
            )
            .subscribe();
    }

    /**
     * Stops the current connection, if there is one. Safe to call repeatedly:
     * subsequent calls are no-ops until a new connection is created.
     */
    stopConnection(): void {
        const connection = this.connection;
        if (!connection) {
            return;
        }

        // Clear the reference first so a second call does not stop it twice.
        this.connection = undefined;
        // Cancel a start/subscribe pipeline that may still be pending.
        this.connectionStopped$.next();
        connection.stop().subscribe();
    }
}
