package com.dxfeed.ondemand.impl.connector;

import com.devexperts.qd.DataProvider;
import com.devexperts.qd.DataVisitor;
import com.devexperts.qd.SubscriptionProvider;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.qtp.AbstractConnectionHandler;
import com.devexperts.qd.qtp.AbstractMessageVisitor;
import com.devexperts.qd.qtp.MessageAdapter;
import com.devexperts.qd.qtp.MessageConnectorState;
import com.devexperts.qd.qtp.MessageConnectors;
import com.devexperts.qd.qtp.MessageListener;
import com.devexperts.qd.qtp.MessageProvider;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.stats.QDStats;
import com.dxfeed.ondemand.impl.MarketDataReplay;
import java.util.Date;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/dxfeed-ondemand.jar:com/dxfeed/ondemand/impl/connector/ReplayConnectionHandler.class */
public class ReplayConnectionHandler extends AbstractConnectionHandler<OnDemandConnector> {
    private final MessageProcessor messageProcessor;
    private final MarketDataReplay replay;
    private MessageAdapter adapter;
    private volatile double time;
    private volatile double speed;
    private MessageConnectorState connectionState;

    /* loaded from: input_file:WEB-INF/lib/dxfeed-ondemand.jar:com/dxfeed/ondemand/impl/connector/ReplayConnectionHandler$MessageProcessor.class */
    class MessageProcessor extends AbstractMessageVisitor implements MessageListener {
        volatile boolean available;

        MessageProcessor() {
        }

        public void retrieveMessages() {
            this.available = false;
            do {
            } while (ReplayConnectionHandler.this.adapter.retrieveMessages(this));
        }

        @Override // com.devexperts.qd.qtp.MessageListener
        public void messagesAvailable(MessageProvider messageProvider) {
            this.available = true;
        }

        @Override // com.devexperts.qd.qtp.AbstractMessageVisitor, com.devexperts.qd.qtp.MessageVisitor
        public boolean visitData(DataProvider dataProvider, MessageType messageType) {
            return dataProvider.retrieveData(DataVisitor.VOID);
        }

        @Override // com.devexperts.qd.qtp.AbstractMessageVisitor, com.devexperts.qd.qtp.MessageVisitor
        public boolean visitSubscription(SubscriptionProvider subscriptionProvider, MessageType messageType) {
            RecordBuffer recordBuffer = RecordBuffer.getInstance(RecordMode.SUBSCRIPTION);
            boolean retrieveSubscription = subscriptionProvider.retrieveSubscription(recordBuffer);
            if (messageType.isSubscriptionAdd()) {
                ReplayConnectionHandler.this.replay.addSubscription(recordBuffer);
            } else {
                ReplayConnectionHandler.this.replay.removeSubscription(recordBuffer);
            }
            recordBuffer.release();
            return retrieveSubscription;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReplayConnectionHandler(OnDemandConnector onDemandConnector) {
        super(onDemandConnector);
        this.messageProcessor = new MessageProcessor();
        this.connectionState = MessageConnectorState.CONNECTING;
        this.replay = onDemandConnector.getMarketDataReplay();
        this.time = onDemandConnector.getTime().getTime();
        this.speed = onDemandConnector.getSpeed();
    }

    public long getTime() {
        return (long) this.time;
    }

    public void setSpeed(double d) {
        this.speed = d;
    }

    public synchronized MessageConnectorState getConnectionState() {
        return this.connectionState;
    }

    private void updateConnectionState(MessageConnectorState messageConnectorState) {
        synchronized (this) {
            if (messageConnectorState == this.connectionState) {
                return;
            }
            this.connectionState = messageConnectorState;
            ((OnDemandConnector) this.connector).notifyMessageConnectorListeners();
        }
    }

    @Override // com.devexperts.qd.qtp.QTPWorkerThread
    protected void doWork() throws Throwable {
        ((OnDemandConnector) this.connector).getReconnectHelper().sleepBeforeConnection();
        this.replay.clearSubscription();
        this.replay.setToken(((OnDemandConnector) this.connector).resolveToken());
        this.replay.setTime((long) this.time);
        this.replay.start();
        this.adapter = MessageConnectors.retrieveMessageAdapterFactory(((OnDemandConnector) this.connector).getFactory()).createAdapter(((OnDemandConnector) this.connector).getStats().getOrCreate(QDStats.SType.CONNECTIONS));
        this.adapter.setMessageListener(this.messageProcessor);
        this.adapter.start();
        while (!isClosed()) {
            this.messageProcessor.retrieveMessages();
            while (!isClosed() && !this.messageProcessor.available && isPreBuffering()) {
                if (this.replay.hasPermanentError()) {
                    return;
                }
                updateConnectionState(MessageConnectorState.CONNECTING);
                sleepTickPeriod();
            }
            updateConnectionState(MessageConnectorState.CONNECTED);
            long currentTimeMillis = System.currentTimeMillis();
            while (true) {
                long j = currentTimeMillis;
                if (!isClosed() && !this.messageProcessor.available && !isPreBuffering()) {
                    if (this.replay.hasPermanentError()) {
                        return;
                    }
                    RecordBuffer update = this.replay.getUpdate((long) this.time);
                    processData(update);
                    update.release();
                    ((OnDemandConnector) this.connector).updateTime(new Date((long) this.time));
                    sleepTickPeriod();
                    long currentTimeMillis2 = System.currentTimeMillis();
                    this.time += this.speed * (currentTimeMillis2 - j);
                    currentTimeMillis = currentTimeMillis2;
                }
            }
        }
    }

    private void sleepTickPeriod() throws InterruptedException {
        Thread.sleep(((OnDemandConnector) this.connector).getTickPeriod().getTime());
    }

    private boolean isPreBuffering() {
        return this.replay.getAvailableData((long) this.time) < 1.0d;
    }

    private void processData(RecordBuffer recordBuffer) {
        this.adapter.processTickerData(recordBuffer);
        recordBuffer.rewind();
        this.adapter.processStreamData(recordBuffer);
        recordBuffer.rewind();
        this.adapter.processHistoryData(recordBuffer);
    }

    private void closeAdapter() {
        if (this.adapter != null) {
            try {
                this.adapter.close();
            } catch (Throwable th) {
                this.log.error("Failed to close adapter", th);
            }
            this.adapter = null;
        }
    }

    @Override // com.devexperts.qd.qtp.AbstractConnectionHandler
    protected void closeImpl(Throwable th) {
        this.replay.stop();
        closeAdapter();
        this.log.info("Replay stopped");
    }

    public void joinAll() throws InterruptedException {
        join();
        for (Thread thread : this.replay.getStoppedThreads()) {
            if (thread != null) {
                thread.join();
            }
        }
    }
}
