package org.springframework.cloud.stream.binder.rocketmq.integration;

import java.util.Optional;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.context.Lifecycle;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-binder-rocketmq-0.9.0.RELEASE.jar:org/springframework/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.class */
/**
 * Outbound message handler that publishes Spring {@link Message}s to RocketMQ
 * through a {@link RocketMQTemplate}.
 *
 * <p>Three send modes are supported:
 * <ul>
 *   <li>transactional - {@code sendMessageInTransaction} using the configured group name;</li>
 *   <li>synchronous - {@code syncSend}, honouring the delay-level header;</li>
 *   <li>asynchronous (default) - {@code asyncSend} with a callback that reports failures.</li>
 * </ul>
 *
 * <p>Failures are forwarded to the send-failure channel when one is configured;
 * otherwise they are thrown to the caller as {@link MessagingException}.
 */
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(RocketMQMessageHandler.class);

    /** Header whose value, when non-empty, is appended to the destination as {@code destination:tags}. */
    private static final String TAGS_HEADER = "TAGS";

    private MessageChannel sendFailureChannel;
    private final RocketMQTemplate rocketMQTemplate;
    private final boolean transactional;
    private final String destination;
    private final String groupName;
    private final InstrumentationManager instrumentationManager;
    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
    private boolean sync = false;
    private volatile boolean running = false;

    /**
     * Creates a handler bound to a single destination.
     *
     * @param rocketMQTemplate       template used for all sends
     * @param destination            destination (topic) that messages are sent to
     * @param groupName              group name passed to transactional sends
     * @param transactional          whether to send in a transaction; {@code null} is treated as {@code false}
     * @param instrumentationManager registry that receives the health instrumentation for the destination
     */
    public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String destination, String groupName, Boolean transactional, InstrumentationManager instrumentationManager) {
        this.rocketMQTemplate = rocketMQTemplate;
        this.destination = destination;
        this.groupName = groupName;
        // Normalize once so a null Boolean cannot cause a NullPointerException later.
        this.transactional = Boolean.TRUE.equals(transactional);
        this.instrumentationManager = instrumentationManager;
    }

    /**
     * Marks the handler as running. In non-transactional mode it first registers
     * health instrumentation for the destination and initializes the template.
     *
     * @throws MessagingException if the template fails to initialize
     */
    @Override // org.springframework.context.Lifecycle
    public void start() {
        if (!this.transactional) {
            this.instrumentationManager.addHealthInstrumentation(new Instrumentation(this.destination));
            try {
                this.rocketMQTemplate.afterPropertiesSet();
                this.instrumentationManager.getHealthInstrumentation(this.destination).markStartedSuccessfully();
            } catch (Exception e) {
                this.instrumentationManager.getHealthInstrumentation(this.destination).markStartFailed(e);
                String failure = "RocketMQTemplate startup failed, Caused by " + e.getMessage();
                // Pass the exception as well so the stack trace is not lost.
                log.error(failure, e);
                throw new MessagingException(MessageBuilder.withPayload(failure).build(), e);
            }
        }
        this.running = true;
    }

    /**
     * Marks the handler as stopped; in non-transactional mode the template is destroyed first.
     */
    @Override // org.springframework.context.Lifecycle
    public void stop() {
        if (!this.transactional) {
            this.rocketMQTemplate.destroy();
        }
        this.running = false;
    }

    /**
     * @return {@code true} between a successful {@link #start()} and the next {@link #stop()}
     */
    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Sends the message using the configured mode (transactional, sync or async).
     *
     * <p>A non-{@code SEND_OK} result forwards the original message to the send-failure
     * channel. Any exception forwards an error message built by the
     * {@link ErrorMessageStrategy}. When no failure channel is set, a
     * {@link MessagingException} is thrown instead.
     *
     * @param message the message to publish
     * @throws MessagingException if sending fails and no send-failure channel is configured
     */
    @Override // org.springframework.integration.handler.AbstractMessageHandler
    protected void handleMessageInternal(final Message<?> message) throws Exception {
        try {
            final String topicWithTags = resolveTopicWithTags(message);
            SendResult sendResult = null;
            if (this.transactional) {
                sendResult = this.rocketMQTemplate.sendMessageInTransaction(this.groupName, topicWithTags, message, message.getHeaders().get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
                log.debug("transactional send to topic {} {}", topicWithTags, sendResult);
            } else if (this.sync) {
                // The delay level is only consumed by the synchronous send.
                int delayLevel = resolveDelayLevel(message);
                sendResult = this.rocketMQTemplate.syncSend(topicWithTags, message, this.rocketMQTemplate.getProducer().getSendMsgTimeout(), delayLevel);
                log.debug("sync send to topic {} {}", topicWithTags, sendResult);
            } else {
                sendAsync(topicWithTags, message);
            }
            if (sendResult != null && sendResult.getSendStatus() != SendStatus.SEND_OK) {
                MessageChannel failureChannel = getSendFailureChannel();
                if (failureChannel == null) {
                    // NOTE: this is caught by the handler below and wrapped once more,
                    // which preserves the exception chain existing callers observe.
                    throw new MessagingException(message, new MQClientException("message hasn't been sent", (Throwable) null));
                }
                failureChannel.send(message);
            }
        } catch (Exception e) {
            log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage(), e);
            MessageChannel failureChannel = getSendFailureChannel();
            if (failureChannel == null) {
                throw new MessagingException(message, e);
            }
            failureChannel.send(this.errorMessageStrategy.buildErrorMessage(new MessagingException(message, e), null));
        }
    }

    /** Builds the send target: the destination, suffixed with {@code :tags} when the tags header is non-empty. */
    private String resolveTopicWithTags(Message<?> message) {
        String tags = Optional.ofNullable(message.getHeaders().get(TAGS_HEADER)).orElse("").toString();
        return StringUtils.hasLength(tags) ? this.destination + ":" + tags : this.destination;
    }

    /** Reads the delay-level header as an int; a missing, unsupported or unparsable value yields 0. */
    private int resolveDelayLevel(Message<?> message) {
        Object raw = message.getHeaders().get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (raw instanceof Number) {
            return ((Number) raw).intValue();
        }
        if (raw instanceof String) {
            try {
                return Integer.parseInt((String) raw);
            } catch (NumberFormatException ex) {
                // Best effort: keep sending without a delay, but make the bad header visible.
                log.warn("Ignoring unparsable {} header value '{}', using delay level 0", MessageConst.PROPERTY_DELAY_TIME_LEVEL, raw);
            }
        }
        return 0;
    }

    /** Sends asynchronously; a failure is logged and, if a failure channel is set, forwarded as an error message. */
    private void sendAsync(final String topicWithTags, final Message<?> message) {
        this.rocketMQTemplate.asyncSend(topicWithTags, message, new SendCallback() {
            @Override // org.apache.rocketmq.client.producer.SendCallback
            public void onSuccess(SendResult result) {
                log.debug("async send to topic {} {}", topicWithTags, result);
            }

            @Override // org.apache.rocketmq.client.producer.SendCallback
            public void onException(Throwable cause) {
                log.error("RocketMQ Message hasn't been sent. Caused by " + cause.getMessage(), cause);
                MessageChannel failureChannel = getSendFailureChannel();
                if (failureChannel != null) {
                    failureChannel.send(RocketMQMessageHandler.this.errorMessageStrategy.buildErrorMessage(new MessagingException(message, cause), null));
                }
            }
        });
    }

    /**
     * Sets the channel that receives failed sends.
     *
     * @param sendFailureChannel the failure channel, or {@code null} to have failures thrown instead
     */
    public void setSendFailureChannel(MessageChannel sendFailureChannel) {
        this.sendFailureChannel = sendFailureChannel;
    }

    /**
     * Sets the strategy used to build error messages for the send-failure channel.
     *
     * @param errorMessageStrategy the strategy; must not be {@code null}
     */
    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    /**
     * @return the configured send-failure channel, or {@code null} if none is set
     */
    public MessageChannel getSendFailureChannel() {
        return this.sendFailureChannel;
    }

    /**
     * Selects synchronous or asynchronous sending for non-transactional sends.
     *
     * @param sync {@code true} for {@code syncSend}, {@code false} for {@code asyncSend}
     */
    public void setSync(boolean sync) {
        this.sync = sync;
    }
}
