/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.source;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkRuntimeException;

/**
 * Operator that converts external records of type {@code E} into internal {@link RowData}.
 *
 * <p>Each incoming value is passed through the configured {@link
 * DynamicTableSource.DataStructureConverter}. Depending on the constructor flags the result is
 * optionally wrapped into a single-field row, checked for being insert-only, and optionally
 * joined with a trailing single-field row that carries the timestamp of the incoming {@link
 * StreamRecord}.
 *
 * @param <E> type of the external records consumed by this operator
 */
@Internal
public final class InputConversionOperator<E> extends TableStreamOperator<RowData>
        implements OneInputStreamOperator<E, RowData> {

    /** Converter used to turn external records into internal data structures. */
    private final DynamicTableSource.DataStructureConverter converter;

    /** Whether the converted value must be wrapped into a single-field row. */
    private final boolean requiresWrapping;

    /** Whether the stream record timestamp is appended as an additional trailing field. */
    private final boolean produceRowtimeMetadata;

    /** Whether every watermark is forwarded (otherwise only {@code MAX_WATERMARK}). */
    private final boolean propagateWatermark;

    /** Whether any row kind other than {@link RowKind#INSERT} is rejected. */
    private final boolean isInsertOnly;

    /** Reusable output record; created in {@link #open()} and refilled for every emission. */
    private transient StreamRecord<RowData> outRecord;

    /**
     * Creates the operator.
     *
     * @param converter converter applied to every incoming value
     * @param requiresWrapping if {@code true}, the converted value is placed into field 0 of a
     *     new single-field {@link GenericRowData} of kind {@link RowKind#INSERT}; otherwise the
     *     converted value is cast to {@link RowData} directly
     * @param produceRowtimeMetadata if {@code true}, the timestamp of the incoming stream record
     *     is appended to the payload as an extra single-field row
     * @param propagateWatermark if {@code true}, all watermarks are forwarded; otherwise only
     *     {@link Watermark#MAX_WATERMARK} is forwarded
     * @param isInsertOnly if {@code true}, a payload whose row kind is not {@link RowKind#INSERT}
     *     causes a {@link FlinkRuntimeException}
     */
    public InputConversionOperator(
            DynamicTableSource.DataStructureConverter converter,
            boolean requiresWrapping,
            boolean produceRowtimeMetadata,
            boolean propagateWatermark,
            boolean isInsertOnly) {
        this.converter = converter;
        this.requiresWrapping = requiresWrapping;
        this.produceRowtimeMetadata = produceRowtimeMetadata;
        this.propagateWatermark = propagateWatermark;
        this.isInsertOnly = isInsertOnly;
    }

    /**
     * Initializes the reusable output record and opens the converter with a context created from
     * the user code class loader.
     *
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void open() throws Exception {
        super.open();

        // The type argument must be inferred from the field (StreamRecord<RowData>); an explicit
        // StreamRecord<Object> is not assignable because Java generics are invariant.
        this.outRecord = new StreamRecord<>(null);

        final RuntimeConverter.Context context =
                RuntimeConverter.Context.create(getUserCodeClassloader());
        this.converter.open(context);
    }

    /**
     * Forwards the watermark if propagation is enabled or if it is the final {@link
     * Watermark#MAX_WATERMARK}; all other watermarks are dropped.
     *
     * @param mark the incoming watermark
     * @throws Exception if forwarding the watermark fails
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // MAX_WATERMARK is always let through, even when regular watermarks are suppressed.
        if (this.propagateWatermark || Watermark.MAX_WATERMARK.equals(mark)) {
            super.processWatermark(mark);
        }
    }

    /**
     * Converts the value of the given record and emits the resulting row.
     *
     * <p>When wrapping is disabled and the converter returns {@code null}, the record is silently
     * skipped and nothing is emitted.
     *
     * @param element the incoming external record
     * @throws FlinkRuntimeException if the conversion fails, if insert-only mode is enabled and
     *     the row kind is not {@link RowKind#INSERT}, or if rowtime metadata is requested but the
     *     record carries no timestamp
     * @throws Exception declared by the operator interface
     */
    @Override
    public void processElement(StreamRecord<E> element) throws Exception {
        final E externalRecord = element.getValue();

        final Object internalRecord;
        try {
            internalRecord = this.converter.toInternal(externalRecord);
        } catch (Exception e) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Error during input conversion from external DataStream API to "
                                    + "internal Table API data structures. Make sure that the "
                                    + "provided data types that configure the converters are "
                                    + "correctly declared in the schema. Affected record:\n%s",
                            externalRecord),
                    e);
        }

        final RowData payloadRowData;
        if (this.requiresWrapping) {
            // A null converted value is kept and becomes a null field of the wrapper row.
            final GenericRowData wrapped = new GenericRowData(RowKind.INSERT, 1);
            wrapped.setField(0, internalRecord);
            payloadRowData = wrapped;
        } else {
            // Without wrapping there is no row to emit for a null result, so skip the record.
            if (internalRecord == null) {
                return;
            }
            payloadRowData = (RowData) internalRecord;
        }

        final RowKind kind = payloadRowData.getRowKind();
        if (this.isInsertOnly && kind != RowKind.INSERT) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Error during input conversion. Conversion expects insert-only "
                                    + "records but DataStream API record contains: %s",
                            kind));
        }

        if (!this.produceRowtimeMetadata) {
            this.output.collect(this.outRecord.replace(payloadRowData));
            return;
        }

        if (!element.hasTimestamp()) {
            throw new FlinkRuntimeException(
                    "Could not find timestamp in DataStream API record. "
                            + "Make sure that timestamps have been assigned before and "
                            + "the event-time characteristic is enabled.");
        }

        // Append the record timestamp as a trailing single-field row, keeping the payload's kind.
        final GenericRowData rowtimeRowData = new GenericRowData(1);
        rowtimeRowData.setField(0, TimestampData.fromEpochMillis(element.getTimestamp()));

        final JoinedRowData joinedRowData =
                new JoinedRowData(kind, payloadRowData, rowtimeRowData);
        this.output.collect(this.outRecord.replace(joinedRowData));
    }
}

