package com.dxfeed.ondemand.impl;

import com.devexperts.io.ByteArrayInput;
import com.devexperts.io.ByteArrayOutput;
import com.devexperts.qd.DataRecord;
import com.devexperts.qd.SubscriptionBuffer;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.util.IndexedSet;
import com.devexperts.util.LogUtil;
import com.devexperts.util.TimeFormat;
import com.dxfeed.ondemand.impl.event.MDREventUtil;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/* loaded from: input_file:com/dxfeed/ondemand/impl/MarketDataReplay.class */
public class MarketDataReplay implements Runnable {
    // Thread executing run(); assigned in start() and reset to null in stop().
    private volatile Thread worker;
    // Token for which the service answered HTTP 401 (see doRequest); run() sends no requests while token == badToken.
    private volatile MarketDataToken badToken;
    // Cache instance acquired in updateCacheInstance() and dropped in releaseCacheInstance(); null while not acquired.
    private Cache cache;
    // Human-readable error status set by doRequest(); reset to null on start(), stop() and after a successful connection.
    private volatile String status;
    // Replay time passed to the most recent updateReplaySpeed() call that updated its bookkeeping.
    private long prevReplayTime;
    // Wall-clock time (System.currentTimeMillis()) recorded together with prevReplayTime.
    private long prevReplayMillis;
    // Token used for requests; starts as a token built from an empty map and a null string.
    private volatile MarketDataToken token = new MarketDataToken((Map<String, String>) Collections.emptyMap(), (String) null);
    // Service address -> wall-clock millis at which a connection attempt to it failed.
    private final Map<String, Long> badAddresses = Collections.synchronizedMap(new HashMap());
    // How long (in millis) an address recorded in badAddresses is skipped by doRequest().
    private long badAddressTimeout = 600000;
    // Cache configuration exposed through the getters/setters below; its address is filled in by updateCacheInstance().
    private final CacheConfig config = new CacheConfig();
    // Current replay state: time, endTime, replaySpeed, subscription and segments.
    private final Current current = new Current();
    // Distance (in millis) before current.endTime at which the worker is woken to request data for the next interval.
    private long prefetchInterval = 59000;
    // Running byte counters reported by getSentBytes() / getReceivedBytes().
    private final AtomicLong sentBytes = new AtomicLong();
    private final AtomicLong receivedBytes = new AtomicLong();
    // Filled by stop(): [0] is the former worker thread, [1] is the thread returned by releaseCacheInstance().
    private final Thread[] stoppedThreads = new Thread[2];

    /** Returns the {@code cacheLimit} value held in this replay's cache configuration. */
    public long getCacheLimit() {
        return config.cacheLimit;
    }

    /**
     * Stores a new {@code cacheLimit} in this replay's cache configuration.
     *
     * @param j the new cache limit
     */
    public void setCacheLimit(long j) {
        config.cacheLimit = j;
    }

    /** Returns the {@code fileCacheLimit} value held in this replay's cache configuration. */
    public long getFileCacheLimit() {
        return config.fileCacheLimit;
    }

    /**
     * Stores a new {@code fileCacheLimit} in this replay's cache configuration.
     *
     * @param j the new file cache limit
     */
    public void setFileCacheLimit(long j) {
        config.fileCacheLimit = j;
    }

    /** Returns the {@code fileCachePath} value held in this replay's cache configuration. */
    public String getFileCachePath() {
        return config.fileCachePath;
    }

    /**
     * Stores a new {@code fileCachePath} in this replay's cache configuration.
     *
     * @param str the new file cache path; must not be {@code null}
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public void setFileCachePath(String str) {
        if (str != null) {
            config.fileCachePath = str;
            return;
        }
        throw new NullPointerException();
    }

    /** Returns the {@code fileCacheDumpPeriod} value held in this replay's cache configuration. */
    public long getFileCacheDumpPeriod() {
        return config.fileCacheDumpPeriod;
    }

    /**
     * Stores a new {@code fileCacheDumpPeriod} in this replay's cache configuration.
     *
     * @param j the new dump period
     */
    public void setFileCacheDumpPeriod(long j) {
        config.fileCacheDumpPeriod = j;
    }

    /** Returns the {@code timeToLive} value held in this replay's cache configuration. */
    public long getTimeToLive() {
        return config.timeToLive;
    }

