package com.dxfeed.webservice.comet;

import com.devexperts.logging.Logging;
import com.devexperts.qd.QDFilter;
import com.devexperts.qd.impl.matrix.management.CollectorManagement;
import com.devexperts.qd.qtp.ProtocolDescriptor;
import com.devexperts.util.SystemProperties;
import com.devexperts.util.TimeFormat;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedEventListener;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.impl.DXEndpointImpl;
import com.dxfeed.api.impl.DXFeedImpl;
import com.dxfeed.api.osub.TimeSeriesSubscriptionSymbol;
import com.dxfeed.event.EventType;
import com.dxfeed.event.TimeSeriesEvent;
import com.dxfeed.ondemand.OnDemandService;
import com.dxfeed.webservice.DXFeedContext;
import com.dxfeed.webservice.EventSymbolMap;
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.cometd.annotation.Listener;
import org.cometd.annotation.Service;
import org.cometd.annotation.Session;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.server.ServerSessionImpl;

@Service
/* loaded from: input_file:WEB-INF/lib/dxfeed-webservice-impl.jar:com/dxfeed/webservice/comet/DataService.class */
public class DataService {
    // Channel on which batches of regular (non-time-series) events are delivered to a session.
    private static final String DATA_CHANNEL = "/service/data";
    // Channel on which batches of time-series events are delivered to a session.
    private static final String TIME_SERIES_DATA_CHANNEL = "/service/timeSeriesData";
    // Channel on which single-entry state-change maps are delivered to a session.
    private static final String STATE_CHANNEL = "/service/state";
    // Not referenced anywhere else in this file.
    // NOTE(review): presumably the origin of the literal 100 passed to getIntProperty below - confirm.
    private static final int DEFAULT_BATCH_SIZE = 100;
    // Name prefix identifying the SessionState methods that are registered in ONDEMAND_OPS.
    private static final String ONDEMAND_NAME_PREFIX = "onDemand";
    // Endpoint obtained from DXFeedContext in init().
    private DXEndpoint sharedEndpoint;
    // On-demand service of the shared endpoint; in this file it is only asked isReplaySupported().
    private OnDemandService sharedOnDemand;

    // Passed as the sender argument of every remote.deliver(...) call.
    // NOTE(review): presumably injected by the CometD annotation processor via @Session - confirm.
    @Session
    private ServerSession server;
    private static final Logging log = Logging.getLogging((Class<?>) DataService.class);
    // Operation name (method name without the "onDemand" prefix, first letter lower-cased) -> SessionState method.
    // Populated once by the static initializer at the bottom of this class.
    private static final Map<String, Method> ONDEMAND_OPS = new HashMap();
    // Maximum number of events packed into one delivered DataMessage.
    // NOTE(review): the trailing arguments look like (default, min, max) - confirm against SystemProperties.getIntProperty.
    private int messageBatchSize = SystemProperties.getIntProperty(DataService.class, "messageBatchSize", 100, 1, CollectorManagement.DEFAULT_SUBSCRIPTION_BUCKET);
    // Per-remote-session state; entries are added in getOrCreateSession() and removed in SessionState.removed().
    private final ConcurrentHashMap<ServerSession, SessionState> sessions = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/dxfeed-webservice-impl.jar:com/dxfeed/webservice/comet/DataService$SessionState.class */
    public class SessionState implements ServerSession.RemoveListener, ServerSession.Extension, PropertyChangeListener {
        // Remote session this state belongs to; all messages are delivered to it.
        private final ServerSession remote;
        // Same object as 'remote' when it is a ServerSessionImpl, otherwise null (see constructor).
        private final ServerSessionImpl sessionImpl;
        // Filter this state was created with; compared by toString() in getOrCreateSession().
        private final QDFilter filter;
        // Feed that currently owns the subscriptions; replaced by attachTo().
        private DXFeed feed;
        // Lazily created on-demand service (see ensureOnDemand()); null when none is active.
        private OnDemandService onDemand;
        // Feed created together with 'onDemand'; NOT reset when 'onDemand' is cleared.
        private DXFeedImpl onDemandFeed;
        // Set once by makeClosedSync(); sub() ignores requests after that.
        private volatile boolean closed;
        // Event class -> subscription for regular symbols.
        private final Map<Class<?>, DXFeedSubscription<Object>> regularSubscriptionsMap = new HashMap();
        // Event class -> subscription for time-series symbols.
        private final Map<Class<?>, DXFeedSubscription<Object>> timeSeriesSubscriptionsMap = new HashMap();
        // Symbol mapping shared by subscription processing and outgoing DataMessage instances.
        private final EventSymbolMap symbolMap = new EventSymbolMap();
        // Published on the session under CometDMonitoring.STATS_ATTR and updated by the Extension callbacks.
        private SessionStats stats = new SessionStats();
        // Published on the session under CometDMonitoring.TMP_STATS_ATTR; not otherwise touched in this file.
        private SessionStats tmpStats = new SessionStats();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:WEB-INF/lib/dxfeed-webservice-impl.jar:com/dxfeed/webservice/comet/DataService$SessionState$Listener.class */
        public class Listener<T extends EventType<?>> implements DXFeedEventListener<T> {
            /** Event class this listener was created for; forwarded to every {@code DataMessage}. */
            private final Class<T> eventType;
            /** Selects the time-series channel when {@code true}, the regular data channel otherwise. */
            private final boolean timeSeries;
            /** Passed as the first {@code DataMessage} argument; starts {@code true} and is cleared after each delivered batch. */
            private boolean sendScheme = true;

