/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.qtp.file;

import com.devexperts.io.BufferedOutput;
import com.devexperts.io.ChunkList;
import com.devexperts.io.ChunkedOutput;
import com.devexperts.io.StreamCompression;
import com.devexperts.logging.Logging;
import com.devexperts.qd.DataProvider;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.SubscriptionProvider;
import com.devexperts.qd.ng.AbstractRecordProvider;
import com.devexperts.qd.ng.AbstractRecordSink;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.ng.RecordProvider;
import com.devexperts.qd.ng.RecordSink;
import com.devexperts.qd.qtp.AbstractMessageVisitor;
import com.devexperts.qd.qtp.AbstractQTPComposer;
import com.devexperts.qd.qtp.FileConstants;
import com.devexperts.qd.qtp.HeartbeatPayload;
import com.devexperts.qd.qtp.MessageDescriptor;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.qtp.ProtocolDescriptor;
import com.devexperts.qd.qtp.ProtocolOption;
import com.devexperts.qd.qtp.file.FileFormat;
import com.devexperts.qd.qtp.file.FileUtils;
import com.devexperts.qd.qtp.file.FileWriterParams;
import com.devexperts.qd.qtp.file.ParallelWriter;
import com.devexperts.qd.qtp.file.TimestampedFile;
import com.devexperts.qd.qtp.file.TimestampedFilenameFilter;
import com.devexperts.qd.qtp.file.TimestampedPosition;
import com.devexperts.qd.qtp.file.TimestampsType;
import com.devexperts.qd.util.QDConfig;
import com.devexperts.qd.util.TimeSequenceUtil;
import com.devexperts.util.InvalidFormatException;
import com.devexperts.util.LogUtil;
import com.devexperts.util.SystemProperties;
import com.devexperts.util.TimePeriod;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.nio.file.CopyOption;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Locale;
import java.util.Objects;
import javax.annotation.concurrent.GuardedBy;

