123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.RuntimeWorkerPool = exports.RuntimeWorker = exports.RuntimeWorkerState = void 0;
- const http = require("http");
- const uuid = require("uuid");
- const types_1 = require("./types");
- const events_1 = require("events");
- const emulatorLogger_1 = require("./emulatorLogger");
- const error_1 = require("../error");
- var RuntimeWorkerState;
- (function (RuntimeWorkerState) {
- RuntimeWorkerState["CREATED"] = "CREATED";
- RuntimeWorkerState["IDLE"] = "IDLE";
- RuntimeWorkerState["BUSY"] = "BUSY";
- RuntimeWorkerState["FINISHING"] = "FINISHING";
- RuntimeWorkerState["FINISHED"] = "FINISHED";
- })(RuntimeWorkerState = exports.RuntimeWorkerState || (exports.RuntimeWorkerState = {}));
- class RuntimeWorker {
- constructor(key, runtime) {
- this.stateEvents = new events_1.EventEmitter();
- this.logListeners = [];
- this._state = RuntimeWorkerState.CREATED;
- this.id = uuid.v4();
- this.key = key;
- this.runtime = runtime;
- const childProc = this.runtime.process;
- let msgBuffer = "";
- childProc.on("message", (msg) => {
- msgBuffer = this.processStream(msg, msgBuffer);
- });
- let stdBuffer = "";
- if (childProc.stdout) {
- childProc.stdout.on("data", (data) => {
- stdBuffer = this.processStream(data, stdBuffer);
- });
- }
- if (childProc.stderr) {
- childProc.stderr.on("data", (data) => {
- stdBuffer = this.processStream(data, stdBuffer);
- });
- }
- childProc.on("exit", () => {
- this.log("exited");
- this.state = RuntimeWorkerState.FINISHED;
- });
- }
- processStream(s, buf) {
- buf += s.toString();
- const lines = buf.split("\n");
- if (lines.length > 1) {
- lines.slice(0, -1).forEach((line) => {
- const log = types_1.EmulatorLog.fromJSON(line);
- this.runtime.events.emit("log", log);
- if (log.level === "FATAL") {
- this.runtime.events.emit("log", new types_1.EmulatorLog("SYSTEM", "runtime-status", "killed"));
- this.runtime.process.kill();
- }
- });
- }
- return lines[lines.length - 1];
- }
- readyForWork() {
- this.state = RuntimeWorkerState.IDLE;
- }
- sendDebugMsg(debug) {
- return new Promise((resolve, reject) => {
- this.runtime.process.send(JSON.stringify(debug), (err) => {
- if (err) {
- reject(err);
- }
- else {
- resolve();
- }
- });
- });
- }
- request(req, resp, body) {
- this.state = RuntimeWorkerState.BUSY;
- const onFinish = () => {
- if (this.state === RuntimeWorkerState.BUSY) {
- this.state = RuntimeWorkerState.IDLE;
- }
- else if (this.state === RuntimeWorkerState.FINISHING) {
- this.log(`IDLE --> FINISHING`);
- this.runtime.process.kill();
- }
- };
- return new Promise((resolve) => {
- const proxy = http.request({
- method: req.method,
- path: req.path,
- headers: req.headers,
- socketPath: this.runtime.socketPath,
- }, (_resp) => {
- resp.writeHead(_resp.statusCode || 200, _resp.headers);
- const piped = _resp.pipe(resp);
- piped.on("finish", () => {
- onFinish();
- resolve();
- });
- });
- proxy.on("error", (err) => {
- resp.writeHead(500);
- resp.write(JSON.stringify(err));
- resp.end();
- this.runtime.process.kill();
- resolve();
- });
- if (body) {
- proxy.write(body);
- }
- proxy.end();
- });
- }
- get state() {
- return this._state;
- }
- set state(state) {
- if (state === RuntimeWorkerState.IDLE) {
- for (const l of this.logListeners) {
- this.runtime.events.removeListener("log", l);
- }
- this.logListeners = [];
- }
- if (state === RuntimeWorkerState.FINISHED) {
- this.runtime.events.removeAllListeners();
- }
- this.log(state);
- this._state = state;
- this.stateEvents.emit(this._state);
- }
- onLogs(listener, forever = false) {
- if (!forever) {
- this.logListeners.push(listener);
- }
- this.runtime.events.on("log", listener);
- }
- isSocketReady() {
- return new Promise((resolve, reject) => {
- const req = http
- .request({
- method: "GET",
- path: "/__/health",
- socketPath: this.runtime.socketPath,
- }, () => {
- this.readyForWork();
- resolve();
- })
- .end();
- req.on("error", (error) => {
- reject(error);
- });
- });
- }
- async waitForSocketReady() {
- const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
- const timeout = new Promise((resolve, reject) => {
- setTimeout(() => {
- reject(new error_1.FirebaseError("Failed to load function."));
- }, 30000);
- });
- while (true) {
- try {
- await Promise.race([this.isSocketReady(), timeout]);
- break;
- }
- catch (err) {
- if (["ECONNREFUSED", "ENOENT"].includes(err === null || err === void 0 ? void 0 : err.code)) {
- await sleep(100);
- continue;
- }
- throw err;
- }
- }
- }
- log(msg) {
- emulatorLogger_1.EmulatorLogger.forEmulator(types_1.Emulators.FUNCTIONS).log("DEBUG", `[worker-${this.key}-${this.id}]: ${msg}`);
- }
- }
- exports.RuntimeWorker = RuntimeWorker;
- class RuntimeWorkerPool {
- constructor(mode = types_1.FunctionsExecutionMode.AUTO) {
- this.mode = mode;
- this.workers = new Map();
- }
- getKey(triggerId) {
- if (this.mode === types_1.FunctionsExecutionMode.SEQUENTIAL) {
- return "~shared~";
- }
- else {
- return triggerId || "~diagnostic~";
- }
- }
- refresh() {
- for (const arr of this.workers.values()) {
- arr.forEach((w) => {
- if (w.state === RuntimeWorkerState.IDLE) {
- this.log(`Shutting down IDLE worker (${w.key})`);
- w.state = RuntimeWorkerState.FINISHING;
- w.runtime.process.kill();
- }
- else if (w.state === RuntimeWorkerState.BUSY) {
- this.log(`Marking BUSY worker to finish (${w.key})`);
- w.state = RuntimeWorkerState.FINISHING;
- }
- });
- }
- }
- exit() {
- for (const arr of this.workers.values()) {
- arr.forEach((w) => {
- if (w.state === RuntimeWorkerState.IDLE) {
- w.runtime.process.kill();
- }
- else {
- w.runtime.process.kill();
- }
- });
- }
- }
- readyForWork(triggerId) {
- const idleWorker = this.getIdleWorker(triggerId);
- return !!idleWorker;
- }
- async submitRequest(triggerId, req, resp, body, debug) {
- this.log(`submitRequest(triggerId=${triggerId})`);
- const worker = this.getIdleWorker(triggerId);
- if (!worker) {
- throw new error_1.FirebaseError("Internal Error: can't call submitRequest without checking for idle workers");
- }
- if (debug) {
- await worker.sendDebugMsg(debug);
- }
- return worker.request(req, resp, body);
- }
- getIdleWorker(triggerId) {
- this.cleanUpWorkers();
- const triggerWorkers = this.getTriggerWorkers(triggerId);
- if (!triggerWorkers.length) {
- this.setTriggerWorkers(triggerId, []);
- return;
- }
- for (const worker of triggerWorkers) {
- if (worker.state === RuntimeWorkerState.IDLE) {
- return worker;
- }
- }
- return;
- }
- addWorker(triggerId, runtime, extensionLogInfo) {
- const worker = new RuntimeWorker(this.getKey(triggerId), runtime);
- this.log(`addWorker(${worker.key})`);
- const keyWorkers = this.getTriggerWorkers(triggerId);
- keyWorkers.push(worker);
- this.setTriggerWorkers(triggerId, keyWorkers);
- const logger = triggerId
- ? emulatorLogger_1.EmulatorLogger.forFunction(triggerId, extensionLogInfo)
- : emulatorLogger_1.EmulatorLogger.forEmulator(types_1.Emulators.FUNCTIONS);
- worker.onLogs((log) => {
- logger.handleRuntimeLog(log);
- }, true);
- this.log(`Adding worker with key ${worker.key}, total=${keyWorkers.length}`);
- return worker;
- }
- getTriggerWorkers(triggerId) {
- return this.workers.get(this.getKey(triggerId)) || [];
- }
- setTriggerWorkers(triggerId, workers) {
- this.workers.set(this.getKey(triggerId), workers);
- }
- cleanUpWorkers() {
- for (const [key, keyWorkers] of this.workers.entries()) {
- const notDoneWorkers = keyWorkers.filter((worker) => {
- return worker.state !== RuntimeWorkerState.FINISHED;
- });
- if (notDoneWorkers.length !== keyWorkers.length) {
- this.log(`Cleaned up workers for ${key}: ${keyWorkers.length} --> ${notDoneWorkers.length}`);
- }
- this.setTriggerWorkers(key, notDoneWorkers);
- }
- }
- log(msg) {
- emulatorLogger_1.EmulatorLogger.forEmulator(types_1.Emulators.FUNCTIONS).log("DEBUG", `[worker-pool] ${msg}`);
- }
- }
- exports.RuntimeWorkerPool = RuntimeWorkerPool;
|