/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.api.test;

import com.devexperts.logging.Logging;
import com.devexperts.qd.qtp.socket.ClientSocketConnector;
import com.devexperts.qd.qtp.socket.ServerSocketTestHelper;
import com.devexperts.test.ThreadCleanCheck;
import com.devexperts.test.TraceRunner;
import com.devexperts.util.SynchronizedIndexedSet;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedEventListener;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.api.impl.DXEndpointImpl;
import com.dxfeed.api.test.TestConnectorFactory;
import com.dxfeed.event.market.Quote;
import com.dxfeed.promise.Promise;
import com.dxfeed.promise.Promises;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Tests how a {@code STREAM_FEED} {@link DXEndpoint} connects and reconnects to a group of
 * local {@code STREAM_PUBLISHER} endpoints. Every publisher publishes a {@link Quote} carrying
 * its own single-letter symbol as soon as its quote subscription changes, which lets the tests
 * tell which publisher the feed is currently attached to.
 */
@RunWith(value=TraceRunner.class)
public class DXEndpointReconnectTest {
    private static final Logging log = Logging.getLogging(DXEndpointReconnectTest.class);
    // Number of publisher endpoints started for each test.
    private static final int PUB_COUNT = 10;
    // Timeout, in seconds, applied to waits for connections, quotes and port promises.
    private static final int WAIT_TIMEOUT = 30;
    // Random per-test identifier (assigned in setUp); used in publisher names and as an address prefix.
    private String testId;
    // Publisher endpoints in creation order; closed in tearDown.
    List<DXEndpoint> publishers;
    // Port resolved for each publisher, in the same order as the publishers list.
    List<Integer> ports;
    // The feed endpoint under test; closed in tearDown.
    DXEndpoint feedEndpoint;
    // Counted down by a publisher's subscription listener; tests replace it with a fresh latch
    // before triggering the next connection.
    volatile CountDownLatch connectedSomewhere;
    // Incremented every time any publisher's subscription listener fires.
    AtomicInteger connectsCount;
    // Symbol of the publisher whose subscription listener fired most recently.
    volatile String expectedSymbol;
    // Quotes delivered to the feed subscription's event listener.
    private BlockingQueue<Quote> quotes;
    /**
     * Starts the thread-leak check and resets all per-test state: a fresh test id, empty
     * publisher/port lists, an empty quote queue, a zeroed connection counter and a new latch.
     */
    @Before
    public void setUp() throws Exception {
        ThreadCleanCheck.before();
        testId = UUID.randomUUID().toString();

        // All collections are sized for one entry per publisher.
        publishers = new ArrayList<>(PUB_COUNT);
        ports = new ArrayList<>(PUB_COUNT);
        quotes = new ArrayBlockingQueue<>(PUB_COUNT);

        connectsCount = new AtomicInteger();
        connectedSomewhere = new CountDownLatch(1);
    }

    /**
     * Closes the feed endpoint and every publisher endpoint, then runs the thread-leak check.
     *
     * <p>The publishers are closed in a {@code finally} block so that a failure while closing
     * the feed endpoint does not leave their server sockets and threads alive for later tests.
     *
     * @throws Exception if closing an endpoint or the thread-leak check fails
     */
    @After
    public void tearDown() throws Exception {
        log.debug("======== tearDown ========");
        try {
            if (feedEndpoint != null) {
                feedEndpoint.close();
            }
        } finally {
            if (publishers != null) {
                publishers.forEach(DXEndpoint::close);
            }
        }
        ThreadCleanCheck.after();
    }

