/* eslint-disable lines-between-class-members */

import { BehaviorSubject, Observable, Subscription, fromEvent } from 'rxjs';
import { filter, map, share, switchMap } from 'rxjs/operators';
import { tag } from 'rxjs-spy/operators';
import { LocalDataTrack, RemoteDataTrack } from 'twilio-video';
import {
  EmptyRequestType,
  ExpertRequest,
  ExpertRequestBody,
  ExpertRequestType,
  HeliosEvent,
  IAcknowledgement,
  NonEmptyRequestType,
  ProtoValueWithDefaults,
} from '@bfly/telemed-interchange';

import filterEmpty from '../utils/filterEmpty';

export type ParsedHeliosEvent = ProtoValueWithDefaults<HeliosEvent>;

/**
 * A pair of settle callbacks for a value of type `T`.
 *
 * Used as the value type of `HeliosDataService#pendingRequests`, where the
 * ack handler calls `resolve` with the matching acknowledgement.
 * NOTE(review): presumably these are captured from a Promise executor, but
 * nothing in this file constructs one — confirm against callers.
 */
interface Deferred<T> {
  /** Settles the pending operation successfully with `value`. */
  resolve(value: T): void;
  /** Settles the pending operation as failed with `err`. */
  reject(err: any): void;
}
// Module-level counter incremented by `nextRequestId()`. It lives outside the
// class, so it is shared by every HeliosDataService instance in this module.
let id = 0;

/**
 * Type guard telling binary data-track payloads apart from string ones.
 *
 * Besides the `instanceof` check it also compares the `Object.prototype`
 * string tag, so values whose tag is `[object ArrayBuffer]` are accepted even
 * when they are not instances of this realm's `ArrayBuffer` constructor.
 *
 * @param value - Payload to test.
 * @returns `true` when `value` should be treated as an `ArrayBuffer`.
 */
const isArrayBuffer = (value: string | ArrayBuffer): value is ArrayBuffer => {
  if (value instanceof ArrayBuffer) {
    return true;
  }

  const typeTag = Object.prototype.toString.call(value);
  return typeTag === '[object ArrayBuffer]';
};

/**
 * Decodes a binary payload into a `HeliosEvent`.
 *
 * Decoding is best effort: a failure is logged with `console.error` and
 * reported to the caller as `null` instead of being thrown.
 *
 * @param buffer - Raw bytes received on the data track.
 * @returns The decoded event, or `null` when decoding threw.
 */
const parseMessage = (buffer: ArrayBuffer) => {
  let decoded: ParsedHeliosEvent | null = null;

  try {
    const bytes = new Uint8Array(buffer);
    decoded = HeliosEvent.decode(bytes) as ParsedHeliosEvent;
  } catch (decodeError) {
    console.error(decodeError);
  }

  return decoded;
};

/**
 * Messaging layer on top of a pair of Twilio data tracks.
 *
 * Incoming binary messages on the remote data track are decoded into
 * `HeliosEvent`s and exposed through `messages$`; outgoing `ExpertRequest`s
 * are validated, encoded and sent over the local data track.
 */
class HeliosDataService {
  // Mirrors the latest local data track so `send` can read it synchronously.
  private localData$ = new BehaviorSubject<LocalDataTrack | null>(null);

  // Subscription feeding `localData$`; kept so `destroy` can release it.
  private localDataSubscription: Subscription;

  private ackSubscription?: Subscription;

  // Deferreds waiting for an acknowledgement, keyed by request id.
  private pendingRequests = new Map<string, Deferred<IAcknowledgement>>();

  /**
   * Decoded events from the current remote data track. Non-binary payloads
   * and payloads that fail to decode are dropped. The stream is shared
   * between subscribers.
   */
  messages$: Observable<ParsedHeliosEvent>;

  /**
   * @param localData$ - Emits the local data track used for sending, or
   *   `null` while none is available.
   * @param remoteData$ - Emits the remote data track to listen on, or `null`
   *   while none is available.
   */
  constructor(
    localData$: Observable<LocalDataTrack | null>,
    remoteData$: Observable<RemoteDataTrack | null>,
  ) {
    this.localDataSubscription = localData$.subscribe(ld => {
      this.localData$.next(ld);
    });

    this.messages$ = remoteData$.pipe(
      // Follow only the most recent track; no track means no messages.
      switchMap(track =>
        track != null
          ? fromEvent<[string | ArrayBuffer, typeof track]>(track, 'message')
          : [],
      ),
      // The listener arguments arrive as a tuple; the payload is the first.
      map(a => a[0]),
      filter(isArrayBuffer),
      map(parseMessage),
      filterEmpty(),
      tag('messages'),
      share(),
    );
  }

  /** Returns a new request id of the form `req_<n>`. */
  private nextRequestId() {
    return `req_${++id}`;
  }

  /**
   * Lazily subscribes to `ack` events and resolves the matching pending
   * request. Does nothing if the subscription already exists.
   */
  private ensureListeningForAcks() {
    if (this.ackSubscription) return;

    this.ackSubscription = this.messages$
      .pipe(
        filter(e => e.event === 'ack'),
        // NOTE(review): assumes `ack` is populated whenever `event === 'ack'`.
        map(e => e.ack!),
      )
      .subscribe(ack => {
        if (!ack.requestId) return;

        const deferred = this.pendingRequests.get(ack.requestId);
        if (!deferred) return;

        this.pendingRequests.delete(ack.requestId);
        deferred.resolve(ack);
      });
  }

  /**
   * Releases the subscriptions held by this service.
   *
   * Safe to call when the ack subscription was never created, and safe to
   * call more than once.
   */
  destroy() {
    this.localDataSubscription.unsubscribe();

    this.ackSubscription?.unsubscribe();
    this.ackSubscription = undefined;
  }

  /**
   * Validates, encodes and sends an expert request over the local data track.
   *
   * Sending is best effort: when no local data track is available the
   * request is silently dropped.
   *
   * @param type - Which request to send.
   * @param rawRequest - Body for the request; an empty body is used when
   *   omitted.
   * @throws Error when `ExpertRequest.verify` rejects the message; the
   *   error message is the reason returned by `verify`.
   */
  send<K extends EmptyRequestType>(type: K): void;
  send<K extends NonEmptyRequestType>(
    type: K,
    rawRequest: NonNullable<ExpertRequestBody[K]>,
  ): void;
  send<K extends ExpertRequestType>(
    type: K,
    rawRequest?: NonNullable<ExpertRequestBody[K]>,
  ): void {
    const message = { [type]: rawRequest ?? {} };
    const notValidReason = ExpertRequest.verify(message);

    if (notValidReason) {
      throw new Error(notValidReason);
    }

    // Ack tracking is currently disabled, so nothing resolves per-request.
    // this.ensureListeningForAcks();
    message.requestId = this.nextRequestId();

    const buffer = ExpertRequest.encode(message).finish();

    // Sending is best effort, and until the data track connects any commands
    // from web won't do anything (but UI is disabled anyway)
    this.localData$.value?.send(buffer);
  }
}

export default HeliosDataService;
