No Description
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

pubsubEmulator.js 7.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.PubsubEmulator = void 0;
  4. const uuid = require("uuid");
  5. const pubsub_1 = require("@google-cloud/pubsub");
  6. const downloadableEmulators = require("./downloadableEmulators");
  7. const emulatorLogger_1 = require("./emulatorLogger");
  8. const types_1 = require("../emulator/types");
  9. const constants_1 = require("./constants");
  10. const error_1 = require("../error");
  11. const registry_1 = require("./registry");
  12. const child_process_1 = require("child_process");
  // Shell fallback used by PubsubEmulator.stop() when the regular stop path
  // throws: collect the PIDs of every process whose `ps aux` line mentions
  // "pubsub-emulator" and SIGKILL them.
  //  - The `[p]` bracket keeps grep from matching its own command line.
  //  - The variable in the test MUST be double-quoted: inside single quotes the
  //    shell does not expand it, so the test would always be true and `kill -9`
  //    would run without a PID (non-zero exit, which makes execSync throw).
  const PUBSUB_KILL_COMMAND = "pubsub_pids=$(ps aux | grep '[p]ubsub-emulator' | awk '{print $2}');" +
      " if [ -n \"$pubsub_pids\" ]; then kill -9 $pubsub_pids; fi;";
  /**
   * Controls the downloadable Pub/Sub emulator and forwards messages published
   * to emulated topics to Functions emulator triggers.
   *
   * For every topic that has at least one trigger the class keeps one
   * subscription (named `emulator-sub-<topic>`) whose "message" events are
   * dispatched by `onMessage` to each registered trigger over HTTP.
   */
  class PubsubEmulator {
    /**
     * @param {object} args Emulator arguments. This class reads `projectId`,
     *   `host` and `port`; the whole object is also passed to
     *   `downloadableEmulators.start`.
     */
    constructor(args) {
      this.args = args;
      this.logger = emulatorLogger_1.EmulatorLogger.forEmulator(types_1.Emulators.PUBSUB);
      // topic name -> array of { triggerKey, signatureType }
      this.triggersForTopic = new Map();
      // topic name -> subscription object returned by maybeCreateTopicAndSub
      this.subscriptionForTopic = new Map();
    }

    /**
     * Lazily created Pub/Sub client pointed at the emulator host reported by
     * the emulator registry. The instance is cached on `this._pubsub`.
     */
    get pubsub() {
      if (!this._pubsub) {
        this._pubsub = new pubsub_1.PubSub({
          apiEndpoint: registry_1.EmulatorRegistry.url(types_1.Emulators.PUBSUB).host,
          projectId: this.args.projectId,
        });
      }
      return this._pubsub;
    }

    /** Starts the downloadable emulator with the constructor arguments. */
    async start() {
      return downloadableEmulators.start(types_1.Emulators.PUBSUB, this.args);
    }

    /** No connection step is needed; resolves immediately. */
    connect() {
      return Promise.resolve();
    }

    /**
     * Stops the emulator. If the regular stop path throws, the failure is
     * logged and, on non-Windows platforms, PUBSUB_KILL_COMMAND is run as a
     * fallback. An error thrown by that fallback command still propagates.
     */
    async stop() {
      try {
        await downloadableEmulators.stop(types_1.Emulators.PUBSUB);
      }
      catch (e) {
        // JSON.stringify alone yields "{}" for a plain Error (message and stack
        // are not enumerable), so include the message explicitly and keep the
        // JSON for any enumerable fields the error may carry.
        const detail = e instanceof Error ? `${e.message} ${JSON.stringify(e)}` : JSON.stringify(e);
        this.logger.logLabeled("DEBUG", "pubsub", detail);
        if (process.platform !== "win32") {
          const buffer = (0, child_process_1.execSync)(PUBSUB_KILL_COMMAND);
          // Decode the output instead of dumping the Buffer's byte array.
          this.logger.logLabeled("DEBUG", "pubsub", "Pubsub kill output: " + buffer.toString());
        }
      }
    }

    /**
     * @returns {{name: *, host: *, port: *, pid: *}} Emulator name, the
     *   configured host/port (falling back to the defaults from Constants) and
     *   the PID reported by downloadableEmulators.
     */
    getInfo() {
      const host = this.args.host || constants_1.Constants.getDefaultHost();
      const port = this.args.port || constants_1.Constants.getDefaultPort(types_1.Emulators.PUBSUB);
      return {
        name: this.getName(),
        host,
        port,
        pid: downloadableEmulators.getPID(types_1.Emulators.PUBSUB),
      };
    }

    /** @returns The PUBSUB member of the Emulators enum. */
    getName() {
      return types_1.Emulators.PUBSUB;
    }

    /**
     * Ensures the topic and its `emulator-sub-<topic>` subscription exist and
     * attaches a "message" listener that dispatches to `onMessage`.
     *
     * An error with `code === 6` from either create call is treated as
     * "already exists" and is not an error.
     *
     * @param {string} topicName Topic to create / subscribe to.
     * @returns The subscription object with the listener attached.
     * @throws {FirebaseError} When topic or subscription creation fails for any
     *   other reason; the underlying error is passed as `original`.
     */
    async maybeCreateTopicAndSub(topicName) {
      const topic = this.pubsub.topic(topicName);
      try {
        this.logger.logLabeled("DEBUG", "pubsub", `Creating topic: ${topicName}`);
        await topic.create();
      }
      catch (e) {
        if (e && e.code === 6) {
          this.logger.logLabeled("DEBUG", "pubsub", `Topic ${topicName} exists`);
        }
        else {
          throw new error_1.FirebaseError(`Could not create topic ${topicName}`, { original: e });
        }
      }
      const subName = `emulator-sub-${topicName}`;
      let sub;
      try {
        this.logger.logLabeled("DEBUG", "pubsub", `Creating sub for topic: ${topicName}`);
        [sub] = await topic.createSubscription(subName);
      }
      catch (e) {
        if (e && e.code === 6) {
          this.logger.logLabeled("DEBUG", "pubsub", `Sub for ${topicName} exists`);
          sub = topic.subscription(subName);
        }
        else {
          throw new error_1.FirebaseError(`Could not create sub ${subName}`, { original: e });
        }
      }
      sub.on("message", (message) => {
        // onMessage is async and can reject (no triggers registered, Functions
        // emulator not running). An event emitter ignores the returned promise,
        // so handle the rejection here rather than leaving it unhandled. The
        // message is not acked in that case.
        this.onMessage(topicName, message).catch((err) => {
          const reason = err instanceof Error ? err.message : String(err);
          this.logger.logLabeled("DEBUG", "pubsub", `Failed to handle message on topic ${topicName}: ${reason}`);
        });
      });
      return sub;
    }

    /**
     * Registers a trigger for a topic.
     *
     * The topic/subscription is only set up the first time a topic is seen;
     * later triggers on the same topic reuse the stored subscription so that no
     * additional listeners are attached. Registering the same `triggerKey`
     * twice for a topic is a no-op.
     *
     * @param {string} topicName Topic the trigger listens to.
     * @param {string} triggerKey Key used in the Functions emulator trigger URL.
     * @param {string} signatureType "event" or "cloudevent" (see onMessage).
     */
    async addTrigger(topicName, triggerKey, signatureType) {
      this.logger.logLabeled("DEBUG", "pubsub", `addTrigger(${topicName}, ${triggerKey}, ${signatureType})`);
      const alreadyRegistered = () => this.subscriptionForTopic.has(topicName) &&
        (this.triggersForTopic.get(topicName) || []).some((t) => t.triggerKey === triggerKey);
      // Check before touching the emulator so that a repeated registration does
      // not create another subscription handle + listener.
      if (alreadyRegistered()) {
        this.logger.logLabeled("DEBUG", "pubsub", "Trigger already exists");
        return;
      }
      const existingSub = this.subscriptionForTopic.get(topicName);
      const sub = existingSub !== undefined ? existingSub : await this.maybeCreateTopicAndSub(topicName);
      // State is re-read after the await: a concurrent addTrigger call may have
      // registered this key or created the trigger list in the meantime.
      if (alreadyRegistered()) {
        this.logger.logLabeled("DEBUG", "pubsub", "Trigger already exists");
        return;
      }
      const triggers = this.triggersForTopic.get(topicName) || [];
      triggers.push({ triggerKey, signatureType });
      this.triggersForTopic.set(topicName, triggers);
      this.subscriptionForTopic.set(topicName, sub);
    }

    /**
     * Caches the Functions emulator HTTP client on `this.client`.
     *
     * @throws {FirebaseError} When the Functions emulator is not running.
     */
    ensureFunctionsClient() {
      if (this.client !== undefined)
        return;
      if (!registry_1.EmulatorRegistry.isRunning(types_1.Emulators.FUNCTIONS)) {
        throw new error_1.FirebaseError(`Attempted to execute pubsub trigger but could not find the Functions emulator`);
      }
      this.client = registry_1.EmulatorRegistry.client(types_1.Emulators.FUNCTIONS);
    }

    /**
     * Builds the request body for an "event" signature trigger.
     *
     * @param {string} topic Topic name (without the project prefix).
     * @param message Received message; `publishTime`, `data` and `attributes`
     *   are read.
     * @returns {{context: object, data: object}} Body with a fresh UUID as
     *   `eventId`.
     */
    createLegacyEventRequestBody(topic, message) {
      return {
        context: {
          eventId: uuid.v4(),
          resource: {
            service: "pubsub.googleapis.com",
            name: `projects/${this.args.projectId}/topics/${topic}`,
          },
          eventType: "google.pubsub.topic.publish",
          timestamp: message.publishTime.toISOString(),
        },
        data: {
          data: message.data,
          attributes: message.attributes,
        },
      };
    }

    /**
     * Builds the request body for a "cloudevent" signature trigger.
     *
     * Requires a subscription to be stored for `topic`; the payload is
     * base64-encoded via `message.data.toString("base64")`.
     *
     * @param {string} topic Topic name (without the project prefix).
     * @param message Received message; `id`, `publishTime`, `attributes`,
     *   `orderingKey` and `data` are read.
     * @returns {object} CloudEvent-shaped body with a fresh UUID as `id`.
     */
    createCloudEventRequestBody(topic, message) {
      const data = {
        message: {
          messageId: message.id,
          publishTime: message.publishTime,
          attributes: message.attributes,
          orderingKey: message.orderingKey,
          data: message.data.toString("base64"),
        },
        subscription: this.subscriptionForTopic.get(topic).name,
      };
      return {
        specversion: "1",
        id: uuid.v4(),
        time: message.publishTime.toISOString(),
        type: "google.cloud.pubsub.topic.v1.messagePublished",
        source: `//pubsub.googleapis.com/projects/${this.args.projectId}/topics/${topic}`,
        data,
      };
    }

    /**
     * Dispatches one received message to every trigger registered for the
     * topic, one after another, then acks the message.
     *
     * A failure for an individual trigger (including an unsupported signature
     * type) is logged and does not stop the remaining triggers or the ack.
     *
     * @param {string} topicName Topic the message arrived on.
     * @param message Received message; must provide `ack()`.
     * @throws {FirebaseError} When no trigger is registered for the topic or
     *   the Functions emulator is not running (the message is then not acked).
     */
    async onMessage(topicName, message) {
      this.logger.logLabeled("DEBUG", "pubsub", `onMessage(${topicName}, ${message.id})`);
      const triggers = this.triggersForTopic.get(topicName);
      if (!triggers || triggers.length === 0) {
        throw new error_1.FirebaseError(`No trigger for topic: ${topicName}`);
      }
      this.logger.logLabeled("DEBUG", "pubsub", `Executing ${triggers.length} matching triggers (${JSON.stringify(triggers.map((t) => t.triggerKey))})`);
      this.ensureFunctionsClient();
      for (const { triggerKey, signatureType } of triggers) {
        try {
          const path = `/functions/projects/${this.args.projectId}/triggers/${triggerKey}`;
          if (signatureType === "event") {
            await this.client.post(path, this.createLegacyEventRequestBody(topicName, message));
          }
          else if (signatureType === "cloudevent") {
            await this.client.post(path, this.createCloudEventRequestBody(topicName, message), { headers: { "Content-Type": "application/cloudevents+json; charset=UTF-8" } });
          }
          else {
            throw new error_1.FirebaseError(`Unsupported trigger signature: ${signatureType}`);
          }
        }
        catch (e) {
          this.logger.logLabeled("DEBUG", "pubsub", e);
        }
      }
      this.logger.logLabeled("DEBUG", "pubsub", `Acking message ${message.id}`);
      message.ack();
    }
  }
  183. exports.PubsubEmulator = PubsubEmulator;