package monasca.thresh.infrastructure.thresholding;

import com.google.inject.Module;
import java.util.List;
import java.util.Map;
import monasca.common.model.alarm.AlarmSubExpression;
import monasca.common.model.event.AlarmDefinitionCreatedEvent;
import monasca.common.model.event.AlarmDefinitionDeletedEvent;
import monasca.common.model.event.AlarmDefinitionUpdatedEvent;
import monasca.common.model.event.AlarmDeletedEvent;
import monasca.common.model.event.AlarmUpdatedEvent;
import monasca.common.model.metric.MetricDefinition;
import monasca.common.util.Injector;
import monasca.thresh.domain.model.MetricDefinitionAndTenantId;
import monasca.thresh.domain.model.SubExpression;
import monasca.thresh.domain.model.TenantIdAndMetricName;
import monasca.thresh.domain.service.AlarmDAO;
import monasca.thresh.infrastructure.persistence.PersistenceModule;
import monasca.thresh.utils.Logging;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:monasca/thresh/infrastructure/thresholding/EventProcessingBolt.class */
public class EventProcessingBolt extends BaseRichBolt {
    private static final long serialVersionUID = 897171858708109378L;
    public static final String ALARM_EVENT_STREAM_ID = "alarm-events";
    public static final String METRIC_ALARM_EVENT_STREAM_ID = "metric-alarm-events";
    public static final String METRIC_SUB_ALARM_EVENT_STREAM_ID = "metric-sub-alarm-events";
    public static final String ALARM_DEFINITION_EVENT_STREAM_ID = "alarm-definition-events";
    public static final String[] ALARM_EVENT_STREAM_FIELDS = {"eventType", "alarmId", "alarm"};
    public static final String[] METRIC_ALARM_EVENT_STREAM_FIELDS = {"eventType", "tenantIdAndMetricName", "metricDefinitionAndTenantId", "alarmDefinitionId", "subAlarmId"};
    public static final String[] METRIC_SUB_ALARM_EVENT_STREAM_FIELDS = {"eventType", "subExpression", "alarmDefinitionId"};
    public static final String[] ALARM_DEFINITION_EVENT_FIELDS = {"eventType", "argument"};
    public static final String CREATED = "created";
    public static final String DELETED = "deleted";
    public static final String UPDATED = "updated";
    public static final String RESEND = "resend";
    private transient Logger logger;
    private OutputCollector collector;
    private AlarmDAO alarmDAO;
    private DataSourceFactory dbConfig;

    public EventProcessingBolt(DataSourceFactory dataSourceFactory) {
        this.dbConfig = dataSourceFactory;
    }

    public EventProcessingBolt(AlarmDAO alarmDAO) {
        this.alarmDAO = alarmDAO;
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(ALARM_EVENT_STREAM_ID, new Fields(ALARM_EVENT_STREAM_FIELDS));
        outputFieldsDeclarer.declareStream(METRIC_ALARM_EVENT_STREAM_ID, new Fields(METRIC_ALARM_EVENT_STREAM_FIELDS));
        outputFieldsDeclarer.declareStream(METRIC_SUB_ALARM_EVENT_STREAM_ID, new Fields(METRIC_SUB_ALARM_EVENT_STREAM_FIELDS));
        outputFieldsDeclarer.declareStream(ALARM_DEFINITION_EVENT_STREAM_ID, new Fields(ALARM_DEFINITION_EVENT_FIELDS));
    }

