package monasca.thresh.infrastructure.thresholding;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import monasca.common.configuration.KafkaProducerConfiguration;
import monasca.common.model.alarm.AlarmState;
import monasca.common.model.alarm.AlarmSubExpression;
import monasca.common.model.event.AlarmDefinitionUpdatedEvent;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.common.model.event.AlarmUpdatedEvent;
import monasca.common.util.Injector;
import monasca.common.util.Serialization;
import monasca.thresh.domain.model.Alarm;
import monasca.thresh.domain.model.AlarmDefinition;
import monasca.thresh.domain.model.MetricDefinitionAndTenantId;
import monasca.thresh.domain.model.SubAlarm;
import monasca.thresh.domain.model.SubExpression;
import monasca.thresh.domain.service.AlarmDAO;
import monasca.thresh.domain.service.AlarmDefinitionDAO;
import monasca.thresh.infrastructure.persistence.PersistenceModule;
import monasca.thresh.utils.Logging;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:monasca/thresh/infrastructure/thresholding/AlarmThresholdingBolt.class */
public class AlarmThresholdingBolt extends BaseRichBolt {
    private static final long serialVersionUID = -4126465124017857754L;
    private transient Logger logger;
    private DataSourceFactory dbConfig;
    private KafkaProducerConfiguration producerConfiguration;
    final Map<String, Alarm> alarms = new HashMap();
    final Map<String, AlarmDefinition> alarmDefinitions = new HashMap();
    private transient AlarmDAO alarmDAO;
    private transient AlarmDefinitionDAO alarmDefinitionDAO;
    private transient AlarmEventForwarder alarmEventForwarder;
    private OutputCollector collector;

    public AlarmThresholdingBolt(DataSourceFactory dataSourceFactory, KafkaProducerConfiguration kafkaProducerConfiguration) {
        this.dbConfig = dataSourceFactory;
        this.producerConfiguration = kafkaProducerConfiguration;
    }

