package com.devexperts.qd.qtp.file;

import com.devexperts.logging.Logging;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.qtp.AbstractConnectionHandler;
import com.devexperts.qd.qtp.MessageAdapter;
import com.devexperts.qd.qtp.MessageConnectors;
import com.devexperts.qd.qtp.MessageListener;
import com.devexperts.qd.qtp.MessageProvider;
import com.devexperts.qd.stats.QDStats;

/* loaded from: input_file:WEB-INF/lib/qds-file.jar:com/devexperts/qd/qtp/file/FileWriterHandler.class */
public class FileWriterHandler extends AbstractConnectionHandler<TapeConnector> {
    private static final Logging log = Logging.getLogging((Class<?>) FileWriterHandler.class);
    private final FileWriterImpl writer;
    private final MessageAdapter adapter;
    private final State state;

    /* loaded from: input_file:WEB-INF/lib/qds-file.jar:com/devexperts/qd/qtp/file/FileWriterHandler$State.class */
    private static class State implements MessageListener {
        private volatile boolean messagesAreAvailable = true;
        private volatile boolean processed = false;

        State() {
        }

        @Override // com.devexperts.qd.qtp.MessageListener
        public void messagesAvailable(MessageProvider messageProvider) {
            if (this.messagesAreAvailable) {
                return;
            }
            notifyAvailableSync();
        }

        private synchronized void notifyAvailableSync() {
            if (this.messagesAreAvailable) {
                return;
            }
            this.messagesAreAvailable = true;
            this.processed = false;
            notifyAll();
        }

        synchronized void close() {
            this.messagesAreAvailable = true;
            this.processed = true;
            notifyAll();
        }

        synchronized void awaitProcessed() throws InterruptedException {
            while (!this.processed) {
                wait();
            }
        }

        synchronized void awaitAvailable() throws InterruptedException {
            while (!this.messagesAreAvailable) {
                wait();
            }
            this.messagesAreAvailable = false;
        }

        synchronized void processed(boolean z) {
            if (z) {
                this.messagesAreAvailable = true;
            }
            if (this.messagesAreAvailable) {
                return;
            }
            this.processed = true;
            notifyAll();
        }
    }

    public FileWriterHandler(TapeConnector tapeConnector) {
        super(tapeConnector);
        this.state = new State();
        this.adapter = MessageConnectors.retrieveMessageAdapterFactory(tapeConnector.getFactory()).createAdapter(tapeConnector.getStats().getOrCreate(QDStats.SType.CONNECTIONS));
        this.writer = new FileWriterImpl(tapeConnector.getAddress(), this.adapter.getScheme(), tapeConnector);
    }

    public void init() {
        this.adapter.setMessageListener(this.state);
        this.adapter.start();
        this.writer.open();
        subscribe();
    }

    private void subscribe() {
        RecordBuffer recordBuffer = RecordBuffer.getInstance(RecordMode.SUBSCRIPTION);
        DataScheme scheme = this.adapter.getScheme();
        for (int i = 0; i < scheme.getRecordCount(); i++) {
            recordBuffer.add(scheme.getRecord(i), scheme.getCodec().getWildcardCipher(), null);
        }
        this.adapter.processStreamAddSubscription(recordBuffer);
        recordBuffer.release();
    }

    @Override // com.devexperts.qd.qtp.QTPWorkerThread
    protected void doWork() throws InterruptedException {
        while (!isClosed()) {
            this.state.awaitAvailable();
            if (isClosed()) {
                return;
            }
            boolean z = true;
            try {
                z = this.adapter.retrieveMessages(this.writer);
                this.state.processed(z);
            } catch (Throwable th) {
                this.state.processed(z);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void awaitProcessed() throws InterruptedException {
        this.state.awaitProcessed();
    }

    @Override // com.devexperts.qd.qtp.AbstractConnectionHandler
    protected void closeImpl(Throwable th) {
        this.writer.close();
        try {
            this.adapter.close();
        } catch (Throwable th2) {
            log.error("Failed to close adapter", th2);
        }
        this.state.close();
        if (th == null || (th instanceof RuntimeException) || (th instanceof Error)) {
            log.info("Writing stopped");
        } else {
            log.error("Writing stopped", th);
        }
    }
}