            /**
             * @param cls event class handled by this listener
             * @param z   {@code true} when the owning subscription is a time-series subscription
             */
            private Listener(Class<T> cls, boolean z) {
                this.eventType = cls;
                this.timeSeries = z;
            }

            /**
             * Splits the received events into consecutive batches of at most
             * {@code messageBatchSize} elements and delivers each batch to the remote
             * session as a separate {@code DataMessage}.
             *
             * @param list events received from the subscription
             */
            @Override // com.dxfeed.api.DXFeedEventListener
            public void eventsReceived(List<T> list) {
                final String channel = this.timeSeries ? DataService.TIME_SERIES_DATA_CHANNEL : DataService.DATA_CHANNEL;
                final int total = list.size();
                for (int from = 0; from < total; from += DataService.this.messageBatchSize) {
                    int to = Math.min(total, from + DataService.this.messageBatchSize);
                    List<T> batch = list.subList(from, to);
                    // Delivery happens under the session-state monitor, the same one sub() holds.
                    synchronized (SessionState.this) {
                        SessionState.this.remote.deliver(DataService.this.server, channel, new DataMessage(this.sendScheme, this.eventType, batch, SessionState.this.symbolMap), Promise.noop());
                    }
                    this.sendScheme = false;
                }
            }
        }

        /**
         * Creates the state for one remote session: picks the feed for the given filter,
         * initialises and publishes the statistics objects as session attributes,
         * registers this object as a session extension and announces whether replay is supported.
         *
         * @param serverSession remote session this state is bound to
         * @param qDFilter      filter used to obtain the feed
         */
        SessionState(@Nonnull ServerSession serverSession, @Nonnull QDFilter qDFilter) {
            DataService.log.info("Create session=" + serverSession.getId() + ", filter=" + qDFilter);
            this.remote = serverSession;
            if (serverSession instanceof ServerSessionImpl) {
                this.sessionImpl = (ServerSessionImpl) serverSession;
            } else {
                this.sessionImpl = null;
            }
            this.filter = qDFilter;
            this.feed = DXFeedContext.INSTANCE.getFeed(qDFilter);

            this.stats.sessionId = serverSession.getId();
            this.stats.numSessions = 1;
            // Creation time and last-activity time start out as the same instant.
            final long now = System.currentTimeMillis();
            this.stats.lastActiveTime = now;
            this.stats.createTime = now;

            serverSession.setAttribute(CometDMonitoring.STATS_ATTR, this.stats);
            serverSession.setAttribute(CometDMonitoring.TMP_STATS_ATTR, this.tmpStats);
            serverSession.addExtension(this);

            boolean replaySupported = DataService.this.sharedOnDemand.isReplaySupported();
            deliverStateChange("replaySupported", Boolean.valueOf(replaySupported));
        }

        /**
         * Marks this state as closed.
         *
         * @return {@code true} if this call performed the open-to-closed transition,
         *         {@code false} if the state was already closed
         */
        private synchronized boolean makeClosedSync() {
            final boolean wasOpen = !this.closed;
            this.closed = true;
            return wasOpen;
        }