public class FileWriterImpl
extends AbstractMessageVisitor
implements Closeable {
    private static final Logging log = Logging.getLogging(FileWriterImpl.class);
    private static final boolean NO_HEADER_AND_FOOTER = SystemProperties.getBooleanProperty(FileWriterImpl.class, (String)"noHeaderAndFooter", (boolean)false);
    private static final int TASK_QUEUE_SIZE = SystemProperties.getIntProperty(FileWriterImpl.class, (String)"taskQueueSize", (int)4);
    private static final boolean IS_POSIX = FileSystems.getDefault().supportedFileAttributeViews().contains("posix");
    private static final FileAttribute<?>[] POSIX_FS_FILE_ATTRIBUTES = new FileAttribute[]{PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-rw-rw-"))};
    private static final FileAttribute<?>[] OTHER_FS_FILE_ATTRIBUTES = new FileAttribute[0];
    private final DataScheme scheme;
    private final TimePeriod split;
    private final StreamCompression compression;
    private final FileFormat format;
    private final TimestampsType time;
    private final MessageType saveAs;
    private final long storageTime;
    private final long storageSize;
    private final ProtocolOption.Set optSet;
    private final String containerExtension;
    private final String dataFileExtension;
    private final boolean storageLimited;
    private final TimestampedFilenameFilter fileFilter;
    private final ProtocolDescriptor protocolDescriptor = ProtocolDescriptor.newSelfProtocolDescriptor((String)"tape");
    private FlushThread flushThread;
    private ParallelWriter dataWriter;
    private ParallelWriter timeWriter;
    private volatile BufferedOutput dataOut;
    private PrintWriter timeOut;
    private final ChunkedOutput output = new ChunkedOutput(FileConstants.CHUNK_POOL);
    private AbstractQTPComposer composer;
    private long position;
    private long lastTime;
    private long curTime;
    private long lastIncomingTimeMillis;
    private volatile long nextSplitTime;
    private String dataFilePath;
    private String currentDataFilePath;
    private String currentTimeFilePath;
    private Path tmpDirPath;
    private final HeartbeatPayload heartbeatPayload = new HeartbeatPayload();
    private final TimestampedSink timestampedSink = new TimestampedSink();
    private final TimestampedProvider timestampedProvider = new TimestampedProvider();

    public static FileWriterImpl open(String dataFilePathWithParams, DataScheme scheme) {
        FileWriterParams.Default params = new FileWriterParams.Default();
        String dataFilePath = FileWriterImpl.parseParameters(dataFilePathWithParams, params);
        return new FileWriterImpl(dataFilePath, scheme, params).open();
    }

    private static String parseParameters(String dataFilePathWithParams, FileWriterParams params) {
        ArrayList props = new ArrayList();
        dataFilePathWithParams = QDConfig.parseProperties((String)dataFilePathWithParams, props);
        QDConfig.setProperties((Object)params, props);
        return dataFilePathWithParams;
    }

    public FileWriterImpl(String dataFilePath, DataScheme scheme, FileWriterParams params) {
        String trimTmpPath;
        this.scheme = scheme;
        this.dataFilePath = dataFilePath;
        if (params.getTmpDir() != null && !(trimTmpPath = params.getTmpDir().trim()).isEmpty()) {
            this.tmpDirPath = Paths.get(trimTmpPath, new String[0]);
        }
        this.split = params.getSplit();
        this.format = FileWriterImpl.fromNullable(params.getFormat(), FileFormat.BINARY);
        this.time = FileWriterImpl.fromNullable(params.getTime(), this.format.getTimestampsType());
        if (this.time != TimestampsType.NONE) {
            this.protocolDescriptor.setProperty("time", this.time.toString().toLowerCase(Locale.US));
        }
        this.saveAs = params.getSaveAs();
        this.storageSize = params.getStorageSize();
        this.storageTime = params.getStorageTime().getTime();
        this.optSet = FileWriterImpl.fromNullable(ProtocolOption.parseProtocolOptions((String)params.getOpt()), ProtocolOption.SUPPORTED_SET);
        this.protocolDescriptor.setProperty("opt", this.optSet.isEmpty() ? null : params.getOpt());
        MessageType saveAs = params.getSaveAs();
        if (saveAs != null) {
            this.protocolDescriptor.addSend(this.protocolDescriptor.newMessageDescriptor(saveAs));
        }
        boolean bl = this.storageLimited = this.storageTime != Long.MAX_VALUE || this.storageSize != Long.MAX_VALUE;
        if (this.storageLimited && this.split == null) {
            throw new InvalidFormatException("Storage time or size can be limited only in split mode");
        }
        this.compression = FileWriterImpl.fromNullable(params.getCompression(), StreamCompression.detectCompressionByExtension((String)dataFilePath));
        this.containerExtension = dataFilePath.endsWith(this.compression.getExtension()) ? this.compression.getExtension() : "";
        this.dataFileExtension = FileUtils.retrieveExtension(this.compression.stripExtension(dataFilePath));
        if (this.time.isUsingTimeFile() && this.dataFileExtension.equals(".time")) {
            throw new InvalidFormatException("File extension must differ from \".time\" when timestamps are written");
        }
        if (this.split != null) {
            this.fileFilter = TimestampedFilenameFilter.create(new File(dataFilePath), this.containerExtension);
            if (this.fileFilter == null) {
                throw new InvalidFormatException("There must be timestamp marker '~' in file name when using \"split\" option");
            }
            if (this.time.isUsingTimeFile()) {
                this.fileFilter.requireTimeFile();
            }
            log.info("Create FileWriter which writes tape to " + LogUtil.hideCredentials((Object)dataFilePath) + (this.time.isUsingTimeFile() ? "/.time" : "") + ", where \"" + "~" + "\" is replaced by current date and time. Splitting files with interval " + this.split + (Objects.nonNull(params.getTmpDir()) ? ". Use tmp dir: " + params.getTmpDir() : ""));
        } else {
            this.nextSplitTime = Long.MAX_VALUE;
            this.fileFilter = null;
            if (new File(dataFilePath).getName().contains("~")) {
                throw new InvalidFormatException("\"split\" option must be used when file named contains timestamp marker '~'");
            }
        }
    }

    private static <T> T fromNullable(T value, T defaultValue) {
        return value != null ? value : defaultValue;
    }

    public synchronized void addSendMessageType(MessageType messageType) {
        if (this.saveAs != null) {
            return;
        }
        this.protocolDescriptor.addSend(this.protocolDescriptor.newMessageDescriptor(messageType));
    }

    public synchronized void visitDescribeProtocol(ProtocolDescriptor descriptor) {
        if (this.saveAs != null) {
            return;
        }
        for (MessageDescriptor md : descriptor.getSendMessages()) {
            this.protocolDescriptor.addSend(this.protocolDescriptor.newMessageDescriptor(md.getMessageType()));
        }
    }

    public synchronized FileWriterImpl open() throws InvalidFormatException {
        this.close();
        this.composer = this.format.createQTPComposer(this.scheme);
        this.composer.setOutput((BufferedOutput)this.output);
        this.composer.setWriteEventTimeSequence(this.time == TimestampsType.FIELD);
        this.composer.setWriteHeartbeat(this.time == TimestampsType.MESSAGE);
        this.composer.setOptSet(this.optSet);
        this.flushThread = new FlushThread(LogUtil.hideCredentials((Object)this.currentDataFilePath));
        this.flushThread.start();
        this.dataWriter = new ParallelWriter("Write-Data-" + LogUtil.hideCredentials((Object)this.currentDataFilePath), TASK_QUEUE_SIZE);
        this.dataWriter.start();
        if (this.time.isUsingTimeFile()) {
            this.timeWriter = new ParallelWriter("Write-Time-" + LogUtil.hideCredentials((Object)this.currentDataFilePath), TASK_QUEUE_SIZE);
            this.timeWriter.start();
        }
        return this;
    }

    private String getTimeFilePath(String dataFilePath) {
        return FileUtils.getTimeFilePath(dataFilePath, this.dataFileExtension, this.containerExtension);
    }

    private boolean needNewSplitFile(long curTime) {
        return curTime >= this.nextSplitTime;
    }

    private void reopenFilesIfNeeded() throws IOException {
        if (this.dataOut == null) {
            if (this.split != null) {
                this.nextSplitTime = (this.curTime / this.split.getTime() + 1L) * this.split.getTime();
                this.reopenFiles(this.fileFilter.getPathForTime(this.curTime));
            } else {
                this.reopenFiles(this.dataFilePath);
            }
        } else if (this.needNewSplitFile(this.curTime)) {
            this.nextSplitTime = (this.curTime / this.split.getTime() + 1L) * this.split.getTime();
            this.reopenFiles(this.fileFilter.getPathForTime(this.nextSplitTime - this.split.getTime()));
        }
    }

    private synchronized void reopenFiles(String dataFilePath) throws IOException {
        Runnable dataCloseHandler;
        this.closeCurrentFiles(this.nextSplitTime);
        this.position = 0L;
        this.lastTime = 0L;
        this.dataFilePath = dataFilePath;
        if (this.tmpDirPath != null) {
            FileUtils.checkOrCreateDirectory(this.tmpDirPath);
            this.currentDataFilePath = this.createTmpFile(this.tmpDirPath, dataFilePath);
            dataCloseHandler = this.getCloseHandler(this.currentDataFilePath, dataFilePath, this.storageLimited);
        } else {
            this.currentDataFilePath = dataFilePath;
            dataCloseHandler = this.storageLimited ? this::deleteOldFiles : null;
        }
        log.info("Writing tape data to " + LogUtil.hideCredentials((Object)this.currentDataFilePath) + (this.compression == StreamCompression.NONE ? "" : " with " + this.compression));
        this.dataOut = this.reuseOutputStream(this.currentDataFilePath, this.dataWriter, Paths.get(dataFilePath, new String[0]).getFileName().toString(), dataCloseHandler);
        if (this.time.isUsingTimeFile()) {
            Runnable timeCloseHandler;
            String timeFilePath = this.getTimeFilePath(dataFilePath);
            if (this.tmpDirPath != null) {
                this.currentTimeFilePath = this.createTmpFile(this.tmpDirPath, timeFilePath);
                timeCloseHandler = this.getCloseHandler(this.currentTimeFilePath, timeFilePath, false);
            } else {
                this.currentTimeFilePath = timeFilePath;
                timeCloseHandler = null;
            }
            log.info("Writing tape time to " + LogUtil.hideCredentials((Object)this.currentTimeFilePath) + (this.compression == StreamCompression.NONE ? "" : " with " + this.compression));
            this.timeOut = new PrintWriter((OutputStream)this.reuseOutputStream(this.currentTimeFilePath, this.timeWriter, Paths.get(timeFilePath, new String[0]).getFileName().toString(), timeCloseHandler));
        } else {
            this.timeOut = null;
        }
        this.writeHeader();
    }

    private Runnable getCloseHandler(String sourcePath, String destinationPath, boolean deleteOldFiles) {
        return () -> {
            this.moveFile(sourcePath, destinationPath);
            if (deleteOldFiles) {
                this.deleteOldFiles();
            }
        };
    }

    private String createTmpFile(Path dir, String originFile) throws IOException {
        String filName = Paths.get(originFile, new String[0]).getFileName().toString();
        FileAttribute<?>[] fileAttributes = IS_POSIX && dir.getFileSystem() == FileSystems.getDefault() ? POSIX_FS_FILE_ATTRIBUTES : OTHER_FS_FILE_ATTRIBUTES;
        Path path = Files.createTempFile(dir, filName, null, fileAttributes);
        return path.toString();
    }

    private BufferedOutput reuseOutputStream(String path, ParallelWriter writeThread, String compressedName, Runnable closeHandler) {
        return writeThread.open(() -> {
            Path currentFilePath = Paths.get(path, new String[0]);
            Path parentDirectory = currentFilePath.getParent();
            if (parentDirectory != null) {
                FileUtils.checkOrCreateDirectory(parentDirectory);
            }
            return this.compression.compress(Files.newOutputStream(currentFilePath, new OpenOption[0]), this.compression.stripExtension(compressedName));
        }, closeHandler);
    }

    private void deleteOldFiles() {
        int k;
        TimestampedFile[] files = this.fileFilter.listTimestampedFiles();
        long allowedTime = this.storageTime == Long.MAX_VALUE ? Long.MIN_VALUE : System.currentTimeMillis() - this.storageTime;
        long totalSize = 0L;
        for (k = files.length - 1; k > 0 && files[k].time >= allowedTime; --k) {
            File dataFile = files[k].file;
            totalSize += dataFile.length();
            if (this.time.isUsingTimeFile()) {
                totalSize += new File(this.getTimeFilePath(dataFile.getPath())).length();
            }
            if (totalSize > this.storageSize) break;
        }
        for (int i = 0; i < k; ++i) {
            File dataFile = files[i].file;
            if (this.time.isUsingTimeFile()) {
                this.tryDelete(new File(this.getTimeFilePath(dataFile.getPath())));
            }
            this.tryDelete(dataFile);
        }
    }

    private void tryDelete(File file) {
        if (!file.exists()) {
            return;
        }
        log.info("Deleting file " + LogUtil.hideCredentials((Object)file));
        if (!file.delete()) {
            log.warn("Failed to delete " + LogUtil.hideCredentials((Object)file));
        }
    }

    @Override
    public synchronized void close() {
        boolean initialized = this.flushThread != null;
        try {
            this.closeCurrentFiles(this.nextSplitTime);
        }
        finally {
            FileUtils.tryClose(this.flushThread, null);
            this.flushThread = null;
            FileUtils.tryClose(this.dataWriter, this.currentDataFilePath);
            this.dataWriter = null;
            FileUtils.tryClose(this.timeWriter, this.currentTimeFilePath);
            this.timeWriter = null;
            if (initialized && this.storageLimited) {
                this.deleteOldFiles();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized void closeCurrentFiles(long expectedNextSplitTime) {
        if (this.dataOut == null) {
            return;
        }
        if (expectedNextSplitTime != this.nextSplitTime) {
            return;
        }
        try {
            this.writeFooter();
        }
        catch (IOException e) {
            log.error("Failed to write tape file", (Throwable)e);
        }
        finally {
            FileUtils.tryClose((Closeable)this.dataOut, this.currentDataFilePath);
            this.dataOut = null;
            FileUtils.tryClose(this.timeOut, this.currentTimeFilePath);
            this.timeOut = null;
        }
    }

    private void moveFile(String sourceFile, String destinationFile) {
        Path destinationPath;
        if (sourceFile == null || destinationFile == null) {
            return;
        }
        Path sourcePath = Paths.get(sourceFile, new String[0]);
        if (sourcePath.equals(destinationPath = Paths.get(destinationFile, new String[0]))) {
            return;
        }
        log.info("Move source file: " + sourcePath + " to destination file: " + destinationPath);
        try {
            Path parentFolder = destinationPath.getParent();
            if (parentFolder != null) {
                FileUtils.checkOrCreateDirectory(parentFolder);
            }
            FileUtils.tryAtomicFileMove(sourcePath, destinationPath, new CopyOption[0]);
        }
        catch (IOException e) {
            log.error("Failed to create directory or to move file", (Throwable)e);
        }
    }

    @GuardedBy(value="this")
    private void writeHeader() throws IOException {
        if (this.format.isBareBones() || NO_HEADER_AND_FOOTER) {
            return;
        }
        this.composer.composeDescribeProtocol(this.protocolDescriptor);
        this.writeDataWithoutTimestamp();
    }

    @GuardedBy(value="this")
    private void writeFooter() throws IOException {
        if (this.format.isBareBones() || NO_HEADER_AND_FOOTER) {
            return;
        }
        this.composer.composeEmptyHeartbeat();
        this.writeDataWithoutTimestamp();
    }

    @GuardedBy(value="this")
    private void writeDataWithoutTimestamp() throws IOException {
        ChunkList chunks = this.output.getOutput((Object)this);
        if (chunks != null) {
            this.position += chunks.getTotalLength();
            this.dataOut.writeAllFromChunkList(chunks, (Object)this);
        }
    }

    private synchronized void writeData() {
        ChunkList chunks = this.output.getOutput((Object)this);
        if (chunks == null) {
            return;
        }
        try {
            this.reopenFilesIfNeeded();
            if (this.time.isUsingTimeFile() && this.curTime != this.lastTime) {
                new TimestampedPosition(this.curTime, this.position).writeTo(this.timeOut, this.time);
                this.lastTime = this.curTime;
            }
            this.position += chunks.getTotalLength();
            this.dataOut.writeAllFromChunkList(chunks, (Object)this);
        }
        catch (IOException e) {
            log.error("Failed to write tape file", (Throwable)e);
        }
    }

    synchronized void flush() {
        if (this.dataOut == null) {
            return;
        }
        try {
            this.dataOut.flush();
            if (this.timeOut != null) {
                this.timeOut.flush();
            }
        }
        catch (IOException e) {
            log.error("Failed to write tape file", (Throwable)e);
        }
    }

    public synchronized void visitHeartbeat(HeartbeatPayload heartbeatPayload) {
        if (heartbeatPayload.hasTimeMillis()) {
            this.lastIncomingTimeMillis = heartbeatPayload.getTimeMillis();
        }
        if (this.time.isUsingEmbeddedTime()) {
            this.heartbeatPayload.updateFrom(heartbeatPayload);
            this.updateCurTime();
            if (!this.heartbeatPayload.isEmpty()) {
                this.composer.visitHeartbeat(this.heartbeatPayload);
            }
        }
    }

    public synchronized boolean visitData(DataProvider provider, MessageType message) {
        MessageType saveMessage;
        MessageType messageType = saveMessage = this.saveAs != null && this.saveAs.isData() ? this.saveAs : message;
        if (this.isTimestampedDataProvider(provider)) {
            return this.visitTimestampedData((RecordProvider)provider, saveMessage);
        }
        return this.visitRegularData(provider, saveMessage);
    }

    private boolean isTimestampedDataProvider(DataProvider provider) {
        return provider instanceof RecordProvider && ((RecordProvider)provider).getMode().hasEventTimeSequence();
    }

    private void updateCurTime() {
        this.curTime = this.lastIncomingTimeMillis;
        if (this.curTime == 0L) {
            this.curTime = System.currentTimeMillis();
        }
        if (this.needNewSplitFile(this.curTime)) {
            this.composer.resetSession();
        }
        if (this.time.isUsingEmbeddedTime() && this.curTime != this.lastTime) {
            this.heartbeatPayload.setTimeMillis(this.curTime);
            this.composer.visitHeartbeat(this.heartbeatPayload);
            this.heartbeatPayload.clear();
            this.lastTime = this.curTime;
        }
    }

    private boolean visitTimestampedData(RecordProvider provider, MessageType saveMessage) {
        this.timestampedProvider.provider = provider;
        this.timestampedSink.saveMessage = saveMessage;
        this.timestampedSink.first = true;
        boolean result = this.composer.visitData((DataProvider)this.timestampedProvider, saveMessage);
        this.writeData();
        return result;
    }

    private boolean visitRegularData(DataProvider provider, MessageType saveMessage) {
        this.updateCurTime();
        boolean result = this.composer.visitData(provider, saveMessage);
        this.writeData();
        return result;
    }

    public synchronized boolean visitSubscription(SubscriptionProvider provider, MessageType message) {
        this.updateCurTime();
        MessageType saveMessage = this.saveAs != null && this.saveAs.isSubscription() ? this.saveAs : message;
        boolean result = this.composer.visitSubscription(provider, saveMessage);
        this.writeData();
        return result;
    }

    public synchronized String toString() {
        return "tape data file " + LogUtil.hideCredentials((Object)this.dataFilePath);
    }

    private class FlushThread
    extends Thread
    implements Closeable {
        private volatile boolean closed;

        FlushThread(String dataFilePath) {
            super("FileWriter-Flush-" + dataFilePath);
        }

        @Override
        public void close() throws IOException {
            this.closed = true;
            this.interrupt();
        }

        @Override
        public void run() {
            try {
                long lastNextSplitTime = Long.MAX_VALUE;
                int rounds = 0;
                while (!this.closed) {
                    FileWriterImpl.this.flush();
                    if (FileWriterImpl.this.split != null && FileWriterImpl.this.dataOut != null) {
                        long curNextSplitTime = FileWriterImpl.this.nextSplitTime;
                        if (lastNextSplitTime != curNextSplitTime) {
                            lastNextSplitTime = curNextSplitTime;
                            rounds = 0;
                        } else {
                            ++rounds;
                        }
                        if ((long)rounds * FileConstants.MAX_BUFFER_TIME >= FileConstants.MAX_OPEN_FACTOR * FileWriterImpl.this.split.getTime()) {
                            FileWriterImpl.this.closeCurrentFiles(curNextSplitTime);
                        }
                    }
                    Thread.sleep(FileConstants.MAX_BUFFER_TIME);
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private class TimestampedProvider
    extends AbstractRecordProvider {
        RecordProvider provider;

        private TimestampedProvider() {
        }

        public RecordMode getMode() {
            return this.provider.getMode();
        }

        public boolean retrieve(RecordSink sink) {
            ((FileWriterImpl)FileWriterImpl.this).timestampedSink.sink = sink;
            return this.provider.retrieve((RecordSink)FileWriterImpl.this.timestampedSink);
        }
    }

    private class TimestampedSink
    extends AbstractRecordSink {
        MessageType saveMessage;
        RecordSink sink;
        private long timeSequenceCache;
        private long timeMillisCache;
        boolean first;

        private TimestampedSink() {
        }

        public boolean hasCapacity() {
            return this.sink.hasCapacity();
        }

        public void append(RecordCursor cursor) {
            long curEventTimeSequence = cursor.getEventTimeSequence();
            if (this.timeSequenceCache != curEventTimeSequence) {
                this.timeSequenceCache = curEventTimeSequence;
                this.timeMillisCache = TimeSequenceUtil.getTimeMillisFromTimeSequence((long)curEventTimeSequence);
            }
            if (curEventTimeSequence != 0L && FileWriterImpl.this.lastIncomingTimeMillis != this.timeMillisCache) {
                FileWriterImpl.this.lastIncomingTimeMillis = this.timeMillisCache;
                if (FileWriterImpl.this.needNewSplitFile(FileWriterImpl.this.lastIncomingTimeMillis) || FileWriterImpl.this.time.isSlipMessageOnTime() && FileWriterImpl.this.curTime != FileWriterImpl.this.lastIncomingTimeMillis) {
                    FileWriterImpl.this.composer.endMessage();
                    FileWriterImpl.this.writeData();
                    FileWriterImpl.this.updateCurTime();
                    FileWriterImpl.this.composer.beginMessage(this.saveMessage);
                }
            } else if (this.first) {
                FileWriterImpl.this.composer.endMessage();
                FileWriterImpl.this.updateCurTime();
                FileWriterImpl.this.composer.beginMessage(this.saveMessage);
            }
            this.first = false;
            this.sink.append(cursor);
        }
    }
}

