import { Observable } from "rxjs";
import SockJS from "sockjs-client";

import { createListenerMiddleware } from "@reduxjs/toolkit";
import Stomp, { Client, Subscription } from "webstomp-client";
import { addNotification } from "../features/notification/notification-slice";
import { setProfile } from "../features/user/user-slice";
import { RootState } from "../store";

// Module-level connection state shared by the helpers below.

// STOMP client created by connect(); reset to null by disconnect().
let stompClient: Client | null = null;

// Subscription handle stored by subscribe() and released by unsubscribe().
let subscriber: Subscription | null = null;
// Promise created by createConnection(); subscribe() waits on it before
// registering topic subscriptions.
let connection: Promise<any>;
// Despite the name, this holds the *resolve function* of `connection` while a
// connection attempt is in flight, and null otherwise. connect() uses it as its
// "attempt in progress" flag.
let connectedPromise: any = null;
// Observable handed out by receive(); replaced by connect() and unsubscribe().
let listener: Observable<any>;
// Observer captured when `listener` is subscribed to; STOMP message handlers
// push parsed payloads into it.
let listenerObserver: any;
// True once the STOMP connect callback has fired; reset to false by the
// connect error callback and by disconnect().
let alreadyConnectedOnce = false;
// Incremented on every connection attempt; multiplies the 1000 ms retry delay.
let connectionAttempts = 1;

/**
 * Creates a pending promise and parks its resolver in `connectedPromise`.
 *
 * The promise stays pending until whoever holds `connectedPromise` calls it;
 * it has no rejection path.
 */
const createConnection = (): Promise<any> => {
  return new Promise((settle) => {
    connectedPromise = settle;
  });
};

/**
 * Builds a fresh Observable whose observer is captured in `listenerObserver`.
 *
 * The capture happens when the Observable is subscribed to, so
 * `listenerObserver` always refers to the most recent subscriber.
 */
const createListener = (): Observable<any> => {
  const stream = new Observable<any>((sink) => {
    listenerObserver = sink;
  });
  return stream;
};

/**
 * Registers the notification topic subscriptions once `connection` resolves.
 *
 * Both the broadcast topic and the per-user queue are subscribed, and both
 * handles are kept behind a single composite `subscriber` so that releasing it
 * tears down both (previously the per-user handle was dropped and could never
 * be unsubscribed). Does nothing if the client has been cleared in the
 * meantime.
 */
const subscribe = () => {
  connection.then(() => {
    const client = stompClient;
    if (client === null) {
      return;
    }

    // Parse a frame body and forward it to whoever is observing `listener`.
    const forward = (body: string) => {
      let payload: unknown;
      try {
        payload = JSON.parse(body);
      } catch (error) {
        // A malformed frame must not throw inside the STOMP message handler.
        console.log(error);
        return;
      }
      // The observer only exists once someone has subscribed to `listener`.
      if (listenerObserver) {
        listenerObserver.next(payload);
      }
    };

    const broadcast = client.subscribe("/topic/notifications/all", (data) =>
      forward(data.body)
    );
    const personal = client.subscribe(
      "/user/queue/notifications/private",
      (data) => forward(data.body)
    );

    subscriber = {
      id: broadcast.id,
      unsubscribe: () => {
        broadcast.unsubscribe();
        personal.unsubscribe();
      },
    };
  });
};

/**
 * Opens the STOMP-over-SockJS connection, retrying with a linearly growing
 * delay (1000 ms × `connectionAttempts`) until it succeeds.
 *
 * No-op when a connection is already established or being established.
 *
 * Retries of an attempt that has never succeeded reuse the same `connection`
 * promise and `listener`, so callers that already chained on them (via
 * subscribe() / receive()) are served once a retry finally succeeds.
 *
 * NOTE(review): when an *established* connection drops, the reconnect goes
 * through connect() again, which creates a fresh promise and listener; nothing
 * here re-registers topic subscriptions for that new connection.
 */
