/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.qtp.file;

import com.devexperts.io.BufferedInput;
import com.devexperts.io.BufferedInputPart;
import com.devexperts.io.Chunk;
import com.devexperts.io.ChunkedInput;
import com.devexperts.io.StreamCompression;
import com.devexperts.io.URLInputStream;
import com.devexperts.logging.Logging;
import com.devexperts.qd.DataIterator;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.QDFactory;
import com.devexperts.qd.SubscriptionIterator;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordSource;
import com.devexperts.qd.qtp.AbstractQTPParser;
import com.devexperts.qd.qtp.FileConstants;
import com.devexperts.qd.qtp.HeartbeatPayload;
import com.devexperts.qd.qtp.MessageConsumer;
import com.devexperts.qd.qtp.MessageConsumerAdapter;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.qtp.ProtocolDescriptor;
import com.devexperts.qd.qtp.RawDataConsumer;
import com.devexperts.qd.qtp.fieldreplacer.FieldReplacersCache;
import com.devexperts.qd.qtp.file.BinaryFileQTPParser;
import com.devexperts.qd.qtp.file.FileFormat;
import com.devexperts.qd.qtp.file.FileReaderParams;
import com.devexperts.qd.qtp.file.FileUtils;
import com.devexperts.qd.qtp.file.MessageReader;
import com.devexperts.qd.qtp.file.TimestampedFile;
import com.devexperts.qd.qtp.file.TimestampedFilenameFilter;
import com.devexperts.qd.qtp.file.TimestampedPosition;
import com.devexperts.qd.qtp.file.TimestampsType;
import com.devexperts.qd.stats.QDStats;
import com.devexperts.qd.util.QDConfig;
import com.devexperts.qd.util.TimeSequenceUtil;
import com.devexperts.transport.stats.ConnectionStats;
import com.devexperts.util.InvalidFormatException;
import com.devexperts.util.LogUtil;
import com.devexperts.util.SystemProperties;
import com.devexperts.util.TimePeriod;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Locale;

