123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.PubsubEmulator = void 0;
- const uuid = require("uuid");
- const pubsub_1 = require("@google-cloud/pubsub");
- const downloadableEmulators = require("./downloadableEmulators");
- const emulatorLogger_1 = require("./emulatorLogger");
- const types_1 = require("../emulator/types");
- const constants_1 = require("./constants");
- const error_1 = require("../error");
- const registry_1 = require("./registry");
- const child_process_1 = require("child_process");
- const PUBSUB_KILL_COMMAND = "pubsub_pids=$(ps aux | grep '[p]ubsub-emulator' | awk '{print $2}');" +
- " if [ ! -z '$pubsub_pids' ]; then kill -9 $pubsub_pids; fi;";
- class PubsubEmulator {
- constructor(args) {
- this.args = args;
- this.logger = emulatorLogger_1.EmulatorLogger.forEmulator(types_1.Emulators.PUBSUB);
- this.triggersForTopic = new Map();
- this.subscriptionForTopic = new Map();
- }
- get pubsub() {
- if (!this._pubsub) {
- this._pubsub = new pubsub_1.PubSub({
- apiEndpoint: registry_1.EmulatorRegistry.url(types_1.Emulators.PUBSUB).host,
- projectId: this.args.projectId,
- });
- }
- return this._pubsub;
- }
- async start() {
- return downloadableEmulators.start(types_1.Emulators.PUBSUB, this.args);
- }
- connect() {
- return Promise.resolve();
- }
- async stop() {
- try {
- await downloadableEmulators.stop(types_1.Emulators.PUBSUB);
- }
- catch (e) {
- this.logger.logLabeled("DEBUG", "pubsub", JSON.stringify(e));
- if (process.platform !== "win32") {
- const buffer = (0, child_process_1.execSync)(PUBSUB_KILL_COMMAND);
- this.logger.logLabeled("DEBUG", "pubsub", "Pubsub kill output: " + JSON.stringify(buffer));
- }
- }
- }
- getInfo() {
- const host = this.args.host || constants_1.Constants.getDefaultHost();
- const port = this.args.port || constants_1.Constants.getDefaultPort(types_1.Emulators.PUBSUB);
- return {
- name: this.getName(),
- host,
- port,
- pid: downloadableEmulators.getPID(types_1.Emulators.PUBSUB),
- };
- }
- getName() {
- return types_1.Emulators.PUBSUB;
- }
- async maybeCreateTopicAndSub(topicName) {
- const topic = this.pubsub.topic(topicName);
- try {
- this.logger.logLabeled("DEBUG", "pubsub", `Creating topic: ${topicName}`);
- await topic.create();
- }
- catch (e) {
- if (e && e.code === 6) {
- this.logger.logLabeled("DEBUG", "pubsub", `Topic ${topicName} exists`);
- }
- else {
- throw new error_1.FirebaseError(`Could not create topic ${topicName}`, { original: e });
- }
- }
- const subName = `emulator-sub-${topicName}`;
- let sub;
- try {
- this.logger.logLabeled("DEBUG", "pubsub", `Creating sub for topic: ${topicName}`);
- [sub] = await topic.createSubscription(subName);
- }
- catch (e) {
- if (e && e.code === 6) {
- this.logger.logLabeled("DEBUG", "pubsub", `Sub for ${topicName} exists`);
- sub = topic.subscription(subName);
- }
- else {
- throw new error_1.FirebaseError(`Could not create sub ${subName}`, { original: e });
- }
- }
- sub.on("message", (message) => {
- this.onMessage(topicName, message);
- });
- return sub;
- }
- async addTrigger(topicName, triggerKey, signatureType) {
- this.logger.logLabeled("DEBUG", "pubsub", `addTrigger(${topicName}, ${triggerKey}, ${signatureType})`);
- const sub = await this.maybeCreateTopicAndSub(topicName);
- const triggers = this.triggersForTopic.get(topicName) || [];
- if (triggers.some((t) => t.triggerKey === triggerKey) &&
- this.subscriptionForTopic.has(topicName)) {
- this.logger.logLabeled("DEBUG", "pubsub", "Trigger already exists");
- return;
- }
- triggers.push({ triggerKey, signatureType });
- this.triggersForTopic.set(topicName, triggers);
- this.subscriptionForTopic.set(topicName, sub);
- }
- ensureFunctionsClient() {
- if (this.client !== undefined)
- return;
- if (!registry_1.EmulatorRegistry.isRunning(types_1.Emulators.FUNCTIONS)) {
- throw new error_1.FirebaseError(`Attempted to execute pubsub trigger but could not find the Functions emulator`);
- }
- this.client = registry_1.EmulatorRegistry.client(types_1.Emulators.FUNCTIONS);
- }
- createLegacyEventRequestBody(topic, message) {
- return {
- context: {
- eventId: uuid.v4(),
- resource: {
- service: "pubsub.googleapis.com",
- name: `projects/${this.args.projectId}/topics/${topic}`,
- },
- eventType: "google.pubsub.topic.publish",
- timestamp: message.publishTime.toISOString(),
- },
- data: {
- data: message.data,
- attributes: message.attributes,
- },
- };
- }
- createCloudEventRequestBody(topic, message) {
- const data = {
- message: {
- messageId: message.id,
- publishTime: message.publishTime,
- attributes: message.attributes,
- orderingKey: message.orderingKey,
- data: message.data.toString("base64"),
- },
- subscription: this.subscriptionForTopic.get(topic).name,
- };
- return {
- specversion: "1",
- id: uuid.v4(),
- time: message.publishTime.toISOString(),
- type: "google.cloud.pubsub.topic.v1.messagePublished",
- source: `//pubsub.googleapis.com/projects/${this.args.projectId}/topics/${topic}`,
- data,
- };
- }
- async onMessage(topicName, message) {
- this.logger.logLabeled("DEBUG", "pubsub", `onMessage(${topicName}, ${message.id})`);
- const triggers = this.triggersForTopic.get(topicName);
- if (!triggers || triggers.length === 0) {
- throw new error_1.FirebaseError(`No trigger for topic: ${topicName}`);
- }
- this.logger.logLabeled("DEBUG", "pubsub", `Executing ${triggers.length} matching triggers (${JSON.stringify(triggers.map((t) => t.triggerKey))})`);
- this.ensureFunctionsClient();
- for (const { triggerKey, signatureType } of triggers) {
- try {
- const path = `/functions/projects/${this.args.projectId}/triggers/${triggerKey}`;
- if (signatureType === "event") {
- await this.client.post(path, this.createLegacyEventRequestBody(topicName, message));
- }
- else if (signatureType === "cloudevent") {
- await this.client.post(path, this.createCloudEventRequestBody(topicName, message), { headers: { "Content-Type": "application/cloudevents+json; charset=UTF-8" } });
- }
- else {
- throw new error_1.FirebaseError(`Unsupported trigger signature: ${signatureType}`);
- }
- }
- catch (e) {
- this.logger.logLabeled("DEBUG", "pubsub", e);
- }
- }
- this.logger.logLabeled("DEBUG", "pubsub", `Acking message ${message.id}`);
- message.ack();
- }
- }
- exports.PubsubEmulator = PubsubEmulator;
|