package it.geosolutions.geobatch.flow.file;

import it.geosolutions.filesystemmonitor.monitor.FileSystemEvent;
import it.geosolutions.geobatch.catalog.Identifiable;
import it.geosolutions.geobatch.configuration.event.consumer.EventConsumerConfiguration;
import it.geosolutions.geobatch.configuration.event.consumer.file.FileBasedEventConsumerConfiguration;
import it.geosolutions.geobatch.flow.event.IProgressListener;
import it.geosolutions.geobatch.flow.event.consumer.EventConsumer;
import it.geosolutions.geobatch.flow.event.consumer.EventConsumerStatus;
import it.geosolutions.geobatch.flow.event.consumer.file.FileBasedEventConsumer;
import java.util.EventObject;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:it/geosolutions/geobatch/flow/file/FileBasedEventDispatcher.class */
/**
 * Daemon thread that drains the flow's event mailbox and hands each incoming
 * {@link FileSystemEvent} to a newly created {@link FileBasedEventConsumer}.
 *
 * <p>For every event the dispatcher builds a consumer from a clone of the flow's
 * consumer configuration, registers it with the {@link FileBasedFlowManager}
 * (blocking while the manager refuses it), lets the consumer consume the event and,
 * if the consumer reports {@link EventConsumerStatus#EXECUTING}, asks the flow
 * manager to execute it.
 *
 * <p>The thread stops when it is interrupted, see {@link #shutdown()}.
 */