    /**
     * Stores a new {@code timeToLive} in this replay's cache configuration.
     *
     * @param j the new time-to-live value
     */
    public void setTimeToLive(long j) {
        config.timeToLive = j;
    }

    /**
     * Moves the replay to the given time.
     *
     * <p>Does nothing when the time is unchanged. Otherwise every current segment is restarted, the
     * new time is stored, the current segments are rebuilt if the time falls outside the current
     * interval (and a cache is present), and the worker thread is woken up.
     *
     * @param j the new replay time
     */
    public synchronized void setTime(long j) {
        if (j != current.time) {
            Log.log.info("setTime(" + TimeFormat.DEFAULT.withMillis().format(j) + ")");
            for (Object segment : current.segments) {
                ((CurrentSegment) segment).restart();
            }
            current.time = j;
            boolean insideCurrentInterval = current.isCurrentInterval(j);
            if (!insideCurrentInterval && cache != null) {
                cache.rebuildCurrentSegments(current);
            }
            awaken();
        }
    }

    /** Returns the current replay time. */
    public synchronized long getTime() {
        return current.time;
    }

    /** Removes every key from the current subscription. */
    public synchronized void clearSubscription() {
        current.subscription.clear();
    }

    /**
     * Replaces the current subscription with the contents of a {@link SubscriptionBuffer}.
     *
     * <p>The subscription is copied into a temporary {@link RecordBuffer} obtained from
     * {@link RecordBuffer#getInstance(RecordMode)}, which is always released again, even when
     * processing fails.
     *
     * @param subscriptionBuffer the subscription to install
     */
    public void setSubscription(SubscriptionBuffer subscriptionBuffer) {
        RecordBuffer recordBuffer = RecordBuffer.getInstance(RecordMode.SUBSCRIPTION);
        try {
            recordBuffer.processSubscription(subscriptionBuffer);
            setSubscription(recordBuffer);
        } finally {
            // Release in finally so the buffer is not lost when processing throws.
            recordBuffer.release();
        }
    }

    /**
     * Replaces the current subscription with the keys built from the given buffer, rebuilds the
     * current segments when a cache is present, and wakes up the worker thread.
     *
     * @param recordBuffer the buffer holding the new subscription
     */
    public synchronized void setSubscription(RecordBuffer recordBuffer) {
        // Clear first: the old subscription is dropped even if building the new one fails.
        current.subscription.clear();
        IndexedSet<Key, Key> keys = buildSubscription("setSubscription", recordBuffer);
        current.subscription.addAll(keys);
        if (cache != null) {
            cache.rebuildCurrentSegments(current);
        }
        awaken();
    }

    /**
     * Adds the keys built from the given buffer to the current subscription. An empty buffer is
     * ignored; otherwise the current segments are rebuilt (when a cache is present) and the worker
     * thread is woken up.
     *
     * @param recordBuffer the buffer holding the subscription to add
     */
    public synchronized void addSubscription(RecordBuffer recordBuffer) {
        if (!recordBuffer.isEmpty()) {
            IndexedSet<Key, Key> added = buildSubscription("addSubscription", recordBuffer);
            current.subscription.addAll(added);
            if (cache != null) {
                cache.rebuildCurrentSegments(current);
            }
            awaken();
        }
    }

    /**
     * Removes the keys built from the given buffer from the current subscription. An empty buffer
     * is ignored; otherwise the current segments are rebuilt (when a cache is present) and the
     * worker thread is woken up.
     *
     * @param recordBuffer the buffer holding the subscription to remove
     */
    public synchronized void removeSubscription(RecordBuffer recordBuffer) {
        if (!recordBuffer.isEmpty()) {
            IndexedSet<Key, Key> removed = buildSubscription("removeSubscription", recordBuffer);
            current.subscription.removeAll(removed);
            if (cache != null) {
                cache.rebuildCurrentSegments(current);
            }
            awaken();
        }
    }

