/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.api.test;

import com.devexperts.test.ThreadCleanCheck;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.api.osub.ObservableSubscriptionChangeListener;
import com.dxfeed.event.candle.Candle;
import com.dxfeed.event.candle.CandlePeriod;
import com.dxfeed.event.candle.CandleSymbol;
import com.dxfeed.event.candle.CandleSymbolAttribute;
import com.dxfeed.event.candle.CandleType;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests how adding, removing, setting and clearing candle symbols on a
 * {@link DXFeedSubscription}, and attaching/detaching that subscription to feeds,
 * is reflected in the publisher-side observable subscription and in the events
 * delivered to the subscription's listener.
 *
 * <p>Every scenario records three kinds of observations in queues: symbols reported
 * as added to the publisher subscription, symbols reported as removed from it, and
 * event symbols of candles received by the feed subscription. {@link #checkQueues}
 * consumes at most one entry from each queue and requires the queues to be empty
 * afterwards.
 */
public class CandleAddRemoveSubTest {
    private DXEndpoint endpoint;
    private DXFeed feed;
    private DXPublisher publisher;
    private DXFeedSubscription<Candle> sub;

    private final CandleSymbol symbol1 = CandleSymbol.valueOf("X", CandlePeriod.valueOf(5.0, CandleType.MINUTE));
    private final CandleSymbol symbol2 = CandleSymbol.valueOf("Y", CandlePeriod.valueOf(5.0, CandleType.MINUTE));
    private final CandleSymbol symbol3 = CandleSymbol.valueOf("Z", CandlePeriod.valueOf(5.0, CandleType.MINUTE));

    /** All symbols for which {@link #publish()} emits a candle, subscribed or not. */
    private final CandleSymbol[] symbols = {symbol1, symbol2, symbol3};

    /** Time stamp of the most recently published candle; advanced by 1000 per candle. */
    private long time;

    private final BlockingQueue<Object> added = new LinkedBlockingQueue<>();
    private final BlockingQueue<Object> removed = new LinkedBlockingQueue<>();
    private final BlockingQueue<Object> received = new LinkedBlockingQueue<>();

    /**
     * Creates the endpoint under test, a candle subscription on its feed, and wires
     * the recording listeners on both the publisher and the subscription side.
     */
    @Before
    public void setUp() throws Exception {
        ThreadCleanCheck.before();
        endpoint = createLocalHub();
        feed = endpoint.getFeed();
        publisher = endpoint.getPublisher();
        sub = feed.createSubscription(Candle.class);
        publisher.getSubscription(Candle.class).addChangeListener(new ObservableSubscriptionChangeListener() {
            @Override
            public void symbolsAdded(Set<?> symbols) {
                added.addAll(symbols);
            }

            @Override
            public void symbolsRemoved(Set<?> symbols) {
                removed.addAll(symbols);
            }
        });
        sub.addEventListener(candles -> candles.stream().map(Candle::getEventSymbol).forEach(received::add));
    }

    /**
     * Closes the endpoint, verifies that closing produced no further observations,
     * and runs the thread-leak check.
     */
    @After
    public void tearDown() throws Exception {
        endpoint.closeAndAwaitTermination();
        checkQueues(null, null, null);
        ThreadCleanCheck.after();
    }

    @Test
    public void testCandleAddRemoveSub() throws InterruptedException {
        sub.addSymbols(symbol1);
        checkQueues(symbol1, null, null);
        publish();
        checkQueues(null, null, symbol1);
        sub.removeSymbols(symbol1);
        checkQueues(null, symbol1, null);
        // After removal nothing must be delivered any more.
        publish();
        checkQueues(null, null, null);
    }

    @Test
    public void testCandleAddClearSub() throws InterruptedException {
        sub.addSymbols(symbol1);
        checkQueues(symbol1, null, null);
        publish();
        checkQueues(null, null, symbol1);
        sub.clear();
        checkQueues(null, symbol1, null);
        publish();
        checkQueues(null, null, null);
    }

    @Test
    public void testReAttach() throws InterruptedException {
        feed.detachSubscription(sub);
        feed.attachSubscription(sub);
        testCandleAddDetachSub();
    }

    @Test
    public void testIdempotentAttach() throws InterruptedException {
        feed.attachSubscription(sub);
        feed.attachSubscription(sub);
        testCandleAddDetachSub();
    }

    @Test
    public void testIdempotentDetach() throws InterruptedException {
        feed.detachSubscription(sub);
        feed.detachSubscription(sub);
        feed.attachSubscription(sub);
        testCandleAddDetachSub();
    }

    @Test
    public void testIdempotentReAttach() throws InterruptedException {
        feed.detachSubscription(sub);
        feed.detachSubscription(sub);
        feed.attachSubscription(sub);
        feed.attachSubscription(sub);
        testCandleAddDetachSub();
    }

    @Test
    public void testIdempotentAttachAfterFirstChangeListener() throws InterruptedException {
        feed.detachSubscription(sub);
        sub.addChangeListener(emptyChangeListener());
        feed.attachSubscription(sub);
        feed.attachSubscription(sub);
        testCandleAddDetachSub();
    }

    @Test
    public void testIdempotentAttachAfterSecondChangeListener() throws InterruptedException {
        sub.addChangeListener(emptyChangeListener());
        feed.attachSubscription(sub);
        feed.attachSubscription(sub);
        testCandleAddDetachSub();
    }

    @Test
    public void testIdempotentReAttachWithManyChangeListener() throws InterruptedException {
        sub.addChangeListener(emptyChangeListener());
        feed.detachSubscription(sub);
        feed.detachSubscription(sub);
        sub.addChangeListener(emptyChangeListener());
        feed.attachSubscription(sub);
        feed.attachSubscription(sub);
        sub.addChangeListener(emptyChangeListener());
        testCandleAddDetachSub();
    }

    @Test
    public void testDetachAfterSecondChangeListener() throws InterruptedException {
        sub.addChangeListener(emptyChangeListener());
        testCandleAddDetachSub();
    }

    @Test
    public void testDualAttach() throws InterruptedException {
        DXEndpoint secondEndpoint = createLocalHub();
        try {
            secondEndpoint.getFeed().attachSubscription(sub);
            testCandleAddDetachSub();
        } finally {
            // Close even when an assertion fails so the extra endpoint is never leaked.
            secondEndpoint.closeAndAwaitTermination();
        }
    }

    @Test
    public void testDualAttachAndDetach() throws InterruptedException {
        DXEndpoint secondEndpoint = createLocalHub();
        try {
            secondEndpoint.getFeed().attachSubscription(sub);
            secondEndpoint.getFeed().detachSubscription(sub);
            testCandleAddDetachSub();
        } finally {
            // Close even when an assertion fails so the extra endpoint is never leaked.
            secondEndpoint.closeAndAwaitTermination();
        }
    }

    @Test
    public void testDualAttachAndReAttach() throws InterruptedException {
        DXEndpoint secondEndpoint = createLocalHub();
        try {
            secondEndpoint.getFeed().detachSubscription(sub);
            feed.detachSubscription(sub);
            secondEndpoint.getFeed().attachSubscription(sub);
            sub.addChangeListener(emptyChangeListener());
            secondEndpoint.getFeed().detachSubscription(sub);
            feed.attachSubscription(sub);
            testCandleAddDetachSub();
        } finally {
            // Close even when an assertion fails so the extra endpoint is never leaked.
            secondEndpoint.closeAndAwaitTermination();
        }
    }

    /**
     * Core scenario reused by the attach/detach tests above: subscribe, receive,
     * detach from the primary feed, and verify that nothing is received afterwards.
     */
    @Test
    public void testCandleAddDetachSub() throws InterruptedException {
        sub.addSymbols(symbol1);
        checkQueues(symbol1, null, null);
        publish();
        checkQueues(null, null, symbol1);
        feed.detachSubscription(sub);
        checkQueues(null, symbol1, null);
        publish();
        checkQueues(null, null, null);
    }

    @Test
    public void testCandleSetSub() throws InterruptedException {
        sub.setSymbols(symbol1);
        checkQueues(symbol1, null, null);
        publish();
        checkQueues(null, null, symbol1);
        // Replacing the symbol set must report both the new and the dropped symbol.
        sub.setSymbols(symbol2);
        checkQueues(symbol2, symbol1, null);
        publish();
        checkQueues(null, null, symbol2);
        sub.clear();
        checkQueues(null, symbol2, null);
    }

    /**
     * Creates a {@code LOCAL_HUB} endpoint whose executor runs every task directly
     * in the calling thread.
     * NOTE(review): presumably this is what makes listener notifications observable
     * immediately after each call in the tests - confirm against the endpoint docs.
     */
    private static DXEndpoint createLocalHub() {
        DXEndpoint hub = DXEndpoint.create(DXEndpoint.Role.LOCAL_HUB);
        hub.executor(Runnable::run);
        return hub;
    }

    /**
     * Publishes one candle for each of the three known symbols in a single batch,
     * giving every candle a fresh, strictly increasing time.
     */
    private void publish() {
        ArrayList<Candle> candles = new ArrayList<>();
        for (CandleSymbol symbol : symbols) {
            Candle candle = new Candle(symbol);
            time += 1000L;
            candle.setTime(time);
            candles.add(candle);
        }
        publisher.publishEvents(candles);
    }

    /**
     * Asserts the next entry of each observation queue and that nothing else is pending.
     *
     * @param addedSymbol    expected head of the "added" queue, or {@code null} if it must be empty
     * @param removedSymbol  expected head of the "removed" queue, or {@code null} if it must be empty
     * @param receivedSymbol expected head of the "received" queue, or {@code null} if it must be empty
     */
    private void checkQueues(CandleSymbol addedSymbol, CandleSymbol removedSymbol, CandleSymbol receivedSymbol) {
        // poll() yields null on an empty queue, which is how "nothing expected" is expressed.
        Assert.assertEquals("added symbol", addedSymbol, added.poll());
        Assert.assertEquals("removed symbol", removedSymbol, removed.poll());
        Assert.assertEquals("received symbol", receivedSymbol, received.poll());
        Assert.assertTrue("unexpected extra added symbols", added.isEmpty());
        Assert.assertTrue("unexpected extra removed symbols", removed.isEmpty());
        Assert.assertTrue("unexpected extra received symbols", received.isEmpty());
    }

    /** Returns a change listener that ignores added symbols. */
    private ObservableSubscriptionChangeListener emptyChangeListener() {
        return symbols -> {};
    }
}

