package com.dxfeed.api.test;

import com.devexperts.logging.Logging;
import com.devexperts.qd.qtp.socket.ServerSocketTestHelper;
import com.devexperts.test.ThreadCleanCheck;
import com.devexperts.test.TraceRunner;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.event.market.Quote;
import com.dxfeed.promise.Promise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(TraceRunner.class)
/* loaded from: input_file:com/dxfeed/api/test/DXEndpointReconnectTest.class */
public class DXEndpointReconnectTest {
    static final Logging log = Logging.getLogging(DXEndpointReconnectTest.class);
    private static final int PUB_COUNT = 10;
    private String testId;
    List<DXEndpoint> publishers;
    List<Integer> ports;
    DXEndpoint feedEndpoint;
    volatile CountDownLatch connectedSomewhere;
    AtomicInteger connectsCount;
    volatile String expectedSymbol;

    @Before
    public void setUp() throws Exception {
        ThreadCleanCheck.before();
        this.testId = UUID.randomUUID().toString();
        this.publishers = new ArrayList(10);
        this.ports = new ArrayList(10);
        this.connectedSomewhere = new CountDownLatch(1);
        this.connectsCount = new AtomicInteger(0);
    }

    @After
    public void tearDown() throws Exception {
        log.debug("======== tearDown ========");
        if (this.publishers != null) {
            this.publishers.forEach((v0) -> {
                v0.close();
            });
        }
        if (this.feedEndpoint != null) {
            this.feedEndpoint.close();
        }
        ThreadCleanCheck.after();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void addPublisher(String str) {
        String str2 = this.testId + "-pub-" + str;
        Promise createPortPromise = ServerSocketTestHelper.createPortPromise(str2);
        DXEndpoint connect = DXEndpoint.newBuilder().withRole(DXEndpoint.Role.PUBLISHER).build().connect(":0[name=" + str2 + ",bindAddr=127.0.0.1]");
        DXPublisher publisher = connect.getPublisher();
        publisher.getSubscription(Quote.class).addChangeListener(set -> {
            this.expectedSymbol = str;
            publisher.publishEvents(Collections.singleton(new Quote(str)));
            this.connectsCount.getAndIncrement();
            this.connectedSomewhere.countDown();
        });
        this.publishers.add(connect);
        this.ports.add(createPortPromise.await(10L, TimeUnit.SECONDS));
    }

    @Test
    public void testReconnect() throws InterruptedException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            String valueOf = String.valueOf((char) (65 + i));
            addPublisher(valueOf);
            arrayList.add(valueOf);
        }
        String str = (String) this.ports.stream().map(num -> {
            return "127.0.0.1:" + num;
        }).collect(Collectors.joining(","));
        this.feedEndpoint = DXEndpoint.create(DXEndpoint.Role.FEED);
        DXFeed feed = this.feedEndpoint.getFeed();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10);
        HashSet hashSet = new HashSet();
        DXFeedSubscription createSubscription = feed.createSubscription(Quote.class);
        arrayBlockingQueue.getClass();
        createSubscription.addEventListener((v1) -> {
            r1.addAll(v1);
        });
        createSubscription.addSymbols(arrayList);
        log.info("Connecting...");
        this.feedEndpoint.connect(str);
        for (int i2 = 1; i2 <= 10; i2++) {
            Assert.assertTrue("Connected to a publisher", this.connectedSomewhere.await(10L, TimeUnit.SECONDS));
            Assert.assertEquals(i2, this.connectsCount.get());
            log.info("expectedSymbol = " + this.expectedSymbol);
            Quote quote = (Quote) arrayBlockingQueue.poll(10L, TimeUnit.SECONDS);
            Assert.assertNotNull("Missed quote from publisher on step " + i2, quote);
            hashSet.add(quote.getEventSymbol());
            if (i2 < 10) {
                this.connectedSomewhere = new CountDownLatch(1);
                log.info("Reconnecting...");
                this.feedEndpoint.reconnect();
            }
        }
        Assert.assertEquals(10L, hashSet.size());
        this.feedEndpoint.close();
    }
}
