import Ably from 'ably'
import { fromUint8Array, toUint8Array } from 'js-base64'
import _ from 'lodash'
import { RemoteUpdateVerifier } from 'sierra-client/collaboration/remote-update-verifier'
import { ABLY_STATUS_MAPPING, ConnectionStatus } from 'sierra-client/collaboration/types'
import { YDocIntegrityChecker } from 'sierra-client/collaboration/ydoc-integrity-checker'
import { YDocSyncer, YDocSyncerStatus } from 'sierra-client/collaboration/ydoc-syncer'
import { TypedEventEmitter } from 'sierra-client/lib/typed-event-emitter'
import { logger } from 'sierra-client/logger/logger'
import { RealTimeDataClient } from 'sierra-client/realtime-data/real-time-data-provider/realtime-data-context'
import { typedPost } from 'sierra-client/state/api'
import { ContentPermission } from 'sierra-domain/api/common'
import { RequestError } from 'sierra-domain/error'
import {
  XRealtimeCollaborationAddDocumentUpdate,
  XRealtimeCollaborationGetDocumentState,
  XRealtimeCollaborationGetDocumentUpdates,
  XRealtimeCollaborationGetLiveSessionContentDocumentState,
  XRealtimeCollaborationVerifyDocumentUpdates,
} from 'sierra-domain/routes'
import { RetryError, retry } from 'ts-retry-promise'
import * as Y from 'yjs'
import { z } from 'zod'

/** Lifecycle state of the provider's `Y.Doc`, as reported through `yDocStatusChanged`. */
type YDocStatus =
  | 'loading'
  | 'loaded'
  | 'error'
  | 'destroyed'

/**
 * Shape of the payload carried by a `yjs-updates` channel message.
 *
 * - `updates`: encoded Yjs updates (decoded with `toUint8Array` by the consumer).
 * - `updateIds`: optional ids of those updates; when present the consumer expects one id per update.
 */
const YjsUpdatesData = z.object({
  updateIds: z.array(z.number()).optional(),
  updates: z.array(z.string()),
})

/**
 * Keeps a local `Y.Doc` in sync with the collaboration backend.
 *
 * - Remote -> local: encoded Yjs updates arrive as `yjs-updates` messages on the realtime channel
 *   named after `yDocId`; they are batched (throttled) and handed to the `YDocSyncer`.
 * - Local -> remote: the `YDocSyncer` is given a `syncIntoRemote` callback that posts the update to
 *   the backend.
 * - The full document state is (re)fetched every time the connection reports `connected` and
 *   whenever a `reload` message is received.
 *
 * Intended usage: construct, attach listeners, call `init()`, and call `destroy()` when done.
 */
