package com.dxfeed.api.test;

import com.devexperts.logging.Logging;
import com.devexperts.test.ThreadCleanCheck;
import com.devexperts.test.TraceRunnerWithParametersFactory;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.DXFeedTimeSeriesSubscription;
import com.dxfeed.api.DXPublisher;
import com.dxfeed.api.osub.ObservableSubscriptionChangeListener;
import com.dxfeed.api.osub.TimeSeriesSubscriptionSymbol;
import com.dxfeed.event.EventType;
import com.dxfeed.event.LastingEvent;
import com.dxfeed.event.TimeSeriesEvent;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Parameterized.UseParametersRunnerFactory(TraceRunnerWithParametersFactory.class)
@RunWith(Parameterized.class)
/* loaded from: input_file:com/dxfeed/api/test/AbstractDXPublisherTest.class */
public abstract class AbstractDXPublisherTest {
    // Logger bound to the concrete test class; used for trace output by the helpers below.
    protected final Logging log = Logging.getLogging(getClass());
    // Endpoint role this test instance runs with; supplied through the constructor.
    protected final DXEndpoint.Role role;
    // Single-thread executor created in setUp() and shut down in tearDown().
    protected ExecutorService executorService;
    // Wrapper installed on the endpoint: counts each task in executionCount, then forwards it to executorService.
    protected Executor executor;
    // Number of tasks that went through `executor`; checkpoint() compares snapshots of it to detect quiescence.
    protected AtomicInteger executionCount;
    // Endpoint built in setUp(), followed by the feed and publisher obtained from it.
    protected DXEndpoint endpoint;
    protected DXFeed feed;
    protected DXPublisher publisher;

    /* loaded from: input_file:com/dxfeed/api/test/AbstractDXPublisherTest$DefaultEventChecker.class */
    /**
     * Default {@link EventChecker} that compares two events property by property.
     *
     * <p>Every public zero-argument method of the first event's class whose name
     * matches {@code (get|is).*} (except {@code getClass}) is invoked reflectively on
     * both events, and the two results must be equal.
     */
    private static class DefaultEventChecker<T extends EventType<?>> implements EventChecker<T> {
        private DefaultEventChecker() {
        }

        /**
         * Asserts that both events are non-null and agree on every getter result.
         *
         * @param t  the reference event; its class decides which getters are compared
         * @param t2 the event whose getter results must equal those of {@code t}
         * @throws IllegalStateException if a getter cannot be invoked or throws
         */
        @Override
        public void check(T t, T t2) {
            Assert.assertNotNull(t);
            Assert.assertNotNull(t2);
            for (Method accessor : t.getClass().getMethods()) {
                if (!isComparedGetter(accessor)) {
                    continue;
                }
                try {
                    // The getter name doubles as the assertion message so a mismatch names the property.
                    String property = accessor.getName();
                    Object expected = accessor.invoke(t);
                    Object actual = accessor.invoke(t2);
                    Assert.assertEquals(property, expected, actual);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new IllegalStateException(e);
                }
            }
        }

