/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingTaskManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocationResult;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocationStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerResourceInfoProvider;
import org.apache.flink.runtime.slots.ResourceRequirement;

/**
 * Resource allocation strategy that serves missing slot requirements from registered task
 * managers first, then from already pending task managers, and finally by requesting additional
 * pending task managers that all share one total resource profile.
 */
public class DefaultResourceAllocationStrategy implements ResourceAllocationStrategy {

    /** Slot profile generated from the worker's total profile and its number of slots. */
    private final ResourceProfile defaultSlotResourceProfile;

    /** Total resource profile used for every newly requested pending task manager. */
    private final ResourceProfile totalResourceProfile;

    /** Number of slots passed to every newly requested pending task manager. */
    private final int numSlotsPerWorker;

    /**
     * Creates the strategy.
     *
     * @param totalResourceProfile total resources of one worker
     * @param numSlotsPerWorker number of slots of one worker
     */
    public DefaultResourceAllocationStrategy(ResourceProfile totalResourceProfile, int numSlotsPerWorker) {
        this.totalResourceProfile = totalResourceProfile;
        this.numSlotsPerWorker = numSlotsPerWorker;
        this.defaultSlotResourceProfile =
                SlotManagerUtils.generateDefaultSlotResourceProfile(totalResourceProfile, numSlotsPerWorker);
    }

    /**
     * Tries to place every missing requirement, job by job. For each job the registered task
     * managers are tried first; whatever is left over is handed to the pending task managers
     * (existing ones and, if necessary, newly requested ones).
     *
     * @param missingResources outstanding requirements keyed by job
     * @param taskManagerResourceInfoProvider source of the registered and pending task managers
     * @return the allocations, new pending task managers and unfulfillable jobs recorded while
     *     processing the requirements
     */
    @Override
    public ResourceAllocationResult tryFulfillRequirements(Map<JobID, Collection<ResourceRequirement>> missingResources, TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
        ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
        // Both lists are shared across all jobs, so resources consumed by one job are no longer
        // offered to the next one.
        List<InternalResourceInfo> registeredResources =
                getRegisteredResources(taskManagerResourceInfoProvider, resultBuilder);
        List<InternalResourceInfo> pendingResources =
                getPendingResources(taskManagerResourceInfoProvider, resultBuilder);

        for (Map.Entry<JobID, Collection<ResourceRequirement>> jobEntry : missingResources.entrySet()) {
            JobID jobId = jobEntry.getKey();
            Collection<ResourceRequirement> leftover =
                    tryFulfillRequirementsForJobWithResources(jobId, jobEntry.getValue(), registeredResources);
            if (!leftover.isEmpty()) {
                tryFulfillRequirementsForJobWithPendingResources(jobId, leftover, pendingResources, resultBuilder);
            }
        }
        return resultBuilder.build();
    }

