/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.api.test;

import com.devexperts.qd.QDContract;
import com.devexperts.qd.QDDistributor;
import com.devexperts.qd.QDFactory;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.ng.RecordSink;
import com.devexperts.qd.qtp.QDEndpoint;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.impl.DXEndpointImpl;
import com.dxfeed.api.osub.ObservableSubscription;
import com.dxfeed.api.osub.ObservableSubscriptionChangeListener;
import com.dxfeed.api.osub.TimeSeriesSubscriptionSymbol;
import com.dxfeed.event.candle.Candle;
import com.dxfeed.event.candle.CandlePeriod;
import com.dxfeed.event.candle.CandleSymbol;
import com.dxfeed.event.candle.CandleSymbolAttribute;
import com.dxfeed.event.candle.CandleType;
import com.dxfeed.promise.Promise;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;

/**
 * Checks that no history (time-series) subscription is left behind on a local-hub endpoint
 * once every promise and subscription that was created against it has been completed or closed.
 */
public class HistorySubscriptionTest {
    /**
     * Runs {@link #checkEmptySub} for all 16 combinations of its four boolean flags and for
     * symbol counts 1, 2, 4, ..., 64, then fails if any combination reported leftovers.
     */
    @Test
    public void testEmptySubscription() {
        // The return value is deliberately unused; presumably this forces the default scheme to be
        // initialized before the per-case output starts -- TODO confirm.
        QDFactory.getDefaultScheme();
        System.out.println();
        int tests = 0;
        int failures = 0;
        for (int mask = 0; mask < 16; mask++) {
            boolean storeEverything = (mask & 8) != 0;
            boolean promise = (mask & 4) != 0;
            boolean subscribe = (mask & 2) != 0;
            boolean publish = (mask & 1) != 0;
            for (int size = 1; size < 100; size *= 2) {
                tests++;
                if (!checkEmptySub(storeEverything, promise, subscribe, publish, size)) {
                    failures++;
                }
            }
        }
        System.out.println((tests - failures) + " tests passed, " + failures + " tests failed");
        Assert.assertEquals("tests failed: " + failures, 0L, failures);
    }

    /**
     * Builds the one-minute candle symbol {@code "S" + i} used by the test.
     */
    private static CandleSymbol symbol(int i) {
        return CandleSymbol.valueOf("S" + i, CandlePeriod.valueOf(1.0, CandleType.MINUTE));
    }

