8 changes: 8 additions & 0 deletions .cursor/plans/2026-02-09-dify-performance-userid.md
@@ -0,0 +1,8 @@
- Goal: rename the performance test's `DIFY_USER_ID` to a user count and generate `user1..userN` in code
- Modify `performance/.env`: set `DIFY_USER_COUNT` to an example integer value
- Add `performance/user_ids.py`: parse the env value and build the user id list
- Update `performance/locustfile.py`: switch to `DIFY_USER_COUNT` while keeping random user selection
- Update `performance/README.md`: sync the configuration notes and examples
- Add unit tests: cover both the numeric and comma-separated list input scenarios
- Affected files: `performance/.env`, `performance/locustfile.py`, `performance/user_ids.py`, `performance/README.md`, `tests/unit/utils/test_performance_user_ids.py`

26 changes: 13 additions & 13 deletions CONFIG.md
@@ -62,7 +62,7 @@ First, select the operation mode in plugin credentials:
- Recommended for production environments
- Supports high concurrency
- Write operations (Add/Update/Delete/Delete_All): non-blocking, return ACCEPT status immediately
- Read operations (Search/Get/Get_All/History): wait for results with timeout protection (default: 30s)
- Read operations (Search/Get/Get_All/History): wait for results with timeout protection (default: 5s)

- **Sync Mode** (`async_mode=false`)
- Recommended for testing environments
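The read-path timeout behavior described above can be sketched as follows. This is a minimal illustration only, assuming an asyncio-based implementation; the function name and empty-default handling are hypothetical, not the plugin's actual API:

```python
import asyncio

async def read_with_timeout(coro, timeout: float = 5.0, default=None):
    """Await a read operation, returning `default` if it exceeds the timeout.

    Mirrors the documented behavior: reads wait for results with timeout
    protection, while writes return an ACCEPT status immediately.
    """
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        # On timeout: wait_for cancels the task; return the empty default
        return default
```

In practice the plugin also logs the timeout event before returning the default/empty result.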
@@ -116,14 +116,14 @@ After installation, click on the `mem0ai` plugin to configure it. You'll see cre
You can configure the following performance parameters in plugin settings to optimize concurrency and database connections for production environments:

**Performance Parameters:**
- `max_concurrent_memory_operations` - Maximum concurrent memory operations (default: 40)
- `max_concurrent_memory_operations` - Maximum concurrent memory operations (default: 20)
- Applies to all operations including search/add/get/get_all/update/delete/delete_all/history
- Must be a positive integer (>= 1)
- Invalid values (<= 0 or cannot be converted to integer) will use default value 40 with warning logs
- Invalid values (<= 0 or cannot be converted to integer) will use default value 20 with warning logs

**Concurrency Configuration Logic:**
- **`max_concurrent_memory_operations` configured**: Uses the configured value directly
- **Not configured**: Uses default value (40)
- **Not configured**: Uses default value (20)
- **Invalid values** (cannot be converted to positive integers): Uses default values and logs a warning
- **Unset or empty values**: Uses default values and logs a warning
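A minimal sketch of this fallback logic (the function and constant names are illustrative; the plugin's actual helper may differ):

```python
import logging

DEFAULT_MAX_CONCURRENT = 20

def resolve_max_concurrent(raw, default: int = DEFAULT_MAX_CONCURRENT) -> int:
    """Resolve max_concurrent_memory_operations with the documented fallbacks."""
    try:
        value = int(str(raw).strip())
    except ValueError:
        # Unset, empty, or non-numeric values fall back with a warning
        logging.warning("Invalid max_concurrent_memory_operations=%r, using %d", raw, default)
        return default
    if value < 1:
        # Non-positive integers also fall back with a warning
        logging.warning("Non-positive max_concurrent_memory_operations=%r, using %d", raw, default)
        return default
    return value
```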

@@ -276,8 +276,8 @@ The plugin automatically creates a psycopg3 ConnectionPool when `connection_stri
"connection_string": "postgresql://<user>:<password>@<host>:<port>/<db>?sslmode=disable&keepalives=1&keepalives_idle=30&keepalives_interval=10&keepalives_count=3&connect_timeout=5",
"collection_name": "mem0",
"embedding_model_dims": 1536,
"minconn": 2,
"maxconn": 4
"minconn": 10,
"maxconn": 40
}
}
```
@@ -349,9 +349,9 @@ If you need fine-grained control over pool sizing and lifecycle, you can add the
"collection_name": "mem0",
"embedding_model_dims": 1536,
"min_connections": 10,
"max_connections": 40,
"max_connections": 20,
"pool_min_size": 10,
"pool_max_size": 40,
"pool_max_size": 20,
"pool_max_lifetime": 3600,
"pool_max_idle": 600,
"pool_timeout": 30,
Expand All @@ -364,9 +364,9 @@ If you need fine-grained control over pool sizing and lifecycle, you can add the

**Connection Pool Parameters (Optional, with best practice defaults):**
- `min_connections` (int, default: 10): Default minimum connections (used when `pool_min_size` not provided)
- `max_connections` (int, default: 40): Default maximum connections (used when `pool_max_size` not provided)
- `max_connections` (int, default: 20): Default maximum connections (used when `pool_max_size` not provided)
- `pool_min_size` (int, default: uses `min_connections` or 10): Minimum number of connections in the pool
- `pool_max_size` (int, default: uses `max_connections` or 40): Maximum number of connections in the pool
- `pool_max_size` (int, default: uses `max_connections` or 20): Maximum number of connections in the pool
- `pool_max_lifetime` (float, default: 3600.0): Connection maximum lifetime in seconds (1 hour)
- `pool_max_idle` (float, default: 600.0): Connection maximum idle time in seconds (10 minutes)
- `pool_timeout` (float, default: 30.0): Timeout in seconds to get a connection from the pool
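The precedence between `pool_min_size`/`pool_max_size` and the `min_connections`/`max_connections` defaults can be sketched like this (an illustrative helper, not the plugin's internal code):

```python
def resolve_pool_sizes(cfg: dict) -> tuple[int, int]:
    """Resolve pool sizing: pool_* keys win, then *_connections, then 10/20."""
    min_size = int(cfg.get("pool_min_size", cfg.get("min_connections", 10)))
    max_size = int(cfg.get("pool_max_size", cfg.get("max_connections", 20)))
    return min_size, max_size

# These sizes would then map onto psycopg_pool.ConnectionPool's
# min_size=/max_size= arguments, alongside max_lifetime, max_idle, and timeout.
```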
Expand All @@ -389,7 +389,7 @@ If you have a pre-configured psycopg3 ConnectionPool object, you can pass it dir
**Important Notes:**
- If using individual parameters, `user` is required
- Connection pool defaults (`min_connections`, `max_connections`) should be specified in the vector store config JSON
- The plugin automatically sets `minconn` and `maxconn` based on `min_connections`/`max_connections` in config (or defaults: 10 and 40)
- The plugin automatically sets `minconn` and `maxconn` based on `min_connections`/`max_connections` in config (or defaults: 10 and 20)
- **Production recommendation**: Set `max_connections` to match `max_concurrent_memory_operations` for optimal performance
- Parameter priority: `connection_pool` > `connection_string` > individual parameters
- If you provide both `connection_string` and individual parameters, `connection_string` takes precedence
@@ -936,7 +936,7 @@ Use `get_user_checkpoint` to inspect the extraction checkpoint for a user, optio

- **Read Operations** (Search/Get/Get_All/History):
- Wait for results and return actual data
- **Timeout protection**: All async read operations have timeout mechanisms (default: 30s, configurable)
- **Timeout protection**: All async read operations have timeout mechanisms (default: 5s, configurable)
- On timeout or error: logs event, cancels background tasks, returns default/empty results

### Sync Mode (`async_mode=false`)
@@ -1147,7 +1147,7 @@ For detailed upgrade instructions and field mapping, see [README.md - Upgrade Gu
- **Cause**: Invalid or unset concurrency parameter values (cannot be converted to positive integers)
- **Solution**:
- Check logs for specific warning messages indicating which parameter has an invalid value
- Ensure concurrency parameters are positive integers (minimum: 1, default: 40)
- Ensure concurrency parameters are positive integers (minimum: 1, default: 20)
- Configure `max_concurrent_memory_operations` to control concurrency for all operations
- See [Performance Parameters](#step-3-configure-performance-parameters-optional-recommended-for-production) for detailed configuration logic

4 changes: 2 additions & 2 deletions README.md
@@ -222,7 +222,7 @@ Note: `extract_long_term_memory` uses `conversations_limit` as the per-user tota
- `local_graph_db_json_secret` (was `local_graph_db_json`, optional)
- `local_reranker_json_secret` (was `local_reranker_json`, optional)
- **Important**: If you previously used `pgvector_min_connections` and `pgvector_max_connections` credential fields, you must now configure them in the `local_vector_db_json_secret` JSON config:
- Add `"minconn": 10` and `"maxconn": 40` to your pgvector config JSON (see [CONFIG.md](https://github.com/beersoccer/mem0_dify_plugin/blob/main/CONFIG.md#vector-store-configuration-local_vector_db_json_secret) for examples)
- Add `"minconn": 10` and `"maxconn": 20` to your pgvector config JSON (or set `maxconn` to match your `max_concurrent_memory_operations`, default: 20). See [CONFIG.md](https://github.com/beersoccer/mem0_dify_plugin/blob/main/CONFIG.md#vector-store-configuration-local_vector_db_json_secret) for examples.
- These fields are no longer available as separate credential fields
- Use the same configuration values you backed up in step 1
- Save the configuration
Expand All @@ -243,7 +243,7 @@ Note: `extract_long_term_memory` uses `conversations_limit` as the per-user tota
**New Features:**
- **Dynamic Log Level**: You can now change log level (INFO/DEBUG/WARNING/ERROR) in plugin credentials without redeployment
- **Request Tracing**: All tools now support `run_id` parameter for better call chain tracking (recommended to use Dify's `workflow_run_id`)
- **Timeout Optimization**: Read operation timeout reduced to 15 seconds for better responsiveness
- **Timeout Optimization**: Read operation timeout is tuned for responsiveness (current default: 5s, configurable per tool)

### Upgrading from v0.1.3

10 changes: 5 additions & 5 deletions performance/README.md
@@ -23,8 +23,8 @@ locust -f performance/locustfile.py --host=http://localhost
### Headless Mode

```bash
# Run without web UI
locust -f performance/locustfile.py \
# Run without web UI (override user count)
DIFY_USER_COUNT=10 locust -f performance/locustfile.py \
--host=http://localhost \
--users 10 \
--spawn-rate 2 \
@@ -60,7 +60,7 @@ DIFY_API_KEY="<your-dify-api-key>"
DIFY_BASE_URL="http://<your-dify-host>/v1" # Base URL for remote testing (overrides --host if set)
DIFY_ENDPOINT="/chat-messages"
DIFY_QUERY="<your-custom-query>"
DIFY_USER_ID="<user_a>" # Single user, or comma-separated list: "<user_a>,<user_b>"
DIFY_USER_COUNT=5 # Number of test users; actual user IDs are user1..userN (default: 5)
DIFY_RESPONSE_MODE="streaming"
DIFY_MIN_TURNS=3 # Minimum number of follow-up conversation turns (default: 3)
DIFY_MAX_TURNS=5 # Maximum number of follow-up conversation turns (default: 5)
Expand All @@ -70,7 +70,7 @@ The script will automatically load variables from `performance/.env` if it exist

**Note**:
- The conversation will have 3-5 follow-up turns (randomly selected) after the initial message. This simulates multi-turn conversations that can trigger long-term memory extraction.
- If `DIFY_USER_ID` contains multiple users (comma-separated), each request will randomly select one user from the list. This allows testing with different user contexts.
- If `DIFY_USER_COUNT` is an integer N, each request will randomly select one user from user1..userN. This allows testing with different user contexts.
- Use a base URL ending with `/v1` and endpoints starting with `/chat-messages` and `/messages/...` to match the Dify API structure.
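The per-request selection described in the note above amounts to the following (a simplified sketch mirroring `performance/user_ids.py` and the locustfile; `pick_user` is an illustrative name, not a function in the repo):

```python
import random

def pick_user(count: int) -> str:
    """Pick one of user1..userN at random, as each Locust request does."""
    user_ids = [f"user{i}" for i in range(1, count + 1)]
    return random.choice(user_ids)
```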

**Option 2: Set environment variables directly**
Expand All @@ -80,7 +80,7 @@ DIFY_API_KEY='<your-dify-api-key>' \
DIFY_BASE_URL='http://<your-dify-host>/v1' \
DIFY_ENDPOINT='/chat-messages' \
DIFY_QUERY='<your-custom-query>' \
DIFY_USER_ID='<user_a>' \
DIFY_USER_COUNT=10 \
DIFY_RESPONSE_MODE='streaming' \
DIFY_MIN_TURNS=3 \
DIFY_MAX_TURNS=5 \
11 changes: 5 additions & 6 deletions performance/locustfile.py
@@ -35,7 +35,7 @@
DIFY_API_KEY='key' \
DIFY_ENDPOINT='/chat-messages' \
DIFY_QUERY='Your custom query' \
DIFY_USER_ID='test_user' \
DIFY_USER_COUNT='10' \
DIFY_RESPONSE_MODE='streaming' \
locust -f performance/locustfile.py --host=http://localhost
"""
Expand All @@ -51,6 +51,7 @@

from dotenv import load_dotenv
from locust import HttpUser, TaskSet, between, events, task
from user_ids import build_user_ids

# Load environment variables from .env file in performance directory
env_file = Path(__file__).parent / ".env"
@@ -91,11 +92,9 @@ def on_start(self) -> None:
# Configurable payload template
self.response_mode = os.getenv("DIFY_RESPONSE_MODE", "streaming")

# Parse user_id(s) - can be comma-separated list for random selection
user_id_str = os.getenv("DIFY_USER_ID", "test_user")
self.user_ids = [uid.strip() for uid in user_id_str.split(",") if uid.strip()]
if not self.user_ids:
self.user_ids = ["test_user"]
# Parse user_id(s) - integer count generates user1..userN
user_count_str = os.getenv("DIFY_USER_COUNT", "5")
self.user_ids = build_user_ids(user_count_str, default_user="test_user")

# Multi-turn conversation settings
# min_turns and max_turns define the range for follow-up conversation rounds
22 changes: 22 additions & 0 deletions performance/user_ids.py
@@ -0,0 +1,22 @@
from __future__ import annotations


def build_user_ids(raw_value: str, default_user: str = "test_user") -> list[str]:
    """Build a user id list from an env value.

    - If raw_value is an integer N, generate user1..userN.
    - Otherwise, treat raw_value as a comma-separated list.
    """
    normalized = (raw_value or "").strip()
    if not normalized:
        return [default_user]

    if normalized.isdigit():
        count = int(normalized)
        if count <= 0:
            return ["user1"]
        return [f"user{i}" for i in range(1, count + 1)]

    user_ids = [uid.strip() for uid in normalized.split(",") if uid.strip()]
    return user_ids or [default_user]

7 changes: 4 additions & 3 deletions provider/mem0ai.py
@@ -71,9 +71,10 @@ def _validate_credentials(self, credentials: dict[str, Any]) -> None:

logger.debug("Validating Mem0 provider credentials")

# Use a longer timeout for validation to allow for vector DB initialization
# (e.g., Pinecone connection setup, index creation, etc.)
validation_timeout = READ_OPERATION_TIMEOUT * 2 # 30 seconds
# Use a longer timeout for validation to allow for vector DB initialization.
# NOTE: This should be independent from runtime read timeouts (e.g. search timeout=5s),
# otherwise first-time credential validation can become flaky in real networks.
validation_timeout = max(30, READ_OPERATION_TIMEOUT * 2)

try:
async_mode = is_async_mode(credentials)
Expand Down
6 changes: 3 additions & 3 deletions provider/mem0ai.yaml
@@ -94,13 +94,13 @@ credentials_for_provider:
max_concurrent_memory_operations:
type: text-input
required: false
default: "40"
default: "20"
label:
en_US: Max Concurrent Memory Operations
zh_Hans: 最大并发记忆操作数
help:
en_US: "Maximum concurrent memory operations (default: 40). Must be a positive integer (>= 1). Applies to all operations including search/add/get/get_all/update/delete/delete_all/history."
zh_Hans: "记忆操作的最大并发数(默认值:40)。必须为正整数(>= 1),涵盖所有操作包括 search/add/get/get_all/update/delete/delete_all/history。"
en_US: "Maximum concurrent memory operations (default: 20). Must be a positive integer (>= 1). Applies to all operations including search/add/get/get_all/update/delete/delete_all/history."
zh_Hans: "记忆操作的最大并发数(默认值:20)。必须为正整数(>= 1),涵盖所有操作包括 search/add/get/get_all/update/delete/delete_all/history。"
log_level:
type: select
required: false
59 changes: 59 additions & 0 deletions tests/unit/tools/test_add_memory_overload_guard.py
@@ -0,0 +1,59 @@
from __future__ import annotations

from unittest.mock import MagicMock

import pytest

from tools.add_memory import AddMemoryTool


def test_add_memory_async_overload_skips_enqueue(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """When overloaded, add_memory should not enqueue a background task."""
    import tools.add_memory as add_mod

    class FakeClient:
        max_ops = 10

        def get_pending_tasks_count(self) -> int:
            return 999

        def ensure_bg_loop(self):  # pragma: no cover
            raise AssertionError("ensure_bg_loop should not be called on overload")

        def track_bg_task(self, *_a, **_kw):  # pragma: no cover
            raise AssertionError("track_bg_task should not be called on overload")

    def _boom(*_a, **_kw):  # pragma: no cover
        raise AssertionError("run_coroutine_threadsafe should not be called on overload")

    monkeypatch.setattr(add_mod, "get_async_client", lambda _c: FakeClient())
    monkeypatch.setattr(add_mod.asyncio, "run_coroutine_threadsafe", _boom)

    mock_runtime = MagicMock()
    mock_runtime.credentials = {}  # async_mode defaults to True
    tool = AddMemoryTool(runtime=mock_runtime, session=MagicMock())

    # Make message objects easy to assert on
    tool.create_json_message = lambda d: d  # type: ignore[method-assign]
    tool.create_text_message = lambda t: t  # type: ignore[method-assign]

    msgs = list(
        tool._invoke(
            {
                "user_id": "u1",
                "user": "hi",
                "assistant": "",
                "timeout": 1,
            }
        )
    )

    assert len(msgs) == 2
    assert isinstance(msgs[0], dict)
    assert msgs[0]["status"] == "OVERLOAD"
    assert "results" in msgs[0]
    assert isinstance(msgs[1], str)


52 changes: 52 additions & 0 deletions tests/unit/utils/test_overload_guard_preenqueue.py
@@ -0,0 +1,52 @@
from __future__ import annotations

import time
from unittest.mock import MagicMock

import pytest


def test_execute_async_read_operation_rejects_before_enqueue_when_overloaded(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Overload guard must run BEFORE scheduling a new background task."""
    import utils.memory_tool_helpers as helpers

    class FakeClient:
        max_ops = 10

        def get_pending_tasks_count(self) -> int:
            return 999

        def ensure_bg_loop(self):  # pragma: no cover
            raise AssertionError("ensure_bg_loop should not be called on overload")

        def track_bg_task(self, *_a, **_kw):  # pragma: no cover
            raise AssertionError("track_bg_task should not be called on overload")

    monkeypatch.setattr(helpers, "get_async_client", lambda _c: FakeClient())

    def _boom(*_a, **_kw):  # pragma: no cover
        raise AssertionError("run_coroutine_threadsafe should not be called on overload")

    monkeypatch.setattr(helpers.asyncio, "run_coroutine_threadsafe", _boom)

    tool = MagicMock()
    tool.runtime.credentials = {}

    result, error_type = helpers.execute_async_read_operation(
        tool_instance=tool,
        operation=MagicMock(),
        operation_args=(),
        operation_kwargs={},
        timeout=1.0,
        request_id="req1",
        mode_str="async",
        start_time=time.time(),
        operation_name="search_memory(user_id=u1)",
    )

    assert result is None
    assert error_type == "OVERLOAD"
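The invariant these overload tests pin down is small; a sketch of the guard itself, with hypothetical names mirroring the FakeClient used above (the real helper's names may differ):

```python
def is_overloaded(pending_tasks: int, max_ops: int) -> bool:
    """Pre-enqueue overload check: reject before scheduling background work."""
    return pending_tasks >= max_ops

# The tool evaluates this first; only when it returns False does it call
# ensure_bg_loop()/track_bg_task() and schedule the coroutine.
```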


29 changes: 29 additions & 0 deletions tests/unit/utils/test_performance_user_ids.py
@@ -0,0 +1,29 @@
from __future__ import annotations

import importlib.util
from pathlib import Path


def _load_module():
    module_path = Path(__file__).resolve().parents[3] / "performance" / "user_ids.py"
    spec = importlib.util.spec_from_file_location("performance_user_ids", module_path)
    assert spec and spec.loader
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def test_build_user_ids_from_count() -> None:
    module = _load_module()
    assert module.build_user_ids("3") == ["user1", "user2", "user3"]


def test_build_user_ids_from_list() -> None:
    module = _load_module()
    assert module.build_user_ids("alice, bob") == ["alice", "bob"]


def test_build_user_ids_from_empty() -> None:
    module = _load_module()
    assert module.build_user_ids("") == ["test_user"]