    /**
     * Connects the feed to the full list of publisher addresses and calls
     * {@link DXEndpoint#reconnect()} between connections. After as many connections as there
     * are publishers, a quote must have been received from every distinct publisher.
     */
    @Test
    public void testReconnect() throws InterruptedException {
        ArrayList<String> symbols = startPublishers();
        String hosts = ports.stream()
            .map(port -> "127.0.0.1:" + port)
            .collect(Collectors.joining(","));
        feedEndpoint = createFeedEndpoint(symbols, quotes::addAll);
        log.info("Connecting...");
        feedEndpoint.connect(hosts + "[name=uplink,reconnectDelay=1]");

        Set<String> seenSymbols = new HashSet<>();
        for (int attempt = 1; attempt <= PUB_COUNT; attempt++) {
            awaitConnection(attempt);
            // expectedSymbol was set by whichever publisher accepted this connection.
            seenSymbols.add(expectQuote(quotes, expectedSymbol));
            if (attempt < PUB_COUNT) {
                // Arm a fresh latch before triggering the next connection.
                connectedSomewhere = new CountDownLatch(1);
                log.info("Reconnecting...");
                feedEndpoint.reconnect();
            }
        }
        Assert.assertEquals(PUB_COUNT, seenSymbols.size());
    }

    /** Runs the ordered-strategy scenario, closing the current publisher to force each switch. */
    @Test
    public void testOrderedConnect() throws InterruptedException {
        boolean useReconnect = false;
        doTestOrderedStrategy(useReconnect);
    }

    /** Runs the ordered-strategy scenario, calling reconnect on the feed endpoint for each switch. */
    @Test
    public void testOrderedReconnect() throws InterruptedException {
        boolean useReconnect = true;
        doTestOrderedStrategy(useReconnect);
    }

    /**
     * Connects with {@code connectOrder=ordered} and checks that successive connections deliver
     * quotes from the publishers in the order in which their addresses were listed.
     *
     * @param useReconnect when {@code true} each switch is triggered by
     *     {@link DXEndpoint#reconnect()}; otherwise by closing the currently expected publisher
     */
    private void doTestOrderedStrategy(boolean useReconnect) throws InterruptedException {
        ArrayList<String> symbols = startPublishers();
        String hosts = ports.stream()
            .map(port -> "127.0.0.1:" + port)
            .collect(Collectors.joining(","));
        String address = hosts + "[name=uplink,reconnectDelay=1,connectOrder=ordered]";
        feedEndpoint = createFeedEndpoint(symbols, quotes::addAll);
        log.info("Connecting...");
        feedEndpoint.connect(address);

        int lastIndex = PUB_COUNT - 1;
        for (int index = 0; index <= lastIndex; index++) {
            awaitConnection(index + 1);
            String symbol = symbols.get(index);
            expectQuote(quotes, symbol);
            if (index == lastIndex) {
                break;
            }
            // Arm a fresh latch before forcing the switch to the next publisher.
            connectedSomewhere = new CountDownLatch(1);
            if (useReconnect) {
                log.info("Reconnecting...");
                feedEndpoint.reconnect();
            } else {
                log.info("Killing publisher " + symbol);
                publishers.get(index).close();
            }
        }
    }