class FileBasedEventDispatcher extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileBasedEventDispatcher.class);

    /**
     * Upper bound, in milliseconds, of a single wait for a free consumer slot.
     * The wait is bounded because the registration check happens outside the
     * monitor: a notification sent between a failed check and the call to
     * {@code wait} would otherwise be missed and the dispatcher could sleep forever.
     */
    private static final long CONSUMER_SLOT_WAIT_MILLIS = 5000L;

    /** Queue the dispatcher takes incoming events from. */
    private final BlockingQueue<FileSystemEvent> eventMailBox;

    /** Flow manager that owns this dispatcher and registers/executes the consumers. */
    private final FileBasedFlowManager flowManager;

    /**
     * Progress listener attached to every consumer created by the dispatcher.
     * Its only job is to wake the dispatcher up when the consumer fails, completes
     * or is terminated, so that a pending registration can be retried.
     */
    class DispatcherListener implements IProgressListener {
        EventConsumer<EventObject, EventConsumerConfiguration> consumer;
        FileBasedEventDispatcher dispatcher;

        /**
         * @param eventConsumer the consumer this listener reports about
         * @param fileBasedEventDispatcher the dispatcher to wake up on terminal events
         */
        public DispatcherListener(EventConsumer<EventObject, EventConsumerConfiguration> eventConsumer, FileBasedEventDispatcher fileBasedEventDispatcher) {
            this.consumer = eventConsumer;
            this.dispatcher = fileBasedEventDispatcher;
        }

        /** Wakes the dispatcher up after the consumer failed. */
        public void failed(Throwable th) {
            wakeDispatcher("Consumer has failed the job notifying the dispatcher");
        }

        /** Wakes the dispatcher up after the consumer completed. */
        public void completed() {
            wakeDispatcher("Consumer has completed the job notifying the dispatcher");
        }

        /** Wakes the dispatcher up after the consumer was terminated. */
        public void terminated() {
            wakeDispatcher("Consumer has been terminated notifying the dispatcher");
        }

        public void started() {
        }

        public void setTask(String str) {
        }

        public void setProgress(float f) {
        }

        public void resumed() {
        }

        public void progressing() {
        }

        public void paused() {
        }

        /** @return always {@code null}: this listener does not track a task name */
        public String getTask() {
            return null;
        }

        /** @return always {@code 0}: this listener does not track progress */
        public float getProgress() {
            return 0.0f;
        }

        /** @return the consumer this listener was created for */
        public Identifiable getOwner() {
            return this.consumer;
        }

        /**
         * Logs the given message at debug level and notifies the dispatcher's monitor.
         * {@code notifyAll} is used because the monitor is a {@link Thread} instance:
         * threads blocked in {@code join()} wait on the same monitor, and a single
         * {@code notify} could wake one of them instead of the dispatcher.
         */
        private void wakeDispatcher(String message) {
            LOGGER.debug(message);
            synchronized (this.dispatcher) {
                this.dispatcher.notifyAll();
            }
        }
    }

    /**
     * Creates the dispatcher as a daemon thread named after the flow manager id.
     * The thread is not started here.
     *
     * @param fileBasedFlowManager the flow manager events are dispatched for
     * @param blockingQueue the mailbox incoming events are taken from
     */
    public FileBasedEventDispatcher(FileBasedFlowManager fileBasedFlowManager, BlockingQueue<FileSystemEvent> blockingQueue) {
        super("FileBasedEventDispatcher: EventDispatcherThread-" + fileBasedFlowManager.getId());
        this.eventMailBox = blockingQueue;
        this.flowManager = fileBasedFlowManager;
        setDaemon(true);
        // Deliberately no call to interrupted() here: it is the static
        // Thread.interrupted(), which would clear the interrupt flag of the thread
        // running this constructor, not of the (not yet started) dispatcher.
    }

    /** Asks the dispatcher to stop by interrupting its thread. */
    public void shutdown() {
        LOGGER.info("Shutting down the dispatcher ... NOW!");
        interrupt();
    }

    /**
     * Dispatch loop: takes events from the mailbox until the thread is interrupted.
     * An interruption ends the loop quietly with the interrupt flag restored; any
     * other exception is logged and also ends the loop.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            LOGGER.info("Ready to dispatch Events to flow {}({})", this.flowManager.getId(), this.flowManager.getName());
            while (!isInterrupted()) {
                FileSystemEvent event = this.eventMailBox.take();
                FileBasedEventConsumer consumer = createConsumer();
                awaitRegistration(consumer);
                dispatch(event, consumer);
            }
        } catch (InterruptedException e) {
            // Interruption is the regular shutdown path (see shutdown()), not an error.
            LOGGER.info("Dispatcher interrupted, event dispatching stopped");
            interrupt();
        } catch (Exception e) {
            LOGGER.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * Builds a consumer from a clone of the flow's consumer configuration and
     * attaches a {@link DispatcherListener} to it.
     * Declared with a broad {@code throws} clause because the checked exceptions of
     * the project calls involved are not visible from this class; {@link #run()}
     * handles them.
     */
    private FileBasedEventConsumer createConsumer() throws Exception {
        FileBasedEventConsumer consumer = new FileBasedEventConsumer(
                ((FileBasedEventConsumerConfiguration) this.flowManager.getConfiguration().getEventConsumerConfiguration()).m5clone(),
                this.flowManager.getFlowConfigDir(),
                this.flowManager.getFlowTempDir());
        consumer.setFlowName(this.flowManager.getName());
        consumer.addListener(new DispatcherListener(consumer, this));
        return consumer;
    }

    /**
     * Blocks until the flow manager accepts the consumer, retrying whenever a
     * listener wakes the dispatcher up or the bounded wait elapses.
     */
    private void awaitRegistration(FileBasedEventConsumer consumer) throws Exception {
        while (!this.flowManager.addConsumer(consumer)) {
            LOGGER.info("Waiting for free space in consumer map to start consumer {}", consumer);
            synchronized (this) {
                wait(CONSUMER_SLOT_WAIT_MILLIS);
            }
            LOGGER.debug("Dispatcher woke up, retrying to register the consumer.");
        }
    }

    /**
     * Feeds the event to the registered consumer and, if the consumer is ready
     * ({@link EventConsumerStatus#EXECUTING}), asks the flow manager to execute it.
     */
    private void dispatch(FileSystemEvent event, FileBasedEventConsumer consumer) throws Exception {
        LOGGER.debug("Processing incoming event {}", event);
        if (!consumer.consume(event)) {
            // NOTE(review): the consumer has already been added to the flow manager at
            // this point and is not removed here - confirm it is released elsewhere.
            LOGGER.error("---------------------------------------------------------------");
            LOGGER.error("No consumer could serve {} (neither {} could)", event, consumer);
            LOGGER.error("---------------------------------------------------------------");
            return;
        }
        if (consumer.getStatus() == EventConsumerStatus.EXECUTING) {
            LOGGER.debug("{} was the only needed event for {}", event, consumer);
            this.flowManager.execute(consumer);
        } else {
            LOGGER.debug("{} created on event {}", consumer, event);
        }
    }
}
