Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions _emulator/extensions/firestore-schedule-writes.env.local
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
PROJECT_ID=demo-test
LOCATION=us-central1
5 changes: 2 additions & 3 deletions _emulator/firebase.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{
"extensions": {
"firestore-record-user-acknowledgements": "../firestore-record-user-acknowledgements",
"firestore-bundle-server": "../firestore-bundle-server"
"firestore-schedule-writes": "../firestore-schedule-writes"
},
"storage": {
"rules": "storage.rules"
Expand Down Expand Up @@ -39,7 +38,7 @@
"rewrites": [
{
"source": "/bundles/*",
"function": "ext-firestore-bundle-server-serve"
"function": "ext-firestore-schedule-writes-serve"
},
{
"source": "**",
Expand Down
36 changes: 18 additions & 18 deletions firestore-schedule-writes/POSTINSTALL.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,5 @@
Schedule Firestore Writes allows you to write data to a collection in Firestore that is then written to a different location at a specified time in the future. This can be used for many common time-related tasks such as delayed notifications, alarms, and reminders. Because this all happens within Firestore, scheduled writes can be listened to by clients or connected with via Cloud Functions.

# See it in action

```js
const TEN_MINUTES_MS = 10 * 60 * 1000;

firebase
.firestore()
.collection("${param:QUEUE_COLLECTION}")
.add({
state: "PENDING", // IMPORTANT! Omitting this field will prevent the scheduled write from being picked up.
collection: "delivered", // only if you didn't specify a TARGET_COLLECTION
data: { message: "Hello from the future!" },
deliverTime: firebase.firestore.Timestamp.fromMillis(
Date.now() + TEN_MINUTES_MS
),
});
```

## Required Setup

To use this extension, Firestore must have a two compound indexes in the `${param:QUEUE_COLLECTION}` collection. These can be created manually in the Firebase console or added to your `firestore.indexes.json` file deployed via Firebase CLI:
Expand Down Expand Up @@ -45,6 +27,24 @@ To use this extension, Firestore must have a two compound indexes in the `${para
}
```

# See it in action

```js
const TEN_MINUTES_MS = 10 * 60 * 1000;

firebase
.firestore()
.collection("${param:QUEUE_COLLECTION}")
.add({
state: "PENDING", // IMPORTANT! Omitting this field will prevent the scheduled write from being picked up.
collection: "delivered", // only if you didn't specify a TARGET_COLLECTION
data: { message: "Hello from the future!" },
deliverTime: firebase.firestore.Timestamp.fromMillis(
Date.now() + TEN_MINUTES_MS
),
});
```

# Using the extension

When writing to your `${param:QUEUE_COLLECTION}` collection, the following fields are available:
Expand Down
2 changes: 1 addition & 1 deletion firestore-schedule-writes/extension.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ resources:
location: ${LOCATION}
scheduleTrigger:
schedule: "${SCHEDULE}"
runtime: "nodejs12"
runtime: "nodejs16"

# In the `params` field, set up your extension's user-configured parameters.
# Learn more in the docs: https://firebase.google.com/docs/extensions/alpha/ref-extension-yaml#params-field
Expand Down
1 change: 1 addition & 0 deletions firestore-schedule-writes/functions/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lib
251 changes: 251 additions & 0 deletions firestore-schedule-writes/functions/__tests__/functions.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
import * as fft from "firebase-functions-test";
const testEnv = fft({
projectId: "demo-test",
});

import * as admin from "firebase-admin";
import setupEnvironment from "./helpers/setupEnvironment";
//@ts-ignore
import config from "../src/config";
import { addProcessingDocuments, clearFirestore } from "./helpers";

const functions = require("../src/index");

setupEnvironment();
jest.spyOn(admin, "initializeApp").mockImplementation();

/** Setup test environment */
const db = admin.firestore();

/** Setup Mocks */
jest.mock("../src/config", () => ({
default: {
schedule: "every 1 minutes",
mergeWrite: "true",
queueCollection: "queued_writes",
targetCollection: "target_collection",
stalenessThresholdSeconds: 0,
cleanup: "KEEP",
},
}));

describe("firestore-schedule-writes", () => {
afterEach(async () => {
/** Clear mocks */
await jest.clearAllMocks();
});

describe("resetStuckWrites", () => {
describe("when using a single write", () => {
beforeEach(async () => {
/** Clear Firestore data */
await clearFirestore();

/**
* Add processing documents
* 150 axceeds the 100 batch size,
* causes the function to run multiple times
* */
await addProcessingDocuments(1, "PROCESSING", {
leaseExpireTime: admin.firestore.Timestamp.now(),
});
}, 60000);

it("should reset a single stuck write", async () => {
/** Run function */
const wrapped = testEnv.wrap(functions.deliverWrites);
await wrapped({});

/** Wait 5 seconds */
await new Promise((resolve) => setTimeout(resolve, 12000));

/** Asset documents */
const snap = await db.collection(config.queueCollection).get();
const data = snap.docs[0].data();

expect(data.state).toBe("PENDING");
}, 60000);
});

describe("when using batch writes", () => {
beforeEach(async () => {
/** Clear Firestore data */
await clearFirestore();

/**
* Add processing documents
* 150 axceeds the 100 batch size,
* causes the function to run multiple times
* */
await addProcessingDocuments(101, "PROCESSING", {
leaseExpireTime: admin.firestore.Timestamp.now(),
});
}, 60000);

it("should resets multiple batches of stuck writes", async () => {
/** Run function */
const wrapped = testEnv.wrap(functions.deliverWrites);
await wrapped({});

/** Wait 5 seconds */
await new Promise((resolve) => setTimeout(resolve, 5000));

/** Assert documents */
const snap = await db.collection(config.queueCollection).get();

for (const doc of snap.docs) {
const data = doc.data();
expect(data.state).toBe("PENDING");
}
}, 60000);
});
});

describe("fetchAndProcess", () => {
describe("single writes", () => {
beforeEach(async () => {
/** Clear Firestore data */
await clearFirestore();

/** Add processing documents */
await addProcessingDocuments(1, "PENDING", {
deliverTime: admin.firestore.Timestamp.now(),
});
});

test("should process a single write", async () => {
/** Run function */
const wrapped = testEnv.wrap(functions.deliverWrites);
await wrapped({});

/** Wait 5 seconds */
await new Promise((resolve) => setTimeout(resolve, 5000));

/** Assert Queue collections */
const snap = await db.collection(config.queueCollection).get();
const data = snap.docs[0].data();
expect(data.state).toBe("DELIVERED");
expect(data.attempts).toBe(1);

/** Assert tagret collections */
const queueSnap = await db
.collection(config.targetCollection || "")
.get();
const QueueData = queueSnap.docs[0].data();
expect(QueueData.document).toBeDefined();
}, 60000);
});

describe("batch writes", () => {
beforeEach(async () => {
/** Clear Firestore data */
await clearFirestore();

/** Add processing documents */
await addProcessingDocuments(101, "PENDING", {
deliverTime: admin.firestore.Timestamp.now(),
});
}, 60000);

test("should process multiple batches", async () => {
/** Run function */
const wrapped = testEnv.wrap(functions.deliverWrites);
await wrapped({});

/** Wait 5 seconds */
await new Promise((resolve) => setTimeout(resolve, 5000));

/** Asset documents */
const snap = await db.collection(config.queueCollection).get();

for (const doc of snap.docs) {
const data = doc.data();
expect(data.state).toBe("DELIVERED");
expect(data.attempts).toBe(1);
}

/** Assert tagret collections */
const queueSnap = await db
.collection(config.targetCollection || "")
.get();

for (const doc of queueSnap.docs) {
const data = doc.data();
expect(data.document).toBeDefined();
}
}, 1200000);
});
});

describe("stale writes", () => {
beforeEach(async () => {
/** Clear Firestore data */
await clearFirestore();

/** set invalid aftrer time */
config.stalenessThresholdSeconds = 10;

/** Add one minute to timestamp */
const now = admin.firestore.Timestamp.now();
const deliverTime = admin.firestore.Timestamp.fromMillis(
now.toMillis() + 60 * 1000
);

/** Add processing documents */
await addProcessingDocuments(1, "PENDING", {
deliverTime,
});
});

test("should not process stale records", async () => {
/** Run function */
const wrapped = testEnv.wrap(functions.deliverWrites);
await wrapped({});

/** Wait 5 seconds */
await new Promise((resolve) => setTimeout(resolve, 5000));

/** Asset documents */
const snap = await db.collection(config.queueCollection).get();
const data = snap.docs[0].data();

expect(data.state).toBe("PENDING");
}, 60000);
});

describe("invalid after time", () => {
beforeEach(async () => {
/** Clear Firestore data */
await clearFirestore();

/** set invalid aftrer time */
config.stalenessThresholdSeconds = 1;

/** Invalidate time */
const invalidAfterTime = admin.firestore.Timestamp.now();

/** Add processing documents */
await addProcessingDocuments(1, "PENDING", {
invalidAfterTime,
});

/** Wait one second */
await new Promise((resolve) => setTimeout(resolve, 1000));
});

test("should not process if the record is invalid after a specified date", async () => {
/** Run function */
const wrapped = testEnv.wrap(functions.deliverWrites);
await wrapped({});

/** Wait 5 seconds */
await new Promise((resolve) => setTimeout(resolve, 5000));

/** Asset documents */
const snap = await db.collection(config.queueCollection).get();
const data = snap.docs[0].data();

expect(data.state).toBe("PENDING");
}, 60000);
});
});
47 changes: 47 additions & 0 deletions firestore-schedule-writes/functions/__tests__/helpers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import * as admin from "firebase-admin";
import * as config from "../../src/config";

import fetch from "node-fetch";

export const addProcessingDocuments = async (
count: number,
state = "PENDING",
options = {}
) => {
const db = admin.firestore();
const collection = db.collection(config.default.queueCollection);

// Create a batch object
const batch = db.batch();

// Create an array of documents to be added
const docs = Array.from(Array(count), (_, i) => ({
data: {
document: i,
},
state,
timeouts: 0,
...options,
}));

// Add each document to the batch
docs.forEach((doc) => {
const docRef = collection.doc();
batch.set(docRef, doc);
});

// Commit the batch write operation
await batch.commit();

// Retrieve the documents from the collection
const snap = await collection.get();

return snap.docs;
};

export const clearFirestore = async (): Promise<void> => {
await fetch(
"http://localhost:8080/emulator/v1/projects/demo-test/databases/(default)/documents",
{ method: "DELETE" }
);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export default () => {
process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080";
process.env.FIREBASE_FIRESTORE_EMULATOR_ADDRESS = "localhost:8080";
process.env.FIREBASE_AUTH_EMULATOR_HOST = "localhost:9099";
process.env.PUBSUB_EMULATOR_HOST = "localhost:8085";
process.env.FIREBASE_STORAGE_EMULATOR_HOST = "localhost:9199";
process.env.GOOGLE_CLOUD_PROJECT = "demo-test";
process.env.GCLOUD_PROJECT = "demo-test";
process.env.PROJECT_ID = "demo-test";
process.env.EXT_INSTANCE_ID = "demo-test";
};
Loading