    /**
     * Connects with {@code connectOrder=priority} while a set of ports is marked as blocked,
     * and checks which publisher the connector ends up on after each reconnect.
     *
     * <p>The first five ports are blocked up front. The loop then walks publishers 6 through 9,
     * blocking the current port before each reconnect. Finally the second port is unblocked and
     * the connector is expected to move to the second publisher.
     */
    @Test
    public void testPriorityStrategy() throws InterruptedException {
        ArrayList<String> symbols = this.startPublishers();
        String address = this.ports.stream().map(port -> "127.0.0.1:" + port).collect(Collectors.joining(","));
        address = address + "[name=uplink,reconnectDelay=1,connectOrder=priority]";
        // NOTE(review): presumably registers a connector factory for addresses starting with
        // "<testId>:" that refuses connections to ports in blockedPorts -- confirm in TestConnectorFactory.
        SynchronizedIndexedSet blockedPorts = SynchronizedIndexedSet.create();
        TestConnectorFactory.addClientSocketConnectorForBlockedPorts(this.testId + ":", (Set<Integer>)blockedPorts);
        // Block the first five publishers before connecting.
        int deadPubCount = 5;
        for (int i = 0; i < deadPubCount; ++i) {
            blockedPorts.add(this.ports.get(i));
        }
        this.feedEndpoint = this.createFeedEndpoint(symbols, (DXFeedEventListener<Quote>)((DXFeedEventListener)this.quotes::addAll));
        log.info("Connecting...");
        // The testId prefix matches the prefix passed to TestConnectorFactory above.
        this.feedEndpoint.connect(this.testId + ":" + address);
        ClientSocketConnector connector = this.getFirstConnector(this.feedEndpoint);
        // i is the 1-based publisher number; publishers 6..9 are expected in turn.
        for (int i = deadPubCount + 1; i <= 9; ++i) {
            this.awaitConnection(i - deadPubCount);
            this.expectQuote(this.quotes, symbols.get(i - 1));
            Assert.assertEquals((long)this.ports.get(i - 1).intValue(), (long)connector.getCurrentPort());
            if (i >= 9) continue;
            this.connectedSomewhere = new CountDownLatch(1);
            log.info("Reconnecting from publisher " + symbols.get(i - 1));
            // Block the port currently in use, then check that blocking alone did not move the connector.
            blockedPorts.add(this.ports.get(i - 1));
            Assert.assertEquals((long)this.ports.get(i - 1).intValue(), (long)connector.getCurrentPort());
            connector.reconnect();
        }
        // Unblock the second publisher and restart connection counting from zero.
        blockedPorts.remove(this.ports.get(1));
        this.connectsCount.set(0);
        this.connectedSomewhere = new CountDownLatch(1);
        // Block the ninth publisher (the one currently connected) and force a reconnect.
        blockedPorts.add(this.ports.get(8));
        Assert.assertEquals((long)this.ports.get(8).intValue(), (long)connector.getCurrentPort());
        connector.reconnect();
        // The connector is expected to land on the second publisher, not on the tenth, whose port was never blocked.
        this.awaitConnection(1);
        this.expectQuote(this.quotes, symbols.get(1));
        Assert.assertEquals((long)this.ports.get(1).intValue(), (long)connector.getCurrentPort());
    }

    /**
     * Connects to the first publisher only, then repeatedly points the connector at the next
     * publisher through {@code setAddress} and checks that a quote from that publisher arrives.
     */
    @Test
    public void testConnectorDirectUpdate() throws InterruptedException {
        ArrayList<String> symbols = startPublishers();
        String initialAddress = "127.0.0.1:" + ports.get(0) + "[name=uplink,reconnectDelay=1]";
        feedEndpoint = createFeedEndpoint(symbols, quotes::addAll);
        log.info("Connecting: " + initialAddress);
        feedEndpoint.connect(initialAddress);
        ClientSocketConnector connector = getFirstConnector(feedEndpoint);

        for (int index = 0; index < PUB_COUNT; index++) {
            awaitConnection(index + 1);
            expectQuote(quotes, symbols.get(index));
            int next = index + 1;
            if (next < PUB_COUNT) {
                // Arm a fresh latch before redirecting the connector.
                connectedSomewhere = new CountDownLatch(1);
                String nextAddress = "127.0.0.1:" + ports.get(next);
                log.info("Resetting address: " + nextAddress);
                connector.setAddress(nextAddress);
            }
        }
    }

    /**
     * Returns the single connector of the given endpoint, asserting that there is exactly one
     * and that it is a {@link ClientSocketConnector}.
     *
     * @param feedEndpoint endpoint to inspect; must be a {@link DXEndpointImpl}
     * @return the endpoint's only connector
     */
    private ClientSocketConnector getFirstConnector(DXEndpoint feedEndpoint) {
        DXEndpointImpl endpointImpl = (DXEndpointImpl) feedEndpoint;
        List<?> connectors = endpointImpl.getQDEndpoint().getConnectors();
        Assert.assertEquals(1, connectors.size());
        Object onlyConnector = connectors.get(0);
        Assert.assertTrue(onlyConnector instanceof ClientSocketConnector);
        return (ClientSocketConnector) onlyConnector;
    }