    public void execute(Tuple tuple) {
        try {
            Object value = tuple.getValue(0);
            this.logger.trace("Received event for processing {}", value);
            if (value instanceof AlarmDefinitionCreatedEvent) {
                handle((AlarmDefinitionCreatedEvent) value);
            } else if (value instanceof AlarmDefinitionUpdatedEvent) {
                handle((AlarmDefinitionUpdatedEvent) value);
            } else if (value instanceof AlarmDefinitionDeletedEvent) {
                handle((AlarmDefinitionDeletedEvent) value);
            } else if (value instanceof AlarmDeletedEvent) {
                handle((AlarmDeletedEvent) value);
            } else if (value instanceof AlarmUpdatedEvent) {
                handle((AlarmUpdatedEvent) value);
            }
        } catch (Exception e) {
            this.logger.error("Error processing tuple {}", tuple, e);
        } finally {
            this.collector.ack(tuple);
        }
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), topologyContext));
        this.logger.info("Preparing");
        this.collector = outputCollector;
        if (this.alarmDAO == null) {
            Injector.registerIfNotBound(AlarmDAO.class, new Module[]{new PersistenceModule(this.dbConfig)});
            this.alarmDAO = (AlarmDAO) Injector.getInstance(AlarmDAO.class);
        }
    }

    void handle(AlarmDefinitionCreatedEvent alarmDefinitionCreatedEvent) {
        this.collector.emit(ALARM_DEFINITION_EVENT_STREAM_ID, new Values(new Object[]{CREATED, alarmDefinitionCreatedEvent}));
    }

    void handle(AlarmDefinitionDeletedEvent alarmDefinitionDeletedEvent) {
        this.collector.emit(ALARM_DEFINITION_EVENT_STREAM_ID, new Values(new Object[]{DELETED, alarmDefinitionDeletedEvent}));
    }

    void handle(AlarmDeletedEvent alarmDeletedEvent) {
        this.logger.debug("Alarm {} deleted", alarmDeletedEvent.alarmId);
        processSubAlarms(DELETED, alarmDeletedEvent.tenantId, alarmDeletedEvent.alarmDefinitionId, alarmDeletedEvent.alarmMetrics, alarmDeletedEvent.subAlarms);
        this.collector.emit(ALARM_EVENT_STREAM_ID, new Values(new Object[]{DELETED, alarmDeletedEvent.alarmId, alarmDeletedEvent}));
    }

    private void processSubAlarms(String str, String str2, String str3, List<MetricDefinition> list, Map<String, AlarmSubExpression> map) {
        for (MetricDefinition metricDefinition : list) {
            for (Map.Entry<String, AlarmSubExpression> entry : map.entrySet()) {
                if (isSuperSet(metricDefinition, entry.getValue().getMetricDefinition())) {
                    sendSubAlarmMsg(str, entry.getKey(), str2, str3, metricDefinition);
                }
            }
        }
    }

    private boolean isSuperSet(MetricDefinition metricDefinition, MetricDefinition metricDefinition2) {
        if (!metricDefinition.name.equals(metricDefinition2.name)) {
            return false;
        }
        if (metricDefinition2.dimensions == null || metricDefinition2.dimensions.isEmpty()) {
            return true;
        }
        for (Map.Entry entry : metricDefinition.dimensions.entrySet()) {
            String str = (String) metricDefinition2.dimensions.get(entry.getKey());
            if (str != null && !str.equals(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    private void sendSubAlarmMsg(String str, String str2, String str3, String str4, MetricDefinition metricDefinition) {
        this.collector.emit(METRIC_ALARM_EVENT_STREAM_ID, new Values(new Object[]{str, new TenantIdAndMetricName(str3, metricDefinition.name), new MetricDefinitionAndTenantId(metricDefinition, str3), str4, str2}));
    }

    void handle(AlarmDefinitionUpdatedEvent alarmDefinitionUpdatedEvent) {
        for (Map.Entry entry : alarmDefinitionUpdatedEvent.changedSubExpressions.entrySet()) {
            this.logger.info("Updated {} SubAlarms with new AlarmSubExpression {} {}", new Object[]{Integer.valueOf(this.alarmDAO.updateSubAlarmExpressions((String) entry.getKey(), (AlarmSubExpression) entry.getValue())), entry.getKey(), entry.getValue()});
            this.collector.emit(METRIC_SUB_ALARM_EVENT_STREAM_ID, new Values(new Object[]{UPDATED, new SubExpression((String) entry.getKey(), (AlarmSubExpression) entry.getValue()), alarmDefinitionUpdatedEvent.alarmDefinitionId}));
        }
        this.collector.emit(ALARM_DEFINITION_EVENT_STREAM_ID, new Values(new Object[]{UPDATED, alarmDefinitionUpdatedEvent}));
    }

    void handle(AlarmUpdatedEvent alarmUpdatedEvent) {
        if (alarmUpdatedEvent.oldAlarmState.equals(alarmUpdatedEvent.alarmState)) {
            this.logger.info("No state change for {}, ignoring", alarmUpdatedEvent.alarmId);
        }
        this.logger.info("Received AlarmUpdatedEvent {}", alarmUpdatedEvent);
        processSubAlarms(RESEND, alarmUpdatedEvent.tenantId, alarmUpdatedEvent.alarmDefinitionId, alarmUpdatedEvent.alarmMetrics, alarmUpdatedEvent.subAlarms);
        this.collector.emit(ALARM_EVENT_STREAM_ID, new Values(new Object[]{UPDATED, alarmUpdatedEvent.alarmId, alarmUpdatedEvent}));
    }
}
