/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.api.test;

import com.devexperts.test.ThreadCleanCheck;
import com.devexperts.util.TimeFormat;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeedEventListener;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.osub.ObservableSubscriptionChangeListener;
import com.dxfeed.event.LastingEvent;
import com.dxfeed.event.market.Quote;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class LostSubscriptionTest
implements ObservableSubscriptionChangeListener,
DXFeedEventListener<Quote> {
    private static final List<String> SYMBOLS = Arrays.asList("AAPL", "GOOG", "IBM");
    private static final double BID_PRICE = 100.5;
    private DXEndpoint endpoint;
    private boolean closed;
    private Semaphore pubSemaphore;
    private Semaphore subSemaphore;

    @Before
    public void setUp() throws Exception {
        ThreadCleanCheck.before();
        this.endpoint = DXEndpoint.create((DXEndpoint.Role)DXEndpoint.Role.LOCAL_HUB);
        this.pubSemaphore = new Semaphore(0);
        this.subSemaphore = new Semaphore(0);
    }

    @After
    public void tearDown() throws Exception {
        this.endpoint.close();
        ThreadCleanCheck.after();
    }

    @Test
    public void testLostSub() {
        Assert.assertTrue((String)"must have at least 2 symbols", (SYMBOLS.size() >= 2 ? 1 : 0) != 0);
        this.endpoint.getPublisher().getSubscription(Quote.class).addChangeListener((ObservableSubscriptionChangeListener)this);
        DXFeedSubscription s = this.endpoint.getFeed().createSubscription(Quote.class);
        s.addEventListener((DXFeedEventListener)this);
        this.println("subscribe " + SYMBOLS);
        s.addSymbols(SYMBOLS);
        this.acquire(this.subSemaphore, 1);
        this.println("unsubscribe " + SYMBOLS);
        s.removeSymbols(SYMBOLS);
        this.pubSemaphore.release(1);
        this.acquire(this.subSemaphore, 1);
        this.println("subscribe " + SYMBOLS);
        s.addSymbols(SYMBOLS);
        this.pubSemaphore.release(SYMBOLS.size() - 1);
        this.acquire(this.subSemaphore, SYMBOLS.size() + 1 - 2);
        for (int i = 0; i < SYMBOLS.size(); ++i) {
            this.check(SYMBOLS.get(i), i == 0 ? Double.NaN : 100.5);
        }
        this.pubSemaphore.release(SYMBOLS.size());
        this.acquire(this.subSemaphore, SYMBOLS.size() + 1);
        for (String symbol : SYMBOLS) {
            this.check(symbol, 100.5);
        }
        this.endpoint.close();
        Assert.assertTrue((String)"should be closed", (boolean)this.closed);
    }

    public void symbolsAdded(Set<?> symbols) {
        this.println("added " + symbols);
        Assert.assertEquals(symbols, new HashSet<String>(SYMBOLS));
        for (String symbol : SYMBOLS) {
            this.subSemaphore.release(1);
            this.acquire(this.pubSemaphore, 1);
            Quote q = new Quote(symbol);
            q.setBidPrice(100.5);
            this.println("publish " + q);
            this.endpoint.getPublisher().publishEvents(Collections.singleton(q));
        }
        this.subSemaphore.release(1);
    }

    public void symbolsRemoved(Set<?> symbols) {
        this.println("removed " + symbols);
        Assert.fail();
    }

    public void subscriptionClosed() {
        this.println("subscription closed");
        this.closed = true;
    }

    public void eventsReceived(List<Quote> events) {
        this.println("received " + events);
    }

    private void check(String symbol, double expectedBidPrice) {
        Quote q = (Quote)this.endpoint.getFeed().getLastEvent((LastingEvent)new Quote(symbol));
        this.println("get last = " + q);
        Assert.assertEquals((long)0L, (long)Double.compare(q.getBidPrice(), expectedBidPrice));
    }

    private void acquire(Semaphore semaphore, int permits) {
        try {
            if (!semaphore.tryAcquire(permits, 10L, TimeUnit.SECONDS)) {
                Assert.fail((String)"cannot acquire semaphore");
            }
        }
        catch (InterruptedException e) {
            Assert.fail((String)"unexpected thread interruption");
        }
    }

    private void println(String message) {
        String threadName = Thread.currentThread().getName();
        if (threadName.matches(".*ExecutorThread..")) {
            threadName = "ET" + threadName.substring(threadName.length() - 2);
        }
        System.out.println(TimeFormat.DEFAULT.withMillis().format(System.currentTimeMillis()) + " [" + threadName + "] " + message);
    }
}

