package it.geosolutions.geobatch.flow.file;

import it.geosolutions.filesystemmonitor.monitor.FileSystemEvent;
import it.geosolutions.geobatch.catalog.dao.DAO;
import it.geosolutions.geobatch.catalog.file.DataDirHandler;
import it.geosolutions.geobatch.catalog.impl.BasePersistentResource;
import it.geosolutions.geobatch.configuration.event.consumer.EventConsumerConfiguration;
import it.geosolutions.geobatch.configuration.event.generator.EventGeneratorConfiguration;
import it.geosolutions.geobatch.configuration.event.generator.file.FileBasedEventGeneratorConfiguration;
import it.geosolutions.geobatch.configuration.flow.file.FileBasedFlowConfiguration;
import it.geosolutions.geobatch.flow.FlowManager;
import it.geosolutions.geobatch.flow.Job;
import it.geosolutions.geobatch.flow.event.IProgressListener;
import it.geosolutions.geobatch.flow.event.consumer.BaseEventConsumer;
import it.geosolutions.geobatch.flow.event.consumer.EventConsumer;
import it.geosolutions.geobatch.flow.event.consumer.EventConsumerStatus;
import it.geosolutions.geobatch.flow.event.consumer.file.FileBasedEventConsumer;
import it.geosolutions.geobatch.flow.event.generator.EventGenerator;
import it.geosolutions.geobatch.flow.event.generator.EventGeneratorService;
import it.geosolutions.geobatch.flow.event.generator.FlowEventListener;
import it.geosolutions.geobatch.global.CatalogHolder;
import it.geosolutions.geobatch.settings.GBSettings;
import it.geosolutions.geobatch.settings.GBSettingsCatalog;
import it.geosolutions.geobatch.settings.flow.FlowSettings;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Calendar;
import java.util.Comparator;
import java.util.EventObject;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.set.UnmodifiableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jmx.export.annotation.ManagedAttribute;