const connect = () => {
  if (connectedPromise !== null || alreadyConnectedOnce) {
    // the connection is already established or being established
    return;
  }
  connection = createConnection();
  listener = createListener();
  // createConnection() has just stored this promise's resolver here; keep our
  // own reference so retries can resolve the same promise.
  const resolveConnection = connectedPromise;

  const openSocket = () => {
    const headers = {};
    let url = process.env.REACT_APP_BE_HOST + "ws";
    const authToken = localStorage.getItem("token");

    if (authToken) {
      // Encode so characters such as "+", "/" or "=" survive the query string.
      url += "?access_token=" + encodeURIComponent(authToken);
    }

    const client = Stomp.over(new SockJS(url), { protocols: ["v12.stomp"] });
    stompClient = client;
    // Whether this particular client ever reached the connected state.
    let established = false;

    connectionAttempts++;

    client.connect(
      headers,
      () => {
        console.log("ws connected");
        established = true;
        resolveConnection("success");
        connectedPromise = null;
        alreadyConnectedOnce = true;
        // Start the backoff from scratch for any later reconnect.
        connectionAttempts = 1;
      },
      (error) => {
        console.log(error);
        connectedPromise = null;
        alreadyConnectedOnce = false;
        setTimeout(() => {
          if (stompClient !== client) {
            // disconnect() was called or a newer connection took over while
            // we were waiting: do not resurrect this one.
            return;
          }
          if (established) {
            connect();
          } else {
            // Still the initial attempt: keep the pending promise alive.
            connectedPromise = resolveConnection;
            openSocket();
          }
        }, 1000 * connectionAttempts);
      }
    );
  };

  openSocket();
};

/**
 * Tears down the current STOMP client, if any, and clears the
 * "connected" flag so a later connect() may start over.
 *
 * The client is only asked to disconnect when it reports being connected.
 */
const disconnect = () => {
  const client = stompClient;
  if (client !== null && client.connected) {
    client.disconnect();
  }
  stompClient = null;
  alreadyConnectedOnce = false;
};

/** Returns the notification Observable currently stored in `listener`. */
const receive = () => {
  return listener;
};

/**
 * Releases the stored STOMP subscription and replaces `listener` with a fresh
 * Observable.
 *
 * The handle is cleared after use so that a repeated call does not try to
 * unsubscribe a stale subscription on a client that may already be gone.
 */
const unsubscribe = () => {
  if (subscriber !== null) {
    subscriber.unsubscribe();
    subscriber = null;
  }
  listener = createListener();
};

// Create the middleware instance and methods
const listenerMiddleware = createListenerMiddleware();

// The listener Observable this effect last attached its forwarding
// subscription to; used to avoid subscribing twice to the same stream.
let subscribedListener: unknown = null;

/**
 * Reacts to every `setProfile` action:
 *  - with an auth token in the store, makes sure the websocket is connecting
 *    and that incoming notifications are forwarded to `addNotification`;
 *  - without a token, drops the subscriptions and closes the connection.
 */
listenerMiddleware.startListening({
  actionCreator: setProfile,
  effect: async (action, listenerApi) => {
    // Can cancel other running instances
    listenerApi.cancelActiveListeners();

    const { auth } = listenerApi.getState() as RootState;
    if (auth.token === null) {
      unsubscribe();
      disconnect();
      return;
    }

    connect();

    // connect() returns early while an attempt is already in flight, yet
    // alreadyConnectedOnce is still false at that point. Without the identity
    // check a second setProfile during that window would register the topic
    // subscriptions and the forwarder again, duplicating every notification.
    const current = receive();
    if (alreadyConnectedOnce || current === subscribedListener) {
      return;
    }

    subscribedListener = current;
    subscribe();
    current.subscribe((activity) => {
      console.log(activity);
      listenerApi.dispatch(addNotification(activity));
    });
  },
});

export default listenerMiddleware;
