package com.dxfeed.api.test;

import com.devexperts.test.ThreadCleanCheck;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.api.osub.ObservableSubscriptionChangeListener;
import com.dxfeed.event.candle.Candle;
import com.dxfeed.event.candle.CandlePeriod;
import com.dxfeed.event.candle.CandleSymbol;
import com.dxfeed.event.candle.CandleType;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;
import junit.framework.TestCase;

/* loaded from: input_file:com/dxfeed/api/test/CandleAddRemoveSubTest.class */
public class CandleAddRemoveSubTest extends TestCase {
    private DXEndpoint endpoint;
    private DXFeed feed;
    private DXPublisher publisher;
    private DXFeedSubscription<Candle> sub;
    private long time;
    private CandleSymbol symbol1 = CandleSymbol.valueOf("X", CandlePeriod.valueOf(5.0d, CandleType.MINUTE));
    private CandleSymbol symbol2 = CandleSymbol.valueOf("Y", CandlePeriod.valueOf(5.0d, CandleType.MINUTE));
    private CandleSymbol symbol3 = CandleSymbol.valueOf("Z", CandlePeriod.valueOf(5.0d, CandleType.MINUTE));
    private CandleSymbol[] symbols = {this.symbol1, this.symbol2, this.symbol3};
    private final BlockingQueue<Object> added = new LinkedBlockingQueue();
    private final BlockingQueue<Object> removed = new LinkedBlockingQueue();
    private final BlockingQueue<Object> received = new LinkedBlockingQueue();

    protected void setUp() throws Exception {
        ThreadCleanCheck.before();
        this.endpoint = DXEndpoint.create(DXEndpoint.Role.LOCAL_HUB);
        this.endpoint.executor((v0) -> {
            v0.run();
        });
        this.feed = this.endpoint.getFeed();
        this.publisher = this.endpoint.getPublisher();
        this.sub = this.feed.createSubscription(Candle.class);
        this.publisher.getSubscription(Candle.class).addChangeListener(new ObservableSubscriptionChangeListener() { // from class: com.dxfeed.api.test.CandleAddRemoveSubTest.1
            public void symbolsAdded(Set<?> set) {
                CandleAddRemoveSubTest.this.added.addAll(set);
            }

            public void symbolsRemoved(Set<?> set) {
                CandleAddRemoveSubTest.this.removed.addAll(set);
            }
        });
        this.sub.addEventListener(list -> {
            Stream map = list.stream().map((v0) -> {
                return v0.getEventSymbol();
            });
            BlockingQueue<Object> blockingQueue = this.received;
            blockingQueue.getClass();
            map.forEach((v1) -> {
                r1.add(v1);
            });
        });
    }

    protected void tearDown() throws Exception {
        this.endpoint.closeAndAwaitTermination();
        checkQueues(null, null, null);
        ThreadCleanCheck.after();
    }

    public void testCandleAddRemoveSub() throws InterruptedException {
        this.sub.addSymbols(this.symbol1);
        checkQueues(this.symbol1, null, null);
        publish();
        checkQueues(null, null, this.symbol1);
        this.sub.removeSymbols(new Object[]{this.symbol1});
        checkQueues(null, this.symbol1, null);
        publish();
        checkQueues(null, null, null);
    }

    public void testCandleAddClearSub() throws InterruptedException {
        this.sub.addSymbols(this.symbol1);
        checkQueues(this.symbol1, null, null);
        publish();
        checkQueues(null, null, this.symbol1);
        this.sub.clear();
        checkQueues(null, this.symbol1, null);
        publish();
        checkQueues(null, null, null);
    }

    public void testReAttach() throws InterruptedException {
        this.feed.detachSubscription(this.sub);
        this.feed.attachSubscription(this.sub);
        testCandleAddDetachSub();
    }

    public void testIdempotentAttach() throws InterruptedException {
        this.feed.attachSubscription(this.sub);
        this.feed.attachSubscription(this.sub);
        testCandleAddDetachSub();
    }

    public void testIdempotentDetach() throws InterruptedException {
        this.feed.detachSubscription(this.sub);
        this.feed.detachSubscription(this.sub);
        this.feed.attachSubscription(this.sub);
        testCandleAddDetachSub();
    }

    public void testIdempotentReAttach() throws InterruptedException {
        this.feed.detachSubscription(this.sub);
        this.feed.detachSubscription(this.sub);
        this.feed.attachSubscription(this.sub);
        this.feed.attachSubscription(this.sub);
        testCandleAddDetachSub();
    }