        /**
         * Closes this state once: shuts down the on-demand feed and endpoint if one is active,
         * closes all subscriptions and removes the session from the service's session map.
         * Subsequent calls are no-ops.
         *
         * @param serverSession session being removed
         * @param z             timeout flag, only used for logging here
         */
        @Override // org.cometd.bayeux.server.ServerSession.RemoveListener
        public void removed(ServerSession serverSession, boolean z) {
            if (!makeClosedSync()) {
                return;
            }
            DataService.log.info("Close session=" + serverSession.getId() + ", timeout=" + z);
            OnDemandService detached = clearOnDemandButNotCloseItYet();
            if (detached != null) {
                this.onDemandFeed.closeImpl();
                detached.getEndpoint().close();
            }
            closeSubscriptions();
            DataService.this.sessions.remove(serverSession);
        }

        /** Closes every regular subscription, then every time-series subscription; the maps themselves are left untouched. */
        private void closeSubscriptions() {
            for (DXFeedSubscription<Object> regular : this.regularSubscriptionsMap.values()) {
                regular.close();
            }
            for (DXFeedSubscription<Object> timeSeries : this.timeSeriesSubscriptionsMap.values()) {
                timeSeries.close();
            }
        }

        /**
         * Applies a subscription-change message to this session.
         * The message data is read as a map with the optional keys {@code "reset"},
         * {@code "remove"}, {@code "add"}, {@code "removeTimeSeries"} and {@code "addTimeSeries"},
         * processed in that order. Afterwards the total symbol counts are registered in the stats.
         * Messages arriving after the session was closed are logged and ignored.
         *
         * @param mutable incoming subscription message
         */
        public synchronized void sub(ServerMessage.Mutable mutable) {
            Map request = (Map) mutable.getData();
            if (this.closed) {
                DataService.log.warn("sub[session=" + this.remote.getId() + "] ignored closed session: " + request);
                return;
            }
            if (DataService.log.debugEnabled()) {
                DataService.log.debug("sub[session=" + this.remote.getId() + "]: " + request);
            }

            Boolean reset = (Boolean) request.get("reset");
            if (reset != null && reset.booleanValue()) {
                resetSub();
            }

            processSub(false, false, (Map) request.get("remove"));
            processSub(true, false, (Map) request.get("add"));
            processSub(false, true, (Map) request.get("removeTimeSeries"));
            processSub(true, true, (Map) request.get("addTimeSeries"));

            this.stats.regSubscription(totalSymbolCount(this.regularSubscriptionsMap), false);
            this.stats.regSubscription(totalSymbolCount(this.timeSeriesSubscriptionsMap), true);
        }

        /** Sums the number of symbols over all subscriptions in the given map. */
        private int totalSymbolCount(Map<Class<?>, DXFeedSubscription<Object>> subscriptions) {
            int total = 0;
            for (DXFeedSubscription<Object> subscription : subscriptions.values()) {
                total += subscription.getSymbols().size();
            }
            return total;
        }

        /** Closes all current subscriptions and then forgets them, leaving both subscription maps empty. */
        private void resetSub() {
            closeSubscriptions();
            // Subscriptions are already closed at this point; only the bookkeeping is dropped.
            this.timeSeriesSubscriptionsMap.clear();
            this.regularSubscriptionsMap.clear();
        }