export class AblyYjsProvider extends TypedEventEmitter<{
  permissionChanged: (permission: ContentPermission) => void
  yDocStatusChanged: (yDocStatus: YDocStatus, error?: Error) => void
  connectionStatusChanged: (connectionStatus: ConnectionStatus) => void
  yDocSyncerStatusChanged: (status: YDocSyncerStatus) => void
  yDocMissingUpdates: (updateIds: number[]) => void
  yDocRecoveredUpdates: (updateIds: number[]) => void
  latestYUpdateIdChanged: (latestYUpdateId: number) => void
}> {
  /** The document this provider keeps in sync. Destroyed together with the provider. */
  readonly yDoc: Y.Doc = new Y.Doc()
  /** Current document status; changes are announced through `yDocStatusChanged`. */
  yDocStatus: YDocStatus = 'loading'
  /** Connection status derived from the Ably connection state via `ABLY_STATUS_MAPPING`. */
  connectionStatus: ConnectionStatus = 'disconnected'
  /** Permission returned by the last successful document state fetch, if any. */
  permission: ContentPermission | undefined = undefined

  // NOTE: must be declared before `throttledSyncRemoteIntoLocal`, which reads it during field initialization.
  private readonly remoteIntoLocalThrottleMs: number = 200
  /** Remote updates received since the last throttled flush into the syncer. */
  private pendingRemoteUpdates: Uint8Array[] = []
  private readonly yDocSyncer: YDocSyncer
  private readonly yDocIntegrityChecker: YDocIntegrityChecker
  private isDestroyed = false
  private readonly remoteUpdateVerifier: RemoteUpdateVerifier

  /**
   * @param yDocId Id of the document; also used as the realtime channel name.
   * @param realTimeDataClient Client used for channel subscription and connection/channel state events.
   * @param isLiveSession Selects the live-session document state endpoint and is forwarded when
   *   fetching missing updates.
   * @param isRemoteUpdateVerifierEnabled Forwarded to `RemoteUpdateVerifier` as `isEnabled`.
   */
  constructor(
    private readonly yDocId: string,
    private readonly realTimeDataClient: RealTimeDataClient,
    private readonly isLiveSession: boolean,
    isRemoteUpdateVerifierEnabled: boolean
  ) {
    super()

    this.remoteUpdateVerifier = new RemoteUpdateVerifier({
      isEnabled: isRemoteUpdateVerifierEnabled,
      onMissingUpdates: (ids: number[]) => {
        this.emit('yDocMissingUpdates', ids)

        // With many missing updates, reload the whole document instead of fetching them one by one.
        if (ids.length > 100) {
          this.refreshDocument()
        } else {
          void typedPost(XRealtimeCollaborationGetDocumentUpdates, {
            yDocId: this.yDocId,
            updateIds: ids,
            isLiveSession,
          })
            .then(({ updates }) => {
              // The request may resolve after destroy(); do not touch the torn-down syncer.
              if (this.isDestroyed) {
                return
              }

              for (const { update, id } of updates) {
                this.remoteUpdateVerifier.addUpdate(id)
                this.handleRemoteUpdate(toUint8Array(update))
              }
              this.emit(
                'yDocRecoveredUpdates',
                updates.map(it => it.id)
              )
            })
            .catch(() => {
              // Ignore since this will be retried by remoteUpdateVerifier
            })
        }
      },
      onFailedToRecoverUpdates: (ids: number[]) => {
        this.handleError(`Failed to recover updates: ${ids.join(', ')}`)
      },
      onLatestYUpdateIdChanged: latestYUpdateId => {
        this.emit('latestYUpdateIdChanged', latestYUpdateId)
      },
    })

    this.yDocIntegrityChecker = new YDocIntegrityChecker(
      (message, error) => this.handleError(message, error),
      updateHashes => typedPost(XRealtimeCollaborationVerifyDocumentUpdates, { yDocId, updateHashes })
    )

    this.yDocSyncer = new YDocSyncer({
      yDoc: this.yDoc,
      // Local -> Remote
      syncIntoRemote: async update => {
        const { yUpdateId } = await typedPost(XRealtimeCollaborationAddDocumentUpdate, {
          yDocId: this.yDocId,
          update: fromUint8Array(update),
        })

        // Register our own update id so the verifier does not report it as missing.
        this.remoteUpdateVerifier.addUpdate(yUpdateId)
        return null
      },
      yDocIntegrityChecker: this.yDocIntegrityChecker,
      onUnrecoverableError: error => this.handleError(error),
    })

    // Forward status events from the syncer
    this.yDocSyncer.on('statusChanged', status => {
      this.emit('yDocSyncerStatusChanged', status)
    })
  }

  /** Pending timer that reports a channel stuck in `detached`/`failed`; cleared on every state change. */
  channelErrorTimeout: ReturnType<typeof setTimeout> | undefined
  private handleChannelStateChange = (state: Ably.ChannelStateChange): void => {
    clearTimeout(this.channelErrorTimeout)
    if (this.isDestroyed) {
      return
    }

    // Only these two states are reported as errors; any other state just cancels a pending report.
    if (state.current !== 'detached' && state.current !== 'failed') {
      return
    }

    const timeoutSeconds = 2
    /**
     * The Ably channel being detached or failed is not an error that we are currently able to recover from.
     * It indicates an error deeper in our Ably infrastructure, such as channel authentication being
     * revoked while the client is still connected.
     *
     * However, when refreshing the app it is possible to get into a state where the channel is detached
     * or failing for a short period of time before the `destroy` function is called in this code.
     * The error is therefore only reported if no other state change (or `destroy`) happens in time.
     */
    this.channelErrorTimeout = setTimeout(() => {
      if (state.current === 'detached') {
        this.handleError(
          `Ably connection detached for more than ${timeoutSeconds} seconds. Document ${this.yDocId}`
        )
        return
      }

      if (state.current === 'failed') {
        this.handleError(
          `Ably connection failed for more than ${timeoutSeconds} seconds. Document ${this.yDocId}`
        )
        return
      }
    }, timeoutSeconds * 1000)
  }

  /** init is separate from the constructor so that the user can 1) construct 2) set up listeners 3) init() */
  init(): void {
    this.subscribe().catch(err => this.handleError('Failed to subscribe', err))

    // Process the current connection state immediately, then follow subsequent changes.
    this.handleConnectionState({ current: this.realTimeDataClient.getConnectionState() })
    this.realTimeDataClient.onConnectionStateChange(this.handleConnectionState)

    this.realTimeDataClient.onChannelStateChange(this.yDocId, this.handleChannelStateChange)
  }

  private handleConnectionState = ({ current }: { current: Ably.ConnectionState }): void => {
    const newConnectionStatus = ABLY_STATUS_MAPPING[current]

    /**
     * We subscribe to all connection state events, including `update`, which represents events
     * that don't change the connection state. We will ignore these events here by comparing the
     * connection state to the last received value. In this function we only want to take action
     * when the derived connection state changes.
     */

    if (newConnectionStatus !== this.connectionStatus) {
      this.connectionStatus = newConnectionStatus
      this.emit('connectionStatusChanged', this.connectionStatus)
    }

    /**
     * NOTE! This should not be inside the above if-statement. If we miss a yupdate for some reason
     * it can lead to very strange behavior and data loss. Ideally we would have a more resilient way
     * of handling those cases.
     */
    if (newConnectionStatus === 'connected') {
      this.refreshDocument()
    }
  }

  /**
   * Fetches the full document state from the backend and applies it.
   *
   * On success: resets the update verifier to `latestYUpdateId`, emits `permissionChanged` if the
   * permission differs from the stored one, feeds the updates to the syncer and moves the status to
   * `loaded`. An empty update list is reported as an error ("does not exist").
   *
   * The request is retried up to 5 times, except for parsing, access and not-found errors.
   * Failures are reported through `yDocStatusChanged`; the returned promise is not rejected for them.
   * Does nothing once the provider has been destroyed.
   */
  async getInitialUpdates(): Promise<void> {
    if (this.isDestroyed) {
      return
    }

    const documentStateEndpoint = this.isLiveSession
      ? XRealtimeCollaborationGetLiveSessionContentDocumentState
      : XRealtimeCollaborationGetDocumentState

    try {
      const result = await retry(
        async () =>
          typedPost(documentStateEndpoint, {
            yDocId: this.yDocId,
          }),
        {
          retries: 5,
          logger: message => logger.debug(`retrying getInitialUpdates: ${message}`),
          timeout: 'INFINITELY',
          backoff: 'LINEAR',
          retryIf: error => {
            return (
              // No point in retrying for a provider that has been torn down in the meantime.
              !this.isDestroyed &&
              !RequestError.isFlexibleContentParsingException(error) &&
              !RequestError.isAccessError(error) &&
              !RequestError.isNotFound(error)
            )
          },
        }
      )

      // destroy() may have been called while the request was in flight.
      if (this.isDestroyed) {
        return
      }

      const { permission, updates, latestYUpdateId } = result

      this.remoteUpdateVerifier.setLatestUpdate(latestYUpdateId)

      if (permission !== this.permission) {
        this.permission = permission
        this.emit('permissionChanged', permission)
      }
      if (updates.length === 0) {
        this.handleError(`YDoc ${this.yDocId} does not exist`)
      } else {
        const yUpdates = updates.map(update => toUint8Array(update))
        this.yDocSyncer.onRemoteUpdates(yUpdates)

        if (this.yDocStatus !== 'loaded') {
          this.yDocStatus = 'loaded'
          this.emit('yDocStatusChanged', this.yDocStatus)
        }
      }
    } catch (err) {
      // RetryError wraps the actual failure; surface the underlying error as the cause.
      this.handleError('Failed to get initial ydoc updates', err instanceof RetryError ? err.lastError : err)
    }
  }

  /** Fire-and-forget reload of the full document state. */
  refreshDocument(): void {
    this.getInitialUpdates().catch(err => this.handleError('Failed to getInitialUpdates', err))
  }

  /**
   * Moves the document into the `error` status and notifies listeners.
   * Ignored once the provider has reached the terminal `destroyed` status, so late async failures
   * cannot overwrite it.
   */
  private handleError(message: string, cause?: unknown): void {
    if (this.yDocStatus === 'destroyed') {
      return
    }

    const error = new Error(message, { cause })
    this.yDocStatus = 'error'
    this.emit('yDocStatusChanged', this.yDocStatus, error)
  }

  // Remote -> Local
  private handleRemoteUpdate = (update: Uint8Array): void => {
    this.pendingRemoteUpdates.push(update)
    this.throttledSyncRemoteIntoLocal()
  }

  /** Flushes all queued remote updates into the syncer as one batch. */
  private throttledSyncRemoteIntoLocal = _.throttle(() => {
    if (this.pendingRemoteUpdates.length < 1) {
      this.handleError('Expecting pendingRemoteUpdates to be non-empty.')
      return
    }

    console.debug(
      `[YjsProvider:syncRemoteIntoLocal] Syncing (throttle-batched ${this.pendingRemoteUpdates.length} updates)`
    )

    const yUpdates = this.pendingRemoteUpdates
    this.pendingRemoteUpdates = []

    this.yDocSyncer.onRemoteUpdates(yUpdates)
  }, this.remoteIntoLocalThrottleMs)

  // Inbound channel messages (`yjs-updates` and `reload`)

  private handleMessage = (message: Ably.InboundMessage): void => {
    if (this.isDestroyed) {
      return
    }

    if (message.name === 'yjs-updates') {
      // Report malformed payloads through the regular error path instead of throwing into the
      // realtime client's callback.
      const parsed = YjsUpdatesData.safeParse(message.data)
      if (!parsed.success) {
        this.handleError('Received malformed yjs-updates message', parsed.error)
        return
      }

      const { updates, updateIds } = parsed.data
      for (const update of updates) {
        this.handleRemoteUpdate(toUint8Array(update))
      }

      if (updateIds !== undefined) {
        if (updateIds.length !== updates.length) {
          this.handleError(
            `updates and updateIds must have the same length. Got ${updates.length} updates and ${updateIds.length} updateIds`
          )
          return
        }
        for (const updateId of updateIds) {
          this.remoteUpdateVerifier.addUpdate(updateId)
        }
      }
    } else if (message.name === 'reload') {
      this.refreshDocument()
    }
  }

  private subscribe(): Promise<void> {
    return this.realTimeDataClient.subscribeToChannel({
      channelName: this.yDocId,
      callback: this.handleMessage,
    })
  }

  /**
   * Tears the provider down: unsubscribes from the channel and state events, cancels pending timers
   * and batches, destroys the syncer, integrity checker, verifier and `yDoc`, emits a final
   * `yDocStatusChanged('destroyed')` and then removes all listeners. Safe to call more than once.
   */
  destroy(): void {
    if (this.isDestroyed) {
      return
    }

    this.isDestroyed = true
    this.realTimeDataClient.unsubscribeToChannel({
      channelName: this.yDocId,
      callback: this.handleMessage,
    })
    clearTimeout(this.channelErrorTimeout)
    this.realTimeDataClient.offChannelStateChange(this.yDocId, this.handleChannelStateChange)
    this.realTimeDataClient.offConnectionStateChange(this.handleConnectionState)
    this.throttledSyncRemoteIntoLocal.cancel()
    // Drop updates that were queued for the cancelled flush.
    this.pendingRemoteUpdates = []
    this.yDocSyncer.destroy()
    this.yDocIntegrityChecker.destroy()
    this.yDoc.destroy()
    this.yDocStatus = 'destroyed'
    this.emit('yDocStatusChanged', this.yDocStatus)
    this.remoteUpdateVerifier.destroy()
    this.off()
  }
}
