/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.ondemand.impl.connector;

import com.devexperts.connector.proto.ApplicationConnectionFactory;
import com.devexperts.qd.DataIterator;
import com.devexperts.qd.DataProvider;
import com.devexperts.qd.DataVisitor;
import com.devexperts.qd.SubscriptionProvider;
import com.devexperts.qd.SubscriptionVisitor;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.qtp.AbstractConnectionHandler;
import com.devexperts.qd.qtp.AbstractMessageConnector;
import com.devexperts.qd.qtp.AbstractMessageVisitor;
import com.devexperts.qd.qtp.MessageAdapter;
import com.devexperts.qd.qtp.MessageConnectorState;
import com.devexperts.qd.qtp.MessageConnectors;
import com.devexperts.qd.qtp.MessageListener;
import com.devexperts.qd.qtp.MessageProvider;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.qtp.MessageVisitor;
import com.devexperts.qd.stats.QDStats;
import com.dxfeed.ondemand.impl.MarketDataReplay;
import com.dxfeed.ondemand.impl.connector.OnDemandConnector;
import java.util.Date;

/**
 * Connection handler that feeds replayed market data from a {@link MarketDataReplay}
 * into a {@link MessageAdapter} created from the owning {@link OnDemandConnector}.
 *
 * <p>The replay position ({@link #time}) is kept in milliseconds and is advanced in
 * {@link #doWork()} by the elapsed wall-clock time multiplied by {@link #speed}.
 */
class ReplayConnectionHandler extends AbstractConnectionHandler<OnDemandConnector> {
    /** Receives "messages available" notifications from the adapter and drains its messages. */
    private final MessageProcessor messageProcessor = new MessageProcessor();
    /** Replay engine obtained from the connector in the constructor. */
    private final MarketDataReplay replay;
    /** Adapter created in {@link #doWork()}; reset to {@code null} by {@link #closeAdapter()}. */
    private MessageAdapter adapter;
    /** Current replay position in milliseconds; fractional so that slow speeds still accumulate. */
    private volatile double time;
    /** Multiplier applied to elapsed wall-clock milliseconds when advancing {@link #time}. */
    private volatile double speed;
    /** Last state reported to listeners; read and written only while holding this handler's monitor. */
    private MessageConnectorState connectionState = MessageConnectorState.CONNECTING;

    /**
     * Creates a handler bound to the given connector, taking the initial replay
     * position and speed from it.
     *
     * @param connector the owning connector
     */
    ReplayConnectionHandler(OnDemandConnector connector) {
        super(connector);
        this.replay = connector.getMarketDataReplay();
        this.time = connector.getTime().getTime();
        this.speed = connector.getSpeed();
    }

    /**
     * Returns the current replay position.
     *
     * @return the replay position in milliseconds, truncated to a {@code long}
     */
    public long getTime() {
        return (long)this.time;
    }

    /**
     * Changes the replay speed used for subsequent time advances.
     *
     * @param speed multiplier applied to elapsed wall-clock time
     */
    public void setSpeed(double speed) {
        this.speed = speed;
    }

    /**
     * Returns the connection state most recently set by this handler.
     *
     * @return the current connection state
     */
    public synchronized MessageConnectorState getConnectionState() {
        return this.connectionState;
    }

    /**
     * Returns the owning connector typed as {@link OnDemandConnector}.
     * The cast is kept from the decompiled source because the declared type of the
     * inherited {@code connector} field is not visible in this file.
     */
    private OnDemandConnector onDemandConnector() {
        return (OnDemandConnector)this.connector;
    }

    /**
     * Records a new connection state and notifies the connector's listeners,
     * but only when the state actually changes.
     *
     * @param connectionState the state to switch to
     */
    private void updateConnectionState(MessageConnectorState connectionState) {
        synchronized (this) {
            if (connectionState == this.connectionState) {
                return;
            }
            this.connectionState = connectionState;
        }
        // Listeners are notified after the monitor is released so that
        // listener code never runs while this handler's lock is held.
        this.onDemandConnector().notifyMessageConnectorListeners();
    }

    /**
     * Main loop of the handler.
     *
     * <p>Prepares the replay (clears subscription, sets token and start time, starts it),
     * creates and starts the message adapter, and then alternates between two phases until
     * the handler is closed:
     * <ol>
     *   <li>while the replay reports less than {@code 1.0} of available data for the current
     *       time and no adapter messages are pending, the state is reported as
     *       {@code CONNECTING} and the thread sleeps for one tick period;</li>
     *   <li>otherwise the state is reported as {@code CONNECTED} and, each tick, the update
     *       for the current time is pushed into the adapter, the connector is told the
     *       current time, and the replay position is advanced by
     *       {@code speed * elapsedWallClockMillis}.</li>
     * </ol>
     * Pending adapter messages interrupt either phase so they can be drained first.
     * The method returns as soon as the replay reports a permanent error.
     *
     * @throws Throwable whatever the replay, adapter or sleep throws
     */
    protected void doWork() throws Throwable {
        OnDemandConnector owner = this.onDemandConnector();
        owner.getReconnectHelper().sleepBeforeConnection();
        this.replay.clearSubscription();
        this.replay.setToken(owner.resolveToken());
        this.replay.setTime((long)this.time);
        this.replay.start();
        MessageAdapter.Factory factory = MessageConnectors.retrieveMessageAdapterFactory((ApplicationConnectionFactory)owner.getFactory());
        this.adapter = factory.createAdapter(owner.getStats().getOrCreate(QDStats.SType.CONNECTIONS));
        this.adapter.setMessageListener(this.messageProcessor);
        this.adapter.start();
        while (!this.isClosed()) {
            this.messageProcessor.retrieveMessages();
            // Phase 1: wait until enough data is buffered (or new messages arrive).
            while (!this.isClosed() && !this.messageProcessor.available && this.isPreBuffering()) {
                if (this.replay.hasPermanentError()) {
                    return;
                }
                this.updateConnectionState(MessageConnectorState.CONNECTING);
                this.sleepTickPeriod();
            }
            this.updateConnectionState(MessageConnectorState.CONNECTED);
            // Phase 2: play data, advancing replay time relative to the wall clock.
            long timeWall = System.currentTimeMillis();
            while (!(this.isClosed() || this.messageProcessor.available || this.isPreBuffering())) {
                if (this.replay.hasPermanentError()) {
                    return;
                }
                RecordBuffer buf = this.replay.getUpdate((long)this.time);
                try {
                    this.processData(buf);
                } finally {
                    // Release the buffer even when the adapter throws.
                    buf.release();
                }
                owner.updateTime(new Date((long)this.time));
                this.sleepTickPeriod();
                long timeWallNew = System.currentTimeMillis();
                this.time += this.speed * (double)(timeWallNew - timeWall);
                timeWall = timeWallNew;
            }
        }
    }

