diff --git a/_emulator/extensions/firestore-schedule-writes.env.local b/_emulator/extensions/firestore-schedule-writes.env.local new file mode 100644 index 00000000..c8be777f --- /dev/null +++ b/_emulator/extensions/firestore-schedule-writes.env.local @@ -0,0 +1,2 @@ +PROJECT_ID=demo-test +LOCATION=us-central1 \ No newline at end of file diff --git a/_emulator/firebase.json b/_emulator/firebase.json index 0f5728b6..ea8e6e8b 100644 --- a/_emulator/firebase.json +++ b/_emulator/firebase.json @@ -1,7 +1,6 @@ { "extensions": { - "firestore-record-user-acknowledgements": "../firestore-record-user-acknowledgements", - "firestore-bundle-server": "../firestore-bundle-server" + "firestore-schedule-writes": "../firestore-schedule-writes" }, "storage": { "rules": "storage.rules" @@ -39,7 +38,7 @@ "rewrites": [ { "source": "/bundles/*", - "function": "ext-firestore-bundle-server-serve" + "function": "ext-firestore-schedule-writes-serve" }, { "source": "**", diff --git a/firestore-schedule-writes/POSTINSTALL.md b/firestore-schedule-writes/POSTINSTALL.md index 1102f0a2..8ae0cce4 100644 --- a/firestore-schedule-writes/POSTINSTALL.md +++ b/firestore-schedule-writes/POSTINSTALL.md @@ -1,23 +1,5 @@ Schedule Firestore Writes allows you to write data to a collection in Firestore that is then written to a different location at a specified time in the future. This can be used for many common time-related tasks such as delayed notifications, alarms, and reminders. Because this all happens within Firestore, scheduled writes can be listened to by clients or connected with via Cloud Functions. -# See it in action - -```js -const TEN_MINUTES_MS = 10 * 60 * 1000; - -firebase - .firestore() - .collection("${param:QUEUE_COLLECTION}") - .add({ - state: "PENDING", // IMPORTANT! Omitting this field will prevent the scheduled write from being picked up. - collection: "delivered", // only if you didn't specify a TARGET_COLLECTION - data: { message: "Hello from the future!" }, - deliverTime: firebase.firestore.Timestamp.fromMillis( - Date.now() + TEN_MINUTES_MS - ), - }); -``` - ## Required Setup To use this extension, Firestore must have a two compound indexes in the `${param:QUEUE_COLLECTION}` collection. These can be created manually in the Firebase console or added to your `firestore.indexes.json` file deployed via Firebase CLI: @@ -45,6 +27,24 @@ To use this extension, Firestore must have a two compound indexes in the `${para } ``` +# See it in action + +```js +const TEN_MINUTES_MS = 10 * 60 * 1000; + +firebase + .firestore() + .collection("${param:QUEUE_COLLECTION}") + .add({ + state: "PENDING", // IMPORTANT! Omitting this field will prevent the scheduled write from being picked up. + collection: "delivered", // only if you didn't specify a TARGET_COLLECTION + data: { message: "Hello from the future!" }, + deliverTime: firebase.firestore.Timestamp.fromMillis( + Date.now() + TEN_MINUTES_MS + ), + }); +``` + # Using the extension When writing to your `${param:QUEUE_COLLECTION}` collection, the following fields are available: diff --git a/firestore-schedule-writes/extension.yaml b/firestore-schedule-writes/extension.yaml index 0359826d..71a75b5c 100644 --- a/firestore-schedule-writes/extension.yaml +++ b/firestore-schedule-writes/extension.yaml @@ -49,7 +49,7 @@ resources: location: ${LOCATION} scheduleTrigger: schedule: "${SCHEDULE}" - runtime: "nodejs12" + runtime: "nodejs16" # In the `params` field, set up your extension's user-configured parameters. # Learn more in the docs: https://firebase.google.com/docs/extensions/alpha/ref-extension-yaml#params-field diff --git a/firestore-schedule-writes/functions/.gitignore b/firestore-schedule-writes/functions/.gitignore new file mode 100644 index 00000000..7951405f --- /dev/null +++ b/firestore-schedule-writes/functions/.gitignore @@ -0,0 +1 @@ +lib \ No newline at end of file diff --git a/firestore-schedule-writes/functions/__tests__/functions.test.ts b/firestore-schedule-writes/functions/__tests__/functions.test.ts new file mode 100644 index 00000000..ccc11c2d --- /dev/null +++ b/firestore-schedule-writes/functions/__tests__/functions.test.ts @@ -0,0 +1,251 @@ +import * as fft from "firebase-functions-test"; +const testEnv = fft({ + projectId: "demo-test", +}); + +import * as admin from "firebase-admin"; +import setupEnvironment from "./helpers/setupEnvironment"; +//@ts-ignore +import config from "../src/config"; +import { addProcessingDocuments, clearFirestore } from "./helpers"; + +const functions = require("../src/index"); + +setupEnvironment(); +jest.spyOn(admin, "initializeApp").mockImplementation(); + +/** Setup test environment */ +const db = admin.firestore(); + +/** Setup Mocks */ +jest.mock("../src/config", () => ({ + default: { + schedule: "every 1 minutes", + mergeWrite: "true", + queueCollection: "queued_writes", + targetCollection: "target_collection", + stalenessThresholdSeconds: 0, + cleanup: "KEEP", + }, +})); + +describe("firestore-schedule-writes", () => { + afterEach(async () => { + /** Clear mocks */ + await jest.clearAllMocks(); + }); + + describe("resetStuckWrites", () => { + describe("when using a single write", () => { + beforeEach(async () => { + /** Clear Firestore data */ + await clearFirestore(); + + /** + * Add processing documents + * 150 axceeds the 100 batch size, + * causes the function to run multiple times + * */ + await addProcessingDocuments(1, "PROCESSING", { + leaseExpireTime: admin.firestore.Timestamp.now(), + }); + }, 60000); + + it("should reset a single stuck write", async () => { + /** Run function */ + const wrapped = testEnv.wrap(functions.deliverWrites); + await wrapped({}); + + /** Wait 5 seconds */ + await new Promise((resolve) => setTimeout(resolve, 12000)); + + /** Asset documents */ + const snap = await db.collection(config.queueCollection).get(); + const data = snap.docs[0].data(); + + expect(data.state).toBe("PENDING"); + }, 60000); + }); + + describe("when using batch writes", () => { + beforeEach(async () => { + /** Clear Firestore data */ + await clearFirestore(); + + /** + * Add processing documents + * 150 axceeds the 100 batch size, + * causes the function to run multiple times + * */ + await addProcessingDocuments(101, "PROCESSING", { + leaseExpireTime: admin.firestore.Timestamp.now(), + }); + }, 60000); + + it("should resets multiple batches of stuck writes", async () => { + /** Run function */ + const wrapped = testEnv.wrap(functions.deliverWrites); + await wrapped({}); + + /** Wait 5 seconds */ + await new Promise((resolve) => setTimeout(resolve, 5000)); + + /** Assert documents */ + const snap = await db.collection(config.queueCollection).get(); + + for (const doc of snap.docs) { + const data = doc.data(); + expect(data.state).toBe("PENDING"); + } + }, 60000); + }); + }); + + describe("fetchAndProcess", () => { + describe("single writes", () => { + beforeEach(async () => { + /** Clear Firestore data */ + await clearFirestore(); + + /** Add processing documents */ + await addProcessingDocuments(1, "PENDING", { + deliverTime: admin.firestore.Timestamp.now(), + }); + }); + + test("should process a single write", async () => { + /** Run function */ + const wrapped = testEnv.wrap(functions.deliverWrites); + await wrapped({}); + + /** Wait 5 seconds */ + await new Promise((resolve) => setTimeout(resolve, 5000)); + + /** Assert Queue collections */ + const snap = await db.collection(config.queueCollection).get(); + const data = snap.docs[0].data(); + expect(data.state).toBe("DELIVERED"); + expect(data.attempts).toBe(1); + + /** Assert tagret collections */ + const queueSnap = await db + .collection(config.targetCollection || "") + .get(); + const QueueData = queueSnap.docs[0].data(); + expect(QueueData.document).toBeDefined(); + }, 60000); + }); + + describe("batch writes", () => { + beforeEach(async () => { + /** Clear Firestore data */ + await clearFirestore(); + + /** Add processing documents */ + await addProcessingDocuments(101, "PENDING", { + deliverTime: admin.firestore.Timestamp.now(), + }); + }, 60000); + + test("should process multiple batches", async () => { + /** Run function */ + const wrapped = testEnv.wrap(functions.deliverWrites); + await wrapped({}); + + /** Wait 5 seconds */ + await new Promise((resolve) => setTimeout(resolve, 5000)); + + /** Asset documents */ + const snap = await db.collection(config.queueCollection).get(); + + for (const doc of snap.docs) { + const data = doc.data(); + expect(data.state).toBe("DELIVERED"); + expect(data.attempts).toBe(1); + } + + /** Assert tagret collections */ + const queueSnap = await db + .collection(config.targetCollection || "") + .get(); + + for (const doc of queueSnap.docs) { + const data = doc.data(); + expect(data.document).toBeDefined(); + } + }, 1200000); + }); + }); + + describe("stale writes", () => { + beforeEach(async () => { + /** Clear Firestore data */ + await clearFirestore(); + + /** set invalid aftrer time */ + config.stalenessThresholdSeconds = 10; + + /** Add one minute to timestamp */ + const now = admin.firestore.Timestamp.now(); + const deliverTime = admin.firestore.Timestamp.fromMillis( + now.toMillis() + 60 * 1000 + ); + + /** Add processing documents */ + await addProcessingDocuments(1, "PENDING", { + deliverTime, + }); + }); + + test("should not process stale records", async () => { + /** Run function */ + const wrapped = testEnv.wrap(functions.deliverWrites); + await wrapped({}); + + /** Wait 5 seconds */ + await new Promise((resolve) => setTimeout(resolve, 5000)); + + /** Asset documents */ + const snap = await db.collection(config.queueCollection).get(); + const data = snap.docs[0].data(); + + expect(data.state).toBe("PENDING"); + }, 60000); + }); + + describe("invalid after time", () => { + beforeEach(async () => { + /** Clear Firestore data */ + await clearFirestore(); + + /** set invalid aftrer time */ + config.stalenessThresholdSeconds = 1; + + /** Invalidate time */ + const invalidAfterTime = admin.firestore.Timestamp.now(); + + /** Add processing documents */ + await addProcessingDocuments(1, "PENDING", { + invalidAfterTime, + }); + + /** Wait one second */ + await new Promise((resolve) => setTimeout(resolve, 1000)); + }); + + test("should not process if the record is invalid after a specified date", async () => { + /** Run function */ + const wrapped = testEnv.wrap(functions.deliverWrites); + await wrapped({}); + + /** Wait 5 seconds */ + await new Promise((resolve) => setTimeout(resolve, 5000)); + + /** Asset documents */ + const snap = await db.collection(config.queueCollection).get(); + const data = snap.docs[0].data(); + + expect(data.state).toBe("PENDING"); + }, 60000); + }); +}); diff --git a/firestore-schedule-writes/functions/__tests__/helpers/index.ts b/firestore-schedule-writes/functions/__tests__/helpers/index.ts new file mode 100644 index 00000000..d4f125b9 --- /dev/null +++ b/firestore-schedule-writes/functions/__tests__/helpers/index.ts @@ -0,0 +1,47 @@ +import * as admin from "firebase-admin"; +import * as config from "../../src/config"; + +import fetch from "node-fetch"; + +export const addProcessingDocuments = async ( + count: number, + state = "PENDING", + options = {} +) => { + const db = admin.firestore(); + const collection = db.collection(config.default.queueCollection); + + // Create a batch object + const batch = db.batch(); + + // Create an array of documents to be added + const docs = Array.from(Array(count), (_, i) => ({ + data: { + document: i, + }, + state, + timeouts: 0, + ...options, + })); + + // Add each document to the batch + docs.forEach((doc) => { + const docRef = collection.doc(); + batch.set(docRef, doc); + }); + + // Commit the batch write operation + await batch.commit(); + + // Retrieve the documents from the collection + const snap = await collection.get(); + + return snap.docs; +}; + +export const clearFirestore = async (): Promise => { + await fetch( + "http://localhost:8080/emulator/v1/projects/demo-test/databases/(default)/documents", + { method: "DELETE" } + ); +}; diff --git a/firestore-schedule-writes/functions/__tests__/helpers/setupEnvironment.ts b/firestore-schedule-writes/functions/__tests__/helpers/setupEnvironment.ts new file mode 100644 index 00000000..4d6293dc --- /dev/null +++ b/firestore-schedule-writes/functions/__tests__/helpers/setupEnvironment.ts @@ -0,0 +1,11 @@ +export default () => { + process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080"; + process.env.FIREBASE_FIRESTORE_EMULATOR_ADDRESS = "localhost:8080"; + process.env.FIREBASE_AUTH_EMULATOR_HOST = "localhost:9099"; + process.env.PUBSUB_EMULATOR_HOST = "localhost:8085"; + process.env.FIREBASE_STORAGE_EMULATOR_HOST = "localhost:9199"; + process.env.GOOGLE_CLOUD_PROJECT = "demo-test"; + process.env.GCLOUD_PROJECT = "demo-test"; + process.env.PROJECT_ID = "demo-test"; + process.env.EXT_INSTANCE_ID = "demo-test"; +}; diff --git a/firestore-schedule-writes/functions/__tests__/jest.config.ts b/firestore-schedule-writes/functions/__tests__/jest.config.ts new file mode 100644 index 00000000..7258b132 --- /dev/null +++ b/firestore-schedule-writes/functions/__tests__/jest.config.ts @@ -0,0 +1,12 @@ +module.exports = { + rootDir: "./", + preset: "ts-jest", + testEnvironment: "node", + globals: { + "ts-jest": { + tsConfig: "/__tests__/tsconfig.json", + }, + }, + setupFiles: ["/__tests__/jest.setup.ts"], + testMatch: ["**/__tests__/*.test.ts"], +}; diff --git a/firestore-schedule-writes/functions/__tests__/jest.setup.ts b/firestore-schedule-writes/functions/__tests__/jest.setup.ts new file mode 100644 index 00000000..85cf651c --- /dev/null +++ b/firestore-schedule-writes/functions/__tests__/jest.setup.ts @@ -0,0 +1,15 @@ +const path = require("path"); + +(async function () { + require("dotenv").config({ + path: path.resolve( + __dirname, + "../../../_emulator/extensions/firestore-schedule-writes.env.local" + ), + }); + + process.env.EXT_INSTANCE_ID = "firestore-schedule-writes"; + + process.env.GCLOUD_PROJECT = "demo-test"; + process.env.PROJECT_ID = "demo-test"; +})(); diff --git a/firestore-schedule-writes/functions/__tests__/tsconfig.json b/firestore-schedule-writes/functions/__tests__/tsconfig.json new file mode 100644 index 00000000..a32dd4f2 --- /dev/null +++ b/firestore-schedule-writes/functions/__tests__/tsconfig.json @@ -0,0 +1,4 @@ +{ + "extends": "../tsconfig.json", + "include": ["**/*"] +} diff --git a/firestore-schedule-writes/functions/jest.config.js b/firestore-schedule-writes/functions/jest.config.js new file mode 100644 index 00000000..7258b132 --- /dev/null +++ b/firestore-schedule-writes/functions/jest.config.js @@ -0,0 +1,12 @@ +module.exports = { + rootDir: "./", + preset: "ts-jest", + testEnvironment: "node", + globals: { + "ts-jest": { + tsConfig: "/__tests__/tsconfig.json", + }, + }, + setupFiles: ["/__tests__/jest.setup.ts"], + testMatch: ["**/__tests__/*.test.ts"], +}; diff --git a/firestore-schedule-writes/functions/lib/index.js b/firestore-schedule-writes/functions/lib/index.js deleted file mode 100644 index 43dfd182..00000000 --- a/firestore-schedule-writes/functions/lib/index.js +++ /dev/null @@ -1,152 +0,0 @@ -"use strict"; -Object.defineProperty(exports, "__esModule", { value: true }); -const functions = require("firebase-functions"); -const admin = require("firebase-admin"); -admin.initializeApp(); -const MERGE_WRITE = process.env.MERGE_WRITE === "true"; -const BATCH_SIZE = 100; -const QUEUE_COLLECTION = process.env.QUEUE_COLLECTION || "queued_writes"; -const TARGET_COLLECTION = process.env.TARGET_COLLECTION; -const STALENESS_THRESHOLD_SECONDS = parseInt(process.env.STALENESS_THRESHOLD_SECONDS || "0", 10); -const CLEANUP = process.env.CLEANUP || "DELETE"; -const db = admin.firestore(); -const queueRef = db.collection(QUEUE_COLLECTION); -const targetRef = TARGET_COLLECTION ? db.collection(TARGET_COLLECTION) : null; -async function fetchAndProcess() { - const toProcess = await queueRef - .where("state", "==", "PENDING") - .where("deliverTime", "<=", admin.firestore.Timestamp.now()) - .orderBy("deliverTime") - .limit(BATCH_SIZE) - .get(); - if (toProcess.docs.length === 0) { - functions.logger.info("No writes to process."); - return; - } - const promises = toProcess.docs.map((doc) => { - return processWrite(doc.ref, doc.data()); - }); - const results = await Promise.all(promises); - let successCount = 0; - for (const result of results) { - if (result.success) { - successCount++; - continue; - } - functions.logger.error(`Failed to deliver "${QUEUE_COLLECTION}/${result.id}":`, result.error); - } - functions.logger.info(`Delivered ${successCount} queued writes.`); - if (toProcess.docs.length === BATCH_SIZE) { - return fetchAndProcess(); - } -} -function isStale(write) { - return ((write.invalidAfterTime && - write.invalidAfterTime.toMillis() < Date.now()) || - (STALENESS_THRESHOLD_SECONDS > 0 && - write.deliverTime.toMillis() + STALENESS_THRESHOLD_SECONDS * 1000 < - Date.now())); -} -async function processWrite(ref, write) { - let error; - try { - if (!write.collection && !write.doc && !TARGET_COLLECTION) { - throw new Error("no target collection/doc was specified for this write"); - } - await admin.firestore().runTransaction(async (txn) => { - const existingWrite = await txn.get(ref); - if (existingWrite.get("state") !== "PENDING") { - throw new Error(`expected PENDING state but was ${existingWrite.get("state")}`); - } - return txn.update(ref, { - state: "PROCESSING", - attempts: admin.firestore.FieldValue.increment(1), - startTime: admin.firestore.FieldValue.serverTimestamp(), - updateTime: admin.firestore.FieldValue.serverTimestamp(), - }); - }); - if (isStale(write)) { - functions.logger.warn(`Write "${QUEUE_COLLECTION}/${ref.id}" is past invalidAfterTime, skipped delivery.`); - } - else { - await deliver(write); - functions.logger.info(`Delivered write "${QUEUE_COLLECTION}/${ref.id}"`); - } - } - catch (e) { - error = e; - await ref.update({ - state: "FAILED", - error: { message: e.message }, - updateTime: admin.firestore.FieldValue.serverTimestamp(), - endTime: admin.firestore.FieldValue.serverTimestamp(), - }); - } - if (!error) { - switch (CLEANUP) { - case "DELETE": - await ref.delete(); - break; - case "KEEP": - await ref.update({ - state: "DELIVERED", - updateTime: admin.firestore.FieldValue.serverTimestamp(), - endTime: admin.firestore.FieldValue.serverTimestamp(), - }); - break; - } - } - return { success: !error, error, id: ref.id }; -} -async function resetStuck() { - const stuck = await queueRef - .where("state", "==", "PROCESSING") - .where("leaseExpireTime", "<=", admin.firestore.Timestamp.now()) - .limit(BATCH_SIZE) - .select() - .get(); - await Promise.all(stuck.docs.map(async (doc) => { - await doc.ref.update({ - state: "PENDING", - timeouts: admin.firestore.FieldValue.increment(1), - lastTimeoutTime: admin.firestore.FieldValue.serverTimestamp(), - }); - functions.logger.error(`Write "${QUEUE_COLLECTION}/${doc.id}" was still PROCESSING after lease expired. Reset to PENDING.`); - })); - if (stuck.docs.length === BATCH_SIZE) { - return resetStuck(); - } -} -function deliver(write) { - let ref; - if (TARGET_COLLECTION && write.id) { - ref = targetRef.doc(write.id); - } - else if (TARGET_COLLECTION) { - ref = targetRef.doc(); - } - else if (write.doc) { - ref = db.doc(write.doc); - } - else if (write.collection) { - ref = db.collection(write.collection).doc(); - } - else { - throw new Error(`unable to determine write location from scheduled write: ${JSON.stringify(write)}`); - } - const data = Object.assign({}, write.data); - for (const field of write.serverTimestampFields || []) { - data[field] = admin.firestore.FieldValue.serverTimestamp(); - } - return ref.set(data, { merge: MERGE_WRITE }); -} -exports.deliverWrites = functions.handler.pubsub.schedule.onRun(async () => { - try { - await resetStuck(); - await fetchAndProcess(); - } - catch (e) { - console.error(e); - } -}); -//# sourceMappingURL=index.js.map \ No newline at end of file diff --git a/firestore-schedule-writes/functions/lib/index.js.map b/firestore-schedule-writes/functions/lib/index.js.map deleted file mode 100644 index f17abc50..00000000 --- a/firestore-schedule-writes/functions/lib/index.js.map +++ /dev/null @@ -1 +0,0 @@ -{"version":3,"file":"index.js","sourceRoot":"","sources":["../src/index.ts"],"names":[],"mappings":";;AAAA,gDAAgD;AAChD,wCAAwC;AAExC,KAAK,CAAC,aAAa,EAAE,CAAC;AAiBtB,MAAM,WAAW,GAAG,OAAO,CAAC,GAAG,CAAC,WAAW,KAAK,MAAM,CAAC;AACvD,MAAM,UAAU,GAAG,GAAG,CAAC;AACvB,MAAM,gBAAgB,GAAG,OAAO,CAAC,GAAG,CAAC,gBAAgB,IAAI,eAAe,CAAC;AACzE,MAAM,iBAAiB,GAAG,OAAO,CAAC,GAAG,CAAC,iBAAiB,CAAC;AACxD,MAAM,2BAA2B,GAAG,QAAQ,CAC1C,OAAO,CAAC,GAAG,CAAC,2BAA2B,IAAI,GAAG,EAC9C,EAAE,CACH,CAAC;AACF,MAAM,OAAO,GACV,OAAO,CAAC,GAAG,CAAC,OAA6B,IAAI,QAAQ,CAAC;AAEzD,MAAM,EAAE,GAAG,KAAK,CAAC,SAAS,EAAE,CAAC;AAC7B,MAAM,QAAQ,GAAG,EAAE,CAAC,UAAU,CAAC,gBAAgB,CAAC,CAAC;AACjD,MAAM,SAAS,GAAG,iBAAiB,CAAC,CAAC,CAAC,EAAE,CAAC,UAAU,CAAC,iBAAiB,CAAC,CAAC,CAAC,CAAC,IAAI,CAAC;AAE9E,KAAK,UAAU,eAAe;IAC5B,MAAM,SAAS,GAAG,MAAM,QAAQ;SAC7B,KAAK,CAAC,OAAO,EAAE,IAAI,EAAE,SAAS,CAAC;SAC/B,KAAK,CAAC,aAAa,EAAE,IAAI,EAAE,KAAK,CAAC,SAAS,CAAC,SAAS,CAAC,GAAG,EAAE,CAAC;SAC3D,OAAO,CAAC,aAAa,CAAC;SACtB,KAAK,CAAC,UAAU,CAAC;SACjB,GAAG,EAAE,CAAC;IAET,IAAI,SAAS,CAAC,IAAI,CAAC,MAAM,KAAK,CAAC,EAAE;QAC/B,SAAS,CAAC,MAAM,CAAC,IAAI,CAAC,uBAAuB,CAAC,CAAC;QAC/C,OAAO;KACR;IAED,MAAM,QAAQ,GAAG,SAAS,CAAC,IAAI,CAAC,GAAG,CAAC,CAAC,GAAG,EAAE,EAAE;QAC1C,OAAO,YAAY,CAAC,GAAG,CAAC,GAAG,EAAE,GAAG,CAAC,IAAI,EAAiB,CAAC,CAAC;IAC1D,CAAC,CAAC,CAAC;IAEH,MAAM,OAAO,GAAG,MAAM,OAAO,CAAC,GAAG,CAAC,QAAQ,CAAC,CAAC;IAC5C,IAAI,YAAY,GAAG,CAAC,CAAC;IACrB,KAAK,MAAM,MAAM,IAAI,OAAO,EAAE;QAC5B,IAAI,MAAM,CAAC,OAAO,EAAE;YAClB,YAAY,EAAE,CAAC;YACf,SAAS;SACV;QAED,SAAS,CAAC,MAAM,CAAC,KAAK,CACpB,sBAAsB,gBAAgB,IAAI,MAAM,CAAC,EAAE,IAAI,EACvD,MAAM,CAAC,KAAK,CACb,CAAC;KACH;IACD,SAAS,CAAC,MAAM,CAAC,IAAI,CAAC,aAAa,YAAY,iBAAiB,CAAC,CAAC;IAElE,IAAI,SAAS,CAAC,IAAI,CAAC,MAAM,KAAK,UAAU,EAAE;QACxC,OAAO,eAAe,EAAE,CAAC;KAC1B;AACH,CAAC;AAQD,SAAS,OAAO,CAAC,KAAkB;IACjC,OAAO,CACL,CAAC,KAAK,CAAC,gBAAgB;QACrB,KAAK,CAAC,gBAAgB,CAAC,QAAQ,EAAE,GAAG,IAAI,CAAC,GAAG,EAAE,CAAC;QACjD,CAAC,2BAA2B,GAAG,CAAC;YAC9B,KAAK,CAAC,WAAW,CAAC,QAAQ,EAAE,GAAG,2BAA2B,GAAG,IAAI;gBAC/D,IAAI,CAAC,GAAG,EAAE,CAAC,CAChB,CAAC;AACJ,CAAC;AAED,KAAK,UAAU,YAAY,CACzB,GAAwC,EACxC,KAAkB;IAElB,IAAI,KAAK,CAAC;IACV,IAAI;QACF,IAAI,CAAC,KAAK,CAAC,UAAU,IAAI,CAAC,KAAK,CAAC,GAAG,IAAI,CAAC,iBAAiB,EAAE;YACzD,MAAM,IAAI,KAAK,CAAC,uDAAuD,CAAC,CAAC;SAC1E;QAED,MAAM,KAAK,CAAC,SAAS,EAAE,CAAC,cAAc,CAAC,KAAK,EAAE,GAAG,EAAE,EAAE;YACnD,MAAM,aAAa,GAAG,MAAM,GAAG,CAAC,GAAG,CAAC,GAAG,CAAC,CAAC;YACzC,IAAI,aAAa,CAAC,GAAG,CAAC,OAAO,CAAC,KAAK,SAAS,EAAE;gBAC5C,MAAM,IAAI,KAAK,CACb,kCAAkC,aAAa,CAAC,GAAG,CAAC,OAAO,CAAC,EAAE,CAC/D,CAAC;aACH;YAED,OAAO,GAAG,CAAC,MAAM,CAAC,GAAG,EAAE;gBACrB,KAAK,EAAE,YAAY;gBACnB,QAAQ,EAAE,KAAK,CAAC,SAAS,CAAC,UAAU,CAAC,SAAS,CAAC,CAAC,CAAC;gBACjD,SAAS,EAAE,KAAK,CAAC,SAAS,CAAC,UAAU,CAAC,eAAe,EAAE;gBACvD,UAAU,EAAE,KAAK,CAAC,SAAS,CAAC,UAAU,CAAC,eAAe,EAAE;aACzD,CAAC,CAAC;QACL,CAAC,CAAC,CAAC;QAEH,IAAI,OAAO,CAAC,KAAK,CAAC,EAAE;YAClB,SAAS,CAAC,MAAM,CAAC,IAAI,CACnB,UAAU,gBAAgB,IAAI,GAAG,CAAC,EAAE,+CAA+C,CACpF,CAAC;SACH;aAAM;YACL,MAAM,OAAO,CAAC,KAAK,CAAC,CAAC;YACrB,SAAS,CAAC,MAAM,CAAC,IAAI,CAAC,oBAAoB,gBAAgB,IAAI,GAAG,CAAC,EAAE,GAAG,CAAC,CAAC;SAC1E;KACF;IAAC,OAAO,CAAM,EAAE;QACf,KAAK,GAAG,CAAC,CAAC;QACV,MAAM,GAAG,CAAC,MAAM,CAAC;YACf,KAAK,EAAE,QAAQ;YACf,KAAK,EAAE,EAAE,OAAO,EAAE,CAAC,CAAC,OAAiB,EAAE;YACvC,UAAU,EAAE,KAAK,CAAC,SAAS,CAAC,UAAU,CAAC,eAAe,EAAE;YACxD,OAAO,EAAE,KAAK,CAAC,SAAS,CAAC,UAAU,CAAC,eAAe,EAAE;SACtD,CAAC,CAAC;KACJ;IAED,IAAI,CAAC,KAAK,EAAE;QACV,QAAQ,OAAO,EAAE;YACf,KAAK,QAAQ;gBACX,MAAM,GAAG,CAAC,MAAM,EAAE,CAAC;gBACnB,MAAM;YACR,KAAK,MAAM;gBACT,MAAM,GAAG,CAAC,MAAM,CAAC;oBACf,KAAK,EAAE,WAAW;oBAClB,UAAU,EAAE,KAAK,CAAC,SAAS,CAAC,UAAU,CAAC,eAAe,EAAE;oBACxD,OAAO,EAAE,KAAK,CAAC,SAAS,CAAC,UAAU,CAAC,eAAe,EAAE;iBACtD,CAAC,CAAC;gBACH,MAAM;SACT;KACF;IAED,OAAO,EAAE,OAAO,EAAE,CAAC,KAAK,EAAE,KAAK,EAAE,EAAE,EAAE,GAAG,CAAC,EAAE,EAAE,CAAC;AAChD,CAAC;AAED,KAAK,UAAU,UAAU;IACvB,MAAM,KAAK,GAAG,MAAM,QAAQ;SACzB,KAAK,CAAC,OAAO,EAAE,IAAI,EAAE,YAAY,CAAC;SAClC,KAAK,CAAC,iBAAiB,EAAE,IAAI,EAAE,KAAK,CAAC,SAAS,CAAC,SAAS,CAAC,GAAG,EAAE,CAAC;SAC/D,KAAK,CAAC,UAAU,CAAC;SACjB,MAAM,EAAE;SACR,GAAG,EAAE,CAAC;IAET,MAAM,OAAO,CAAC,GAAG,CACf,KAAK,CAAC,IAAI,CAAC,GAAG,CAAC,KAAK,EAAE,GAAG,EAAE,EAAE;QAC3B,MAAM,GAAG,CAAC,GAAG,CAAC,MAAM,CAAC;YACnB,KAAK,EAAE,SAAS;YAChB,QAAQ,EAAE,KAAK,CAAC,SAAS,CAAC,UAAU,CAAC,SAAS,CAAC,CAAC,CAAC;YACjD,eAAe,EAAE,KAAK,CAAC,SAAS,CAAC,UAAU,CAAC,eAAe,EAAE;SAC9D,CAAC,CAAC;QACH,SAAS,CAAC,MAAM,CAAC,KAAK,CACpB,UAAU,gBAAgB,IAAI,GAAG,CAAC,EAAE,+DAA+D,CACpG,CAAC;IACJ,CAAC,CAAC,CACH,CAAC;IAEF,IAAI,KAAK,CAAC,IAAI,CAAC,MAAM,KAAK,UAAU,EAAE;QACpC,OAAO,UAAU,EAAE,CAAC;KACrB;AACH,CAAC;AAED,SAAS,OAAO,CAAC,KAAkB;IACjC,IAAI,GAAsC,CAAC;IAC3C,IAAI,iBAAiB,IAAI,KAAK,CAAC,EAAE,EAAE;QACjC,GAAG,GAAG,SAAU,CAAC,GAAG,CAAC,KAAK,CAAC,EAAE,CAAC,CAAC;KAChC;SAAM,IAAI,iBAAiB,EAAE;QAC5B,GAAG,GAAG,SAAU,CAAC,GAAG,EAAE,CAAC;KACxB;SAAM,IAAI,KAAK,CAAC,GAAG,EAAE;QACpB,GAAG,GAAG,EAAE,CAAC,GAAG,CAAC,KAAK,CAAC,GAAG,CAAC,CAAC;KACzB;SAAM,IAAI,KAAK,CAAC,UAAU,EAAE;QAC3B,GAAG,GAAG,EAAE,CAAC,UAAU,CAAC,KAAK,CAAC,UAAU,CAAC,CAAC,GAAG,EAAE,CAAC;KAC7C;SAAM;QACL,MAAM,IAAI,KAAK,CACb,4DAA4D,IAAI,CAAC,SAAS,CACxE,KAAK,CACN,EAAE,CACJ,CAAC;KACH;IAED,MAAM,IAAI,qBAAQ,KAAK,CAAC,IAAI,CAAE,CAAC;IAC/B,KAAK,MAAM,KAAK,IAAI,KAAK,CAAC,qBAAqB,IAAI,EAAE,EAAE;QACrD,IAAI,CAAC,KAAK,CAAC,GAAG,KAAK,CAAC,SAAS,CAAC,UAAU,CAAC,eAAe,EAAE,CAAC;KAC5D;IAED,OAAO,GAAG,CAAC,GAAG,CAAC,IAAI,EAAE,EAAE,KAAK,EAAE,WAAW,EAAE,CAAC,CAAC;AAC/C,CAAC;AAED,OAAO,CAAC,aAAa,GAAG,SAAS,CAAC,OAAO,CAAC,MAAM,CAAC,QAAQ,CAAC,KAAK,CAAC,KAAK,IAAI,EAAE;IACzE,IAAI;QACF,MAAM,UAAU,EAAE,CAAC;QACnB,MAAM,eAAe,EAAE,CAAC;KACzB;IAAC,OAAO,CAAC,EAAE;QACV,OAAO,CAAC,KAAK,CAAC,CAAC,CAAC,CAAC;KAClB;AACH,CAAC,CAAC,CAAC"} \ No newline at end of file diff --git a/firestore-schedule-writes/functions/package.json b/firestore-schedule-writes/functions/package.json index b2418160..4be0c1c9 100644 --- a/firestore-schedule-writes/functions/package.json +++ b/firestore-schedule-writes/functions/package.json @@ -7,16 +7,21 @@ }, "main": "lib/index.js", "dependencies": { - "@types/node": "^14.14.8", - "@types/promise.allsettled": "^1.0.3", - "firebase-admin": "^8.6.0", - "firebase-functions": "^3.6.1", - "promise.allsettled": "^1.0.2" + "@types/jest": "^29.5.0", + "@types/node": "^16.18.23", + "firebase-admin": "^11.5.0", + "firebase-functions": "^4.2.1" }, "devDependencies": { + "@types/node-fetch": "^2.6.3", + "dotenv": "^16.0.3", + "firebase-functions-test": "^3.0.0", + "jest": "^29.5.0", + "node-fetch": "^2.6.9", "prettier": "^2.0.5", + "ts-jest": "^29.0.5", "tslint": "^5.12.0", - "typescript": "^4.1.3" + "typescript": "^5.0.3" }, "private": true } diff --git a/firestore-schedule-writes/functions/src/config.ts b/firestore-schedule-writes/functions/src/config.ts new file mode 100644 index 00000000..6525a8e7 --- /dev/null +++ b/firestore-schedule-writes/functions/src/config.ts @@ -0,0 +1,11 @@ +export default { + schedule: process.env.SCHEDULE || "every 1 minutes", + mergeWrite: process.env.MERGE_WRITE === "true", + queueCollection: process.env.QUEUE_COLLECTION || "queued_writes", + targetCollection: process.env.TARGET_COLLECTION, + stalenessThresholdSeconds: parseInt( + process.env.STALENESS_THRESHOLD_SECONDS || "0", + 10 + ), + cleanup: (process.env.CLEANUP as "DELETE" | "KEEP") || "DELETE", +}; diff --git a/firestore-schedule-writes/functions/src/index.ts b/firestore-schedule-writes/functions/src/index.ts index cfbc916e..734c142e 100644 --- a/firestore-schedule-writes/functions/src/index.ts +++ b/firestore-schedule-writes/functions/src/index.ts @@ -1,5 +1,6 @@ import * as functions from "firebase-functions"; import * as admin from "firebase-admin"; +import config from "./config"; admin.initializeApp(); @@ -18,22 +19,19 @@ interface QueuedWrite { serverTimestampFields?: string[]; } -const MERGE_WRITE = process.env.MERGE_WRITE === "true"; const BATCH_SIZE = 100; -const QUEUE_COLLECTION = process.env.QUEUE_COLLECTION || "queued_writes"; -const TARGET_COLLECTION = process.env.TARGET_COLLECTION; -const STALENESS_THRESHOLD_SECONDS = parseInt( - process.env.STALENESS_THRESHOLD_SECONDS || "0", - 10 -); -const CLEANUP: "DELETE" | "KEEP" = - (process.env.CLEANUP as "DELETE" | "KEEP") || "DELETE"; - -const db = admin.firestore(); -const queueRef = db.collection(QUEUE_COLLECTION); -const targetRef = TARGET_COLLECTION ? db.collection(TARGET_COLLECTION) : null; + +const { + mergeWrite, + queueCollection, + targetCollection, + stalenessThresholdSeconds, + cleanup, +} = config; async function fetchAndProcess(): Promise { + const queueRef = admin.firestore().collection(queueCollection); + const toProcess = await queueRef .where("state", "==", "PENDING") .where("deliverTime", "<=", admin.firestore.Timestamp.now()) @@ -59,7 +57,7 @@ async function fetchAndProcess(): Promise { } functions.logger.error( - `Failed to deliver "${QUEUE_COLLECTION}/${result.id}":`, + `Failed to deliver "${queueCollection}/${result.id}":`, result.error ); } @@ -80,8 +78,8 @@ function isStale(write: QueuedWrite): boolean { return ( (write.invalidAfterTime && write.invalidAfterTime.toMillis() < Date.now()) || - (STALENESS_THRESHOLD_SECONDS > 0 && - write.deliverTime.toMillis() + STALENESS_THRESHOLD_SECONDS * 1000 < + (stalenessThresholdSeconds > 0 && + write.deliverTime.toMillis() + stalenessThresholdSeconds * 1000 < Date.now()) ); } @@ -92,7 +90,7 @@ async function processWrite( ): Promise { let error; try { - if (!write.collection && !write.doc && !TARGET_COLLECTION) { + if (!write.collection && !write.doc && !targetCollection) { throw new Error("no target collection/doc was specified for this write"); } @@ -114,11 +112,11 @@ async function processWrite( if (isStale(write)) { functions.logger.warn( - `Write "${QUEUE_COLLECTION}/${ref.id}" is past invalidAfterTime, skipped delivery.` + `Write "${queueCollection}/${ref.id}" is past invalidAfterTime, skipped delivery.` ); } else { await deliver(write); - functions.logger.info(`Delivered write "${QUEUE_COLLECTION}/${ref.id}"`); + functions.logger.info(`Delivered write "${queueCollection}/${ref.id}"`); } } catch (e: any) { error = e; @@ -131,7 +129,7 @@ async function processWrite( } if (!error) { - switch (CLEANUP) { + switch (cleanup) { case "DELETE": await ref.delete(); break; @@ -149,6 +147,8 @@ async function processWrite( } async function resetStuck(): Promise { + const queueRef = admin.firestore().collection(queueCollection); + const stuck = await queueRef .where("state", "==", "PROCESSING") .where("leaseExpireTime", "<=", admin.firestore.Timestamp.now()) @@ -164,7 +164,7 @@ async function resetStuck(): Promise { lastTimeoutTime: admin.firestore.FieldValue.serverTimestamp(), }); functions.logger.error( - `Write "${QUEUE_COLLECTION}/${doc.id}" was still PROCESSING after lease expired. Reset to PENDING.` + `Write "${queueCollection}/${doc.id}" was still PROCESSING after lease expired. Reset to PENDING.` ); }) ); @@ -174,37 +174,41 @@ async function resetStuck(): Promise { } } +function getTargetRef(write: QueuedWrite) { + const targetRef = targetCollection + ? admin.firestore().collection(targetCollection) + : null; + + if (targetCollection && write.id) return targetRef!.doc(write.id); + if (targetCollection) return targetRef!.doc(); + if (write.doc) return admin.firestore().doc(write.doc); + if (write.collection) + return admin.firestore().collection(write.collection).doc(); + throw new Error( + `unable to determine write location from scheduled write: ${JSON.stringify( + write + )}` + ); +} + function deliver(write: QueuedWrite) { - let ref: admin.firestore.DocumentReference; - if (TARGET_COLLECTION && write.id) { - ref = targetRef!.doc(write.id); - } else if (TARGET_COLLECTION) { - ref = targetRef!.doc(); - } else if (write.doc) { - ref = db.doc(write.doc); - } else if (write.collection) { - ref = db.collection(write.collection).doc(); - } else { - throw new Error( - `unable to determine write location from scheduled write: ${JSON.stringify( - write - )}` - ); - } + const ref = getTargetRef(write); const data = { ...write.data }; for (const field of write.serverTimestampFields || []) { data[field] = admin.firestore.FieldValue.serverTimestamp(); } - return ref.set(data, { merge: MERGE_WRITE }); + return ref.set(data, { merge: mergeWrite }); } -exports.deliverWrites = functions.handler.pubsub.schedule.onRun(async () => { - try { - await resetStuck(); - await fetchAndProcess(); - } catch (e) { - console.error(e); - } -}); +exports.deliverWrites = functions.pubsub + .schedule(config.schedule) + .onRun(async () => { + try { + await resetStuck(); + await fetchAndProcess(); + } catch (e) { + console.error(e); + } + });