# Shared Memory Caches
The Shared Memory Cache Subsystem enables efficient inter-process communication (IPC) between the PgXidSubscriberMgr daemon, which receives transaction notifications from the XID Manager daemon, and PostgreSQL Foreign Data Wrapper (FDW) processes. It lets multiple FDW processes share system metadata and transaction mutation data without repeated RPC calls to backend services.
- Zero-copy data sharing between processes using Boost Interprocess shared memory
- Reduced RPC overhead by caching frequently accessed metadata locally
- Transaction-aware caching with XID (transaction ID) history tracking
- Automatic staleness detection and cache refresh mechanisms
- LRU-based memory management to prevent unbounded growth
```
┌─────────────────────────────────────┐
│ XID Manager (xid_mgr_daemon)        │
│ - Manages transaction commitments   │
└────────────────┬────────────────────┘
                 │ gRPC: XidPushResponse
                 ↓
┌──────────────────────────────────────────────┐
│ PgXidSubscriberMgr (daemon, separate thread) │
│ - Receives XID notifications                 │
│ - Creates 5 shared memory caches             │
│ - Populates caches on transaction commit     │
│ - Worker threads fetch metadata proactively  │
└───┬──────────────────────────────────────────┘
    │ Shared Memory IPC
    ├─ SHM_CACHE_ROOTS      (table metadata)
    ├─ SHM_CACHE_SCHEMAS    (table schemas)
    ├─ SHM_CACHE_USERTYPES  (user-defined types)
    ├─ SHM_CACHE_TABLE_IDS  (pending mutations)
    ├─ SHM_CACHE_EXTENTS    (mutation records)
    │
    ↓ (Opens and reads)
┌──────────────────────────────┐
│ PgFdwMgr (FDW process)       │
│ - Runs in PostgreSQL         │
│ - Opens existing caches      │
│ - Queries pending XIDs       │
│ - Applies mutations          │
└──────────────────────────────┘
```
| Component | Role | Process Type |
|---|---|---|
| PgXidSubscriberMgr | Cache producer - creates and maintains caches | Daemon thread |
| PgFdwMgr | Cache consumer - opens and uses caches | PostgreSQL FDW process |
| ShmCache | Shared memory abstraction using Boost Interprocess | Library class |
| MsgCache | Multi-index container with LRU eviction | Template class |
| XidMgrClient | Streams XID commit notifications | gRPC client |
## Cache Types

The subsystem manages five distinct shared memory caches, each serving a specific purpose:
### SHM_CACHE_ROOTS

Purpose: Stores table root/metadata information.

Key Properties:
- XID history: Enabled
- Tracks schema changes across transactions
- Contains table root objects with metadata pointers

Data Structure:
- Key: `(DbId, TableId)`
- Value: `vector<proto::Root>`, sorted by XID

Location:
- Created: `pg_xid_subscriber_mgr.cc:53`
- Used by: `sys_tbl_mgr::Client::get_roots()`
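Because the roots cache keeps XID history, a reader must pick the root version visible at its snapshot. The sketch below shows one way to do that. It assumes each cached entry pairs a root with the XID that produced it. `VersionedRoot` and `find_visible` are hypothetical names, not the project's API.

```cpp
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for one proto::Root entry tagged with its XID.
struct VersionedRoot {
    uint64_t xid;          // XID at which this root version became visible
    std::string metadata;  // placeholder for the serialized root payload
};

// Returns the newest root whose XID is <= snapshot_xid, or std::nullopt if
// every version is newer than the snapshot. Assumes `history` is sorted by
// ascending XID, as the cache description states.
std::optional<VersionedRoot> find_visible(const std::vector<VersionedRoot>& history,
                                          uint64_t snapshot_xid) {
    // upper_bound returns the first version with xid > snapshot_xid.
    auto it = std::upper_bound(
        history.begin(), history.end(), snapshot_xid,
        [](uint64_t snap, const VersionedRoot& r) { return snap < r.xid; });
    if (it == history.begin()) {
        return std::nullopt;  // no version is old enough for this snapshot
    }
    return *std::prev(it);    // last version at or before the snapshot
}
```

Binary search keeps the lookup at O(log n) per key, which relies on the vector staying sorted by XID.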
### SHM_CACHE_SCHEMAS

Purpose: Stores table schema definitions.

Key Properties:
- XID history: Disabled
- Contains column definitions, types, and constraints
- Updated on DDL operations

Data Structure:
- Key: `(DbId, TableId)`
- Value: `vector<proto::Schema>`

Location:
- Created: `pg_xid_subscriber_mgr.cc:56`
- Used by: `sys_tbl_mgr::Client::get_schema()`
### SHM_CACHE_USERTYPES

Purpose: Stores user-defined type information.

Key Properties:
- XID history: Disabled
- Contains custom type definitions
- Required for schema deserialization

Data Structure:
- Key: `(DbId, TypeId)`
- Value: `vector<proto::UserType>`

Location:
- Created: `pg_xid_subscriber_mgr.cc:61`
- Used by: `sys_tbl_mgr::Client::get_usertype()`
### SHM_CACHE_TABLE_IDS

Purpose: Maps `(DbId, XID)` to the list of TableIds modified in each transaction.

Key Properties:
- XID history: Enabled
- Tracks pending mutations per transaction
- Critical for read-after-write consistency

Data Structure:
- Key: `(DbId, Xid)`
- Value: serialized `proto::XidPushResponse`
- Fields: `{db_id, xid, has_schema_changes, real_commit, table_ids[]}`

Location:
- Created: `pg_xid_subscriber_mgr.cc:64`
- Used by: `PgFdwMgr::_get_table():645-654`
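Every cache is keyed by a composite pair such as `(DbId, Xid)`. The sketch below shows how such a key and the "did this XID touch my table?" check could look in standard C++. It assumes 64-bit identifiers. `XidKey`, `XidKeyHash`, `TableIdsMap`, and `touches_table` are illustrative names. The real cache stores serialized `proto::XidPushResponse` messages in shared memory, not in an `std::unordered_map`.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

// Hypothetical composite key mirroring the (DbId, Xid) key of SHM_CACHE_TABLE_IDS.
struct XidKey {
    uint64_t db_id;
    uint64_t xid;
    bool operator==(const XidKey& o) const { return db_id == o.db_id && xid == o.xid; }
};

// Mixes the two field hashes (boost::hash_combine-style).
struct XidKeyHash {
    std::size_t operator()(const XidKey& k) const {
        std::size_t h = std::hash<uint64_t>{}(k.db_id);
        h ^= std::hash<uint64_t>{}(k.xid) + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2);
        return h;
    }
};

// In-process stand-in: (db_id, xid) -> table IDs modified by that transaction.
using TableIdsMap = std::unordered_map<XidKey, std::vector<uint64_t>, XidKeyHash>;

// True if transaction `xid` in database `db_id` modified `table_id`.
bool touches_table(const TableIdsMap& cache, uint64_t db_id, uint64_t xid, uint64_t table_id) {
    auto it = cache.find(XidKey{db_id, xid});
    if (it == cache.end()) return false;
    const auto& ids = it->second;
    return std::find(ids.begin(), ids.end(), table_id) != ids.end();
}
```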
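Usage Pattern:

```cpp
// Query the pending XIDs recorded for a database
auto pending_xids = _table_ids_shm_cache->get_pending_xids(db_id);
// Fetch the mutation metadata recorded for one XID
auto msg = _table_ids_shm_cache->find(db_id, xid);
```

### SHM_CACHE_EXTENTS

Purpose: Stores mutation records (extents) from WriteCacheClient.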
Key Properties:
- XID history: Disabled
- Contains actual data mutations
- Populated before mutations are applied to base tables

Data Structure:
- Key: `(DbId, ExtentId)`
- Value: `vector<proto::Extent>`

Location:
- Created: `pg_xid_subscriber_mgr.cc:68`
- Used by: `WriteCacheClient` for mutation staging
## PgXidSubscriberMgr

Header: `include/pg_fdw/pg_xid_subscriber_mgr.hh:22-105`

Implementation: `src/pg_fdw/pg_xid_subscriber_mgr.cc`

- Cache creation (`init()` method):
  - Creates all 5 shared memory caches with configured sizes
  - Removes stale caches from previous runs
  - Initializes XID history cleaners for enabled caches
- XID push notification handling (`task()` method):
  - Subscribes to the XidMgrClient push stream
  - Receives `proto::XidPushResponse` messages
  - Dispatches based on the `real_commit` flag
- Asynchronous population (worker threads):
  - Configurable thread pool size
  - Fetches metadata on transaction commit
  - Calls `Client::get_roots()`, `get_schema()`, and `get_usertype()`
  - Automatically populates caches via registered ShmCache instances
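The producer's worker threads follow a classic job-queue pattern, which is why the header declares a `std::mutex _mutex` and a `std::condition_variable _cv`. The sketch below shows that pattern. It assumes each job is a callable that fetches metadata and writes it into a cache. `PopulatePool` and its methods are hypothetical and do not reflect the class's actual interface.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical worker pool that drains "populate cache" jobs on commit.
class PopulatePool {
public:
    explicit PopulatePool(std::size_t threads) {
        for (std::size_t i = 0; i < threads; ++i) {
            _workers.emplace_back([this] { run(); });
        }
    }

    ~PopulatePool() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopping = true;
        }
        _cv.notify_all();
        for (auto& t : _workers) t.join();  // workers drain queued jobs before exiting
    }

    // Called from the XID push handler when a commit notification arrives.
    void submit(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _jobs.push(std::move(job));
        }
        _cv.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _cv.wait(lock, [this] { return _stopping || !_jobs.empty(); });
                if (_jobs.empty()) return;  // stopping and nothing left to do
                job = std::move(_jobs.front());
                _jobs.pop();
            }
            job();  // e.g. fetch roots/schema/usertype, then write to the cache
        }
    }

    std::mutex _mutex;             // protects _jobs and _stopping
    std::condition_variable _cv;   // wakes workers on new jobs or shutdown
    std::queue<std::function<void()>> _jobs;
    bool _stopping = false;
    std::vector<std::thread> _workers;
};
```

Jobs run outside the lock. A slow RPC therefore blocks only its own worker, not the push handler that enqueues new commits.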
## PgFdwMgr

Header: `include/pg_fdw/pg_fdw_mgr.hh:133-468`

Implementation: `src/pg_fdw/pg_fdw_mgr.cc`

- Cache opening (`_try_create_cache()` method):
  - Opens existing caches created by PgXidSubscriberMgr
  - Verifies cache liveness with an `is_alive()` check
  - Registers caches with the Client and WriteCacheClient singletons
- Mutation application (`_get_table()` method):
  - Queries pending XIDs since the last committed XID
  - Fetches mutation metadata from the table IDs cache
  - Retrieves extent data from the WriteCache
  - Applies mutations to the base table as a ChangeSet
- Lifecycle management:
  - Lazy initialization on first query
  - Automatic cache reopening if stale
  - Background thread for XID updates

Key points of cache opening:
- Opens caches in read (open) mode by passing `size = 0`
- Handles the case where PgXidSubscriberMgr has not started yet
- Checks `is_alive()` to detect stale caches
- Registers each cache with the appropriate singleton client
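The open, check, and reopen sequence guarded by `std::shared_mutex _shm_cache_mutex` can be sketched as follows. `FakeCache`, `CacheHolder`, and the opener callback are hypothetical stand-ins for `ShmCache` and `_try_create_cache()`. Readers take a shared lock on the fast path and take the exclusive lock only when the cache is missing or stale.

```cpp
#include <functional>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <utility>

// Hypothetical stand-in for ShmCache: only the liveness check matters here.
struct FakeCache {
    bool alive = true;
    bool is_alive() const { return alive; }
};

class CacheHolder {
public:
    // `opener` returns nullptr if the producer has not created the cache yet.
    explicit CacheHolder(std::function<std::shared_ptr<FakeCache>()> opener)
        : _opener(std::move(opener)) {}

    std::shared_ptr<FakeCache> get() {
        {
            // Fast path: many readers can fetch the pointer concurrently.
            std::shared_lock<std::shared_mutex> lock(_mutex);
            if (_cache && _cache->is_alive()) return _cache;
        }
        // Slow path: (re)open under an exclusive lock.
        std::unique_lock<std::shared_mutex> lock(_mutex);
        if (!_cache || !_cache->is_alive()) {  // re-check: another thread may have reopened it
            _cache = _opener();                // may still be nullptr; retried on the next call
        }
        return _cache;
    }

private:
    std::function<std::shared_ptr<FakeCache>()> _opener;
    std::shared_mutex _mutex;  // plays the role of _shm_cache_mutex
    std::shared_ptr<FakeCache> _cache;
};
```

Returning `nullptr` instead of failing is one way to tolerate an FDW process that starts before PgXidSubscriberMgr.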
Performance Considerations:
- Only processes XIDs <= `snapshot_xid` (transaction isolation)
- Limits each query to `MAX_WRITE_CACHE_EXTENTS` (10) extents
- Resets pending XIDs if too many extents are found
- Avoids redundant extent fetches
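The snapshot filter and extent cap can be sketched as a pure function. The `PendingXid` struct and the `plan_pending_xids` name are hypothetical. Returning `std::nullopt` to tell the caller to reset its pending-XID state is also an assumption: the text only says pending XIDs are reset when too many extents are found. The constant mirrors the name and value given above.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

constexpr std::size_t MAX_WRITE_CACHE_EXTENTS = 10;  // value from the text above

// Hypothetical view of one pending XID and its extent count for this table.
struct PendingXid {
    uint64_t xid;
    std::size_t extent_count;
};

// Returns the XIDs to apply (only those <= snapshot_xid), or std::nullopt when
// the visible XIDs together exceed the extent cap.
std::optional<std::vector<uint64_t>> plan_pending_xids(const std::vector<PendingXid>& pending,
                                                       uint64_t snapshot_xid) {
    std::vector<uint64_t> to_apply;
    std::size_t total_extents = 0;
    for (const auto& p : pending) {
        if (p.xid > snapshot_xid) continue;  // not visible to this snapshot
        total_extents += p.extent_count;
        if (total_extents > MAX_WRITE_CACHE_EXTENTS) return std::nullopt;
        to_apply.push_back(p.xid);
    }
    return to_apply;
}
```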
## ShmCache

Header: `include/sys_tbl_mgr/shm_cache.hh:73-360`

Implementation: `src/sys_tbl_mgr/shm_cache.cc`

Provides a shared memory abstraction using Boost Interprocess for inter-process data sharing.

- Dual-mode construction:
  - Create mode: `ShmCache(type, size, xid_history)` with `size > 0`
  - Open mode: `ShmCache(type, 0, xid_history)` opens an existing cache
- XID tracking (if enabled):
  - Tracks the committed XID per database
  - Maintains a list of pending XIDs
  - Records XID history for schema changes
- Keep-alive mechanism:
  - Caches must be kept alive every 60 ms
  - `is_alive()` checks whether a cache is stale
  - Consumers reconnect automatically when a stale cache is detected
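The per-database XID bookkeeping can be modeled in process memory as shown below. The sketch assumes that advancing a database's committed XID makes every pending XID at or below it no longer pending. `XidTracker` and its methods are hypothetical, the schema-change history is not modeled, and in the real cache this state lives inside the shared memory segment.

```cpp
#include <cstdint>
#include <map>
#include <set>
#include <vector>

// Hypothetical in-process model of the per-database XID state kept by a cache.
class XidTracker {
public:
    // Record a transaction whose mutations are cached but not yet in base tables.
    void add_pending(uint64_t db_id, uint64_t xid) {
        if (xid > committed(db_id)) _pending[db_id].insert(xid);
    }

    // Base tables now include everything up to `xid`: advance the watermark
    // and drop the pending XIDs it covers.
    void set_committed(uint64_t db_id, uint64_t xid) {
        uint64_t& c = _committed[db_id];
        if (xid <= c) return;  // never move the watermark backwards
        c = xid;
        auto& p = _pending[db_id];
        p.erase(p.begin(), p.upper_bound(xid));
    }

    uint64_t committed(uint64_t db_id) const {
        auto it = _committed.find(db_id);
        return it == _committed.end() ? 0 : it->second;
    }

    // Pending XIDs for a database, in ascending order.
    std::vector<uint64_t> pending(uint64_t db_id) const {
        auto it = _pending.find(db_id);
        if (it == _pending.end()) return {};
        return std::vector<uint64_t>(it->second.begin(), it->second.end());
    }

private:
    std::map<uint64_t, uint64_t> _committed;          // db_id -> last committed XID
    std::map<uint64_t, std::set<uint64_t>> _pending;  // db_id -> pending XIDs
};
```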
Usage:
- PgXidSubscriberMgr calls `keep_alive()` every 60 ms
- PgFdwMgr checks `is_alive()` before using a cache
- If the cache is stale, PgFdwMgr reopens it
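The keep-alive check can be sketched as a timestamp that the producer refreshes and consumers compare against a timeout. The sketch makes three assumptions. First, the timestamp is a `std::atomic<int64_t>` stored in the shared segment; here it is an ordinary member. Second, steady-clock readings are comparable across processes, which is typically true on Linux, where `steady_clock` is backed by `CLOCK_MONOTONIC`. Third, `Heartbeat` is a hypothetical name. The 60 ms timeout comes from the text.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>

// Hypothetical heartbeat record; in a real segment it would live in shared memory.
class Heartbeat {
public:
    using Clock = std::chrono::steady_clock;
    static constexpr std::chrono::milliseconds kTimeout{60};  // value from the text

    // Producer side (PgXidSubscriberMgr): call periodically.
    void keep_alive(Clock::time_point now = Clock::now()) {
        _last_ns.store(to_ns(now), std::memory_order_release);
    }

    // Consumer side (PgFdwMgr): stale if the producer has not refreshed recently.
    bool is_alive(Clock::time_point now = Clock::now()) const {
        const int64_t last = _last_ns.load(std::memory_order_acquire);
        if (last == 0) return false;  // never refreshed
        return to_ns(now) - last < std::chrono::nanoseconds(kTimeout).count();
    }

private:
    static int64_t to_ns(Clock::time_point t) {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(t.time_since_epoch()).count();
    }
    std::atomic<int64_t> _last_ns{0};  // nanoseconds since the steady clock's epoch
};
```

Passing `now` explicitly keeps the check testable without sleeping. In production both sides would use the default `Clock::now()`.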
## MsgCache

Header: `include/sys_tbl_mgr/msg_cache.hh:31-323`

Multi-index container with LRU eviction, providing both fast lookup and insertion-order tracking.

Configuration:
- Free memory limit: 30% (eviction starts when free memory falls below this)
- Free memory watermark: 50% (eviction stops once free memory reaches this)
- Retention: the top 10% most frequently accessed items are protected from eviction (`msg_cache.hh:236`)
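The eviction policy can be sketched with an `std::list` for recency order plus an `std::unordered_map` for fast lookup. These stand in for the Boost multi-index container's sequenced and hashed indices. The sketch makes simplifying assumptions: `LruCache` is a hypothetical class, memory use is the sum of value sizes against a fixed byte capacity, and the top-10% retention rule is not modeled.

```cpp
#include <cstddef>
#include <list>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

class LruCache {
public:
    explicit LruCache(std::size_t capacity_bytes) : _capacity(capacity_bytes) {}

    void put(const std::string& key, std::string value) {
        erase(key);
        _used += value.size();
        _order.emplace_front(key, std::move(value));
        _index[key] = _order.begin();
        // Start evicting when free memory falls below 30% of capacity...
        if (free_fraction() < 0.30) {
            // ...and keep evicting least recently used entries until 50% is free.
            while (!_order.empty() && free_fraction() < 0.50) evict_oldest();
        }
    }

    std::optional<std::string> get(const std::string& key) {
        auto it = _index.find(key);
        if (it == _index.end()) return std::nullopt;
        _order.splice(_order.begin(), _order, it->second);  // mark as most recent
        return it->second->second;
    }

    std::size_t size() const { return _order.size(); }

private:
    using Entry = std::pair<std::string, std::string>;

    double free_fraction() const {
        return _used >= _capacity ? 0.0 : double(_capacity - _used) / double(_capacity);
    }
    void erase(const std::string& key) {
        auto it = _index.find(key);
        if (it == _index.end()) return;
        _used -= it->second->second.size();
        _order.erase(it->second);
        _index.erase(it);
    }
    void evict_oldest() {
        std::string key = _order.back().first;  // copy: erase() destroys the node
        erase(key);
    }

    std::size_t _capacity;
    std::size_t _used = 0;
    std::list<Entry> _order;  // front = most recently used
    std::unordered_map<std::string, std::list<Entry>::iterator> _index;
};
```

The gap between the two thresholds gives hysteresis. One eviction pass frees a large block, so the cache does not evict on every insert once it is near full.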
## Synchronization

```cpp
// PgFdwMgr (pg_fdw_mgr.hh:298)
std::shared_mutex _shm_cache_mutex;  // Protects cache pointers

// ShmCache (shm_cache.hh:96)
boost::interprocess::named_sharable_mutex* _mutex;  // Per-cache lock

// PgXidSubscriberMgr (pg_xid_subscriber_mgr.hh:93)
std::mutex _mutex;            // Protects populate job queue
std::condition_variable _cv;  // Worker coordination
```

## Configuration

Source: the Properties singleton (commonly loaded from a YAML/JSON config file)

Schema:

```json
{
  "sys_tbl_mgr": {
    "roots_shm_cache_size": 1073741824,
    "schema_shm_cache_size": 536870912,
    "usertype_shm_cache_size": 268435456,
    "table_ids_shm_cache_size": 536870912,
    "extents_shm_cache_size": 1073741824,
    "rpc_config": {
      "server_worker_threads": 4
    }
  }
}
```

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `roots_shm_cache_size` | `size_t` | 1 GB | Size of roots cache in bytes |
| `schema_shm_cache_size` | `size_t` | 512 MB | Size of schema cache in bytes |
| `usertype_shm_cache_size` | `size_t` | 256 MB | Size of usertype cache in bytes |
| `table_ids_shm_cache_size` | `size_t` | 512 MB | Size of table IDs cache in bytes |
| `extents_shm_cache_size` | `size_t` | 1 GB | Size of extents cache in bytes |
| `server_worker_threads` | `size_t` | 4 | Number of worker threads for async population |
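The defaults in the table can be checked against the byte values in the JSON schema. "1 GB" here means 2^30 bytes. The sketch below holds these defaults in a config struct. `ShmCacheSizes` is a hypothetical type and does not show how the Properties singleton actually exposes these values.

```cpp
#include <cstddef>

// Hypothetical mirror of the sys_tbl_mgr cache-size settings and their defaults.
struct ShmCacheSizes {
    static constexpr std::size_t MiB = std::size_t{1} << 20;
    static constexpr std::size_t GiB = std::size_t{1} << 30;

    std::size_t roots = 1 * GiB;        // roots_shm_cache_size     = 1073741824
    std::size_t schema = 512 * MiB;     // schema_shm_cache_size    = 536870912
    std::size_t usertype = 256 * MiB;   // usertype_shm_cache_size  = 268435456
    std::size_t table_ids = 512 * MiB;  // table_ids_shm_cache_size = 536870912
    std::size_t extents = 1 * GiB;      // extents_shm_cache_size   = 1073741824

    // Combined size of the five segments created by the producer.
    std::size_t total() const { return roots + schema + usertype + table_ids + extents; }
};

static_assert(ShmCacheSizes::GiB == 1073741824, "1 GB default is 2^30 bytes");
static_assert(512 * ShmCacheSizes::MiB == 536870912, "512 MB default is 2^29 bytes");
static_assert(256 * ShmCacheSizes::MiB == 268435456, "256 MB default is 2^28 bytes");
```

With all defaults, the five caches total 3.25 GiB (3489660928 bytes).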
## Example: Query with Pending Mutations

Scenario: A PostgreSQL FDW process executes `SELECT * FROM table WHERE id = 123` with snapshot XID 1000.

Steps:

```cpp
// 1. PgFdwMgr::_get_table() is called with snapshot XID 1000
auto table = _get_table(db_id, table_id, /*snapshot_xid=*/1000);

// Inside _get_table():
// 2. Fetch the base table from SysTableMgr
auto base_table = SysTableMgr::get_instance()->get_table(db_id, table_id, 1000);

// 3. Query pending XIDs from table_ids_cache
auto pending_xids = _table_ids_shm_cache->get_pending_xids(db_id);
// Result: [950, 975, 1050]

for (auto xid : pending_xids) {
    // 4. Only XIDs <= snapshot_xid are visible: process 950 and 975, skip 1050
    if (xid > 1000) {
        continue;
    }

    // 5. Look up the mutation metadata for this XID
    auto msg = _table_ids_shm_cache->find(db_id, xid);
    proto::XidPushResponse response;
    response.ParseFromString(msg);
    // For XID 950: response.table_ids() = [10, 25, 30]

    // 6. Skip the XID unless it modified this table (table_id = 25 here)
    const auto& ids = response.table_ids();
    if (std::find(ids.begin(), ids.end(), table_id) == ids.end()) {
        continue;
    }

    // 7. Fetch extents from WriteCache
    auto extents = WriteCacheClient::get_instance()->get_extents(db_id, table_id, xid);

    // 8. Apply the extents as a ChangeSet
    ChangeSet cs;
    for (auto& extent : extents) {
        cs.add_extent(extent);
    }
    base_table->apply_changeset(cs, xid);
}

// 9. Return the merged table
return base_table;
```

Result: The query sees the base table data plus the pending mutations from XIDs 950 and 975, which have not yet been applied to the base table. XID 1050 is newer than the snapshot and is skipped. This ensures read-after-write consistency.
## Example: Recovery After a PgXidSubscriberMgr Restart

Scenario: PgXidSubscriberMgr crashes and restarts while FDW processes are running.

Steps:

```cpp
// 1. PgFdwMgr detects a stale cache: the last keep-alive is older than 60 ms
bool ShmCache::is_alive() const {
    auto now = std::chrono::steady_clock::now();
    auto elapsed = now - _last_keep_alive;
    return elapsed < std::chrono::milliseconds(60);
}

// 2. PgFdwMgr::_try_create_cache() checks liveness
if (!_roots_shm_cache || !_roots_shm_cache->is_alive()) {
    // 3. Reopen the cache (size = 0 selects open mode)
    _roots_shm_cache = std::make_shared<sys_tbl_mgr::ShmCache>(
        sys_tbl_mgr::SHM_CACHE_ROOTS, 0, true);
    // 4. Re-register the cache with Client
    sys_tbl_mgr::Client::get_instance()->use_roots_cache(_roots_shm_cache);
}

// 5. PgXidSubscriberMgr restarts
PgXidSubscriberMgr::start();
// 6. New caches are created:
//    old shared memory segments are removed, and
//    fresh XIDs are streamed from the XID Manager
// 7. FDW processes reopen the caches on their next query,
//    recovering with minimal downtime
```

## LRU-Based Memory Management

Problem: Unbounded cache growth causes out-of-memory (OOM) failures.

Solution: A multi-index container with a sequenced index for LRU ordering.

Benefits:
- Eviction starts automatically when free memory drops below 30%
- Eviction stops once free memory reaches the 50% watermark
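## Troubleshooting

```shell
# List System V shared memory segments
# (Boost Interprocess on Linux normally uses POSIX shared memory, listed under /dev/shm instead)
ipcs -m
# Remove a stale System V shared memory segment
ipcrm -m <shmid>
# Check cache sizes (POSIX shared memory objects)
ls -lh /dev/shm/
# Monitor shared memory usage
watch -n 1 'ipcs -m -u'
# Verify PgXidSubscriberMgr is running
ps aux | grep pg_xid_subscriber_mgr
# Check the XID Manager connection
netstat -an | grep <xid_manager_port>
```

## Design Patterns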
Pattern:
- Producer: PgXidSubscriberMgr creates and maintains caches
- Consumer: PgFdwMgr opens and uses caches
- Trigger: Query execution in PostgreSQL FDW process
Benefits:
- Loose coupling between producer and consumer
- Graceful handling of startup ordering
- Automatic recovery on producer restart
## Related Files

| File | Description |
|---|---|
| `include/pg_fdw/pg_xid_subscriber_mgr.hh` | PgXidSubscriberMgr class definition |
| `src/pg_fdw/pg_xid_subscriber_mgr.cc` | PgXidSubscriberMgr implementation |
| `include/pg_fdw/pg_fdw_mgr.hh` | PgFdwMgr class definition |
| `src/pg_fdw/pg_fdw_mgr.cc` | PgFdwMgr implementation |
| `include/sys_tbl_mgr/shm_cache.hh` | ShmCache class definition |
| `src/sys_tbl_mgr/shm_cache.cc` | ShmCache implementation |
| `include/sys_tbl_mgr/msg_cache.hh` | MsgCache template definition |
| `src/proto/xid_manager.proto` | XidPushResponse protobuf definition |
| `include/write_cache/write_cache_client.hh` | WriteCacheClient class definition |
## External Dependencies

| Library | Purpose | Documentation |
|---|---|---|
| Boost Interprocess | Shared memory IPC | boost.org/doc/libs/interprocess |
| Boost Multi-Index | Multi-index containers | boost.org/doc/libs/multi_index |
| Protocol Buffers | Serialization | developers.google.com/protocol-buffers |
| gRPC | RPC framework | grpc.io |
## Related Pages

- PostgreSQL Foreign Data Wrapper API
- Springtail Transaction Management
- XID Manager Architecture
- WriteCache Design