/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.ondemand.impl.connector;

import com.devexperts.connector.proto.ApplicationConnectionFactory;
import com.devexperts.qd.qtp.AbstractConnectionHandler;
import com.devexperts.qd.qtp.AbstractMessageConnector;
import com.devexperts.qd.qtp.MessageConnectorState;
import com.devexperts.qd.qtp.ReconnectHelper;
import com.devexperts.qd.qtp.help.MessageConnectorProperty;
import com.devexperts.qd.qtp.help.MessageConnectorSummary;
import com.devexperts.util.LogUtil;
import com.devexperts.util.TimeFormat;
import com.devexperts.util.TimePeriod;
import com.dxfeed.api.impl.OnDemandConnectorMarker;
import com.dxfeed.ondemand.impl.MarketDataReplay;
import com.dxfeed.ondemand.impl.MarketDataToken;
import com.dxfeed.ondemand.impl.connector.OnDemandConnectorMBean;
import com.dxfeed.ondemand.impl.connector.ReplayConnectionHandler;
import java.util.Date;
import javax.management.AttributeChangeNotification;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.NotificationBroadcaster;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;

/**
 * Message connector for on-demand historical data replay.
 *
 * <p>The connector owns a single {@link MarketDataReplay} instance and at most one
 * {@link ReplayConnectionHandler} at a time. It does not start until a replay time
 * has been set (see {@link #setTime(Date)}). Every change of the replay time made
 * through {@link #updateTime(Date)} is published as a JMX
 * {@link AttributeChangeNotification} for the {@code time} attribute.
 *
 * <p>Locking: {@code handler} is only written while holding the monitor of this
 * connector; {@code time}, {@code speed} and {@code notificationSequence} are read
 * and written under the same monitor. {@code handler} and {@code tickPeriod} are
 * additionally {@code volatile} so they can be read without the lock.
 */
