/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.qtp.file;

import com.devexperts.io.BufferedOutput;
import com.devexperts.io.StreamOutput;
import com.devexperts.qd.DataIterator;
import com.devexperts.qd.DataProvider;
import com.devexperts.qd.QDContract;
import com.devexperts.qd.ng.AbstractRecordProvider;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.ng.RecordSink;
import com.devexperts.qd.ng.RecordSource;
import com.devexperts.qd.qtp.AbstractQTPComposer;
import com.devexperts.qd.qtp.HeartbeatPayload;
import com.devexperts.qd.qtp.MessageConsumerAdapter;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.qtp.ProtocolDescriptor;
import com.devexperts.qd.qtp.RuntimeQTPException;
import com.devexperts.qd.qtp.file.MessageReader;
import java.io.IOException;
import java.io.OutputStream;

public class OutputStreamMessageConsumer {
    private final StreamOutput out = new StreamOutput();
    private final MessageReader reader;
    private final AbstractQTPComposer composer;
    private final Consumer consumer = new Consumer();
    private final Provider provider = new Provider();

    public OutputStreamMessageConsumer(OutputStream out, MessageReader reader, AbstractQTPComposer composer) {
        this.out.setOutput(out);
        this.reader = reader;
        this.composer = composer;
        this.composer.setOutput((BufferedOutput)this.out);
    }

    public void write(ProtocolDescriptor descriptor) throws InterruptedException, IOException {
        this.composer.visitDescribeProtocol(descriptor);
        this.reader.readInto(this.consumer);
        this.composer.composeEmptyHeartbeat();
        this.out.flush();
    }

    protected boolean acceptCursor(QDContract contract, RecordCursor cursor) {
        return true;
    }

    private class Provider
    extends AbstractRecordProvider {
        QDContract contract;
        RecordSource source;

        private Provider() {
        }

        public RecordMode getMode() {
            return this.source.getMode();
        }

        public boolean retrieve(RecordSink sink) {
            while (sink.hasCapacity()) {
                RecordCursor cursor;
                do {
                    if ((cursor = this.source.next()) != null) continue;
                    return false;
                } while (!OutputStreamMessageConsumer.this.acceptCursor(this.contract, cursor));
                sink.append(cursor);
            }
            return true;
        }
    }

    private class Consumer
    extends MessageConsumerAdapter {
        private Consumer() {
        }

        public void handleCorruptedStream() {
            super.handleCorruptedStream();
            OutputStreamMessageConsumer.this.reader.close();
        }

        public void handleCorruptedMessage(int messageTypeId) {
            super.handleCorruptedMessage(messageTypeId);
            OutputStreamMessageConsumer.this.reader.close();
        }

        public void handleUnknownMessage(int messageTypeId) {
            super.handleUnknownMessage(messageTypeId);
            OutputStreamMessageConsumer.this.reader.close();
        }

        public void processTimeProgressReport(long timeMillis) {
            OutputStreamMessageConsumer.this.composer.composeTimeProgressReport(timeMillis);
            try {
                OutputStreamMessageConsumer.this.out.flush();
            }
            catch (IOException e) {
                throw new RuntimeQTPException((Throwable)e);
            }
        }

        public void processHeartbeat(HeartbeatPayload heartbeatPayload) {
            OutputStreamMessageConsumer.this.composer.visitHeartbeat(heartbeatPayload);
        }

        protected void processData(DataIterator iterator, MessageType message) {
            ((OutputStreamMessageConsumer)OutputStreamMessageConsumer.this).provider.contract = message.getContract();
            ((OutputStreamMessageConsumer)OutputStreamMessageConsumer.this).provider.source = (RecordSource)iterator;
            while (OutputStreamMessageConsumer.this.composer.visitData((DataProvider)OutputStreamMessageConsumer.this.provider, message)) {
            }
        }
    }
}

