/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.dxlink.websocket.application;

import com.devexperts.io.BufferedInput;
import com.devexperts.logging.Logging;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.QDContract;
import com.devexperts.qd.dxlink.websocket.application.DxLinkClientReceiver;
import com.devexperts.qd.dxlink.websocket.application.DxLinkJsonMessageParser;
import com.devexperts.qd.dxlink.websocket.application.HeartbeatProcessor;
import com.devexperts.qd.qtp.AbstractQTPParser;
import com.devexperts.qd.qtp.MessageConsumer;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.qtp.ProtocolDescriptor;
import com.devexperts.qd.qtp.RuntimeQTPException;
import com.devexperts.qd.qtp.fieldreplacer.FieldReplacersCache;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * QTP parser for dxLink messages delivered as text.
 *
 * <p>Each incoming message is handed to a {@link DxLinkJsonMessageParser}, which calls back into
 * the private {@link DxLinkClientReceiver} implementation defined below. The callbacks translate
 * the dxLink events into calls on the {@link MessageConsumer} supplied to
 * {@link #parse(String, MessageConsumer)}.
 *
 * <p>The consumer is kept in a plain instance field for the duration of a single
 * {@code parse} call; nothing in this class synchronizes access to it.
 */
class DxLinkWebSocketQTPParser extends AbstractQTPParser {
    private static final Logging log = Logging.getLogging(DxLinkWebSocketQTPParser.class);

    /** Message types added to the descriptor's receive set once the peer reports {@code AUTHORIZED}. */
    private static final MessageType[] AUTHORIZED_RECEIVE_TYPES = {
        MessageType.TICKER_ADD_SUBSCRIPTION,
        MessageType.TICKER_REMOVE_SUBSCRIPTION,
        MessageType.STREAM_ADD_SUBSCRIPTION,
        MessageType.STREAM_REMOVE_SUBSCRIPTION,
        MessageType.HISTORY_ADD_SUBSCRIPTION,
        MessageType.HISTORY_REMOVE_SUBSCRIPTION,
    };

    private final DxLinkJsonMessageParser messageParser;
    private final HeartbeatProcessor heartbeatProcessor;
    /** Latest protocol descriptor; every update derives a new descriptor from the previous one. */
    private ProtocolDescriptor descriptor = ProtocolDescriptor.newPeerProtocolDescriptor(null);
    /** Consumer of the {@code parse} call in progress; {@code null} between calls. */
    private MessageConsumer currentConsumer;

    /**
     * Creates the parser and its underlying JSON message parser.
     *
     * @param scheme data scheme passed to the superclass
     * @param supportsMixedSubscription value forwarded to {@code setMixedSubscription}
     * @param fieldReplacer value forwarded to {@code setFieldReplacers}
     * @param heartbeatProcessor receives the disconnect timeout announced by the peer's setup message
     * @param factory builds the JSON message parser around this parser's receiver callbacks
     */
    DxLinkWebSocketQTPParser(DataScheme scheme, boolean supportsMixedSubscription, FieldReplacersCache fieldReplacer, HeartbeatProcessor heartbeatProcessor, Function<DxLinkClientReceiver, DxLinkJsonMessageParser> factory) {
        super(scheme);
        this.setMixedSubscription(supportsMixedSubscription);
        this.setFieldReplacers(fieldReplacer);
        this.heartbeatProcessor = heartbeatProcessor;
        this.messageParser = factory.apply(new DxLinkClientReceiverImpl());
    }

    /**
     * Not supported by this parser: input must go through {@link #parse(String, MessageConsumer)}.
     *
     * @throws IllegalStateException always
     */
    protected void parseImpl(BufferedInput in, MessageConsumer consumer) {
        throw new IllegalStateException(
            "BufferedInput parsing is not supported by the dxLink parser; use parse(String, MessageConsumer)");
    }

    /**
     * Parses one text message and reports its contents to {@code consumer}.
     *
     * <p>Pending data is flushed to the consumer via {@code processPending} even when reading the
     * message fails, and the reference to the consumer is always released afterwards.
     *
     * @param message the raw message text
     * @param consumer the consumer that receives the parsed content
     * @throws RuntimeException if the underlying message parser fails with an {@link IOException}
     */
    public void parse(String message, MessageConsumer consumer) {
        this.currentConsumer = consumer;
        try {
            this.messageParser.read(message);
        } catch (IOException e) {
            throw new RuntimeException(String.format("Can't parse the message: '%s'", message), e);
        } finally {
            try {
                this.processPending(consumer);
            } finally {
                // Release the consumer even if flushing the pending data throws,
                // so a stale consumer is never visible to a later callback.
                this.currentConsumer = null;
            }
        }
    }

    /**
     * Stores {@code newDescriptor} as the current descriptor and reports it to the consumer of the
     * {@code parse} call in progress.
     */
    private void publishDescriptor(ProtocolDescriptor newDescriptor) {
        this.descriptor = newDescriptor;
        this.currentConsumer.processDescribeProtocol(newDescriptor, true);
    }

    /**
     * Receiver callbacks invoked by the JSON message parser while a {@code parse} call is running.
     */
    private class DxLinkClientReceiverImpl implements DxLinkClientReceiver {

        /** Logs the reported error together with its channel and error type. */
        @Override
        public void receiveError(int channel, String error, String message) {
            log.error("dxLink error on channel " + channel + " (" + error + "): " + message);
        }

        /** Reports a keepalive to the consumer as a heartbeat without a payload. */
        @Override
        public void receiveKeepalive(int channel) {
            currentConsumer.processHeartbeat(null);
        }

        /**
         * Derives a new descriptor carrying the setup properties and publishes it.
         *
         * <p>Timeouts are stored as the received value divided by 1000 (presumably a
         * milliseconds-to-seconds conversion; confirm against the message parser). A
         * {@code null} timeout is simply not recorded.
         */
        @Override
        public void receiveSetup(String version, Long keepaliveTimeout, Long acceptKeepaliveTimeout) {
            ProtocolDescriptor newDescriptor = ProtocolDescriptor.newPeerProtocolDescriptor(descriptor);
            newDescriptor.setProperty("type", "dxlink");
            newDescriptor.setProperty("version", version);
            if (keepaliveTimeout != null) {
                newDescriptor.setProperty("keepaliveTimeout", Double.toString(keepaliveTimeout / 1000.0));
                // The heartbeat processor gets the unscaled value.
                heartbeatProcessor.setDisconnectTimeout(keepaliveTimeout);
            }
            if (acceptKeepaliveTimeout != null) {
                newDescriptor.setProperty("acceptKeepaliveTimeout", Double.toString(acceptKeepaliveTimeout / 1000.0));
            }
            publishDescriptor(newDescriptor);
        }

        /**
         * Publishes a descriptor reflecting the new authentication state.
         *
         * <p>For {@code "AUTHORIZED"} the {@code authentication} property is set to an empty string
         * and the subscription message types are added to the receive set; for any other state the
         * property holds the state itself.
         */
        @Override
        public void receiveAuthState(int channel, String state) {
            boolean authorized = "AUTHORIZED".equals(state);
            ProtocolDescriptor newDescriptor = ProtocolDescriptor.newPeerProtocolDescriptor(descriptor);
            newDescriptor.setProperty("authentication", authorized ? "" : state);
            if (authorized) {
                for (MessageType type : AUTHORIZED_RECEIVE_TYPES) {
                    newDescriptor.addReceive(newDescriptor.newMessageDescriptor(type));
                }
            }
            publishDescriptor(newDescriptor);
        }

        /** Asks the message parser to create a parser for the opened channel; {@code service} is unused. */
        @Override
        public void receiveChannelOpened(int channel, String service, String contract) {
            messageParser.createChannelParser(channel, contract);
        }

        /** Forwards the data format and event fields to the channel's parser; {@code aggregationPeriod} is unused. */
        @Override
        public void receiveFeedConfig(int channel, Long aggregationPeriod, String dataFormat, Map<String, List<String>> eventFields) {
            messageParser.updateConfigChannelParser(channel, dataFormat, eventFields);
        }

        /**
         * Lets {@code parser} write its events into the next records message for its contract,
         * then flushes pending data to the consumer whether or not parsing succeeded.
         *
         * @throws RuntimeQTPException if the events parser fails with an {@link IOException}
         */
        @Override
        public void receiveFeedData(DxLinkClientReceiver.EventsParser parser) {
            try {
                MessageType dataType = MessageType.forData((QDContract)parser.getContract());
                parser.parse(nextRecordsMessage(currentConsumer, dataType));
            } catch (IOException e) {
                throw new RuntimeQTPException((Throwable)e);
            } finally {
                processPending(currentConsumer);
            }
        }
    }
}

