/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.impl.matrix;

import com.devexperts.logging.Logging;
import com.devexperts.logging.TraceLogging;
import com.devexperts.qd.DataIntField;
import com.devexperts.qd.DataObjField;
import com.devexperts.qd.DataRecord;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.HistorySubscriptionFilter;
import com.devexperts.qd.QDAgent;
import com.devexperts.qd.QDDistributor;
import com.devexperts.qd.QDFactory;
import com.devexperts.qd.SymbolCodec;
import com.devexperts.qd.impl.matrix.History;
import com.devexperts.qd.kit.CompactIntField;
import com.devexperts.qd.kit.DefaultRecord;
import com.devexperts.qd.kit.DefaultScheme;
import com.devexperts.qd.kit.PentaCodec;
import com.devexperts.qd.kit.RecordOnlyFilter;
import com.devexperts.qd.ng.AbstractRecordSink;
import com.devexperts.qd.ng.EventFlag;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordListener;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.ng.RecordProvider;
import com.devexperts.qd.ng.RecordSink;
import com.devexperts.qd.ng.RecordSource;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.stats.QDStats;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class HistorySnapshotMTStressTest {
    // Logger used by the test threads for trace output.
    private static final Logging log = Logging.getLogging(HistorySnapshotMTStressTest.class);
    // NOTE(review): the decompiler appears to have inlined the compile-time constants below at
    // their use sites, so the methods of this class show bare literals (3, 5, 6, 500L, ...)
    // instead of these names. The per-constant notes are inferred from names and matching
    // literals; confirm against the original source.
    // Presumably the test duration in seconds (testStress loops while sec <= 5).
    private static final int N_SECS = 5;
    // Presumably the number of distinct symbols (setUp creates 3).
    private static final int N_SYMBOLS = 3;
    // Presumably the number of agent threads (setUp creates 3).
    private static final int N_AGENTS = 3;
    // Presumably the upper bound on records per generator (prepareData uses nextInt(4) + 2).
    private static final int MAX_RECS = 5;
    // Presumably the upper bound on consistent views an agent waits for (nextInt(5) + 1).
    private static final int MAX_PROGRESS_WAIT = 5;
    // Presumably the bound for generated values and deltas (nextInt(5)).
    private static final int MAX_VALUE = 5;
    // Presumably the number of "round" times rndTime picks from (nextInt(6)).
    private static final int N_FIXED_TIMES = 6;
    // Index of the "Test.Value" int field in RECORD.
    private static final int VALUE_INDEX = 2;
    // Index of the "Test.Sum" int field in RECORD.
    private static final int SUM_INDEX = 3;
    // Presumably the minimal interval (ms) between "not consistent" trace messages while paused.
    private static final long WARN_INCONSISTENT_INTERVAL = 500L;
    // Cached bit masks of the event flags used throughout the test.
    private static final int TX_PENDING = EventFlag.TX_PENDING.flag();
    private static final int REMOVE_EVENT = EventFlag.REMOVE_EVENT.flag();
    private static final int SNAPSHOT_BEGIN = EventFlag.SNAPSHOT_BEGIN.flag();
    private static final int SNAPSHOT_END = EventFlag.SNAPSHOT_END.flag();
    private static final int SNAPSHOT_SNIP = EventFlag.SNAPSHOT_SNIP.flag();
    // The single record of the test scheme: four int fields, of which field 2 carries a value and
    // field 3 carries the running sum that agents validate.
    private static final DataRecord RECORD = new DefaultRecord(0, "Test", true, new DataIntField[]{new CompactIntField(0, "Test.1"), new CompactIntField(1, "Test.2"), new CompactIntField(2, "Test.Value"), new CompactIntField(3, "Test.Sum")}, new DataObjField[0]);
    private static final PentaCodec CODEC = PentaCodec.INSTANCE;
    private static final DataScheme SCHEME = new DefaultScheme((SymbolCodec)CODEC, new DataRecord[]{RECORD});
    // Test parameter; when true the record filter passed to History in setUp rejects every record.
    private final boolean unconflated;
    // Set by testStress to ask all worker threads to leave their loops.
    private volatile boolean stopped;
    private DistributorThread distributorThread;
    // Symbols the agents subscribe to; setUp asserts each of them encodes to cipher 0.
    private String[] symbols;
    // Capacity-1 queue holding the first uncaught exception reported by a worker thread.
    private volatile BlockingQueue<Throwable> uncaughtException;
    // Stack traces captured by makeStackTrace and printed by logAndFail.
    private List<Throwable> stackTraces;
    // All worker threads (the distributor plus every agent).
    private List<Thread> threads;
    // The agent threads only.
    private List<AgentThread> agents;

    @Parameterized.Parameters(name="unconflated={0}")
    public static Iterable<Object[]> data() {
        return Arrays.asList({false}, {true});
    }

    /**
     * Creates a test instance for one parameter set produced by {@code data()}.
     *
     * @param unconflated stored as-is; read by the record filter that {@code setUp} passes to History
     */
    public HistorySnapshotMTStressTest(boolean unconflated) {
        this.unconflated = unconflated;
    }

    /**
     * Creates a fresh History instance together with one distributor thread and three agent
     * threads, resets the shared test state and installs the uncaught exception handler on
     * every worker thread. The threads are created here but not started.
     */
    @Before
    public void setUp() throws Exception {
        // Accepts every record unless the test runs in "unconflated" mode.
        RecordOnlyFilter recordFilter = new RecordOnlyFilter(SCHEME){

            public boolean acceptRecord(DataRecord record) {
                return !HistorySnapshotMTStressTest.this.unconflated;
            }
        };
        History history = new History(
            QDFactory.getDefaultFactory().historyBuilder()
                .withScheme(SCHEME)
                .withStats(QDStats.VOID)
                .withHistoryFilter(new HSF()),
            recordFilter);
        this.distributorThread = new DistributorThread(history.distributorBuilder().build());
        this.stopped = false;
        TraceLogging.restart();

        // 3 symbols (matches N_SYMBOLS); each must be non-encodable, i.e. have cipher 0.
        this.symbols = new String[3];
        for (int symbolIndex = 0; symbolIndex < 3; symbolIndex++) {
            String name = "SYMBOL_" + symbolIndex;
            this.symbols[symbolIndex] = name;
            Assert.assertEquals(0L, (long) CODEC.encode(name));
        }

        this.uncaughtException = new ArrayBlockingQueue<Throwable>(1);
        this.stackTraces = new ArrayList<Throwable>();
        this.threads = new ArrayList<Thread>();
        this.agents = new ArrayList<AgentThread>();
        this.threads.add(this.distributorThread);

        // 3 agents (matches N_AGENTS), each with its own history-snapshot agent.
        for (int agentIndex = 0; agentIndex < 3; agentIndex++) {
            QDAgent qdAgent = history.agentBuilder()
                .withHistorySnapshot(true)
                .withKeyProperties("i=" + agentIndex)
                .build();
            AgentThread agentThread = new AgentThread(agentIndex, qdAgent);
            this.threads.add(agentThread);
            this.agents.add(agentThread);
        }

        UEH handler = new UEH();
        for (Thread worker : this.threads) {
            worker.setUncaughtExceptionHandler(handler);
        }
    }

    /**
     * Starts all worker threads and watches them for 5 one-second rounds (matches N_SECS).
     *
     * <p>In every round the method waits up to one second for an uncaught exception from a
     * worker. If none arrives, each agent must have set its {@code makesProgress} flag since
     * the previous round; otherwise stack traces of the distributor and the stalled agent are
     * captured and the test fails. Whatever happens, the workers are asked to stop,
     * interrupted and joined exactly once before the outcome is reported.
     *
     * @throws InterruptedException if the calling thread is interrupted while joining workers
     */
    @Test
    public void testStress() throws InterruptedException {
        for (Thread thread : this.threads) {
            thread.start();
        }
        Throwable ex = null;
        try {
            for (int sec = 1; sec <= 5 && (ex = this.uncaughtException.poll(1L, TimeUnit.SECONDS)) == null; ++sec) {
                for (AgentThread agent : this.agents) {
                    if (!agent.makesProgress) {
                        this.makeStackTrace(this.distributorThread);
                        this.makeStackTrace(agent);
                        this.logAndFail("agent does not make progress: " + agent);
                    }
                    // Re-arm the flag; the agent must set it again before the next round.
                    agent.makesProgress = false;
                }
                System.err.print(".");
                if (sec % 100 == 0) {
                    System.err.println();
                }
            }
        } catch (Throwable e) {
            // Includes the AssertionError thrown by logAndFail; reported after the workers are stopped.
            e.printStackTrace();
            ex = e;
        } finally {
            // Single shutdown path for both the normal and the failing case.
            System.err.println(" stopping" + (ex == null ? "" : " because of " + ex));
            this.stopped = true;
            for (Thread thread : this.threads) {
                thread.interrupt();
            }
            for (Thread thread : this.threads) {
                thread.join();
            }
        }
        if (ex == null) {
            // A worker may have failed after the last poll of the watch loop.
            ex = this.uncaughtException.poll();
        }
        if (ex != null) {
            TraceLogging.dump(System.err, HistorySnapshotMTStressTest.class.getSimpleName());
            // Same exception type and message as Assert.fail(ex.toString()), but keeps the cause.
            throw new AssertionError(ex.toString(), ex);
        }
    }

    /**
     * Captures the current stack of the given thread as a {@link Throwable} named after the
     * thread and stores it in {@code stackTraces} for later printing.
     *
     * @param thread the thread whose stack trace is captured
     */
    private void makeStackTrace(Thread thread) {
        String title = thread.getName() + " stack trace";
        Throwable holder = new Throwable(title);
        StackTraceElement[] frames = thread.getStackTrace();
        holder.setStackTrace(frames);
        this.stackTraces.add(holder);
    }

    /**
     * Reports a failure: hands the message to {@code TraceLogging.logAndStop}, prints every
     * captured stack trace to {@code System.err} and then fails the test with the message.
     *
     * @param msg the failure description
     */
    private void logAndFail(String msg) {
        TraceLogging.logAndStop(HistorySnapshotMTStressTest.class, msg);
        PrintStream err = System.err;
        err.println();
        int captured = this.stackTraces.size();
        for (int i = 0; i < captured; i++) {
            this.stackTraces.get(i).printStackTrace(err);
        }
        Assert.fail(msg);
    }

    /**
     * Builds the trace-log suffix describing event flags.
     *
     * @param eventFlags the event flags bit set of a record
     * @return {@code " with <flags>"} when any flag is set, followed by {@code " (txEnd)"}
     *         when the TX_PENDING bit is clear; may be just {@code " (txEnd)"}
     */
    @Nonnull
    private static String withFlagsStr(int eventFlags) {
        String flagsPart = "";
        if (eventFlags != 0) {
            flagsPart = " with " + EventFlag.formatEventFlags(eventFlags, MessageType.HISTORY_DATA);
        }
        boolean pending = (eventFlags & TX_PENDING) != 0;
        String txPart = pending ? "" : " (txEnd)";
        return flagsPart + txPart;
    }

    /**
     * Draws a random time. One draw in ten (on average) is an arbitrary {@code long}; the
     * rest are one of six multiples of 10^12 (0 through 5 * 10^12). The values
     * {@code Long.MIN_VALUE}, {@code Long.MAX_VALUE} and {@code -Long.MAX_VALUE} are rejected
     * and redrawn.
     *
     * @param rnd the source of randomness
     * @return a time that is none of the three rejected extremes
     */
    private static long rndTime(Random rnd) {
        while (true) {
            long candidate;
            if (rnd.nextInt(10) == 0) {
                candidate = rnd.nextLong();
            } else {
                candidate = (long) rnd.nextInt(6) * 1000000000000L;
            }
            boolean extreme = candidate == Long.MIN_VALUE
                || candidate == Long.MAX_VALUE
                || candidate == -Long.MAX_VALUE;
            if (!extreme) {
                return candidate;
            }
        }
    }

    /**
     * One generated or received record: a symbol, its time, a value and the running sum.
     * The natural order is descending by time (newest first).
     */
    private static class DataItem
    implements Comparable<DataItem> {
        String symbol;
        long time;
        int value;
        int sum;
        // Marks items touched by an update that has not been sent yet.
        boolean updated;

        DataItem(String symbol) {
            this.symbol = symbol;
        }

        /** Orders items by time in reverse: the larger time sorts first. */
        @Override
        public int compareTo(@Nonnull DataItem o) {
            if (this.time == o.time) {
                return 0;
            }
            return this.time > o.time ? -1 : 1;
        }

        /** Formats the item as {@code symbol@time,value,sum}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(this.symbol).append("@").append(this.time);
            text.append(",").append(this.value);
            text.append(",").append(this.sum);
            return text.toString();
        }
    }

    /**
     * Uncaught exception handler installed on every worker thread: prints the failure and
     * offers it to the {@code uncaughtException} queue read by {@code testStress}.
     */
    private class UEH
    implements Thread.UncaughtExceptionHandler {
        private UEH() {
        }

        @Override
        public void uncaughtException(Thread t, Throwable ex) {
            PrintStream err = System.err;
            err.println("Exception in thread " + t + ": " + ex);
            ex.printStackTrace(err);
            // offer() does not block; when the queue is already full the exception is not stored.
            BlockingQueue<Throwable> failures = HistorySnapshotMTStressTest.this.uncaughtException;
            failures.offer(ex);
        }
    }

    /**
     * History subscription filter used by the test: the minimal history time of a symbol is
     * its string hash code and the record count is unlimited.
     */
    private static class HSF
    implements HistorySubscriptionFilter {
        private HSF() {
        }

        /** Asserts that the cipher is 0 and returns {@code symbol.hashCode()} widened to long. */
        public long getMinHistoryTime(DataRecord record, int cipher, String symbol) {
            Assert.assertEquals(0L, (long) cipher);
            long minTime = symbol.hashCode();
            return minTime;
        }

        /** Always returns {@code Integer.MAX_VALUE}. */
        public int getMaxRecordCount(DataRecord record, int cipher, String symbol) {
            return Integer.MAX_VALUE;
        }
    }

    /**
     * Consumer thread. Until the test is stopped it repeatedly subscribes to a random symbol
     * from a random time, retrieves records from its agent, rebuilds a local view of the data
     * and validates the running sums whenever the view is consistent, then unsubscribes.
     */
    private class AgentThread
    extends Thread
    implements RecordListener {
        private final int index;
        private final QDAgent agent;
        private final Random rnd;
        private long subTime;
        // Received items keyed by negated time, so iteration runs from the largest time down.
        private final Map<Long, DataItem> data = new TreeMap<Long, DataItem>();
        private boolean seenSnapshot;
        private boolean inSnapshot;
        private boolean txPending;
        private boolean emptyView;
        // Number of consistent views seen during the current subscription.
        private int progress;
        volatile String symbol;
        // Set on every consistent view; read and reset by testStress.
        volatile boolean makesProgress;
        volatile boolean consistentView;
        // Set by recordsAvailable (and when retrieve reports more data) to wake up the loop.
        volatile boolean hasData;
        // Forwards every retrieved record to appendData.
        private final RecordSink sink = new AbstractRecordSink(){

            public void append(RecordCursor cursor) {
                AgentThread.this.appendData(cursor);
            }
        };

        AgentThread(int index, QDAgent agent) {
            super("Agent-" + index);
            this.index = index;
            this.agent = agent;
            this.rnd = new Random(index);
            agent.setRecordListener(this);
        }

        /** Runs subscription rounds until the test is stopped. */
        @Override
        public void run() {
            while (true) {
                if (HistorySnapshotMTStressTest.this.stopped) {
                    return;
                }
                this.makeSubRun();
            }
        }

        /**
         * One subscription round: reset the state, subscribe, consume data until enough
         * consistent views were seen (or the view became empty, or the test stopped), then
         * unsubscribe.
         */
        private void makeSubRun() {
            this.progress = 0;
            this.seenSnapshot = false;
            this.inSnapshot = false;
            this.txPending = false;
            this.emptyView = false;
            this.consistentView = false;
            this.data.clear();
            this.symbol = HistorySnapshotMTStressTest.this.symbols[this.rnd.nextInt(3)];
            this.subTime = HistorySnapshotMTStressTest.rndTime(this.rnd);

            RecordBuffer subscription = RecordBuffer.getInstance(RecordMode.HISTORY_SUBSCRIPTION);
            log.trace("[" + this.index + "] Subscribe " + this.symbol + "@time=" + this.subTime);
            subscription.add(RECORD, 0, this.symbol).setTime(this.subTime);
            this.agent.setSubscription(subscription);
            subscription.release();

            // Wait for 1..5 consistent views before moving on.
            int progressWait = this.rnd.nextInt(5) + 1;
            while (!HistorySnapshotMTStressTest.this.stopped && this.progress < progressWait && !this.emptyView) {
                this.awaitData();
                this.hasData = false;
                if (this.agent.retrieve(this.sink)) {
                    // The agent reported more data; do not park on the next iteration.
                    this.hasData = true;
                }
            }

            log.trace("[" + this.index + "] Unsubscribe " + this.symbol + "@time=" + this.subTime);
            this.agent.setSubscription(RecordSource.VOID);
            this.symbol = null;
        }

        /** Parks until data is signalled or the test is stopped. */
        private void awaitData() {
            while (!this.hasData && !HistorySnapshotMTStressTest.this.stopped) {
                log.trace("[" + this.index + "] Park for more data");
                LockSupport.park();
            }
        }

        /** Listener callback: flags that data is available and unparks this thread. */
        public void recordsAvailable(RecordProvider provider) {
            this.hasData = true;
            log.trace("[" + this.index + "] recordsAvailable unpark");
            LockSupport.unpark(this);
        }

        /**
         * Applies one received record to the local view, updates the snapshot/transaction
         * state from its event flags and validates the view when it is consistent.
         */
        private void appendData(RecordCursor cursor) {
            Assert.assertEquals(RECORD, cursor.getRecord());
            Assert.assertEquals(0L, (long) cursor.getCipher());
            long time = cursor.getTime();
            int flags = cursor.getEventFlags();

            this.txPending = (flags & TX_PENDING) != 0;
            if ((flags & SNAPSHOT_BEGIN) != 0) {
                this.seenSnapshot = true;
                this.inSnapshot = true;
                this.data.clear();
            }
            boolean endReached = time <= this.subTime && (flags & SNAPSHOT_END) != 0;
            boolean snipped = (flags & SNAPSHOT_SNIP) != 0;
            if (endReached || snipped) {
                this.inSnapshot = false;
            }

            boolean removal = (flags & REMOVE_EVENT) != 0;
            if (removal) {
                log.trace("[" + this.index + "] Rcv " + this.symbol + "@time=" + time + HistorySnapshotMTStressTest.withFlagsStr(flags));
                this.data.remove(-time);
            } else {
                DataItem received = new DataItem(this.symbol);
                received.time = time;
                received.value = cursor.getInt(2); // Test.Value
                received.sum = cursor.getInt(3); // Test.Sum
                log.trace("[" + this.index + "] Rcv " + received + HistorySnapshotMTStressTest.withFlagsStr(flags));
                this.data.put(-time, received);
            }

            boolean consistent = this.seenSnapshot && !this.inSnapshot && !this.txPending;
            this.consistentView = consistent;
            if (!consistent) {
                return;
            }
            this.validateData();
            ++this.progress;
            this.makesProgress = true;
            this.emptyView = this.data.isEmpty();
        }

        /**
         * Checks that each item's sum equals the total of the values of all items before it
         * in iteration order; fails the test on the first mismatch.
         */
        private void validateData() {
            int expectedSum = 0;
            for (DataItem item : this.data.values()) {
                if (item.sum != expectedSum) {
                    HistorySnapshotMTStressTest.this.logAndFail("[" + this.index + "] !!! Inconsistent " + item + ", expected sum=" + expectedSum);
                }
                expectedSum += item.value;
            }
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder(this.getName());
            text.append("{symbol='").append(this.symbol).append('\'');
            text.append(", subTime=").append(this.subTime);
            text.append(", seenSnapshot=").append(this.seenSnapshot);
            text.append(", inSnapshot=").append(this.inSnapshot);
            text.append(", txPending=").append(this.txPending);
            text.append(", emptyView=").append(this.emptyView);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Produces the data stream for one symbol on behalf of the distributor thread.
     *
     * <p>The generator owns a small list of items sorted from the largest time to the
     * smallest, where every item's {@code sum} is the total of the values of the items before
     * it. After a subscription it emits a snapshot; afterwards it emits updates that move a
     * delta between two items and fix the sums in between. Occasionally it pauses and resumes
     * only once no agent subscribed to its symbol reports an inconsistent view.
     */
    private class Generator {
        private final String symbol;
        // Items ordered by DataItem's natural order (descending time).
        private final List<DataItem> data = new ArrayList<DataItem>();
        // Scratch list of the items that go into the next update.
        private final List<DataItem> updated = new ArrayList<DataItem>();
        private long subTime;
        private boolean resendSnapshot;
        // 0 when not paused; otherwise the time of the pause or of the last "not consistent" trace.
        private long pausedAtTime;
        private final Random rnd;

        /**
         * @param symbol the symbol this generator produces data for
         * @param seed seed of the generator's private random sequence
         */
        Generator(String symbol, long seed) {
            this.symbol = symbol;
            this.rnd = new Random(seed);
            this.prepareData();
        }

        /**
         * Records the subscription time and schedules a snapshot for the next {@code sendData}.
         *
         * @param time the time down to which the snapshot is sent
         */
        public void subscribe(long time) {
            this.subTime = time;
            this.resendSnapshot = true;
        }

        /** Creates 2..5 items with distinct random times and random values, then computes the sums. */
        private void prepareData() {
            HashSet<Long> times = new HashSet<Long>();
            int nRecs = this.rnd.nextInt(4) + 2;
            for (int i = 0; i < nRecs; ++i) {
                DataItem item = new DataItem(this.symbol);
                do {
                    item.time = HistorySnapshotMTStressTest.rndTime(this.rnd);
                } while (!times.add(item.time)); // redraw until the time is unique
                item.value = this.rnd.nextInt(5);
                this.data.add(item);
            }
            Collections.sort(this.data);
            int sum = 0;
            for (DataItem item : this.data) {
                item.sum = sum;
                sum += item.value;
            }
        }

        /**
         * Moves a random delta (1..5) from item {@code b} to an earlier-listed item {@code a},
         * recomputes the sums of items {@code a..b} and marks them as updated. Sums of the
         * items after {@code b} do not change because the total is preserved.
         */
        private void prepareUpdate() {
            int size = this.data.size();
            int a = this.rnd.nextInt(size - 1);
            int b = this.rnd.nextInt(size - a - 1) + a + 1;
            Assert.assertTrue(a < b);
            int delta = this.rnd.nextInt(5) + 1;
            this.data.get(a).value += delta;
            this.data.get(b).value -= delta;
            int sum = this.data.get(a).sum;
            for (int i = a; i <= b; ++i) {
                DataItem item = this.data.get(i);
                item.sum = sum;
                sum += item.value;
                item.updated = true;
            }
        }

        /**
         * Appends the next portion of data to the buffer: a snapshot when one is scheduled,
         * nothing while paused with an inconsistent agent, otherwise either starts a pause
         * (one call in 1000 on average) or appends an update.
         *
         * @param buf the buffer the records are added to
         */
        public void sendData(RecordBuffer buf) {
            if (this.resendSnapshot) {
                if (this.rnd.nextBoolean()) {
                    this.prepareUpdate();
                }
                this.sendSnapshot(buf);
                this.resendSnapshot = false;
                return;
            }
            if (this.pausedAtTime != 0L) {
                int badAgent = this.checkAgentsWhenPaused();
                if (badAgent >= 0) {
                    long now = System.currentTimeMillis();
                    // Trace at most once per 500 ms (matches WARN_INCONSISTENT_INTERVAL).
                    if (now > this.pausedAtTime + 500L) {
                        log.trace("### On paused " + this.symbol + " agent " + badAgent + " is not consistent");
                        this.pausedAtTime = now;
                    }
                    return;
                }
                log.trace("### Resume " + this.symbol);
                this.pausedAtTime = 0L;
            }
            if (this.rnd.nextInt(1000) == 0) {
                log.trace("### Pause " + this.symbol);
                this.pausedAtTime = System.currentTimeMillis();
            } else {
                this.prepareUpdate();
                this.sendUpdate(buf);
            }
        }

        /**
         * Looks for an agent that is currently subscribed to this generator's symbol but does
         * not have a consistent view.
         *
         * @return the index of the first such agent, or -1 when there is none
         */
        private int checkAgentsWhenPaused() {
            for (AgentThread agent : HistorySnapshotMTStressTest.this.agents) {
                // Compare by value, not by reference; agent.symbol is null between subscriptions.
                if (this.symbol.equals(agent.symbol) && !agent.consistentView) {
                    return agent.index;
                }
            }
            return -1;
        }

        /**
         * Appends a snapshot of all items with time at or above the subscription time. The
         * first item carries SNAPSHOT_BEGIN; an item exactly at the subscription time carries
         * SNAPSHOT_END. When no item sits at the subscription time, a REMOVE_EVENT record at
         * that time closes the snapshot (and also opens it when no item was sent at all).
         * Clears the {@code updated} marks of all items afterwards.
         */
        private void sendSnapshot(RecordBuffer buf) {
            int n = this.data.size();
            // Long.MAX_VALUE means "nothing sent yet"; rndTime never produces this value.
            long lastTime = Long.MAX_VALUE;
            log.trace("### sendSnapshot for " + this.symbol);
            for (int i = 0; i < n; ++i) {
                DataItem item = this.data.get(i);
                if (item.time < this.subTime) {
                    break;
                }
                RecordCursor cur = buf.add(RECORD, 0, this.symbol);
                cur.setTime(item.time);
                cur.setInt(2, item.value); // Test.Value
                cur.setInt(3, item.sum); // Test.Sum
                if (i == 0) {
                    cur.setEventFlags(SNAPSHOT_BEGIN);
                }
                if (item.time == this.subTime) {
                    cur.setEventFlags(cur.getEventFlags() | SNAPSHOT_END);
                }
                lastTime = item.time;
                log.trace("### Snd " + item + HistorySnapshotMTStressTest.withFlagsStr(cur.getEventFlags()));
            }
            if (lastTime > this.subTime) {
                RecordCursor cur = buf.add(RECORD, 0, this.symbol);
                cur.setTime(this.subTime);
                cur.setEventFlags(REMOVE_EVENT | SNAPSHOT_END);
                if (lastTime == Long.MAX_VALUE) {
                    cur.setEventFlags(cur.getEventFlags() | SNAPSHOT_BEGIN);
                }
                log.trace("### Snd " + this.symbol + "@" + this.subTime + HistorySnapshotMTStressTest.withFlagsStr(cur.getEventFlags()));
            }
            for (DataItem item : this.data) {
                item.updated = false;
            }
        }

        /**
         * Appends all items marked as updated, in random order, as one transaction: every
         * record except the last carries TX_PENDING. Items below the subscription time are
         * sent with REMOVE_EVENT.
         */
        private void sendUpdate(RecordBuffer buf) {
            for (DataItem item : this.data) {
                if (item.updated) {
                    this.updated.add(item);
                    item.updated = false;
                }
            }
            Collections.shuffle(this.updated, this.rnd);
            int n = this.updated.size();
            log.trace("### sendUpdate for " + this.symbol);
            for (int i = 0; i < n; ++i) {
                DataItem item = this.updated.get(i);
                RecordCursor cur = buf.add(RECORD, 0, this.symbol);
                cur.setTime(item.time);
                cur.setInt(2, item.value); // Test.Value
                cur.setInt(3, item.sum); // Test.Sum
                if (i < n - 1) {
                    cur.setEventFlags(TX_PENDING);
                }
                if (item.time < this.subTime) {
                    cur.setEventFlags(cur.getEventFlags() | REMOVE_EVENT);
                }
                log.trace("### Snd " + item + HistorySnapshotMTStressTest.withFlagsStr(cur.getEventFlags()));
            }
            this.updated.clear();
        }
    }

    /**
     * Producer thread. In a busy loop it picks up added and removed subscriptions from the
     * distributor, lets every active generator append data to a shared buffer and pushes a
     * random-sized prefix of that buffer into the distributor.
     */
    private class DistributorThread
    extends Thread
    implements RecordListener {
        private final QDDistributor dist;
        private final RecordProvider addedRecordProvider;
        private final RecordProvider removedRecordProvider;
        // Set by recordsAvailable, cleared by the polling helpers.
        private volatile boolean addedSubAvailable;
        private volatile boolean removedSubAvailable;
        // Outgoing records that have not been processed by the distributor yet.
        private final RecordBuffer buf = new RecordBuffer(RecordMode.FLAGGED_DATA);
        // Active generators by symbol.
        private final Map<String, Generator> generators = new HashMap<String, Generator>();
        private final Random rnd = new Random(1L);

        DistributorThread(QDDistributor dist) {
            super("Distributor");
            this.setPriority(Thread.MIN_PRIORITY);
            this.dist = dist;
            this.addedRecordProvider = dist.getAddedRecordProvider();
            this.removedRecordProvider = dist.getRemovedRecordProvider();
            this.addedRecordProvider.setRecordListener(this);
            this.removedRecordProvider.setRecordListener(this);
        }

        /** Main loop: poll subscriptions, generate data, process part of the buffer. */
        @Override
        public void run() {
            while (!HistorySnapshotMTStressTest.this.stopped) {
                this.pollAddedSubscription();
                this.pollRemovedSubscription();
                for (Generator generator : this.generators.values()) {
                    generator.sendData(this.buf);
                }
                this.process();
            }
        }

        /** Retrieves and handles added subscription when it was signalled as available. */
        private void pollAddedSubscription() {
            if (!this.addedSubAvailable) {
                return;
            }
            // Clear the flag before retrieving so a concurrent notification is not lost.
            this.addedSubAvailable = false;
            RecordBuffer added = RecordBuffer.getInstance(RecordMode.HISTORY_SUBSCRIPTION);
            boolean more = this.addedRecordProvider.retrieve(added);
            if (more) {
                this.addedSubAvailable = true;
            }
            for (RecordCursor cur = added.next(); cur != null; cur = added.next()) {
                this.processAddSub(cur);
            }
            added.release();
        }

        /** Retrieves and handles removed subscription when it was signalled as available. */
        private void pollRemovedSubscription() {
            if (!this.removedSubAvailable) {
                return;
            }
            this.removedSubAvailable = false;
            RecordBuffer removed = RecordBuffer.getInstance(RecordMode.HISTORY_SUBSCRIPTION);
            boolean more = this.removedRecordProvider.retrieve(removed);
            if (more) {
                this.removedSubAvailable = true;
            }
            for (RecordCursor cur = removed.next(); cur != null; cur = removed.next()) {
                this.processRemoveSub(cur);
            }
            removed.release();
        }

        /**
         * Handles one added subscription: creates the symbol's generator on first use and
         * subscribes it either from the requested time or from {@code -Long.MAX_VALUE},
         * chosen at random.
         */
        private void processAddSub(RecordCursor cur) {
            Assert.assertEquals(0L, (long) cur.getCipher());
            log.trace("### addSub " + cur.getSymbol() + "@time=" + cur.getTime());
            String key = cur.getSymbol();
            Generator generator = this.generators.get(key);
            if (generator == null) {
                generator = new Generator(cur.getSymbol(), this.rnd.nextLong());
                this.generators.put(cur.getSymbol(), generator);
            }
            long fromTime = this.rnd.nextBoolean() ? cur.getTime() : -Long.MAX_VALUE;
            generator.subscribe(fromTime);
        }

        /** Handles one removed subscription by dropping the symbol's generator. */
        private void processRemoveSub(RecordCursor cur) {
            Assert.assertEquals(0L, (long) cur.getCipher());
            log.trace("### removeSub " + cur.getSymbol());
            this.generators.remove(cur.getSymbol());
        }

        /** Listener callback: remembers which provider has subscription changes available. */
        public void recordsAvailable(RecordProvider provider) {
            if (provider == this.addedRecordProvider) {
                log.trace("### addSub available notification");
                this.addedSubAvailable = true;
                return;
            }
            if (provider == this.removedRecordProvider) {
                log.trace("### removeSub available notification");
                this.removedSubAvailable = true;
                return;
            }
            Assert.fail();
        }

        /**
         * Sends a prefix of the buffer to the distributor: the whole buffer one time in
         * twenty (on average), otherwise a random number of records between 1 and the buffer
         * size. The buffer is compacted afterwards.
         */
        public void process() {
            if (this.buf.isEmpty()) {
                return;
            }
            int count;
            if (this.rnd.nextInt(20) == 0) {
                count = this.buf.size();
            } else {
                count = this.rnd.nextInt(this.buf.size()) + 1;
            }
            long start = this.buf.getPosition();
            // Advance the read position past the records that are sent now.
            int skipped = 0;
            while (skipped < count) {
                this.buf.next();
                skipped++;
            }
            this.dist.process(this.buf.newSource(start, this.buf.getPosition()));
            this.buf.compact();
        }
    }
}

