package com.dxfeed.api.test;

import com.devexperts.test.ThreadCleanCheck;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.api.osub.ObservableSubscription;
import com.dxfeed.event.market.Quote;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import junit.framework.TestCase;

/* loaded from: input_file:com/dxfeed/api/test/DXEndpointConnectTest.class */
public class DXEndpointConnectTest extends TestCase {
    private static final int PORT = 7744;
    private static final long WAIT_FOR_FUTURE_TIMEOUT = 5000;
    private static final long WAIT_FOR_SUBS_TIMEOUT = 10000;
    private static final long WAIT_FOR_QUOTES_TIMEOUT = 10000;
    DXEndpoint publishedEndpoint;
    DXEndpoint feedEndpoint;
    ExecutorService executor;

    protected void setUp() throws Exception {
        ThreadCleanCheck.before();
    }

    protected void tearDown() throws Exception {
        if (this.publishedEndpoint != null) {
            this.publishedEndpoint.close();
        }
        if (this.feedEndpoint != null) {
            this.feedEndpoint.close();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
        ThreadCleanCheck.after();
    }

    public void testConnectInEndpointThread() throws InterruptedException, TimeoutException, ExecutionException {
        this.publishedEndpoint = DXEndpoint.create(DXEndpoint.Role.PUBLISHER);
        DXPublisher publisher = this.publishedEndpoint.connect(":7744").getPublisher();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(2);
        ObservableSubscription subscription = publisher.getSubscription(Quote.class);
        arrayBlockingQueue.getClass();
        subscription.addChangeListener((v1) -> {
            r1.addAll(v1);
        });
        this.executor = Executors.newSingleThreadExecutor();
        this.feedEndpoint = DXEndpoint.create(DXEndpoint.Role.FEED).executor(this.executor);
        DXFeed feed = this.feedEndpoint.getFeed();
        ArrayBlockingQueue arrayBlockingQueue2 = new ArrayBlockingQueue(2);
        this.executor.submit(() -> {
            DXFeedSubscription createSubscription = feed.createSubscription(Quote.class);
            arrayBlockingQueue2.getClass();
            createSubscription.addEventListener((v1) -> {
                r1.addAll(v1);
            });
            createSubscription.addSymbols(Arrays.asList("AAPL", "IBM"));
            this.feedEndpoint.connect("localhost:7744");
        }).get(WAIT_FOR_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
        HashSet hashSet = new HashSet();
        hashSet.add(arrayBlockingQueue.poll(10000L, TimeUnit.MILLISECONDS));
        hashSet.add(arrayBlockingQueue.poll(10000L, TimeUnit.MILLISECONDS));
        assertEquals(2, hashSet.size());
        assertTrue(hashSet.contains("AAPL") && hashSet.contains("IBM"));
        publisher.publishEvents(Collections.singleton(new Quote("AAPL")));
        assertEquals("AAPL", ((Quote) arrayBlockingQueue2.poll(10000L, TimeUnit.MILLISECONDS)).getEventSymbol());
        publisher.publishEvents(Collections.singleton(new Quote("IBM")));
        assertEquals("IBM", ((Quote) arrayBlockingQueue2.poll(10000L, TimeUnit.MILLISECONDS)).getEventSymbol());
        this.executor.submit(() -> {
            this.feedEndpoint.disconnect();
        }).get(WAIT_FOR_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
        this.feedEndpoint.awaitNotConnected();
        this.feedEndpoint.closeAndAwaitTermination();
    }
}
