package monasca.thresh;

import backtype.storm.Config;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import java.util.HashMap;
import javax.inject.Named;
import monasca.common.util.Injector;
import monasca.thresh.infrastructure.thresholding.AlarmCreationBolt;
import monasca.thresh.infrastructure.thresholding.AlarmThresholdingBolt;
import monasca.thresh.infrastructure.thresholding.EventProcessingBolt;
import monasca.thresh.infrastructure.thresholding.EventSpout;
import monasca.thresh.infrastructure.thresholding.MetricAggregationBolt;
import monasca.thresh.infrastructure.thresholding.MetricFilteringBolt;
import monasca.thresh.infrastructure.thresholding.MetricSpout;
import monasca.thresh.infrastructure.thresholding.deserializer.EventDeserializer;
import monasca.thresh.utils.StatsdMetricConsumer;

/* loaded from: input_file:monasca/thresh/TopologyModule.class */
public class TopologyModule extends AbstractModule {
    private final ThresholdingConfiguration config;
    private Config stormConfig;
    private IRichSpout metricSpout;
    private IRichSpout eventSpout;

    public TopologyModule(ThresholdingConfiguration thresholdingConfiguration) {
        this.config = thresholdingConfiguration;
    }

    public TopologyModule(ThresholdingConfiguration thresholdingConfiguration, Config config, IRichSpout iRichSpout, IRichSpout iRichSpout2) {
        this(thresholdingConfiguration);
        this.stormConfig = config;
        this.metricSpout = iRichSpout;
        this.eventSpout = iRichSpout2;
    }

    @Override // com.google.inject.AbstractModule
    protected void configure() {
    }

    @Provides
    Config stormConfig() {
        if (this.stormConfig == null) {
            this.stormConfig = new Config();
            this.stormConfig.setNumWorkers(this.config.numWorkerProcesses.intValue());
            this.stormConfig.setNumAckers(this.config.numAckerThreads.intValue());
            HashMap hashMap = new HashMap();
            if (this.config.statsdConfig.getHost() != null) {
                hashMap.put(StatsdMetricConsumer.STATSD_HOST, this.config.statsdConfig.getHost());
            }
            if (this.config.statsdConfig.getPort() != null) {
                hashMap.put(StatsdMetricConsumer.STATSD_PORT, this.config.statsdConfig.getPort());
            }
            if (this.config.statsdConfig.getWhitelist() != null) {
                hashMap.put(StatsdMetricConsumer.STATSD_WHITELIST, this.config.statsdConfig.getWhitelist());
            }
            if (this.config.statsdConfig.getMetricmap() != null) {
                hashMap.put(StatsdMetricConsumer.STATSD_METRICMAP, this.config.statsdConfig.getMetricmap());
            }
            if (this.config.statsdConfig.getDimensions() != null) {
                hashMap.put(StatsdMetricConsumer.STATSD_DIMENSIONS, this.config.statsdConfig.getDimensions());
            }
            if (this.config.statsdConfig.getDebugmetrics() != null) {
                hashMap.put(StatsdMetricConsumer.STATSD_DEBUGMETRICS, this.config.statsdConfig.getDebugmetrics());
            }
            this.stormConfig.registerMetricsConsumer(StatsdMetricConsumer.class, hashMap, 2L);
        }
        return this.stormConfig;
    }

    @Provides
    @Named("metrics")
    IRichSpout metricSpout() {
        return this.metricSpout == null ? new MetricSpout(this.config.metricSpoutConfig) : this.metricSpout;
    }

    @Provides
    @Named("event")
    IRichSpout eventSpout() {
        return this.eventSpout == null ? new EventSpout(this.config.eventSpoutConfig, new EventDeserializer()) : this.eventSpout;
    }

    @Provides
    StormTopology topology() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("metrics-spout", (IRichSpout) Injector.getInstance(IRichSpout.class, "metrics"), this.config.metricSpoutThreads).setNumTasks(this.config.metricSpoutTasks);
        topologyBuilder.setSpout("event-spout", (IRichSpout) Injector.getInstance(IRichSpout.class, "event"), this.config.eventSpoutThreads).setNumTasks(this.config.eventSpoutTasks);
        topologyBuilder.setBolt("event-bolt", new EventProcessingBolt(this.config.database), this.config.eventBoltThreads).shuffleGrouping("event-spout").setNumTasks(this.config.eventBoltTasks);
        topologyBuilder.setBolt("filtering-bolt", new MetricFilteringBolt(this.config.database), this.config.filteringBoltThreads).fieldsGrouping("metrics-spout", new Fields(new String[]{MetricSpout.FIELDS[0]})).allGrouping("event-bolt", EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID).allGrouping("event-bolt", EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID).setNumTasks(this.config.filteringBoltTasks);
        topologyBuilder.setBolt("alarm-creation-bolt", new AlarmCreationBolt(this.config.database), this.config.alarmCreationBoltThreads).fieldsGrouping("filtering-bolt", MetricFilteringBolt.NEW_METRIC_FOR_ALARM_DEFINITION_STREAM, new Fields(new String[]{AlarmCreationBolt.ALARM_CREATION_FIELDS[3]})).allGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID).allGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID).allGrouping("event-bolt", EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID).setNumTasks(this.config.alarmCreationBoltTasks);
        topologyBuilder.setBolt("aggregation-bolt", new MetricAggregationBolt(this.config), this.config.aggregationBoltThreads).fieldsGrouping("filtering-bolt", new Fields(new String[]{MetricFilteringBolt.FIELDS[0]})).allGrouping("filtering-bolt", MetricAggregationBolt.METRIC_AGGREGATION_CONTROL_STREAM).fieldsGrouping("filtering-bolt", AlarmCreationBolt.ALARM_CREATION_STREAM, new Fields(new String[]{AlarmCreationBolt.ALARM_CREATION_FIELDS[1]})).allGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID).fieldsGrouping("event-bolt", EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_ID, new Fields(new String[]{EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_FIELDS[1]})).fieldsGrouping("alarm-creation-bolt", AlarmCreationBolt.ALARM_CREATION_STREAM, new Fields(new String[]{AlarmCreationBolt.ALARM_CREATION_FIELDS[1]})).setNumTasks(this.config.aggregationBoltTasks);
        topologyBuilder.setBolt("thresholding-bolt", new AlarmThresholdingBolt(this.config.database, this.config.kafkaProducerConfig), this.config.thresholdingBoltThreads).fieldsGrouping("aggregation-bolt", new Fields(new String[]{MetricAggregationBolt.FIELDS[0]})).fieldsGrouping("event-bolt", EventProcessingBolt.ALARM_EVENT_STREAM_ID, new Fields(new String[]{EventProcessingBolt.ALARM_EVENT_STREAM_FIELDS[1]})).allGrouping("event-bolt", EventProcessingBolt.ALARM_DEFINITION_EVENT_STREAM_ID).allGrouping("event-bolt", EventProcessingBolt.METRIC_SUB_ALARM_EVENT_STREAM_ID).setNumTasks(this.config.thresholdingBoltTasks);
        return topologyBuilder.createTopology();
    }
}