    /**
     * Drains the given buffer and converts its entries into replay keys.
     *
     * <p>An entry produces a key only when its record maps to a non-zero type and its symbol is
     * accepted by {@code MDREventUtil.isGoodSymbol}. Accepted and ignored record names and symbols
     * are collected separately and reported in a single log line once the buffer is exhausted.
     *
     * @param str          name of the calling operation, used as the log line prefix
     * @param recordBuffer the buffer to read subscription entries from
     * @return the set of keys built from the buffer
     */
    private IndexedSet<Key, Key> buildSubscription(String str, RecordBuffer recordBuffer) {
        IndexedSet<Key, Key> keys = new IndexedSet<>();
        IndexedSet<String, String> acceptedRecords = new IndexedSet<>();
        IndexedSet<String, String> acceptedSymbols = new IndexedSet<>();
        TreeSet<String> ignoredRecords = new TreeSet<>();
        TreeSet<String> ignoredSymbols = new TreeSet<>();
        for (RecordCursor cursor = recordBuffer.next(); cursor != null; cursor = recordBuffer.next()) {
            DataRecord record = cursor.getRecord();
            char type = MDREventUtil.getType(record);
            String symbol = record.getScheme().getCodec().decode(cursor.getCipher(), cursor.getSymbol()).intern();
            boolean goodSymbol = MDREventUtil.isGoodSymbol(symbol);
            boolean knownType = type != 0;

            if (knownType) {
                acceptedRecords.add(record.getName());
            } else {
                ignoredRecords.add(record.getName());
            }

            if (goodSymbol) {
                acceptedSymbols.add(symbol);
            } else {
                ignoredSymbols.add(symbol == null ? "<null>" : symbol.length() == 0 ? "<empty>" : symbol);
            }

            if (knownType && goodSymbol) {
                keys.put(new Key(symbol, MDREventUtil.getExchange(record), type));
            }
        }
        Log.log.info(str + ": " + acceptedRecords.size() + " records, " + acceptedSymbols.size() + " symbols, ignored " + ignoredRecords + " " + ignoredSymbols + ", categories:" + MDREventUtil.countCategories(keys));
        return keys;
    }

    /**
     * Installs a new access token. Setting the very same token instance again is a no-op;
     * otherwise the change is logged and the worker thread is woken up.
     *
     * @param marketDataToken the new token; must not be {@code null}
     * @throws NullPointerException if {@code marketDataToken} is {@code null}
     */
    public synchronized void setToken(MarketDataToken marketDataToken) {
        if (marketDataToken == null) {
            throw new NullPointerException("token is null");
        }
        // Identity comparison: only the same instance counts as "unchanged".
        if (marketDataToken != token) {
            token = marketDataToken;
            Log.log.info("setToken(" + marketDataToken + ")");
            awaken();
        }
    }

    /** Returns {@code true} when the current token is the one recorded as bad by a failed request. */
    public synchronized boolean hasPermanentError() {
        MarketDataToken active = token;
        return active == badToken;
    }

    /** Unparks the worker thread; {@link LockSupport#unpark(Thread)} ignores a {@code null} thread. */
    private void awaken() {
        Thread target = worker;
        LockSupport.unpark(target);
    }

