/*
 * Decompiled with CFR 0.152.
 */
package com.devexperts.qd.dxlink.websocket.application;

import com.devexperts.io.BufferedOutput;
import com.devexperts.qd.DataField;
import com.devexperts.qd.DataIntField;
import com.devexperts.qd.DataObjField;
import com.devexperts.qd.DataRecord;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.QDContract;
import com.devexperts.qd.dxlink.websocket.application.Delegates;
import com.devexperts.qd.dxlink.websocket.application.DxLinkJsonMessageFactory;
import com.devexperts.qd.dxlink.websocket.application.DxLinkSubscription;
import com.devexperts.qd.dxlink.websocket.application.DxLinkSubscriptionFactory;
import com.devexperts.qd.dxlink.websocket.application.DxLinkWebSocketApplicationConnectionFactory;
import com.devexperts.qd.dxlink.websocket.application.HeartbeatProcessor;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.qtp.AbstractQTPComposer;
import com.devexperts.qd.qtp.MessageProvider;
import com.devexperts.qd.qtp.MessageType;
import com.devexperts.qd.qtp.MessageVisitor;
import com.devexperts.qd.qtp.ProtocolDescriptor;
import com.devexperts.qd.qtp.QTPConstants;
import com.devexperts.qd.qtp.RuntimeQTPException;
import com.devexperts.util.SystemProperties;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * QTP composer that, instead of writing binary QTP records, collects messages built by
 * {@link DxLinkJsonMessageFactory} as {@link ByteBuf} instances.
 *
 * <p>Composed buffers are queued in an internal list and handed over in bulk by
 * {@link #retrieveMessages()}. Subscription records are routed to one
 * {@code ChannelSubscriptionProcessor} per {@link QDContract}; each processor uses a fixed
 * channel number (TICKER = 1, STREAM = 3, HISTORY = 5), while setup, auth and keepalive
 * messages are created for channel 0.
 *
 * <p>Field-by-field and record-header writing hooks are not supported and throw.
 */
class DxLinkWebSocketQTPComposer extends AbstractQTPComposer {
    private static final String PROTOCOL_VERSION =
        SystemProperties.getProperty("com.devexperts.qd.dxlink.protocolVersion", "0.1");
    private static final String SERVICE_NAME =
        SystemProperties.getProperty("com.devexperts.qd.dxlink.feedService.name", "FEED");
    private static final String ACCEPT_DATA_FORMAT =
        SystemProperties.getProperty("com.devexperts.qd.dxlink.feedService.acceptDataFormat", "COMPACT");

    /** Channel used for setup, auth and keepalive messages. */
    private static final int MAIN_CHANNEL = 0;
    /** Channel used by the TICKER contract processor. */
    private static final int TICKER_CHANNEL_NUMBER = 1;
    /** Channel used by the STREAM contract processor. */
    private static final int STREAM_CHANNEL_NUMBER = 3;
    /** Channel used by the HISTORY contract processor. */
    private static final int HISTORY_CHANNEL_NUMBER = 5;

    /** Message shared by all unsupported field-level write hooks. */
    private static final String LEGACY_WRITE_UNSUPPORTED =
        "Legacy field-by-field writing is not supported, use 'append'";

    private final DxLinkJsonMessageFactory messageFactory;
    private final Map<QDContract, ChannelSubscriptionProcessor> processors = new EnumMap<>(QDContract.class);
    private final HeartbeatProcessor heartbeatProcessor;
    private final DxLinkWebSocketApplicationConnectionFactory factory;
    private final Delegates delegates;
    /** Processor of the subscription message being composed, or {@code null} when there is none. */
    private ChannelSubscriptionProcessor currentSubscriptionProcessor;
    /** Buffers composed since the last {@link #retrieveMessages()} call. */
    private List<ByteBuf> messages = new ArrayList<>();
    /** Readable bytes accounted for the queued buffers (keepalives are reported to stats directly). */
    private int payloadSize = 0;

    /**
     * Creates the composer and one subscription processor for every {@link QDContract} value.
     *
     * @param scheme data scheme passed to the base composer
     * @param delegates source of the event-type to field-names map used by the processors
     * @param messageFactory factory that builds every outgoing message buffer
     * @param heartbeatProcessor supplies the heartbeat and disconnect timeouts for the setup message
     * @param factory supplies agent info, accepted event fields and the aggregation period
     * @throws IllegalStateException if a contract other than TICKER, STREAM or HISTORY is encountered
     */
    public DxLinkWebSocketQTPComposer(DataScheme scheme, Delegates delegates, DxLinkJsonMessageFactory messageFactory, HeartbeatProcessor heartbeatProcessor, DxLinkWebSocketApplicationConnectionFactory factory) {
        super(scheme, true);
        this.messageFactory = messageFactory;
        this.heartbeatProcessor = heartbeatProcessor;
        this.factory = factory;
        this.delegates = delegates;
        for (QDContract each : QDContract.values()) {
            processors.put(each, new ChannelSubscriptionProcessor(each));
        }
    }

    /**
     * Lets the provider push its messages through this composer and reports the current
     * {@code payloadSize} to the connection stats.
     *
     * @param provider message source to drain
     * @return whatever {@code provider.retrieveMessages} returned; {@code false} is only reached
     *         if {@link #abortMessageAndRethrow(Throwable)} returns normally
     */
    boolean composeMessage(MessageProvider provider) {
        try {
            MessageVisitor visitor = this;
            boolean outcome = provider.retrieveMessages(visitor);
            long composedBytes = this.payloadSize;
            this.stats.updateIOWriteBytes(composedBytes);
            return outcome;
        } catch (Throwable t) {
            abortMessageAndRethrow(t);
            return false;
        }
    }

    /**
     * Queues a keepalive message for the main channel and reports its size to the stats.
     * The keepalive size is not added to {@code payloadSize}.
     */
    void composeKeepalive() {
        try {
            ByteBuf frame = messageFactory.createKeepalive(MAIN_CHANNEL);
            messages.add(frame);
            long frameBytes = frame.readableBytes();
            this.stats.updateIOWriteBytes(frameBytes);
        } catch (Throwable t) {
            abortMessageAndRethrow(t);
        }
    }

    /**
     * Drops everything queued so far, zeroes the payload counter and delegates to the base class.
     *
     * @param t failure that interrupted composing
     */
    protected void abortMessageAndRethrow(Throwable t) {
        // NOTE(review): dropped buffers are not released here - confirm the factory's allocation policy.
        messages.clear();
        payloadSize = 0;
        super.abortMessageAndRethrow(t);
    }

    /**
     * Not supported by this composer.
     *
     * @throws IllegalStateException always
     */
    protected int writeRecordHeader(DataRecord record, int cipher, String symbol, int eventFlags) {
        throw new IllegalStateException();
    }

    /**
     * Not supported by this composer.
     *
     * @throws IllegalStateException always
     */
    protected void writeHistorySubscriptionTime(DataRecord record, long time) {
        throw new IllegalStateException();
    }

    /**
     * For subscription message types, selects the processor of the message's contract and
     * starts a new batch on it; other message types are ignored.
     *
     * @param messageType type of the message being started
     */
    protected void writeMessageHeader(MessageType messageType) {
        if (!messageType.isSubscription()) {
            return;
        }
        ChannelSubscriptionProcessor selected = processors.get(messageType.getContract());
        currentSubscriptionProcessor = selected;
        selected.begin(messageType.isSubscriptionRemove());
    }

    /**
     * Forwards one subscription record to the current processor.
     *
     * @throws RuntimeQTPException wrapping any {@link IOException} raised by the processor
     */
    public void visitRecord(DataRecord record, int cipher, String symbol, long time) {
        try {
            currentSubscriptionProcessor.add(record, cipher, symbol, time);
        } catch (IOException e) {
            Throwable cause = e;
            throw new RuntimeQTPException(cause);
        }
    }

    /** Forgets the current processor and all queued buffers, then resets the base class. */
    public void resetSession() {
        // NOTE(review): per-processor state (channelIsOpened, consumed fieldsByType entries) is
        // not touched here - confirm a composer instance is never reused across sessions.
        currentSubscriptionProcessor = null;
        messages.clear();
        payloadSize = 0;
        super.resetSession();
    }

    /** Undoes {@link #writeMessageHeader(MessageType)} by finishing the current batch. */
    protected void undoWriteMessageHeaderStateChange() {
        finishComposingMessage(null);
    }

    /**
     * Flushes the current processor's buffers into the queue, if a processor is active.
     *
     * @param out unused by this implementation
     */
    protected void finishComposingMessage(BufferedOutput out) {
        ChannelSubscriptionProcessor active = currentSubscriptionProcessor;
        if (active == null) {
            return;
        }
        active.end();
        currentSubscriptionProcessor = null;
    }

    /**
     * Queues either an auth message (when the descriptor has an {@code authorization} property)
     * or a setup message carrying protocol version, timeouts and agent info.
     *
     * @param out unused by this implementation
     * @param descriptor protocol descriptor to read the {@code authorization} property from
     */
    protected void writeDescribeProtocolMessage(BufferedOutput out, ProtocolDescriptor descriptor) throws IOException {
        String token = descriptor.getProperty("authorization");
        ByteBuf handshake;
        if (token == null) {
            handshake = messageFactory.createSetup(MAIN_CHANNEL, PROTOCOL_VERSION, heartbeatProcessor.getHeartbeatTimeout(), heartbeatProcessor.getDisconnectTimeout(), factory.getAgentInfo());
        } else {
            // Skips "dxlink" plus one more character; presumably a "dxlink <token>" layout.
            // NOTE(review): the prefix itself is never verified - confirm against the caller.
            String clearToken = token.substring("dxlink".length() + 1);
            handshake = messageFactory.createAuth(MAIN_CHANNEL, clearToken);
        }
        payloadSize += handshake.readableBytes();
        messages.add(handshake);
    }

    /**
     * @return {@code true} while the queued bytes plus the bytes pending in the current
     *         processor are below {@link QTPConstants#COMPOSER_THRESHOLD}
     */
    public boolean hasCapacity() {
        int pending = payloadSize;
        if (currentSubscriptionProcessor != null) {
            pending += currentSubscriptionProcessor.payloadSize();
        }
        return pending < QTPConstants.COMPOSER_THRESHOLD;
    }

    /** @return the bytes accounted for the queued buffers */
    protected long getMessagePayloadSize() {
        return payloadSize;
    }

    /**
     * Hands over the queued buffers and starts a fresh, empty queue.
     *
     * @return the list of buffers composed since the previous call
     */
    public List<ByteBuf> retrieveMessages() {
        List<ByteBuf> ready = messages;
        messages = new ArrayList<>();
        payloadSize = 0;
        return ready;
    }

    /**
     * Not supported by this composer.
     *
     * @throws UnsupportedOperationException always
     */
    protected void writeIntField(DataIntField field, int value) {
        throw new UnsupportedOperationException(LEGACY_WRITE_UNSUPPORTED);
    }

    /**
     * Not supported by this composer.
     *
     * @throws UnsupportedOperationException always
     */
    protected void writeObjField(DataObjField field, Object value) {
        throw new UnsupportedOperationException(LEGACY_WRITE_UNSUPPORTED);
    }

    /**
     * Not supported by this composer.
     *
     * @throws UnsupportedOperationException always
     */
    protected void writeField(DataField field, RecordCursor cursor) {
        throw new UnsupportedOperationException(LEGACY_WRITE_UNSUPPORTED);
    }

    /** Appends the buffer to the outgoing queue unless it is {@code null}. */
    private void enqueueIfPresent(ByteBuf message) {
        if (message != null) {
            messages.add(message);
        }
    }

    /** Returns the buffer's readable byte count, or 0 for {@code null}. */
    private static int readableBytesOf(ByteBuf buffer) {
        return buffer == null ? 0 : buffer.readableBytes();
    }

    /**
     * Accumulates the subscription records of one message for a single contract and keeps
     * up to three pending buffers: a channel request, a feed setup and a feed subscription.
     */
    private class ChannelSubscriptionProcessor {
        private final QDContract contract;
        private final int channel;
        /** Event type to field names; an entry is removed once it has been put into a feed setup. */
        private final Map<String, List<String>> fieldsByType;
        private final List<DxLinkSubscription> subscriptions = new ArrayList<>();
        private final Map<String, Collection<String>> fieldsByTypeToSend = new HashMap<>();
        private final DxLinkSubscriptionFactory subscriptionFactory = new DxLinkSubscriptionFactory();
        private boolean isRemove;
        /** Set once the channel request has been created; never cleared in this class. */
        private boolean channelIsOpened = false;
        private ByteBuf channelRequest;
        private ByteBuf feedSetup;
        private ByteBuf feedSubscription;

        /**
         * @param contract contract served by this processor
         * @throws IllegalStateException if the contract has no channel number assigned
         */
        private ChannelSubscriptionProcessor(QDContract contract) {
            this.contract = contract;
            this.fieldsByType = delegates.fieldsByEventType();
            switch (contract) {
                case TICKER:
                    this.channel = TICKER_CHANNEL_NUMBER;
                    break;
                case STREAM:
                    this.channel = STREAM_CHANNEL_NUMBER;
                    break;
                case HISTORY:
                    this.channel = HISTORY_CHANNEL_NUMBER;
                    break;
                default:
                    throw new IllegalStateException();
            }
        }

        /**
         * Starts a new batch: remembers the direction and discards all per-batch state.
         *
         * @param isRemove {@code true} when the batch removes subscriptions
         */
        public void begin(boolean isRemove) {
            this.isRemove = isRemove;
            channelRequest = null;
            feedSetup = null;
            feedSubscription = null;
            subscriptions.clear();
            fieldsByTypeToSend.clear();
        }

        /**
         * Adds one record to the batch and rebuilds the pending buffers that depend on it.
         * Records for which the subscription factory returns {@code null} are ignored.
         */
        public void add(DataRecord record, int cipher, String symbol, long time) throws IOException {
            DxLinkSubscription subscription = subscriptionFactory.createSubscription(contract, record, cipher, symbol, time);
            if (subscription == null) {
                return;
            }

            // The channel request is created once, together with the first accepted subscription.
            if (!channelIsOpened) {
                channelRequest = messageFactory.createChannelRequest(channel, SERVICE_NAME, contract.name());
                channelIsOpened = true;
            }

            // An event type seen for the first time contributes its field list to a feed setup.
            if (!fieldsByType.isEmpty()) {
                List<String> eventFields = fieldsByType.remove(subscription.type);
                if (eventFields != null) {
                    List<String> accepted = factory.getAcceptedEventFieldsByType(subscription.type);
                    if (!accepted.isEmpty()) {
                        // Keep "eventSymbol" and every explicitly accepted field, drop the rest.
                        eventFields.removeIf(name -> !("eventSymbol".equals(name) || accepted.contains(name)));
                    }
                    if ("FULL".equals(ACCEPT_DATA_FORMAT)) {
                        eventFields.add(0, "eventType");
                    }
                    fieldsByTypeToSend.put(subscription.type, eventFields);
                    feedSetup = messageFactory.createFeedSetup(channel, factory.getAcceptAggregationPeriod().getTime(), ACCEPT_DATA_FORMAT, fieldsByTypeToSend);
                }
            }

            subscriptions.add(subscription);

            // The subscription buffer is rebuilt from the whole batch on every record, which
            // keeps payloadSize() current for hasCapacity().
            // NOTE(review): the previous buffer is dropped without release() - confirm this is safe.
            List<DxLinkSubscription> additions;
            List<DxLinkSubscription> removals;
            if (isRemove) {
                additions = Collections.emptyList();
                removals = subscriptions;
            } else {
                additions = subscriptions;
                removals = Collections.emptyList();
            }
            feedSubscription = messageFactory.createFeedSubscription(channel, additions, removals, false);
        }

        /**
         * Moves the pending buffers to the composer's queue (channel request, then feed setup,
         * then feed subscription) and adds their size to the composer's payload counter.
         */
        public void end() {
            enqueueIfPresent(channelRequest);
            enqueueIfPresent(feedSetup);
            enqueueIfPresent(feedSubscription);
            DxLinkWebSocketQTPComposer.this.payloadSize += payloadSize();
        }

        /** @return total readable bytes of the pending buffers; absent buffers count as 0 */
        public int payloadSize() {
            return readableBytesOf(channelRequest)
                + readableBytesOf(feedSetup)
                + readableBytesOf(feedSubscription);
        }
    }
}

