package com.vortex.util.rocketmq.ons.tcp;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.vortex.util.rocketmq.AbsConsumer;
import com.vortex.util.rocketmq.IConsumer;
import com.vortex.util.rocketmq.IConsumerConfig;
import com.vortex.util.rocketmq.msg.IRocketMsgListener;
import com.vortex.util.rocketmq.msg.RocketMsg;
import com.vortex.util.rocketmq.ons.AbsOnsFactory;
import java.util.Properties;

/* loaded from: input_file:com/vortex/util/rocketmq/ons/tcp/OnsTcpConsumer.class */
public class OnsTcpConsumer extends AbsConsumer implements IConsumer {
    AbsOnsFactory factory;
    Consumer consumer;

    /* JADX INFO: Access modifiers changed from: protected */
    public OnsTcpConsumer(AbsOnsFactory absOnsFactory, IConsumerConfig iConsumerConfig) {
        super(iConsumerConfig);
        this.factory = absOnsFactory;
        init();
    }

    void init() {
        Properties properties = new Properties();
        properties.put("AccessKey", this.factory.getAccessKey());
        properties.put("SecretKey", this.factory.getSecretKey());
        properties.put("ONSAddr", this.factory.getServerEndpoint());
        properties.put("ConsumerId", this.config.getConsumerId());
        this.consumer = ONSFactory.createConsumer(properties);
    }

    @Override // com.vortex.util.rocketmq.IConsumer
    public void subscribe(final String str, String[] strArr, final IRocketMsgListener iRocketMsgListener) {
        unsubscribe();
        String str2 = "*";
        if (strArr != null && strArr.length > 0) {
            str2 = Joiner.on("||").skipNulls().join(strArr);
        }
        this.consumer.subscribe(str, str2, new MessageListener() { // from class: com.vortex.util.rocketmq.ons.tcp.OnsTcpConsumer.1
            public Action consume(Message message, ConsumeContext consumeContext) {
                String str3 = new String(message.getBody(), Charsets.UTF_8);
                System.out.println("Receive Msg: " + message);
                try {
                    RocketMsg rocketMsg = new RocketMsg(str, str3);
                    rocketMsg.setTags(message.getTag());
                    rocketMsg.setKeys(message.getKey());
                    rocketMsg.setExt(message);
                    iRocketMsgListener.onSuccess(Lists.newArrayList(new RocketMsg[]{rocketMsg}));
                    return Action.CommitMessage;
                } catch (Exception e) {
                    System.out.println(String.format("处理消息发生异常. msgId:%s\ncontent:%s\n%s", message.getMsgID(), str3, e.getMessage()));
                    e.printStackTrace();
                    return Action.ReconsumeLater;
                }
            }
        });
        this.consumer.start();
    }

    @Override // com.vortex.util.rocketmq.IConsumer
    public void unsubscribe() {
        this.consumer.shutdown();
    }
}
