/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.webservice.comet;

import com.devexperts.logging.Logging;
import com.devexperts.qd.QDFilter;
import com.devexperts.util.SystemProperties;
import com.devexperts.util.TimeFormat;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeed;
import com.dxfeed.api.DXFeedEventListener;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.api.impl.DXEndpointImpl;
import com.dxfeed.api.impl.DXFeedImpl;
import com.dxfeed.api.osub.TimeSeriesSubscriptionSymbol;
import com.dxfeed.event.EventType;
import com.dxfeed.event.TimeSeriesEvent;
import com.dxfeed.ondemand.OnDemandService;
import com.dxfeed.webservice.DXFeedContext;
import com.dxfeed.webservice.EventSymbolMap;
import com.dxfeed.webservice.comet.CometReflectionUtil;
import com.dxfeed.webservice.comet.DataMessage;
import com.dxfeed.webservice.comet.DelayableExecutor;
import com.dxfeed.webservice.comet.SessionStats;
import com.dxfeed.webservice.comet.WebSocketTransportExtension;
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.cometd.annotation.Listener;
import org.cometd.annotation.Service;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.Session;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.server.ServerSessionImpl;

@Service
public class DataService {
    private static final Logging log = Logging.getLogging(DataService.class);
    private static final String DATA_CHANNEL = "/service/data";
    private static final String TIME_SERIES_DATA_CHANNEL = "/service/timeSeriesData";
    private static final String STATE_CHANNEL = "/service/state";
    private static final int DEFAULT_BATCH_SIZE = 100;
    private int messageBatchSize = SystemProperties.getIntProperty(DataService.class, (String)"messageBatchSize", (int)100, (int)1, (int)10000);
    private static final String ONDEMAND_NAME_PREFIX = "onDemand";
    private static final Map<String, Method> ONDEMAND_OPS = new HashMap<String, Method>();
    private DXEndpointImpl sharedEndpoint;
    private OnDemandService sharedOnDemand;
    private final ConcurrentHashMap<ServerSession, SessionState> sessions = new ConcurrentHashMap();
    @org.cometd.annotation.Session
    private ServerSession server;

    @PostConstruct
    public void init() {
        WebSocketTransportExtension.setOverflowHandler(this::toggleMessageProcessingDelaying);
        DXFeedContext.INSTANCE.acquire();
        this.sharedEndpoint = (DXEndpointImpl)DXFeedContext.INSTANCE.getEndpoint();
        this.sharedOnDemand = OnDemandService.getInstance((DXEndpoint)this.sharedEndpoint);
    }

    @PreDestroy
    public void destroy() {
        log.info("Closing");
        DXFeedContext.INSTANCE.release();
    }

    @Listener(value={"/service/sub"})
    public void sub(ServerSession remote, ServerMessage.Mutable message) {
        this.getOrCreateSession(remote).sub(message);
    }

    @Listener(value={"/service/onDemand"})
    public void onDemand(ServerSession remote, ServerMessage.Mutable message) {
        Map map = message.getDataAsMap();
        String op = (String)map.get("op");
        List rawArgs = (List)map.get("args");
        Method method = ONDEMAND_OPS.get(op);
        if (method == null) {
            throw new IllegalArgumentException("Unsupported ondemand operation: " + op);
        }
        try {
            this.invokeMethod(remote, method, rawArgs);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to invoke onDemand." + op + " with " + rawArgs, e);
        }
    }

