package it.geosolutions.geobatch.flow.file;

import it.geosolutions.filesystemmonitor.monitor.FileSystemEvent;
import it.geosolutions.geobatch.catalog.file.FileBaseCatalog;
import it.geosolutions.geobatch.catalog.impl.BasePersistentResource;
import it.geosolutions.geobatch.configuration.event.generator.EventGeneratorConfiguration;
import it.geosolutions.geobatch.configuration.flow.file.FileBasedFlowConfiguration;
import it.geosolutions.geobatch.flow.FlowManager;
import it.geosolutions.geobatch.flow.Job;
import it.geosolutions.geobatch.flow.event.consumer.EventConsumerStatus;
import it.geosolutions.geobatch.flow.event.consumer.file.FileBasedEventConsumer;
import it.geosolutions.geobatch.flow.event.generator.EventGenerator;
import it.geosolutions.geobatch.flow.event.generator.EventGeneratorService;
import it.geosolutions.geobatch.flow.event.generator.FlowEventListener;
import it.geosolutions.geobatch.global.CatalogHolder;
import it.geosolutions.geobatch.settings.GBSettings;
import it.geosolutions.geobatch.settings.GBSettingsCatalog;
import it.geosolutions.geobatch.settings.flow.FlowSettings;
import it.geosolutions.geobatch.tools.file.Path;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Component;

@Component("FlowManager")
@ManagedResource(objectName = "spring:name=FileBasedFlowManager", description = "A JMX-managed FileBasedFlowManager")
/* loaded from: input_file:it/geosolutions/geobatch/flow/file/FileBasedFlowManager.class */
public class FileBasedFlowManager extends BasePersistentResource<FileBasedFlowConfiguration> implements FlowManager<FileSystemEvent, FileBasedFlowConfiguration>, Runnable, Job {
    /**
     * Shared logger.
     * NOTE(review): the logger is obtained for {@code FlowManager.class}, not for this
     * class - confirm that this category name is intentional.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(FlowManager.class);
    /** Copied from the configuration in {@code initialize}; when true, {@code initialize} calls {@code resume()}. */
    private boolean autorun;
    /** Directory resolved in {@code initialize} from the configured working directory and the catalog base directory. */
    private File workingDirectory;
    /** Set to true by {@code run()} once the dispatcher has been started; cleared again on termination. */
    private boolean initialized;
    /** Set to true by the first successful {@code resume()}, which submits this manager to the executor. */
    private boolean started;
    /** Toggled by {@code pause()} / {@code resume()}; also forced to true when {@code run()} exits normally. */
    private boolean paused;
    /** Raised by {@code dispose()} to make {@code run()} leave its loop. */
    private boolean terminationRequest;
    /** Queue receiving events from {@code postEvent}; handed to the dispatcher when it is created. */
    private final BlockingQueue<FileSystemEvent> eventMailBox;
    /** Dispatcher created and started lazily by {@code run()}. */
    private FileBasedEventDispatcher dispatcher;
    /** Event source created lazily by {@code createGenerator()} or injected through {@code setEventGenerator}. */
    private EventGenerator<FileSystemEvent> eventGenerator;
    /** Consumers tracked by this manager; {@code dispose(FileBasedEventConsumer)} uses this list as its lock. */
    private final List<FileBasedEventConsumer> eventConsumers;
    /** Size at which {@code add} starts evicting consumers that are CANCELED, COMPLETED or FAILED. */
    private int maxStoredConsumers;
    /** Thread pool built in {@code initialize}; runs this manager and the submitted consumers. */
    private ThreadPoolExecutor executor;

    /**
     * Listener attached to the event generator by {@code createGenerator()}.
     * Every event it receives is handed to the enclosing manager's
     * {@code postEvent} method.
     */
    public class GeneratorListener implements FlowEventListener<FileSystemEvent> {
        /** Private: instances are created only by the enclosing manager. */
        private GeneratorListener() {
        }

