package com.supermap.processing.jobserver.impl;

import com.google.common.collect.Maps;
import com.supermap.processing.jobserver.resource.SteamingResource;
import com.supermap.services.cluster.api.JobRunner;
import com.supermap.services.components.commontypes.JarInfo;
import com.supermap.services.components.commontypes.SparkJobSetting;
import com.supermap.services.components.commontypes.SparkJobState;
import com.supermap.services.components.commontypes.SparkRunState;
import com.supermap.services.util.LogUtil;
import com.supermap.services.util.ResourceManager;
import com.supermap.services.util.Tool;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.cal10n.LocLogger;

/* loaded from: input_file:BOOT-INF/lib/iserver-all-10.0.1-18030-10.0.1-SNAPSHOT.jar:com/supermap/processing/jobserver/impl/StreamingJobRunner.class */
public class StreamingJobRunner implements JobRunner {
    private static final String a = "../../logs/sparklauncher.{0}.log";
    private static ResourceManager b = new ResourceManager((Class<? extends Enum<?>>) SteamingResource.class);
    private static LocLogger c = LogUtil.getLocLogger(StreamingJobRunner.class, b);
    private String d;
    private SparkJobSetting e;
    private SparkJobState f;
    private SparkLauncher g;
    private SparkAppHandle h;
    private IOException i;
    private List<JarInfo> j;
    private String k;
    private String l;
    private SparkLauncherFactory m;
    private SparkAppHandleListenerFactory n;
    private String o;

    /* loaded from: input_file:BOOT-INF/lib/iserver-all-10.0.1-18030-10.0.1-SNAPSHOT.jar:com/supermap/processing/jobserver/impl/StreamingJobRunner$DefaultSparkLauncherFactory.class */
    static class DefaultSparkLauncherFactory implements SparkLauncherFactory {
        private static final String a = "JAVA_HOME";
        private EnvContainer b = new EnvContainer() { // from class: com.supermap.processing.jobserver.impl.StreamingJobRunner.DefaultSparkLauncherFactory.1
            @Override // com.supermap.processing.jobserver.impl.StreamingJobRunner.EnvContainer
            public Map<String, String> getEnv() {
                return System.getenv();
            }
        };
        private SparkLauncherBuilder c = new SparkLauncherBuilder() { // from class: com.supermap.processing.jobserver.impl.StreamingJobRunner.DefaultSparkLauncherFactory.2
            @Override // com.supermap.processing.jobserver.impl.StreamingJobRunner.SparkLauncherBuilder
            public SparkLauncher newSparkLauncher(Map<String, String> map) {
                return new SparkLauncher(map);
            }
        };

        DefaultSparkLauncherFactory() {
        }

        protected void setEnvContainer(EnvContainer envContainer) {
            this.b = envContainer;
        }

        protected void setSparkLauncherBuilder(SparkLauncherBuilder sparkLauncherBuilder) {
            this.c = sparkLauncherBuilder;
        }

        @Override // com.supermap.processing.jobserver.impl.StreamingJobRunner.SparkLauncherFactory
        public SparkLauncher newInstance() {
            HashMap newHashMap = Maps.newHashMap();
            newHashMap.put(a, a());
            return this.c.newSparkLauncher(newHashMap);
        }