/* loaded from: input_file:it/geosolutions/geobatch/flow/file/FileBasedFlowManager.class */
public class FileBasedFlowManager extends BasePersistentResource<FileBasedFlowConfiguration> implements FlowManager<FileSystemEvent, FileBasedFlowConfiguration>, Runnable, Job {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlowManager.class);
    private String name;
    private String description;
    private boolean autorun;
    private boolean initialized;
    private boolean started;
    private boolean paused;
    private boolean terminationRequest;
    private final BlockingQueue<FileSystemEvent> eventMailBox;
    private FileBasedEventDispatcher dispatcher;
    private EventGenerator<FileSystemEvent> eventGenerator;
    private final ConcurrentMap<String, EventConsumer> eventConsumers;
    private File flowConfigDir;
    private File flowTempDir;
    private Integer maxStoredConsumers;
    private Boolean keepConsumers;
    private ThreadPoolExecutor executor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:it/geosolutions/geobatch/flow/file/FileBasedFlowManager$GeneratorListener.class */
    public class GeneratorListener implements FlowEventListener<FileSystemEvent> {
        private GeneratorListener() {
        }

        public void eventGenerated(FileSystemEvent fileSystemEvent) {
            FileBasedFlowManager.this.postEvent(fileSystemEvent);
        }
    }

    public FileBasedFlowManager(FileBasedFlowConfiguration fileBasedFlowConfiguration, DataDirHandler dataDirHandler) throws Exception {
        super(fileBasedFlowConfiguration.getId());
        this.autorun = false;
        this.started = false;
        this.eventMailBox = new LinkedBlockingQueue();
        this.eventConsumers = new ConcurrentHashMap();
        initialize(fileBasedFlowConfiguration, dataDirHandler.getBaseConfigDirectory(), dataDirHandler.getBaseTempDirectory());
        super.setConfiguration(fileBasedFlowConfiguration);
    }

    public FileBasedFlowManager(String str, DAO dao, DataDirHandler dataDirHandler) throws Exception {
        super(str);
        this.autorun = false;
        this.started = false;
        this.eventMailBox = new LinkedBlockingQueue();
        this.eventConsumers = new ConcurrentHashMap();
        setDAO(dao);
        load();
        initialize((FileBasedFlowConfiguration) getConfiguration(), dataDirHandler.getBaseConfigDirectory(), dataDirHandler.getBaseTempDirectory());
    }

    public FileBasedFlowManager(String str, String str2, String str3, DataDirHandler dataDirHandler) throws Exception {
        super(str);
        this.autorun = false;
        this.started = false;
        this.eventMailBox = new LinkedBlockingQueue();
        this.eventConsumers = new ConcurrentHashMap();
        this.name = str2;
        this.description = str3;
        initialize(new FileBasedFlowConfiguration(str, str2, str3, null, null), dataDirHandler.getBaseConfigDirectory(), dataDirHandler.getBaseTempDirectory());
    }

    @ManagedAttribute
    public ThreadPoolExecutor getExecutor() {
        return this.executor;
    }

    private void initialize(FileBasedFlowConfiguration fileBasedFlowConfiguration, File file, File file2) throws Exception, NullPointerException {
        GBSettings flowSettings;
        this.initialized = false;
        this.name = fileBasedFlowConfiguration.getName();
        this.description = fileBasedFlowConfiguration.getDescription();
        this.flowConfigDir = initConfigDir(fileBasedFlowConfiguration, file);
        this.flowTempDir = initTempDir(fileBasedFlowConfiguration, file2);
        GBSettingsCatalog settingsCatalog = CatalogHolder.getSettingsCatalog();
        GBSettings find = settingsCatalog.find("FLOW");
        if (find == null || !(find instanceof FlowSettings)) {
            flowSettings = new FlowSettings();
            settingsCatalog.save(flowSettings);
        } else {
            flowSettings = (FlowSettings) find;
        }
        this.keepConsumers = fileBasedFlowConfiguration.isKeepConsumers();
        if (flowSettings.isKeepConsumers() && this.keepConsumers == null) {
            this.keepConsumers = true;
        } else {
            this.keepConsumers = false;
        }
        this.maxStoredConsumers = fileBasedFlowConfiguration.getMaxStoredConsumers();
        if (this.maxStoredConsumers == null || this.maxStoredConsumers.intValue() < 1) {
            this.maxStoredConsumers = Integer.valueOf(flowSettings.getMaxStoredConsumers());
        }
        this.executor = new ThreadPoolExecutor(fileBasedFlowConfiguration.getCorePoolSize() > 0 ? fileBasedFlowConfiguration.getCorePoolSize() : flowSettings.getCorePoolSize(), fileBasedFlowConfiguration.getMaximumPoolSize() > 0 ? fileBasedFlowConfiguration.getMaximumPoolSize() : flowSettings.getMaximumPoolSize(), fileBasedFlowConfiguration.getKeepAliveTime() > 0 ? fileBasedFlowConfiguration.getKeepAliveTime() : flowSettings.getKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue(fileBasedFlowConfiguration.getWorkQueueSize() > 0 ? fileBasedFlowConfiguration.getWorkQueueSize() : flowSettings.getWorkQueueSize()));
        this.paused = false;
        this.terminationRequest = false;
        this.autorun = fileBasedFlowConfiguration.isAutorun();
        if (this.autorun) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Automatic Flow Startup for '" + getId() + "'");
            }
            resume();
        }
    }

    public static File initConfigDir(FileBasedFlowConfiguration fileBasedFlowConfiguration, File file) throws FileNotFoundException {
        File file2;
        File overrideConfigDir = fileBasedFlowConfiguration.getOverrideConfigDir();
        if (overrideConfigDir == null) {
            file2 = new File(file, fileBasedFlowConfiguration.getId());
            if (!file2.exists() && LOGGER.isDebugEnabled()) {
                LOGGER.debug("Default config dir does not exist: " + file2);
            }
        } else {
            file2 = !overrideConfigDir.isAbsolute() ? new File(file, overrideConfigDir.getPath()) : overrideConfigDir;
            if (!file2.isDirectory() || !file2.canRead()) {
                throw new FileNotFoundException("Unable to locate the overriden configDir: " + file2);
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Flow: " + fileBasedFlowConfiguration.getId() + " - conf dir is now set to -> " + file2);
        }
        return file2;
    }

    public static File initTempDir(FileBasedFlowConfiguration fileBasedFlowConfiguration, File file) throws FileNotFoundException {
        File file2;
        File overrideTempDir = fileBasedFlowConfiguration.getOverrideTempDir();
        if (overrideTempDir != null) {
            file2 = overrideTempDir;
            if (!file2.isAbsolute()) {
                throw new IllegalStateException("Override temp dir must be an absolute path (" + overrideTempDir + ")");
            }
        } else {
            file2 = new File(file, fileBasedFlowConfiguration.getId());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("FlowBaseTempDir = " + file2);
            }
        }
        if ((!file2.mkdir() && !file2.exists()) || !file2.canWrite()) {
            throw new IllegalStateException("Can't write temp dir (" + file2 + ")");
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Flow: " + fileBasedFlowConfiguration.getId() + " - temp dir is now set to -> " + file2);
        }
        return file2;
    }

    public synchronized void dispose() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("dispose: " + getId());
        }
        this.terminationRequest = true;
        notify();
    }

    public boolean isRunning() {
        return !this.paused && this.started;
    }

    public void disposeConsumer(EventConsumer eventConsumer) throws IllegalArgumentException {
        if (eventConsumer == null) {
            throw new IllegalArgumentException("Unable to dispose a null consumer object");
        }
        disposeConsumer(eventConsumer.getId());
    }

    public void disposeConsumer(String str) throws IllegalArgumentException {
        if (str == null) {
            throw new IllegalArgumentException("Unable to dispose a null consumer object");
        }
        EventConsumer remove = this.eventConsumers.remove(str);
        if (remove == null) {
            throw new IllegalArgumentException("This flow is not managing consumer: " + str);
        }
        if (remove.getStatus() != EventConsumerStatus.COMPLETED && remove.getStatus() != EventConsumerStatus.FAILED) {
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn("Goning to dispose and uncompleted consumer " + remove);
            }
            remove.cancel();
        }
        remove.dispose();
        if (this.dispatcher != null) {
            synchronized (this.dispatcher) {
                this.dispatcher.notify();
            }
        }
    }

    @Override // java.lang.Runnable
    public synchronized void run() {
        while (!this.terminationRequest) {
            while (this.paused) {
                try {
                    if (this.initialized && this.eventGenerator != null && this.eventGenerator.isRunning()) {
                        this.eventGenerator.pause();
                    }
                    wait();
                    if (this.terminationRequest) {
                        break;
                    }
                } catch (InterruptedException e) {
                    String str = "Error on dispatcher initialization: " + e.getLocalizedMessage();
                    LOGGER.error(str);
                    throw new RuntimeException(str, e);
                }
            }
            if (!this.initialized) {
                this.dispatcher = new FileBasedEventDispatcher(this, this.eventMailBox);
                this.dispatcher.start();
                this.initialized = true;
            }
            while (!this.paused) {
                try {
                    if (this.initialized) {
                        if (this.eventGenerator == null) {
                            try {
                                createGenerator();
                            } catch (Exception e2) {
                                String str2 = "Error on FS-Monitor initialization for '" + this.name + "': " + e2.getLocalizedMessage();
                                LOGGER.error(str2, e2);
                                throw new RuntimeException(str2, e2);
                            }
                        } else {
                            this.eventGenerator.start();
                        }
                    }
                    wait();
                    if (this.terminationRequest) {
                        break;
                    }
                } catch (InterruptedException e3) {
                    LOGGER.error("FlowManager cycle exception: " + e3.getLocalizedMessage(), e3);
                    throw new RuntimeException(e3);
                }
            }
        }
        if (this.initialized) {
            this.dispatcher.shutdown();
            this.eventGenerator.dispose();
            this.initialized = false;
        }
        this.paused = true;
    }

    private void createGenerator() {
        EventGeneratorConfiguration eventGeneratorConfiguration = getConfiguration().getEventGeneratorConfiguration();
        if (eventGeneratorConfiguration == null) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Unable to create a null event generator. Please configure one.");
                return;
            }
            return;
        }
        String serviceID = eventGeneratorConfiguration.getServiceID();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("EventGeneratorCreationServiceID: " + serviceID);
        }
        EventGeneratorService resource = CatalogHolder.getCatalog().getResource(serviceID, EventGeneratorService.class);
        if (resource == null) {
            String str = "Unable to get the " + serviceID + " generator service as resource from the catalog";
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(str);
            }
            throw new RuntimeException(str);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("EventGeneratorService found");
        }
        this.eventGenerator = resource.createEventGenerator(eventGeneratorConfiguration);
        if (this.eventGenerator == null) {
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error("Error on EventGenerator creations");
            }
            throw new RuntimeException("Error on EventGenerator creations");
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("FileSystemEventGenerator created (" + getId() + ")");
        }
        this.eventGenerator.addListener(new GeneratorListener());
        this.eventGenerator.start();
        if (LOGGER.isInfoEnabled()) {
            if (eventGeneratorConfiguration instanceof FileBasedEventGeneratorConfiguration) {
                LOGGER.info("FileSystemEventGenerator started on " + ((FileBasedEventGeneratorConfiguration) eventGeneratorConfiguration).getWatchDirectory());
            } else {
                LOGGER.info("FileSystemEventGenerator started");
            }
        }
    }

    public synchronized void resume() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("RESUMING ->" + getId());
        }
        if (!this.started) {
            this.executor.execute(this);
            this.started = true;
            this.paused = false;
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("STARTED ->" + getId());
                return;
            }
            return;
        }
        if (isRunning()) {
            return;
        }
        this.paused = false;
        notify();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("RESUMED ->" + getId());
        }
    }

    public synchronized boolean pause() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("PAUSING -> " + getId());
        }
        if (!isRunning()) {
            return true;
        }
        this.paused = true;
        notify();
        return true;
    }

    public synchronized boolean pause(boolean z) {
        pause();
        if (!z) {
            return true;
        }
        for (String str : this.eventConsumers.keySet()) {
            EventConsumer eventConsumer = this.eventConsumers.get(str);
            if (eventConsumer != null) {
                eventConsumer.pause(true);
            } else {
                this.eventConsumers.remove(str);
            }
        }
        return true;
    }

    public void reset() {
        LOGGER.info("Resetting: " + getId());
    }

    public String toString() {
        return getId();
    }

    public boolean isInited() {
        return this.initialized;
    }

    public boolean isPaused() {
        return this.paused;
    }

    public boolean isTermination() {
        return this.terminationRequest;
    }

    public EventGenerator<FileSystemEvent> getEventGenerator() {
        return this.eventGenerator;
    }

    public void setEventGenerator(EventGenerator<FileSystemEvent> eventGenerator) {
        this.eventGenerator = eventGenerator;
    }

    public String getDescription() {
        return this.description;
    }

    public void setDescription(String str) {
        this.description = str;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String str) {
        this.name = str;
    }

    public boolean isAutorun() {
        return this.autorun;
    }

    public void setAutorun(boolean z) {
        this.autorun = z;
    }

    public final File getFlowConfigDir() {
        return this.flowConfigDir;
    }

    public final File getFlowTempDir() {
        return this.flowTempDir;
    }

    public final Set<BaseEventConsumer> getEventConsumers() {
        TreeSet treeSet = new TreeSet(new Comparator<BaseEventConsumer<EventObject, EventConsumerConfiguration>>() { // from class: it.geosolutions.geobatch.flow.file.FileBasedFlowManager.1
            @Override // java.util.Comparator
            public int compare(BaseEventConsumer<EventObject, EventConsumerConfiguration> baseEventConsumer, BaseEventConsumer<EventObject, EventConsumerConfiguration> baseEventConsumer2) {
                Calendar creationTimestamp = baseEventConsumer.getCreationTimestamp();
                Calendar creationTimestamp2 = baseEventConsumer2.getCreationTimestamp();
                if (creationTimestamp.before(creationTimestamp2)) {
                    return 1;
                }
                return creationTimestamp.after(creationTimestamp2) ? -1 : 0;
            }
        });
        treeSet.addAll(this.eventConsumers.values());
        return treeSet;
    }

    /* renamed from: getEventConsumersId, reason: merged with bridge method [inline-methods] */
    public final Set<String> m15getEventConsumersId() {
        return UnmodifiableSet.decorate(this.eventConsumers.keySet());
    }

    public final EventConsumer<? extends EventObject, EventConsumerConfiguration> getConsumer(String str) {
        return this.eventConsumers.get(str);
    }

    public synchronized boolean addConsumer(EventConsumer eventConsumer) throws IllegalArgumentException {
        if (eventConsumer == null) {
            throw new IllegalArgumentException("Unable to add a null consumer");
        }
        if (this.maxStoredConsumers.intValue() - this.eventConsumers.size() > 0) {
            this.eventConsumers.put(eventConsumer.getId(), eventConsumer);
            return true;
        }
        if (purgeConsumers(1) <= 0) {
            return false;
        }
        this.eventConsumers.put(eventConsumer.getId(), eventConsumer);
        return true;
    }

    public EventConsumerStatus getStatus(String str) {
        EventConsumer eventConsumer = this.eventConsumers.get(str);
        if (eventConsumer != null) {
            return eventConsumer.getStatus();
        }
        return null;
    }

    public int purgeConsumers(int i) {
        int i2 = 0;
        if (this.keepConsumers.booleanValue()) {
            return 0;
        }
        EventConsumer[] eventConsumerArr = (BaseEventConsumer[]) getEventConsumers().toArray(new BaseEventConsumer[0]);
        int length = eventConsumerArr.length;
        while (true) {
            length--;
            if (length < 0 || i2 >= i) {
                break;
            }
            EventConsumer eventConsumer = eventConsumerArr[length];
            EventConsumerStatus status = eventConsumer.getStatus();
            if (status == EventConsumerStatus.CANCELED || status == EventConsumerStatus.COMPLETED || status == EventConsumerStatus.FAILED) {
                disposeConsumer(eventConsumer);
                eventConsumerArr[length] = null;
                i2++;
            }
        }
        return i2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<Queue<FileSystemEvent>> execute(FileBasedEventConsumer fileBasedEventConsumer) throws Exception {
        if (fileBasedEventConsumer.getStatus() != EventConsumerStatus.EXECUTING) {
            String str = "Consumer " + fileBasedEventConsumer + " is not in an EXECUTING state.";
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(str);
            }
            throw new IllegalStateException(str);
        }
        if (!this.eventConsumers.containsKey(fileBasedEventConsumer.getId())) {
            String str2 = "Consumer " + fileBasedEventConsumer + " is not handled by the current flow manager.";
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error(str2);
            }
            throw new IllegalArgumentException(str2);
        }
        try {
            return this.executor.submit((Callable) fileBasedEventConsumer);
        } catch (RejectedExecutionException e) {
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error("Unable to submit the consumer (id:" + fileBasedEventConsumer.getId() + ") to the flow manager (id:" + getId() + ") queue.\nMessage is:" + e.getLocalizedMessage() + "\nThread pool executor info:\nMaximum allowed number of threads: " + this.executor.getMaximumPoolSize() + "\nWorking Queue size: " + this.executor.getQueue().size() + "\nWorking Queue remaining capacity: " + this.executor.getQueue().remainingCapacity() + "\nCurrent number of threads: " + this.executor.getPoolSize() + "\nApproximate number of threads that are actively executing : " + this.executor.getActiveCount() + "\nCore number of threads: " + this.executor.getCorePoolSize() + "\nKeepAliveTime [secs]: " + this.executor.getKeepAliveTime(TimeUnit.SECONDS), e);
            }
            Iterator it2 = fileBasedEventConsumer.getListeners().iterator();
            while (it2.hasNext()) {
                ((IProgressListener) it2.next()).failed(e);
            }
            throw e;
        } catch (Exception e2) {
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error("Unable to submit the consumer (id:" + fileBasedEventConsumer.getId() + ") to the flow manager (id:" + getId() + ") queue.\nMessage is:" + e2.getLocalizedMessage(), e2);
            }
            throw e2;
        }
    }

    public void postEvent(FileSystemEvent fileSystemEvent) {
        try {
            if (!this.eventMailBox.offer(fileSystemEvent) && LOGGER.isErrorEnabled()) {
                LOGGER.error("--------------------------------------------------------------------");
                LOGGER.error("Unable to add the event to the eventMailBox. (Flow id:" + getId() + ").\nEvent source: " + fileSystemEvent.getSource() + "\nMailBox size: " + this.eventMailBox.size() + "\nPlease check your configuration.");
                LOGGER.error("--------------------------------------------------------------------");
            }
        } catch (NullPointerException e) {
            if (LOGGER.isErrorEnabled()) {
                LOGGER.error("Unable to add a null event to the flow manager (id:" + getId() + ") eventMailBox.\nMessage is:" + e.getLocalizedMessage(), e);
            }
            throw e;
        }
    }
}
