import { Channel, channel, Saga, Task } from 'redux-saga';
import {
  call,
  fork,
  put,
  select,
  takeEvery,
  take,
  cancel,
} from 'redux-saga/effects';
import { actions } from '../reducer';
import { chatModel } from '../index';
import { v4 as uuid } from 'uuid';
import { teamsModel } from '../../../teams';
import { RT } from '../../../../shared/types';
import { createNotification, snackbarModel } from '../../../snackbar';
import { saveChatMessage } from './saveChatMessage';
import { chatsApi } from '../../../../shared/api/axios/chat';
import { loadChat } from './loadChatData';
import { ChatResponseType } from '@distribute/shared/types';

import { ConversationState, GenerateMessageAction } from '../types';

import { trackChatResponseErrors } from '../../lib/track-chat-response-errors';
import { parseChatResponse } from '@distribute/shared/chat';
import { normalizeResponse } from '../../lib';

const abortControllers = new Map<string, AbortController>();

// Define a reusable function to create a throttled streaming processor
function createThrottleListener<T extends object | null>(
  channel: Channel<T>,
  processFn: Saga,
  throttleMs = 100
) {
  return function* () {
    let lastProcessed = 0;
    try {
      while (true) {
        const payload: T = yield take(channel);
        const now = Date.now();

        // Throttle processing
        if (now - lastProcessed >= throttleMs) {
          yield call(processFn, payload);
          lastProcessed = now;
        }
      }
    } catch (e) {
      // Task cancellation will throw here, just exit
    }
  };
}

function* generateMessage({
  payload: { prompt, pageContent, tabs },
}: ReturnType<typeof actions.generateMessage>) {
  // Create a local channel for this specific message generation
  const streamChannel = channel();
  const streamTask: Task = yield fork(
    createThrottleListener(streamChannel, processResponse, 200)
  );

  try {
    const id = uuid();

    yield put(
      actions.setConversation({
        id,
        request: { prompt, pageContent },
        response: '',
        structuredResponse: [],
      })
    );
    yield put(actions.setConversationState(ConversationState.Streaming));

    const currentTeam: RT<
      typeof teamsModel.selectors.selectCurrentTeamWithError
    > = yield select(teamsModel.selectors.selectCurrentTeamWithError);

    const currentChat: RT<
      typeof chatModel.selectors.selectCurrentChatWithError
    > = yield select(chatModel.selectors.selectCurrentChatWithError);

    const abortController = new AbortController();

    abortControllers.set(id, abortController);

    let response = '';

    yield call(chatsApi.generateMessage, {
      data: { prompt, pageContent },
      teamId: currentTeam.id,
      chatId: currentChat.id,
      signal: abortController.signal,
      onFulfilled: (text) => {
        response += text;
        streamChannel.put({ response, tabs, stream: true });
      },
    });

    yield put(teamsModel.actions.decrementAIResponse());

    // Clean up streaming - cancel task and close channel
    yield cancel(streamTask);
    streamChannel.close();

    // Final process
    yield call(processResponse, { response, tabs, stream: false });

    const conversation: RT<
      typeof chatModel.selectors.selectConversationWithError
    > = yield select(chatModel.selectors.selectConversationWithError);

    yield call(trackChatResponseErrors, conversation.response);

    yield call(saveChatMessage, {
      data: conversation.response,
      structuredData: conversation.structuredResponse,
      chatId: currentChat.id,
    });
    yield call(loadChat, currentChat.id);

    const isExistAnySuggestion = conversation.structuredResponse.some(
      (item) => item.type === ChatResponseType.Suggestion && item.changed
    );

    if (isExistAnySuggestion) {
      yield put(actions.setConversationState(ConversationState.Suggestion));
      return;
    }

    yield put(actions.clearConversation());
    yield put(actions.setConversationState(ConversationState.Idle));
  } catch (error: unknown) {
    yield cancel(streamTask);
    streamChannel.close();

    yield call(genereateMessageError, error);
  }
}

function* genereateMessageError(error: unknown) {
  try {
    // If aborted, do not save any suggestions.
    if (error instanceof DOMException && error.name === 'AbortError') {
      const currentChat: RT<
        typeof chatModel.selectors.selectCurrentChatWithError
      > = yield select(chatModel.selectors.selectCurrentChatWithError);

      yield put(teamsModel.actions.decrementAIResponse());
      yield call(loadChat, currentChat.id);
      return;
    }

    yield put(
      snackbarModel.actions.addNotificationAction(
        createNotification('error', 'There was an error generating a response.')
      )
    );
  } catch (error: unknown) {
    yield put(
      snackbarModel.actions.addNotificationAction(
        createNotification('error', 'Failed to save chat message.')
      )
    );
  } finally {
    yield put(actions.setConversationState(ConversationState.Idle));
  }
}

function* stopGenerateMessage() {
  const conversation: RT<typeof chatModel.selectors.selectConversation> =
    yield select(chatModel.selectors.selectConversation);

  if (!conversation) {
    return;
  }

  if (abortControllers.has(conversation.id)) {
    abortControllers.get(conversation.id)?.abort();
    abortControllers.delete(conversation.id);
  }
}

function* applySuggestions() {
  yield put(actions.clearConversation());
  yield put(actions.setConversationState(ConversationState.Idle));
}

function* rejectSuggestions() {
  yield put(actions.clearConversation());
  yield put(actions.setConversationState(ConversationState.Idle));
}

function* processResponse({
  response,
  tabs,
  stream,
}: {
  response: string;
  tabs: GenerateMessageAction['tabs'];
  stream: boolean;
}) {
  const parsedResponse = parseChatResponse(response);
  const structuredResponse = parsedResponse.data
    ? normalizeResponse({ tabs, data: parsedResponse.data, stream })
    : null;

  yield put(actions.setConversationResponse({ response, structuredResponse }));

  return { tabs, structuredResponse };
}

export function* conversationWorker() {
  yield takeEvery(actions.generateMessage, generateMessage);
  yield takeEvery(actions.stopGenerateMessage, stopGenerateMessage);
  yield takeEvery(actions.applySuggestions, applySuggestions);
  yield takeEvery(actions.rejectSuggestions, rejectSuggestions);
}