        /**
         * Adds or removes symbols for each event type named in {@code map}.
         * Keys are looked up in {@code DXFeedContext.INSTANCE.getEventTypes()}; unknown names are skipped,
         * as are non-time-series event types when {@code z2} is set.
         * For a time-series add, each list element is read as a map with {@code "eventSymbol"} and
         * {@code "fromTime"}; elements whose {@code "fromTime"} is not a {@code Number} are skipped.
         * In every other case each list element is cast to a symbol string.
         *
         * @param z   {@code true} to add symbols, {@code false} to remove them
         * @param z2  {@code true} to operate on time-series subscriptions, {@code false} on regular ones
         * @param map event type name to list of symbols (or symbol specs); {@code null} means nothing to do
         */
        private void processSub(boolean z, boolean z2, Map<String, List<?>> map) {
            if (map == null) {
                return;
            }
            final boolean timeSeriesAdd = z2 && z;
            for (Map.Entry<String, List<?>> request : map.entrySet()) {
                Class<? extends EventType<?>> eventType = DXFeedContext.INSTANCE.getEventTypes().get(request.getKey());
                if (eventType == null || (z2 && !TimeSeriesEvent.class.isAssignableFrom(eventType))) {
                    continue;
                }
                DXFeedSubscription<?> subscription = getOrCreateSubscription(eventType, z2);
                List<?> requested = request.getValue();
                ArrayList symbols = new ArrayList(requested.size());
                for (Object item : requested) {
                    if (timeSeriesAdd) {
                        Map spec = (Map) item;
                        String symbol = (String) spec.get("eventSymbol");
                        Object fromTime = spec.get("fromTime");
                        if (fromTime instanceof Number) {
                            symbols.add(new TimeSeriesSubscriptionSymbol(this.symbolMap.resolveEventSymbolMapping(eventType, symbol), ((Number) fromTime).longValue()));
                        }
                    } else {
                        symbols.add(this.symbolMap.resolveEventSymbolMapping(eventType, (String) item));
                    }
                }
                if (z) {
                    subscription.addSymbols((Collection<?>) symbols);
                } else {
                    subscription.removeSymbols(symbols);
                    // After a removal, give the symbol map a chance to clean up mappings for this event type.
                    this.symbolMap.cleanupEventSymbolMapping(eventType, getSubscriptions(z2).get(eventType));
                }
            }
        }

        /**
         * @param z {@code true} for the time-series subscription map, {@code false} for the regular one
         * @return the live (not copied) subscription map of the requested kind
         */
        private Map<Class<?>, DXFeedSubscription<Object>> getSubscriptions(boolean z) {
            if (z) {
                return this.timeSeriesSubscriptionsMap;
            }
            return this.regularSubscriptionsMap;
        }

        /**
         * Forwards a property change to the remote session, but only for the property
         * whose name equals {@code ProtocolDescriptor.TIME_PROPERTY}; all others are ignored.
         *
         * @param propertyChangeEvent change notification from the on-demand service
         */
        @Override // java.beans.PropertyChangeListener
        public void propertyChange(PropertyChangeEvent propertyChangeEvent) {
            final String name = propertyChangeEvent.getPropertyName();
            if (!ProtocolDescriptor.TIME_PROPERTY.equals(name)) {
                return;
            }
            deliverStateChange(name, propertyChangeEvent.getNewValue());
        }

        /**
         * Delivers a single {@code name -> value} entry to the remote session on the state channel.
         *
         * @param str state property name
         * @param obj new value of the property
         */
        private void deliverStateChange(String str, Object obj) {
            Map<String, Object> change = Collections.singletonMap(str, obj);
            this.remote.deliver(DataService.this.server, DataService.STATE_CHANNEL, change, Promise.noop());
        }

        /**
         * Returns the session's on-demand service, creating it on first use.
         * Creation builds a new {@code ON_DEMAND_FEED} endpoint, registers this object as a
         * property-change listener, creates a filtered feed on that endpoint and moves all
         * current subscriptions onto it.
         *
         * @return the (possibly just created) on-demand service, never {@code null}
         */
        private synchronized OnDemandService ensureOnDemand() {
            if (this.onDemand != null) {
                return this.onDemand;
            }
            DXEndpointImpl endpoint = (DXEndpointImpl) DXEndpoint.create(DXEndpoint.Role.ON_DEMAND_FEED);
            OnDemandService service = OnDemandService.getInstance(endpoint);
            this.onDemand = service;
            service.addPropertyChangeListener(this);
            this.onDemandFeed = new DXFeedImpl(endpoint, this.filter);
            attachTo(this.onDemandFeed);
            return service;
        }

        /**
         * @return the current on-demand service, or {@code null} when none is active; never creates one
         */
        private synchronized OnDemandService getOnDemand() {
            final OnDemandService current = this.onDemand;
            return current;
        }

        /**
         * Detaches the current on-demand service from this state: clears the field and
         * unregisters this object as its property-change listener. The service's endpoint
         * is not closed here; that is left to the caller.
         *
         * @return the detached service, or {@code null} if none was active
         */
        private synchronized OnDemandService clearOnDemandButNotCloseItYet() {
            final OnDemandService detached = this.onDemand;
            if (detached == null) {
                return null;
            }
            this.onDemand = null;
            detached.removePropertyChangeListener(this);
            return detached;
        }

