Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions lib/crewai/src/crewai/flow/persistence/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async def async_method(self):

# Constants for log messages
LOG_MESSAGES: Final[dict[str, str]] = {
"save_state": "Saving flow state to memory for ID: {}",
"save_state": "Saving flow state for ID: {} (storage: {})",
"save_error": "Failed to persist state for method {}: {}",
"state_missing": "Flow instance has no state",
"id_missing": "Flow state must have an 'id' field for persistence",
Expand Down Expand Up @@ -100,13 +100,6 @@ def persist_state(
if not flow_uuid:
raise ValueError("Flow state must have an 'id' field for persistence")

# Log state saving only if verbose is True
if verbose:
PRINTER.print(
LOG_MESSAGES["save_state"].format(flow_uuid), color="cyan"
)
logger.info(LOG_MESSAGES["save_state"].format(flow_uuid))

try:
state_data = state._unwrap() if hasattr(state, "_unwrap") else state
persistence_instance.save_state(
Expand All @@ -120,6 +113,15 @@ def persist_state(
PRINTER.print(error_msg, color="red")
logger.error(error_msg)
raise RuntimeError(f"State persistence failed: {e!s}") from e

Comment thread
cursor[bot] marked this conversation as resolved.
# Log storage location so users can find their persisted data
storage_location = getattr(
persistence_instance, "db_path", type(persistence_instance).__name__
)
msg = LOG_MESSAGES["save_state"].format(flow_uuid, storage_location)
if verbose:
PRINTER.print(msg, color="cyan")
logger.info(msg)
except AttributeError as e:
error_msg = LOG_MESSAGES["state_missing"]
if verbose:
Expand Down
5 changes: 5 additions & 0 deletions lib/crewai/src/crewai/flow/persistence/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from datetime import datetime, timezone
import json
import logging
import os
from pathlib import Path
import sqlite3
Expand All @@ -17,6 +18,9 @@
from crewai.utilities.paths import db_storage_path


logger = logging.getLogger(__name__)


if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext

Expand Down Expand Up @@ -66,6 +70,7 @@ def __init__(self, db_path: str | None = None, /, **kwargs: Any) -> None:
def _setup(self) -> Self:
self._lock_name = f"sqlite:{os.path.realpath(self.db_path)}"
self.init_db()
logger.info("SQLiteFlowPersistence initialized with db_path: %s", self.db_path)
return self

def init_db(self) -> None:
Expand Down
106 changes: 89 additions & 17 deletions lib/crewai/tests/test_flow_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,6 @@ def test_persist_decorator_verbose_logging(tmp_path, caplog):
db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)

# Test with verbose=False (default)
class QuietFlow(Flow[Dict[str, str]]):
initial_state = dict()

@start()
@persist(persistence) # Default verbose=False
def init_step(self):
self.state["message"] = "Hello, World!"
self.state["id"] = "test-uuid-1"

flow = QuietFlow(persistence=persistence)
flow.kickoff()
assert "Saving flow state" not in caplog.text

# Clear the log
caplog.clear()

# Test with verbose=True
class VerboseFlow(Flow[Dict[str, str]]):
initial_state = dict()
Expand Down Expand Up @@ -248,3 +231,92 @@ def init_step(self):
assert message.type == "text"
assert message.content == "Hello, World!"
assert isinstance(flow.state._unwrap(), State)


def test_sqlite_persistence_logs_db_path_on_init(tmp_path, caplog):
"""Test that SQLiteFlowPersistence logs its db_path on initialization."""
caplog.set_level("INFO")

db_path = os.path.join(tmp_path, "my_custom.db")
SQLiteFlowPersistence(db_path)

assert "SQLiteFlowPersistence initialized with db_path" in caplog.text
assert db_path in caplog.text


def test_sqlite_persistence_default_path_is_logged(caplog):
"""Test that the default persistence path is logged so users can discover it."""
caplog.set_level("INFO")

persistence = SQLiteFlowPersistence()

assert "SQLiteFlowPersistence initialized with db_path" in caplog.text
assert "flow_states.db" in caplog.text
# Verify the db_path attribute is accessible for programmatic discovery
assert persistence.db_path.endswith("flow_states.db")


def test_persist_logs_storage_location_on_save(tmp_path, caplog):
"""Test that the persist decorator logs the storage location when state is saved."""
caplog.set_level("INFO")

db_path = os.path.join(tmp_path, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)

class LocationLogFlow(Flow[TestState]):
@start()
@persist(persistence)
def init_step(self):
self.state.message = "test"

flow = LocationLogFlow(persistence=persistence)
flow.kickoff()

# Verify that the storage location (db_path) is logged after saving
assert "storage:" in caplog.text
assert db_path in caplog.text


def test_persist_verbose_shows_storage_location_with_db_path(tmp_path, caplog):
"""Test that verbose persist includes storage location with actual db_path."""
caplog.set_level("INFO")

db_path = os.path.join(tmp_path, "verbose_test.db")
persistence = SQLiteFlowPersistence(db_path)

class VerboseLocationFlow(Flow[Dict[str, str]]):
initial_state = dict()

@start()
@persist(persistence, verbose=True)
def init_step(self):
self.state["message"] = "Hello!"
self.state["id"] = "verbose-uuid"

flow = VerboseLocationFlow(persistence=persistence)
flow.kickoff()

# Verbose mode should log both save message and storage location
assert "Saving flow state for ID: verbose-uuid" in caplog.text
assert f"storage: {db_path}" in caplog.text


def test_persist_class_level_logs_storage_location(tmp_path, caplog):
"""Test that class-level @persist also logs the storage location."""
caplog.set_level("INFO")

db_path = os.path.join(tmp_path, "class_level_test.db")
persistence = SQLiteFlowPersistence(db_path)

@persist(persistence)
class ClassLevelFlow(Flow[TestState]):
@start()
def init_step(self):
self.state.message = "class level"

flow = ClassLevelFlow(persistence=persistence)
flow.kickoff()

# Verify storage location is logged even with class-level decorator
assert "storage:" in caplog.text
assert db_path in caplog.text
Loading