        /**
         * Forwards a generated event to the enclosing flow manager.
         *
         * @param fileSystemEvent the event produced by the generator
         */
        public void eventGenerated(FileSystemEvent fileSystemEvent) {
            final FileBasedFlowManager owner = FileBasedFlowManager.this;
            owner.postEvent(fileSystemEvent);
        }
    }

    /**
     * Builds a flow manager from a configuration and initializes it right away.
     * The id, name and description are taken from the configuration.
     *
     * @param fileBasedFlowConfiguration the flow configuration to apply
     * @throws IOException if {@code initialize} fails
     * @throws NullPointerException declared by the original signature
     */
    public FileBasedFlowManager(FileBasedFlowConfiguration fileBasedFlowConfiguration) throws IOException, NullPointerException {
        super(fileBasedFlowConfiguration.getId(), fileBasedFlowConfiguration.getName(), fileBasedFlowConfiguration.getDescription());
        eventMailBox = new LinkedBlockingQueue<FileSystemEvent>();
        eventConsumers = new ArrayList<FileBasedEventConsumer>();
        started = false;
        autorun = false;
        // Must come last: initialize() may call resume(), which needs the fields above.
        initialize(fileBasedFlowConfiguration);
    }

    /**
     * Builds an uninitialized flow manager. No executor or working directory is
     * set here; {@code initialize} runs only when a configuration is supplied
     * through {@code setConfiguration}.
     *
     * @param str  resource id passed to the superclass
     * @param str2 resource name passed to the superclass
     * @param str3 resource description passed to the superclass
     */
    public FileBasedFlowManager(String str, String str2, String str3) {
        super(str, str2, str3);
        eventMailBox = new LinkedBlockingQueue<FileSystemEvent>();
        eventConsumers = new ArrayList<FileBasedEventConsumer>();
        started = false;
        autorun = false;
    }

    /**
     * Exposes the thread pool as a JMX managed attribute.
     *
     * @return the executor built by {@code initialize}, or {@code null} if it has not run yet
     */
    @ManagedAttribute
    public ThreadPoolExecutor getExecutor() {
        return executor;
    }

