import { call, cancel, delay, fork, race, select, take, takeLatest } from "@redux-saga/core/effects";
import { PayloadAction } from "@reduxjs/toolkit";
import { authWSRequest } from "api";
import { wsUrl } from "const";
import { Task } from "redux-saga";
import { getSession } from "services/authService";
import { clearFilters } from "hooks/useFilterState";
import { IRootState } from "store";
import { actionChannel } from "store/actionChannel";
import { logout } from "store/auth/logout";
import { agreementNotSigned } from "store/onboardingActions";
import { setIsTermsAndConditionsRequired, setOnboardingStages } from "store/onboardingSlice";
import Feeds from "utils/ClientFeeds";
import { parse, stringify } from "utils/json";
import { stopBusyProcess, setIsActive } from ".";
import { eventNotificationChannel } from "./store/saga/eventNotifications";
import {
    addMessage,
    bindTo,
    getInitialRecord,
    restRequestResponse,
    sendRESTRequestViaWS,
    snapshotUpdateAvailable,
    State,
    syncFeeds,
    unBind,
    wsRequestAuth,
    wsReset,
    wsResetRetries,
    wsResubscribeToFeeds,
} from "./ws";
import { FeedId, Listeners, MainFeeds } from "./ws.d";

type Response = { reqId: number; error?: number; reply?: unknown };
type ResponseListener = (response: Response) => void;

// Registered listeners. The array is replaced (never spliced) on unsubscribe so that an
// emit which is currently iterating keeps working on a stable snapshot.
let subscribers: ResponseListener[] = [];

/**
 * Registers `fn` to be called with every REST-over-WebSocket response.
 *
 * @param fn Listener invoked with the payload of each response message.
 * @returns An unsubscribe function. It removes only the registration created by this
 *          call (other registrations of the same function stay active) and does nothing
 *          when invoked more than once.
 */
export const subscribeToWsRestResponses = (fn: ResponseListener) => {
    subscribers.push(fn);

    let isSubscribed = true;
    return () => {
        if (!isSubscribed) {
            return;
        }
        isSubscribed = false;

        // Drop a single occurrence; filtering by identity would also remove registrations
        // of the same function that were made by other callers.
        const index = subscribers.lastIndexOf(fn);
        subscribers = subscribers.filter((_, position) => position !== index);
    };
};

/**
 * Delivers the payload of a REST response message to every current subscriber.
 *
 * @param data Message whose `payload` is the response to hand out.
 */
const emitRestResponse = (data: { payload: Response }) => {
    subscribers.forEach((fn) => {
        fn.call(null, data.payload);
    });
};

/**
 * Routes a message received from the WebSocket worker to its consumer:
 * `addMessage` payloads go to the event notification channel, REST responses go to the
 * REST response subscribers, and everything else is put on the action channel.
 *
 * @param event Worker message event whose `data` is the serialized message.
 */
const handleMessage = ({ data }: MessageEvent<any>) => {
    // Workaround until fixed in the json-bigint package: the prototype of the parsed object
    // is not set correctly, which breaks isPlainObject, so copy it into a plain literal.
    const message = { ...parse(data) };

    switch (message.type) {
        case addMessage.type:
            eventNotificationChannel.put(message.payload);
            return;
        case restRequestResponse.type:
            emitRestResponse(message);
            return;
        default:
            actionChannel.put(message);
    }
};

/**
 * Completes with `true` once `state.app` reports the app as neither active nor busy.
 * While that is not the case, the state is re-read after each `setIsActive` or
 * `stopBusyProcess` action.
 */
function* awaitSuspendSaga() {
    for (;;) {
        const { isActive, isBusy } = yield select((state) => state.app);
        const stillInUse = isActive || isBusy;

        if (!stillInUse) {
            return true;
        }

        // Nothing to do until one of the two flags may have changed.
        yield take([setIsActive, stopBusyProcess]);
    }
}

/** Reconnect back-off delays in milliseconds, indexed by the current retry count. */
const timeouts = [0, 0, 1, 2, 3, 5].map((t) => t * 1000);

/**
 * Owns the WebSocket worker for the lifetime of the saga.
 *
 * Each loop iteration spawns a fresh worker, waits for the terms and conditions to be
 * signed when they are required, sends the "init" message, and registers watchers that
 * forward auth, feed-sync, REST-over-WS and feed bind/unbind traffic to that worker.
 * The iteration ends when `wsReset`, `agreementNotSigned` or `logout` is dispatched, or
 * when `awaitSuspendSaga` completes. The worker is then terminated and every watcher
 * started for it is cancelled. On logout the filters are cleared and the saga finishes;
 * after a suspend it waits for a truthy `setIsActive`; otherwise it waits for the
 * back-off delay before starting the next iteration.
 */