public class FileReader
implements MessageReader {
    private static final Logging log = Logging.getLogging(FileReader.class);
    // Pause passed to Thread.sleep between polling attempts (waiting for files or for more data);
    // taken from the "SleepDuration" system property of this class, default "1s".
    private static final long SLEEP_DURATION = TimePeriod.valueOf((String)SystemProperties.getProperty(FileReader.class, (String)"SleepDuration", (String)"1s")).getTime();

    // ---- Configuration captured by the constructor ----
    // Receives the number of bytes handed to the parser (see process()).
    private final ConnectionStats connectionStats;
    // Reader configuration supplied by the caller.
    private final FileReaderParams params;
    // Start/delay/stop times copied from params; Long.MIN_VALUE means "not set"
    // (see hasStart / hasDelay / hasStop). startTime is recomputed by initFileList()
    // when a delay is configured.
    private long startTime = Long.MIN_VALUE;
    private long delayTime = Long.MIN_VALUE;
    private final long stopTime;
    // Compression to use for every opened stream; null means "detect from the stream header" (see tryOpenFile()).
    private final StreamCompression compression;
    // Timestamps type requested through params (NONE when time is ignored); may be null.
    private final TimestampsType timestampsType;
    private final boolean hasStart;
    private final boolean hasDelay;
    private final boolean hasStop;
    // True when the configured speed differs from 1.0.
    private final boolean hasSpeed;
    // True when the configured speed equals Double.MAX_VALUE; disables waiting in waitTillVirtualTime().
    private final boolean hasMaxSpeed;
    // True when openFiles() must not attempt to open a companion time file.
    private final boolean doNotTryToReadTimeFile;
    // Compression extension stripped from the file name ("" when none).
    private final String containerExtension;
    // Extension of the data file ("" when the time file is not going to be read).
    private final String dataFileExtension;
    // Filter used to list timestamped files; null when the address does not map to a local file.
    private final TimestampedFilenameFilter filesFilter;

    // ---- Mutable reading state ----
    // Set by close(), polled by isClosed().
    private volatile boolean closed;
    // Timestamps type in effect for the file currently being read.
    private TimestampsType currentTimestampsType;
    // Anchor pair established by initTime0(): data time virtualTime0 corresponds to wall-clock time wallTime0.
    private long virtualTime0;
    private long wallTime0;
    private boolean hasTime0;
    // Latest listing obtained from filesFilter and the index of the next entry to read from it.
    private TimestampedFile[] filesList;
    private int nextFileId;
    // Time of the file most recently selected by initNextAddress().
    private long lastFileTime = Long.MIN_VALUE;
    // Set when reading must stop (stop time reached, or data is over and the reader does not wait).
    private boolean stopFlag;
    // Last value of (wall-clock time - data time) computed by waitTillVirtualTime(); exposed by getDelayActual().
    private volatile long delayActual;
    // Addresses and streams of the data file and its time file that are currently open.
    private String dataFileAddress;
    private String timeFileAddress;
    private InputStream dataIn;
    private BufferedReader timeIn;
    // Scheme set via setScheme(); when null the default scheme from QDFactory is used.
    private DataScheme scheme;
    // Destination of parsed messages, supplied to readInto().
    private MessageConsumerAdapter adapter;
    // Most recent data time accepted by advanceTime(); Long.MIN_VALUE at the start of each file.
    private long lastTime;
    // Format of the file currently being parsed (configured or detected from the first chunk).
    private FileFormat format;
    // Accumulates chunks read from dataIn for the parser.
    private final ChunkedInput input = new ChunkedInput(FileConstants.CHUNK_POOL);
    // True while the parser reads through inputPart, i.e. some bytes are withheld from it (see processChunkPart()).
    private boolean hasBytesOnHold;
    private BufferedInputPart inputPart;
    private AbstractQTPParser parser;
    // Chunk being filled by waitAndReadChunk(); null after it has been handed over to input.
    private Chunk chunk;
    private final QDStats stats;
    // Consumer passed to the parser; it forwards messages to the FileReader.process* methods.
    private final Consumer consumer = new Consumer();
    // Reusable payload for heartbeats sent to the adapter.
    private final HeartbeatPayload heartbeatPayload = new HeartbeatPayload();

    /**
     * Creates a reader for the given data file address.
     *
     * <p>The constructor only captures configuration: it copies time limits from {@code params},
     * decides which compression applies, decides whether a companion time file may be read,
     * and prepares the timestamped-files filter when the address maps to a local file.
     * No stream is opened here.
     *
     * @param dataFilePath address of the data file; a leading {@code "file:"} prefix is dropped
     *                     from the stored address
     * @param connectionStats statistics object that receives the number of bytes read
     * @param params reader configuration
     * @throws IllegalArgumentException if both start and delay are set, if both speed and delay
     *         are set, or if the requested timestamps type needs a time file that cannot be read
     */
    public FileReader(String dataFilePath, ConnectionStats connectionStats, FileReaderParams params) {
        this.connectionStats = connectionStats;
        this.params = params;
        this.startTime = params.getStartTime();
        this.stopTime = params.getStopTime();
        this.delayTime = params.getDelayTime();
        StreamCompression effectiveCompression = params.getCompression();
        this.timestampsType = params.isIgnoreTime() ? TimestampsType.NONE : params.getTime();

        // Long.MIN_VALUE marks a time parameter that was not specified.
        this.hasStart = this.startTime != Long.MIN_VALUE;
        this.hasDelay = this.delayTime != Long.MIN_VALUE;
        this.hasStop = this.stopTime != Long.MIN_VALUE;
        this.hasSpeed = params.getSpeed() != 1.0;
        this.hasMaxSpeed = params.getSpeed() == Double.MAX_VALUE;
        if (this.hasStart && this.hasDelay) {
            throw new IllegalArgumentException("Cannot have both start and delay set");
        }
        if (this.hasSpeed && this.hasDelay) {
            throw new IllegalArgumentException("Cannot have both speed and delay set");
        }

        // An explicitly requested timestamps type that does not use a time file rules the time file out.
        boolean skipTimeFile = this.timestampsType != null && !this.timestampsType.isUsingTimeFile();

        this.dataFileAddress = dataFilePath;
        if (this.dataFileAddress.startsWith("file:")) {
            this.dataFileAddress = this.dataFileAddress.substring("file:".length());
        }
        URL url = FileUtils.addressToURL(dataFilePath);
        if (url.getQuery() != null) {
            // An address with a query part is never paired with a time file.
            skipTimeFile = true;
        }

        // When compression is not configured, try to derive it from the file name;
        // null is kept for "unknown" so that it is detected from the stream header on open.
        String fileNameRest = url.getFile();
        if (effectiveCompression == null) {
            effectiveCompression = StreamCompression.detectCompressionByExtension(fileNameRest);
            if (effectiveCompression == StreamCompression.NONE) {
                effectiveCompression = null;
            }
        }
        String compressionExtension = "";
        if (effectiveCompression != null && fileNameRest.endsWith(effectiveCompression.getExtension())) {
            compressionExtension = effectiveCompression.getExtension();
            fileNameRest = effectiveCompression.stripExtension(fileNameRest);
        }

        String extension = skipTimeFile ? "" : FileUtils.retrieveExtension(fileNameRest);
        if (extension.equals(".time")) {
            // The address itself points to a time file.
            skipTimeFile = true;
        }
        if (skipTimeFile && this.timestampsType != null && this.timestampsType.isUsingTimeFile()) {
            throw new IllegalArgumentException("Cannot read time file");
        }

        File addressFile = FileUtils.urlToFile(url);
        this.filesFilter = addressFile == null ? null : TimestampedFilenameFilter.create(addressFile, compressionExtension);
        if (this.filesFilter != null && this.timestampsType != null && this.timestampsType.isUsingTimeFile()) {
            this.filesFilter.requireTimeFile();
        }

        this.compression = effectiveCompression;
        this.containerExtension = compressionExtension;
        this.dataFileExtension = extension;
        this.doNotTryToReadTimeFile = skipTimeFile;
        this.stats = params.getStats().getOrCreate(QDStats.SType.CONNECTIONS).create(QDStats.SType.CONNECTION, "file=" + LogUtil.hideCredentials(dataFilePath));
    }

    /**
     * Parses the properties embedded in {@code filePath} and applies them to {@code params}.
     *
     * <p>The work is delegated to {@code QDConfig.parseProperties} (which collects the properties)
     * and {@code QDConfig.setProperties} (which applies them to {@code params}).
     *
     * @param filePath file address, possibly carrying configuration properties
     * @param params parameters object that receives the parsed properties
     * @return the value returned by {@code QDConfig.parseProperties}
     *         (presumably the address without its properties part; see QDConfig)
     */
    public static String parseParameters(String filePath, FileReaderParams params) {
        ArrayList<String> props = new ArrayList<>();
        String remainingPath = QDConfig.parseProperties(filePath, props);
        QDConfig.setProperties(params, props);
        return remainingPath;
    }

    /**
     * Prepares the first file to read.
     *
     * <p>Without a files filter there is nothing to prepare. Otherwise the start time is derived
     * from the configured delay (if any), the stop time is pushed into the filter (if any),
     * and the file list is scanned from the start time.
     *
     * @return {@code true} when reading may proceed, {@code false} when no matching file was found
     * @throws InterruptedException if interrupted while waiting for files to appear
     */
    private boolean initFileList() throws InterruptedException {
        if (filesFilter != null) {
            if (hasDelay) {
                startTime = System.currentTimeMillis() - delayTime;
            }
            if (hasStop) {
                filesFilter.filterByStopTime(stopTime);
            }
            return rescanFileList(startTime, Long.MIN_VALUE);
        }
        return true;
    }

    /**
     * Re-lists the timestamped files using the given filter bounds and selects the first one.
     *
     * @param rescanStartTime start time passed to the filter
     * @param lastFileTime time of the previously read file passed to the filter
     * @return {@code true} when there is no filter or a file was selected,
     *         {@code false} when no matching file was found
     * @throws InterruptedException if interrupted while waiting for files to appear
     */
    private boolean rescanFileList(long rescanStartTime, long lastFileTime) throws InterruptedException {
        if (filesFilter == null) {
            return true;
        }
        filesFilter.filterByStartAndPreviousFileTime(rescanStartTime, lastFileTime);
        boolean found = waitAngGetFileList();
        if (found) {
            nextFileId = 0;
            initNextAddress();
        }
        return found;
    }

    /**
     * Lists the files matching the filter, polling every {@code SLEEP_DURATION} until at least
     * one appears, unless {@link #doNotWait()} says the reader must not wait.
     *
     * @return {@code true} when {@code filesList} is non-empty, {@code false} when nothing matched
     *         and waiting is not allowed
     * @throws InterruptedException if interrupted while sleeping
     */
    private boolean waitAngGetFileList() throws InterruptedException {
        boolean announcedWaiting = false;
        filesList = filesFilter.listTimestampedFiles();
        while (filesList.length == 0) {
            if (doNotWait()) {
                log.info("No matching files.");
                return false;
            }
            if (!announcedWaiting) {
                // Log the waiting message only once per wait.
                log.info("No matching files. Waiting for files to appear...");
                announcedWaiting = true;
            }
            Thread.sleep(SLEEP_DURATION);
            filesList = filesFilter.listTimestampedFiles();
        }
        if (announcedWaiting) {
            log.info("Found files...");
        }
        return true;
    }

    /**
     * Selects the next entry of {@code filesList} as the file to read: stores its address and time,
     * tells the filter about it, advances the list index and reports the file time to the adapter.
     */
    private void initNextAddress() {
        assert filesFilter != null;
        TimestampedFile selected = filesList[nextFileId];
        dataFileAddress = selected.address;
        lastFileTime = selected.time;
        filesFilter.filterByPreviousFileTime(selected.time);
        nextFileId++;
        adapter.processTimeProgressReport(selected.time);
    }

    /**
     * Sets the data scheme used when a parser is created.
     * When no scheme is set, the default scheme from {@code QDFactory} is used instead.
     *
     * @param scheme scheme to use, may be {@code null}
     */
    public void setScheme(DataScheme scheme) {
        this.scheme = scheme;
    }

    /**
     * Reads files one after another and delivers their messages to {@code adapter} until the
     * reader is closed, reading has to stop, or no further file is available.
     *
     * <p>Each iteration opens the current file (and its time file when applicable), processes it,
     * and always closes the streams. Afterwards it either restarts the cycle, moves to the next
     * already listed file, or rescans the directory for newer files.
     *
     * @param adapter destination of the parsed messages
     * @throws InterruptedException if the reading thread is interrupted
     */
    @Override
    public void readInto(MessageConsumerAdapter adapter) throws InterruptedException {
        this.adapter = adapter;
        if (!initFileList()) {
            return;
        }
        while (!isClosed()) {
            boolean opened = false;
            boolean processed = false;
            try {
                opened = openFiles();
                if (opened) {
                    onConnected();
                    process();
                    processed = true;
                }
            } catch (CorruptedFileException | IOException e) {
                log.error("Failed to read file", e);
            } finally {
                closeFiles();
            }
            if (isClosed()) {
                return;
            }
            if (filesFilter == null) {
                // A single (non-timestamped) source has been read.
                stopFlag = true;
            }
            if (stopFlag) {
                if (!params.isCycle()) {
                    return;
                }
                log.info("End of cycle. Starting from beginning");
                stopFlag = false;
                resetParserSessionAndClearInput();
                resetTime0();
                if (!initFileList()) {
                    return;
                }
            } else if (opened && processed && isNextFileAvailable()) {
                initNextAddress();
            } else if (!rescanFileList(lastFileTime + 1L, lastFileTime) && !params.isCycle()) {
                return;
            }
        }
    }

    /**
     * Tells whether {@link #close()} has been called, converting a pending thread interrupt
     * into an exception first (the interrupt flag is cleared by the check).
     *
     * @return {@code true} if this reader is closed
     * @throws InterruptedException if the current thread was interrupted
     */
    private boolean isClosed() throws InterruptedException {
        if (!Thread.interrupted()) {
            return closed;
        }
        throw new InterruptedException();
    }

    /**
     * Reads the currently open data stream chunk by chunk and feeds it to the parser.
     *
     * <p>Positions read from the time file (via {@code TimestampedPosition.readFrom}) are used as
     * split points: the bytes up to a split point are parsed first, then time is advanced to the
     * time recorded for that point, and only then are the following bytes made available.
     * When there is no position record ({@code next == null}) the stream is parsed without splits.
     *
     * @throws IOException declared for failures while reading the data or the time stream
     * @throws InterruptedException if the reading thread is interrupted
     * @throws CorruptedFileException thrown through the consumer when a corrupted stream is
     *         detected and reading shall continue with the next file
     */
    private void process() throws IOException, InterruptedException, CorruptedFileException {
        // Every file starts without a known data time.
        this.lastTime = Long.MIN_VALUE;
        TimestampedPosition next = TimestampedPosition.readFrom(this.timeIn);
        // Number of bytes of this file handed to the parser so far.
        long position = 0L;
        boolean firstBlock = true;
        block0: while (!this.isClosed()) {
            if (!this.waitAndReadChunk()) {
                // No more data: bytes still pending in the input mean the last message is incomplete.
                if (this.input.available() <= 0) break;
                log.error("File was not completely parsed, " + this.input.available() + " bytes remaining");
                this.consumer.handleCorruptedStream();
                break;
            }
            if (firstBlock) {
                // The format is determined from the first chunk (unless it is configured).
                this.createParserOnFirstChunk();
                firstBlock = false;
            }
            // Bytes of the current chunk that have not been released to the parser yet.
            int remaining = this.chunk.getLength();
            this.input.addToInput(this.chunk, (Object)this);
            // The chunk has been handed over to the input; a new one is obtained on the next read.
            this.chunk = null;
            while (remaining > 0) {
                // Without a position record there is no split point within reach.
                long nextPosition = next == null ? Long.MAX_VALUE : next.getPosition();
                int sendLen = (int)Math.min(nextPosition - position, (long)remaining);
                if (position < nextPosition) {
                    // Release sendLen bytes; the rest of the chunk stays on hold.
                    this.processChunkPart(remaining -= sendLen);
                    this.connectionStats.addReadBytes((long)sendLen);
                    position += (long)sendLen;
                }
                // Chunk is exhausted before the split point was reached: read the next chunk.
                if (position < nextPosition) continue block0;
                // Split point reached (next is non-null here, otherwise nextPosition is Long.MAX_VALUE).
                if (this.timeIn != null) {
                    this.parser.setEventTimeSequence(TimeSequenceUtil.getTimeSequenceFromTimeMillis((long)next.getTime()));
                }
                if (!this.advanceTime(next.getTime())) {
                    // Stop time reached or the thread was interrupted.
                    return;
                }
                next = TimestampedPosition.readFrom(this.timeIn);
            }
        }
    }

    /**
     * Advances the data time to {@code nextTime}, sleeping as needed to keep the configured pace.
     *
     * <p>When a stop time is configured and {@code nextTime} reaches it, the method waits until the
     * stop time, raises {@code stopFlag} and reports that reading must not continue.
     *
     * @param nextTime data time to advance to
     * @return {@code true} if processing may continue; {@code false} if the stop time was reached
     *         or the thread was interrupted (the interrupt flag is restored in that case)
     */
    private boolean advanceTime(long nextTime) {
        try {
            initTime0(nextTime);
            boolean reachedStop = hasStop && nextTime >= stopTime;
            if (!reachedStop) {
                waitTillVirtualTime(nextTime);
                lastTime = nextTime;
                return true;
            }
            waitTillVirtualTime(stopTime);
            stopFlag = true;
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Sleeps until the wall-clock moment that corresponds to data time {@code nextTime}, given the
     * time anchor ({@code wallTime0}/{@code virtualTime0}) and the configured speed, then records
     * the resulting difference between wall-clock time and data time in {@code delayActual}.
     * Does nothing when no anchor is established yet or when maximum speed is requested.
     *
     * @param nextTime data time to wait for
     * @throws InterruptedException if interrupted while sleeping
     */
    private void waitTillVirtualTime(long nextTime) throws InterruptedException {
        if (hasMaxSpeed || !hasTime0) {
            return;
        }
        double scaledOffset = (double) (nextTime - virtualTime0) / params.getSpeed();
        long waitTillTime = (long) ((double) wallTime0 + scaledOffset);
        long now = System.currentTimeMillis();
        while (waitTillTime - now > 0L) {
            Thread.sleep(waitTillTime - now);
            now = System.currentTimeMillis();
        }
        delayActual = now - nextTime;
    }

    /**
     * Establishes the time anchor once: the current wall-clock time is paired with a data time
     * chosen from the delay, the start time, or {@code nextTime}, in that order of precedence.
     * With a start time configured, the anchor is not established while {@code nextTime} is
     * still before the start time.
     *
     * @param nextTime data time being advanced to
     */
    private void initTime0(long nextTime) {
        if (hasTime0) {
            return;
        }
        wallTime0 = System.currentTimeMillis();
        long anchor;
        if (hasDelay) {
            anchor = wallTime0 - delayTime;
        } else if (hasStart) {
            if (nextTime < startTime) {
                // Data before the start time does not anchor the clock.
                return;
            }
            anchor = startTime;
        } else {
            anchor = nextTime;
        }
        virtualTime0 = anchor;
        hasTime0 = true;
    }

    /**
     * Drops the time anchor and resets the reported actual delay to the configured delay
     * (or to zero when no delay is configured).
     */
    private void resetTime0() {
        hasTime0 = false;
        if (delayTime == Long.MIN_VALUE) {
            delayActual = 0L;
        } else {
            delayActual = delayTime;
        }
    }

    /**
     * Runs the parser over the accumulated input while withholding the last {@code bytesOnHold}
     * bytes from it.
     *
     * <p>When bytes must be withheld, the parser is pointed at a {@code BufferedInputPart} limited
     * to {@code input.available() - bytesOnHold} bytes; otherwise it reads the input directly.
     *
     * @param bytesOnHold number of trailing bytes of the input that the parser must not consume yet
     */
    private void processChunkPart(int bytesOnHold) {
        if (this.hasBytesOnHold) {
            // The previous call parsed through inputPart.
            // NOTE(review): presumably this propagates the consumed position back to the underlying input - confirm.
            this.inputPart.syncInputPosition();
        }
        if (bytesOnHold > 0) {
            if (this.inputPart == null) {
                // Created lazily and reused afterwards.
                this.inputPart = new BufferedInputPart();
            }
            // NOTE(review): mark() is paired with unmark() below; presumably required while inputPart reads from input - confirm.
            this.input.mark();
            this.inputPart.setInput((BufferedInput)this.input, (long)(this.input.available() - bytesOnHold));
            this.parser.setInput((BufferedInput)this.inputPart);
            this.hasBytesOnHold = true;
        } else if (this.hasBytesOnHold) {
            // Nothing is withheld anymore: let the parser read the input directly again.
            this.input.unmark();
            this.parser.setInput((BufferedInput)this.input);
            this.hasBytesOnHold = false;
        }
        this.parser.parse((MessageConsumer)this.consumer);
    }

    /**
     * Tells whether a corrupted file should abort only the current file (so that reading
     * continues with another one) rather than be reported to the adapter.
     *
     * @return {@code true} when a files filter is in use or cycling is enabled
     */
    boolean onCorruptedShallContinueWithNextFile() {
        if (filesFilter != null) {
            return true;
        }
        return params.isCycle();
    }

    /**
     * Tells whether a parsed message should be delivered to the adapter.
     *
     * @return {@code false} once {@code stopFlag} is raised; otherwise {@code true} when no
     *         timestamps are in effect (type is {@code null} or {@code NONE}) or the last data
     *         time is not before the start time
     */
    boolean shallProcessMessage() {
        if (stopFlag) {
            return false;
        }
        TimestampsType type = currentTimestampsType;
        return type == null || type == TimestampsType.NONE || lastTime >= startTime;
    }

    /**
     * Forwards the protocol descriptor to the adapter and, when the descriptor carries a
     * {@code "time"} property, switches the current timestamps type accordingly.
     * An unrecognized property value is logged and otherwise ignored.
     *
     * @param desc protocol descriptor read from the file (or synthesized for bare-bones formats)
     */
    public void processDescribeProtocolMessage(ProtocolDescriptor desc) {
        adapter.processDescribeProtocol(desc, true);
        String timeProperty = desc.getProperty("time");
        if (timeProperty == null) {
            return;
        }
        try {
            currentTimestampsType = TimestampsType.valueOf(timeProperty.toUpperCase(Locale.US));
        } catch (IllegalArgumentException e) {
            log.error("Unrecognized timestamps type in time property: \"" + timeProperty + "\"");
        }
    }

    /**
     * Handles a heartbeat read from the file.
     *
     * <p>When timestamps come from messages (or their source is still undecided) and the heartbeat
     * carries a time, that time drives {@link #advanceTime(long)}. The heartbeat is then forwarded
     * to the adapter if messages are currently being delivered.
     *
     * @param heartbeatPayload heartbeat payload produced by the parser
     */
    void processHeartbeatMessage(HeartbeatPayload heartbeatPayload) {
        boolean timedByMessages = currentTimestampsType == null || currentTimestampsType == TimestampsType.MESSAGE;
        if (timedByMessages && heartbeatPayload.hasTimeMillis()) {
            currentTimestampsType = TimestampsType.MESSAGE;
            if (!advanceTime(heartbeatPayload.getTimeMillis())) {
                return;
            }
        }
        if (!shallProcessMessage()) {
            return;
        }
        if (heartbeatPayload.hasTimeMillis()) {
            // Remember the time in the reader's own payload and stamp subsequent events with it.
            this.heartbeatPayload.setTimeMillis(heartbeatPayload.getTimeMillis());
            parser.setEventTimeSequence(TimeSequenceUtil.getTimeSequenceFromTimeMillis(heartbeatPayload.getTimeMillis()));
        }
        adapter.processHeartbeat(heartbeatPayload);
    }

    /**
     * Handles a block of records read from the file.
     *
     * <p>If the first record carries an event time sequence and timestamps come from fields (or
     * their source is still undecided), the records drive the clock: they are either passed
     * through as a whole (maximum speed, no stop time, not before the start time) or replayed
     * in groups by event time. Otherwise the block is delivered as is when messages are being
     * delivered, preceded by a heartbeat if the last data time changed.
     *
     * @param source records to process
     * @param message type of the message the records came from
     */
    void processRecordSourceMessage(RecordSource source, MessageType message) {
        RecordCursor cursor = source.current();
        if (cursor == null) {
            return;
        }
        boolean timedByFields = cursor.getEventTimeSequence() != 0L
            && (currentTimestampsType == null || currentTimestampsType == TimestampsType.FIELD);
        if (timedByFields) {
            currentTimestampsType = TimestampsType.FIELD;
            boolean beforeStart = hasStart && lastTime < startTime;
            boolean passThrough = hasMaxSpeed && !hasStop && !beforeStart;
            if (passThrough) {
                adapter.processRecordSource(source, message);
            } else {
                processRecordSourceMessageByEventTime(source, message);
            }
            return;
        }
        if (!shallProcessMessage()) {
            return;
        }
        if (lastTime != Long.MIN_VALUE && lastTime != heartbeatPayload.getTimeMillis()) {
            // Let the adapter know the current data time before delivering the records.
            heartbeatPayload.setTimeMillis(lastTime);
            adapter.processHeartbeat(heartbeatPayload);
        }
        adapter.processRecordSource(source, message);
    }

    /**
     * Replays the records of {@code source} in groups: for each group the clock is advanced to the
     * event time of its first record, then all following records whose event time sequence does
     * not exceed that value are delivered together (if messages are being delivered).
     * Stops early when {@link #advanceTime(long)} reports that reading must not continue.
     *
     * @param source records to replay
     * @param message type of the message the records came from
     */
    private void processRecordSourceMessageByEventTime(RecordSource source, MessageType message) {
        RecordCursor cursor = source.current();
        long position = source.getPosition();
        while (true) {
            long nextTimeSequence = cursor.getEventTimeSequence();
            long nextTime = TimeSequenceUtil.getTimeMillisFromTimeSequence(nextTimeSequence);
            if (!advanceTime(nextTime)) {
                return;
            }
            // Extend the group while records are not later than the group's time sequence.
            long endPosition;
            do {
                endPosition = source.getPosition();
                cursor = source.next();
            } while (cursor != null && cursor.getEventTimeSequence() <= nextTimeSequence);
            if (shallProcessMessage()) {
                adapter.processRecordSource(source.newSource(position, endPosition), message);
            }
            if (endPosition == source.getLimit()) {
                return;
            }
            position = endPosition;
        }
    }

    /**
     * Reads the next portion of data into {@code chunk}, polling every {@code SLEEP_DURATION}
     * while the stream has nothing to offer and waiting is allowed.
     *
     * <p>Polling ends without data when the reader is closed, when a next file is available,
     * or when {@link #doNotWait()} holds (which also raises {@code stopFlag}).
     *
     * @return {@code true} if the chunk now holds data, {@code false} if no data was read
     * @throws IOException if reading from the data stream fails
     * @throws InterruptedException if interrupted while waiting
     */
    private boolean waitAndReadChunk() throws IOException, InterruptedException {
        if (chunk == null) {
            chunk = FileConstants.CHUNK_POOL.getChunk(this);
        }
        boolean announcedWaiting = false;
        int len;
        while (true) {
            len = dataIn.read(chunk.getBytes(), chunk.getOffset(), chunk.getLength());
            if (len > 0) {
                break;
            }
            if (isClosed()) {
                return false;
            }
            if (isNextFileAvailable()) {
                break;
            }
            if (doNotWait()) {
                stopFlag = true;
                return false;
            }
            if (!announcedWaiting) {
                // Log the waiting message only once per wait.
                log.info("Data file is over. Waiting for more data...");
                announcedWaiting = true;
            }
            Thread.sleep(SLEEP_DURATION);
        }
        if (announcedWaiting) {
            log.info("Reading more data...");
        }
        if (len <= 0) {
            return false;
        }
        chunk.setLength(len, this);
        return true;
    }

    /**
     * Tells whether the reader must give up instead of waiting for new files or more data.
     *
     * @return {@code true} when there is no files filter, cycling is enabled, a stop time is set,
     *         or a non-default speed is set
     */
    private boolean doNotWait() {
        boolean noFilter = filesFilter == null;
        return noFilter || params.isCycle() || hasStop || hasSpeed;
    }

    /**
     * Tells whether another file can be read after the current one. If the current listing is
     * exhausted, the files are listed again and, when the new listing is non-empty, the list
     * index is reset to its beginning.
     *
     * @return {@code true} if {@code filesList[nextFileId]} is a file to read next
     */
    private boolean isNextFileAvailable() {
        if (filesFilter == null) {
            return false;
        }
        if (nextFileId < filesList.length) {
            return true;
        }
        filesList = filesFilter.listTimestampedFiles();
        if (filesList.length == 0) {
            return false;
        }
        nextFileId = 0;
        return true;
    }

    /**
     * Determines the file format (configured, or detected from the bytes of the first chunk) and
     * makes a parser ready for it: a new parser is created and configured when there is none yet
     * or the format changed, otherwise the pending input is cleared. For bare-bones formats a
     * synthetic "tape" protocol descriptor sending {@code STREAM_DATA} is processed first.
     */
    private void createParserOnFirstChunk() {
        FileFormat prevFormat = format;
        if (params.getFormat() != null) {
            format = params.getFormat();
        } else {
            format = FileFormat.detectFormat(chunk.getBytes());
        }
        boolean needNewParser = parser == null || format != prevFormat;
        if (needNewParser) {
            DataScheme effectiveScheme = scheme == null ? QDFactory.getDefaultScheme() : scheme;
            parser = createParser(format, effectiveScheme);
            configureParser();
        } else {
            input.clear();
        }
        if (!format.isBareBones()) {
            return;
        }
        ProtocolDescriptor desc = ProtocolDescriptor.newSelfProtocolDescriptor("tape");
        desc.addSend(desc.newMessageDescriptor(MessageType.STREAM_DATA));
        processDescribeProtocolMessage(desc);
    }

    /**
     * Applies the reader configuration to a freshly created parser: input, event-time handling,
     * read-as type, statistics, field replacers, and the binary-only options
     * {@code schemeKnown} and {@code resyncOn}.
     *
     * @throws InvalidFormatException if a binary-only option is requested for a non-binary parser
     */
    private void configureParser() {
        parser.setInput(input);
        parser.setReadEventTimeSequence(timestampsType != TimestampsType.NONE);
        parser.setEventTimeSequence(0L);
        parser.readAs(params.getReadAs());
        parser.setStats(stats);
        configureFieldReplacers();
        if (params.isSchemeKnown()) {
            if (parser instanceof BinaryFileQTPParser) {
                ((BinaryFileQTPParser) parser).setSchemeKnown(true);
            } else {
                throw new InvalidFormatException("schemeKnown is supported only for binary format");
            }
        }
        if (params.getResyncOn() != null) {
            if (parser instanceof BinaryFileQTPParser) {
                ((BinaryFileQTPParser) parser).setResyncOn(params.getResyncOn());
            } else {
                throw new InvalidFormatException("resyncOn is supported only for binary format");
            }
        }
    }

    /**
     * Installs field replacers on the parser when a field replacer specification is configured;
     * the replacers are resolved against the reader's scheme (or the default scheme).
     */
    private void configureFieldReplacers() {
        if (params.getFieldReplacer() == null) {
            return;
        }
        DataScheme effectiveScheme = scheme != null ? scheme : QDFactory.getDefaultScheme();
        parser.setFieldReplacers(FieldReplacersCache.valueOf(effectiveScheme, (String) params.getFieldReplacer()));
    }

    /**
     * Creates the parser for the given format. Subclasses may override this to supply
     * a different parser.
     *
     * @param format file format to parse
     * @param scheme data scheme for the parser
     * @return parser created by {@code format}
     */
    protected AbstractQTPParser createParser(FileFormat format, DataScheme scheme) {
        return format.createQTPParser(scheme);
    }

    /**
     * Resets the parser session and discards pending input; does nothing when no parser
     * has been created yet.
     */
    private void resetParserSessionAndClearInput() {
        if (parser == null) {
            return;
        }
        parser.resetSession();
        input.clear();
    }

    /**
     * Opens the current data file and, unless time files are ruled out, its companion time file.
     *
     * <p>The time file is decoded as UTF-8 explicitly, so that reading does not depend on the
     * platform default charset of the JVM.
     *
     * @return {@code true} if reading may proceed; {@code false} if the data file could not be
     *         opened, or the time file could not be opened although a timestamps type was
     *         explicitly requested
     */
    private boolean openFiles() {
        currentTimestampsType = timestampsType;
        dataIn = tryOpenFile(false, false);
        if (dataIn == null) {
            return false;
        }
        if (!doNotTryToReadTimeFile) {
            // A missing time file is tolerated silently only when no timestamps type was requested.
            InputStream in = tryOpenFile(true, timestampsType == null);
            if (in == null) {
                timeIn = null;
                return timestampsType == null;
            }
            currentTimestampsType = TimestampsType.TEXT;
            timeIn = new BufferedReader(new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8));
        }
        return true;
    }

    /**
     * Hook invoked by {@code readInto} after the files were opened successfully and before
     * processing starts. The default implementation does nothing.
     */
    protected void onConnected() {
        // Intentionally empty: extension point for subclasses.
    }

    /**
     * Opens either the current data file or its companion time file and wraps the stream with the
     * applicable decompression. When no compression is configured it is detected from the stream
     * header.
     *
     * <p>If anything fails after the underlying stream was obtained (compression detection,
     * decompression setup), the stream is closed before returning, so a failed attempt does not
     * leak it.
     *
     * @param openTimeFile {@code true} to open the time file, {@code false} to open the data file
     * @param ignoreFileNotFound {@code true} to return {@code null} without logging when the file
     *                           does not exist
     * @return the (decompressed) input stream, or {@code null} if the file could not be opened
     */
    private InputStream tryOpenFile(boolean openTimeFile, boolean ignoreFileNotFound) {
        String address = dataFileAddress;
        InputStream in = null;
        boolean opened = false;
        try {
            URL url = FileUtils.addressToURL(dataFileAddress);
            if (openTimeFile) {
                String timeFile = FileUtils.getTimeFilePath(url.getFile(), dataFileExtension, containerExtension);
                url = new URL(url.getProtocol(), url.getHost(), url.getPort(), timeFile);
                address = timeFileAddress = url.toString();
            }
            URLConnection con = URLInputStream.openConnection(url, params.getUser(), params.getPassword());
            in = con.getInputStream();
            StreamCompression streamCompression = compression;
            if (streamCompression == null) {
                // Header detection is given a stream that supports mark/reset.
                if (!in.markSupported()) {
                    in = new BufferedInputStream(in);
                }
                streamCompression = StreamCompression.detectCompressionByHeader(in);
            }
            in = streamCompression.decompress(in);
            log.info("Reading " + (openTimeFile ? "time" : "data") + " from " + LogUtil.hideCredentials(address) + (streamCompression == StreamCompression.NONE ? "" : " with " + streamCompression));
            opened = true;
            return in;
        } catch (IOException e) {
            if (ignoreFileNotFound && e instanceof FileNotFoundException) {
                return null;
            }
            log.error("Failed to open " + LogUtil.hideCredentials(address), e);
            return null;
        } finally {
            if (!opened && in != null) {
                try {
                    in.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup: the open attempt has already failed and is reported above.
                }
            }
        }
    }

    /**
     * Closes the time stream and then the data stream via {@code FileUtils.tryClose}
     * and forgets both references.
     */
    private void closeFiles() {
        FileUtils.tryClose(timeIn, timeFileAddress);
        timeIn = null;

        FileUtils.tryClose(dataIn, dataFileAddress);
        dataIn = null;
    }

    /**
     * Marks this reader as closed and closes its statistics. Only the first call has any effect;
     * the closed flag is flipped under the reader's monitor.
     */
    @Override
    public void close() {
        synchronized (this) {
            if (closed) {
                return;
            }
            closed = true;
        }
        if (stats != null) {
            stats.close();
        }
    }

    /**
     * Returns the most recently recorded actual delay: the difference between wall-clock time and
     * data time computed after the last wait, or the value set when the time anchor was reset.
     *
     * @return the last recorded actual delay
     */
    public long getDelayActual() {
        return delayActual;
    }

    /**
     * Unchecked exception thrown by the reader's internal consumer to abandon the file currently
     * being parsed when corrupted or unknown data is encountered; {@code readInto} catches it.
     */
    static class CorruptedFileException extends RuntimeException {
        /**
         * @param message description of the corruption
         */
        CorruptedFileException(String message) {
            super(message);
        }
    }

    /**
     * Consumer handed to the parser. It routes parsed messages to the enclosing reader's
     * {@code process*} methods (which apply timing and filtering) and decides how corruption
     * is handled: either the current file is abandoned by throwing
     * {@link CorruptedFileException}, or the event is reported to the reader's adapter.
     */
    private class Consumer extends MessageConsumerAdapter implements RawDataConsumer {

        /** Delegates symbol resolution to the reader's adapter. */
        public String getSymbol(char[] chars, int offset, int length) {
            return FileReader.this.adapter.getSymbol(chars, offset, length);
        }

        /** Abandons the current file, or reports the corrupted stream to the adapter. */
        public void handleCorruptedStream() {
            if (!FileReader.this.onCorruptedShallContinueWithNextFile()) {
                FileReader.this.adapter.handleCorruptedStream();
                return;
            }
            super.handleCorruptedStream();
            throw new CorruptedFileException("Corrupted QTP byte stream");
        }

        /** Abandons the current file, or reports the corrupted message to the adapter. */
        public void handleCorruptedMessage(int messageTypeId) {
            if (!FileReader.this.onCorruptedShallContinueWithNextFile()) {
                FileReader.this.adapter.handleCorruptedMessage(messageTypeId);
                return;
            }
            super.handleCorruptedMessage(messageTypeId);
            throw new CorruptedFileException("Corrupted QTP message");
        }

        /** Abandons the current file, or reports the unknown message to the adapter. */
        public void handleUnknownMessage(int messageTypeId) {
            if (!FileReader.this.onCorruptedShallContinueWithNextFile()) {
                FileReader.this.adapter.handleUnknownMessage(messageTypeId);
                return;
            }
            super.handleUnknownMessage(messageTypeId);
            throw new CorruptedFileException("Unknown QTP message");
        }

        /** Routes the descriptor to the reader; the {@code logDescriptor} argument is not used. */
        public void processDescribeProtocol(ProtocolDescriptor desc, boolean logDescriptor) {
            FileReader.this.processDescribeProtocolMessage(desc);
        }

        /** Routes the heartbeat to the reader. */
        public void processHeartbeat(HeartbeatPayload heartbeatPayload) {
            FileReader.this.processHeartbeatMessage(heartbeatPayload);
        }

        /** Routes data records to the reader; the iterator is used as a {@code RecordSource}. */
        public void processData(DataIterator iterator, MessageType message) {
            FileReader.this.processRecordSourceMessage((RecordSource) iterator, message);
        }

        /** Routes subscription records to the reader; the iterator is used as a {@code RecordSource}. */
        public void processSubscription(SubscriptionIterator iterator, MessageType message) {
            FileReader.this.processRecordSourceMessage((RecordSource) iterator, message);
        }

        /** Forwards any other message to the adapter when messages are currently being delivered. */
        public void processOtherMessage(int messageId, BufferedInput data, int len) {
            if (!FileReader.this.shallProcessMessage()) {
                return;
            }
            FileReader.this.adapter.processOtherMessage(messageId, data, len);
        }
    }
}