    private void invokeMethod(ServerSession remote, Method method, List<?> rawArgs) throws Exception {
        Class<?>[] paramTypes = method.getParameterTypes();
        Object[] args = new Object[paramTypes.length];
        for (int i = 0; i < paramTypes.length; ++i) {
            Class<?> paramType = paramTypes[i];
            Object rawArg = i < rawArgs.size() ? (Object)rawArgs.get(i) : null;
            Class<?> rawType = rawArg == null ? null : rawArg.getClass();
            Object arg = rawArg;
            if (paramType == Date.class) {
                if (rawType != null && Number.class.isAssignableFrom(rawType)) {
                    arg = new Date(((Number)rawArg).longValue());
                } else if (rawType == String.class) {
                    arg = TimeFormat.DEFAULT.parse((String)rawArg);
                } else if (rawType != Date.class) {
                    throw new IllegalArgumentException("Arg #" + i + " cannot be coerced to Date");
                }
            } else if (paramType == Double.TYPE) {
                if (rawType == String.class) {
                    arg = new Double((String)rawArg);
                } else if (rawType != Double.class) {
                    if (rawType != null && Number.class.isAssignableFrom(rawType)) {
                        arg = ((Number)rawArg).doubleValue();
                    } else {
                        throw new IllegalArgumentException("Arg #" + i + " cannot be coerced to double");
                    }
                }
            }
            args[i] = arg;
        }
        method.invoke((Object)this.getOrCreateSession(remote), args);
    }

    private SessionState getOrCreateSession(ServerSession remote) {
        SessionState session;
        QDFilter filter = (QDFilter)remote.getAttribute(DXFeedContext.FILTER_PARAM);
        if (filter == null) {
            filter = QDFilter.ANYTHING;
        }
        if ((session = this.sessions.get(remote)) != null && !filter.toString().equals(session.filter.toString())) {
            session.removed(remote, false);
            session = null;
        }
        if (session != null) {
            return session;
        }
        session = new SessionState(remote, filter);
        SessionState result = this.sessions.putIfAbsent(remote, session);
        if (result != null) {
            return result;
        }
        remote.addListener((ServerSession.ServerSessionListener)session);
        return session;
    }

    private void toggleMessageProcessingDelaying(ServerSessionImpl session, boolean delayProcessing) {
        SessionState state = this.sessions.get(session);
        if (state != null) {
            state.setDelaySubscriptions(delayProcessing);
        } else {
            log.debug("SessionState not found for ServerSessionImpl " + session);
        }
    }

    static {
        for (Method method : SessionState.class.getDeclaredMethods()) {
            String name = method.getName();
            if (!name.startsWith(ONDEMAND_NAME_PREFIX)) continue;
            String op = name.substring(ONDEMAND_NAME_PREFIX.length());
            op = Character.toLowerCase(op.charAt(0)) + op.substring(1);
            ONDEMAND_OPS.put(op, method);
        }
    }

