/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.api.test;

import com.devexperts.logging.Logging;
import com.devexperts.test.ThreadCleanCheck;
import com.devexperts.test.TraceRunnerWithParametersFactory;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXFeedTimeSeriesSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.api.osub.ObservableSubscriptionChangeListener;
import com.dxfeed.api.osub.TimeSeriesSubscriptionSymbol;
import com.dxfeed.event.EventType;
import com.dxfeed.event.LastingEvent;
import com.dxfeed.event.TimeSeriesEvent;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Parameterized.UseParametersRunnerFactory(value=TraceRunnerWithParametersFactory.class)
public abstract class AbstractDXPublisherTest {
    private static final Logging log = Logging.getLogging(AbstractDXPublisherTest.class);
    protected final DXEndpoint.Role role;
    protected ExecutorService executorService;
    protected Executor executor;
    protected AtomicInteger executionCount;
    protected DXEndpoint endpoint;
    protected DXFeed feed;
    protected DXPublisher publisher;

    protected AbstractDXPublisherTest(DXEndpoint.Role role) {
        this.role = role;
    }

    @Parameterized.Parameters(name="{0}")
    public static Iterable<Object[]> parameters() {
        return Arrays.asList({DXEndpoint.Role.LOCAL_HUB}, {DXEndpoint.Role.STREAM_FEED});
    }

    public void setUp(String description) {
        ThreadCleanCheck.before((String)description);
        this.executorService = Executors.newSingleThreadExecutor();
        this.executor = command -> {
            this.executionCount.incrementAndGet();
            this.executorService.execute(command);
        };
        this.executionCount = new AtomicInteger();
        this.endpoint = this.endpointBuilder().build().executor(this.executor);
        this.feed = this.endpoint.getFeed();
        this.publisher = this.endpoint.getPublisher();
    }

    public void tearDown() throws InterruptedException {
        this.endpoint.close();
        this.executorService.shutdown();
        this.executorService.awaitTermination(1L, TimeUnit.SECONDS);
        ThreadCleanCheck.after();
    }

    protected DXEndpoint.Builder endpointBuilder() {
        return DXEndpoint.newBuilder().withRole(this.role).withProperty("dxfeed.wildcard.enable", "true").withProperty("dxendpoint.eventTime", "true");
    }

    protected <T extends TimeSeriesEvent<?>> void testTimeSeriesEventPublishing(Class<T> clazz, Object symbol, EventCreator<T> eventCreator, EventChecker<T> eventChecker) throws InterruptedException {
        this.setUp("testTimeSeriesEventPublishing for " + clazz.getSimpleName() + ", symbol=" + symbol);
        DXFeedTimeSeriesSubscription sub = this.feed.createTimeSeriesSubscription(clazz);
        sub.setFromTime(0L);
        this.testEventPublishing(clazz, symbol, eventCreator, eventChecker, (DXFeedSubscription<T>)sub);
        this.tearDown();
    }

    protected <T extends TimeSeriesEvent<?>> void testTimeSeriesEventPublishing(Class<T> clazz, Object symbol, EventCreator<T> eventCreator) throws InterruptedException {
        this.setUp("testTimeSeriesEventPublishing for " + clazz.getSimpleName() + ", symbol=" + symbol);
        DXFeedTimeSeriesSubscription sub = this.feed.createTimeSeriesSubscription(clazz);
        sub.setFromTime(0L);
        this.testEventPublishing(clazz, symbol, eventCreator, new DefaultEventChecker(), (DXFeedSubscription<T>)sub);
        this.tearDown();
    }

    protected <T extends EventType<?>> void testEventPublishing(Class<T> clazz, Object symbol, EventCreator<T> eventCreator, EventChecker<T> eventChecker) throws InterruptedException {
        this.setUp("testEventPublishing for " + clazz.getSimpleName() + ", symbol=" + symbol);
        this.testEventPublishing(clazz, symbol, eventCreator, eventChecker, this.feed.createSubscription(clazz));
        this.tearDown();
    }

    protected <T extends EventType<?>> void testEventPublishing(Class<T> clazz, Object symbol, EventCreator<T> eventCreator) throws InterruptedException {
        this.setUp("testEventPublishing for " + clazz.getSimpleName() + ", symbol=" + symbol);
        this.testEventPublishing(clazz, symbol, eventCreator, new DefaultEventChecker(), this.feed.createSubscription(clazz));
        this.tearDown();
    }

