Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Generic, TypeVar
from typing import Generic, TypeVar, TYPE_CHECKING

from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1._helpers import DOCUMENT_PATH_DELIMITER
Expand All @@ -32,6 +32,7 @@
from google.cloud.firestore_v1.base_document import BaseDocumentReference
from google.cloud.firestore_v1.base_query import BaseQuery
from google.cloud.firestore_v1.client import Client
from google.cloud.firestore_v1.pipeline_expressions import CONSTANT_TYPE, Expression


PipelineType = TypeVar("PipelineType", bound=_BasePipeline)
Expand Down Expand Up @@ -108,3 +109,65 @@ def documents(self, *docs: "BaseDocumentReference") -> PipelineType:
a new pipeline instance targeting the specified documents
"""
return self._create_pipeline(stages.Documents.of(*docs))

def literals(
self, *documents: dict[str, Expression | CONSTANT_TYPE]
) -> PipelineType:
"""
Returns documents from a fixed set of predefined document objects.

Example:
>>> from google.cloud.firestore_v1.pipeline_expressions import Constant
>>> documents = [
... {"name": "joe", "age": 10},
... {"name": "bob", "age": 30},
... {"name": "alice", "age": 40}
... ]
>>> pipeline = client.pipeline()
... .literals(*documents)
... .where(field("age").lessThan(35))

Output documents:
```json
[
{"name": "joe", "age": 10},
{"name": "bob", "age": 30}
]
```

Behavior:
The `literals(...)` stage can only be used as the first stage in a pipeline (or
sub-pipeline). The order of documents returned from the `literals` matches the
order in which they are defined.

While literal values are the most common, it is also possible to pass in
expressions, which will be evaluated and returned, making it possible to test
out different query / expression behavior without first needing to create some
test data.

For example, the following shows how to quickly test out the `length(...)`
function on some constant test sets:

Example:
>>> from google.cloud.firestore_v1.pipeline_expressions import Constant
>>> documents = [
... {"x": Constant.of("foo-bar-baz").char_length()},
... {"x": Constant.of("bar").char_length()}
... ]
>>> pipeline = client.pipeline().literals(*documents)

Output documents:
```json
[
{"x": 11},
{"x": 3}
]
```

Args:
*documents: One or more documents to be returned by this stage. Each can be a `dict`
of values of `Expression` or `CONSTANT_TYPE` types.
Returns:
A new Pipeline object with this stage appended to the stage list.
"""
return self._create_pipeline(stages.Literals(*documents))
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@

from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Optional, Sequence
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence

from google.cloud.firestore_v1._helpers import encode_value
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from google.cloud.firestore_v1.pipeline_expressions import (
AggregateFunction,
AliasedExpression,
BooleanExpression,
CONSTANT_TYPE,
Expression,
Field,
Ordering,
Expand Down Expand Up @@ -342,6 +343,26 @@ def _pb_args(self):
return [Value(integer_value=self.limit)]


class Literals(Stage):
"""Returns documents from a fixed set of predefined document objects."""

def __init__(self, *documents: dict[str, Expression | CONSTANT_TYPE]):
super().__init__("literals")
self.documents: tuple[Mapping[str, Any], ...] = documents
Copy link
Copy Markdown
Contributor

@daniel-sanche daniel-sanche Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer if we didn't use Any here, since this is a publicly exposed attribute

We could resolve it using a TypeVar:
T = TypeVar("T", bound=Union[Expression, CONSTANT_TYPE])

But honestly, I'd recommend converting the constants at init time instead, which would both resolve this issue, raise conversion errors in a better place, and simplify the code:

class Literals(Stage):
    """Returns documents from a fixed set of predefined document objects."""

    def __init__(self, *documents: dict[str, Expression | CONSTANT_TYPE]):
        super().__init__("literals")
        self.documents: Sequence[dict[str, Expression]] = [
            {k: v if isinstance(v, Expression) else encode_value(v) for k,v in d.items()}
            for d in documents
        ]

    def _pb_args(self):
        return [Value(map_value={"fields": doc}) for doc in self.documents]

Copy link
Copy Markdown
Contributor Author

@Linchin Linchin Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it's a good idea because encoding the values here could break the __repr__() method at the parent class, and we would expose the encoded values in the repr string. For example, instead of "Literals(documents=('hello',))", we would be getting "Literals(documents=(Value(string_value='hello'),))".

What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I should have tested the code before posting. Value isn't an expression, so that wouldn't have passed mypy. This should though:

class Literals(Stage):
    """Returns documents from a fixed set of predefined document objects."""

    def __init__(self, *documents: Mapping[str, Expression | CONSTANT_TYPE]):
        super().__init__("literals")
        self.documents: list[Mapping[str, Expression]] = [
            {k: v if isinstance(v, Expression) else Constant.of(v) for k,v in d.items()}
            for d in documents
        ]

    def _pb_args(self):
        return [
            Value(map_value={"fields": {k: v.to_pb for k,v in doc.items()}})
            for doc in self.documents
        ]

This would result in the repr of Literals({"hello": "world"})) being Literals(documents=[{"hello": Constant.of('world')}]), which is fine, because represents the same object. And that's how the other stages are set up


def _pb_args(self):
args = []
for doc in self.documents:
encoded_doc = {}
for k, v in doc.items():
if hasattr(v, "_to_pb"):
encoded_doc[k] = v._to_pb()
else:
encoded_doc[k] = encode_value(v)
args.append(Value(map_value={"fields": encoded_doc}))
return args


class Offset(Stage):
"""Skips a specified number of documents."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,4 +684,63 @@ tests:
- args:
- fieldReferenceValue: awards
- stringValue: full_replace
name: replace_with
name: replace_with
- description: literals
pipeline:
- Literals:
- title: "The Hitchhiker's Guide to the Galaxy"
author: "Douglas Adams"
- genre: "Science Fiction"
year: 1979
assert_results:
- title: "The Hitchhiker's Guide to the Galaxy"
author: "Douglas Adams"
- genre: "Science Fiction"
year: 1979
assert_proto:
pipeline:
stages:
- args:
- mapValue:
fields:
author:
stringValue: "Douglas Adams"
title:
stringValue: "The Hitchhiker's Guide to the Galaxy"
- mapValue:
fields:
genre:
stringValue: "Science Fiction"
year:
integerValue: '1979'
name: literals
- description: literals_with_expression_input
pipeline:
- Literals:
- res:
FunctionExpression.string_concat:
- Constant: "A"
- Constant: "B"
- Select:
- res
assert_results:
- res: "AB"
assert_proto:
pipeline:
stages:
- args:
- mapValue:
fields:
res:
functionValue:
args:
- stringValue: "A"
- stringValue: "B"
name: string_concat
name: literals
- args:
- mapValue:
fields:
res:
fieldReferenceValue: res
name: select
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ def test_documents(self):
assert first_stage.paths[1] == "/a/2"
assert first_stage.paths[2] == "/a/3"