        /**
         * Moves every regular and time-series subscription from the current feed to the given
         * one, then makes the given feed the current feed.
         *
         * @param dXFeed feed that takes over the subscriptions
         */
        private synchronized void attachTo(DXFeed dXFeed) {
            reattachAll(this.regularSubscriptionsMap.values(), dXFeed);
            reattachAll(this.timeSeriesSubscriptionsMap.values(), dXFeed);
            this.feed = dXFeed;
        }

        /** Detaches (and clears) each subscription from the current feed and attaches it to {@code target}. */
        private void reattachAll(Collection<DXFeedSubscription<Object>> subscriptions, DXFeed target) {
            for (DXFeedSubscription<Object> subscription : subscriptions) {
                this.feed.detachSubscriptionAndClear(subscription);
                target.attachSubscription(subscription);
            }
        }

        /**
         * On-demand operation {@code replay}: logs the request and starts a replay on the
         * session's on-demand service, creating that service if necessary.
         *
         * @param date replay start time
         * @param d    replay speed
         */
        public void onDemandReplay(Date date, double d) {
            String formattedTime = TimeFormat.DEFAULT.format(date);
            DataService.log.info("onDemandReplay(" + formattedTime + ", " + d + ")");
            OnDemandService service = ensureOnDemand();
            service.replay(date, d);
        }

        /**
         * On-demand operation {@code setSpeed}: changes the speed of the active on-demand
         * service. Does nothing beyond logging when no on-demand service exists.
         *
         * @param d new speed
         */
        public void onDemandSetSpeed(double d) {
            DataService.log.debug("onDemandSetSpeed(" + d + ")");
            final OnDemandService active = getOnDemand();
            if (active == null) {
                return;
            }
            active.setSpeed(d);
        }

        /**
         * On-demand operation {@code stopAndResume}: if an on-demand service is active, detaches it,
         * moves the subscriptions back to the feed obtained from {@code DXFeedContext} for this
         * session's filter, then closes the on-demand feed and endpoint, waiting for termination.
         *
         * @throws InterruptedException declared by this method; presumably raised while awaiting termination
         */
        public void onDemandStopAndResume() throws InterruptedException {
            DataService.log.info("onDemandStopAndResume()");
            final OnDemandService detached = clearOnDemandButNotCloseItYet();
            if (detached == null) {
                return;
            }
            DXFeed regularFeed = DXFeedContext.INSTANCE.getFeed(this.filter);
            attachTo(regularFeed);
            this.onDemandFeed.awaitTerminationAndCloseImpl();
            detached.getEndpoint().closeAndAwaitTermination();
        }

        /**
         * On-demand operation {@code stopAndClear}: logs the request and calls
         * {@code stopAndClear()} on the session's on-demand service, creating that service if necessary.
         */
        public void onDemandStopAndClear() {
            DataService.log.info("onDemandStopAndClear()");
            OnDemandService service = ensureOnDemand();
            service.stopAndClear();
        }