    private class SessionState
    implements ServerSession.RemoveListener,
    ServerSession.DeQueueListener,
    ServerSession.MaxQueueListener,
    ServerSession.Extension,
    PropertyChangeListener {
        private final ServerSession remote;
        private final ServerSessionImpl sessionImpl;
        private final QDFilter filter;
        private final Map<Class<?>, DXFeedSubscription<Object>> regularSubscriptionsMap = new HashMap();
        private final Map<Class<?>, DXFeedSubscription<Object>> timeSeriesSubscriptionsMap = new HashMap();
        private final EventSymbolMap symbolMap = new EventSymbolMap();
        private DXFeed feed;
        private OnDemandService onDemand;
        private DXFeedImpl onDemandFeed;
        private volatile boolean closed;
        private SessionStats stats = new SessionStats();

        SessionState(@Nonnull ServerSession remote, QDFilter filter) {
            log.info("Create session=" + remote.getId() + ", filter=" + filter);
            this.remote = remote;
            this.sessionImpl = remote instanceof ServerSessionImpl ? (ServerSessionImpl)remote : null;
            this.filter = filter;
            this.feed = DXFeedContext.INSTANCE.getFeed(filter);
            this.stats.sessionId = remote.getId();
            this.stats.numSessions = 1;
            this.stats.createTime = this.stats.lastActiveTime = System.currentTimeMillis();
            remote.setAttribute("sessionStats", (Object)this.stats);
            remote.setAttribute("tmpSessionStats", (Object)new SessionStats());
            remote.addExtension((ServerSession.Extension)this);
            this.deliverStateChange("replaySupported", DataService.this.sharedOnDemand.isReplaySupported());
        }

        private synchronized boolean makeClosedSync() {
            if (this.closed) {
                return false;
            }
            this.closed = true;
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void removed(ServerSession remote, boolean timeout) {
            if (!this.makeClosedSync()) {
                return;
            }
            log.info("Close session=" + remote.getId() + ", timeout=" + timeout);
            OnDemandService onDemand = this.clearOnDemandButNotCloseItYet();
            if (onDemand != null) {
                this.onDemandFeed.closeImpl();
                onDemand.getEndpoint().close();
            }
            this.closeSubscriptions();
            remote.removeExtension((ServerSession.Extension)this);
            remote.removeListener((ServerSession.ServerSessionListener)this);
            if (!remote.getExtensions().isEmpty()) {
                log.warn("ServerSession still has extensions after removal: " + remote.getExtensions());
            }
            if (this.sessionImpl != null) {
                CometReflectionUtil.sessionImplCleanup(this.sessionImpl);
                if (!this.sessionImpl.getListeners().isEmpty()) {
                    log.warn("ServerSessionImpl still has listeners after removal: " + this.sessionImpl.getListeners());
                }
                Object object = this.sessionImpl.getLock();
                synchronized (object) {
                    this.sessionImpl.getQueue().clear();
                }
            }
            DataService.this.sessions.remove(remote);
        }

        private void applySubscriptionAction(Consumer<DXFeedSubscription<?>> action) {
            for (DXFeedSubscription<Object> sub : this.regularSubscriptionsMap.values()) {
                action.accept(sub);
            }
            for (DXFeedSubscription<Object> sub : this.timeSeriesSubscriptionsMap.values()) {
                action.accept(sub);
            }
        }

        private void closeSubscriptions() {
            this.applySubscriptionAction(DXFeedSubscription::close);
        }

        private void setDelaySubscriptions(boolean delaySubscriptions) {
            this.applySubscriptionAction(sub -> {
                Executor executor = sub.getExecutor();
                if (executor instanceof DelayableExecutor) {
                    DelayableExecutor de = (DelayableExecutor)executor;
                    de.setDelayProcessing(delaySubscriptions);
                } else {
                    log.warn("Not a DelayableExecutor " + executor + " on sub " + sub);
                }
            });
        }

        public synchronized void sub(ServerMessage.Mutable message) {
            Boolean reset;
            Map map = (Map)message.getData();
            if (this.closed) {
                log.warn("sub[session=" + this.remote.getId() + "] ignored closed session: " + map);
                return;
            }
            if (log.debugEnabled()) {
                log.debug("sub[session=" + this.remote.getId() + "]: " + map);
            }
            if ((reset = (Boolean)map.get("reset")) != null && reset.booleanValue()) {
                this.resetSub();
            }
            this.processSub(false, false, (Map)map.get("remove"));
            this.processSub(true, false, (Map)map.get("add"));
            this.processSub(false, true, (Map)map.get("removeTimeSeries"));
            this.processSub(true, true, (Map)map.get("addTimeSeries"));
            this.stats.regSubscription(this.regularSubscriptionsMap.values().stream().mapToInt(sub -> sub.getSymbols().size()).sum(), false);
            this.stats.regSubscription(this.timeSeriesSubscriptionsMap.values().stream().mapToInt(sub -> sub.getSymbols().size()).sum(), true);
        }

        private void resetSub() {
            this.closeSubscriptions();
            this.regularSubscriptionsMap.clear();
            this.timeSeriesSubscriptionsMap.clear();
        }

        private void processSub(boolean addSub, boolean timeSeries, Map<String, List<?>> subMap) {
            if (subMap == null) {
                return;
            }
            for (Map.Entry<String, List<?>> entry : subMap.entrySet()) {
                String typeName = entry.getKey();
                Class<? extends EventType<?>> eventType = DXFeedContext.INSTANCE.getEventTypes().get(typeName);
                if (eventType == null || timeSeries && !TimeSeriesEvent.class.isAssignableFrom(eventType)) continue;
                DXFeedSubscription<?> sub = this.getOrCreateSubscription(eventType, timeSeries);
                List symbols = entry.getValue().stream().map(o -> this.resolveSymbol(eventType, o, addSub, timeSeries)).filter(Objects::nonNull).collect(Collectors.toList());
                if (addSub) {
                    sub.addSymbols(symbols);
                    continue;
                }
                sub.removeSymbols(symbols);
                this.symbolMap.cleanupEventSymbolMapping(eventType, this.getSubscriptions(timeSeries).get(eventType));
            }
        }

        private Object resolveSymbol(Class<?> eventType, Object value, boolean addSub, boolean timeSeries) {
            long fromTime;
            String eventSymbol;
            if (timeSeries && addSub) {
                if (!(value instanceof Map)) {
                    return null;
                }
                Map objMap = (Map)value;
                Object eventSymbolObj = objMap.get("eventSymbol");
                Object fromTimeObj = objMap.get("fromTime");
                if (!(eventSymbolObj instanceof String) || !(fromTimeObj instanceof Number)) {
                    return null;
                }
                eventSymbol = (String)eventSymbolObj;
                fromTime = ((Number)fromTimeObj).longValue();
            } else {
                if (!(value instanceof String)) {
                    return null;
                }
                eventSymbol = (String)value;
                fromTime = 0L;
            }
            Object result = this.symbolMap.resolveEventSymbolMapping(eventType, eventSymbol);
            return timeSeries ? new TimeSeriesSubscriptionSymbol(result, fromTime) : result;
        }

        private Map<Class<?>, DXFeedSubscription<Object>> getSubscriptions(boolean timeSeries) {
            return timeSeries ? this.timeSeriesSubscriptionsMap : this.regularSubscriptionsMap;
        }

        @Override
        public void propertyChange(PropertyChangeEvent evt) {
            String propertyName = evt.getPropertyName();
            if ("time".equals(propertyName)) {
                this.deliverStateChange(propertyName, evt.getNewValue());
            }
        }

        private void deliverStateChange(String propertyName, Object value) {
            Map<String, Object> stateChange = Collections.singletonMap(propertyName, value);
            this.remote.deliver((Session)DataService.this.server, DataService.STATE_CHANNEL, stateChange, Promise.noop());
        }

        private synchronized OnDemandService ensureOnDemand() {
            if (this.onDemand == null) {
                DXEndpointImpl endpoint = (DXEndpointImpl)DXEndpoint.create((DXEndpoint.Role)DXEndpoint.Role.ON_DEMAND_FEED);
                this.onDemand = OnDemandService.getInstance((DXEndpoint)endpoint);
                this.onDemand.addPropertyChangeListener((PropertyChangeListener)this);
                this.onDemandFeed = new DXFeedImpl(endpoint, this.filter);
                this.attachTo((DXFeed)this.onDemandFeed);
            }
            return this.onDemand;
        }

        private synchronized OnDemandService getOnDemand() {
            return this.onDemand;
        }

        private synchronized OnDemandService clearOnDemandButNotCloseItYet() {
            OnDemandService onDemand = this.onDemand;
            this.onDemand = null;
            if (onDemand != null) {
                onDemand.removePropertyChangeListener((PropertyChangeListener)this);
            }
            return onDemand;
        }

        private synchronized void attachTo(DXFeed newFeed) {
            for (DXFeedSubscription<Object> sub : this.regularSubscriptionsMap.values()) {
                this.feed.detachSubscriptionAndClear(sub);
                newFeed.attachSubscription(sub);
            }
            for (DXFeedSubscription<Object> sub : this.timeSeriesSubscriptionsMap.values()) {
                this.feed.detachSubscriptionAndClear(sub);
                newFeed.attachSubscription(sub);
            }
            this.feed = newFeed;
        }

        public void onDemandReplay(Date time, double speed) {
            log.info("onDemandReplay(" + TimeFormat.DEFAULT.format(time) + ", " + speed + ")");
            this.ensureOnDemand().replay(time, speed);
        }

        public void onDemandSetSpeed(double speed) {
            log.debug("onDemandSetSpeed(" + speed + ")");
            OnDemandService onDemand = this.getOnDemand();
            if (onDemand != null) {
                onDemand.setSpeed(speed);
            }
        }

        public void onDemandStopAndResume() throws InterruptedException {
            log.info("onDemandStopAndResume()");
            OnDemandService onDemand = this.clearOnDemandButNotCloseItYet();
            if (onDemand != null) {
                this.attachTo(DXFeedContext.INSTANCE.getFeed(this.filter));
                this.onDemandFeed.awaitTerminationAndCloseImpl();
                onDemand.getEndpoint().closeAndAwaitTermination();
            }
        }

        public void onDemandStopAndClear() {
            log.info("onDemandStopAndClear()");
            this.ensureOnDemand().stopAndClear();
        }

        private DXFeedSubscription<?> getOrCreateSubscription(Class<?> eventType, boolean timeSeries) {
            Map<Class<?>, DXFeedSubscription<Object>> subscriptions = this.getSubscriptions(timeSeries);
            DXFeedSubscription sub = subscriptions.get(eventType);
            if (sub == null) {
                sub = this.feed.createSubscription(eventType);
                sub.addEventListener(new Listener(eventType, timeSeries));
                sub.setExecutor((Executor)new DelayableExecutor(() -> ((DXEndpointImpl)DataService.this.sharedEndpoint).getOrCreateExecutor()));
                subscriptions.put(eventType, (DXFeedSubscription<Object>)sub);
            }
            return sub;
        }

        public boolean rcv(ServerSession session, ServerMessage.Mutable message) {
            if (message instanceof DataMessage) {
                this.stats.readEvents += (long)((DataMessage)message).getEvents().size();
            }
            this.stats.lastActiveTime = System.currentTimeMillis();
            ++this.stats.read;
            return true;
        }

        public boolean rcvMeta(ServerSession session, ServerMessage.Mutable message) {
            ++this.stats.readMeta;
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ServerMessage send(ServerSession sender, ServerSession session, ServerMessage message) {
            if (message.getData() instanceof DataMessage) {
                this.stats.writeEvents += (long)((DataMessage)message.getData()).getEvents().size();
            }
            if (this.sessionImpl != null) {
                Object object = this.sessionImpl.getLock();
                synchronized (object) {
                    this.stats.regQueueSize(((ServerSessionImpl)this.remote).getQueue().size());
                }
            }
            this.stats.lastActiveTime = System.currentTimeMillis();
            ++this.stats.write;
            return message;
        }

        public boolean sendMeta(ServerSession sender, ServerSession session, ServerMessage.Mutable message) {
            ++this.stats.writeMeta;
            return true;
        }

        public boolean queueMaxed(ServerSession session, Queue<ServerMessage> queue, ServerSession sender, Message message) {
            if (session.isConnected()) {
                log.warn("session=" + this.remote.getId() + " closed due to outgoing queue overflow: " + queue.size());
                session.disconnect();
            }
            return false;
        }

        public void deQueue(ServerSession session, Queue<ServerMessage> queue) {
            this.stats.lastSendTime = System.currentTimeMillis();
        }

        private class Listener<T extends EventType<?>>
        implements DXFeedEventListener<T> {
            private final Class<T> eventType;
            private final boolean timeSeries;
            private boolean sendScheme = true;

            private Listener(Class<T> eventType, boolean timeSeries) {
                this.eventType = eventType;
                this.timeSeries = timeSeries;
            }

            public void eventsReceived(List<T> events) {
                if (SessionState.this.sessionImpl != null && SessionState.this.sessionImpl.isTerminated()) {
                    log.warn("Received a DXFeed event for terminated session " + SessionState.this.sessionImpl.getId());
                    SessionState.this.removed((ServerSession)SessionState.this.sessionImpl, false);
                    return;
                }
                int length = events.size();
                for (int i = 0; i < length; i += DataService.this.messageBatchSize) {
                    List<T> eventsBatch = events.subList(i, Math.min(length, i + DataService.this.messageBatchSize));
                    SessionState.this.remote.deliver((Session)DataService.this.server, this.timeSeries ? DataService.TIME_SERIES_DATA_CHANNEL : DataService.DATA_CHANNEL, (Object)new DataMessage(this.sendScheme, this.eventType, eventsBatch, SessionState.this.symbolMap), Promise.noop());
                    this.sendScheme = false;
                }
            }
        }
    }
}