        private String a() {
            String str = null;
            Map<String, String> env = this.b.getEnv();
            if (env.containsKey("JRE_HOME")) {
                str = env.get("JRE_HOME");
            }
            if (StringUtils.isEmpty(str)) {
                str = SystemUtils.getJavaHome().getAbsolutePath();
            }
            return str;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/iserver-all-10.0.1-18030-10.0.1-SNAPSHOT.jar:com/supermap/processing/jobserver/impl/StreamingJobRunner$EnvContainer.class */
    public interface EnvContainer {
        Map<String, String> getEnv();
    }

    /* loaded from: input_file:BOOT-INF/lib/iserver-all-10.0.1-18030-10.0.1-SNAPSHOT.jar:com/supermap/processing/jobserver/impl/StreamingJobRunner$SparkAppHandleListener.class */
    class SparkAppHandleListener implements SparkAppHandle.Listener {
        SparkAppHandleListener() {
        }

        public void stateChanged(SparkAppHandle sparkAppHandle) {
            StreamingJobRunner.this.onStateChanged(sparkAppHandle);
        }

        public void infoChanged(SparkAppHandle sparkAppHandle) {
            StreamingJobRunner.this.f.runState = SparkRunState.valueOf(sparkAppHandle.getState().name());
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/iserver-all-10.0.1-18030-10.0.1-SNAPSHOT.jar:com/supermap/processing/jobserver/impl/StreamingJobRunner$SparkAppHandleListenerFactory.class */
    interface SparkAppHandleListenerFactory {
        SparkAppHandleListener instance();
    }

    /* loaded from: input_file:BOOT-INF/lib/iserver-all-10.0.1-18030-10.0.1-SNAPSHOT.jar:com/supermap/processing/jobserver/impl/StreamingJobRunner$SparkLauncherBuilder.class */
    interface SparkLauncherBuilder {
        SparkLauncher newSparkLauncher(Map<String, String> map);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/iserver-all-10.0.1-18030-10.0.1-SNAPSHOT.jar:com/supermap/processing/jobserver/impl/StreamingJobRunner$SparkLauncherFactory.class */
    public interface SparkLauncherFactory {
        SparkLauncher newInstance();
    }

    public StreamingJobRunner(String str, String str2, SparkJobSetting sparkJobSetting) {
        this(str, str2, sparkJobSetting, new DefaultSparkLauncherFactory());
    }

    protected StreamingJobRunner(String str, String str2, SparkJobSetting sparkJobSetting, SparkLauncherFactory sparkLauncherFactory) {
        this.f = new SparkJobState(SparkRunState.UNKNOWN);
        this.d = str;
        this.l = str2;
        this.e = sparkJobSetting;
        this.m = sparkLauncherFactory;
        this.n = () -> {
            return new SparkAppHandleListener();
        };
    }

    @Override // com.supermap.services.cluster.api.JobRunner
    public IOException getLanuchEx() {
        return this.i;
    }

    @Override // com.supermap.services.cluster.api.JobRunner
    public void kill() {
        if (this.h == null) {
            return;
        }
        try {
            this.h.stop();
            this.h.kill();
            this.h.disconnect();
        } catch (Exception e) {
            c.warn("job has already finish or killed");
        }
    }

    @Override // com.supermap.services.cluster.api.JobRunner
    public SparkJobState state() {
        if (SparkRunState.RUNNING.equals(this.f.runState)) {
            this.f.elapsedTime = System.currentTimeMillis() - this.f.startTime;
        }
        return new SparkJobState(this.f);
    }

    @Override // com.supermap.services.cluster.api.JobRunner
    public String getJobID() {
        return this.k;
    }

    @Override // com.supermap.services.cluster.api.JobRunner
    public void state(SparkJobState sparkJobState) {
        this.f = sparkJobState;
    }

    @Override // com.supermap.services.cluster.api.JobRunner
    public void dispose() {
    }

    @Override // com.supermap.services.cluster.api.JobRunner
    public SparkJobSetting getSetting() {
        return this.e;
    }

    public List<JarInfo> getJarInfos() {
        return this.j;
    }

    public String getMasterAdress() {
        return this.l;
    }

    public void setJarInfos(List<JarInfo> list) {
        this.j = list;
    }

    @Override // com.supermap.services.cluster.api.JobRunner
    public boolean lanuch() {
        init();
        try {
            if (c.isDebugEnabled()) {
                c.debug("submit job to spark ,and the command is : " + submitCommands());
            }
            this.h = this.g.startApplication(new SparkAppHandle.Listener[]{this.n.instance()});
            this.f.startTime = System.currentTimeMillis();
            this.k = a();
            return true;
        } catch (IOException e) {
            this.i = e;
            this.f.errorMsg = e.getMessage();
            return false;
        }
    }

    protected void setListenerFactory(SparkAppHandleListenerFactory sparkAppHandleListenerFactory) {
        this.n = sparkAppHandleListenerFactory;
    }

    protected File sparkLogFile(File file) {
        File file2 = file == null ? new File(Tool.getApplicationPath(a.replace("{0}", b()))) : file;
        if (!file2.getParentFile().exists()) {
            file2.getParentFile().mkdirs();
        }
        return file2;
    }

    protected String getSparkHome() {
        return this.d;
    }

    protected void onStateChanged(SparkAppHandle sparkAppHandle) {
        this.f.runState = SparkRunState.valueOf(sparkAppHandle.getState().name());
        if (SparkRunState.KILLED.equals(this.f.runState) || SparkRunState.FAILED.equals(this.f.runState) || SparkRunState.LOST.equals(this.f.runState)) {
            this.f.endState = true;
        }
        if (SparkRunState.FINISHED.equals(this.f.runState)) {
            onStateChangedToFinished();
        }
    }

    protected void onStateChangedToFinished() {
        this.f.endTime = System.currentTimeMillis();
        this.f.publisherelapsedTime = System.currentTimeMillis();
        this.f.endState = true;
    }

    protected void init() {
        this.g = this.m.newInstance();
        File sparkLogFile = sparkLogFile(this.e.sparkLogFile);
        this.g.redirectError(sparkLogFile);
        this.g.redirectOutput(sparkLogFile);
        this.g.setSparkHome(Tool.getApplicationPath(new File(this.d).getPath()));
        this.g.setMaster(this.l);
        this.g.setAppName(this.e.appName);
        this.g.setMainClass(this.e.mainClass);
        if (this.e.args != null) {
            this.g.addAppArgs(this.e.args);
        }
        a(this.g);
    }

    protected String submitCommands() {
        return c() + " --master " + this.l + " --name " + this.e.appName + " --class " + this.e.mainClass + " " + this.o + " " + StringUtils.join(this.e.args, " ");
    }

    private String a() {
        return UUID.randomUUID().toString().replace('-', '_');
    }

    private CharSequence b() {
        return new SimpleDateFormat("YYYY-MM-dd").format(new Date());
    }

    private void a(SparkLauncher sparkLauncher) {
        List<JarInfo> list = this.j;
        if (list == null) {
            return;
        }
        for (JarInfo jarInfo : list) {
            if (jarInfo.primary) {
                this.o = jarInfo.path;
                sparkLauncher.setAppResource(jarInfo.path);
            } else {
                sparkLauncher.addJar(jarInfo.path);
            }
        }
    }

    private String c() {
        StringBuilder append = new StringBuilder(this.d).append(File.separator).append("bin").append(File.separator);
        if (SystemUtils.IS_OS_WINDOWS) {
            append.append("spark-submit.cmd");
        } else {
            if (!SystemUtils.IS_OS_LINUX) {
                throw new IllegalStateException("only windows and linux is supported");
            }
            append.append("spark-submit");
        }
        return Tool.getApplicationPath(append.toString());
    }
}