    /**
     * Runs a single scenario against a fresh {@code LOCAL_HUB} endpoint.
     *
     * @param storeEverything value passed as the {@code dxendpoint.storeEverything} endpoint property
     * @param promise whether time-series promises are requested (only done when {@code publish} is also set)
     * @param subscribe whether the subscription is given {@code size} time-series symbols before attaching
     * @param publish whether one candle per symbol is published
     * @param size number of distinct symbols used in the scenario
     * @return {@code true} if {@link #assertEmptySub} found no leftover subscription
     */
    private boolean checkEmptySub(boolean storeEverything, boolean promise, boolean subscribe, boolean publish,
        int size)
    {
        System.out.println("testing storeEverything " + storeEverything + ", promise " + promise + ", subscribe " + subscribe + ", publish " + publish + ", size " + size);
        DXEndpoint endpoint = DXEndpoint.newBuilder()
            .withRole(DXEndpoint.Role.LOCAL_HUB)
            .withProperty("dxendpoint.storeEverything", String.valueOf(storeEverything))
            .build();
        boolean result;
        // The endpoint is closed in finally so that a failing assertion below does not leak it
        // into the remaining scenarios.
        try {
            // Run endpoint tasks inline on the calling thread; the assertions right after
            // publishing rely on the listeners having already been invoked.
            endpoint.executor(Runnable::run);

            List<Promise<List<Candle>>> promises = new ArrayList<>();
            if (promise && publish) {
                for (int i = 0; i < size; i++) {
                    promises.add(endpoint.getFeed().getTimeSeriesPromise(Candle.class, symbol(i), 0L, 1000L));
                }
            }

            List<Candle> events = new ArrayList<>();
            DXFeedSubscription<Candle> sub = new DXFeedSubscription<>(Candle.class);
            sub.addEventListener(events::addAll);
            if (subscribe) {
                List<TimeSeriesSubscriptionSymbol<?>> symbols = new ArrayList<>();
                for (int i = 0; i < size; i++) {
                    symbols.add(new TimeSeriesSubscriptionSymbol<>(symbol(i), 0L));
                }
                sub.setSymbols(symbols);
            }
            sub.attach(endpoint.getFeed());
            Assert.assertEquals("garbage events received", 0L, events.size());

            if (publish) {
                List<Candle> candles = new ArrayList<>();
                for (int i = 0; i < size; i++) {
                    candles.add(new Candle(symbol(i)));
                }
                endpoint.getPublisher().publishEvents(candles);
            }
            // Events are expected only when the symbols were both subscribed to and published.
            int expectedEvents = subscribe && publish ? size : 0;
            Assert.assertEquals("events received", expectedEvents, events.size());

            sub.setSymbols(new Object[0]);
            sub.close();
            // Clearing and closing the subscription must not deliver anything extra.
            Assert.assertEquals("events received", expectedEvents, events.size());
            for (Promise<List<Candle>> p : promises) {
                Assert.assertEquals("promise completed", 1L, p.getResult().size());
            }

            result = assertEmptySub(endpoint);
        } finally {
            endpoint.close();
        }
        System.out.println("test " + (result ? "passed" : "FAILED"));
        System.out.println();
        return result;
    }

    /**
     * Inspects the endpoint at three levels -- the QD history collector's subscription, the records
     * offered to a freshly built distributor, and the publisher-side observable subscription for
     * {@link Candle} -- and prints a message for every leftover it finds.
     *
     * @param endpoint endpoint to inspect; must be a {@link DXEndpointImpl}
     * @return {@code true} if nothing was found at any level
     */
    private boolean assertEmptySub(DXEndpoint endpoint) {
        final List<String> messages = new ArrayList<>();
        QDEndpoint qdEndpoint = ((DXEndpointImpl) endpoint).getQDEndpoint();
        RecordBuffer buffer = RecordBuffer.getInstance(RecordMode.HISTORY_SUBSCRIPTION);

        // The explicit RecordSink casts are kept from the original call sites.
        qdEndpoint.getCollector(QDContract.HISTORY).examineSubscription((RecordSink) buffer);
        if (!buffer.isEmpty()) {
            messages.add("QDHistory subscription [" + buffer.size() + "] " + buffer.next());
        }
        buffer.clear();

        QDDistributor distributor = qdEndpoint.getCollector(QDContract.HISTORY).distributorBuilder().build();
        // Close the distributor even if draining it throws.
        try {
            // Drain everything the distributor reports as added subscription.
            while (distributor.getAddedRecordProvider().retrieve((RecordSink) buffer)) {
                // keep retrieving until the provider reports nothing more
            }
            if (!buffer.isEmpty()) {
                messages.add("QDDistributor added [" + buffer.size() + "] " + buffer.next());
            }
            buffer.clear();
        } finally {
            distributor.close();
        }

        ObservableSubscription<?> subscription = endpoint.getPublisher().getSubscription(Candle.class);
        ObservableSubscriptionChangeListener listener = new ObservableSubscriptionChangeListener() {
            @Override
            public void symbolsAdded(Set<?> symbols) {
                messages.add("dxFeed added " + symbols);
            }

            @Override
            public void symbolsRemoved(Set<?> symbols) {
                messages.add("dxFeed removed " + symbols);
            }
        };
        // Any callback fired between adding and removing the listener is recorded as a leftover.
        subscription.addChangeListener(listener);
        subscription.removeChangeListener(listener);

        messages.forEach(System.out::println);
        return messages.isEmpty();
    }
}