def test_literals(self):
from google.cloud.firestore_v1.pipeline_expressions import Field

instance = self._make_client().pipeline()
documents = ({"field": Field.of("a")}, {"name": "joe"})
ppl = instance.literals(*documents)
assert isinstance(ppl, self._expected_pipeline_type)
assert len(ppl.stages) == 1
first_stage = ppl.stages[0]
assert isinstance(first_stage, stages.Literals)


class TestPipelineSourceWithAsyncClient(TestPipelineSource):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,112 @@ def test_to_pb(self):
assert len(result.options) == 0


class TestLiterals:
def _make_one(self, *args, **kwargs):
return stages.Literals(*args, **kwargs)

def test_ctor(self):
val1 = {"a": Field.of("a")}
val2 = {"b": 2}
instance = self._make_one(val1, val2)
assert instance.documents == (val1, val2)
assert instance.name == "literals"

def test_ctor_extended_types(self):
import datetime
from google.cloud.firestore_v1._helpers import GeoPoint
from google.cloud.firestore_v1.vector import Vector

doc = {
"a": 1,
"b": "string",
"c": 3.14,
"d": True,
"e": None,
"f": b"bytes",
"g": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc),
"h": GeoPoint(1.0, 2.0),
"i": Vector([1.0, 2.0]),
}
instance = self._make_one(doc)
assert instance.documents == (doc,)
assert instance.name == "literals"

def test_ctor_w_expressions(self):
from google.cloud.firestore_v1.pipeline_expressions import FunctionExpression

expr = FunctionExpression("string_concat", [Constant("A"), Constant("B")])
doc = {"res": expr}
instance = self._make_one(doc)
assert instance.documents == (doc,)
assert instance.name == "literals"

def test_repr(self):
from google.cloud.firestore_v1.pipeline_expressions import Field

val1 = {"a": Field.of("a")}
instance = self._make_one(val1, {"b": 2})
repr_str = repr(instance)
assert repr_str == "Literals(documents=({'a': Field.of('a')}, {'b': 2}))"

def test_to_pb_constant_types(self):
import datetime
from google.cloud.firestore_v1._helpers import GeoPoint
from google.cloud.firestore_v1.vector import Vector

doc = {
"a": 1,
"b": "string",
"c": 3.14,
"d": True,
"e": None,
"f": b"bytes",
"g": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc),
"h": GeoPoint(1.0, 2.0),
"i": Vector([1.0, 2.0]),
}
instance = self._make_one(doc)
result = instance._to_pb()
assert result.name == "literals"
assert len(result.args) == 1

fields = result.args[0].map_value.fields
assert fields["a"].integer_value == 1
assert fields["b"].string_value == "string"
assert fields["c"].double_value == 3.14
assert fields["d"].boolean_value is True
assert fields["e"].null_value == 0
assert fields["f"].bytes_value == b"bytes"
assert fields["g"].timestamp_value == datetime.datetime(
2025, 1, 1, tzinfo=datetime.timezone.utc
)
assert fields["h"].geo_point_value.latitude == 1.0
assert fields["h"].geo_point_value.longitude == 2.0
assert (
fields["i"].map_value.fields["value"].array_value.values[0].double_value
== 1.0
)
assert (
fields["i"].map_value.fields["value"].array_value.values[1].double_value
== 2.0
)

def test_to_pb_w_expression(self):
from google.cloud.firestore_v1.pipeline_expressions import FunctionExpression

expr = FunctionExpression("string_concat", [Constant("A"), Constant("B")])
doc = {"res": expr}
instance = self._make_one(doc)
result = instance._to_pb()
assert result.name == "literals"
assert len(result.args) == 1

fields = result.args[0].map_value.fields
assert fields["res"].function_value.name == "string_concat"
assert fields["res"].function_value.args[0].string_value == "A"
assert fields["res"].function_value.args[1].string_value == "B"


class TestOffset:
def _make_one(self, *args, **kwargs):
return stages.Offset(*args, **kwargs)
Expand Down
Loading