        /** Tells whether the method is a no-arg {@code get*}/{@code is*} accessor other than {@code getClass}. */
        private static boolean isComparedGetter(Method candidate) {
            String name = candidate.getName();
            if (name.equals("getClass")) {
                return false;
            }
            if (candidate.getParameterCount() != 0) {
                return false;
            }
            return name.matches("(get|is).*");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/dxfeed/api/test/AbstractDXPublisherTest$EventChecker.class */
    /**
     * Strategy for verifying that an event observed on the feed side corresponds
     * to the event that was published.
     *
     * @param <T> the event type being compared
     */
    public interface EventChecker<T extends EventType<?>> {
        /**
         * Compares two events; implementations are expected to fail the test on a mismatch.
         *
         * @param t  the event as published (the helpers in this class pass the published event first)
         * @param t2 the event obtained back from the feed
         */
        void check(T t, T t2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/dxfeed/api/test/AbstractDXPublisherTest$EventCreator.class */
    /**
     * Factory for the events that a publishing test sends through the publisher.
     *
     * @param <T> the event type produced
     */
    public interface EventCreator<T extends EventType<?>> {
        /**
         * Creates the event for one publishing round.
         *
         * @param i zero-based round index supplied by the publishing loop
         * @return the event to publish for that round
         */
        T createEvent(int i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a test instance bound to one endpoint role.
     *
     * @param role the role later passed to the endpoint builder in {@link #endpointBuilder()}
     */
    public AbstractDXPublisherTest(DXEndpoint.Role role) {
        this.role = role;
    }

    /**
     * Supplies the constructor arguments for the parameterized runner: one run per
     * endpoint role, each named after the role itself.
     *
     * @return one single-element argument array per role under test
     */
    @Parameterized.Parameters(name = "{0}")
    public static Iterable<Object[]> parameters() {
        Object[] localHubRun = {DXEndpoint.Role.LOCAL_HUB};
        Object[] streamFeedRun = {DXEndpoint.Role.STREAM_FEED};
        return Arrays.asList(localHubRun, streamFeedRun);
    }

    /**
     * Prepares a fresh endpoint for one test scenario.
     *
     * <p>Registers the scenario with {@link ThreadCleanCheck}, creates a single-thread
     * executor wrapped so that every task handed to it is counted, and builds the
     * endpoint (via {@link #endpointBuilder()}) with that counting executor installed.
     *
     * @param str description of the scenario, passed to {@code ThreadCleanCheck.before}
     */
    public void setUp(String str) {
        ThreadCleanCheck.before(str);
        // The counter is created before the wrapper that increments it.
        this.executionCount = new AtomicInteger();
        this.executorService = Executors.newSingleThreadExecutor();
        this.executor = task -> {
            this.executionCount.incrementAndGet();
            this.executorService.execute(task);
        };
        this.endpoint = endpointBuilder().build().executor(this.executor);
        this.publisher = this.endpoint.getPublisher();
        this.feed = this.endpoint.getFeed();
    }

    /**
     * Releases everything created by {@link #setUp(String)}.
     *
     * <p>The order matters: the endpoint is closed first, then the executor is shut
     * down and given up to one second to finish, and only then is
     * {@code ThreadCleanCheck.after()} invoked.
     *
     * @throws InterruptedException if interrupted while waiting for the executor to terminate
     */
    public void tearDown() throws InterruptedException {
        this.endpoint.close();
        this.executorService.shutdown();
        // The boolean result is ignored; NOTE(review): presumably ThreadCleanCheck.after() reports threads still alive - confirm.
        this.executorService.awaitTermination(1L, TimeUnit.SECONDS);
        ThreadCleanCheck.after();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the builder used by {@link #setUp(String)} to construct the endpoint.
     *
     * <p>The builder carries this test's role and sets the
     * {@code dxfeed.wildcard.enable} and {@code dxendpoint.eventTime} properties to
     * {@code "true"}.
     *
     * @return a configured, not yet built, endpoint builder
     */
    public DXEndpoint.Builder endpointBuilder() {
        return DXEndpoint.newBuilder()
            .withRole(this.role)
            .withProperty("dxfeed.wildcard.enable", "true")
            .withProperty("dxendpoint.eventTime", "true");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the publish/receive round trip for a time-series event type using a
     * time-series subscription that starts at time {@code 0}.
     *
     * <p>The environment is torn down in a {@code finally} block so that a failing
     * assertion does not leave the endpoint open and the executor thread running.
     *
     * @param cls          event class under test
     * @param obj          symbol to subscribe to
     * @param eventCreator produces the event for each publishing round
     * @param eventChecker compares each published event with the one received
     * @throws InterruptedException if interrupted while tearing down the executor
     */
    public <T extends TimeSeriesEvent<?>> void testTimeSeriesEventPublishing(Class<T> cls, Object obj, EventCreator<T> eventCreator, EventChecker<T> eventChecker) throws InterruptedException {
        setUp("testTimeSeriesEventPublishing for " + cls.getSimpleName() + ", symbol=" + obj);
        try {
            DXFeedTimeSeriesSubscription<T> subscription = this.feed.createTimeSeriesSubscription(cls);
            subscription.setFromTime(0L);
            testEventPublishing(cls, obj, eventCreator, eventChecker, subscription);
        } finally {
            // Always release the endpoint and executor, even when the round trip fails.
            tearDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the publish/receive round trip for a time-series event type, comparing
     * events with the default getter-by-getter checker.
     *
     * <p>The environment is torn down in a {@code finally} block so that a failing
     * assertion does not leave the endpoint open and the executor thread running.
     *
     * @param cls          event class under test
     * @param obj          symbol to subscribe to
     * @param eventCreator produces the event for each publishing round
     * @throws InterruptedException if interrupted while tearing down the executor
     */
    public <T extends TimeSeriesEvent<?>> void testTimeSeriesEventPublishing(Class<T> cls, Object obj, EventCreator<T> eventCreator) throws InterruptedException {
        setUp("testTimeSeriesEventPublishing for " + cls.getSimpleName() + ", symbol=" + obj);
        try {
            DXFeedTimeSeriesSubscription<T> subscription = this.feed.createTimeSeriesSubscription(cls);
            subscription.setFromTime(0L);
            testEventPublishing(cls, obj, eventCreator, new DefaultEventChecker<T>(), subscription);
        } finally {
            // Always release the endpoint and executor, even when the round trip fails.
            tearDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the publish/receive round trip for an event type using a regular
     * subscription and a caller-supplied checker.
     *
     * <p>The environment is torn down in a {@code finally} block so that a failing
     * assertion does not leave the endpoint open and the executor thread running.
     *
     * @param cls          event class under test
     * @param obj          symbol to subscribe to
     * @param eventCreator produces the event for each publishing round
     * @param eventChecker compares each published event with the one received
     * @throws InterruptedException if interrupted while tearing down the executor
     */
    public <T extends EventType<?>> void testEventPublishing(Class<T> cls, Object obj, EventCreator<T> eventCreator, EventChecker<T> eventChecker) throws InterruptedException {
        setUp("testEventPublishing for " + cls.getSimpleName() + ", symbol=" + obj);
        try {
            testEventPublishing(cls, obj, eventCreator, eventChecker, this.feed.createSubscription(cls));
        } finally {
            // Always release the endpoint and executor, even when the round trip fails.
            tearDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the publish/receive round trip for an event type using a regular
     * subscription and the default getter-by-getter checker.
     *
     * <p>The environment is torn down in a {@code finally} block so that a failing
     * assertion does not leave the endpoint open and the executor thread running.
     *
     * @param cls          event class under test
     * @param obj          symbol to subscribe to
     * @param eventCreator produces the event for each publishing round
     * @throws InterruptedException if interrupted while tearing down the executor
     */
    public <T extends EventType<?>> void testEventPublishing(Class<T> cls, Object obj, EventCreator<T> eventCreator) throws InterruptedException {
        setUp("testEventPublishing for " + cls.getSimpleName() + ", symbol=" + obj);
        try {
            testEventPublishing(cls, obj, eventCreator, new DefaultEventChecker<T>(), this.feed.createSubscription(cls));
        } finally {
            // Always release the endpoint and executor, even when the round trip fails.
            tearDown();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Core publish/receive scenario shared by the public test entry points.
     *
     * <p>Steps, each followed by {@link #checkpoint()} so asynchronous work settles:
     * <ol>
     *   <li>subscribe to {@code obj} and expect the publisher side to report exactly
     *       that symbol as added;</li>
     *   <li>publish 100 events one at a time and expect each to arrive as a distinct
     *       instance that satisfies {@code eventChecker};</li>
     *   <li>close the subscription and expect exactly that symbol reported as removed.</li>
     * </ol>
     *
     * @param cls                event class under test
     * @param obj                symbol to subscribe to
     * @param eventCreator       produces the event for each publishing round
     * @param eventChecker       compares each published event with the one received
     * @param dXFeedSubscription subscription to drive; it is closed by this method
     */
    private <T extends EventType<?>> void testEventPublishing(Class<T> cls, Object obj, EventCreator<T> eventCreator, EventChecker<T> eventChecker, DXFeedSubscription<T> dXFeedSubscription) {
        final ArrayBlockingQueue<Object> addedSymbols = new ArrayBlockingQueue<>(100);
        final ArrayBlockingQueue<Object> removedSymbols = new ArrayBlockingQueue<>(100);
        // Capacity 1 on purpose: addAll throws if a previous event was never consumed.
        final ArrayBlockingQueue<T> receivedEvents = new ArrayBlockingQueue<>(1);
        dXFeedSubscription.addEventListener(events -> {
            this.log.trace("eventsReceived " + events);
            Assert.assertEquals(1L, events.size());
            receivedEvents.addAll(events);
        });
        ObservableSubscriptionChangeListener changeListener = new ObservableSubscriptionChangeListener() {
            @Override
            public void symbolsAdded(Set<?> set) {
                AbstractDXPublisherTest.this.log.trace("symbolsAdded " + set);
                Assert.assertFalse(set.isEmpty());
                for (Object symbol : set) {
                    addedSymbols.add(getEventSymbol(symbol));
                }
            }

            @Override
            public void symbolsRemoved(Set<?> set) {
                AbstractDXPublisherTest.this.log.trace("symbolsRemoved " + set);
                Assert.assertFalse(set.isEmpty());
                for (Object symbol : set) {
                    removedSymbols.add(getEventSymbol(symbol));
                }
            }

            // Time-series subscriptions wrap the symbol; unwrap so it compares equal to the plain symbol.
            private Object getEventSymbol(Object symbol) {
                return symbol instanceof TimeSeriesSubscriptionSymbol
                    ? ((TimeSeriesSubscriptionSymbol<?>) symbol).getEventSymbol()
                    : symbol;
            }
        };
        this.publisher.getSubscription(cls).addChangeListener(changeListener);

        // Subscribing must surface exactly one added symbol and no removals.
        this.log.trace("Adding symbol " + obj);
        dXFeedSubscription.addSymbols(obj);
        checkpoint();
        Assert.assertEquals(obj, addedSymbols.poll());
        Assert.assertEquals(0L, addedSymbols.size());
        Assert.assertEquals(0L, removedSymbols.size());

        // Publish one event per round and verify the copy that arrives on the feed side.
        for (int i = 0; i < 100; i++) {
            T published = eventCreator.createEvent(i);
            this.log.trace("Publishing " + published);
            this.publisher.publishEvents(Collections.singletonList(published));
            checkpoint();
            T received = receivedEvents.poll();
            // The feed must deliver its own instance, not the object handed to the publisher.
            Assert.assertNotSame(published, received);
            eventChecker.check(published, received);
        }

        // Closing the subscription must surface exactly one removed symbol and no additions.
        dXFeedSubscription.close();
        checkpoint();
        Assert.assertEquals(obj, removedSymbols.poll());
        Assert.assertEquals(0L, addedSymbols.size());
        Assert.assertEquals(0L, removedSymbols.size());
        this.publisher.getSubscription(cls).removeChangeListener(changeListener);
        checkpoint();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Incorrect types in method signature: <S:Ljava/lang/Object;T::Lcom/dxfeed/event/EventType<TS;>;:Lcom/dxfeed/event/LastingEvent<TS;>;L:TT;P:TT;>(Ljava/lang/Class<TT;>;TP;TL;Lcom/dxfeed/api/test/AbstractDXPublisherTest$EventChecker<TT;>;)V */
    /**
     * Publishes one event and verifies that {@code feed.getLastEvent} returns an
     * equivalent but distinct instance. Does nothing unless the role is
     * {@code LOCAL_HUB}.
     *
     * <p>The environment is torn down in a {@code finally} block so that a failing
     * assertion does not leave the endpoint open and the executor thread running.
     * Raw types are retained so the visible signature stays unchanged for callers.
     *
     * @param cls          event class under test
     * @param eventType    the event to publish
     * @param eventType2   blank event that receives the published event's symbol and is
     *                     passed to {@code getLastEvent}
     * @param eventChecker compares the published event with the last event returned
     * @throws InterruptedException if interrupted while tearing down the executor
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testGetLastEvent(Class cls, EventType eventType, EventType eventType2, EventChecker eventChecker) throws InterruptedException {
        if (this.role != DXEndpoint.Role.LOCAL_HUB) {
            return;
        }
        setUp("testGetLastEvent for " + cls.getSimpleName() + ", pubEvent=" + eventType);
        try {
            this.feed.createSubscription(cls).addSymbols(eventType.getEventSymbol());
            this.publisher.publishEvents(Collections.singletonList(eventType));
            checkpoint();
            eventType2.setEventSymbol(eventType.getEventSymbol());
            LastingEvent lastEvent = this.feed.getLastEvent((LastingEvent) eventType2);
            // The feed must hand back its own instance, not the published object.
            Assert.assertNotSame(eventType, lastEvent);
            eventChecker.check(eventType, lastEvent);
        } finally {
            // Always release the endpoint and executor, even when an assertion fails.
            tearDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Blocks until the endpoint's executor has gone quiet.
     *
     * <p>Each round snapshots {@code executionCount}, submits a marker task directly
     * to {@code executorService} (bypassing the counting wrapper) and waits for it.
     * If no counted task was scheduled in the meantime the executor is considered
     * idle; otherwise another round is run.
     *
     * @throws AssertionError if waiting is interrupted or the marker task cannot be
     *                        run; the original exception is attached as the cause
     */
    public void checkpoint() {
        try {
            for (int attempt = 1; ; attempt++) {
                // Effectively-final copies for capture by the lambda.
                final int attemptNumber = attempt;
                final int countBefore = this.executionCount.get();
                this.executorService.submit(() -> {
                    this.log.trace("Executing checkpoint #" + attemptNumber + " for " + countBefore);
                }).get();
                if (this.executionCount.get() == countBefore) {
                    this.log.trace("Checkpoint done");
                    return;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new AssertionError(e.toString(), e);
        } catch (Exception e) {
            // Keep the cause on the failure instead of printing it and dropping it.
            throw new AssertionError(e.toString(), e);
        }
    }
}