    private <T extends EventType<?>> void testEventPublishing(Class<T> clazz, Object symbol, EventCreator<T> eventCreator, EventChecker<T> eventChecker, DXFeedSubscription<T> sub) {
        final ArrayBlockingQueue subAddQueue = new ArrayBlockingQueue(100);
        final ArrayBlockingQueue subRemoveQueue = new ArrayBlockingQueue(100);
        ArrayBlockingQueue queue = new ArrayBlockingQueue(1);
        sub.addEventListener(events -> {
            log.trace("eventsReceived " + events);
            Assert.assertEquals((long)1L, (long)events.size());
            queue.addAll(events);
        });
        ObservableSubscriptionChangeListener observableSubChangeListener = new ObservableSubscriptionChangeListener(){

            public void symbolsAdded(Set<?> symbols) {
                log.trace("symbolsAdded " + symbols);
                Assert.assertTrue((!symbols.isEmpty() ? 1 : 0) != 0);
                for (Object symbol : symbols) {
                    subAddQueue.add(this.getEventSymbol(symbol));
                }
            }

            public void symbolsRemoved(Set<?> symbols) {
                log.trace("symbolsRemoved " + symbols);
                Assert.assertTrue((!symbols.isEmpty() ? 1 : 0) != 0);
                for (Object symbol : symbols) {
                    subRemoveQueue.add(this.getEventSymbol(symbol));
                }
            }

            private Object getEventSymbol(Object symbol) {
                if (symbol instanceof TimeSeriesSubscriptionSymbol) {
                    return ((TimeSeriesSubscriptionSymbol)symbol).getEventSymbol();
                }
                return symbol;
            }
        };
        this.publisher.getSubscription(clazz).addChangeListener(observableSubChangeListener);
        log.trace("Adding symbol " + symbol);
        sub.addSymbols(symbol);
        this.checkpoint();
        Assert.assertEquals((Object)symbol, subAddQueue.poll());
        Assert.assertEquals((long)0L, (long)subAddQueue.size());
        Assert.assertEquals((long)0L, (long)subRemoveQueue.size());
        for (int i = 0; i < 100; ++i) {
            T pubEvent = eventCreator.createEvent(i);
            log.trace("Publishing " + pubEvent);
            this.publisher.publishEvents(Collections.singletonList(pubEvent));
            this.checkpoint();
            EventType subEvent = (EventType)queue.poll();
            Assert.assertTrue((subEvent != pubEvent ? 1 : 0) != 0);
            eventChecker.check((EventType)pubEvent, subEvent);
        }
        sub.close();
        this.checkpoint();
        Assert.assertEquals((Object)symbol, subRemoveQueue.poll());
        Assert.assertEquals((long)0L, (long)subAddQueue.size());
        Assert.assertEquals((long)0L, (long)subRemoveQueue.size());
        this.publisher.getSubscription(clazz).removeChangeListener(observableSubChangeListener);
        this.checkpoint();
    }

    protected <S, T extends EventType<S> & LastingEvent<S>, L extends T, P extends T> void testGetLastEvent(Class<T> clazz, P pubEvent, L lastEvent, EventChecker<T> eventChecker) throws InterruptedException {
        if (this.role != DXEndpoint.Role.LOCAL_HUB) {
            return;
        }
        this.setUp("testGetLastEvent for " + clazz.getSimpleName() + ", pubEvent=" + pubEvent);
        DXFeedSubscription sub = this.feed.createSubscription(clazz);
        sub.addSymbols(pubEvent.getEventSymbol());
        this.publisher.publishEvents(Collections.singletonList(pubEvent));
        this.checkpoint();
        lastEvent.setEventSymbol(pubEvent.getEventSymbol());
        lastEvent = this.feed.getLastEvent((LastingEvent)lastEvent);
        Assert.assertTrue((lastEvent != pubEvent ? 1 : 0) != 0);
        eventChecker.check(pubEvent, lastEvent);
        this.tearDown();
    }

    protected void checkpoint() {
        try {
            int i = 1;
            while (true) {
                int loop = i++;
                int count = this.executionCount.get();
                this.executorService.submit(() -> log.trace("Executing checkpoint #" + loop + " for " + count)).get();
                if (this.executionCount.get() == count) break;
            }
            log.trace("Checkpoint done");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.toString());
        }
    }

    protected static interface EventChecker<T extends EventType<?>> {
        public void check(T var1, T var2);
    }

    protected static interface EventCreator<T extends EventType<?>> {
        public T createEvent(int var1);
    }

    private static class DefaultEventChecker<T extends EventType<?>>
    implements EventChecker<T> {
        private DefaultEventChecker() {
        }

        @Override
        public void check(T publishedEvent, T receivedEvent) {
            Method[] methods;
            Assert.assertNotNull(publishedEvent);
            Assert.assertNotNull(receivedEvent);
            for (Method method : methods = publishedEvent.getClass().getMethods()) {
                if (method.getName().equals("getClass") || !method.getName().matches("(get|is).*") || method.getParameterTypes().length != 0) continue;
                try {
                    Object expectedRes = method.invoke(publishedEvent, new Object[0]);
                    Object actualRes = method.invoke(receivedEvent, new Object[0]);
                    Assert.assertEquals((String)method.getName(), (Object)expectedRes, (Object)actualRes);
                }
                catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }
}