@MessageConnectorSummary(info="On-demand historical data replay.", addressFormat="ondemand:<address>")
public class OnDemandConnector
extends AbstractMessageConnector
implements OnDemandConnectorMBean,
NotificationBroadcaster,
OnDemandConnectorMarker {
    /** Address given at construction time; also used when resolving the token. */
    private final String address;
    /** Reconnect helper handed out to the connection handler via {@link #getReconnectHelper()}. */
    private final ReconnectHelper reconnectHelper;
    /** Replay engine; the cache-related properties of this connector delegate to it. */
    private final MarketDataReplay replay = new MarketDataReplay();
    /** Currently running handler, or {@code null} when the connector is not active. */
    private volatile ReplayConnectionHandler handler;
    /** Current replay time; {@code null} until set. Guarded by {@code this}. */
    private Date time;
    /** Replay speed. Guarded by {@code this}. */
    private double speed = 1.0;
    // NOTE(review): unit of the numeric argument is defined by TimePeriod.valueOf(long) - presumably milliseconds; confirm.
    private volatile TimePeriod tickPeriod = TimePeriod.valueOf(20L);
    /** Delivers JMX notifications to registered listeners. */
    private final NotificationBroadcasterSupport broadcaster = new NotificationBroadcasterSupport();
    /** Sequence number of the last notification sent. Guarded by {@code this}. */
    private long notificationSequence = 0L;

    /**
     * Creates a connector for the given address.
     *
     * @param factory application connection factory passed to the superclass
     * @param address address of the on-demand data source
     */
    public OnDemandConnector(ApplicationConnectionFactory factory, String address) {
        super(factory);
        this.address = address;
        reconnectHelper = new ReconnectHelper(getReconnectDelay());
    }

    /**
     * Starts the connector by creating and starting a new {@link ReplayConnectionHandler}.
     *
     * <p>Does nothing when the connector is already active, is closed, or when no
     * replay time has been set yet.
     */
    public synchronized void start() {
        if (isActive() || isClosed()) {
            return;
        }
        if (time == null) {
            // Start attempts are ignored until a replay time is available.
            return;
        }
        // The explicit (Object) cast is kept as-is so overload selection in LogUtil cannot change.
        log.info("Starting OnDemandConnector to " + LogUtil.hideCredentials((Object)getAddress()) + " at " + TimeFormat.DEFAULT.format(time));
        // Pick up the reconnect delay that is configured at the moment of start.
        reconnectHelper.setReconnectDelay(getReconnectDelay());
        handler = new ReplayConnectionHandler(this);
        handler.start();
        notifyMessageConnectorListeners();
    }

    /**
     * Stops the connector.
     *
     * <p>The current handler is detached while holding the lock; closing it and
     * notifying the listeners happens after the lock has been released.
     *
     * @return the handler that was detached so that the caller can join it, or
     *         {@code null} when the connector was not active (in which case no
     *         listener notification is sent)
     */
    protected AbstractMessageConnector.Joinable stopImpl() {
        ReplayConnectionHandler detached;
        synchronized (this) {
            if (!isActive()) {
                return null;
            }
            detached = handler;
            handler = null;
        }
        if (detached != null) {
            log.info("Stopping OnDemandConnector");
            detached.close();
        }
        notifyMessageConnectorListeners();
        return detached;
    }

    /**
     * Returns the address this connector was created with.
     *
     * @return the address
     */
    public String getAddress() {
        return address;
    }

    /**
     * Tells whether a connection handler is currently installed.
     *
     * @return {@code true} when a handler exists
     */
    public boolean isActive() {
        return handler != null;
    }

    /**
     * Returns the state of the current handler.
     *
     * @return the handler's connection state, or
     *         {@link MessageConnectorState#DISCONNECTED} when there is no handler
     */
    public MessageConnectorState getState() {
        // Single volatile read so that the null check and the call see the same handler.
        ReplayConnectionHandler current = handler;
        if (current == null) {
            return MessageConnectorState.DISCONNECTED;
        }
        return current.getConnectionState();
    }

    /**
     * Returns the number of connections.
     *
     * @return {@code 1} when the connector is active, {@code 0} otherwise
     */
    public int getConnectionCount() {
        return isActive() ? 1 : 0;
    }

    /**
     * Returns the configured tick period.
     *
     * @return the tick period, never {@code null}
     */
    @Override
    public TimePeriod getTickPeriod() {
        return tickPeriod;
    }

    /**
     * Sets the tick period.
     *
     * @param tickPeriod new tick period
     * @throws NullPointerException if {@code tickPeriod} is {@code null}
     */
    @Override
    @MessageConnectorProperty(value="Tick period")
    public void setTickPeriod(TimePeriod tickPeriod) {
        if (tickPeriod == null) {
            throw new NullPointerException("tickPeriod");
        }
        this.tickPeriod = tickPeriod;
    }

    /**
     * Returns the cache limit of the underlying replay.
     *
     * @return the value reported by {@link MarketDataReplay#getCacheLimit()}
     */
    @Override
    public long getCacheLimit() {
        return replay.getCacheLimit();
    }

    /**
     * Forwards the cache limit to the underlying replay.
     *
     * @param amount new cache limit
     */
    @Override
    @MessageConnectorProperty(value="Cache limit size in bytes")
    public void setCacheLimit(long amount) {
        replay.setCacheLimit(amount);
    }

    /**
     * Returns the file cache limit of the underlying replay.
     *
     * @return the value reported by {@link MarketDataReplay#getFileCacheLimit()}
     */
    @Override
    public long getFileCacheLimit() {
        return replay.getFileCacheLimit();
    }

    /**
     * Forwards the file cache limit to the underlying replay.
     *
     * @param amount new file cache limit
     */
    @Override
    @MessageConnectorProperty(value="File cache limit size in bytes")
    public void setFileCacheLimit(long amount) {
        replay.setFileCacheLimit(amount);
    }

    /**
     * Returns the file cache path of the underlying replay.
     *
     * @return the value reported by {@link MarketDataReplay#getFileCachePath()}
     */
    @Override
    public synchronized String getFileCachePath() {
        return replay.getFileCachePath();
    }

    /**
     * Forwards the file cache directory to the underlying replay.
     *
     * @param cacheFileDir new cache file directory
     */
    @Override
    @MessageConnectorProperty(value="Cache file directory, empty by default (current dir)")
    public synchronized void setFileCachePath(String cacheFileDir) {
        replay.setFileCachePath(cacheFileDir);
    }

    /**
     * Returns the current replay time.
     *
     * <p>The stored {@link Date} instance itself is returned; no defensive copy is made.
     *
     * @return the replay time, or {@code null} when none has been set
     */
    @Override
    public synchronized Date getTime() {
        return time;
    }

    /**
     * Sets the replay time, publishes the change and then calls the inherited
     * {@code reconfigure()}.
     *
     * @param time new replay time
     */
    @Override
    @MessageConnectorProperty(value="Replay time")
    public synchronized void setTime(Date time) {
        updateTime(time);
        reconfigure();
    }

    /**
     * Stores a new replay time and broadcasts an {@link AttributeChangeNotification}
     * for the {@code time} attribute carrying the old and the new value.
     *
     * <p>The field update and the sequence number increment happen under the lock;
     * the notification is sent after this method's own synchronized section.
     *
     * @param newTime new replay time
     */
    void updateTime(Date newTime) {
        Date previous;
        long sequence;
        synchronized (this) {
            previous = time;
            time = newTime;
            sequence = ++notificationSequence;
        }
        AttributeChangeNotification notification = new AttributeChangeNotification(
            this, sequence, System.currentTimeMillis(), "Time changed", "time", Date.class.getName(), previous, newTime);
        broadcaster.sendNotification(notification);
    }

    /**
     * Returns the configured replay speed.
     *
     * @return the replay speed
     */
    @Override
    public synchronized double getSpeed() {
        return speed;
    }

    /**
     * Sets the replay speed and, when a handler is running, forwards it to that handler.
     *
     * @param speed new replay speed
     */
    @Override
    @MessageConnectorProperty(value="Replay speed, 1.0 by default (real-time speed)")
    public synchronized void setSpeed(double speed) {
        this.speed = speed;
        ReplayConnectionHandler current = handler;
        if (current != null) {
            current.setSpeed(speed);
        }
    }

    /**
     * Registers a JMX notification listener with the internal broadcaster.
     *
     * @param listener listener to add
     * @param filter   filter applied before delivery
     * @param handback object passed back to the listener with each notification
     */
    @Override
    public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) {
        broadcaster.addNotificationListener(listener, filter, handback);
    }

    /**
     * Removes a JMX notification listener from the internal broadcaster.
     *
     * @param listener listener to remove
     * @throws ListenerNotFoundException if the listener is not registered
     */
    @Override
    public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException {
        broadcaster.removeNotificationListener(listener);
    }

    /**
     * Describes the notifications emitted by this connector.
     *
     * @return a single-element array describing the attribute change notification
     */
    @Override
    public MBeanNotificationInfo[] getNotificationInfo() {
        String[] types = {AttributeChangeNotification.ATTRIBUTE_CHANGE};
        MBeanNotificationInfo info = new MBeanNotificationInfo(
            types, AttributeChangeNotification.class.getName(), "Attribute change notification");
        return new MBeanNotificationInfo[]{info};
    }

    /**
     * Resets the reconnect helper and then calls {@link #start()}.
     */
    public void startImmediately() {
        reconnectHelper.reset();
        start();
    }

    /**
     * Returns the reconnect helper of this connector.
     *
     * @return the reconnect helper
     */
    ReconnectHelper getReconnectHelper() {
        return reconnectHelper;
    }

    /**
     * Returns the replay engine owned by this connector.
     *
     * @return the replay engine
     */
    MarketDataReplay getMarketDataReplay() {
        return replay;
    }

    /**
     * Builds a token from the connector's user, password and address.
     *
     * @return the result of {@code MarketDataToken.fromUserPassword}
     */
    MarketDataToken resolveToken() {
        return MarketDataToken.fromUserPassword(getUser(), getPassword(), address);
    }

    /**
     * Reacts to a closed handler: when it is the handler currently installed, it is
     * cleared and {@link #start()} is called again. Calls for any other handler are ignored.
     *
     * @param handler the handler that was closed
     */
    protected synchronized void handlerClosed(AbstractConnectionHandler handler) {
        if (handler != this.handler) {
            // A handler that was already replaced or detached must not trigger a restart.
            return;
        }
        this.handler = null;
        start();
    }
}