        /**
         * Looks up the subscription for an event class, creating it on the current feed
         * (with a fresh {@code Listener}) and remembering it when it does not exist yet.
         *
         * @param cls event class
         * @param z   {@code true} for a time-series subscription, {@code false} for a regular one
         * @return the existing or newly created subscription
         */
        private DXFeedSubscription<?> getOrCreateSubscription(Class<?> cls, boolean z) {
            Map<Class<?>, DXFeedSubscription<Object>> byEventType = getSubscriptions(z);
            DXFeedSubscription<?> existing = byEventType.get(cls);
            if (existing != null) {
                return existing;
            }
            DXFeedSubscription<?> created = this.feed.createSubscription(cls);
            created.addEventListener(new Listener(cls, z));
            byEventType.put(cls, created);
            return created;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /**
         * Extension hook for incoming non-meta messages: updates read counters and the
         * last-activity timestamp. Always lets the message through.
         *
         * @param serverSession session the message was received on (unused)
         * @param mutable       received message
         * @return always {@code true}
         */
        @Override // org.cometd.bayeux.server.ServerSession.Extension
        public boolean rcv(ServerSession serverSession, ServerMessage.Mutable mutable) {
            // NOTE(review): this tests the message object itself, whereas send() tests getData() - confirm this is intended.
            if (mutable instanceof DataMessage) {
                this.stats.readEvents += ((DataMessage) mutable).getEvents().size();
            }
            this.stats.read++;
            this.stats.lastActiveTime = System.currentTimeMillis();
            return true;
        }

        /**
         * Extension hook for incoming meta messages: counts the message and lets it through.
         *
         * @param serverSession session the message was received on (unused)
         * @param mutable       received meta message (unused)
         * @return always {@code true}
         */
        @Override // org.cometd.bayeux.server.ServerSession.Extension
        public boolean rcvMeta(ServerSession serverSession, ServerMessage.Mutable mutable) {
            this.stats.readMeta += 1;
            return true;
        }

        /**
         * Extension hook for outgoing non-meta messages: counts written events when the payload
         * is a {@code DataMessage}, records the current queue size when the session is a
         * {@code ServerSessionImpl}, and updates the write counter and last-activity timestamp.
         *
         * @param serverSession session the message is sent to (unused)
         * @param serverMessage outgoing message
         * @return the same message, unmodified
         */
        @Override // org.cometd.bayeux.server.ServerSession.Extension
        public ServerMessage send(ServerSession serverSession, ServerMessage serverMessage) {
            final Object payload = serverMessage.getData();
            if (payload instanceof DataMessage) {
                this.stats.writeEvents += ((DataMessage) payload).getEvents().size();
            }
            // sessionImpl is the same object as 'remote' whenever it is non-null.
            if (this.sessionImpl != null) {
                int queued = this.sessionImpl.getQueue().size();
                this.stats.regQueueSize(queued);
            }
            this.stats.lastActiveTime = System.currentTimeMillis();
            this.stats.write++;
            return serverMessage;
        }

        /**
         * Extension hook for outgoing meta messages: counts the message and lets it through.
         *
         * @param serverSession session the message is sent to (unused)
         * @param mutable       outgoing meta message (unused)
         * @return always {@code true}
         */
        @Override // org.cometd.bayeux.server.ServerSession.Extension
        public boolean sendMeta(ServerSession serverSession, ServerMessage.Mutable mutable) {
            this.stats.writeMeta += 1;
            return true;
        }
    }

    /**
     * Acquires the shared {@code DXFeedContext} and caches its endpoint together with
     * the on-demand service belonging to that endpoint.
     */
    @PostConstruct
    public void init() {
        DXFeedContext.INSTANCE.acquire();
        DXEndpoint endpoint = DXFeedContext.INSTANCE.getEndpoint();
        this.sharedEndpoint = endpoint;
        this.sharedOnDemand = OnDemandService.getInstance(endpoint);
    }

    /**
     * Logs the shutdown and releases the {@code DXFeedContext} acquired in {@link #init()}.
     * Existing per-session states are not touched here.
     */
    @PreDestroy
    public void destroy() {
        DataService.log.info("Closing");
        DXFeedContext.INSTANCE.release();
    }

    /**
     * Handles messages on {@code /service/sub} by forwarding them to the state of the
     * sending session, creating that state if needed.
     *
     * @param serverSession remote session that sent the message
     * @param mutable       subscription-change message
     */
    @Listener({"/service/sub"})
    public void sub(ServerSession serverSession, ServerMessage.Mutable mutable) {
        SessionState state = getOrCreateSession(serverSession);
        state.sub(mutable);
    }

    /**
     * Handles messages on {@code /service/onDemand}. The message data is read as a map with
     * an {@code "op"} entry naming the operation and an {@code "args"} entry holding its arguments;
     * the operation is resolved through {@code ONDEMAND_OPS} and invoked on the session state.
     *
     * @param serverSession remote session that sent the message
     * @param mutable       on-demand request message
     * @throws IllegalArgumentException if the operation name is not registered
     * @throws RuntimeException         wrapping any exception raised while invoking the operation
     */
    @Listener({"/service/onDemand"})
    public void onDemand(ServerSession serverSession, ServerMessage.Mutable mutable) {
        final Map<String, Object> request = mutable.getDataAsMap();
        final String op = (String) request.get("op");
        final List<?> args = (List) request.get("args");
        final Method target = ONDEMAND_OPS.get(op);
        if (target == null) {
            throw new IllegalArgumentException("Unsupported ondemand operation: " + op);
        }
        try {
            invokeMethod(serverSession, target, args);
        } catch (Exception failure) {
            throw new RuntimeException("Failed to invoke onDemand." + op + " with " + args, failure);
        }
    }

