/*
 * Decompiled with CFR 0.152.
 */
package com.dxfeed.api.test;

import com.devexperts.qd.DataIntField;
import com.devexperts.qd.DataObjField;
import com.devexperts.qd.DataRecord;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.QDDistributor;
import com.devexperts.qd.QDFactory;
import com.devexperts.qd.QDTicker;
import com.devexperts.qd.SerialFieldType;
import com.devexperts.qd.SymbolCodec;
import com.devexperts.qd.kit.DecimalField;
import com.devexperts.qd.kit.DefaultRecord;
import com.devexperts.qd.kit.DefaultScheme;
import com.devexperts.qd.kit.PentaCodec;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.ng.RecordSink;
import com.devexperts.qd.ng.RecordSource;
import com.devexperts.qd.qtp.AgentAdapter;
import com.devexperts.qd.qtp.ConfigurableMessageAdapterFactory;
import com.devexperts.qd.qtp.MessageConnector;
import com.devexperts.qd.qtp.MessageConnectors;
import com.devexperts.qd.stats.QDStats;
import com.devexperts.qd.util.Decimal;
import com.devexperts.test.ThreadCleanCheck;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.event.market.MarketEvent;
import com.dxfeed.event.market.Quote;
import com.dxfeed.event.market.Trade;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SchemeRemapTest {
    private static final char EX = 'X';
    private static final String SYMBOL = "TEST";
    private static final String SYMBOL_EX = "TEST&X";
    private static final int CIPHER = PentaCodec.INSTANCE.encode("TEST");
    private static final DataRecord TRADE_RECORD = new DefaultRecord(0, "Trade&X", false, new DataIntField[]{new DecimalField(0, "Trade&X.Last.Price", SerialFieldType.DECIMAL), new DecimalField(1, "Trade&X.Last.Size", SerialFieldType.DECIMAL)}, new DataObjField[0]);
    private static final DataScheme SOURCE_SCHEME = new DefaultScheme((SymbolCodec)PentaCodec.INSTANCE, new DataRecord[]{TRADE_RECORD});
    private static final int PORT = (100 + new Random().nextInt(300)) * 100 + 74;
    private DXEndpoint endpoint;
    private List<MessageConnector> connectors = Collections.emptyList();

    @Before
    public void setUp() throws Exception {
        ThreadCleanCheck.before();
        this.endpoint = DXEndpoint.create();
    }

    @After
    public void tearDown() throws Exception {
        this.endpoint.close();
        MessageConnectors.stopMessageConnectors(this.connectors);
        ThreadCleanCheck.after();
    }

    @Test
    public void testSchemeRemap() throws InterruptedException {
        this.runDataProvider();
        this.runDataConsumer();
    }

    private void runDataProvider() {
        QDTicker ticker = QDFactory.getDefaultFactory().createTicker(SOURCE_SCHEME);
        QDDistributor distributor = ticker.distributorBuilder().build();
        distributor.getAddedRecordProvider().setRecordListener(provider -> {
            RecordCursor cursor;
            RecordBuffer sub = RecordBuffer.getInstance((RecordMode)RecordMode.SUBSCRIPTION);
            provider.retrieve((RecordSink)sub);
            while ((cursor = sub.next()) != null) {
                if (!cursor.getDecodedSymbol().equals(SYMBOL)) continue;
                RecordBuffer buf = RecordBuffer.getInstance();
                RecordCursor a = buf.add(TRADE_RECORD, CIPHER, null);
                a.setInt(0, Decimal.compose((double)95.47));
                a.setInt(1, Decimal.compose((double)123.0));
                distributor.process((RecordSource)buf);
                buf.release();
            }
            sub.release();
        });
        this.connectors = MessageConnectors.createMessageConnectors((ConfigurableMessageAdapterFactory)new AgentAdapter.Factory(ticker), (String)(":" + PORT), (QDStats)QDStats.VOID);
        MessageConnectors.startMessageConnectors(this.connectors);
    }

    private void runDataConsumer() throws InterruptedException {
        this.endpoint.connect("localhost:" + PORT);
        ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
        DXFeedSubscription sub = this.endpoint.getFeed().createSubscription(new Class[]{Quote.class, Trade.class});
        sub.addEventListener(queue::addAll);
        sub.addSymbols((Object)SYMBOL_EX);
        MarketEvent event = (MarketEvent)queue.poll(5L, TimeUnit.SECONDS);
        Assert.assertTrue((boolean)(event instanceof Trade));
        Trade trade = (Trade)event;
        Assert.assertEquals((Object)SYMBOL_EX, (Object)trade.getEventSymbol());
        Assert.assertEquals((double)95.47, (double)trade.getPrice(), (double)0.0);
        Assert.assertEquals((long)123L, (long)trade.getSize());
        Assert.assertEquals((long)0L, (long)queue.size());
    }
}