    /**
     * Wraps every registered task manager into an {@link InternalResourceInfo} seeded with its
     * currently available resource; allocations are recorded on {@code resultBuilder} as
     * allocations on registered resources.
     *
     * @return a mutable list, because exhausted entries are removed from it later on
     */
    private static List<InternalResourceInfo> getRegisteredResources(TaskManagerResourceInfoProvider taskManagerResourceInfoProvider, ResourceAllocationResult.Builder resultBuilder) {
        return taskManagerResourceInfoProvider.getRegisteredTaskManagers().stream()
                .map(taskManager -> new InternalResourceInfo(
                        taskManager.getDefaultSlotResourceProfile(),
                        taskManager.getAvailableResource(),
                        (jobId, slotProfile) -> resultBuilder.addAllocationOnRegisteredResource(
                                jobId, taskManager.getInstanceId(), slotProfile)))
                // Collectors.toList() does not guarantee a mutable result; this list is mutated.
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Wraps every pending task manager into an {@link InternalResourceInfo} seeded with its total
     * resource profile; allocations are recorded on {@code resultBuilder} as allocations on
     * pending resources.
     *
     * @return a mutable list, because entries are removed from and appended to it later on
     */
    private static List<InternalResourceInfo> getPendingResources(TaskManagerResourceInfoProvider taskManagerResourceInfoProvider, ResourceAllocationResult.Builder resultBuilder) {
        return taskManagerResourceInfoProvider.getPendingTaskManagers().stream()
                .map(pendingTaskManager -> new InternalResourceInfo(
                        pendingTaskManager.getDefaultSlotResourceProfile(),
                        pendingTaskManager.getTotalResourceProfile(),
                        (jobId, slotProfile) -> resultBuilder.addAllocationOnPendingResource(
                                jobId, pendingTaskManager.getPendingTaskManagerId(), slotProfile)))
                // Collectors.toList() does not guarantee a mutable result; this list is mutated.
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Allocates up to {@code numUnfulfilled} slots of {@code requiredResource} for the job,
     * walking the given resources in list order and filling each one as far as possible before
     * moving on. Resources whose available profile has dropped to {@link ResourceProfile#ZERO}
     * are removed from the list.
     *
     * @return the number of slots that could not be allocated
     */
    private static int tryFulfilledRequirementWithResource(List<InternalResourceInfo> internalResource, int numUnfulfilled, ResourceProfile requiredResource, JobID jobId) {
        int remaining = numUnfulfilled;
        Iterator<InternalResourceInfo> candidates = internalResource.iterator();
        while (remaining > 0 && candidates.hasNext()) {
            InternalResourceInfo candidate = candidates.next();
            while (remaining > 0 && candidate.tryAllocateSlotForJob(jobId, requiredResource)) {
                remaining--;
            }
            if (candidate.availableProfile.equals(ResourceProfile.ZERO)) {
                // Nothing left on this task manager; stop offering it to later requirements.
                candidates.remove();
            }
        }
        return remaining;
    }

    /**
     * Tries to serve each requirement of the job from the registered resources.
     *
     * @return one requirement per profile that still has slots outstanding, carrying the
     *     outstanding count; empty if everything was served
     */
    private static Collection<ResourceRequirement> tryFulfillRequirementsForJobWithResources(JobID jobId, Collection<ResourceRequirement> missingResources, List<InternalResourceInfo> registeredResources) {
        List<ResourceRequirement> outstandingRequirements = new ArrayList<>();
        for (ResourceRequirement requirement : missingResources) {
            int stillMissing = tryFulfilledRequirementWithResource(
                    registeredResources,
                    requirement.getNumberOfRequiredSlots(),
                    requirement.getResourceProfile(),
                    jobId);
            if (stillMissing > 0) {
                outstandingRequirements.add(
                        ResourceRequirement.create(requirement.getResourceProfile(), stillMissing));
            }
        }
        return outstandingRequirements;
    }

    /** Returns whether {@code resourceProfile} is at least {@code requirement} in every field. */
    private static boolean canFulfillRequirement(ResourceProfile requirement, ResourceProfile resourceProfile) {
        return resourceProfile.allFieldsNoLessThan(requirement);
    }

    /**
     * Serves the job's leftover requirements from pending task managers. Existing pending
     * resources are tried first; for what remains, new pending task managers are requested one at
     * a time and filled with as many slots as fit. Any remainder of a new task manager is appended
     * to {@code availableResources} so later requirements can use it.
     *
     * <p>If a single slot of a requirement does not fit into one worker's total profile, the job
     * is recorded as unfulfillable and the requirement is skipped.
     */
    private void tryFulfillRequirementsForJobWithPendingResources(JobID jobId, Collection<ResourceRequirement> unfulfilledRequirements, List<InternalResourceInfo> availableResources, ResourceAllocationResult.Builder resultBuilder) {
        for (ResourceRequirement requirement : unfulfilledRequirements) {
            ResourceProfile effectiveProfile = SlotManagerUtils.getEffectiveResourceProfile(
                    requirement.getResourceProfile(), this.defaultSlotResourceProfile);
            int numUnfulfilled = tryFulfilledRequirementWithResource(
                    availableResources,
                    requirement.getNumberOfRequiredSlots(),
                    requirement.getResourceProfile(),
                    jobId);

            // NOTE(review): this check runs even when the call above left nothing outstanding;
            // confirm that flagging the job in that case is intended.
            if (!canFulfillRequirement(effectiveProfile, this.totalResourceProfile)) {
                resultBuilder.addUnfulfillableJob(jobId);
                continue;
            }

            while (numUnfulfilled > 0) {
                PendingTaskManager newPendingTaskManager =
                        new PendingTaskManager(this.totalResourceProfile, this.numSlotsPerWorker);
                resultBuilder.addPendingTaskManagerAllocate(newPendingTaskManager);

                // Pack slots into the fresh worker until it is full or the requirement is met.
                // The check above guarantees at least one slot fits, so the outer loop terminates.
                ResourceProfile remainResource = this.totalResourceProfile;
                while (numUnfulfilled > 0 && canFulfillRequirement(effectiveProfile, remainResource)) {
                    numUnfulfilled--;
                    resultBuilder.addAllocationOnPendingResource(
                            jobId, newPendingTaskManager.getPendingTaskManagerId(), effectiveProfile);
                    remainResource = remainResource.subtract(effectiveProfile);
                }

                if (!remainResource.equals(ResourceProfile.ZERO)) {
                    // Keep the unused part of the new worker available for later requirements.
                    availableResources.add(new InternalResourceInfo(
                            this.defaultSlotResourceProfile,
                            remainResource,
                            (job, slotProfile) -> resultBuilder.addAllocationOnPendingResource(
                                    job, newPendingTaskManager.getPendingTaskManagerId(), slotProfile)));
                }
            }
        }
    }

    /**
     * Mutable bookkeeping for the remaining resources of one (registered or pending) task manager
     * during a single run of the strategy, paired with the callback that records each allocation.
     */
    private static class InternalResourceInfo {
        /** Default slot profile handed to {@code SlotManagerUtils.getEffectiveResourceProfile}. */
        private final ResourceProfile defaultSlotProfile;

        /** Invoked with the job and the effective slot profile for every successful allocation. */
        private final BiConsumer<JobID, ResourceProfile> allocationConsumer;

        /** Resources not yet handed out; shrinks with every successful allocation. */
        private ResourceProfile availableProfile;

        InternalResourceInfo(ResourceProfile defaultSlotProfile, ResourceProfile availableProfile, BiConsumer<JobID, ResourceProfile> allocationConsumer) {
            this.defaultSlotProfile = defaultSlotProfile;
            this.availableProfile = availableProfile;
            this.allocationConsumer = allocationConsumer;
        }

        /**
         * Allocates one slot for the job if the effective profile of {@code requirement} fits into
         * the remaining resources.
         *
         * @return {@code true} if the slot was allocated and reported to the consumer, {@code
         *     false} if it did not fit (in which case nothing changes)
         */
        boolean tryAllocateSlotForJob(JobID jobId, ResourceProfile requirement) {
            ResourceProfile effectiveProfile =
                    SlotManagerUtils.getEffectiveResourceProfile(requirement, this.defaultSlotProfile);
            if (!this.availableProfile.allFieldsNoLessThan(effectiveProfile)) {
                return false;
            }
            this.availableProfile = this.availableProfile.subtract(effectiveProfile);
            this.allocationConsumer.accept(jobId, effectiveProfile);
            return true;
        }
    }
}

