/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.connector.sink.Committable;
import org.apache.flink.table.store.connector.sink.StateUtils;
import org.apache.flink.table.store.connector.sink.StoreSinkWrite;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StoreSinkWriteImpl
implements StoreSinkWrite {
    private static final Logger LOG = LoggerFactory.getLogger(StoreSinkWriteImpl.class);
    protected final FileStoreTable table;
    protected final String commitUser;
    protected final TableWrite write;

    public StoreSinkWriteImpl(FileStoreTable table, StateInitializationContext context, String initialCommitUser, IOManager ioManager, boolean isOverwrite) throws Exception {
        this.table = table;
        this.commitUser = StateUtils.getSingleValueFromState(context, "commit_user_state", String.class, initialCommitUser);
        this.write = this.commitUser == null ? null : table.newWrite(this.commitUser).withIOManager(ioManager).withOverwrite(isOverwrite);
    }

    @Override
    public SinkRecord write(RowData rowData) throws Exception {
        return this.write.write(rowData);
    }

    @Override
    public SinkRecord toLogRecord(SinkRecord record) {
        return this.write.toLogRecord(record);
    }

    @Override
    public void compact(BinaryRowData partition, int bucket, boolean fullCompaction) throws Exception {
        this.write.compact(partition, bucket, fullCompaction);
    }

    @Override
    public void notifyNewFiles(long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Receive {} new files from snapshot {}, partition {}, bucket {}", new Object[]{files.size(), snapshotId, partition, bucket});
        }
        this.write.notifyNewFiles(snapshotId, partition, bucket, files);
    }

    @Override
    public List<Committable> prepareCommit(boolean doCompaction, long checkpointId) throws IOException {
        ArrayList<Committable> committables = new ArrayList<Committable>();
        if (this.write != null) {
            try {
                for (FileCommittable committable : this.write.prepareCommit(doCompaction, checkpointId)) {
                    committables.add(new Committable(checkpointId, Committable.Kind.FILE, committable));
                }
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }
        return committables;
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
    }

    @Override
    public void close() throws Exception {
        if (this.write != null) {
            this.write.close();
        }
    }
}