export function* wsSaga() {
    let retryCount = 0;
    do {
        const worker = new Worker(new URL("./ws.worker", import.meta.url));
        // Watchers and forks that talk to this worker. They are cancelled together with it,
        // otherwise every reconnect would leave another set of stale watchers running.
        const workerTasks: Task[] = [];
        let outcome: { suspend?: boolean; logout?: unknown } = {};

        try {
            // awaiting signed T&C
            const { isTermsAndConditionsRequired } = yield select(
                (state) => state.onboarding.onboardingStages,
            );
            if (isTermsAndConditionsRequired) {
                worker.postMessage(stringify({ type: "close" }));

                yield take([setOnboardingStages, setIsTermsAndConditionsRequired]);
            }

            worker.postMessage(
                stringify({
                    type: "init",
                    url: `${wsUrl}/ws`,
                }),
            );

            // Feeds bound on this worker: count is 1 while bound, 0 otherwise.
            const activeListeners: Listeners = getInitialRecord();

            const postMessage = (message: unknown) => worker.postMessage(stringify(message));

            worker.addEventListener("message", handleMessage);

            workerTasks.push(
                yield takeLatest(wsRequestAuth, async () => {
                    const userSession = await getSession();
                    const key = { token: userSession.accessToken?.toString() };
                    postMessage(authWSRequest(JSON.stringify(key)));
                }),
            );
            workerTasks.push(
                yield takeLatest(wsResetRetries, () => {
                    retryCount = 0;
                }),
            );
            workerTasks.push(
                yield takeLatest(snapshotUpdateAvailable, () => {
                    requestAnimationFrame(() => {
                        postMessage(syncFeeds());
                    });
                }),
            );
            workerTasks.push(
                yield takeLatest(sendRESTRequestViaWS, function ({ payload: messageBody }) {
                    postMessage({
                        ...messageBody,
                        event: "request",
                    });
                }),
            );
            workerTasks.push(
                yield takeLatest(wsResubscribeToFeeds, function* () {
                    const wsListeners: Listeners = yield select(
                        (state: IRootState) => state.ws.listeners,
                    );
                    for (const [feed, bindings] of Object.entries(wsListeners)) {
                        for (const { feedId = 0, count } of bindings) {
                            if (count > 0) {
                                const activeListener =
                                    activeListeners[feed as MainFeeds] ||
                                    activeListeners[Feeds.masterFeed(feed) as MainFeeds];
                                const activeListenerFeed = activeListener.find(
                                    (i) => i.feedId === feedId,
                                );

                                if (activeListenerFeed === undefined) {
                                    activeListener.push({ feedId, count: 1 });
                                } else {
                                    activeListenerFeed.count = 1;
                                }

                                // Re-bind every feed that still has listeners in the store.
                                postMessage({ event: "unbind", feed, feedId });
                                postMessage({ event: "bind", feed, feedId });
                            }
                        }
                    }
                }),
            );
            workerTasks.push(
                yield fork(function* () {
                    while (true) {
                        const {
                            bind,
                            unbind,
                        }: Record<string, PayloadAction<FeedId>> = yield race({
                            bind: take(bindTo),
                            unbind: take(unBind),
                        });

                        const listeners: State["listeners"] = yield select(
                            (state: IRootState) => state.ws.listeners,
                        );
                        const { feed, feedId = 0 } = bind?.payload || unbind.payload;

                        const listener =
                            listeners[feed] || listeners[Feeds.masterFeed(feed) as MainFeeds];
                        const listenerFeed = listener.find((i) => i.feedId === feedId);
                        const listenersCount = listenerFeed?.count ?? 0;

                        const activeListener =
                            activeListeners[feed] ||
                            activeListeners[Feeds.masterFeed(feed) as MainFeeds];
                        let activeListenerFeed = activeListener.find((i) => i.feedId === feedId);

                        if (activeListenerFeed === undefined) {
                            activeListenerFeed = { feedId, count: 0 };
                            activeListener.push(activeListenerFeed);
                        }

                        const isListening = activeListenerFeed.count > 0;

                        // Bind on the worker only when the store has listeners and the feed is
                        // not bound yet; unbind only when the store has none left.
                        if (bind && listenersCount && !isListening) {
                            postMessage({ event: "bind", ...bind.payload });
                            activeListenerFeed.count = 1;
                        }
                        if (unbind && !listenersCount && isListening) {
                            postMessage({ event: "unbind", ...unbind.payload });
                            activeListenerFeed.count = 0;
                        }
                    }
                }),
            );

            outcome = yield race({
                reset: take(wsReset),
                agreementNotSigned: take(agreementNotSigned),
                suspend: call(awaitSuspendSaga),
                logout: take(logout),
            });
        } finally {
            // Runs on normal completion, on errors and when the saga itself is cancelled, so
            // the worker and its watchers never outlive the iteration that created them.
            worker.terminate();
            yield cancel(workerTasks);
        }

        if (outcome.logout) {
            clearFilters();
            break;
        }
        if (outcome.suspend) {
            yield take((action: any) => action.type === setIsActive.toString() && action.payload);
        } else {
            // `??` rather than `||`: a delay of 0 ms is a valid entry of the schedule.
            const timeout = timeouts[retryCount] ?? 5000;
            if (retryCount < 5) retryCount += 1;
            yield delay(timeout);
        }
    } while (true);
}
