/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.ondemand.impl;

import com.devexperts.io.ByteArrayInput;
import com.devexperts.io.ByteArrayOutput;
import com.devexperts.qd.DataRecord;
import com.devexperts.qd.SubscriptionBuffer;
import com.devexperts.qd.SubscriptionIterator;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.util.IndexedSet;
import com.devexperts.util.LogUtil;
import com.devexperts.util.TimeFormat;
import com.dxfeed.ondemand.impl.Block;
import com.dxfeed.ondemand.impl.Cache;
import com.dxfeed.ondemand.impl.CacheConfig;
import com.dxfeed.ondemand.impl.Current;
import com.dxfeed.ondemand.impl.CurrentSegment;
import com.dxfeed.ondemand.impl.Key;
import com.dxfeed.ondemand.impl.Log;
import com.dxfeed.ondemand.impl.MarketDataAccess;
import com.dxfeed.ondemand.impl.MarketDataToken;
import com.dxfeed.ondemand.impl.ReplayRequest;
import com.dxfeed.ondemand.impl.ReplayResponse;
import com.dxfeed.ondemand.impl.ReplayUtil;
import com.dxfeed.ondemand.impl.Segment;
import com.dxfeed.ondemand.impl.event.MDREventUtil;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

public class MarketDataReplay
implements Runnable {
    /** Thread currently designated as the worker; assigned in {@code start()} and nulled in {@code stop()}. */
    private volatile Thread worker;
    /** Token used for requests; starts as a token built from an empty map and a {@code null} second argument. */
    private volatile MarketDataToken token = new MarketDataToken(Collections.emptyMap(), null);
    /** Token that produced an HTTP 401 in {@code doRequest}; requests are skipped while it equals {@link #token}. */
    private volatile MarketDataToken badToken;
    /** Resolved address -> {@code System.currentTimeMillis()} value recorded when connecting to it failed. */
    private final Map<String, Long> badAddresses = Collections.synchronizedMap(new HashMap<String, Long>());
    /** How long (milliseconds) an entry of {@link #badAddresses} keeps its address from being tried again. */
    private long badAddressTimeout = 600000L;
    /** Configuration used to acquire the shared {@link Cache} instance. */
    private final CacheConfig config = new CacheConfig();
    /** Cache acquired in {@code updateCacheInstance()}; {@code null} until then and after release. */
    private Cache cache;
    /** Current replay state: time, subscription, segments and replay speed. */
    private final Current current = new Current();
    /** Distance (milliseconds) before {@code current.endTime} at which prefetching is triggered. */
    private long prefetchInterval = 59000L;
    /** Running count of bytes sent, including a fixed per-request allowance added in {@code doRequest}. */
    private final AtomicLong sentBytes = new AtomicLong();
    /** Running count of bytes received, including fixed per-response allowances added in {@code doRequest}. */
    private final AtomicLong receivedBytes = new AtomicLong();
    /** Last service status message, or {@code null} when there is nothing to report. */
    private volatile String status;
    /** Replay time seen by the previous accepted {@code updateReplaySpeed} sample. */
    private long prevReplayTime;
    /** Wall-clock millis of the previous accepted {@code updateReplaySpeed} sample. */
    private long prevReplayMillis;
    /** Filled by {@code stop()}: [0] is the stopped worker, [1] is whatever the cache release returned. */
    private final Thread[] stoppedThreads = new Thread[2];

    /**
     * Reads the cache limit from this replay's cache configuration.
     *
     * @return current value of {@code config.cacheLimit}
     */
    public long getCacheLimit() {
        final CacheConfig settings = config;
        return settings.cacheLimit;
    }

    /**
     * Stores a new cache limit in this replay's cache configuration.
     *
     * @param amount value written to {@code config.cacheLimit}
     */
    public void setCacheLimit(long amount) {
        final CacheConfig settings = config;
        settings.cacheLimit = amount;
    }

    /**
     * Reads the file cache limit from this replay's cache configuration.
     *
     * @return current value of {@code config.fileCacheLimit}
     */
    public long getFileCacheLimit() {
        final CacheConfig settings = config;
        return settings.fileCacheLimit;
    }

    /**
     * Stores a new file cache limit in this replay's cache configuration.
     *
     * @param amount value written to {@code config.fileCacheLimit}
     */
    public void setFileCacheLimit(long amount) {
        final CacheConfig settings = config;
        settings.fileCacheLimit = amount;
    }

    /**
     * Reads the file cache path from this replay's cache configuration.
     *
     * @return current value of {@code config.fileCachePath}
     */
    public String getFileCachePath() {
        final CacheConfig settings = config;
        return settings.fileCachePath;
    }

    /**
     * Stores a new file cache path in this replay's cache configuration.
     *
     * @param path value written to {@code config.fileCachePath}; must not be {@code null}
     * @throws NullPointerException if {@code path} is {@code null}
     */
    public void setFileCachePath(String path) {
        if (path == null) {
            // Carry a message, matching the style of the null check in setToken.
            throw new NullPointerException("path is null");
        }
        this.config.fileCachePath = path;
    }

    /**
     * Reads the file cache dump period from this replay's cache configuration.
     *
     * @return current value of {@code config.fileCacheDumpPeriod}
     */
    public long getFileCacheDumpPeriod() {
        final CacheConfig settings = config;
        return settings.fileCacheDumpPeriod;
    }

    /**
     * Stores a new file cache dump period in this replay's cache configuration.
     *
     * @param fileCacheDumpPeriod value written to {@code config.fileCacheDumpPeriod}
     */
    public void setFileCacheDumpPeriod(long fileCacheDumpPeriod) {
        final CacheConfig settings = config;
        settings.fileCacheDumpPeriod = fileCacheDumpPeriod;
    }

    /**
     * Reads the time-to-live setting from this replay's cache configuration.
     *
     * @return current value of {@code config.timeToLive}
     */
    public long getTimeToLive() {
        final CacheConfig settings = config;
        return settings.timeToLive;
    }

    /**
     * Stores a new time-to-live setting in this replay's cache configuration.
     *
     * @param timeToLive value written to {@code config.timeToLive}
     */
    public void setTimeToLive(long timeToLive) {
        final CacheConfig settings = config;
        settings.timeToLive = timeToLive;
    }

    /**
     * Moves the replay position to {@code time}. Does nothing when the position is already
     * {@code time}; otherwise restarts every current segment, stores the new time, rebuilds the
     * current segments if the time left the current interval (and a cache exists), and wakes the worker.
     *
     * @param time new replay time
     */
    public synchronized void setTime(long time) {
        if (time == current.time) {
            return;
        }
        String formatted = TimeFormat.DEFAULT.withMillis().format(time);
        Log.log.info("setTime(" + formatted + ")");
        for (CurrentSegment active : current.segments) {
            active.restart();
        }
        current.time = time;
        boolean insideInterval = current.isCurrentInterval(time);
        if (!insideInterval && cache != null) {
            cache.rebuildCurrentSegments(current);
        }
        awaken();
    }

    /**
     * Returns the current replay time.
     *
     * @return value of {@code current.time}
     */
    public synchronized long getTime() {
        final long now = current.time;
        return now;
    }

    /**
     * Empties the current subscription set. Unlike the other subscription mutators in this class,
     * it neither rebuilds the current segments nor wakes the worker.
     */
    public synchronized void clearSubscription() {
        current.subscription.clear();
    }

    /**
     * Replaces the subscription with the contents of a {@link SubscriptionBuffer}. The entries are
     * copied into a temporary {@link RecordBuffer} obtained from {@code RecordBuffer.getInstance}
     * and handed to {@link #setSubscription(RecordBuffer)}.
     *
     * @param sb source of subscription entries
     */
    public void setSubscription(SubscriptionBuffer sb) {
        RecordBuffer sub = RecordBuffer.getInstance((RecordMode)RecordMode.SUBSCRIPTION);
        try {
            sub.processSubscription((SubscriptionIterator)sb);
            this.setSubscription(sub);
        } finally {
            // Release the temporary buffer even when processing or applying the subscription throws.
            sub.release();
        }
    }

    /**
     * Replaces the whole subscription with the keys derived from {@code sub}, rebuilds the current
     * segments when a cache exists, and wakes the worker.
     *
     * @param sub buffer of subscription records; it is iterated via {@code buildSubscription}
     */
    public synchronized void setSubscription(RecordBuffer sub) {
        // Clear first: the old subscription is dropped even if building the new one fails.
        current.subscription.clear();
        IndexedSet<Key, Key> replacement = buildSubscription("setSubscription", sub);
        current.subscription.addAll(replacement);
        if (cache != null) {
            cache.rebuildCurrentSegments(current);
        }
        awaken();
    }

    /**
     * Adds the keys derived from {@code sub} to the subscription. When a key was already present,
     * its previous subscription count is added to the new key's count. Afterwards the current
     * segments are rebuilt (if a cache exists) and the worker is woken. An empty buffer is a no-op.
     *
     * @param sub buffer of subscription records to add
     */
    public synchronized void addSubscription(RecordBuffer sub) {
        if (!sub.isEmpty()) {
            for (Key added : buildSubscription("addSubscription", sub)) {
                Key previous = (Key) current.subscription.put(added);
                if (previous != null) {
                    // The new key object replaced the old one; carry the old count over.
                    added.subscriptionCount += previous.subscriptionCount;
                }
            }
            if (cache != null) {
                cache.rebuildCurrentSegments(current);
            }
            awaken();
        }
    }

    /**
     * Removes the keys derived from {@code sub} from the subscription. Each matching key has its
     * subscription count reduced, and is dropped once the count is no longer positive. Afterwards
     * the current segments are rebuilt (if a cache exists) and the worker is woken. An empty
     * buffer is a no-op.
     *
     * @param sub buffer of subscription records to remove
     */
    public synchronized void removeSubscription(RecordBuffer sub) {
        if (!sub.isEmpty()) {
            for (Key removed : buildSubscription("removeSubscription", sub)) {
                Key existing = (Key) current.subscription.getByKey(removed);
                if (existing == null) {
                    continue;
                }
                existing.subscriptionCount -= removed.subscriptionCount;
                if (existing.subscriptionCount <= 0) {
                    current.subscription.removeKey(removed);
                }
            }
            if (cache != null) {
                cache.rebuildCurrentSegments(current);
            }
            awaken();
        }
    }

    /**
     * Converts the records of {@code sub} into a set of {@link Key}s with subscription counts.
     * A record contributes a key only when {@code MDREventUtil.getType} yields a non-zero type and
     * {@code MDREventUtil.isGoodSymbol} accepts its symbol; everything else is collected for the
     * summary log line. The buffer is consumed by iterating it with {@code next()}.
     *
     * @param method name of the calling operation, used as the log line prefix
     * @param sub buffer of subscription records
     * @return set of keys; each key's {@code subscriptionCount} is incremented once per matching record
     */
    private IndexedSet<Key, Key> buildSubscription(String method, RecordBuffer sub) {
        RecordCursor cur;
        IndexedSet<Key, Key> subscription = new IndexedSet<Key, Key>();
        IndexedSet<String, String> goodRecords = new IndexedSet<String, String>();
        IndexedSet<String, String> goodSymbols = new IndexedSet<String, String>();
        TreeSet<String> badRecords = new TreeSet<String>();
        TreeSet<String> badSymbols = new TreeSet<String>();
        while ((cur = sub.next()) != null) {
            DataRecord record = cur.getRecord();
            char type = MDREventUtil.getType(record);
            // Intern only a non-null symbol: the "<null>" reporting below expects that the decoded
            // symbol may be null, and an unconditional intern() would throw before reaching it.
            // NOTE(review): assumes MDREventUtil.isGoodSymbol tolerates null — confirm.
            String decoded = record.getScheme().getCodec().decode(cur.getCipher(), cur.getSymbol());
            String symbol = decoded == null ? null : decoded.intern();
            boolean goodSymbol = MDREventUtil.isGoodSymbol(symbol);
            if (type != '\u0000') {
                goodRecords.add(record.getName());
            } else {
                badRecords.add(record.getName());
            }
            if (goodSymbol) {
                goodSymbols.add(symbol);
            } else {
                badSymbols.add(symbol == null ? "<null>" : (symbol.length() == 0 ? "<empty>" : symbol));
            }
            if (type == '\u0000' || !goodSymbol) {
                continue;
            }
            Key key = subscription.putIfAbsentAndGet(new Key(symbol, MDREventUtil.getExchange(record), type));
            ++key.subscriptionCount;
        }
        Log.log.info(method + ": " + goodRecords.size() + " records, " + goodSymbols.size() + " symbols, ignored " + badRecords + " " + badSymbols + ", categories:" + MDREventUtil.countCategories((Collection<Key>)subscription));
        return subscription;
    }

    /**
     * Installs a new access token and wakes the worker. Passing the very same token instance that
     * is already installed is a no-op.
     *
     * @param token new token; must not be {@code null}
     * @throws NullPointerException if {@code token} is {@code null}
     */
    public synchronized void setToken(MarketDataToken token) {
        if (token == null) {
            throw new NullPointerException("token is null");
        }
        boolean changed = token != this.token;
        if (changed) {
            this.token = token;
            Log.log.info("setToken(" + token + ")");
            awaken();
        }
    }

    /**
     * Tells whether the installed token is the one recorded as bad.
     *
     * @return {@code true} when {@code token} and {@code badToken} are the same reference
     */
    public synchronized boolean hasPermanentError() {
        final MarketDataToken installed = this.token;
        final MarketDataToken rejected = this.badToken;
        return installed == rejected;
    }

    /**
     * Unparks the worker thread so that it re-evaluates its state without waiting out its park.
     * {@code LockSupport.unpark} ignores a {@code null} thread, so this is safe while stopped.
     */
    private void awaken() {
        final Thread sleeper = worker;
        LockSupport.unpark(sleeper);
    }

    /**
     * Clears the status and launches a new daemon worker thread named
     * {@code "MarketDataReplay-Worker"} that executes {@link #run()}.
     */
    public synchronized void start() {
        Log.log.info("start");
        status = null;
        Thread fresh = new Thread((Runnable) this, "MarketDataReplay-Worker");
        // Publish the thread before starting it: run() compares itself against this field.
        worker = fresh;
        fresh.setDaemon(true);
        fresh.start();
    }

    /**
     * Makes sure {@code cache} matches the current configuration. The service address of the
     * installed token is copied into the configuration; a cache whose configuration no longer
     * equals it is released; a cache is acquired if none is held; finally the current segments
     * are rebuilt against the cache.
     */
    private synchronized void updateCacheInstance() {
        config.address = token.getServiceAddress();
        Cache held = cache;
        if (held != null && !config.equals(held.getConfig())) {
            held.release();
            cache = null;
        }
        if (cache == null) {
            cache = Cache.acquireInstance(config);
        }
        cache.rebuildCurrentSegments(current);
    }

    /**
     * Detaches and releases the held cache, if any.
     *
     * @param current replay state whose segments are released from the cache first
     * @return whatever {@code Cache.release()} returned, or {@code null} when no cache was held
     */
    private synchronized Thread releaseCacheInstance(Current current) {
        Cache detached = this.cache;
        this.cache = null;
        if (detached == null) {
            return null;
        }
        detached.releaseSegments(current);
        return detached.release();
    }

    /**
     * Stops the replay: clears the status, un-designates and unparks the worker thread, releases
     * the cache, and records both the former worker and the thread returned by the cache release
     * in {@code stoppedThreads}.
     */
    public synchronized void stop() {
        Log.log.info("stop");
        status = null;
        Thread former = worker;
        // Clearing the field is what makes the worker loop in run() terminate.
        worker = null;
        if (former != null) {
            LockSupport.unpark(former);
        }
        stoppedThreads[0] = former;
        stoppedThreads[1] = releaseCacheInstance(current);
    }

    /**
     * Returns the array filled in by {@code stop()}.
     *
     * @return the internal two-element array itself (not a copy)
     */
    public Thread[] getStoppedThreads() {
        final Thread[] recorded = stoppedThreads;
        return recorded;
    }

    /**
     * Builds a request for every subscribed key that the cache does not report as present at the
     * request time.
     *
     * @param token token attached to the request
     * @param prefetch when {@code true} the request time is {@code current.endTime}, otherwise {@code current.time}
     * @return the request, or {@code null} when no key needs to be requested
     * @throws IOException declared by the signature; nothing in this body is seen to throw it
     */
    private synchronized ReplayRequest prepareRequest(MarketDataToken token, boolean prefetch) throws IOException {
        final long requestTime = prefetch ? current.endTime : current.time;
        IndexedSet<Key, Key> presentKeys = new IndexedSet<Key, Key>();
        IndexedSet<Key, Key> expiredKeys = new IndexedSet<Key, Key>();
        if (cache != null) {
            cache.checkRequestKeys(current, requestTime, presentKeys, expiredKeys);
        }
        ArrayList<Key> requestKeys = new ArrayList<Key>();
        boolean urgent = false;
        for (Key wanted : current.subscription) {
            if (!presentKeys.containsKey(wanted)) {
                requestKeys.add(wanted);
                // A key that is missing and not merely expired makes the request urgent.
                if (!expiredKeys.containsKey(wanted)) {
                    urgent = true;
                }
            }
        }
        if (requestKeys.isEmpty()) {
            return null;
        }
        Collections.sort(requestKeys, Key.COMPARATOR);
        long allowedDelay = 60000L;
        if (urgent) {
            // Replay-time distance to the request time scaled by replay speed, capped at 60 seconds.
            long scaled = (long) ((double) (requestTime - current.time) / current.replaySpeed);
            allowedDelay = Math.min(scaled, 60000L);
        }
        ReplayRequest request = new ReplayRequest();
        request.setToken(token);
        request.setAllowedDelay(allowedDelay);
        request.setRequestTime(requestTime);
        request.addRequestKeys(requestKeys);
        Log.log.info("Request <" + token.getTokenUser() + "/" + token.getTokenContract() + "> [" + requestKeys.size() + " keys at " + TimeFormat.DEFAULT.format(requestTime) + "]" + MDREventUtil.countCategories(requestKeys) + ", urgent " + urgent + ", replay speed " + current.replaySpeed + ", delay " + (double)request.getAllowedDelay() / 1000.0 + " seconds");
        return request;
    }

    /**
     * Sends {@code request} to one of the resolved service addresses and returns the raw response.
     * <p>
     * Addresses are tried one at a time, starting from an index derived from
     * {@code getRequestHash(request)}. Addresses recorded in {@code badAddresses} less than
     * {@code badAddressTimeout} ago are skipped. Connection-level failures mark the address as
     * bad; an HTTP 401 marks the token as bad and aborts; other I/O errors move on to the next address.
     *
     * @param request request to send
     * @return the response bytes, or {@code null} when the token was rejected or no address worked
     *         ({@code status} is set to a message in both cases)
     * @throws IOException if writing the request body via {@code request.write()} fails
     */
    private ByteArrayInput doRequest(ReplayRequest request) throws IOException {
        MarketDataToken token = request.getToken();
        int addressHash = this.getRequestHash(request);
        ByteArrayOutput requestBody = request.write();
        ArrayList<String> addresses = this.getResolvedAddresses(token.getServiceAddress());
        // Forget the bad-address records when they cover every candidate or have grown stale/large.
        if (!addresses.isEmpty() && (this.badAddresses.keySet().containsAll(addresses) || this.badAddresses.size() > addresses.size() * 2)) {
            this.badAddresses.clear();
        }
        while (!addresses.isEmpty()) {
            String address = addresses.remove(Math.abs(addressHash % addresses.size()));
            Long badTime = this.badAddresses.get(address);
            if (badTime != null) {
                if (badTime > System.currentTimeMillis() - this.badAddressTimeout) {
                    continue;
                }
                this.badAddresses.remove(address, badTime);
            }
            HttpURLConnection con = null;
            try {
                con = this.prepareConnection(request, address);
                // try-with-resources: the request stream is closed even when the write fails.
                try (OutputStream output = con.getOutputStream()) {
                    output.write(requestBody.getBuffer(), 0, requestBody.getPosition());
                }
                this.sentBytes.addAndGet(200 + requestBody.getPosition());
                // The response stream is closed on every path; a second close after
                // readResponse has already closed it is harmless.
                try (InputStream input = con.getInputStream()) {
                    this.receivedBytes.addAndGet(100L);
                    this.status = null;
                    return this.readResponse(input);
                }
            }
            catch (IOException e) {
                if (e instanceof NoRouteToHostException || e instanceof ConnectException || e instanceof SocketTimeoutException) {
                    Log.log.error("Unable to connect to " + LogUtil.hideCredentials((Object)address) + ": " + e);
                    this.badAddresses.put(address, System.currentTimeMillis());
                    continue;
                }
                this.receivedBytes.addAndGet(50L);
                Log.log.error("Unexpected error", (Throwable)e);
                try {
                    if (con == null || con.getResponseCode() != 401) {
                        continue;
                    }
                    // HTTP 401: remember the token so that the worker stops using it.
                    this.badToken = token;
                    this.status = "Service unavailable due to security issues.";
                    Log.log.warn("status = " + this.status);
                    return null;
                }
                catch (IOException ee) {
                    Log.log.error("Nested error: " + ee);
                }
            }
        }
        this.status = "Service unavailable due to connectivity issues.";
        Log.log.warn("status = " + this.status);
        return null;
    }

    /**
     * Expands a comma-separated address list into concrete URLs, one per resolved IP address.
     * <p>
     * Each element is trimmed and converted with {@code addrToURL}; blank elements are skipped
     * (an empty host would otherwise be resolved by {@code InetAddress.getAllByName} to the
     * loopback address). The resolved addresses of a host are sorted by their raw bytes so the
     * order is deterministic. IPv6 literals are wrapped in brackets. A host that cannot be
     * resolved is kept as its unresolved URL; a malformed element is logged and dropped.
     *
     * @param addressList comma-separated addresses; {@code null} yields an empty result
     * @return distinct URLs in first-seen order
     */
    ArrayList<String> getResolvedAddresses(String addressList) {
        LinkedHashSet<String> resolvedURLs = new LinkedHashSet<String>();
        if (addressList == null) {
            return new ArrayList<String>(resolvedURLs);
        }
        for (String element : addressList.split(",")) {
            String addr = element.trim();
            if (addr.isEmpty()) {
                continue;
            }
            URL url;
            try {
                url = MarketDataReplay.addrToURL(addr);
            }
            catch (MalformedURLException e) {
                Log.log.warn("Malformed address element " + addr);
                continue;
            }
            String host = url.getHost();
            String port = url.getPort() == -1 ? "" : ":" + url.getPort();
            InetAddress[] all;
            try {
                all = this.getAllByName(host);
            }
            catch (UnknownHostException e) {
                Log.log.warn("Failed to resolve IPs for " + addr);
                // Fall back to the unresolved URL so the address is still attempted.
                resolvedURLs.add(url.toString());
                continue;
            }
            // Unsigned byte-wise ordering; shorter (IPv4) addresses sort before longer ones on a tie.
            Arrays.sort(all, (o1, o2) -> {
                byte[] a1 = o1.getAddress();
                byte[] a2 = o2.getAddress();
                int n = Math.min(a1.length, a2.length);
                for (int i = 0; i < n; ++i) {
                    int delta = (a1[i] & 0xFF) - (a2[i] & 0xFF);
                    if (delta != 0) {
                        return delta;
                    }
                }
                return a1.length - a2.length;
            });
            for (InetAddress iaddr : all) {
                String hostAddress = iaddr.getHostAddress();
                if (iaddr instanceof Inet6Address) {
                    hostAddress = "[" + hostAddress + "]";
                }
                resolvedURLs.add(url.getProtocol() + "://" + hostAddress + port + url.getPath());
            }
        }
        return new ArrayList<String>(resolvedURLs);
    }

    /**
     * Resolves {@code host} through {@link InetAddress#getAllByName(String)}. Kept as a separate
     * non-private method, so the lookup can be replaced by a subclass.
     *
     * @param host host name or literal address
     * @return all addresses the lookup returned
     * @throws UnknownHostException if the lookup fails
     */
    InetAddress[] getAllByName(String host) throws UnknownHostException {
        final InetAddress[] resolved = InetAddress.getAllByName(host);
        return resolved;
    }

    /**
     * Converts one address element into a URL. An element that already starts with
     * {@code http://} or {@code https://} (case-insensitive) is used as is; anything else is
     * treated as {@code host[:port]} and expanded to {@code http://<addr>/MarketDataReplay}.
     * Surrounding whitespace is ignored, so elements of a list written as {@code "a, b"} work.
     *
     * @param addr address element; must not be {@code null}
     * @return the corresponding URL
     * @throws MalformedURLException if the resulting text is not a valid URL
     */
    static URL addrToURL(String addr) throws MalformedURLException {
        // Trim first: leading whitespace would defeat the scheme check below and produce
        // a URL such as "http:// http://host/MarketDataReplay".
        String spec = addr.trim();
        if (spec.regionMatches(true, 0, "http://", 0, 7) || spec.regionMatches(true, 0, "https://", 0, 8)) {
            return new URL(spec);
        }
        return new URL("http://" + spec + "/MarketDataReplay");
    }

    /**
     * Opens (without connecting) a POST connection to {@code address} configured for a binary
     * request/response exchange: 10 s connect timeout, 60 s read timeout, octet-stream
     * {@code Accept}/{@code Content-Type}, and the request time in the
     * {@code X-OnDemand-StartTime} header.
     *
     * @param request request whose time goes into the header
     * @param address target URL text
     * @return the configured connection
     * @throws IOException if the URL is malformed or the connection cannot be opened
     */
    private HttpURLConnection prepareConnection(ReplayRequest request, String address) throws IOException {
        final String binary = "application/octet-stream";
        final String startTime = TimeFormat.GMT.asFullIso().format(request.getRequestTime());
        HttpURLConnection http = (HttpURLConnection) new URL(address).openConnection();
        http.setConnectTimeout(10000);
        http.setReadTimeout(60000);
        http.setRequestMethod("POST");
        http.setRequestProperty("Accept", binary);
        http.setRequestProperty("Content-Type", binary);
        http.setRequestProperty("X-OnDemand-StartTime", startTime);
        http.setDoOutput(true);
        return http;
    }

    /**
     * Reads one length-prefixed response from {@code input}. Bytes are accumulated until the
     * compact-int length prefix is complete, then until that many payload bytes have arrived.
     * The returned buffer is positioned at the start of the length prefix.
     * <p>
     * The stream is closed on every path, including when reading fails.
     *
     * @param input response stream; closed by this method
     * @return buffer holding the length prefix followed by the payload
     * @throws IOException if the stream ends early, the declared length is not positive, or reading fails
     */
    private ByteArrayInput readResponse(InputStream input) throws IOException {
        // try-with-resources closes the stream whether the loop completes or throws.
        try (InputStream in = input) {
            ByteArrayInput response = new ByteArrayInput(new byte[10000]);
            response.setLimit(0);
            while (true) {
                int n = in.read(response.getBuffer(), response.getLimit(), response.getBuffer().length - response.getLimit());
                if (n < 0) {
                    throw new IOException("Unexpected end of response");
                }
                this.receivedBytes.addAndGet(n);
                response.setLimit(response.getLimit() + n);
                // Keep reading until the whole length prefix is available.
                if (response.getLimit() - response.getPosition() < ReplayUtil.getCompactLength(response.getBuffer(), response.getPosition())) {
                    continue;
                }
                int position = response.getPosition();
                int length = response.readCompactInt();
                if (length <= 0) {
                    throw new IOException("Unexpected end of response");
                }
                int limit = response.getPosition() + length;
                // Rewind so the caller sees the prefix, and make room for the full payload.
                response.setPosition(position);
                response.ensureCapacity(limit);
                if (response.getLimit() >= limit) {
                    return response;
                }
            }
        }
    }

    /**
     * Parses a raw response into segments. Each block is read and decompressed in turn; a block
     * that fails is logged and skipped by re-reading its compact-int length and jumping past it.
     *
     * @param response raw response as returned by {@code readResponse}
     * @return segments built from every block that could be read
     * @throws IOException if the response envelope cannot be read, or a failed block cannot be skipped
     */
    private static ArrayList<Segment> unpackResponse(ByteArrayInput response) throws IOException {
        ReplayResponse envelope = new ReplayResponse();
        envelope.read(response);
        ByteArrayInput blocks = envelope.getResponseBlocksInput();
        ArrayList<Segment> segments = new ArrayList<Segment>();
        while (blocks.available() > 0) {
            final int blockStart = blocks.getPosition();
            try {
                Block next = new Block();
                next.readBlock((DataInput) blocks);
                next.decompress();
                segments.add(new Segment(next));
            }
            catch (Exception e) {
                // Rewind to the block start and step over it using its own length prefix.
                blocks.setPosition(blockStart);
                int skipped = blocks.readCompactInt();
                blocks.setPosition(blocks.getPosition() + skipped);
                Log.log.error("Error reading block", (Throwable)e);
            }
        }
        return segments;
    }

    /**
     * Hands freshly received segments to the cache; silently does nothing when no cache is held.
     *
     * @param newSegments segments to add
     */
    private synchronized void addData(ArrayList<Segment> newSegments) {
        final Cache target = cache;
        if (target == null) {
            return;
        }
        target.addData(newSegments);
    }

    /**
     * Returns the running sent-bytes counter.
     *
     * @return current value of {@code sentBytes}
     */
    public long getSentBytes() {
        final long total = sentBytes.longValue();
        return total;
    }

    /**
     * Returns the running received-bytes counter.
     *
     * @return current value of {@code receivedBytes}
     */
    public long getReceivedBytes() {
        final long total = receivedBytes.longValue();
        return total;
    }

    /**
     * Returns the last status message.
     *
     * @return the message, or {@code null} when none is set
     */
    public String getStatus() {
        final String message = status;
        return message;
    }

    /**
     * Asks the cache how much data is available for the current state at {@code time}.
     *
     * @param time time passed through to {@code Cache.getAvailableData}
     * @return the cache's answer, or {@code 0.0} when no cache is held
     */
    public synchronized double getAvailableData(long time) {
        if (cache == null) {
            return 0.0;
        }
        return cache.getAvailableData(current, time);
    }

    /**
     * Repositions the replay at {@code time} and then reads the data up to that time.
     *
     * @param time replay time to jump to
     * @return the buffer produced by {@link #getUpdate(long)} for {@code time}
     */
    public synchronized RecordBuffer getSnapshot(long time) {
        setTime(time);
        final RecordBuffer snapshot = getUpdate(time);
        return snapshot;
    }

    /**
     * Reads replay data up to {@code time} into a new timestamped-data buffer and advances the
     * replay position. The buffer is returned empty when no cache is held or {@code time} lies
     * before the current position. When {@code time} leaves the current interval, the segments
     * are rebuilt and read again; otherwise the worker is woken once the position crosses the
     * prefetch threshold.
     *
     * @param time replay time to advance to
     * @return buffer obtained from {@code RecordBuffer.getInstance}, holding whatever was read
     */
    public synchronized RecordBuffer getUpdate(long time) {
        updateReplaySpeed(time);
        RecordBuffer buffer = RecordBuffer.getInstance((RecordMode)RecordMode.TIMESTAMPED_DATA);
        boolean nothingToRead = cache == null || time < current.time;
        if (nothingToRead) {
            return buffer;
        }
        cache.rebuildCurrentSegmentsIfNeeded(current);
        readCurrent(buffer, time);
        if (current.isCurrentInterval(time)) {
            long prefetchTime = current.endTime - prefetchInterval;
            boolean crossedThreshold = current.time < prefetchTime && time >= prefetchTime;
            if (crossedThreshold) {
                awaken();
            }
            current.time = time;
            return buffer;
        }
        // Left the current interval: switch segments and read the remainder from the new ones.
        current.time = time;
        cache.rebuildCurrentSegments(current);
        readCurrent(buffer, time);
        awaken();
        return buffer;
    }

    /**
     * Lets every current segment read into {@code buffer} up to {@code time}, all tagged with a
     * single usage value obtained from the cache.
     *
     * @param buffer destination buffer
     * @param time time passed to each segment's {@code read}
     */
    private void readCurrent(RecordBuffer buffer, long time) {
        final long usageStamp = cache.nextUsage();
        for (CurrentSegment active : current.segments) {
            active.read(buffer, time, usageStamp);
        }
    }

    /**
     * Updates the estimated replay speed from the replay time and wall-clock time elapsed since
     * the previous sample.
     * <p>
     * If either delta is non-positive or above 59 seconds, the sample is discarded and the
     * baseline is reset. If both deltas are at least one second, the speed becomes the average of
     * the old speed and the observed ratio (clamped to 0.1..10), rounded to two decimals, and the
     * baseline is advanced. Otherwise nothing changes.
     *
     * @param time replay time of the new sample
     */
    private void updateReplaySpeed(long time) {
        final long replayDelta = time - this.prevReplayTime;
        final long wallDelta = System.currentTimeMillis() - this.prevReplayMillis;
        boolean outOfRange = replayDelta <= 0L || replayDelta > 59000L || wallDelta <= 0L || wallDelta > 59000L;
        if (!outOfRange) {
            if (replayDelta < 1000L || wallDelta < 1000L) {
                // Too little elapsed to measure; keep the baseline and wait for more.
                return;
            }
            double observed = Math.min(Math.max(0.1, (double) replayDelta / (double) wallDelta), 10.0);
            this.current.replaySpeed = Math.floor((this.current.replaySpeed + observed) * 50.0 + 0.5) / 100.0;
        }
        this.prevReplayTime += replayDelta;
        this.prevReplayMillis += wallDelta;
    }

    /**
     * Worker loop. Acquires the cache, then repeatedly prepares a request for missing data,
     * sends it, unpacks the response and adds the resulting segments to the cache, for as long
     * as the executing thread is the one stored in {@code worker}.
     */
    @Override
    public void run() {
        this.updateCacheInstance();
        // start()/stop() may have reassigned the worker field before this thread got here;
        // in that case give back the cache that was just acquired and exit.
        if (Thread.currentThread() != this.worker) {
            this.releaseCacheInstance(this.current);
            return;
        }
        // Test mode is enabled by the "TestThinkOnDemand" system property.
        String testMode = System.getProperty("TestThinkOnDemand");
        if (testMode != null) {
            try {
                MarketDataAccess.getInstance().startConfigurationWatcher(testMode, testMode + ".cache", 10000L);
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {
                // Interruption is deliberately ignored; the loop below is governed by the worker field.
            }
        }
        while (Thread.currentThread() == this.worker) {
            try {
                MarketDataToken currentToken = this.token;
                if (testMode != null) {
                    currentToken = MarketDataAccess.getInstance().createToken("test");
                }
                // Skip requests entirely while the token is the one rejected with HTTP 401.
                if (currentToken != this.badToken) {
                    // Data for the current time takes priority over prefetching.
                    ReplayRequest request = this.prepareRequest(currentToken, false);
                    if (this.current.time >= this.current.endTime - this.prefetchInterval) {
                        if (request == null) {
                            request = this.prepareRequest(currentToken, true);
                        } else {
                            // Pre-unpark so the park at the end of this iteration returns
                            // immediately and the prefetch is considered right away.
                            this.awaken();
                        }
                    }
                    if (request != null) {
                        long millis = System.currentTimeMillis();
                        long oldSent = this.sentBytes.longValue();
                        long oldReceived = this.receivedBytes.longValue();
                        ByteArrayInput response = this.doRequest(request);
                        if (response != null) {
                            ArrayList<Segment> newSegments = MarketDataReplay.unpackResponse(response);
                            long size = 0L;
                            for (Segment segment : newSegments) {
                                size += (long)segment.size();
                            }
                            Log.log.info("Response: " + Log.mb(size) + " in " + newSegments.size() + " segments, sent " + Log.mb(this.sentBytes.longValue() - oldSent) + " received " + Log.mb(this.receivedBytes.longValue() - oldReceived) + " in " + (double)(System.currentTimeMillis() - millis) / 1000.0 + " seconds, total sent " + Log.mb(this.sentBytes.longValue()) + ", total received " + Log.mb(this.receivedBytes.longValue()));
                            this.addData(newSegments);
                        }
                        // Pause for a second after every request attempt, successful or not.
                        Thread.sleep(1000L);
                    }
                }
                // Wait up to 10 seconds; awaken() cuts the wait short via unpark.
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10L));
            }
            catch (Throwable t) {
                // Any failure is logged and followed by a 10 second back-off before retrying.
                Log.log.error("Unexpected error", t);
                try {
                    Thread.sleep(10000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
    }

    /**
     * Derives the value used to pick the first address to try: the request time divided by
     * 1,800,000 and truncated to {@code int}.
     *
     * @param request request whose time is hashed
     * @return the bucket number as an {@code int}
     */
    private int getRequestHash(ReplayRequest request) {
        final long bucket = request.getRequestTime() / 1800000L;
        return (int) bucket;
    }
}

