package com.dxfeed.api.test;

import com.devexperts.test.ThreadCleanCheck;
import com.devexperts.util.TimeFormat;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeedEventListener;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.osub.ObservableSubscriptionChangeListener;
import com.dxfeed.event.market.Quote;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;

/* loaded from: input_file:com/dxfeed/api/test/LostSubscriptionTest.class */
public class LostSubscriptionTest extends TestCase implements ObservableSubscriptionChangeListener, DXFeedEventListener<Quote> {
    private static final List<String> SYMBOLS = Arrays.asList("AAPL", "GOOG", "IBM");
    private static final double BID_PRICE = 100.5d;
    private DXEndpoint endpoint;
    private boolean closed;
    private Semaphore pubSemaphore;
    private Semaphore subSemaphore;

    protected void setUp() throws Exception {
        ThreadCleanCheck.before();
        this.endpoint = DXEndpoint.create(DXEndpoint.Role.LOCAL_HUB);
        this.pubSemaphore = new Semaphore(0);
        this.subSemaphore = new Semaphore(0);
    }

    protected void tearDown() throws Exception {
        this.endpoint.close();
        ThreadCleanCheck.after();
    }

    public void testLostSub() {
        assertTrue("must have at least 2 symbols", SYMBOLS.size() >= 2);
        this.endpoint.getPublisher().getSubscription(Quote.class).addChangeListener(this);
        DXFeedSubscription createSubscription = this.endpoint.getFeed().createSubscription(Quote.class);
        createSubscription.addEventListener(this);
        println("subscribe " + SYMBOLS);
        createSubscription.addSymbols(SYMBOLS);
        acquire(this.subSemaphore, 1);
        println("unsubscribe " + SYMBOLS);
        createSubscription.removeSymbols(SYMBOLS);
        this.pubSemaphore.release(1);
        acquire(this.subSemaphore, 1);
        println("subscribe " + SYMBOLS);
        createSubscription.addSymbols(SYMBOLS);
        this.pubSemaphore.release(SYMBOLS.size() - 1);
        acquire(this.subSemaphore, (SYMBOLS.size() + 1) - 2);
        int i = 0;
        while (i < SYMBOLS.size()) {
            check(SYMBOLS.get(i), i == 0 ? Double.NaN : BID_PRICE);
            i++;
        }
        this.pubSemaphore.release(SYMBOLS.size());
        acquire(this.subSemaphore, SYMBOLS.size() + 1);
        Iterator<String> it = SYMBOLS.iterator();
        while (it.hasNext()) {
            check(it.next(), BID_PRICE);
        }
        this.endpoint.close();
        assertTrue("should be closed", this.closed);
    }

    public void symbolsAdded(Set<?> set) {
        println("added " + set);
        assertEquals(set, new HashSet(SYMBOLS));
        for (String str : SYMBOLS) {
            this.subSemaphore.release(1);
            acquire(this.pubSemaphore, 1);
            Quote quote = new Quote(str);
            quote.setBidPrice(BID_PRICE);
            println("publish " + quote);
            this.endpoint.getPublisher().publishEvents(Collections.singleton(quote));
        }
        this.subSemaphore.release(1);
    }

    public void symbolsRemoved(Set<?> set) {
        println("removed " + set);
        fail();
    }

    public void subscriptionClosed() {
        println("subscription closed");
        this.closed = true;
    }

    public void eventsReceived(List<Quote> list) {
        println("received " + list);
    }

    private void check(String str, double d) {
        Quote lastEvent = this.endpoint.getFeed().getLastEvent(new Quote(str));
        println("get last = " + lastEvent);
        assertTrue(Double.compare(lastEvent.getBidPrice(), d) == 0);
    }

    private void acquire(Semaphore semaphore, int i) {
        try {
            if (!semaphore.tryAcquire(i, 10L, TimeUnit.SECONDS)) {
                fail("cannot acquire semaphore");
            }
        } catch (InterruptedException e) {
            fail("unexpected thread interruption");
        }
    }

    private void println(String str) {
        String name = Thread.currentThread().getName();
        if (name.matches(".*ExecutorThread..")) {
            name = "ET" + name.substring(name.length() - 2);
        }
        System.out.println(TimeFormat.DEFAULT.withMillis().format(System.currentTimeMillis()) + " [" + name + "] " + str);
    }
}