    /**
     * Sleeps for the connector's configured tick period.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void sleepTickPeriod() throws InterruptedException {
        Thread.sleep(this.onDemandConnector().getTickPeriod().getTime());
    }

    /**
     * Tells whether the replay still lacks data for the current time.
     *
     * @return {@code true} while the replay reports less than {@code 1.0} available data
     */
    private boolean isPreBuffering() {
        return this.replay.getAvailableData((long)this.time) < 1.0;
    }

    /**
     * Passes the same buffer to the adapter three times - as ticker, stream and history
     * data - rewinding it between passes so each pass reads from the start.
     *
     * @param buf the records to deliver; not released here
     */
    private void processData(RecordBuffer buf) {
        this.adapter.processTickerData((DataIterator)buf);
        buf.rewind();
        this.adapter.processStreamData((DataIterator)buf);
        buf.rewind();
        this.adapter.processHistoryData((DataIterator)buf);
    }

    /**
     * Closes the adapter if one exists, logging (not propagating) any failure,
     * and clears the reference.
     */
    private void closeAdapter() {
        if (this.adapter != null) {
            try {
                this.adapter.close();
            }
            catch (Throwable t) {
                // Best effort: a failing adapter must not prevent the rest of the shutdown.
                this.log.error("Failed to close adapter", t);
            }
            this.adapter = null;
        }
    }

    /**
     * Stops the replay, closes the adapter and logs that the replay has stopped.
     *
     * @param reason the reason for closing; not used by this implementation
     */
    protected void closeImpl(Throwable reason) {
        this.replay.stop();
        this.closeAdapter();
        this.log.info("Replay stopped");
    }

    /**
     * Joins this handler and then every non-null thread returned by
     * {@code replay.getStoppedThreads()}.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public void joinAll() throws InterruptedException {
        this.join();
        for (Thread thread : this.replay.getStoppedThreads()) {
            if (thread != null) {
                thread.join();
            }
        }
    }

    /**
     * Drains messages from the adapter: data messages are discarded, subscription
     * messages are forwarded to the replay as additions or removals.
     */
    class MessageProcessor extends AbstractMessageVisitor implements MessageListener {
        /** Set by {@link #messagesAvailable} and cleared at the start of {@link #retrieveMessages()}. */
        volatile boolean available;

        MessageProcessor() {
        }

        /**
         * Clears the {@link #available} flag and then pulls messages from the adapter
         * until it reports that nothing more is pending.
         */
        public void retrieveMessages() {
            // Clear the flag first so a notification arriving during the drain is not lost.
            this.available = false;
            boolean more;
            do {
                more = ReplayConnectionHandler.this.adapter.retrieveMessages((MessageVisitor)this);
            } while (more);
        }

        /**
         * Marks that the adapter has messages waiting to be retrieved.
         *
         * @param provider the notifying provider; not used
         */
        public void messagesAvailable(MessageProvider provider) {
            this.available = true;
        }

        /**
         * Discards incoming data by retrieving it into {@code DataVisitor.VOID}.
         *
         * @param provider source of the data
         * @param message message type; not used
         * @return the value returned by {@code provider.retrieveData}
         */
        public boolean visitData(DataProvider provider, MessageType message) {
            return provider.retrieveData(DataVisitor.VOID);
        }

        /**
         * Retrieves a batch of subscription records and applies it to the replay as an
         * addition or a removal, depending on {@code message.isSubscriptionAdd()}.
         *
         * @param provider source of the subscription records
         * @param message message type that selects add versus remove
         * @return the value returned by {@code provider.retrieveSubscription}
         */
        public boolean visitSubscription(SubscriptionProvider provider, MessageType message) {
            RecordBuffer sub = RecordBuffer.getInstance(RecordMode.SUBSCRIPTION);
            try {
                boolean result = provider.retrieveSubscription((SubscriptionVisitor)sub);
                if (message.isSubscriptionAdd()) {
                    ReplayConnectionHandler.this.replay.addSubscription(sub);
                } else {
                    ReplayConnectionHandler.this.replay.removeSubscription(sub);
                }
                return result;
            } finally {
                // Release the buffer even when the provider or the replay throws.
                sub.release();
            }
        }
    }
}

