/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.api.test;

import com.devexperts.test.ThreadCleanCheck;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.event.market.Quote;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Checks that a FEED endpoint can be connected and disconnected from a task running on
 * the same single-threaded executor that the endpoint itself was configured with.
 *
 * <p>A PUBLISHER endpoint listens on {@link #PORT}; a FEED endpoint connects to it over
 * {@code localhost}. The test verifies that the subscription reaches the publisher and
 * that published quotes are delivered back to the feed's subscription listener.
 */
public class DXEndpointConnectTest {
    /** TCP port the publisher listens on and the feed connects to. */
    private static final int PORT = 7744;
    /** Maximum time, in milliseconds, to wait for a task submitted to the executor. */
    private static final long WAIT_FOR_FUTURE_TIMEOUT = 5000L;
    /** Maximum time, in milliseconds, to wait for each subscribed symbol to reach the publisher. */
    private static final long WAIT_FOR_SUBS_TIMEOUT = 10000L;
    /** Maximum time, in milliseconds, to wait for each published quote to reach the feed. */
    private static final long WAIT_FOR_QUOTES_TIMEOUT = 10000L;

    DXEndpoint publishedEndpoint;
    DXEndpoint feedEndpoint;
    ExecutorService executor;

    /**
     * Runs the thread check hook before each test.
     * NOTE(review): presumably records the set of live threads for comparison in
     * {@link #tearDown()} - confirm against ThreadCleanCheck.
     */
    @Before
    public void setUp() throws Exception {
        ThreadCleanCheck.before();
    }

    /**
     * Releases whatever the test created, then runs the thread check hook.
     *
     * <p>Each resource is released in its own {@code finally} step so that a failure while
     * closing one of them does not leave the remaining ones open.
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (publishedEndpoint != null) {
                publishedEndpoint.close();
            }
        } finally {
            try {
                if (feedEndpoint != null) {
                    feedEndpoint.close();
                }
            } finally {
                if (executor != null) {
                    executor.shutdown();
                }
            }
        }
        // Only reached when every resource above was released without an exception.
        ThreadCleanCheck.after();
    }

    /**
     * Subscribes and connects the feed endpoint from inside its own executor thread, then
     * verifies subscription propagation, quote delivery, and disconnect from that same thread.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws TimeoutException if a submitted task does not finish within
     *         {@link #WAIT_FOR_FUTURE_TIMEOUT} milliseconds
     * @throws ExecutionException if a submitted task throws
     */
    @Test
    public void testConnectInEndpointThread() throws InterruptedException, TimeoutException, ExecutionException {
        publishedEndpoint = DXEndpoint.create(DXEndpoint.Role.PUBLISHER);
        DXPublisher publisher = publishedEndpoint.connect(":" + PORT).getPublisher();
        ArrayBlockingQueue<Object> subs = new ArrayBlockingQueue<>(2);
        publisher.getSubscription(Quote.class).addChangeListener(subs::addAll);

        executor = Executors.newSingleThreadExecutor();
        feedEndpoint = DXEndpoint.create(DXEndpoint.Role.FEED).executor(executor);
        DXFeed feed = feedEndpoint.getFeed();
        ArrayBlockingQueue<Quote> quotes = new ArrayBlockingQueue<>(2);

        // Subscribe and connect from the endpoint's own executor thread.
        Future<?> future = executor.submit(() -> {
            DXFeedSubscription<Quote> subscription = feed.createSubscription(Quote.class);
            subscription.addEventListener(quotes::addAll);
            subscription.addSymbols(Arrays.asList("AAPL", "IBM"));
            feedEndpoint.connect("localhost:" + PORT);
        });
        future.get(WAIT_FOR_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);

        // Both symbols must show up on the publisher side; their order is not checked.
        HashSet<Object> receivedSubs = new HashSet<>();
        receivedSubs.add(pollOrFail(subs, WAIT_FOR_SUBS_TIMEOUT, "first subscribed symbol"));
        receivedSubs.add(pollOrFail(subs, WAIT_FOR_SUBS_TIMEOUT, "second subscribed symbol"));
        Assert.assertEquals(new HashSet<Object>(Arrays.asList("AAPL", "IBM")), receivedSubs);

        publisher.publishEvents(Collections.singleton(new Quote("AAPL")));
        Assert.assertEquals("AAPL",
            pollOrFail(quotes, WAIT_FOR_QUOTES_TIMEOUT, "AAPL quote").getEventSymbol());
        publisher.publishEvents(Collections.singleton(new Quote("IBM")));
        Assert.assertEquals("IBM",
            pollOrFail(quotes, WAIT_FOR_QUOTES_TIMEOUT, "IBM quote").getEventSymbol());

        // Disconnect from the endpoint's own executor thread as well.
        future = executor.submit(() -> feedEndpoint.disconnect());
        future.get(WAIT_FOR_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
        feedEndpoint.awaitNotConnected();
        feedEndpoint.closeAndAwaitTermination();
    }

    /**
     * Takes the next element from {@code queue}, failing the test with a descriptive message
     * instead of returning {@code null} when nothing arrives in time.
     *
     * @param queue queue to poll
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @param what description of the awaited item, used in the failure message
     * @return the polled element, never {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    private static <T> T pollOrFail(ArrayBlockingQueue<T> queue, long timeoutMillis, String what)
        throws InterruptedException
    {
        T item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        Assert.assertNotNull("Timed out after " + timeoutMillis + " ms waiting for " + what, item);
        return item;
    }
}

