import { useState, useEffect, useRef } from "react";
import {
  EventStreamContentType,
  fetchEventSource,
} from "@microsoft/fetch-event-source";
import { Subject } from "rxjs";
import { ONE_MINUTE, ONE_SECOND } from "../const/const";
import { STREAM_API } from "../const/api";
import { usePageVisibility } from "react-page-visibility";

const NOT_FOUND = 404;
const UNAUTHORIZED = 401;
export function useStream(
  url: string,
  token: string | null,
  openWhenHidden = false
) {
  const [stream$] = useState(new Subject<any>());
  const [connId, setConnId] = useState(null);
  const isVisible = usePageVisibility();
  const ctrlRef = useRef<AbortController>(new AbortController());

  useEffect(() => {
    if (connId && token && (isVisible || openWhenHidden)) {
      // send heartbeat to the server every one minutes via POST request with an empty body
      const intvl = setInterval(async () => {
        const ctrl = new AbortController();
        const to = setTimeout(() => ctrl.abort("timeout"), ONE_SECOND * 5);
        try {
          const r = await fetch(`${STREAM_API}/ping/${connId}`, {
            method: "POST",
            headers: {
              Authorization: `Bearer ${token}`,
            },
            signal: ctrl.signal,
          });
          const status = r.status;
          if (status === UNAUTHORIZED) {
            stream$.error("unauthorized error");
          } else if (status === NOT_FOUND) {
            // dangling stream(SSE) client. need to terminate the connection and open again
            ctrlRef.current.abort();
            ctrlRef.current = new AbortController();
            clearInterval(intvl);
          }
        } catch (err) {
          console.error(err);
        }
        clearTimeout(to);
      }, ONE_MINUTE);
      return () => clearInterval(intvl);
    }
  }, [connId, isVisible, openWhenHidden, stream$, token]);

  useEffect(() => {
    if (!token || !url)
      return () => {
        ctrlRef.current.abort();
        ctrlRef.current = new AbortController();
      };
    const fetchData = async () => {
      await fetchEventSource(url, {
        method: "GET",
        headers: {
          Authorization: `Bearer ${token}`,
        },
        openWhenHidden,
        async onopen(response) {
          if (
            response.ok &&
            response.headers.get("content-type") === EventStreamContentType
          ) {
            return;
          } else if (response.status === 401) {
            stream$.error("unauthorized error");
          } else {
            throw new Error(`Error! response status: ${response.status}`);
          }
        },
        onmessage(msg) {
          try {
            const d = JSON.parse(msg.data);
            if (d) {
              if (d.connId) {
                return setConnId(d.connId);
              }
              stream$.next(d);
            }
          } catch (e) {
            console.log("failed to parse data: ", msg.data);
          }
        },
        onerror(err) {
          console.log(err);
          return 5 * 1000;
        },
        signal: ctrlRef.current.signal,
      });
    };
    fetchData().catch((err) => console.error(err));
    return () => {
      ctrlRef.current.abort();
      ctrlRef.current = new AbortController();
    };
  }, [stream$, url, token, openWhenHidden]);

  return stream$;
}
