package com.dxfeed.api.test;

import com.devexperts.qd.DataIntField;
import com.devexperts.qd.DataObjField;
import com.devexperts.qd.DataRecord;
import com.devexperts.qd.DataScheme;
import com.devexperts.qd.QDDistributor;
import com.devexperts.qd.QDFactory;
import com.devexperts.qd.QDTicker;
import com.devexperts.qd.SerialFieldType;
import com.devexperts.qd.kit.DecimalField;
import com.devexperts.qd.kit.DefaultRecord;
import com.devexperts.qd.kit.DefaultScheme;
import com.devexperts.qd.kit.PentaCodec;
import com.devexperts.qd.ng.RecordBuffer;
import com.devexperts.qd.ng.RecordCursor;
import com.devexperts.qd.ng.RecordMode;
import com.devexperts.qd.qtp.AgentAdapter;
import com.devexperts.qd.qtp.MessageConnector;
import com.devexperts.qd.qtp.MessageConnectors;
import com.devexperts.qd.stats.QDStats;
import com.devexperts.qd.util.Decimal;
import com.devexperts.test.ThreadCleanCheck;
import com.dxfeed.api.DXEndpoint;
import com.dxfeed.api.DXFeedSubscription;
import com.dxfeed.event.market.MarketEvent;
import com.dxfeed.event.market.Quote;
import com.dxfeed.event.market.Trade;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:com/dxfeed/api/test/SchemeRemapTest.class */
public class SchemeRemapTest {
    private static final char EX = 'X';
    private static final String SYMBOL_EX = "TEST&X";
    private DXEndpoint endpoint;
    private List<MessageConnector> connectors = Collections.emptyList();
    private static final String SYMBOL = "TEST";
    private static final int CIPHER = PentaCodec.INSTANCE.encode(SYMBOL);
    private static final DataRecord TRADE_RECORD = new DefaultRecord(0, "Trade&X", false, new DataIntField[]{new DecimalField(0, "Trade&X.Last.Price", SerialFieldType.DECIMAL), new DecimalField(1, "Trade&X.Last.Size", SerialFieldType.DECIMAL)}, new DataObjField[0]);
    private static final DataScheme SOURCE_SCHEME = new DefaultScheme(PentaCodec.INSTANCE, new DataRecord[]{TRADE_RECORD});
    private static final int PORT = ((100 + new Random().nextInt(300)) * 100) + 74;

    @Before
    public void setUp() throws Exception {
        ThreadCleanCheck.before();
        this.endpoint = DXEndpoint.create();
    }

    @After
    public void tearDown() throws Exception {
        this.endpoint.close();
        MessageConnectors.stopMessageConnectors(this.connectors);
        ThreadCleanCheck.after();
    }

    @Test
    public void testSchemeRemap() throws InterruptedException {
        runDataProvider();
        runDataConsumer();
    }

    private void runDataProvider() {
        QDTicker createTicker = QDFactory.getDefaultFactory().createTicker(SOURCE_SCHEME);
        QDDistributor build = createTicker.distributorBuilder().build();
        build.getAddedRecordProvider().setRecordListener(recordProvider -> {
            RecordBuffer recordBuffer = RecordBuffer.getInstance(RecordMode.SUBSCRIPTION);
            recordProvider.retrieve(recordBuffer);
            while (true) {
                RecordCursor next = recordBuffer.next();
                if (next == null) {
                    recordBuffer.release();
                    return;
                } else if (next.getDecodedSymbol().equals(SYMBOL)) {
                    RecordBuffer recordBuffer2 = RecordBuffer.getInstance();
                    RecordCursor add = recordBuffer2.add(TRADE_RECORD, CIPHER, (String) null);
                    add.setInt(0, Decimal.compose(95.47d));
                    add.setInt(1, Decimal.compose(123.0d));
                    build.process(recordBuffer2);
                    recordBuffer2.release();
                }
            }
        });
        this.connectors = MessageConnectors.createMessageConnectors(new AgentAdapter.Factory(createTicker), ":" + PORT, QDStats.VOID);
        MessageConnectors.startMessageConnectors(this.connectors);
    }

    private void runDataConsumer() throws InterruptedException {
        this.endpoint.connect("localhost:" + PORT);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10);
        DXFeedSubscription createSubscription = this.endpoint.getFeed().createSubscription(new Class[]{Quote.class, Trade.class});
        arrayBlockingQueue.getClass();
        createSubscription.addEventListener((v1) -> {
            r1.addAll(v1);
        });
        createSubscription.addSymbols(SYMBOL_EX);
        Trade trade = (MarketEvent) arrayBlockingQueue.poll(5L, TimeUnit.SECONDS);
        Assert.assertTrue(trade instanceof Trade);
        Trade trade2 = trade;
        Assert.assertEquals(SYMBOL_EX, trade2.getEventSymbol());
        Assert.assertEquals(95.47d, trade2.getPrice(), 0.0d);
        Assert.assertEquals(123L, trade2.getSize());
        Assert.assertEquals(0L, arrayBlockingQueue.size());
    }
}
