package com.dxfeed.api.test;

import com.devexperts.logging.Logging;
import com.devexperts.qd.qtp.MessageConnector;
import com.devexperts.qd.qtp.socket.ClientSocketConnector;
import com.devexperts.qd.qtp.socket.ServerSocketTestHelper;
import com.devexperts.test.ThreadCleanCheck;
import com.devexperts.test.TraceRunner;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeedEventListener;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.api.impl.DXEndpointImpl;
import com.dxfeed.event.market.Quote;
import com.dxfeed.promise.Promise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Tests how a FEED {@link DXEndpoint} moves between several local PUBLISHER endpoints
 * when it is reconnected, when publishers are closed or disconnected, and when the
 * connector address is changed directly.
 *
 * <p>Each test starts {@link #PUB_COUNT} publishers bound to 127.0.0.1 on ephemeral ports.
 * Every publisher publishes a single {@link Quote} carrying its own symbol as soon as its
 * Quote subscription changes, increments {@link #connectsCount} and releases
 * {@link #connectedSomewhere}, which is how the tests detect which publisher the feed reached.
 */
@RunWith(TraceRunner.class)
public class DXEndpointReconnectTest {
    static final Logging log = Logging.getLogging(DXEndpointReconnectTest.class);

    /** Number of publishers started by {@link #startPublishers()}. */
    private static final int PUB_COUNT = 10;
    /** Timeout, in seconds, for every blocking wait in this test. */
    private static final int WAIT_TIMEOUT = 30;
    /** Prefix of every publisher address; publishers are bound to the loopback interface. */
    private static final String LOCAL_HOST_PREFIX = "127.0.0.1:";

    /** Random per-test identifier used to build unique publisher connector names. */
    private String testId;
    List<DXEndpoint> publishers;
    /** Ports the publishers are listening on, in the same order as {@link #publishers}. */
    List<Integer> ports;
    DXEndpoint feedEndpoint;
    /** Released by a publisher's subscription listener; replaced before each expected reconnect. */
    volatile CountDownLatch connectedSomewhere;
    /** Number of subscription-change notifications seen by all publishers. */
    AtomicInteger connectsCount;
    /** Symbol of the publisher whose subscription listener fired most recently. */
    volatile String expectedSymbol;
    /** Quotes received by the feed-side listener. */
    private BlockingQueue<Quote> quotes;

    @Before
    public void setUp() throws Exception {
        ThreadCleanCheck.before();
        testId = UUID.randomUUID().toString();
        publishers = new ArrayList<>(PUB_COUNT);
        ports = new ArrayList<>(PUB_COUNT);
        connectedSomewhere = new CountDownLatch(1);
        connectsCount = new AtomicInteger(0);
        quotes = new ArrayBlockingQueue<>(PUB_COUNT);
    }

    @After
    public void tearDown() throws Exception {
        log.debug("======== tearDown ========");
        if (publishers != null) {
            publishers.forEach(DXEndpoint::close);
        }
        if (feedEndpoint != null) {
            feedEndpoint.close();
        }
        ThreadCleanCheck.after();
    }

    /**
     * Connects to the full address list and calls {@code reconnect()} repeatedly, expecting
     * every reconnect to deliver a quote from a publisher that has not been seen before.
     */
    @Test
    public void testReconnect() throws InterruptedException {
        List<String> symbols = startPublishers();
        String address = joinPublisherAddresses();
        feedEndpoint = createFeedEndpoint(symbols, quotes::addAll);
        log.info("Connecting...");
        feedEndpoint.connect(address);

        HashSet<String> seenSymbols = new HashSet<>();
        for (int attempt = 1; attempt <= PUB_COUNT; attempt++) {
            awaitConnection(attempt);
            seenSymbols.add(expectQuote(quotes, expectedSymbol));
            if (attempt < PUB_COUNT) {
                // Arm a fresh latch before triggering the next connection attempt.
                connectedSomewhere = new CountDownLatch(1);
                log.info("Reconnecting...");
                feedEndpoint.reconnect();
            }
        }
        // Every publisher must have been visited exactly once.
        Assert.assertEquals(PUB_COUNT, seenSymbols.size());
    }

    @Test
    public void testOrderedConnect() throws InterruptedException {
        doTestOrderedStrategy(false);
    }

    @Test
    public void testOrderedReconnect() throws InterruptedException {
        doTestOrderedStrategy(true);
    }

    /**
     * Connects with {@code connectOrder=ordered} and expects publishers to be reached in
     * the order they are listed in the address.
     *
     * @param reconnect {@code true} to move on by calling {@code reconnect()} on the feed,
     *                  {@code false} to move on by closing the currently connected publisher
     */
    private void doTestOrderedStrategy(boolean reconnect) throws InterruptedException {
        List<String> symbols = startPublishers();
        String address = joinPublisherAddresses() + "[connectOrder=ordered]";
        feedEndpoint = createFeedEndpoint(symbols, quotes::addAll);
        log.info("Connecting...");
        feedEndpoint.connect(address);

        for (int attempt = 1; attempt <= PUB_COUNT; attempt++) {
            int current = attempt - 1;
            awaitConnection(attempt);
            expectQuote(quotes, symbols.get(current));
            if (attempt == PUB_COUNT) {
                continue; // nothing left to switch to after the last publisher
            }
            connectedSomewhere = new CountDownLatch(1);
            if (reconnect) {
                log.info("Reconnecting...");
                feedEndpoint.reconnect();
            } else {
                log.info("Killing publisher " + symbols.get(current));
                publishers.get(current).close();
            }
        }
    }

    /**
     * Connects with {@code connectOrder=priority} while the first half of the publishers is
     * disconnected, walks through the remaining ones by disconnecting them one by one, then
     * brings publisher #1 back and expects the feed to return to it once the currently
     * connected publisher is disconnected.
     */
    @Test
    public void testPriorityStrategy() throws InterruptedException {
        List<String> symbols = startPublishers();
        String address = joinPublisherAddresses() + "[connectOrder=priority]";

        // Take the first publishers offline before the feed connects.
        int offset = 5;
        for (int i = 0; i < offset; i++) {
            publishers.get(i).disconnect();
        }

        feedEndpoint = createFeedEndpoint(symbols, quotes::addAll);
        log.info("Connecting...");
        feedEndpoint.connect(address);
        // getQDEndpoint() is reached through DXEndpointImpl, the same way getFirstConnector does it.
        MessageConnector connector = ((DXEndpointImpl) feedEndpoint).getQDEndpoint().getConnectors().get(0);
        connector.setReconnectDelay(10L);

        int last = PUB_COUNT - 1;
        for (int i = offset + 1; i <= last; i++) {
            int current = i - 1;
            awaitConnection(i - offset);
            expectQuote(quotes, symbols.get(current));
            if (i < last) {
                connectedSomewhere = new CountDownLatch(1);
                log.info("Killing publisher " + symbols.get(current));
                publishers.get(current).disconnect();
            }
        }

        // Bring publisher #1 back on its original port.
        publishers.get(1).connect(":" + ports.get(1) + "[bindAddr=127.0.0.1]");

        // Verify with a separate feed that publisher #1 is serving again.
        DXEndpoint probeEndpoint = createFeedEndpoint(symbols, quotes::addAll);
        probeEndpoint.connect(LOCAL_HOST_PREFIX + ports.get(1));
        expectQuote(quotes, symbols.get(1));
        probeEndpoint.closeAndAwaitTermination();

        // Reset the bookkeeping disturbed by the probe connection.
        quotes.clear();
        connectsCount.set(0);
        connectedSomewhere = new CountDownLatch(1);

        // Drop the publisher the main feed was left connected to by the loop above.
        publishers.get(last - 1).disconnect();
        awaitConnection(1);
        expectQuote(quotes, symbols.get(1));
    }

    /**
     * Connects to the first publisher only and then repoints the single connector at each
     * subsequent publisher via {@code setHost}, expecting a quote from each one in turn.
     */
    @Test
    public void testConnectorDirectUpdate() throws InterruptedException {
        List<String> symbols = startPublishers();
        String address = LOCAL_HOST_PREFIX + ports.get(0);
        feedEndpoint = createFeedEndpoint(symbols, quotes::addAll);
        log.info("Connecting: " + address);
        feedEndpoint.connect(address);
        ClientSocketConnector connector = getFirstConnector(feedEndpoint);

        for (int attempt = 1; attempt <= PUB_COUNT; attempt++) {
            awaitConnection(attempt);
            expectQuote(quotes, symbols.get(attempt - 1));
            if (attempt < PUB_COUNT) {
                connectedSomewhere = new CountDownLatch(1);
                // ports is zero-based, so index "attempt" is the next publisher.
                String nextAddress = LOCAL_HOST_PREFIX + ports.get(attempt);
                log.info("Resetting address: " + nextAddress);
                connector.setHost(nextAddress);
            }
        }
    }

    /**
     * Returns the only connector of the given endpoint, asserting that there is exactly one
     * and that it is a {@link ClientSocketConnector}.
     */
    private ClientSocketConnector getFirstConnector(DXEndpoint endpoint) {
        List<MessageConnector> connectors = ((DXEndpointImpl) endpoint).getQDEndpoint().getConnectors();
        Assert.assertEquals(1L, connectors.size());
        MessageConnector first = connectors.get(0);
        Assert.assertTrue(first instanceof ClientSocketConnector);
        return (ClientSocketConnector) first;
    }

    /** Joins the publisher ports into a comma-separated list of {@code 127.0.0.1:port} addresses. */
    private String joinPublisherAddresses() {
        return ports.stream()
            .map(port -> LOCAL_HOST_PREFIX + port)
            .collect(Collectors.joining(","));
    }

    /**
     * Starts one publisher bound to 127.0.0.1 on an ephemeral port and records both the
     * endpoint and the port it ended up listening on.
     *
     * @param symbol symbol of the quote this publisher emits whenever its Quote subscription changes
     */
    private void addPublisher(String symbol) {
        String name = testId + "-pub-" + symbol;
        // The promise must be created before connecting so the bound port can be observed.
        Promise<Integer> portPromise = ServerSocketTestHelper.createPortPromise(name);
        DXEndpoint endpoint = DXEndpoint.newBuilder()
            .withRole(DXEndpoint.Role.PUBLISHER)
            .build()
            .connect(":0[name=" + name + ",bindAddr=127.0.0.1]");
        DXPublisher publisher = endpoint.getPublisher();
        publisher.getSubscription(Quote.class).addChangeListener(symbols -> {
            expectedSymbol = symbol;
            publisher.publishEvents(Collections.singleton(new Quote(symbol)));
            // Count first, then release the latch, so awaitConnection sees the updated count.
            connectsCount.getAndIncrement();
            connectedSomewhere.countDown();
        });
        publishers.add(endpoint);
        ports.add(portPromise.await(WAIT_TIMEOUT, TimeUnit.SECONDS));
    }

    /**
     * Starts {@link #PUB_COUNT} publishers with the symbols "A", "B", "C", ...
     *
     * @return the publisher symbols in start order
     */
    private List<String> startPublishers() {
        List<String> symbols = new ArrayList<>(PUB_COUNT);
        for (int i = 0; i < PUB_COUNT; i++) {
            String symbol = String.valueOf((char) ('A' + i));
            addPublisher(symbol);
            symbols.add(symbol);
        }
        return symbols;
    }

    /**
     * Creates a FEED endpoint (not yet connected) with a Quote subscription on the given symbols.
     *
     * @param symbols  symbols to subscribe to
     * @param listener listener receiving the incoming quotes
     * @return the new endpoint
     */
    private DXEndpoint createFeedEndpoint(List<String> symbols, DXFeedEventListener<Quote> listener) {
        DXEndpoint endpoint = DXEndpoint.create(DXEndpoint.Role.FEED);
        DXFeedSubscription<Quote> subscription = endpoint.getFeed().createSubscription(Quote.class);
        subscription.addEventListener(listener);
        subscription.addSymbols(symbols);
        return endpoint;
    }

    /**
     * Waits up to {@link #WAIT_TIMEOUT} seconds for the next quote and checks its symbol.
     *
     * @param queue    queue to take the quote from
     * @param expected expected symbol, or {@code null} to accept any symbol
     * @return the symbol of the received quote
     */
    private String expectQuote(BlockingQueue<Quote> queue, String expected) throws InterruptedException {
        Quote quote = queue.poll(WAIT_TIMEOUT, TimeUnit.SECONDS);
        Assert.assertNotNull("Missed quote from publisher", quote);
        String symbol = quote.getEventSymbol();
        if (expected != null) {
            Assert.assertEquals(expected, symbol);
        }
        return symbol;
    }

    /**
     * Waits up to {@link #WAIT_TIMEOUT} seconds for a publisher to report a subscription change.
     *
     * @param expectedConnects expected value of {@link #connectsCount} afterwards;
     *                         the check is skipped when not positive
     */
    private void awaitConnection(int expectedConnects) throws InterruptedException {
        boolean connected = connectedSomewhere.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
        Assert.assertTrue("Connected to a publisher", connected);
        if (expectedConnects > 0) {
            Assert.assertEquals(expectedConnects, connectsCount.get());
        }
    }
}