    /**
     * Starts a stream publisher bound to an ephemeral port on 127.0.0.1 and records it in
     * {@code publishers}. Whenever its {@link Quote} subscription changes, the publisher marks
     * itself as the expected symbol, publishes one quote for {@code symbol}, bumps the
     * connection counter and releases the connection latch.
     *
     * @param symbol symbol published by this publisher; also part of its connector name
     * @return promise obtained from {@code ServerSocketTestHelper} for the publisher's port
     */
    private Promise<Integer> addPublisher(String symbol) {
        String name = testId + "-pub-" + symbol;
        // The port promise is requested before the server connector is started.
        Promise<Integer> portPromise = ServerSocketTestHelper.createPortPromise(name);
        DXEndpoint endpoint = DXEndpoint.newBuilder()
            .withRole(DXEndpoint.Role.STREAM_PUBLISHER)
            .build()
            .connect(":0[name=" + name + ",bindAddr=127.0.0.1]");
        DXPublisher publisher = endpoint.getPublisher();
        publisher.getSubscription(Quote.class).addChangeListener(changed -> {
            expectedSymbol = symbol;
            publisher.publishEvents(Collections.singleton(new Quote(symbol)));
            connectsCount.getAndIncrement();
            connectedSomewhere.countDown();
        });
        publishers.add(endpoint);
        return portPromise;
    }

    /**
     * Starts one publisher per symbol {@code "A"}, {@code "B"}, ... and waits until all of their
     * port promises are done, then appends the resolved ports to {@code ports} in the same order.
     *
     * @return the publisher symbols in creation order
     */
    private ArrayList<String> startPublishers() {
        ArrayList<String> symbols = new ArrayList<>();
        List<Promise<Integer>> portPromises = new ArrayList<>();
        for (char letter = 'A'; letter < 'A' + PUB_COUNT; letter++) {
            String symbol = String.valueOf(letter);
            portPromises.add(addPublisher(symbol));
            symbols.add(symbol);
        }
        Promises.allOf(portPromises).await(WAIT_TIMEOUT, TimeUnit.SECONDS);
        for (Promise<Integer> portPromise : portPromises) {
            ports.add(portPromise.getResult());
        }
        return symbols;
    }

    /**
     * Creates a stream-feed endpoint with one {@link Quote} subscription that is wired to the
     * given listener and subscribed to the given symbols. The endpoint is not connected here.
     *
     * @param symbols symbols to subscribe to
     * @param listener receiver of incoming quote events
     * @return the new, not yet connected, endpoint
     */
    private DXEndpoint createFeedEndpoint(ArrayList<String> symbols, DXFeedEventListener<Quote> listener) {
        DXEndpoint endpoint = DXEndpoint.create(DXEndpoint.Role.STREAM_FEED);
        DXFeedSubscription<Quote> quoteSubscription = endpoint.getFeed().createSubscription(Quote.class);
        // The listener is attached before symbols are added, as in the original sequence.
        quoteSubscription.addEventListener(listener);
        quoteSubscription.addSymbols(symbols);
        return endpoint;
    }

    /**
     * Takes the next quote from the queue, failing if none arrives within the wait timeout.
     *
     * @param quotes queue to poll
     * @param expectedSymbol symbol the quote must carry, or {@code null} to accept any symbol
     * @return the symbol of the received quote
     * @throws InterruptedException if interrupted while waiting for a quote
     */
    private String expectQuote(BlockingQueue<Quote> quotes, String expectedSymbol) throws InterruptedException {
        Quote received = quotes.poll(WAIT_TIMEOUT, TimeUnit.SECONDS);
        Assert.assertNotNull("Missed quote from publisher", received);
        String actualSymbol = received.getEventSymbol();
        if (expectedSymbol == null) {
            return actualSymbol;
        }
        Assert.assertEquals(expectedSymbol, actualSymbol);
        return actualSymbol;
    }

    /**
     * Waits for the current connection latch to be released, failing on timeout, and optionally
     * checks the total number of connections observed so far.
     *
     * @param expectedConnectsCount expected value of the connection counter; values of zero or
     *     less skip the counter check
     * @throws InterruptedException if interrupted while waiting on the latch
     */
    private void awaitConnection(int expectedConnectsCount) throws InterruptedException {
        boolean connected = connectedSomewhere.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
        Assert.assertTrue("Connected to a publisher", connected);
        if (expectedConnectsCount <= 0) {
            return;
        }
        Assert.assertEquals(expectedConnectsCount, connectsCount.get());
    }
}

