import { Injectable } from '@angular/core';
import { isNil } from '@core/utils/nil/is-nil';
import {
  BehaviorSubject,
  fromEvent,
  map,
  Observable,
  of,
  switchMap,
  tap,
} from 'rxjs';
import { environment } from '@environments/environment';
import { ApiUrlFactory } from '@api/common/factories/api-url.factory';
import { HttpClient, HttpResponseBase } from '@angular/common/http';
import { ApiHelperService } from '@api/common/services/api-helper.service';
import { Nillable } from '@core/utils/nil/nillable';
import { blank } from '@core/utils/rxjs/blank';
import { NotificationCountResponse } from '@api/notification/responses/notification-count.response';
import { MessageEventName } from '@api/websocket/enums/message-event-name';

// Push-server section of the environment configuration; the service's
// discovery request builds its URL from the values stored here.
const websocket = environment.websocket;

/**
 * Manages a single push connection to the server and exposes its notification
 * events as observables.
 *
 * Despite the name, the transport is an `EventSource` (Server-Sent Events):
 * the service first calls a discovery endpoint, reads the push URL from the
 * response's `Link` header and then opens the `EventSource` against it.
 * When the connection reports an error it is closed and re-created after
 * `reconnectDelay` milliseconds, for at most `maxReconnectAttempts`
 * consecutive failures.
 */
@Injectable({
  providedIn: 'root',
})
export class WebsocketService {
  /** Currently open connection, or nil while disconnected. */
  private connection!: Nillable<EventSource>;
  /** Number of reconnect attempts made since the last successful open. */
  private reconnectAttempt = 0;
  /** Handle of the pending reconnect timer, or nil when none is scheduled. */
  private reconnectTimeout: Nillable<ReturnType<typeof setTimeout>>;

  /** Delay, in milliseconds, between a failure and the next reconnect attempt. */
  private readonly reconnectDelay = 2000;
  /** Upper bound on consecutive reconnect attempts before giving up. */
  private readonly maxReconnectAttempts = 20;
  /** Emits once on subscription and again after every successful reconnect. */
  private readonly reconnectSource$ = new BehaviorSubject(void 0);

  constructor(
    private readonly http: HttpClient,
    private readonly helper: ApiHelperService,
    private readonly urlFactory: ApiUrlFactory
  ) {}

  /**
   * Stream that emits immediately on subscription (it is backed by a
   * `BehaviorSubject`) and then each time `reconnect()` re-establishes
   * the connection.
   */
  get reconnect$(): Observable<void> {
    return this.reconnectSource$.asObservable();
  }

  /**
   * Opens the push connection unless one is already open.
   *
   * @returns An observable that performs the discovery request and creates
   * the connection when subscribed; it emits straight away when a
   * connection already exists.
   */
  connect(): Observable<void> {
    if (!isNil(this.connection)) {
      return of(void 0);
    }

    return this.discover().pipe(
      tap(response => this.createConnection(response)),
      blank()
    );
  }

  /** Closes the connection and cancels any reconnect that is still pending. */
  disconnect(): void {
    this.shutdown();
  }

  /**
   * Stream of `MessageEventName.NotificationRead` events received on the
   * current connection, re-attached after every reconnect.
   *
   * The stream errors if there is no open connection when it (re)attaches.
   */
  receiveNotificationRead(): Observable<NotificationCountResponse> {
    return this.receive(MessageEventName.NotificationRead);
  }

  /**
   * Stream of `MessageEventName.NotificationCreated` events received on the
   * current connection, re-attached after every reconnect.
   *
   * The stream errors if there is no open connection when it (re)attaches.
   */
  receiveNotificationCreate(): Observable<NotificationCountResponse> {
    return this.receive(MessageEventName.NotificationCreated);
  }

  /**
   * Schedules one reconnect attempt after `reconnectDelay` milliseconds,
   * provided the attempt budget is not exhausted. The attempt counter is
   * incremented on every call, whether or not an attempt was scheduled.
   *
   * A failed attempt (for example a failing discovery request) schedules the
   * next one instead of silently ending the retry chain.
   */
  reconnect(): void {
    if (this.reconnectAttempt < this.maxReconnectAttempts) {
      // Never keep two timers alive: the handle of the older one would be lost.
      this.clearReconnectTimer();

      this.reconnectTimeout = setTimeout(() => {
        this.reconnectTimeout = null;
        this.connect().subscribe({
          next: () => this.reconnectSource$.next(void 0),
          error: () => this.retryOrGiveUp(),
        });
      }, this.reconnectDelay);
    }

    this.reconnectAttempt++;
  }