    /**
     * Invokes an on-demand operation on the state of the given session, coercing the supplied
     * arguments to the method's parameter types first.
     * <ul>
     *   <li>{@code Date} parameters accept a {@code Number} (epoch millis), a {@code String}
     *       (parsed with {@code TimeFormat.DEFAULT}) or a {@code Date}.</li>
     *   <li>{@code double} parameters accept a {@code String} or any {@code Number}.</li>
     *   <li>Parameters of any other type receive the argument unchanged.</li>
     * </ul>
     * Missing arguments (list shorter than the parameter list, or {@code null}) are treated as {@code null}.
     *
     * @param serverSession remote session whose state receives the call
     * @param method        method of {@code SessionState} to invoke
     * @param list          raw arguments from the request; may be {@code null}
     * @throws IllegalArgumentException if an argument cannot be coerced to {@code Date} or {@code double}
     * @throws Exception                anything raised by the reflective invocation
     */
    private void invokeMethod(ServerSession serverSession, Method method, List<?> list) throws Exception {
        Class<?>[] parameterTypes = method.getParameterTypes();
        Object[] params = new Object[parameterTypes.length];
        // A request without "args" yields a null list; treat it like an empty one.
        int supplied = list == null ? 0 : list.size();
        // A for loop guarantees the index always advances; the previous while/continue form
        // skipped the increment for pass-through arguments and never terminated.
        for (int i = 0; i < parameterTypes.length; i++) {
            Class<?> targetType = parameterTypes[i];
            Object arg = i < supplied ? list.get(i) : null;
            Object param = arg;
            if (targetType == Date.class) {
                if (arg instanceof Number) {
                    param = new Date(((Number) arg).longValue());
                } else if (arg instanceof String) {
                    param = TimeFormat.DEFAULT.parse((String) arg);
                } else if (!(arg instanceof Date)) {
                    throw new IllegalArgumentException("Arg #" + i + " cannot be coerced to Date");
                }
            } else if (targetType == Double.TYPE) {
                if (arg instanceof String) {
                    param = Double.valueOf((String) arg);
                } else if (arg instanceof Number) {
                    // A Double is passed through as is; other numbers are widened/narrowed to double.
                    if (!(arg instanceof Double)) {
                        param = Double.valueOf(((Number) arg).doubleValue());
                    }
                } else {
                    throw new IllegalArgumentException("Arg #" + i + " cannot be coerced to double");
                }
            }
            params[i] = param;
        }
        method.invoke(getOrCreateSession(serverSession), params);
    }

    /**
     * Returns the state associated with a remote session, creating and registering it when absent.
     * The filter is taken from the session attribute {@code DXFeedContext.FILTER_PARAM}, defaulting to
     * {@code QDFilter.ANYTHING}. If an existing state was created with a filter whose string form
     * differs, that state is closed via {@code removed(serverSession, false)} and replaced.
     *
     * @param serverSession remote session
     * @return the state now registered for the session
     */
    private SessionState getOrCreateSession(ServerSession serverSession) {
        QDFilter requested = (QDFilter) serverSession.getAttribute(DXFeedContext.FILTER_PARAM);
        QDFilter effective = requested != null ? requested : QDFilter.ANYTHING;

        SessionState current = this.sessions.get(serverSession);
        if (current != null) {
            if (effective.toString().equals(current.filter.toString())) {
                return current;
            }
            // Filter changed: retire the old state before building a new one.
            current.removed(serverSession, false);
        }

        SessionState fresh = new SessionState(serverSession, effective);
        SessionState raced = this.sessions.putIfAbsent(serverSession, fresh);
        if (raced != null) {
            // Another thread registered a state first; use theirs.
            return raced;
        }
        serverSession.addListener(fresh);
        return fresh;
    }

    static {
        for (Method method : SessionState.class.getDeclaredMethods()) {
            String name = method.getName();
            if (name.startsWith(ONDEMAND_NAME_PREFIX)) {
                String substring = name.substring(ONDEMAND_NAME_PREFIX.length());
                ONDEMAND_OPS.put(Character.toLowerCase(substring.charAt(0)) + substring.substring(1), method);
            }
        }
    }
}
