package monasca.thresh.infrastructure.thresholding;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Values;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import monasca.thresh.EventSpoutConfig;
import monasca.thresh.infrastructure.thresholding.deserializer.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:monasca/thresh/infrastructure/thresholding/EventSpout.class */
/**
 * Spout that turns raw messages handed to {@link #processMessage} into emitted tuples.
 *
 * <p>Each message is decoded by the supplied {@link EventDeserializer}; for every decoded
 * tuple, the first element is emitted on its own, provided it is {@link Serializable}.
 */
public class EventSpout extends KafkaSpout {
    private static final Logger logger = LoggerFactory.getLogger(EventSpout.class);
    private static final long serialVersionUID = 8457340455857276878L;

    /** Decodes raw message bytes and supplies the output field declaration. */
    private final EventDeserializer deserializer;

    /**
     * Creates the spout.
     *
     * @param eventSpoutConfig configuration passed through to the {@code KafkaSpout} superclass
     * @param eventDeserializer deserializer used to decode each message and to declare the
     *     output fields
     */
    public EventSpout(EventSpoutConfig eventSpoutConfig, EventDeserializer eventDeserializer) {
        super(eventSpoutConfig);
        this.deserializer = eventDeserializer;
        logger.info("EventSpout created");
    }

    /**
     * Declares the output fields reported by the deserializer.
     *
     * @param outputFieldsDeclarer declarer that receives the field definition
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.deserializer.getOutputFields());
    }

    /**
     * Deserializes one message and emits the first element of every resulting tuple.
     *
     * <p>A {@code null} deserialization result is ignored. Tuples that are {@code null} or
     * empty, or whose first element is {@code null} or not {@link Serializable}, are logged
     * and skipped so that one bad entry does not abort the remaining tuples of the message.
     *
     * @param bArr raw message bytes
     * @param spoutOutputCollector collector that receives the emitted events
     */
    @Override // monasca.thresh.infrastructure.thresholding.KafkaSpout
    protected void processMessage(byte[] bArr, SpoutOutputCollector spoutOutputCollector) {
        List<List<?>> tuples = this.deserializer.deserialize(bArr);
        if (tuples == null) {
            return;
        }
        for (List<?> tuple : tuples) {
            // Guard before get(0): an empty or null tuple would otherwise throw.
            if (tuple == null || tuple.isEmpty()) {
                logger.error("Deserialized tuple is null or empty, skipping: {}", tuple);
                continue;
            }
            Object event = tuple.get(0);
            if (event == null) {
                // instanceof is false for null, and calling getClass() on it would throw.
                logger.error("Deserialized event is null, skipping");
                continue;
            }
            if (!(event instanceof Serializable)) {
                logger.error("Class {} is not Serializable: {}", event.getClass(), event);
                continue;
            }
            spoutOutputCollector.emit(new Values(event));
        }
    }
}
