/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx;

import com.github.davidmoten.rx.Bytes;
import com.github.davidmoten.util.Optional;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public final class Processes {
    /**
     * Ad-hoc entry point that assembles a pipeline over {@code execute("ls")},
     * mapping each emitted byte chunk to a {@link String} using the platform
     * default charset.
     *
     * <p>NOTE(review): the assembled Observable is discarded without being
     * subscribed to, so as written this method appears to have no observable
     * effect - confirm whether a subscription was intended.
     *
     * @param args ignored
     * @throws IOException declared but not thrown by the visible code
     * @throws InterruptedException declared but not thrown by the visible code
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // Decoder from a raw output chunk to text (platform default charset).
        Func1<byte[], String> chunkToText = new Func1<byte[], String>(){

            @Override
            public String call(byte[] chunk) {
                String text = new String(chunk);
                return text;
            }
        };
        Processes.execute("ls").map(chunkToText);
    }

    /**
     * Convenience overload that runs {@code command} with default settings:
     * no explicit environment map, {@code appendEnv} set to {@code true}, the
     * working directory {@code new File(".")} and no wait timeout.
     *
     * @param command the program and its arguments
     * @return the Observable produced by {@link #execute(Parameters)} for
     *         these defaults
     */
    public static Observable<byte[]> execute(String ... command) {
        List<String> commandLine = Arrays.asList(command);
        Optional<Map<String, String>> noEnvironment = Optional.absent();
        File workingDirectory = new File(".");
        Optional<Long> noTimeout = Optional.absent();
        Parameters defaults = new Parameters(commandLine, noEnvironment, true, workingDirectory, noTimeout);
        return Processes.execute(defaults);
    }

    /**
     * Returns an Observable that, per subscription, starts the process
     * described by {@code parameters}, emits the bytes read from its output
     * stream (standard error is redirected into standard output), and then
     * terminates according to the process exit code.
     *
     * <ul>
     * <li>exit code {@code 0}: the stream completes normally;</li>
     * <li>non-zero exit code: the stream fails with {@link ProcessException};</li>
     * <li>{@code waitForMs} present: the stream fails with
     * {@link IllegalArgumentException} ("not implemented yet") after the
     * output has been emitted.</li>
     * </ul>
     *
     * <p>The process is handed to {@code Observable.using} together with a
     * dispose action that calls {@link Process#destroy()}.
     *
     * @param parameters command, environment, working directory and wait settings
     * @return the process output as a stream of byte chunks
     */
    public static Observable<byte[]> execute(final Parameters parameters) {
        Func0<Process> resourceFactory = new Func0<Process>(){

            @Override
            public Process call() {
                return Processes.startProcess(parameters);
            }
        };
        Func1<Process, Observable<byte[]>> factory = new Func1<Process, Observable<byte[]>>(){

            @Override
            public Observable<byte[]> call(Process process) {
                InputStream is = process.getInputStream();
                Observable<byte[]> output;
                if (is != null) {
                    output = Bytes.from(is);
                } else {
                    output = Observable.empty();
                }
                // The exit status is only awaited once the output stream has been drained.
                return output.concatWith(Processes.awaitExit(process, parameters));
            }
        };
        Action1<Process> disposeAction = new Action1<Process>(){

            @Override
            public void call(Process process) {
                process.destroy();
            }
        };
        return Observable.using(resourceFactory, factory, disposeAction);
    }

    /**
     * Builds and starts the process described by {@code parameters}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the process cannot be started
     */
    private static Process startProcess(Parameters parameters) {
        ProcessBuilder b = new ProcessBuilder(parameters.command());
        if (parameters.env().isPresent()) {
            // Only discard the inherited environment when the caller asked to
            // replace it; appendEnv == true means "add to what is inherited".
            if (!parameters.appendEnv()) {
                b.environment().clear();
            }
            b.environment().putAll(parameters.env().get());
        }
        b.directory(parameters.directory());
        b.redirectErrorStream(true);
        try {
            return b.start();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns an Observable that emits no items and signals exactly one
     * terminal event reflecting how {@code process} exited. The wait is
     * subscribed on {@code Schedulers.io()} because {@link Process#waitFor()} blocks.
     */
    private static Observable<byte[]> awaitExit(final Process process, final Parameters parameters) {
        return Observable.create(new Observable.OnSubscribe<byte[]>(){

            @Override
            public void call(Subscriber<? super byte[]> sub) {
                if (parameters.waitForMs().isPresent()) {
                    // Timed waits are not supported yet; emit a single terminal event only.
                    sub.onError(new IllegalArgumentException("not implemented yet"));
                    return;
                }
                try {
                    int exitCode = process.waitFor();
                    if (exitCode != 0) {
                        sub.onError(new ProcessException(exitCode));
                    } else {
                        // Without this the concatenated stream would never terminate on success.
                        sub.onCompleted();
                    }
                }
                catch (InterruptedException e) {
                    // Preserve the interrupt for code further up the stack.
                    Thread.currentThread().interrupt();
                    sub.onError(e);
                }
            }
        }).subscribeOn(Schedulers.io());
    }

    /**
     * Value holder for the settings read by {@code execute(Parameters)} when
     * launching a process. All fields are assigned once in the constructor;
     * the supplied references are stored as given (no copies are made).
     */
    public static final class Parameters {
        private final List<String> command;
        private final Optional<Map<String, String>> env;
        private final boolean appendEnv;
        private final File directory;
        private final Optional<Long> waitForMs;

        /**
         * @param command   program and arguments handed to the process builder
         * @param env       optional environment entries put into the process environment
         * @param appendEnv flag consulted, when {@code env} is present, to decide how
         *                  the inherited environment is treated
         * @param directory working directory for the process
         * @param waitForMs optional wait duration in milliseconds
         */
        public Parameters(
                List<String> command,
                Optional<Map<String, String>> env,
                boolean appendEnv,
                File directory,
                Optional<Long> waitForMs) {
            this.command = command;
            this.env = env;
            this.appendEnv = appendEnv;
            this.directory = directory;
            this.waitForMs = waitForMs;
        }

        /** @return the program and arguments, as passed to the constructor */
        public List<String> command() {
            return command;
        }

        /** @return the optional environment entries, as passed to the constructor */
        public Optional<Map<String, String>> env() {
            return env;
        }

        /** @return the {@code appendEnv} flag passed to the constructor */
        public boolean appendEnv() {
            return appendEnv;
        }

        /** @return the working directory passed to the constructor */
        public File directory() {
            return directory;
        }

        /** @return the optional wait duration passed to the constructor */
        public Optional<Long> waitForMs() {
            return waitForMs;
        }
    }

    /**
     * Unchecked exception that carries the exit code of a process. Its message
     * has the form {@code "process returned exitCode <code>"}.
     */
    public static class ProcessException
    extends RuntimeException {
        private static final long serialVersionUID = 722422557667123473L;
        private final int exitCode;

        /**
         * @param exitCode the exit code to record and include in the message
         */
        public ProcessException(int exitCode) {
            super(ProcessException.describe(exitCode));
            this.exitCode = exitCode;
        }

        /** @return the exit code supplied at construction */
        public int exitCode() {
            return exitCode;
        }

        /** Builds the exception message for the given exit code. */
        private static String describe(int code) {
            return "process returned exitCode " + code;
        }
    }
}

