import { call, put, fork, select, take, cancelled, race, cancel } from 'redux-saga/effects';
import { eventChannel } from 'redux-saga';
import { normalize } from 'normalizr';
import WebsocketsService from '@netronixgroup/phoenix-streams';
import { normalizeDataPointsResp } from '~/utils/helpers';
import { DEFAULT_REPORTING_INTERVAL, SOCKET_URL } from '~/utils/constants';
import { getAuthIdToken } from '~/store/selectors/auth';
import { getNodesIds } from '~/store/selectors/nodes';
import { checkRequest, createRequest, createSuccess, createFailure, measurementsUpdate } from '~/store/reducers/measurements';
import * as schema from '~/store/schema';

export function initWebsocket(socket, { nodeIds }) {
  return eventChannel((emitter) => {
    try {
      socket.send('update', { node_ids: nodeIds });
      socket.on('measurements').subscribe((data) => {
        if (data) {
          emitter(measurementsUpdate(normalize(normalizeDataPointsResp(data), schema.dataPointsSchema)));
        }
      });
    } catch (e) {
      emitter(createFailure({ message: 'socket erorr', e }));
    }
    return () => {
      socket.close();
    };
  });
}

export function* streamCreateRequestSaga() {
  const token = yield select(getAuthIdToken);
  const nodeIds = yield select(getNodesIds);
  const socket = new WebsocketsService(SOCKET_URL, { params: { token } });
  const channel = yield call(initWebsocket, socket, { nodeIds });
  yield put(createSuccess());
  try {
    while (true) {
      const action = yield take(channel);
      yield put(action);
    }
  } finally {
    if (yield cancelled()) {
      channel.close();
    }
  }
}

function createIntervalChannel() {
  return eventChannel((emitter) => {
    const iv = setInterval(() => {
      emitter(true);
    }, DEFAULT_REPORTING_INTERVAL / 2);
    return () => {
      clearInterval(iv);
    };
  });
}

function* streamCheckSaga() {
  const chan = yield call(createIntervalChannel);
  try {
    while (true) {
      yield take(chan);
      yield put(checkRequest());
    }
  } finally {
    if (yield cancelled()) {
      chan.close();
    }
  }
}

function* watchCheckRequestStream() {
  while (true) {
    yield take(createSuccess);
    yield race([call(streamCheckSaga), take(createFailure)]);
  }
}

export function* watchCreateRequestStream() {
  while (yield take(createRequest)) {
    const createStreamTask = yield fork(streamCreateRequestSaga);
    yield take(createFailure);
    yield cancel(createStreamTask);
  }
}

export default function* app() {
  yield fork(watchCheckRequestStream);
  yield fork(watchCreateRequestStream);
}