  /**
   * Builds the event stream shared by the public `receive*` methods.
   *
   * @param eventName Name of the server-sent event to listen for.
   * @returns Payloads of matching events, passed through
   * `ApiHelperService.bodyToResponse(NotificationCountResponse)`.
   */
  private receive(eventName: string): Observable<NotificationCountResponse> {
    return this.reconnect$.pipe(
      // Every reconnect creates a new EventSource, so listeners have to be
      // moved from the previous instance to the current one.
      switchMap(() => {
        const connection = this.connection;

        if (isNil(connection)) {
          // Thrown inside the projection, so it surfaces as an error
          // notification on the returned stream.
          throw new Error(
            `Cannot listen for "${eventName}": the push connection is not open.`
          );
        }

        return fromEvent<MessageEvent>(connection, eventName).pipe(
          map(event => event.data),
          map(this.helper.bodyToResponse(NotificationCountResponse))
        );
      })
    );
  }

  /** Cancels a pending reconnect and closes the connection, if any. */
  private shutdown(): void {
    // The timer must be cancelled even when no connection is open: that is
    // exactly the state the service is in while waiting to reconnect.
    this.clearReconnectTimer();

    if (isNil(this.connection)) {
      return;
    }

    this.connection.close();
    this.connection = null;
  }

  /** Cancels the scheduled reconnect attempt, if one is pending. */
  private clearReconnectTimer(): void {
    if (isNil(this.reconnectTimeout)) {
      return;
    }

    clearTimeout(this.reconnectTimeout);
    this.reconnectTimeout = null;
  }

  /** Requests the discovery endpoint, keeping the full response for its headers. */
  private discover(): Observable<HttpResponseBase> {
    const {
      WEBSOCKET: { HOST, DISCOVER },
    } = websocket;
    const url = this.urlFactory.simple(HOST, DISCOVER);

    return this.http.get(url, { observe: 'response', withCredentials: true });
  }

  /**
   * Opens the `EventSource` for the URL advertised by the discovery response
   * and wires its lifecycle handlers.
   */
  private createConnection(response: HttpResponseBase): void {
    const connection = new EventSource(this.connectionUrl(response), {
      withCredentials: true,
    });

    // A successful open restores the full retry budget, so the limit applies
    // to consecutive failures rather than to the lifetime of the service.
    connection.onopen = (): void => {
      this.reconnectAttempt = 0;
    };
    connection.onerror = (): void => this.handleConnectionError();

    this.connection = connection;
  }

  /**
   * Extracts the push URL from the `Link` header of the discovery response
   * and subscribes it to every topic (`topic=*`).
   *
   * @throws Error when the header is missing or has no entry whose `rel`
   * contains `mercure`.
   */
  private connectionUrl(response: HttpResponseBase): string {
    const link = response.headers.get('link');
    const target = link?.match(
      /<([^>]+)>;\s+rel=(?:mercure|"[^"]*mercure[^"]*")/
    )?.[1];

    if (isNil(target)) {
      throw new Error(
        'Discovery response has no Link header entry with rel="mercure".'
      );
    }

    const url = new URL(target);

    url.searchParams.append('topic', '*');

    return url.toString();
  }

  /**
   * Reacts to an `EventSource` error: closes the broken connection and then
   * either schedules a reconnect or gives up.
   */
  private handleConnectionError(): void {
    if (isNil(this.connection)) {
      return;
    }

    this.shutdown();
    this.retryOrGiveUp();
  }

  /**
   * Schedules another reconnect while attempts remain.
   *
   * @throws Error once `maxReconnectAttempts` has been reached.
   */
  private retryOrGiveUp(): void {
    if (this.reconnectAttempt < this.maxReconnectAttempts) {
      this.reconnect();

      return;
    }

    throw new Error(
      'Połączenie z serwerem PUSH zamknięte permanentnie. Nie nastąpi więcej prób wznowienia połączenia.'
    );
  }
}