    public void testIdempotentAttachAfterFirstChangeListener() throws InterruptedException {
        this.feed.detachSubscription(this.sub);
        this.sub.addChangeListener(emptyChangeListener());
        this.feed.attachSubscription(this.sub);
        this.feed.attachSubscription(this.sub);
        testCandleAddDetachSub();
    }

    public void testIdempotentAttachAfterSecondChangeListener() throws InterruptedException {
        this.sub.addChangeListener(emptyChangeListener());
        this.feed.attachSubscription(this.sub);
        this.feed.attachSubscription(this.sub);
        testCandleAddDetachSub();
    }

    public void testIdempotentReAttachWithManyChangeListener() throws InterruptedException {
        this.sub.addChangeListener(emptyChangeListener());
        this.feed.detachSubscription(this.sub);
        this.feed.detachSubscription(this.sub);
        this.sub.addChangeListener(emptyChangeListener());
        this.feed.attachSubscription(this.sub);
        this.feed.attachSubscription(this.sub);
        this.sub.addChangeListener(emptyChangeListener());
        testCandleAddDetachSub();
    }

    public void testDetachAfterSecondChangeListener() throws InterruptedException {
        this.sub.addChangeListener(emptyChangeListener());
        testCandleAddDetachSub();
    }

    public void testDualAttach() throws InterruptedException {
        DXEndpoint create = DXEndpoint.create(DXEndpoint.Role.LOCAL_HUB);
        create.executor((v0) -> {
            v0.run();
        });
        create.getFeed().attachSubscription(this.sub);
        testCandleAddDetachSub();
        create.closeAndAwaitTermination();
    }

    public void testDualAttachAndDetach() throws InterruptedException {
        DXEndpoint create = DXEndpoint.create(DXEndpoint.Role.LOCAL_HUB);
        create.executor((v0) -> {
            v0.run();
        });
        create.getFeed().attachSubscription(this.sub);
        create.getFeed().detachSubscription(this.sub);
        testCandleAddDetachSub();
        create.closeAndAwaitTermination();
    }

    public void testDualAttachAndReAttach() throws InterruptedException {
        DXEndpoint create = DXEndpoint.create(DXEndpoint.Role.LOCAL_HUB);
        create.executor((v0) -> {
            v0.run();
        });
        create.getFeed().detachSubscription(this.sub);
        this.feed.detachSubscription(this.sub);
        create.getFeed().attachSubscription(this.sub);
        this.sub.addChangeListener(emptyChangeListener());
        create.getFeed().detachSubscription(this.sub);
        this.feed.attachSubscription(this.sub);
        testCandleAddDetachSub();
        create.closeAndAwaitTermination();
    }

    public void testCandleAddDetachSub() throws InterruptedException {
        this.sub.addSymbols(this.symbol1);
        checkQueues(this.symbol1, null, null);
        publish();
        checkQueues(null, null, this.symbol1);
        this.feed.detachSubscription(this.sub);
        checkQueues(null, this.symbol1, null);
        publish();
        checkQueues(null, null, null);
    }

    public void testCandleSetSub() throws InterruptedException {
        this.sub.setSymbols(new Object[]{this.symbol1});
        checkQueues(this.symbol1, null, null);
        publish();
        checkQueues(null, null, this.symbol1);
        this.sub.setSymbols(new Object[]{this.symbol2});
        checkQueues(this.symbol2, this.symbol1, null);
        publish();
        checkQueues(null, null, this.symbol2);
        this.sub.clear();
        checkQueues(null, this.symbol2, null);
    }

    private void publish() {
        ArrayList arrayList = new ArrayList();
        for (CandleSymbol candleSymbol : this.symbols) {
            Candle candle = new Candle(candleSymbol);
            long j = this.time + 1000;
            this.time = j;
            candle.setTime(j);
            arrayList.add(candle);
        }
        this.publisher.publishEvents(arrayList);
    }

    private void checkQueues(CandleSymbol candleSymbol, CandleSymbol candleSymbol2, CandleSymbol candleSymbol3) {
        assertEquals(candleSymbol, this.added.poll());
        assertEquals(candleSymbol2, this.removed.poll());
        assertEquals(candleSymbol3, this.received.poll());
        assertTrue(this.added.isEmpty());
        assertTrue(this.removed.isEmpty());
        assertTrue(this.received.isEmpty());
    }

    private ObservableSubscriptionChangeListener emptyChangeListener() {
        return set -> {
        };
    }
}