    /** Clears the status and launches a new daemon worker thread named "MarketDataReplay-Worker". */
    public synchronized void start() {
        Log.log.info("start");
        status = null;
        Thread thread = new Thread(this, "MarketDataReplay-Worker");
        // Publish the thread before starting it: run() compares itself against this field.
        worker = thread;
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Ensures {@code cache} matches the current configuration.
     *
     * <p>The service address of the current token is copied into the configuration. A cache whose
     * configuration no longer equals it is released, a cache is acquired when none is held, and the
     * current segments are rebuilt.
     */
    private synchronized void updateCacheInstance() {
        config.address = token.getServiceAddress();
        Cache existing = cache;
        if (existing != null && !config.equals(existing.getConfig())) {
            existing.release();
            cache = null;
        }
        if (cache == null) {
            cache = Cache.acquireInstance(config);
        }
        cache.rebuildCurrentSegments(current);
    }

    /**
     * Detaches and releases the held cache, if any.
     *
     * @param current the replay state whose segments are released from the cache
     * @return whatever {@code Cache.release()} returns, or {@code null} when no cache was held
     */
    private synchronized Thread releaseCacheInstance(Current current) {
        Cache released = this.cache;
        this.cache = null;
        if (released != null) {
            released.releaseSegments(current);
            return released.release();
        }
        return null;
    }

    /**
     * Stops the replay: clears the status, detaches and unparks the worker thread, and releases the
     * cache. The former worker and the thread returned by the cache release are recorded in
     * {@code stoppedThreads}.
     */
    public synchronized void stop() {
        Log.log.info("stop");
        status = null;
        Thread stopping = worker;
        // Clearing the field ends the worker loop, which runs only while it is the current worker.
        worker = null;
        if (stopping != null) {
            LockSupport.unpark(stopping);
        }
        stoppedThreads[0] = stopping;
        stoppedThreads[1] = releaseCacheInstance(current);
    }

    /**
     * Returns the internal two-element array filled by {@link #stop()}; entries may be
     * {@code null}. The array itself is returned, not a copy.
     */
    public Thread[] getStoppedThreads() {
        return stoppedThreads;
    }

    /**
     * Builds a request for the subscribed keys that the cache does not rule out.
     *
     * <p>The request time is {@code current.endTime} when {@code z} is set and {@code current.time}
     * otherwise. The cache fills two key sets: keys in the first are left out of the request, and a
     * requested key missing from the second makes the request urgent. An urgent request's allowed
     * delay is derived from the distance to the request time and the replay speed, capped at 60000.
     *
     * @param marketDataToken the token to attach to the request
     * @param z               {@code true} to request data at the end of the current interval
     * @return the prepared request, or {@code null} when there is nothing to request
     * @throws IOException declared by the signature
     */
    private synchronized ReplayRequest prepareRequest(MarketDataToken marketDataToken, boolean z) throws IOException {
        long requestTime = z ? current.endTime : current.time;
        IndexedSet<Key, Key> skippedKeys = new IndexedSet<>();
        IndexedSet<Key, Key> nonUrgentKeys = new IndexedSet<>();
        if (cache != null) {
            cache.checkRequestKeys(current, requestTime, skippedKeys, nonUrgentKeys);
        }

        ArrayList<Key> requestKeys = new ArrayList<>();
        boolean urgent = false;
        for (Object element : current.subscription) {
            Key key = (Key) element;
            if (skippedKeys.containsKey(key)) {
                continue;
            }
            requestKeys.add(key);
            urgent |= !nonUrgentKeys.containsKey(key);
        }
        if (requestKeys.isEmpty()) {
            return null;
        }
        Collections.sort(requestKeys, Key.COMPARATOR);

        long allowedDelay = 60000L;
        if (urgent) {
            allowedDelay = Math.min((long) ((requestTime - current.time) / current.replaySpeed), 60000L);
        }

        ReplayRequest request = new ReplayRequest();
        request.setToken(marketDataToken);
        request.setAllowedDelay(allowedDelay);
        request.setRequestTime(requestTime);
        request.addRequestKeys(requestKeys);
        Log.log.info("Request <" + marketDataToken.getTokenUser() + "/" + marketDataToken.getTokenContract() + "> [" + requestKeys.size() + " keys at " + TimeFormat.DEFAULT.format(requestTime) + "]" + MDREventUtil.countCategories(requestKeys) + ", urgent " + urgent + ", replay speed " + current.replaySpeed + ", delay " + (request.getAllowedDelay() / 1000.0d) + " seconds");
        return request;
    }

    /**
     * Posts the serialized request to one of the token's service addresses and returns the raw
     * response.
     *
     * <p>The token's service address is a comma-separated list. Addresses are tried one at a time,
     * the next one being picked by {@code i} modulo the number of remaining addresses. An address
     * that failed to connect is recorded in {@code badAddresses} and skipped until
     * {@code badAddressTimeout} has elapsed; if every address is recorded as bad, the record is
     * cleared first so all of them are retried.
     *
     * <p>An HTTP 401 response marks the token as bad and ends the attempt immediately. Both
     * connection streams are closed on every path, including failures.
     *
     * @param byteArrayOutput the serialized request; bytes {@code [0, position)} are sent
     * @param i               selector that determines which address is tried first
     * @param marketDataToken the token the request was built for
     * @return the response bytes, or {@code null} when no address could serve the request
     */
    private ByteArrayInput doRequest(ByteArrayOutput byteArrayOutput, int i, MarketDataToken marketDataToken) {
        ArrayList<String> addresses = new ArrayList<>(Arrays.asList(marketDataToken.getServiceAddress().split(",")));
        if (!addresses.isEmpty() && this.badAddresses.keySet().containsAll(addresses)) {
            this.badAddresses.clear();
        }
        while (!addresses.isEmpty()) {
            String address = addresses.remove(Math.abs(i % addresses.size()));
            Long markedBadAt = this.badAddresses.get(address);
            if (markedBadAt != null && markedBadAt > System.currentTimeMillis() - this.badAddressTimeout) {
                continue; // still within the bad-address timeout
            }
            HttpURLConnection connection = null;
            try {
                connection = prepareConnection(address);
                try (OutputStream out = connection.getOutputStream()) {
                    out.write(byteArrayOutput.getBuffer(), 0, byteArrayOutput.getPosition());
                }
                // The fixed increments (200, 100, 50) are presumably estimates of HTTP overhead.
                this.sentBytes.addAndGet(200 + byteArrayOutput.getPosition());
                try (InputStream in = connection.getInputStream()) {
                    this.receivedBytes.addAndGet(100L);
                    this.status = null;
                    return readResponse(in);
                }
            } catch (IOException e) {
                if ((e instanceof NoRouteToHostException) || (e instanceof ConnectException) || (e instanceof SocketTimeoutException)) {
                    Log.log.error("Unable to connect to " + LogUtil.hideCredentials(address) + ": " + e);
                    this.badAddresses.put(address, Long.valueOf(System.currentTimeMillis()));
                    continue;
                }
                this.receivedBytes.addAndGet(50L);
                Log.log.error("Unexpected error", e);
                if (connection == null) {
                    continue;
                }
                try {
                    if (connection.getResponseCode() == 401) {
                        this.badToken = marketDataToken;
                        this.status = "Service unavailable due to security issues.";
                        Log.log.warn("status = " + this.status);
                        return null;
                    }
                } catch (IOException nested) {
                    Log.log.error("Nested error: " + nested);
                }
            }
        }
        this.status = "Service unavailable due to connectivity issues.";
        Log.log.warn("status = " + this.status);
        return null;
    }

    /**
     * Opens (without connecting) an HTTP POST connection to {@code http://<str>/MarketDataReplay}
     * that sends and accepts {@code application/octet-stream}, with a 10000 ms connect timeout and
     * a 60000 ms read timeout.
     *
     * @param str the service address (host and optional port)
     * @return the configured connection
     * @throws IOException if the URL is malformed or the connection cannot be opened
     */
    private HttpURLConnection prepareConnection(String str) throws IOException {
        URL endpoint = new URL("http://" + str + "/MarketDataReplay");
        HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
        connection.setConnectTimeout(10000);
        connection.setReadTimeout(60000);
        connection.setRequestMethod("POST");
        connection.setRequestProperty("Accept", "application/octet-stream");
        connection.setRequestProperty("Content-Type", "application/octet-stream");
        connection.setDoOutput(true);
        return connection;
    }

    /**
     * Reads one length-prefixed message from the stream into a growable buffer.
     *
     * <p>The message starts with a compact-encoded length followed by that many bytes. Data is read
     * until the length prefix is complete, then the buffer is grown to hold the whole message and
     * reading continues until it has fully arrived. On success the stream is closed and the buffer
     * is returned positioned at the start of the length prefix.
     *
     * @param inputStream the stream to read from
     * @return the buffer holding the complete message
     * @throws IOException if the stream ends early or the decoded length is not positive
     */
    private ByteArrayInput readResponse(InputStream inputStream) throws IOException {
        ByteArrayInput response = new ByteArrayInput(new byte[10000]);
        response.setLimit(0);
        for (;;) {
            byte[] buffer = response.getBuffer();
            int filled = response.getLimit();
            int count = inputStream.read(buffer, filled, buffer.length - filled);
            if (count < 0) {
                throw new IOException("Unexpected end of response");
            }
            this.receivedBytes.addAndGet(count);
            response.setLimit(response.getLimit() + count);

            int start = response.getPosition();
            if (response.getLimit() - start < ReplayUtil.getCompactLength(response.getBuffer(), start)) {
                continue; // length prefix not complete yet
            }
            int length = response.readCompactInt();
            if (length <= 0) {
                throw new IOException("Unexpected end of response");
            }
            int end = response.getPosition() + length;
            // Rewind so the caller sees the message from its length prefix onwards.
            response.setPosition(start);
            response.ensureCapacity(end);
            if (response.getLimit() >= end) {
                inputStream.close();
                return response;
            }
        }
    }

    /**
     * Parses a raw response into segments, one per block that can be read and decompressed.
     *
     * <p>A block that fails to parse is logged and skipped using its compact-encoded length prefix.
     * If that skip would not move forward (non-positive length), parsing stops rather than
     * re-reading the same block forever.
     *
     * @param byteArrayInput the raw response as returned by {@code readResponse}
     * @return the successfully parsed segments, in response order
     * @throws IOException if the response header, or the length of a block being skipped, cannot be read
     */
    private static ArrayList<Segment> unpackResponse(ByteArrayInput byteArrayInput) throws IOException {
        ReplayResponse replayResponse = new ReplayResponse();
        replayResponse.read(byteArrayInput);
        // Typed as ByteArrayInput: the position-based calls below are not part of java.io.DataInput.
        ByteArrayInput blocksInput = replayResponse.getResponseBlocksInput();
        ArrayList<Segment> segments = new ArrayList<>();
        while (blocksInput.available() > 0) {
            int blockStart = blocksInput.getPosition();
            try {
                Block block = new Block();
                block.readBlock(blocksInput);
                block.decompress();
                segments.add(new Segment(block));
            } catch (Exception e) {
                blocksInput.setPosition(blockStart);
                // NOTE(review): the skip target is measured from the position before the length
                // prefix, as in the original code - confirm against Block's wire format.
                int skipTo = blockStart + blocksInput.readCompactInt();
                if (skipTo <= blockStart) {
                    Log.log.error("Error reading block", e);
                    break;
                }
                blocksInput.setPosition(skipTo);
                Log.log.error("Error reading block", e);
            }
        }
        return segments;
    }

    /**
     * Hands the given segments to the cache; does nothing when no cache is held.
     *
     * @param arrayList the segments to add
     */
    private synchronized void addData(ArrayList<Segment> arrayList) {
        if (cache == null) {
            return;
        }
        cache.addData(arrayList);
    }

    /** Returns the running total of the sent-bytes counter. */
    public long getSentBytes() {
        return sentBytes.get();
    }

    /** Returns the running total of the received-bytes counter. */
    public long getReceivedBytes() {
        return receivedBytes.get();
    }

    /** Returns the latest status message, or {@code null} when no problem is recorded. */
    public String getStatus() {
        return status;
    }

    /**
     * Asks the cache how much data is available for the given time.
     *
     * @param j the time to query
     * @return the cache's answer, or {@code 0.0} when no cache is held
     */
    public synchronized double getAvailableData(long j) {
        return cache == null ? 0.0d : cache.getAvailableData(current, j);
    }

    /**
     * Moves the replay to the given time and returns the data read for it.
     *
     * @param j the time to take the snapshot at
     * @return the buffer produced by {@link #getUpdate(long)} for that time
     */
    public synchronized RecordBuffer getSnapshot(long j) {
        setTime(j);
        RecordBuffer snapshot = getUpdate(j);
        return snapshot;
    }

    /**
     * Advances the replay to the given time and returns the data read from the current segments.
     *
     * <p>The replay-speed estimate is updated first. The returned buffer stays empty when no cache
     * is held or the time lies before the current replay time. When the time leaves the current
     * interval the segments are rebuilt and read again, and the worker is woken up; inside the
     * interval the worker is woken only when the time crosses the prefetch threshold.
     *
     * @param j the time to advance to
     * @return a {@code TIMESTAMPED_DATA} buffer holding the data that was read
     */
    public synchronized RecordBuffer getUpdate(long j) {
        updateReplaySpeed(j);
        RecordBuffer result = RecordBuffer.getInstance(RecordMode.TIMESTAMPED_DATA);
        if (cache == null || j < current.time) {
            return result;
        }
        cache.rebuildCurrentSegmentsIfNeeded(current);
        readCurrent(result, j);

        if (!current.isCurrentInterval(j)) {
            current.time = j;
            cache.rebuildCurrentSegments(current);
            readCurrent(result, j);
            awaken();
            return result;
        }

        long prefetchStart = current.endTime - prefetchInterval;
        boolean crossedPrefetchStart = current.time < prefetchStart && j >= prefetchStart;
        if (crossedPrefetchStart) {
            awaken();
        }
        current.time = j;
        return result;
    }

    /**
     * Lets every current segment read into the buffer up to the given time, all sharing a single
     * usage value obtained from the cache.
     *
     * @param recordBuffer the buffer to fill
     * @param j            the time to read up to
     */
    private void readCurrent(RecordBuffer recordBuffer, long j) {
        long usage = cache.nextUsage();
        for (Object segment : current.segments) {
            ((CurrentSegment) segment).read(recordBuffer, j, usage);
        }
    }

    /**
     * Updates the replay-speed estimate from the replay time and wall-clock time elapsed since the
     * previous sample.
     *
     * <p>If either delta is non-positive or above 59000 ms, the reference point is reset and the
     * estimate is left alone. If either delta is below 1000 ms, nothing changes so the sample can
     * keep accumulating. Otherwise the observed ratio, clamped to {@code [0.1, 10]}, is averaged
     * with the previous estimate and rounded to two decimals.
     *
     * @param j the replay time of the current update
     */
    private void updateReplaySpeed(long j) {
        long timeDelta = j - this.prevReplayTime;
        long millisDelta = System.currentTimeMillis() - this.prevReplayMillis;
        boolean outOfRange = timeDelta <= 0 || timeDelta > 59000 || millisDelta <= 0 || millisDelta > 59000;
        if (!outOfRange) {
            if (timeDelta < 1000 || millisDelta < 1000) {
                return;
            }
            // Divide as double: long/long truncates, turning any ratio below 1 into 0 (clamped to 0.1).
            double observedSpeed = Math.min(Math.max(0.1d, (double) timeDelta / millisDelta), 10.0d);
            // (old + observed) * 50 is the average times 100; floor(x + 0.5) / 100 rounds to two decimals.
            this.current.replaySpeed = Math.floor(((this.current.replaySpeed + observedSpeed) * 50.0d) + 0.5d) / 100.0d;
        }
        this.prevReplayTime += timeDelta;
        this.prevReplayMillis += millisDelta;
    }

    /**
     * Worker loop: requests missing data while this thread is the registered worker.
     *
     * <p>The cache is acquired first; a thread that is no longer the worker releases it and exits.
     * Each pass prepares a request for the current time, or for the end of the current interval
     * once the prefetch threshold is reached and nothing is missing now. The request is sent, the
     * response unpacked and added to the cache, and the thread then parks for up to 10 seconds or
     * until woken. No requests are sent while the token is marked bad.
     *
     * <p>When the system property {@code TestThinkOnDemand} is set, a configuration watcher is
     * started and every pass uses a freshly created "test" token.
     */
    @Override // java.lang.Runnable
    public void run() {
        updateCacheInstance();
        if (Thread.currentThread() != this.worker) {
            releaseCacheInstance(this.current);
            return;
        }
        String testConfig = System.getProperty("TestThinkOnDemand");
        if (testConfig != null) {
            try {
                MarketDataAccess.getInstance().startConfigurationWatcher(testConfig, testConfig + ".cache", 10000L);
                Thread.sleep(1000L);
            } catch (InterruptedException ignored) {
                // Best-effort start-up delay only; the loop below keys off the worker field.
            }
        }
        while (Thread.currentThread() == this.worker) {
            try {
                MarketDataToken activeToken = this.token;
                if (testConfig != null) {
                    activeToken = MarketDataAccess.getInstance().createToken("test");
                }
                if (activeToken != this.badToken) {
                    ReplayRequest request = prepareRequest(activeToken, false);
                    if (this.current.time >= this.current.endTime - this.prefetchInterval) {
                        if (request == null) {
                            request = prepareRequest(activeToken, true);
                        } else {
                            // Unpark ourselves so the park below returns at once and the prefetch follows.
                            awaken();
                        }
                    }
                    if (request != null) {
                        long startedMillis = System.currentTimeMillis();
                        long sentBefore = this.sentBytes.longValue();
                        long receivedBefore = this.receivedBytes.longValue();
                        ByteArrayInput response = doRequest(request.write(), (int) (request.getRequestTime() / 1800000), activeToken);
                        if (response != null) {
                            ArrayList<Segment> segments = unpackResponse(response);
                            long totalSize = 0;
                            for (Segment segment : segments) {
                                totalSize += segment.size();
                            }
                            Log.log.info("Response: " + Log.mb(totalSize) + " in " + segments.size() + " segments, sent " + Log.mb(this.sentBytes.longValue() - sentBefore) + " received " + Log.mb(this.receivedBytes.longValue() - receivedBefore) + " in " + ((System.currentTimeMillis() - startedMillis) / 1000.0d) + " seconds, total sent " + Log.mb(this.sentBytes.longValue()) + ", total received " + Log.mb(this.receivedBytes.longValue()));
                            addData(segments);
                        }
                        Thread.sleep(1000L);
                    }
                }
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10L));
            } catch (Throwable th) {
                Log.log.error("Unexpected error", th);
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException ignored) {
                    // Back-off cut short; the loop condition decides whether to go on.
                }
            }
        }
    }
}
