No Description
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

functionsRuntimeWorker.js 9.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.RuntimeWorkerPool = exports.RuntimeWorker = exports.RuntimeWorkerState = void 0;
  4. const http = require("http");
  5. const uuid = require("uuid");
  6. const types_1 = require("./types");
  7. const events_1 = require("events");
  8. const emulatorLogger_1 = require("./emulatorLogger");
  9. const error_1 = require("../error");
  /**
   * String enum of the lifecycle states a RuntimeWorker can be in.
   * Every member's value is identical to its name, and the same object is
   * published as `exports.RuntimeWorkerState`.
   */
  var RuntimeWorkerState;
  (function (states) {
    // Insertion order matches the declaration order of the enum members.
    for (const name of ["CREATED", "IDLE", "BUSY", "FINISHING", "FINISHED"]) {
      states[name] = name;
    }
  })(RuntimeWorkerState = exports.RuntimeWorkerState || (exports.RuntimeWorkerState = {}));
  /**
   * Wraps one runtime child process: parses its newline-delimited log output,
   * tracks its lifecycle state, and proxies HTTP requests to it over the
   * runtime's socket path.
   *
   * State changes are announced on `stateEvents`, using the new state's name
   * as the event name.
   */
  class RuntimeWorker {
    /**
     * @param key Key this worker is filed under; only used here for log prefixes.
     * @param runtime Runtime handle. This class reads `process`, `events` and
     *   `socketPath` from it.
     */
    constructor(key, runtime) {
      this.stateEvents = new events_1.EventEmitter();
      this.logListeners = [];
      this._state = RuntimeWorkerState.CREATED;
      this.id = uuid.v4();
      this.key = key;
      this.runtime = runtime;
      const childProc = this.runtime.process;
      let msgBuffer = "";
      childProc.on("message", (msg) => {
        msgBuffer = this.processStream(msg, msgBuffer);
      });
      // stdout and stderr are independent streams, so each needs its own
      // reassembly buffer. With a shared buffer, a partial line on one stream
      // would be glued to whatever arrives next on the other.
      let stdoutBuffer = "";
      if (childProc.stdout) {
        childProc.stdout.on("data", (data) => {
          stdoutBuffer = this.processStream(data, stdoutBuffer);
        });
      }
      let stderrBuffer = "";
      if (childProc.stderr) {
        childProc.stderr.on("data", (data) => {
          stderrBuffer = this.processStream(data, stderrBuffer);
        });
      }
      childProc.on("exit", () => {
        this.log("exited");
        this.state = RuntimeWorkerState.FINISHED;
      });
    }
    /**
     * Appends a chunk to a pending buffer and emits one "log" event on
     * `runtime.events` for every complete (newline-terminated) line.
     * A line whose level is "FATAL" additionally emits a SYSTEM
     * "runtime-status"/"killed" log and kills the child process.
     *
     * @param s Incoming chunk; converted with `toString()`.
     * @param buf Text left over from the previous call for the same stream.
     * @returns The trailing incomplete line, to be passed back in next time.
     */
    processStream(s, buf) {
      const lines = (buf + s.toString()).split("\n");
      if (lines.length > 1) {
        // Everything except the last element is a complete line.
        lines.slice(0, -1).forEach((line) => {
          const log = types_1.EmulatorLog.fromJSON(line);
          this.runtime.events.emit("log", log);
          if (log.level === "FATAL") {
            this.runtime.events.emit("log", new types_1.EmulatorLog("SYSTEM", "runtime-status", "killed"));
            this.runtime.process.kill();
          }
        });
      }
      return lines[lines.length - 1];
    }
    /** Marks the worker as IDLE. */
    readyForWork() {
      this.state = RuntimeWorkerState.IDLE;
    }
    /**
     * Sends `debug`, JSON-serialized, to the child process via `process.send`.
     *
     * @returns Promise resolved once the send callback reports success and
     *   rejected with the error it reports otherwise.
     */
    sendDebugMsg(debug) {
      return new Promise((resolve, reject) => {
        this.runtime.process.send(JSON.stringify(debug), (err) => {
          if (err) {
            reject(err);
          }
          else {
            resolve();
          }
        });
      });
    }
    /**
     * Proxies an incoming request to the runtime over its socket path and
     * pipes the runtime's response back into `resp`. The worker is BUSY for
     * the duration of the call.
     *
     * @param req Incoming request; `method`, `path` and `headers` are forwarded.
     * @param resp Response object the runtime's answer is written to.
     * @param body Optional request body to forward.
     * @returns Promise that resolves (never rejects) when the piped response
     *   has finished or the proxy request has failed.
     */
    request(req, resp, body) {
      this.state = RuntimeWorkerState.BUSY;
      const onFinish = () => {
        if (this.state === RuntimeWorkerState.BUSY) {
          this.state = RuntimeWorkerState.IDLE;
        }
        else if (this.state === RuntimeWorkerState.FINISHING) {
          // The worker was marked FINISHING while this request was in
          // flight; now that the request is done the process is killed.
          this.log(`IDLE --> FINISHING`);
          this.runtime.process.kill();
        }
      };
      return new Promise((resolve) => {
        const proxy = http.request({
          method: req.method,
          path: req.path,
          headers: req.headers,
          socketPath: this.runtime.socketPath,
        }, (_resp) => {
          resp.writeHead(_resp.statusCode || 200, _resp.headers);
          const piped = _resp.pipe(resp);
          piped.on("finish", () => {
            onFinish();
            resolve();
          });
        });
        proxy.on("error", (err) => {
          // writeHead() throws once headers have gone out, and a throw inside
          // an event handler would surface as an uncaught exception. Only
          // send the 500 payload while that is still possible.
          if (!resp.headersSent) {
            resp.writeHead(500);
            resp.write(JSON.stringify(err));
          }
          resp.end();
          this.runtime.process.kill();
          resolve();
        });
        if (body) {
          proxy.write(body);
        }
        proxy.end();
      });
    }
    /** Current lifecycle state. */
    get state() {
      return this._state;
    }
    /**
     * Moves the worker to `state`, logs it, and emits the state name on
     * `stateEvents`. Entering IDLE detaches the per-request log listeners
     * registered through `onLogs`; entering FINISHED removes every listener
     * from `runtime.events`.
     */
    set state(state) {
      if (state === RuntimeWorkerState.IDLE) {
        for (const l of this.logListeners) {
          this.runtime.events.removeListener("log", l);
        }
        this.logListeners = [];
      }
      if (state === RuntimeWorkerState.FINISHED) {
        this.runtime.events.removeAllListeners();
      }
      this.log(state);
      this._state = state;
      this.stateEvents.emit(this._state);
    }
    /**
     * Subscribes `listener` to the runtime's "log" events.
     *
     * @param listener Called with every parsed log entry.
     * @param forever When false (the default) the listener is removed the next
     *   time the worker becomes IDLE; when true it stays until the worker is
     *   FINISHED.
     */
    onLogs(listener, forever = false) {
      if (!forever) {
        this.logListeners.push(listener);
      }
      this.runtime.events.on("log", listener);
    }
    /**
     * Issues a single `GET /__/health` request against the runtime socket.
     * Any response marks the worker IDLE and resolves the promise; a request
     * error rejects it.
     */
    isSocketReady() {
      return new Promise((resolve, reject) => {
        const req = http.request({
          method: "GET",
          path: "/__/health",
          socketPath: this.runtime.socketPath,
        }, (res) => {
          // The response body is irrelevant, but it must still be drained so
          // the underlying socket is released.
          res.resume();
          this.readyForWork();
          resolve();
        });
        req.on("error", (error) => {
          reject(error);
        });
        req.end();
      });
    }
    /**
     * Polls `isSocketReady` until it succeeds, waiting 100ms between attempts
     * that fail with ECONNREFUSED or ENOENT. Any other error is rethrown, and
     * the whole wait rejects with a FirebaseError after 30 seconds.
     */
    async waitForSocketReady() {
      const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
      let timer;
      const timeout = new Promise((_resolve, reject) => {
        timer = setTimeout(() => {
          reject(new error_1.FirebaseError("Failed to load function."));
        }, 30000);
      });
      try {
        while (true) {
          try {
            await Promise.race([this.isSocketReady(), timeout]);
            break;
          }
          catch (err) {
            const code = err === null || err === void 0 ? void 0 : err.code;
            if (["ECONNREFUSED", "ENOENT"].includes(code)) {
              await sleep(100);
              continue;
            }
            throw err;
          }
        }
      }
      finally {
        // Without this the pending 30s timer keeps the event loop alive long
        // after the socket has become ready (or the wait has failed).
        clearTimeout(timer);
      }
    }
    /** Writes a DEBUG line to the functions emulator log, prefixed with this worker's key and id. */
    log(msg) {
      emulatorLogger_1.EmulatorLogger.forEmulator(types_1.Emulators.FUNCTIONS).log("DEBUG", `[worker-${this.key}-${this.id}]: ${msg}`);
    }
  }
  180. exports.RuntimeWorker = RuntimeWorker;
  /**
   * Keeps RuntimeWorker instances grouped by key and hands out idle ones.
   *
   * In SEQUENTIAL mode every trigger shares the single key "~shared~";
   * otherwise workers are keyed by trigger id, with "~diagnostic~" standing in
   * when no trigger id is given.
   */
  class RuntimeWorkerPool {
    /**
     * @param mode Execution mode; defaults to FunctionsExecutionMode.AUTO.
     */
    constructor(mode = types_1.FunctionsExecutionMode.AUTO) {
      this.workers = new Map();
      this.mode = mode;
    }
    /** Maps a trigger id to the key its workers are stored under. */
    getKey(triggerId) {
      const sequential = this.mode === types_1.FunctionsExecutionMode.SEQUENTIAL;
      return sequential ? "~shared~" : triggerId || "~diagnostic~";
    }
    /**
     * Retires every worker that is currently IDLE or BUSY: both are marked
     * FINISHING, and IDLE ones are additionally killed right away.
     */
    refresh() {
      for (const group of this.workers.values()) {
        for (const worker of group) {
          switch (worker.state) {
            case RuntimeWorkerState.IDLE:
              this.log(`Shutting down IDLE worker (${worker.key})`);
              worker.state = RuntimeWorkerState.FINISHING;
              worker.runtime.process.kill();
              break;
            case RuntimeWorkerState.BUSY:
              this.log(`Marking BUSY worker to finish (${worker.key})`);
              worker.state = RuntimeWorkerState.FINISHING;
              break;
            default:
              break;
          }
        }
      }
    }
    /** Kills the process of every worker in the pool, whatever its state. */
    exit() {
      for (const group of this.workers.values()) {
        for (const worker of group) {
          worker.runtime.process.kill();
        }
      }
    }
    /** @returns true when an IDLE worker exists for `triggerId`. */
    readyForWork(triggerId) {
      return Boolean(this.getIdleWorker(triggerId));
    }
    /**
     * Runs a request on an idle worker for `triggerId`, first sending it the
     * `debug` payload when one is given.
     *
     * @throws FirebaseError when no idle worker is available.
     * @returns Whatever the worker's `request` call returns.
     */
    async submitRequest(triggerId, req, resp, body, debug) {
      this.log(`submitRequest(triggerId=${triggerId})`);
      const idle = this.getIdleWorker(triggerId);
      if (!idle) {
        throw new error_1.FirebaseError("Internal Error: can't call submitRequest without checking for idle workers");
      }
      if (debug) {
        await idle.sendDebugMsg(debug);
      }
      return idle.request(req, resp, body);
    }
    /**
     * Drops FINISHED workers, then returns the first IDLE worker registered
     * for `triggerId`, or undefined when there is none. An empty list is
     * stored for the key if it has no workers.
     */
    getIdleWorker(triggerId) {
      this.cleanUpWorkers();
      const candidates = this.getTriggerWorkers(triggerId);
      if (candidates.length === 0) {
        this.setTriggerWorkers(triggerId, []);
        return undefined;
      }
      return candidates.find((candidate) => candidate.state === RuntimeWorkerState.IDLE);
    }
    /**
     * Creates a RuntimeWorker around `runtime`, files it under the key for
     * `triggerId`, and permanently attaches a logger to its log stream.
     *
     * @param extensionLogInfo Passed through to `EmulatorLogger.forFunction`
     *   when a trigger id is given.
     * @returns The new worker.
     */
    addWorker(triggerId, runtime, extensionLogInfo) {
      const created = new RuntimeWorker(this.getKey(triggerId), runtime);
      this.log(`addWorker(${created.key})`);
      const siblings = this.getTriggerWorkers(triggerId);
      siblings.push(created);
      this.setTriggerWorkers(triggerId, siblings);
      let logger;
      if (triggerId) {
        logger = emulatorLogger_1.EmulatorLogger.forFunction(triggerId, extensionLogInfo);
      }
      else {
        logger = emulatorLogger_1.EmulatorLogger.forEmulator(types_1.Emulators.FUNCTIONS);
      }
      const forwardLog = (entry) => {
        logger.handleRuntimeLog(entry);
      };
      created.onLogs(forwardLog, true);
      this.log(`Adding worker with key ${created.key}, total=${siblings.length}`);
      return created;
    }
    /** @returns The worker list for `triggerId`'s key, or a fresh empty array. */
    getTriggerWorkers(triggerId) {
      const key = this.getKey(triggerId);
      return this.workers.get(key) || [];
    }
    /** Stores `workers` as the list for `triggerId`'s key. */
    setTriggerWorkers(triggerId, workers) {
      const key = this.getKey(triggerId);
      this.workers.set(key, workers);
    }
    /** Removes FINISHED workers from every list, logging whenever a list shrinks. */
    cleanUpWorkers() {
      for (const [key, group] of this.workers.entries()) {
        const alive = group.filter((worker) => worker.state !== RuntimeWorkerState.FINISHED);
        const removedAny = alive.length !== group.length;
        if (removedAny) {
          this.log(`Cleaned up workers for ${key}: ${group.length} --> ${alive.length}`);
        }
        this.setTriggerWorkers(key, alive);
      }
    }
    /** Writes a DEBUG line to the functions emulator log with the pool prefix. */
    log(msg) {
      emulatorLogger_1.EmulatorLogger.forEmulator(types_1.Emulators.FUNCTIONS).log("DEBUG", `[worker-pool] ${msg}`);
    }
  }
  286. exports.RuntimeWorkerPool = RuntimeWorkerPool;