    public AlarmThresholdingBolt(AlarmDAO alarmDAO, AlarmDefinitionDAO alarmDefinitionDAO, AlarmEventForwarder alarmEventForwarder) {
        this.alarmDAO = alarmDAO;
        this.alarmDefinitionDAO = alarmDefinitionDAO;
        this.alarmEventForwarder = alarmEventForwarder;
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    public void execute(Tuple tuple) {
        this.logger.debug("tuple: {}", tuple);
        try {
            try {
                if ("default".equals(tuple.getSourceStreamId())) {
                    Alarm orCreateAlarm = getOrCreateAlarm(tuple.getString(0));
                    if (orCreateAlarm == null) {
                        this.collector.ack(tuple);
                        return;
                    }
                    evaluateThreshold(orCreateAlarm, (SubAlarm) tuple.getValue(1));
                } else if (EventProcessingBolt.ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
                    String string = tuple.getString(0);
                    String string2 = tuple.getString(1);
                    if (EventProcessingBolt.DELETED.equals(string)) {
                        handleAlarmDeleted(string2);
                    } else if (EventProcessingBolt.UPDATED.equals(string)) {
                        handleAlarmUpdated(string2, (AlarmUpdatedEvent) tuple.getValue(2));
                    }
                } else if (EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID.equals(tuple.getSourceStreamId())) {
                    if (EventProcessingBolt.UPDATED.equals(tuple.getString(0))) {
                        handle((AlarmDefinitionUpdatedEvent) tuple.getValue(1));
                    }
                } else if (EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID.equals(tuple.getSourceStreamId()) && EventProcessingBolt.UPDATED.equals(tuple.getString(0))) {
                    handleAlarmSubExpressionUpdated((SubExpression) tuple.getValue(1), tuple.getString(2));
                }
                this.collector.ack(tuple);
            } catch (Exception e) {
                this.logger.error("Error processing tuple {}", tuple, e);
                this.collector.ack(tuple);
            }
        } catch (Throwable th) {
            this.collector.ack(tuple);
            throw th;
        }
    }

    private void handleAlarmSubExpressionUpdated(SubExpression subExpression, String str) {
        int i = 0;
        for (Alarm alarm : this.alarms.values()) {
            if (alarm.getAlarmDefinitionId().equals(str)) {
                for (SubAlarm subAlarm : alarm.getSubAlarms()) {
                    if (subAlarm.getAlarmSubExpressionId().equals(subExpression.getId())) {
                        subAlarm.setExpression(subExpression.getAlarmSubExpression());
                        i++;
                    }
                }
            }
        }
        this.logger.debug("Updated {} SubAlarms", Integer.valueOf(i));
    }

    private void handle(AlarmDefinitionUpdatedEvent alarmDefinitionUpdatedEvent) {
        AlarmDefinition alarmDefinition = this.alarmDefinitions.get(alarmDefinitionUpdatedEvent.alarmDefinitionId);
        if (alarmDefinition == null) {
            this.logger.debug("Update of AlarmDefinition {} skipped. Not in use by this bolt", alarmDefinitionUpdatedEvent.alarmDefinitionId);
            return;
        }
        this.logger.info("Updating AlarmDefinition {}", alarmDefinitionUpdatedEvent.alarmDefinitionId);
        alarmDefinition.setName(alarmDefinitionUpdatedEvent.alarmName);
        alarmDefinition.setDescription(alarmDefinitionUpdatedEvent.alarmDescription);
        alarmDefinition.setSeverity(alarmDefinitionUpdatedEvent.severity);
        alarmDefinition.setActionsEnabled(alarmDefinitionUpdatedEvent.alarmActionsEnabled);
        alarmDefinition.setExpression(alarmDefinitionUpdatedEvent.alarmExpression);
        for (Map.Entry<String, AlarmSubExpression> entry : alarmDefinitionUpdatedEvent.changedSubExpressions.entrySet()) {
            if (!alarmDefinition.updateSubExpression(entry.getKey(), entry.getValue())) {
                this.logger.error("AlarmDefinition {}: Did not finding matching SubAlarmExpression id={} SubAlarmExpression{}", alarmDefinitionUpdatedEvent.alarmDefinitionId, entry.getKey(), entry.getValue());
            }
        }
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), topologyContext));
        this.logger.info("Preparing");
        this.collector = outputCollector;
        if (this.alarmDAO == null) {
            Injector.registerIfNotBound(AlarmDAO.class, new PersistenceModule(this.dbConfig));
            this.alarmDAO = (AlarmDAO) Injector.getInstance(AlarmDAO.class);
        }
        if (this.alarmDefinitionDAO == null) {
            Injector.registerIfNotBound(AlarmDefinitionDAO.class, new PersistenceModule(this.dbConfig));
            this.alarmDefinitionDAO = (AlarmDefinitionDAO) Injector.getInstance(AlarmDefinitionDAO.class);
        }
        if (this.alarmEventForwarder == null) {
            Injector.registerIfNotBound(AlarmEventForwarder.class, new ProducerModule(this.producerConfiguration));
            this.alarmEventForwarder = (AlarmEventForwarder) Injector.getInstance(AlarmEventForwarder.class);
        }
    }

    private void evaluateThreshold(Alarm alarm, SubAlarm subAlarm) {
        this.logger.debug("Received state change for {}", subAlarm);
        subAlarm.setNoState(false);
        alarm.updateSubAlarm(subAlarm);
        AlarmState state = alarm.getState();
        if (allSubAlarmsHaveState(alarm) && alarm.evaluate(this.alarmDefinitions.get(alarm.getAlarmDefinitionId()).getAlarmExpression())) {
            changeAlarmState(alarm, state, alarm.getStateChangeReason());
        }
    }

    private boolean allSubAlarmsHaveState(Alarm alarm) {
        for (SubAlarm subAlarm : alarm.getSubAlarms()) {
            if (subAlarm.isNoState() && !subAlarm.onlyImmediateEvaluation()) {
                return false;
            }
        }
        return true;
    }

    private void changeAlarmState(Alarm alarm, AlarmState alarmState, String str) {
        AlarmDefinition alarmDefinition = this.alarmDefinitions.get(alarm.getAlarmDefinitionId());
        if (alarmDefinition == null) {
            this.logger.warn("Failed to locate alarm definition for id {}, ignoring state update to alarm with id {}", alarm.getAlarmDefinitionId(), alarm.getId());
            return;
        }
        long timestamp = getTimestamp();
        this.alarmDAO.updateState(alarm.getId(), alarm.getState(), timestamp);
        ArrayList arrayList = new ArrayList(alarm.getAlarmedMetrics().size());
        Iterator<MetricDefinitionAndTenantId> it = alarm.getAlarmedMetrics().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().metricDefinition);
        }
        this.logger.debug("Alarm {} transitioned from {} to {}", alarm, alarmState, alarm.getState());
        try {
            this.alarmEventForwarder.send(Serialization.toJson(new AlarmStateTransitionedEvent(alarmDefinition.getTenantId(), alarm.getId(), alarmDefinition.getId(), arrayList, alarmDefinition.getName(), alarmDefinition.getDescription(), alarmState, alarm.getState(), alarmDefinition.getSeverity(), alarm.getLink(), alarm.getLifecycleState(), alarmDefinition.isActionsEnabled(), str, alarm.getTransitionSubAlarms(), timestamp)));
        } catch (Exception e) {
            this.logger.debug("Failure sending alarm", (Throwable) e);
        }
    }

    protected long getTimestamp() {
        return System.currentTimeMillis();
    }

    void handleAlarmDeleted(String str) {
        this.logger.debug("Received AlarmDeletedEvent for alarm id {}", str);
        this.alarms.remove(str);
    }

    void handleAlarmUpdated(String str, AlarmUpdatedEvent alarmUpdatedEvent) {
        Alarm alarm = this.alarms.get(str);
        if (alarm == null) {
            this.logger.debug("Updated Alarm {} not loaded, ignoring");
            return;
        }
        alarm.setState(alarmUpdatedEvent.alarmState);
        alarm.setLink(alarmUpdatedEvent.link);
        alarm.setLifecycleState(alarmUpdatedEvent.lifecycleState);
    }

    private Alarm getOrCreateAlarm(String str) {
        Alarm alarm = this.alarms.get(str);
        if (alarm == null) {
            alarm = this.alarmDAO.findById(str);
            if (alarm == null) {
                this.logger.error("Failed to locate alarm for id {}", str);
                return null;
            }
            if (this.alarmDefinitions.get(alarm.getAlarmDefinitionId()) == null) {
                AlarmDefinition findById = this.alarmDefinitionDAO.findById(alarm.getAlarmDefinitionId());
                if (findById == null) {
                    this.logger.error("Failed to locate alarm definition for id {}", alarm.getAlarmDefinitionId());
                    return null;
                }
                this.alarmDefinitions.put(findById.getId(), findById);
            }
            Iterator<SubAlarm> it = alarm.getSubAlarms().iterator();
            while (it.hasNext()) {
                it.next().setNoState(true);
            }
            this.alarms.put(str, alarm);
        }
        return alarm;
    }
}
