/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.table.sink;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.Increment;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.predicate.PredicateConverter;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs compaction over the files returned by a {@link FileStoreScan} plan, one
 * (partition, bucket) group at a time, and collects a {@link FileCommittable} for every
 * group that was compacted.
 *
 * <p>The set of groups can be narrowed with {@link #withPartitions(Map)} (applied to the scan)
 * and {@link #withFilter(BiPredicate)} (applied per partition/bucket before compacting). When no
 * partition/bucket filter is configured, every group in the plan is compacted.
 */
public class TableCompact {
    private static final Logger LOG = LoggerFactory.getLogger(TableCompact.class);

    /** Filter used when the caller has not supplied one: accepts every partition/bucket. */
    private static final BiPredicate<BinaryRowData, Integer> ACCEPT_ALL = (partition, bucket) -> true;

    private final FileStoreScan scan;
    private final FileStoreWrite<?> write;
    private final RowType partitionType;
    // Never null: defaults to ACCEPT_ALL so compact() works without a prior withFilter() call.
    private BiPredicate<BinaryRowData, Integer> partBucketFilter = ACCEPT_ALL;

    /**
     * @param scan scan whose plan supplies the files to compact
     * @param write write used to create the compact writer for each partition/bucket
     * @param partitionType row type of the partition, used for the partition filter and for logging
     */
    public TableCompact(FileStoreScan scan, FileStoreWrite<?> write, RowType partitionType) {
        this.scan = scan;
        this.write = write;
        this.partitionType = partitionType;
    }

    /**
     * Restricts the underlying scan to the given partition spec.
     *
     * @param partitionSpec partition spec converted to a predicate via {@link PredicateConverter#fromMap}
     * @return this instance, for chaining
     */
    public TableCompact withPartitions(Map<String, String> partitionSpec) {
        this.scan.withPartitionFilter(PredicateConverter.fromMap(partitionSpec, this.partitionType));
        return this;
    }

    /**
     * Sets the predicate deciding which (partition, bucket) groups are compacted.
     *
     * @param partBucketFilter predicate tested with the partition and bucket of each group;
     *     {@code null} removes any previously set filter so that every group is compacted
     * @return this instance, for chaining
     */
    public TableCompact withFilter(BiPredicate<BinaryRowData, Integer> partBucketFilter) {
        this.partBucketFilter = partBucketFilter == null ? ACCEPT_ALL : partBucketFilter;
        return this;
    }

    /**
     * Plans the scan, then compacts every (partition, bucket) group accepted by the filter.
     *
     * @return one committable per compacted group; empty if no group was compacted
     * @throws RuntimeException if the compact writer of any group fails
     */
    public List<FileCommittable> compact() {
        List<FileCommittable> committables = new ArrayList<>();
        // The casts are kept as-is: the declared return type of groupByPartFiles() is not
        // visible from this file.
        this.scan.plan().groupByPartFiles().forEach(
                (partition, buckets) -> buckets.forEach(
                        (bucket, files) -> this.doCompact(
                                        (BinaryRowData) partition,
                                        (int) bucket,
                                        (List<DataFileMeta>) files)
                                .ifPresent(committables::add)));
        return committables;
    }

    /**
     * Compacts the files of a single partition/bucket.
     *
     * @return the resulting committable, or {@link Optional#empty()} when the filter rejects
     *     this partition/bucket
     */
    private Optional<FileCommittable> doCompact(BinaryRowData partition, int bucket, List<DataFileMeta> files) {
        if (!this.partBucketFilter.test(partition, bucket)) {
            return Optional.empty();
        }
        // Guarded explicitly because building the partition values is not free.
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Do compaction for partition {}, bucket {}",
                    FileStorePathFactory.getPartitionComputer(
                                    this.partitionType,
                                    FileStorePathFactory.PARTITION_DEFAULT_NAME.defaultValue())
                            .generatePartValues(partition),
                    (Object) bucket);
        }
        try {
            CompactResult result = this.write.createCompactWriter(partition.copy(), bucket, files).call();
            FileCommittable committable =
                    new FileCommittable(partition, bucket, Increment.forCompact(result.before(), result.after()));
            return Optional.of(committable);
        } catch (Exception e) {
            // Same exception type as before; the message adds which bucket failed and the cause is kept.
            throw new RuntimeException("Failed to compact bucket " + bucket, e);
        }
    }
}

