/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.qtp.file;

import com.devexperts.connector.proto.ApplicationConnectionFactory;
import com.devexperts.logging.Logging;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.SubscriptionIterator;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.qtp.AbstractConnectionHandler;
import com.devexperts.qd.qtp.AbstractMessageConnector;
import com.devexperts.qd.qtp.MessageAdapter;
import com.devexperts.qd.qtp.MessageConnectors;
import com.devexperts.qd.qtp.MessageListener;
import com.devexperts.qd.qtp.MessageProvider;
import com.devexperts.qd.qtp.MessageVisitor;
import com.devexperts.qd.qtp.ProtocolOption;
import com.devexperts.qd.qtp.file.FileWriterImpl;
import com.devexperts.qd.qtp.file.TapeConnector;
import com.devexperts.qd.stats.QDStats;

/**
 * Connection handler that pulls messages from a {@link MessageAdapter} and feeds them
 * to a {@link FileWriterImpl}.
 *
 * <p>The adapter reports new messages through the internal {@link State} listener; the
 * worker loop in {@link #doWork()} waits on that state, drains the adapter into the
 * writer, and then marks the batch as processed so that {@link #awaitProcessed()}
 * callers can proceed.
 */
public class FileWriterHandler
extends AbstractConnectionHandler<TapeConnector> {
    private static final Logging log = Logging.getLogging(FileWriterHandler.class);
    private final FileWriterImpl writer;
    private final MessageAdapter adapter;
    private final State state = new State();

    /**
     * Creates the adapter from the connector's factory, applies the connector's protocol
     * options to it, and creates the writer for the connector's address.
     *
     * @param connector connector that supplies the factory, stats, options and address
     */
    public FileWriterHandler(TapeConnector connector) {
        super((AbstractMessageConnector)connector);
        MessageAdapter.Factory factory = MessageConnectors.retrieveMessageAdapterFactory((ApplicationConnectionFactory)connector.getFactory());
        this.adapter = factory.createAdapter(connector.getStats().getOrCreate(QDStats.SType.CONNECTIONS));
        this.setRemoteOptSet(this.adapter, ProtocolOption.parseProtocolOptions((String)connector.getOpt()));
        this.writer = new FileWriterImpl(connector.getAddress(), this.adapter.getScheme(), connector);
    }

    /**
     * Registers the state listener on the adapter, starts the adapter, opens the writer
     * and subscribes to every record of the adapter's scheme.
     */
    public void init() {
        this.adapter.setMessageListener(this.state);
        this.adapter.start();
        this.writer.open();
        this.subscribe();
    }

    /**
     * Adds a stream subscription for every record in the scheme, using the codec's
     * wildcard cipher and a {@code null} symbol.
     */
    private void subscribe() {
        RecordBuffer buf = RecordBuffer.getInstance((RecordMode)RecordMode.SUBSCRIPTION);
        try {
            DataScheme scheme = this.adapter.getScheme();
            for (int i = 0; i < scheme.getRecordCount(); ++i) {
                buf.add(scheme.getRecord(i), scheme.getCodec().getWildcardCipher(), null);
            }
            this.adapter.processStreamAddSubscription((SubscriptionIterator)buf);
        }
        finally {
            // Release the buffer even when the adapter rejects the subscription.
            buf.release();
        }
    }

    /**
     * Worker loop: waits until messages are available, drains them from the adapter into
     * the writer, and reports the outcome to the state. Runs until the handler is closed.
     *
     * <p>Any exception thrown while retrieving messages propagates to the caller after the
     * state has been updated.
     *
     * @throws InterruptedException if interrupted while waiting for messages
     */
    protected void doWork() throws InterruptedException {
        while (!this.isClosed()) {
            this.state.awaitAvailable();
            if (this.isClosed()) {
                // close() wakes the waiter above; do not touch the writer after closing.
                return;
            }
            // Defaults to true so that a failed retrieval leaves the state marked as
            // "messages available" rather than "processed".
            boolean hasMore = true;
            try {
                hasMore = this.adapter.retrieveMessages((MessageVisitor)this.writer);
            }
            finally {
                // No jump statement here: leaving finally abruptly would discard the
                // exception thrown by retrieveMessages.
                this.state.processed(hasMore);
            }
        }
    }

    /**
     * Blocks until the state reports that all available messages have been processed
     * (or the handler has been closed).
     *
     * @throws InterruptedException if interrupted while waiting
     */
    void awaitProcessed() throws InterruptedException {
        this.state.awaitProcessed();
    }

    /**
     * Closes the writer, the adapter and the state, then logs that writing has stopped.
     *
     * @param reason cause of the close, or {@code null}
     */
    protected void closeImpl(Throwable reason) {
        try {
            this.writer.close();
        }
        finally {
            // Always run the remaining cleanup: a failure to close the writer must not
            // leave the adapter open or leave awaitProcessed() callers blocked forever.
            try {
                this.adapter.close();
            }
            catch (Throwable t) {
                log.error("Failed to close adapter", t);
            }
            this.state.close();
        }
        // NOTE(review): runtime exceptions and errors are logged at info level without the
        // stack trace; presumably they are reported elsewhere - confirm against the caller.
        if (reason == null || reason instanceof RuntimeException || reason instanceof Error) {
            log.info("Writing stopped");
        } else {
            log.error("Writing stopped", reason);
        }
    }

    /**
     * Monitor shared between the adapter's notifications and the worker loop.
     *
     * <p>{@code messagesAreAvailable} tells the worker there is something to drain;
     * {@code processed} tells {@link #awaitProcessed()} callers that the last drain left
     * nothing pending. All transitions happen under this object's monitor.
     */
    private static class State
    implements MessageListener {
        // Starts as true, so the first awaitAvailable() call returns without waiting.
        private volatile boolean messagesAreAvailable = true;
        private volatile boolean processed = false;

        State() {
        }

        /**
         * Listener callback: flags that messages are available. The flag is first read
         * without taking the monitor, so the lock is skipped when it is already set.
         *
         * @param provider the provider that has messages (not used here)
         */
        public void messagesAvailable(MessageProvider provider) {
            if (!this.messagesAreAvailable) {
                this.notifyAvailableSync();
            }
        }

        /**
         * Re-checks the flag under the monitor, then sets it, clears {@code processed}
         * and wakes all waiters.
         */
        private synchronized void notifyAvailableSync() {
            if (!this.messagesAreAvailable) {
                this.messagesAreAvailable = true;
                this.processed = false;
                this.notifyAll();
            }
        }

        /** Sets both flags so that every waiter, of either kind, wakes up and returns. */
        synchronized void close() {
            this.messagesAreAvailable = true;
            this.processed = true;
            this.notifyAll();
        }

        /**
         * Waits until {@code processed} is set.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        synchronized void awaitProcessed() throws InterruptedException {
            while (!this.processed) {
                this.wait();
            }
        }

        /**
         * Waits until messages are available, then clears the flag so that the next call
         * waits for a fresh notification.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        synchronized void awaitAvailable() throws InterruptedException {
            while (!this.messagesAreAvailable) {
                this.wait();
            }
            this.messagesAreAvailable = false;
        }

        /**
         * Records the outcome of one drain.
         *
         * @param hasMore {@code true} if the adapter still has messages; the availability
         *                flag is then set again so the worker loops without waiting
         */
        synchronized void processed(boolean hasMore) {
            if (hasMore) {
                this.messagesAreAvailable = true;
            }
            // The flag may also have been set by messagesAvailable() while the drain was
            // running; only report "processed" when nothing is pending.
            if (!this.messagesAreAvailable) {
                this.processed = true;
                this.notifyAll();
            }
        }
    }
}