    /**
     * Applies a flow configuration: resolves the flow settings, resets the state
     * flags, validates the working directory, builds the thread pool and, when
     * autorun is enabled, resumes the flow.
     *
     * <p>Values of the configuration that are not positive (stored-consumer limit,
     * pool sizes, keep-alive time, work-queue size) fall back to the values held
     * by the "FLOW" settings. If no {@code FlowSettings} instance is stored in the
     * settings catalog, a default one is created and saved.
     *
     * <p>Note that a new executor is assigned on every call; a previously assigned
     * executor is not shut down here.
     *
     * @param fileBasedFlowConfiguration the configuration to apply
     * @throws IOException wrapping any failure raised while initializing
     *         (including invalid working directory and missing base directory)
     * @throws NullPointerException declared by the original signature
     */
    private void initialize(FileBasedFlowConfiguration fileBasedFlowConfiguration) throws IOException, NullPointerException {
        final GBSettingsCatalog settingsCatalog = CatalogHolder.getSettingsCatalog();
        try {
            // instanceof is false for null, so a single test covers "missing" and "wrong type".
            final GBSettings stored = settingsCatalog.find("FLOW");
            final FlowSettings flowSettings;
            if (stored instanceof FlowSettings) {
                flowSettings = (FlowSettings) stored;
            } else {
                flowSettings = new FlowSettings();
                settingsCatalog.save(flowSettings);
            }

            this.initialized = false;
            this.paused = false;
            this.terminationRequest = false;

            this.maxStoredConsumers = fileBasedFlowConfiguration.getMaxStoredConsumers();
            if (this.maxStoredConsumers < 1) {
                this.maxStoredConsumers = flowSettings.getMaxStoredConsumers();
            }

            final File baseDirectory = ((FileBaseCatalog) CatalogHolder.getCatalog()).getBaseDirectory();
            if (baseDirectory == null) {
                throw new NullPointerException("Base Working dir is null");
            }
            this.workingDirectory = Path.findLocation(fileBasedFlowConfiguration.getWorkingDirectory(), baseDirectory);
            if (this.workingDirectory == null || !this.workingDirectory.canWrite() || !this.workingDirectory.isDirectory()) {
                throw new IllegalArgumentException("Working dir is invalid: >" + baseDirectory + "< >" + fileBasedFlowConfiguration.getWorkingDirectory() + "< ");
            }

            this.autorun = fileBasedFlowConfiguration.isAutorun();

            // Per-flow values win when positive; otherwise use the global flow settings.
            final int corePoolSize = fileBasedFlowConfiguration.getCorePoolSize() > 0
                    ? fileBasedFlowConfiguration.getCorePoolSize() : flowSettings.getCorePoolSize();
            final int maximumPoolSize = fileBasedFlowConfiguration.getMaximumPoolSize() > 0
                    ? fileBasedFlowConfiguration.getMaximumPoolSize() : flowSettings.getMaximumPoolSize();
            final long keepAliveSeconds = fileBasedFlowConfiguration.getKeepAliveTime() > 0
                    ? fileBasedFlowConfiguration.getKeepAliveTime() : flowSettings.getKeepAliveTime();
            final int workQueueSize = fileBasedFlowConfiguration.getWorkQueueSize() > 0
                    ? fileBasedFlowConfiguration.getWorkQueueSize() : flowSettings.getWorkQueueSize();
            this.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(workQueueSize));

            if (this.autorun) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Automatic Flow Startup for '" + getName() + "'");
                }
                resume();
            }
        } catch (Throwable th) {
            // Any step above can fail, not only the settings save: report the real
            // cause and keep the stack trace in the log.
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error("Failed to initialize the flow manager '" + getId() + "': " + th.getLocalizedMessage(), th);
            }
            throw new IOException(th);
        }
    }

    /**
     * Requests termination of the flow: raises the termination flag and wakes
     * one thread waiting on this manager's monitor so that {@code run()} can
     * observe the flag.
     */
    public synchronized void dispose() {
        if (LOGGER.isInfoEnabled()) {
            final String flowId = getId();
            LOGGER.info("FileBasedFlowManager:dispose(): " + flowId);
        }
        terminationRequest = true;
        notify();
    }

    /**
     * Tells whether the flow is active.
     *
     * @return {@code true} when the manager has been started and is not paused
     */
    public boolean isRunning() {
        return started && !paused;
    }

    /**
     * Stops tracking a consumer and cancels it if it has not finished.
     *
     * <p>The membership check and the removal are done as one operation under the
     * {@code eventConsumers} lock, so two concurrent calls for the same consumer
     * cannot both pass the check.
     *
     * @param fileBasedEventConsumer the consumer to release
     * @throws IllegalArgumentException if the consumer is not managed by this flow
     */
    public void dispose(FileBasedEventConsumer fileBasedEventConsumer) {
        final boolean removed;
        synchronized (this.eventConsumers) {
            // List.remove(Object) reports whether the element was present.
            removed = this.eventConsumers.remove(fileBasedEventConsumer);
        }
        if (!removed) {
            throw new IllegalArgumentException("FileBasedFlowManager:dispose(): This flow is not managing " + fileBasedEventConsumer);
        }
        // Read the status once so both comparisons see the same value.
        final EventConsumerStatus status = fileBasedEventConsumer.getStatus();
        if (status == EventConsumerStatus.COMPLETED || status == EventConsumerStatus.FAILED) {
            return;
        }
        if (LOGGER.isWarnEnabled()) {
            LOGGER.warn("FileBasedFlowManager:dispose(): Trying to dispose and uncompleted consumer " + fileBasedEventConsumer);
        }
        fileBasedEventConsumer.cancel();
    }

    /**
     * Main loop of the flow manager, executed on the thread pool after the first
     * {@code resume()}.
     *
     * <p>Until termination is requested the loop alternates between two phases:
     * <ul>
     *   <li>paused: pauses a running event generator and waits on this monitor;</li>
     *   <li>running: creates the event generator on first use (or restarts the
     *       existing one) and waits on this monitor.</li>
     * </ul>
     * The dispatcher is created and started the first time the loop leaves the
     * paused phase. On termination the dispatcher is shut down, the generator (if
     * one was ever created) is disposed, and the manager is left paused.
     *
     * @throws RuntimeException if the thread is interrupted while waiting; the
     *         interrupt status is restored before the exception is thrown
     */
    @Override // java.lang.Runnable
    public synchronized void run() {
        while (!this.terminationRequest) {
            // Paused phase: keep the generator quiet until resume() or dispose() notifies.
            while (this.paused) {
                try {
                    if (this.initialized && this.eventGenerator != null && this.eventGenerator.isRunning()) {
                        this.eventGenerator.pause();
                    }
                    wait();
                    if (this.terminationRequest) {
                        break;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can see the interruption.
                    Thread.currentThread().interrupt();
                    String str = "FileBasedFlowManager:run(): Error on dispatcher initialization: " + e.getLocalizedMessage();
                    LOGGER.error(str, e);
                    throw new RuntimeException(str, e);
                }
            }
            if (!this.initialized) {
                this.dispatcher = new FileBasedEventDispatcher(this, this.eventMailBox);
                this.dispatcher.start();
                this.initialized = true;
            }
            // Running phase: make sure the generator is producing, then wait for pause()/dispose().
            while (!this.paused) {
                try {
                    if (this.initialized) {
                        if (this.eventGenerator == null) {
                            createGenerator();
                        } else {
                            this.eventGenerator.start();
                        }
                    }
                    wait();
                    if (this.terminationRequest) {
                        break;
                    }
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    LOGGER.error("FileBasedFlowManager:run(): FlowManager cycle exception: " + e2.getLocalizedMessage(), e2);
                    throw new RuntimeException(e2);
                }
            }
        }
        if (this.initialized) {
            this.dispatcher.shutdown();
            // The generator is created lazily in the running phase; if termination
            // arrived while paused it may never have been created.
            if (this.eventGenerator != null) {
                this.eventGenerator.dispose();
            }
            this.initialized = false;
        }
        this.paused = true;
    }

    /**
     * Creates, wires and starts the event generator for this flow.
     *
     * <p>The generator service is looked up in the catalog by the service id found
     * in the event generator configuration. The created generator gets a
     * {@code GeneratorListener} and is started immediately.
     *
     * @throws RuntimeException if the service cannot be found in the catalog or
     *         the service returns no generator
     */
    private void createGenerator() {
        final EventGeneratorConfiguration generatorConfig = getConfiguration().getEventGeneratorConfiguration();
        final String serviceId = generatorConfig.getServiceID();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("FileBasedFlowManager:createGenerator(): EventGeneratorCreationServiceID: " + serviceId);
        }

        final EventGeneratorService generatorService = CatalogHolder.getCatalog().getResource(serviceId, EventGeneratorService.class);
        if (generatorService == null) {
            final String missingService = "FileBasedFlowManager::createGenerator(): Unable to get the generator service as resource from the catalog";
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(missingService);
            }
            throw new RuntimeException(missingService);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("FileBasedFlowManager:createGenerator(): EventGeneratorService found");
        }

        this.eventGenerator = generatorService.createEventGenerator(generatorConfig);
        if (this.eventGenerator == null) {
            final String creationFailed = "FileBasedFlowManager:createGenerator(): Error on EventGenerator creations";
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(creationFailed);
            }
            throw new RuntimeException(creationFailed);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("FileBasedFlowManager:createGenerator(): FileSystemEventGenerator created");
        }

        this.eventGenerator.addListener(new GeneratorListener());
        this.eventGenerator.start();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("FileBasedFlowManager:createGenerator(): FileSystemEventGenerator started");
        }
    }

    /**
     * Starts the flow on first use, or wakes it up when it is paused.
     *
     * <p>On the first call this manager is submitted to the executor and marked
     * as started. On later calls, if the flow is not running, the paused flag is
     * cleared and one thread waiting on this monitor is notified. Calling it while
     * the flow is already running only logs.
     */
    public synchronized void resume() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("FileBasedFlowManager::resume(): RESUMING ->" + getId());
        }
        if (started) {
            if (!isRunning()) {
                paused = false;
                notify();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("FileBasedFlowManager::resume(): RESUMED ->" + getId());
                }
            }
        } else {
            executor.execute(this);
            started = true;
            paused = false;
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("FileBasedFlowManager::resume(): STARTED ->" + getId());
            }
        }
    }

    /**
     * Pauses the flow if it is running: sets the paused flag and notifies one
     * thread waiting on this monitor. Does nothing beyond logging otherwise.
     *
     * @return always {@code true}
     */
    public synchronized boolean pause() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("FileBasedFlowManager::pause(): PAUSING -> " + getId());
        }
        if (isRunning()) {
            paused = true;
            notify();
        }
        return true;
    }

    /**
     * Pauses the flow and, optionally, every consumer currently tracked.
     *
     * @param z when {@code true}, {@code pause(true)} is also called on each
     *          consumer in the list
     * @return always {@code true}
     */
    public synchronized boolean pause(boolean z) {
        pause();
        if (z) {
            for (FileBasedEventConsumer consumer : this.eventConsumers) {
                consumer.pause(true);
            }
        }
        return true;
    }

    /**
     * Logs a reset request. No state is changed by this method.
     */
    public void reset() {
        final String message = "FileBasedFlowManager: Resetting: " + getId();
        LOGGER.info(message);
    }

    /**
     * @return the id of this flow manager, as returned by {@code getId()}
     */
    @Override
    public String toString() {
        final String id = getId();
        return id;
    }

    /**
     * @return {@code true} once {@code run()} has started the dispatcher and until
     *         termination cleanup or a new {@code initialize} resets the flag
     */
    public boolean isInited() {
        return initialized;
    }

    /**
     * @return the current value of the paused flag
     */
    public boolean isPaused() {
        return paused;
    }

    /**
     * @return {@code true} after {@code dispose()} has requested termination
     */
    public boolean isTermination() {
        return terminationRequest;
    }

    /**
     * @return the working directory of this flow; {@code null} until it is set by
     *         {@code initialize} or {@code setWorkingDirectory}
     */
    public File getWorkingDirectory() {
        return workingDirectory;
    }

    /**
     * Replaces the working directory. No validation is performed here.
     *
     * @param file the new working directory
     */
    public void setWorkingDirectory(File file) {
        workingDirectory = file;
    }

    /**
     * @return the event generator in use, or {@code null} if none has been created
     *         or injected yet
     */
    public EventGenerator<FileSystemEvent> getEventGenerator() {
        return eventGenerator;
    }

    /**
     * Injects the event generator. When a non-null generator is set, {@code run()}
     * starts it instead of creating a new one.
     *
     * @param eventGenerator the generator to use
     */
    public void setEventGenerator(EventGenerator<FileSystemEvent> eventGenerator) {
        final EventGenerator<FileSystemEvent> replacement = eventGenerator;
        this.eventGenerator = replacement;
    }

    /**
     * Stores the configuration in the superclass and re-runs {@code initialize}
     * with it.
     *
     * @param fileBasedFlowConfiguration the configuration to apply
     * @throws RuntimeException wrapping the {@code IOException} raised when
     *         initialization fails
     */
    public synchronized void setConfiguration(FileBasedFlowConfiguration fileBasedFlowConfiguration) {
        super.setConfiguration(fileBasedFlowConfiguration);
        try {
            initialize(fileBasedFlowConfiguration);
        } catch (IOException initFailure) {
            // The signature declares no checked exception, so the failure is rethrown unchecked.
            throw new RuntimeException(initFailure);
        }
    }

    /**
     * Loads the resource through the superclass, then copies the name held by the
     * loaded configuration into this resource.
     *
     * @throws IOException if the superclass load fails
     */
    public synchronized void load() throws IOException {
        super.load();
        final String configuredName = getConfiguration().getName();
        setName(configuredName);
    }

    /**
     * Delegates to the superclass while holding this manager's monitor.
     *
     * @return the value returned by the superclass
     * @throws IOException if the superclass removal fails
     */
    public synchronized boolean remove() throws IOException {
        final boolean removed = super.remove();
        return removed;
    }

    /**
     * Delegates to the superclass while holding this manager's monitor.
     *
     * @throws IOException if the superclass persist operation fails
     */
    public synchronized void persist() throws IOException {
        super.persist();
    }

    /**
     * @return the autorun flag, as read from the configuration or set through
     *         {@code setAutorun}
     */
    public boolean isAutorun() {
        return autorun;
    }

    /**
     * Sets the autorun flag. The flag is only acted upon inside
     * {@code initialize}; changing it here does not start the flow.
     *
     * @param z the new autorun value
     */
    public void setAutorun(boolean z) {
        autorun = z;
    }

    /**
     * @return the live internal list of consumers (not a copy); changes made by
     *         the caller are visible to this manager
     */
    public List<FileBasedEventConsumer> getEventConsumers() {
        return eventConsumers;
    }

    /**
     * Starts tracking a consumer.
     *
     * <p>When the list has reached {@code maxStoredConsumers}, consumers whose
     * status is CANCELED, COMPLETED or FAILED are cleared and dropped, oldest
     * first, until the list is back under the limit or no more candidates are
     * found. The new consumer is appended in every case, even if nothing could be
     * dropped.
     *
     * <p>NOTE(review): unlike {@code dispose(FileBasedEventConsumer)}, this method
     * does not lock the list - confirm that callers are single-threaded.
     *
     * @param fileBasedEventConsumer the consumer to track
     */
    public void add(FileBasedEventConsumer fileBasedEventConsumer) {
        int remaining = this.eventConsumers.size();
        if (remaining >= this.maxStoredConsumers) {
            final List<FileBasedEventConsumer> evicted = new ArrayList<FileBasedEventConsumer>();
            for (Iterator<FileBasedEventConsumer> cursor = this.eventConsumers.iterator();
                    cursor.hasNext() && remaining >= this.maxStoredConsumers;) {
                final FileBasedEventConsumer candidate = cursor.next();
                final EventConsumerStatus state = candidate.getStatus();
                final boolean finished = state == EventConsumerStatus.CANCELED
                        || state == EventConsumerStatus.COMPLETED
                        || state == EventConsumerStatus.FAILED;
                if (finished) {
                    candidate.clear();
                    evicted.add(candidate);
                    remaining--;
                }
            }
            // Removal is deferred so the list is not modified while it is being iterated.
            this.eventConsumers.removeAll(evicted);
        }
        this.eventConsumers.add(fileBasedEventConsumer);
    }

    /**
     * Submits a tracked consumer to the thread pool.
     *
     * @param fileBasedEventConsumer the consumer to run; it must be in the
     *        EXECUTING state and must already be tracked by this manager
     * @return the future returned by the executor for the submitted consumer
     * @throws IllegalStateException if the consumer is not in the EXECUTING state
     * @throws IllegalArgumentException if the consumer is not tracked by this manager
     * @throws RuntimeException wrapping any failure raised by the submission,
     *         including a rejected execution
     */
    public Future<Queue<FileSystemEvent>> execute(FileBasedEventConsumer fileBasedEventConsumer) {
        if (fileBasedEventConsumer.getStatus() != EventConsumerStatus.EXECUTING) {
            final String wrongState = "FileBasedFlowManager:execute(): Consumer " + fileBasedEventConsumer + " is not in an EXECUTING state.";
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(wrongState);
            }
            throw new IllegalStateException(wrongState);
        }
        if (!this.eventConsumers.contains(fileBasedEventConsumer)) {
            final String unmanaged = "FileBasedFlowManager:execute(): Consumer " + fileBasedEventConsumer + " is not handled by the current flow manager.";
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(unmanaged);
            }
            throw new IllegalArgumentException(unmanaged);
        }
        try {
            return this.executor.submit((Callable) fileBasedEventConsumer);
        } catch (RejectedExecutionException rejected) {
            if (LOGGER.isErrorEnabled()) {
                // A rejection usually means the pool or its queue is saturated: dump the pool figures.
                final StringBuilder report = new StringBuilder();
                report.append("FileBasedFlowManager:execute(): Unable to submit the consumer (id:").append(fileBasedEventConsumer.getId());
                report.append(") to the flow manager (id:").append(getId());
                report.append(") queue.\nMessage is:").append(rejected.getLocalizedMessage());
                report.append("\nThread pool executor info:\nMaximum allowed number of threads: ").append(this.executor.getMaximumPoolSize());
                report.append("\nWorking Queue size: ").append(this.executor.getQueue().size());
                report.append("\nWorking Queue remaining capacity: ").append(this.executor.getQueue().remainingCapacity());
                report.append("\nCurrent number of threads: ").append(this.executor.getPoolSize());
                report.append("\nApproximate number of threads that are actively executing : ").append(this.executor.getActiveCount());
                report.append("\nCore number of threads: ").append(this.executor.getCorePoolSize());
                report.append("\nKeepAliveTime [secs]: ").append(this.executor.getKeepAliveTime(TimeUnit.SECONDS));
                LOGGER.error(report.toString(), rejected);
            }
            throw new RuntimeException(rejected);
        } catch (Throwable failure) {
            if (LOGGER.isErrorEnabled()) {
                final StringBuilder report = new StringBuilder();
                report.append("FileBasedFlowManager:execute(): Unable to submit the consumer (id:").append(fileBasedEventConsumer.getId());
                report.append(") to the flow manager (id:").append(getId());
                report.append(") queue.\nMessage is:").append(failure.getLocalizedMessage());
                LOGGER.error(report.toString(), failure);
            }
            throw new RuntimeException(failure);
        }
    }

    /**
     * Queues an event in the mailbox read by the dispatcher.
     *
     * @param fileSystemEvent the event to queue; must not be {@code null}
     * @throws NullPointerException if the event is {@code null} (logged, then rethrown)
     * @throws RuntimeException if the calling thread is interrupted while queuing;
     *         the thread's interrupt status is restored before the exception is thrown
     */
    public void postEvent(FileSystemEvent fileSystemEvent) {
        try {
            this.eventMailBox.put(fileSystemEvent);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again so the
            // caller's thread still knows it was asked to stop.
            Thread.currentThread().interrupt();
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error("FileBasedFlowManager:postEvent(): Unable to add event [" + fileSystemEvent.toString() + "] to the flow manager (id:" + getId() + ") eventMailBox.\nMessage is:" + e.getLocalizedMessage(), e);
            }
            throw new RuntimeException(e);
        } catch (NullPointerException e2) {
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error("FileBasedFlowManager:postEvent(): Unable to add a null event to the flow manager (id:" + getId() + ") eventMailBox.\nMessage is:" + e2.getLocalizedMessage(), e2);
            }
            throw e2;
        }
    }
}
