// Down-level helper for async functions: runs `generator` (called with `thisArg`
// and `_arguments`) step by step and returns a promise for its final value.
// An already-present `this.__awaiter` is reused instead of redefining the helper.
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    // Fall back to the native Promise when no promise constructor is supplied.
    if (!P) {
        P = Promise;
    }
    return new P(function (settle, fail) {
        // Either finish with the generator's return value, or wait on the yielded
        // value (wrapped in a promise) and feed the outcome back into the generator.
        function advance(outcome) {
            if (outcome.done) {
                settle(outcome.value);
                return;
            }
            new P(function (adopt) {
                adopt(outcome.value);
            }).then(onFulfilled, onRejected);
        }
        // Resume the generator with the fulfilled value of the awaited promise.
        function onFulfilled(value) {
            try {
                advance(generator.next(value));
            }
            catch (err) {
                fail(err);
            }
        }
        // Throw the rejection reason into the generator at the suspended `yield`.
        function onRejected(reason) {
            try {
                advance(generator["throw"](reason));
            }
            catch (err) {
                fail(err);
            }
        }
        generator = generator.apply(thisArg, _arguments || []);
        advance(generator.next());
    });
};
import { OnDestroy } from "@angular/core";
import { MatSnackBar } from "@angular/material";
import { Status, StepResultType, LogLevel, } from "../job.types";
import { Observable, BehaviorSubject } from "rxjs";
import { Khonsole } from "app/khonsole";
import { v4 as uuidv4 } from "uuid";
import { WorkspaceComponent } from "app/component/workspace/workspace.component";
import { Store } from "@ngrx/store";
import * as fromRoot from "app/reducer/index.reducer";
import { first, map } from "rxjs/operators";
import { DataService } from "../../data.service";
import Dexie, { liveQuery } from "dexie";
import { prepareJobForDatabase } from "../jobManager.utils";
import * as i0 from "@angular/core";
import * as i1 from "@angular/material/snack-bar";
import * as i2 from "@ngrx/store";
import * as i3 from "../../data.service";
/**
 * Runs multi-step jobs on Web Workers and tracks them for the current session.
 *
 * Every registered worker is kept in `_workers` as `{ id, worker, busy, jobs }`.
 * `sessionJobs$` is re-emitted with a flattened snapshot of all worker jobs
 * (see `jobsWithWorkerId`) whenever a job or one of its steps changes.
 */
