package it.geosolutions.geobatch.flow.event.consumer;

import it.geosolutions.geobatch.catalog.impl.BaseIdentifiable;
import it.geosolutions.geobatch.catalog.impl.BaseResource;
import it.geosolutions.geobatch.configuration.event.consumer.EventConsumerConfiguration;
import it.geosolutions.geobatch.flow.event.IProgressListener;
import it.geosolutions.geobatch.flow.event.ProgressListenerForwarder;
import it.geosolutions.geobatch.flow.event.action.Action;
import it.geosolutions.geobatch.flow.event.action.ActionException;
import it.geosolutions.geobatch.flow.event.action.BaseAction;
import it.geosolutions.geobatch.misc.PauseHandler;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.EventObject;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:it/geosolutions/geobatch/flow/event/consumer/BaseEventConsumer.class */
/**
 * Base class for event consumers: it buffers incoming events in a queue and runs a list of
 * {@link BaseAction}s over a queue of events, reporting progress and status changes to the
 * registered listeners.
 *
 * <p>The class is abstract because {@link Callable#call()} is left to subclasses.
 *
 * @param <XEO> type of the events handled by this consumer
 * @param <ECC> type of the configuration associated with this consumer
 */
public abstract class BaseEventConsumer<XEO extends EventObject, ECC extends EventConsumerConfiguration> extends BaseResource implements Callable<Queue<XEO>>, EventConsumer<XEO, ECC> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseEventConsumer.class);

    /** Instant (UTC calendar) at which this consumer was constructed. */
    private final Calendar creationTimestamp;

    /** Current status; written only through {@link #setStatus(EventConsumerStatus)}. */
    private volatile EventConsumerStatus eventConsumerStatus;

    /** Running context handed to every action right before it is executed. */
    private String runningContext;

    /** Events received through {@link #consume(EventObject)}. */
    protected final Queue<XEO> eventsQueue;

    /** Actions applied, in list order, by {@link #applyActions(Queue)}. */
    protected final List<BaseAction<XEO>> actions;

    /**
     * Last action started by {@link #applyActions(Queue)}. It is intentionally not reset when the
     * action ends, so it keeps pointing at the last action that was run.
     */
    protected volatile BaseAction<XEO> currentAction;

    /** Forwarder used to notify progress and status changes to the registered listeners. */
    protected final BaseEventConsumer<XEO, ECC>.EventConsumerListenerForwarder listenerForwarder;

    /** Gate checked before each action is started; see {@link #pause()} and {@link #resume()}. */
    protected PauseHandler pauseHandler;

    /**
     * Progress forwarder that can additionally notify {@link EventConsumerListener}s about status
     * changes of the consumer.
     */
    public class EventConsumerListenerForwarder extends ProgressListenerForwarder {
        protected EventConsumerListenerForwarder(BaseIdentifiable baseIdentifiable) {
            super(baseIdentifiable);
        }

        /**
         * Notifies every registered {@link EventConsumerListener} of a status change. Listeners
         * of other types are skipped. An exception thrown by one listener is logged and does not
         * prevent the remaining listeners from being notified.
         *
         * @param eventConsumerStatus the previous status
         * @param eventConsumerStatus2 the new status
         */
        public void fireStatusChanged(EventConsumerStatus eventConsumerStatus, EventConsumerStatus eventConsumerStatus2) {
            for (IProgressListener listener : this.listeners) {
                try {
                    if (listener instanceof EventConsumerListener) {
                        ((EventConsumerListener) listener).statusChanged(eventConsumerStatus, eventConsumerStatus2);
                    }
                } catch (Exception e) {
                    // Pass the throwable as well so that the stack trace is not lost.
                    LOGGER.warn("Exception in event forwarder: " + e, e);
                }
            }
        }
    }

    /**
     * @return the running context that is propagated to the actions
     */
    public String getRunningContext() {
        return this.runningContext;
    }

    /**
     * @param str the running context to propagate to the actions
     */
    public void setRunningContext(String str) {
        this.runningContext = str;
    }

    /**
     * Creates a consumer with an empty event queue, no actions, a non-paused pause handler and
     * the {@code IDLE} status. The three arguments are passed unchanged to the superclass
     * constructor.
     *
     * @param str first superclass constructor argument
     * @param str2 second superclass constructor argument
     * @param str3 third superclass constructor argument
     */
    public BaseEventConsumer(String str, String str2, String str3) {
        super(str, str2, str3);
        this.creationTimestamp = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        this.eventsQueue = new LinkedList<XEO>();
        this.actions = new ArrayList<BaseAction<XEO>>();
        this.currentAction = null;
        this.pauseHandler = new PauseHandler(false);
        this.listenerForwarder = new EventConsumerListenerForwarder(this);
        // Must come last: setStatus() uses the listener forwarder created just above.
        setStatus(EventConsumerStatus.IDLE);
    }

    /**
     * @return a copy of the creation timestamp; modifying it does not affect this consumer
     */
    public Calendar getCreationTimestamp() {
        return (Calendar) this.creationTimestamp.clone();
    }

    /**
     * @return the current status of this consumer
     */
    @Override // it.geosolutions.geobatch.flow.event.consumer.EventConsumer
    public EventConsumerStatus getStatus() {
        return this.eventConsumerStatus;
    }

    /**
     * Changes the status of this consumer. When the new status differs from the current one the
     * listeners are notified and the forwarder task is set to the status name; the notification
     * happens before the new status is stored.
     *
     * @param eventConsumerStatus the new status
     */
    protected void setStatus(EventConsumerStatus eventConsumerStatus) {
        final EventConsumerStatus previous = this.eventConsumerStatus;
        if (previous != eventConsumerStatus) {
            this.listenerForwarder.fireStatusChanged(previous, eventConsumerStatus);
            this.listenerForwarder.setTask(eventConsumerStatus.toString());
        }
        this.eventConsumerStatus = eventConsumerStatus;
    }

    /**
     * @return the last action started by {@link #applyActions(Queue)}, or {@code null} if none
     *         has been started yet
     */
    public Action<XEO> getCurrentAction() {
        return this.currentAction;
    }

    /**
     * Adds an event to the internal events queue.
     *
     * @param xeo the event to enqueue
     * @return the result of {@link Queue#offer(Object)} on the internal queue
     */
    @Override // it.geosolutions.geobatch.flow.event.consumer.EventConsumer
    public boolean consume(XEO xeo) {
        return this.eventsQueue.offer(xeo);
    }

    /**
     * Runs every configured action in order, feeding each one the queue returned by the previous
     * one, and reports the progress through the listener forwarder.
     *
     * <p>When an action fails and is not flagged as "fail ignored", the queue is cleared and the
     * failure is propagated; exceptions other than {@link ActionException} are wrapped into an
     * {@link ActionException}. When the failing action is flagged as "fail ignored", processing
     * continues with the next action using the queue as it was before the failure.
     *
     * @param queue the events to process
     * @return the queue returned by the last action (or {@code queue} itself if there are no
     *         actions)
     * @throws ActionException if a non-ignorable action fails
     * @throws IllegalArgumentException if an action returns a {@code null} queue
     */
    protected Queue<XEO> applyActions(Queue<XEO> queue) throws ActionException {
        if (LOGGER.isTraceEnabled()) {
            // Guard against a null queue so that behaviour does not depend on the log level.
            LOGGER.trace("Applying " + this.actions.size() + " actions on " + (queue == null ? 0 : queue.size()) + " events.");
        }
        int i = 0;
        for (BaseAction<XEO> baseAction : this.actions) {
            try {
                // Blocks here while the consumer is paused.
                this.pauseHandler.waitUntilResumed();
                this.listenerForwarder.setProgress((100.0f * i) / this.actions.size());
                this.listenerForwarder.setTask("Running " + baseAction.getClass().getSimpleName() + "(" + (i + 1) + "/" + this.actions.size() + ")");
                this.listenerForwarder.progressing();
                baseAction.setRunningContext(getRunningContext());
                this.currentAction = baseAction;
                queue = baseAction.execute(queue);
            } catch (ActionException e) {
                if (LOGGER.isErrorEnabled()) {
                    LOGGER.error(e.getLocalizedMessage(), e);
                }
                this.listenerForwarder.setTask("Action " + baseAction.getClass().getSimpleName() + " failed (" + e + ")");
                this.listenerForwarder.progressing();
                // Use the loop variable: currentAction may still be null (or the previous
                // action) when the failure happens before it is assigned.
                if (!baseAction.isFailIgnored()) {
                    if (queue != null) {
                        queue.clear();
                    }
                    throw e;
                }
            } catch (Exception e2) {
                if (LOGGER.isErrorEnabled()) {
                    LOGGER.error("Action threw an unhandled exception: " + e2.getLocalizedMessage(), e2);
                }
                this.listenerForwarder.setTask("Action " + baseAction.getClass().getSimpleName() + " failed (" + e2 + ")");
                this.listenerForwarder.progressing();
                if (!baseAction.isFailIgnored()) {
                    if (queue == null) {
                        throw new IllegalArgumentException("Action " + baseAction.getClass().getSimpleName() + " left no event in queue.");
                    }
                    queue.clear();
                    throw new ActionException(baseAction, e2.getMessage(), e2);
                }
            }
            if (queue == null) {
                throw new IllegalArgumentException("Action " + baseAction.getClass().getSimpleName() + " left no event in queue.");
            }
            if (queue.isEmpty() && LOGGER.isWarnEnabled()) {
                LOGGER.warn("Action " + baseAction.getClass().getSimpleName() + " left no event in queue.");
            }
            i++;
        }
        if (queue != null && !queue.isEmpty()) {
            // currentAction is null when no action has ever been started (e.g. empty action list).
            final BaseAction<XEO> lastAction = this.currentAction;
            final String lastActionName = lastAction == null ? "none" : lastAction.getClass().getSimpleName();
            LOGGER.info("There are " + queue.size() + " events left in the queue after last action (" + lastActionName + ")");
        }
        return queue;
    }

    /**
     * Pauses this consumer and sets its status to {@code PAUSED}. Unlike
     * {@link #pause(boolean)}, this does not check the current status and does not pause the
     * current action.
     *
     * @return always {@code true}
     */
    @Override // it.geosolutions.geobatch.flow.Job
    public boolean pause() {
        this.pauseHandler.pause();
        setStatus(EventConsumerStatus.PAUSED);
        return true;
    }

    /**
     * Pauses this consumer and, if there is one, its current action. Nothing is done unless the
     * consumer is in the {@code EXECUTING}, {@code WAITING} or {@code IDLE} status.
     *
     * @param z not used by this implementation
     * @return always {@code true}
     */
    @Override // it.geosolutions.geobatch.flow.Job
    public boolean pause(boolean z) {
        final EventConsumerStatus status = getStatus();
        final boolean pausable = status.equals(EventConsumerStatus.EXECUTING)
                || status.equals(EventConsumerStatus.WAITING)
                || status.equals(EventConsumerStatus.IDLE);
        if (!pausable) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Consumer " + getName() + " [" + this.creationTimestamp + "] is already in state: " + status);
            }
            return true;
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Pausing consumer " + getName() + " [" + this.creationTimestamp + "]");
        }
        this.pauseHandler.pause();
        setStatus(EventConsumerStatus.PAUSED);
        // Read the volatile field once so the null check and the calls see the same action.
        final BaseAction<XEO> action = this.currentAction;
        if (action != null) {
            LOGGER.info("Pausing action " + action.getClass().getSimpleName() + " in consumer " + getName() + " [" + this.creationTimestamp + "]");
            action.pause();
        }
        return true;
    }

    /**
     * Resumes the current action (if any) and this consumer, then sets the status to
     * {@code EXECUTING}.
     */
    @Override // it.geosolutions.geobatch.flow.Job
    public void resume() {
        LOGGER.info("Resuming consumer " + getName() + " [" + this.creationTimestamp + "]");
        // Read the volatile field once so the null check and the calls see the same action.
        final BaseAction<XEO> action = this.currentAction;
        if (action != null) {
            LOGGER.info("Resuming action " + action.getClass().getSimpleName() + " in consumer " + getName() + " [" + this.creationTimestamp + "]");
            action.resume();
        }
        this.pauseHandler.resume();
        setStatus(EventConsumerStatus.EXECUTING);
    }

    /**
     * @return whether the pause handler of this consumer is currently paused
     */
    @Override // it.geosolutions.geobatch.flow.Job
    public boolean isPaused() {
        return this.pauseHandler.isPaused();
    }

    /**
     * @return the live, internal list of actions (not a copy)
     */
    public List<BaseAction<XEO>> getActions() {
        return this.actions;
    }

    /**
     * Appends the given actions to the ones already configured.
     *
     * @param list the actions to append
     */
    protected void addActions(List<BaseAction<XEO>> list) {
        this.actions.addAll(list);
    }

    /**
     * Discards every event still waiting in the internal events queue.
     */
    @Override // it.geosolutions.geobatch.catalog.impl.BaseResource, it.geosolutions.geobatch.catalog.Resource
    public void dispose() {
        this.eventsQueue.clear();
    }

    /**
     * Registers a listener on the listener forwarder.
     *
     * @param eventConsumerListener the listener to add
     */
    @Override // it.geosolutions.geobatch.misc.ListenerRegistry
    public synchronized void addListener(EventConsumerListener eventConsumerListener) {
        this.listenerForwarder.addListener((IProgressListener) eventConsumerListener);
    }

    /**
     * Unregisters a listener from the listener forwarder.
     *
     * @param eventConsumerListener the listener to remove
     */
    @Override // it.geosolutions.geobatch.misc.ListenerRegistry
    public synchronized void removeListener(EventConsumerListener eventConsumerListener) {
        this.listenerForwarder.removeListener((IProgressListener) eventConsumerListener);
    }

    /**
     * @return the forwarder used to notify the registered listeners
     */
    protected ProgressListenerForwarder getListenerForwarder() {
        return this.listenerForwarder;
    }

    /**
     * Looks up a registered listener by type.
     *
     * @param cls the class the listener must be assignable to
     * @return the first registered listener whose class is assignable to {@code cls}, or
     *         {@code null} if there is none
     */
    public IProgressListener getProgressListener(Class<IProgressListener> cls) {
        for (IProgressListener listener : getListenerForwarder().getListeners()) {
            if (cls.isAssignableFrom(listener.getClass())) {
                return listener;
            }
        }
        return null;
    }
}