export class WebworkerJobManagerService {
    /**
     * @param _snackbar Snackbar used for the job start / success / failure notifications.
     * @param store Store from which the current graph config (and its `database` name) is selected.
     * @param ds Data service; stored on the instance, not used by the methods of this class.
     */
    constructor(_snackbar, store, ds) {
        this._snackbar = _snackbar;
        this.store = store;
        this.ds = ds;
        this.sessionJobs$ = new BehaviorSubject([]);
        this._workers = [];
        /**
         * Former completed jobs from the IndexedDB. These are jobs that have been run in the past and are saved in the database.
         */
        this._dbJobs = [];
        this.snackbarConfig = {
            horizontalPosition: "right",
            verticalPosition: "top",
        };
    }
    /**
     * Create a Web Worker from `this.workerScript` and register it under a fresh UUID.
     * NOTE(review): `workerScript` is not assigned anywhere in this class; presumably it is set elsewhere. Verify.
     * @returns `{ success: true, workerId }`, or `{ success: false, workerId: -1, error }` if creation threw.
     */
    newWorker() {
        try {
            const worker = new Worker(this.workerScript);
            const workerId = uuidv4();
            this._workers.push({
                id: workerId,
                worker,
                busy: false,
                jobs: [],
            });
            return {
                success: true,
                workerId: workerId,
            };
        }
        catch (e) {
            return {
                success: false,
                workerId: -1,
                error: e,
            };
        }
    }
    /**
     * Build an observable that samples one registered worker once per second.
     * If the worker is no longer registered, polling stops and the stream completes
     * instead of throwing from inside the timer callback.
     * @param workerId The ID of the worker to sample
     * @param select Maps the worker record to the value to emit
     * @returns Observable of the selected value; unsubscribing clears the timer
     */
    _pollWorker(workerId, select) {
        return new Observable((observer) => {
            const interval = setInterval(() => {
                const worker = this.getWorker(workerId);
                if (worker === undefined) {
                    clearInterval(interval);
                    observer.complete();
                    return;
                }
                observer.next(select(worker));
            }, 1000);
            return () => {
                clearInterval(interval);
            };
        });
    }
    /**
     * Poll the `busy` flag of a worker once per second.
     * @param workerId The ID of the worker to watch
     * @returns Observable of the worker's `busy` flag; completes when the worker is no longer registered
     */
    isWorkerBusy$(workerId) {
        return this._pollWorker(workerId, (worker) => worker.busy);
    }
    /**
     * Poll the job list of a worker once per second.
     * @param workerId The ID of the worker to watch
     * @returns Observable of the worker's `jobs` array; completes when the worker is no longer registered
     */
    workerJobs$(workerId) {
        return this._pollWorker(workerId, (worker) => worker.jobs);
    }
    /**
     * Get jobs from the database. These are jobs that were run in past sessions.
     * NOTE(review): each run of the querier opens a new Dexie instance and never closes it here. Confirm this is intended.
     * @param type The type of job to get
     * @returns Jobs of the given type from the database for this dataset
     */
    dbJobsOfType$(type) {
        return liveQuery(() => __awaiter(this, void 0, void 0, function* () {
            const config = yield this.store
                .select(fromRoot.getGraphAConfig)
                .pipe(first())
                .toPromise();
            const db = yield new Dexie("notitia-" + config.database).open();
            return (yield db
                .table("jobs")
                .where("type")
                .equals(type)
                .and((job) => job.databaseName === config.database)
                .toArray()).map((j) => (Object.assign({}, j, { workerId: undefined })));
        }));
    }
    /**
     * Get jobs from the current session.
     * Only session jobs whose `databaseName` matches the database of the current graph config are kept.
     * @param type The type of job to get
     * @returns Promise resolving to an observable of the session jobs of the given type for this dataset
     */
    sessionJobsOfType$(type) {
        return __awaiter(this, void 0, void 0, function* () {
            const config = yield this.store
                .select(fromRoot.getGraphAConfig)
                .pipe(first())
                .toPromise();
            return this.sessionJobs$.pipe(map((jobs) => jobs.filter((j) => j.type === type && j.databaseName === config.database)));
        });
    }
    /**
     * Terminate a worker, mark it as not busy and cancel every job it holds.
     * The worker stays registered in `_workers`.
     * @param workerId The ID of the worker to terminate
     * @returns `{ success: true }`, or `{ success: false, error }` if the worker is unknown or a job could not be cancelled
     */
    terminateWorker(workerId) {
        try {
            const worker = this.getWorker(workerId);
            worker.worker.terminate();
            worker.busy = false;
            // Try to cancel running jobs in the worker; stop at the first failure
            for (const job of worker.jobs) {
                const cancelJobRes = this.cancelJob(job, false);
                if (!cancelJobRes.success) {
                    return {
                        success: false,
                        error: `While attempting to terminate worker ${workerId}, failed to cancel job. Internal cancelJob error: ${cancelJobRes.error}`,
                    };
                }
            }
            this.sessionJobs$.next(this.jobsWithWorkerId());
            return { success: true };
        }
        catch (e) {
            return { success: false, error: e };
        }
    }
    /**
     * Mark a queued or running job, and its queued or running steps, as cancelled.
     * Only the first worker holding a job with this id is considered.
     * @param job The job to cancel (matched by `id`)
     * @param terminateWorker Whether to also terminate the worker that holds the job (default `true`)
     * @returns `{ success: true }`, or `{ success: false, error }` if the job was not found or the worker could not be terminated
     */
    cancelJob(job, terminateWorker = true) {
        try {
            const isActive = (status) => status === Status.Queued || status === Status.Running;
            for (const worker of this._workers) {
                const jobsToCancel = worker.jobs.filter((j) => j.id === job.id);
                if (jobsToCancel.length === 0) {
                    continue;
                }
                for (const j of jobsToCancel) {
                    if (!isActive(j.status)) {
                        continue;
                    }
                    j.status = Status.Cancelled;
                    j.finishTime = new Date();
                    j.steps.forEach((step) => {
                        if (isActive(step.status)) {
                            step.status = Status.Cancelled;
                            step.finishTime = new Date();
                        }
                    });
                }
                if (terminateWorker) {
                    const termWorkerRes = this.terminateWorker(worker.id);
                    if (!termWorkerRes.success) {
                        return {
                            success: false,
                            error: `Failed to terminate parent worker while canceling job ${job.id}. Internal terminateWorker error: ${termWorkerRes.error}`,
                        };
                    }
                }
                else {
                    // Since we didn't call terminateWorker, we still need to emit the updated jobs
                    this.sessionJobs$.next(this.jobsWithWorkerId());
                }
                return { success: true };
            }
            return {
                success: false,
                error: `Could not find job in any worker with jobId ${job.id}`,
            };
        }
        catch (e) {
            return { success: false, error: e };
        }
    }
    /**
     * Delete a job from the session if a worker holds it, otherwise from the database.
     * @param job The job to delete (matched by `id`)
     * @returns `{ success: true }`, or `{ success: false, error }`
     */
    deleteJob(job) {
        try {
            // If the job is in the current session, delete it from the session
            for (const worker of this._workers) {
                const index = worker.jobs.findIndex((j) => j.id === job.id);
                if (index !== -1) {
                    worker.jobs.splice(index, 1);
                    Khonsole.log(`Deleted job ${job.id} from session`);
                    this.sessionJobs$.next(this.jobsWithWorkerId());
                    return { success: true };
                }
            }
            // Otherwise delete it from the database.
            // `found` only becomes true if the store emits synchronously during `subscribe`.
            let found = false;
            this.store
                .select(fromRoot.getGraphAConfig)
                .pipe(first())
                .subscribe((graphAConfig) => {
                console.log("deleting job from database", job, graphAConfig);
                WorkspaceComponent.instance.delJob({
                    database: graphAConfig.database,
                    job: job,
                });
                Khonsole.log(`Deleted job ${job.id} from database`);
                found = true;
            });
            if (found) {
                return { success: true };
            }
            return {
                success: false,
                error: `Could not delete Job ${job.id}. Not found`,
            };
        }
        catch (e) {
            return { success: false, error: e };
        }
    }
    /**
     * Look up a session job on a worker.
     * @param workerId The ID of the worker holding the job
     * @param jobId The ID of the job
     * @returns The job, or `undefined` when the worker or the job is unknown
     */
    getJob(workerId, jobId) {
        const worker = this.getWorker(workerId);
        if (worker === undefined) {
            return undefined;
        }
        return worker.jobs.find((job) => job.id === jobId);
    }
    /**
     * Look up a registered worker record.
     * @param workerId The ID of the worker
     * @returns The `{ id, worker, busy, jobs }` record, or `undefined` when the id is unknown
     */
    getWorker(workerId) {
        return this._workers.find((worker) => worker.id === workerId);
    }
    /**
     * Run a job (a sequence of steps) on a worker. Steps run one after another; the
     * result of a step becomes the input of the next one. The first failing step ends the job.
     * The worker is flagged `busy` while the job runs.
     * @param workerId The ID of the worker to run the job on
     * @param name Display name of the job
     * @param initialPayload Input for the first step
     * @param steps The steps to run, in order
     * @param options Overrides merged on top of `DEFAULT_OPTIONS`
     * @returns Promise of the step responses collected so far (one per executed step)
     * @throws (rejects) if the graph config or the worker cannot be found, or if a step throws
     */
    runJob(workerId, name, initialPayload, steps, options = WebworkerJobManagerService.DEFAULT_OPTIONS) {
        return __awaiter(this, void 0, void 0, function* () {
            const stepResponses = [];
            const config = yield this.store
                .select(fromRoot.getGraphAConfig)
                .pipe(first())
                .toPromise();
            if (config === undefined) {
                throw new Error("runJob failed. Could not find graph config");
            }
            // Fail with a clear message before any state is touched
            const worker = this.getWorker(workerId);
            if (worker === undefined) {
                throw new Error(`runJob failed. Could not find worker ${workerId}`);
            }
            let payloadOrPrevStepResult = initialPayload;
            const jobId = uuidv4();
            const finalOptions = Object.assign({}, WebworkerJobManagerService.DEFAULT_OPTIONS, options);
            Khonsole.log(`Running job "${name}" (${jobId}) on worker ${workerId} with options:`, finalOptions);
            const finalSteps = steps.map((step, i) => (Object.assign({}, step, { id: i, status: Status.Queued, logs: [], creationTime: new Date() })));
            // add the job to the history,
            const job = {
                id: jobId,
                backendIdentifier: workerId,
                name,
                steps: finalSteps,
                status: Status.Running,
                type: finalOptions.type,
                creationTime: new Date(),
                databaseName: config.database,
            };
            worker.jobs.push(job);
            // Released again by finishJob (freeWorkerOnFinish) or terminateWorker
            worker.busy = true;
            this.sessionJobs$.next(this.jobsWithWorkerId());
            // Show the startup snackbar
            if (finalOptions.showSnackbarOnStart) {
                this._snackbar.open(`Running job: ${name}`, "", Object.assign({ duration: 5000 }, this.snackbarConfig));
            }
            for (const step of finalSteps) {
                // run the step
                let response;
                try {
                    response = yield this.runStep(workerId, step, payloadOrPrevStepResult);
                }
                catch (e) {
                    // A step that throws must not leave the job "running" forever
                    this.finishJob(worker, job, Status.Error, finalOptions);
                    throw e;
                }
                stepResponses.push(response);
                // if the step failed, do not continue with the rest of the steps
                if (!response.success) {
                    if (finalOptions.showSnackbarOnFail) {
                        this._snackbar.open(`Error running job "${name}". Failed on step "${step.name}"`, "Close", Object.assign({ duration: 5000 }, this.snackbarConfig));
                    }
                    this.finishJob(worker, job, Status.Error, finalOptions);
                    return stepResponses;
                }
                // if the step succeeded, put the result in the payload for the next step
                payloadOrPrevStepResult = response.result;
            }
            if (finalOptions.showSnackbarOnSuccess) {
                this._snackbar.open(`Finished job "${name}"`, "Close", Object.assign({ duration: 5000 }, this.snackbarConfig));
            }
            this.finishJob(worker, job, Status.Success, finalOptions);
            return stepResponses;
        });
    }
    /**
     * Run a single step on a worker and wait for its success or error message.
     * Log messages from the worker are appended to `step.logs` while waiting.
     * An uncaught error inside the worker is reported as a failed step.
     * @param workerId The ID of the worker to run the step on
     * @param step The step to run
     * @param payloadOrPrevStepResult The previous step's result (or the initial payload). Ignored if the step has its own `payload`; mapped through `prevResultToPayload` if the step defines it.
     * @returns Promise of `{ success, result: { type, data } }`
     */
    runStep(workerId, step, payloadOrPrevStepResult) {
        return __awaiter(this, void 0, void 0, function* () {
            const worker = this.getWorker(workerId).worker;
            // mark the step as running
            step.status = Status.Running;
            // set the payload, either from the step itself or from the previous step's result
            let payload;
            if (step.payload) {
                payload = step.payload;
            }
            else if (step.prevResultToPayload) {
                payload = step.prevResultToPayload(payloadOrPrevStepResult);
            }
            else {
                payload = payloadOrPrevStepResult;
            }
            const response = yield new Promise((resolve) => {
                const fail = (message) => resolve({
                    success: false,
                    result: {
                        type: StepResultType.Error,
                        data: message,
                    },
                });
                // Handle the response
                worker.onmessage = (event) => {
                    const message = event.data;
                    // the status the worker responded with
                    const status = message.status;
                    // The result of the step (or the error message)
                    const dataOrErrorMsg = message.data;
                    switch (status) {
                        case Status.Success:
                            resolve({
                                success: true,
                                result: {
                                    // The type of the result of the step
                                    type: message.type,
                                    data: dataOrErrorMsg,
                                },
                            });
                            break;
                        case Status.Error:
                            console.error(dataOrErrorMsg);
                            fail(dataOrErrorMsg);
                            break;
                        case Status.Log: {
                            // Intermediate log line: record it and keep waiting
                            step.logs.push(dataOrErrorMsg);
                            this.sessionJobs$.next(this.jobsWithWorkerId());
                            break;
                        }
                        default: {
                            const unknownStatusMsg = `Unknown status from worker "${status}". Must be "success" or "error"`;
                            console.error(unknownStatusMsg);
                            fail(unknownStatusMsg);
                            break;
                        }
                    }
                };
                // Without this, an uncaught exception in the worker would leave the promise pending forever
                worker.onerror = (event) => {
                    const message = (event && event.message) || "Uncaught error in worker";
                    console.error(message);
                    fail(message);
                };
                // Run the step (handlers are in place before the message is sent)
                worker.postMessage(payload);
            });
            this.finishStep(step, response);
            return response;
        });
    }
    /**
     * Flatten the jobs of all workers into one list.
     * @returns Shallow copies of every worker job, each with a `workerId` property added
     */
    jobsWithWorkerId() {
        return this._workers.reduce((acc, worker) => {
            return acc.concat(worker.jobs.map((job) => (Object.assign({}, job, { workerId: worker.id }))));
        }, []);
    }
    /**
     * Record the outcome of a step on the step object and publish the updated jobs.
     * Table / JSON / volcano-data results are JSON-parsed before being stored.
     * @param step The step that finished
     * @param response The `{ success, result }` response of the step
     * @throws if a result that must be parsed is not valid JSON (the step is marked as errored first)
     */
    finishStep(step, response) {
        step.finishTime = new Date();
        if (!response.success) {
            step.status = Status.Error;
            step.result = response.result;
            this.sessionJobs$.next(this.jobsWithWorkerId());
            return;
        }
        if (response.result) {
            const typesToParse = [
                StepResultType.Table,
                StepResultType.JSON,
                StepResultType.VOLCANO_DATA,
            ];
            let data = response.result.data;
            if (typesToParse.includes(response.result.type)) {
                try {
                    data = JSON.parse(data);
                }
                catch (e) {
                    // Do not leave the step marked as successful when its result is unusable
                    step.status = Status.Error;
                    this.sessionJobs$.next(this.jobsWithWorkerId());
                    throw e;
                }
            }
            step.result = {
                type: response.result.type,
                data,
            };
        }
        step.status = Status.Success;
        step.logs.push({
            msg: "Success",
            level: LogLevel.Info,
        });
        this.sessionJobs$.next(this.jobsWithWorkerId());
    }
    /**
     * Record the final status of a job, then apply the finish options:
     * save it to the database (which removes it from the session), free the worker,
     * and/or terminate and unregister the worker.
     * NOTE(review): when `saveToDatabase` is set this returns before `destroyWorkerOnFinish`
     * is evaluated, so with the default options the worker is never terminated here.
     * Kept as-is because callers may reuse the worker. Confirm the intent.
     * @param worker The worker record that ran the job
     * @param job The job that finished
     * @param status The final status of the job
     * @param options Finish options (defaults to `DEFAULT_OPTIONS`)
     * @returns Promise that resolves once the bookkeeping is done
     */
    finishJob(worker, job, status, options = WebworkerJobManagerService.DEFAULT_OPTIONS) {
        return __awaiter(this, void 0, void 0, function* () {
            job.status = status;
            job.finishTime = new Date();
            Khonsole.log(`Job "${job.name}" (${job.id}) finished with status "${status}"`);
            this.sessionJobs$.next(this.jobsWithWorkerId());
            if (options.saveToDatabase) {
                const dbJob = prepareJobForDatabase(job);
                console.log("saving job to database", dbJob, dbJob.databaseName);
                WorkspaceComponent.instance.addJob({
                    database: dbJob.databaseName,
                    job: dbJob,
                });
                // remove the job from the session
                const index = worker.jobs.findIndex((j) => j.id === dbJob.id);
                if (index !== -1) {
                    worker.jobs.splice(index, 1);
                }
                this.sessionJobs$.next(this.jobsWithWorkerId());
                if (options.freeWorkerOnFinish) {
                    worker.busy = false;
                }
                return;
            }
            if (options.freeWorkerOnFinish) {
                worker.busy = false;
            }
            if (options.destroyWorkerOnFinish) {
                worker.worker.terminate();
                const index = this._workers.findIndex((w) => w.id === worker.id);
                if (index !== -1) {
                    this._workers.splice(index, 1);
                }
            }
        });
    }
    /** Lifecycle hook; intentionally empty. */
    ngOnDestroy() { }
}
/** Options used by `runJob` / `finishJob` when the caller does not override them. */
WebworkerJobManagerService.DEFAULT_OPTIONS = {
    // Stored as the `type` of the job; used to filter jobs by type
    type: "generic",
    // Snackbar notifications shown by runJob
    showSnackbarOnStart: true,
    showSnackbarOnSuccess: true,
    onSuccessSnackbarClick: null,
    showSnackbarOnFail: true,
    onFailSnackbarClick: null,
    // finishJob: persist the job and remove it from the session
    saveToDatabase: true,
    // finishJob: reset the worker's `busy` flag
    freeWorkerOnFinish: true,
    // finishJob: terminate and unregister the worker (only evaluated when saveToDatabase is false)
    destroyWorkerOnFinish: true,
};
// Injectable definition for the service: the factory constructs it with the injected
// MatSnackBar, Store and DataService, and it is provided in the "root" injector.
WebworkerJobManagerService.ngInjectableDef = i0.ɵɵdefineInjectable({ factory: function WebworkerJobManagerService_Factory() { return new WebworkerJobManagerService(i0.ɵɵinject(i1.MatSnackBar), i0.ɵɵinject(i2.Store), i0.ɵɵinject(i3.DataService)); }, token: WebworkerJobManagerService, providedIn: "root" });
