Course: CPSC 538B Fall 2025
Instructor: Ivan Beschastnikh
Group Members: Jiyeon, Oleg, and Parshan
📄 Full Project Report: TinyGraph.pdf
TinyGraph includes comprehensive end-to-end tests that verify MVCC correctness and distributed system behavior.
cd test
go test -v
What the tests verify:
- Basic operations: AddVertex, AddEdge, DeleteEdge, BFS
- MVCC time-travel queries (querying graph state at historical timestamps)
- Edge deletion and re-addition with proper versioning
- Complex graph evolution scenarios
For detailed testing documentation, see test/E2E_TEST_README.md
Use generate_rmat.py to create synthetic R-MAT graphs for testing and benchmarking:
# Generate a small graph (16 nodes, 128 edges)
python3 cmd/client/workloads/generate_rmat.py --scale 4 --edge-factor 8 --output workload.txt
# Generate a larger graph (256 nodes, 2048 edges)
python3 cmd/client/workloads/generate_rmat.py --scale 8 --edge-factor 8 --output large_workload.txt
# Visualize an existing workload file (requires networkx and matplotlib)
python3 cmd/client/workloads/generate_rmat.py --visualize-file workload.txt
Parameters:
- --scale N: Number of nodes = 2^N (default: 4)
- --edge-factor N: Number of edges = nodes × N (default: 8)
- --output FILE: Output file path (default: rmat_graph.txt)
- --visualize: Generate a visualization when creating a graph
- --visualize-file FILE: Visualize an existing workload file
Use the start-cluster tool to launch all shard replicas and the query manager:
# Build the start-cluster tool
go build -o start-cluster cmd/start-cluster/main.go
# Start cluster using default config
./start-cluster
# Start cluster with custom config
./start-cluster 3_shard_3_replica_config.yaml
The cluster starts all shard replicas first, waits for initialization, then starts the query manager. Press Ctrl+C to gracefully shut down the entire cluster.
Use the client tool to execute workload files against the running cluster:
# Build the client tool
go build -o client cmd/client/main.go
# Run a workload with default config
./client -workload cmd/client/workloads/simple_graph.txt
# Run a workload with custom config
./client -config config.yaml -workload cmd/client/workloads/rmat_graph.txt
Workload Format:
- A from to [weight] - Add an edge between vertices (creates vertices if needed)
- D from to - Delete the edge between vertices
- Q - Fetch all vertices and edges from all shards
- Q vertex radius - Perform BFS from vertex with the given radius
The client automatically verifies BFS results against a local graph representation.
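For reference, a tiny hand-written workload in this format might look like the following (vertex IDs and weights are arbitrary examples):
A 1 2 5
A 2 3 1
A 1 3
D 1 3
Q 1 2
Q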
Alternatively, you can start components manually:
The Query Manager coordinates client requests and routes them to shards:
go run cmd/qm/main.go -config config.yaml
Or simply (since config.yaml is the default):
go run cmd/qm/main.go
The QM runs on localhost:9090 by default (configurable in config.yaml).
The client connects to the Query Manager and executes workloads:
go run cmd/client/main.go -config config.yaml -workload cmd/client/workloads/simple_graph.txt
Or using defaults:
go run cmd/client/main.go -workload cmd/client/workloads/simple_graph.txt
Workload Format:
- A vertex1 vertex2 [weight] - Add an edge between vertices
- D vertex1 vertex2 - Delete the edge between vertices
go run main.go
tinygraph/
├── main.go # Main entry point (demo/test setup)
├── go.mod
├── README.md
│
├── pkg/ # Public packages
│ │
│ ├── mvcc/ # MVCC Core Data Structures
│ │ # - Vertex/Edge structs with version history
│ │ # - Time-travel query utilities
│ │ # - Pure data structures, no networking
│ │
│ ├── shard/ # Shard Implementation
│ │ # - ShardRSM (replicated state machine)
│ │ # - Raft consensus integration
│ │ # - MVCC operation application
│ │ # - RPC server for shard operations
│ │
│ ├── qm/ # Query Manager Implementation
│ │ # - Stateless coordinator logic
│ │ # - Partitioning algorithms (Random, Heuristic)
│ │ # - Request routing to shards
│ │ # - Vertex ID encoding (shard_id + uuid)
│ │ # - Timestamp generation/coordination
│ │ # - Client-facing RPC server
│ │
│ ├── bfs/ # BFS Implementations
│ │ # - Baseline synchronous BFS
│ │ # - Optimized batched BFS (Phase 3)
│ │ # - Async shard-coordinated BFS (Phase 3)
│ │
│ ├── rpc/ # RPC Definitions
│ │ # - Client-facing API (Client → QM)
│ │ # - Shard internal API (QM → Shard, Shard → Shard)
│ │ # - Protocol Buffer definitions
│ │ # - Common RPC types
│ │
│ └── partition/ # Partitioning Utilities
│ # - Partitioning strategy interface
│ # - Shared types for partitioning
│
├── internal/ # Internal packages
│ ├── config/ # System configuration
│ │ # - Config structs
│ │ # - Replication mode enum
│ │
│ └── util/ # Utilities
│ # - ID encoding/decoding helpers
│ # - Clock/timestamp utilities
│ # - Custom error types
│
├── cmd/ # Executables
│ ├── shard/ # Shard server executable
│ ├── qm/ # Query Manager executable
│ └── client/ # CLI client for testing
│
├── test/ # Integration tests
│ # - MVCC correctness tests
│ # - Multi-shard distributed tests
│ # - BFS correctness tests
│
└── benchmark/ # Performance benchmarks
# - Workload generator
# - Benchmark suite
- MVCC Layer (pkg/mvcc/): Core versioned graph data structures with no dependencies on networking or coordination
- Shard Layer (pkg/shard/): Wraps MVCC in a replicated state machine using Raft, handles local storage, and serves internal RPCs
- Query Manager Layer (pkg/qm/): Stateless coordinator that handles partitioning, routing, and distributed query orchestration
- RPC Layer (pkg/rpc/): Clean API contracts separating client operations from internal shard communication
We propose TinyGraph, a lightweight distributed graph database designed for fully dynamic ingestion and Breadth-First Search (BFS) queries across partitioned graphs. TinyGraph leverages Multi-Version Concurrency Control (MVCC) to provide consistent, time-travel-enabled views of the graph state. Our design separates concerns between a stateless Query Manager (QM) layer and a stateful, replicated Shard layer. We plan to evaluate the trade-offs among different replication strategies (strong vs. eventual consistency), dynamic partitioning algorithms (Random vs. Heuristic), and distributed BFS implementations.
Graphs are a powerful way to model relationships in data, capturing complex connections that appear in social networks, financial transactions, biological systems, and more. As datasets continue to grow in both size and complexity, single-machine graph databases quickly reach their limits. This has created a growing need for distributed graph databases, which spread the graph across multiple machines to support larger workloads, faster queries, and real-time updates. By distributing storage and computation, such systems can scale to billions of edges while still supporting fundamental queries like Breadth-First Search (BFS) traversal.
Designing a distributed graph database involves important choices that directly affect performance. These include how to partition the graph, how to replicate data across machines, and how to execute queries efficiently across shards.
In our work, we focus on two especially challenging design decisions:
- Partitioning on the Fly: Deciding where to place each new vertex or edge is critical. A poor partitioning strategy can lead to excessive cross-shard communication during queries, while a good strategy keeps related data close together. Doing this dynamically, as the graph grows, is a difficult but essential problem.
- Replication for Performance and Reliability: Replicating shards improves fault tolerance by ensuring the graph can survive machine failures. Replication can also improve read performance if queries are served from replicas. However, it introduces trade-offs: replicas may lag behind the primary, leading to different consistency guarantees depending on the mode of replication (e.g., strong vs. eventual).
Our system maintains multiple consistent views of the graph over time by adopting a Multi-Version Concurrency Control (MVCC) approach.
- Each vertex and edge stores a version history with associated timestamps that record when an operation (creation, update, or deletion) occurred.
- When a query is issued with a specific timestamp, the system reconstructs the graph state as it existed at that time.
- This mechanism enables TinyGraph to provide temporal consistency (time-travel queries) and concurrent reads without blocking writes.
Our system comprises two primary components:
The QM acts as the stateless middleware between the client and the shards. We allow for multiple QMs to prevent a single bottleneck. Its responsibilities include:
- Partitioning: Deciding which shard will store a newly created vertex based on the chosen partitioning algorithm.
- Query Routing & Execution: Directing client operations to the correct shard and coordinating distributed queries, such as BFS.
- ID Encoding: The QM encodes the shard ID into the vertex_id (e.g., vertex_id = concat[shard_number, uuid]). This design eliminates the need for a separate vertex_id to shard_id mapping service, allowing any QM to immediately locate a vertex's home shard without extra communication.
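For illustration, a minimal sketch of such an encoding, using a decimal shard prefix and a random 128-bit suffix; the layout and helper names below are assumptions, not the exact scheme used in internal/util:
import (
    "crypto/rand"
    "encoding/hex"
    "fmt"
    "strconv"
    "strings"
)

// EncodeVertexID prefixes a random 128-bit identifier with the shard number,
// so the owning shard can be recovered from the vertex ID alone.
func EncodeVertexID(shardID int) (string, error) {
    buf := make([]byte, 16)
    if _, err := rand.Read(buf); err != nil {
        return "", err
    }
    return fmt.Sprintf("%d-%s", shardID, hex.EncodeToString(buf)), nil
}

// DecodeShardID recovers the shard number from an ID produced by EncodeVertexID.
func DecodeShardID(vertexID string) (int, error) {
    prefix, _, ok := strings.Cut(vertexID, "-")
    if !ok {
        return 0, fmt.Errorf("malformed vertex id: %q", vertexID)
    }
    return strconv.Atoi(prefix)
}
Because the shard number is embedded in every ID, any QM (or shard) can route a request with a single local decode, with no lookup service in the critical path.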
Shards are the stateful layer where vertices and edges are stored.
- Each Shard is a replicated state machine (RSM), responsible for holding a subset of vertices and their outgoing edges.
- The shards use Raft for replication to ensure high availability and durability.
- Edges, even cross-shard edges, are stored locally on the source vertex's shard and use the vertex_id (which includes the shard ID) to reference the destination vertex.
The following conceptual structures are maintained within each Shard:
// Stores the version history for MVCC
type Vertex struct {
    id    string
    edges map[string]*Edge // Outgoing edges
    prop  *VertexProp
    ts    float64   // Latest version's timestamp
    prevs []*Vertex // Older versions of the vertex
}

// Stores the version history for MVCC
type Edge struct {
    id        string
    from_id   string
    to_id     string
    prop      *EdgeProp
    ts        float64
    destroyed bool    // Marker for deletion
    prevs     []*Edge // Older versions of the edge
}

// Shard State Machine: Vertices map
type ShardRSM struct {
    mu       sync.RWMutex
    vertices map[string]*Vertex
}

The system supports the following client-facing operations, which include a timestamp argument to enable MVCC-based consistency:
| Operation | Description |
|---|---|
| add_vertex(prop, ts) | Creates a vertex with properties and returns its ID. |
| add_edge(v1, v2, prop, ts) | Adds an edge between vertices v1 and v2. |
| del_edge(v1, v2, ts) | Deletes the edge between v1 and v2. |
| update_vertex(v_id, prop, ts) | Updates the properties of a vertex. |
| update_edge(v1, v2, prop, ts) | Updates the properties of an edge. |
| get_vertex_prop(v_id, ts) | Retrieves the properties of v_id as they existed at ts. |
| get_edge_prop(v1, v2, ts) | Retrieves the edge properties as they existed at ts. |
| bfs(v_start, radius, ts) | Finds all vertices within the specified radius of v_start at time ts. |
Shard RPC Operations: Each Shard supports corresponding RPCs for the above operations, plus key utility calls like GetNeighbors(v_id, ts).
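To make the MVCC read path concrete, here is a minimal Go sketch of GetNeighbors(v_id, ts) over the structures above. The version walk (prevs assumed ordered oldest to newest) and the locking shown here are assumptions of this sketch, not the actual pkg/shard implementation:
// versionAt walks the version chain and returns the newest Vertex version
// whose timestamp is <= ts, or nil if the vertex did not exist yet at ts.
// Assumes prevs is ordered oldest -> newest.
func versionAt(v *Vertex, ts float64) *Vertex {
    for cur := v; cur != nil; {
        if cur.ts <= ts {
            return cur
        }
        if len(cur.prevs) == 0 {
            return nil
        }
        cur = cur.prevs[len(cur.prevs)-1]
    }
    return nil
}

// GetNeighbors returns the destination IDs of all edges of vID that were
// live (created and not yet deleted) as of timestamp ts.
func (s *ShardRSM) GetNeighbors(vID string, ts float64) []string {
    s.mu.RLock()
    defer s.mu.RUnlock()

    v := versionAt(s.vertices[vID], ts)
    if v == nil {
        return nil
    }
    var neighbors []string
    for _, e := range v.edges {
        // Find the edge version visible at ts and skip tombstoned edges.
        for cur := e; cur != nil; {
            if cur.ts <= ts {
                if !cur.destroyed {
                    neighbors = append(neighbors, cur.to_id)
                }
                break
            }
            if len(cur.prevs) == 0 {
                break
            }
            cur = cur.prevs[len(cur.prevs)-1]
        }
    }
    return neighbors
}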
We will evaluate three distinct shard operating modes:
- Primary-Only Read/Write (Strong Consistency): Writes and reads are served only by the primary. This provides strong consistency but may have lower read throughput.
- Primary Write, Replica Read (Eventual Consistency): Writes go to the primary (via Raft log), but reads are served by replicas. This offers higher read throughput at the cost of potential read staleness, as replicas may lag behind the leader.
- Unreplicated (Baseline): A single shard with no replication. Provides strong consistency but no fault tolerance.
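For illustration, the replication mode could be represented in internal/config roughly as follows; the constant and method names below are hypothetical, not the project's actual enum:
// ReplicationMode selects how a shard group serves reads and writes.
type ReplicationMode int

const (
    Unreplicated            ReplicationMode = iota // single shard, no fault tolerance
    PrimaryOnly                                    // primary serves both reads and writes
    PrimaryWriteReplicaRead                        // primary serves writes, replicas serve reads
)

// ReadsFromReplicas reports whether read RPCs may be routed to replica nodes.
func (m ReplicationMode) ReadsFromReplicas() bool {
    return m == PrimaryWriteReplicaRead
}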
We will compare two distinct methods for generating the timestamps essential to the MVCC mechanism:
One approach is to synchronize a Lamport-style logical clock for write operations (create/delete/update): a coordinator requests and reserves the next timestamp value, uses it for its operation, and the value is then incremented across all coordinators and never reused, which gives a total ordering of all writes. This clock synchronization across coordinators can be done in several ways. For instance, single-decree Paxos can be used, with all coordinators acting as proposers/acceptors (we expect the set of coordinators to be small), incrementing the value monotonically (though not necessarily by exactly 1, to preserve liveness when proposers disagree within a phase). Alternatively, a separate clock server can hand out monotonically increasing, unique timestamps to the coordinators. This ensures a total ordering of *all* events with no risk of drift.
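A minimal sketch of the clock-server variant; the type and method names here are assumptions for illustration:
import "sync"

// TimestampServer hands out strictly increasing, never-reused timestamps
// to the Query Managers, giving a total order over all writes.
type TimestampServer struct {
    mu   sync.Mutex
    next uint64
}

// Reserve returns the next timestamp and advances the counter, so the same
// value can never be handed out twice.
func (s *TimestampServer) Reserve() uint64 {
    s.mu.Lock()
    defer s.mu.Unlock()
    ts := s.next
    s.next++
    return ts
}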
Clients attach a timestamp (e.g., from an NTP server) to their operations.
- Conflict Resolution: Client IDs resolve ties in the rare case of identical timestamps.
- Consistency Check: Each vertex records the timestamp of its last access (t_last). If a write with t_write arrives after a read with t_read > t_write has already been processed (due to network delay), the write is rejected and the client must retry with a newer timestamp (see the sketch below).
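A minimal sketch of this anti-backdating check, assuming the shard keeps a per-vertex map of the newest read timestamp it has served (an assumption of this sketch; such a map is not part of the structs shown earlier):
// checkWrite implements the anti-backdating rule: it reports whether a write
// at tWrite may be applied. If a read at a later timestamp has already been
// answered for this vertex, applying the write would retroactively change
// that read's result, so the write is rejected and the client must retry
// with a fresh timestamp.
func checkWrite(lastRead map[string]float64, vID string, tWrite float64) bool {
    return tWrite >= lastRead[vID]
}

// recordRead bumps the per-vertex read watermark after serving a read at tRead.
func recordRead(lastRead map[string]float64, vID string, tRead float64) {
    if tRead > lastRead[vID] {
        lastRead[vID] = tRead
    }
}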
What is Guaranteed?
- Per-operation consistency: Each request (add/update/delete) is applied one-by-one, replicated with Raft, and visible at its own commit timestamp.
- Leader reads: Linearizable (up-to-date) view of a shard.
- Replica reads: Still serve a valid MVCC version ≤ T, though possibly older than the leader’s state.
- MVCC snapshots: Queries at ts=T return the latest version ≤ T for every key in that shard.
- Monotonicity per key: Anti-backdating ensures no write can appear in the past relative to a later read on the same object.
What is NOT Guaranteed:
- No transactions: Multi-operation atomicity (e.g., inserting two edges “together”) is not supported; ops become visible independently.
- No global snapshot isolation: A BFS at ts=T uses the same timestamp across shards, but shards do not coordinate, so results may mix states from different commit moments.
- Replica staleness: When reading from replicas, data may lag behind leader commits.
- Client → QM: add_vertex(prop)
- QM: Uses the partitioning algorithm to select a shard number. Generates vertex_id = concat_bits[shard_number, uuid].
- QM → Primary Shard: RPC AddVertex(vertex_id, prop, ts)
- Primary Shard: Applies the operation to its Raft log.
- Primary Shard → QM → Client: Returns the new vertex_id.
- Client → QM: add_edge(v1, v2, prop)
- QM: Uses the encoded shard bits in v1 to find the responsible Primary Shard.
- QM → Primary Shard: RPC AddEdge(v1, v2, prop, ts)
- Primary Shard: Applies the operation to its Raft log.
- Primary Shard → QM → Client: Returns success.
- Client → QM: get_vertex_prop(v_id) or bfs(v_start, radius)
- QM: Finds the target shard(s).
- QM → Shard: RPC GetVertexProp(v_id, ts), or initiates the distributed BFS algorithm.
  - Note: Reads can go to the Primary or a Replica depending on the chosen Replication Mode. Since it is a read operation, it does not go through the Raft log.
- Shard(s) → QM → Client: Returns the result.
Baseline BFS Pseudocode:
function DISTRIBUTED_BFS(start_vertex, radius, timestamp):
    // 1. Initialization
    Visited = {start_vertex}
    Frontier = {start_vertex}
    Results = {start_vertex}

    // 2. Traversal Loop (up to max radius)
    for level from 1 to radius:
        if Frontier is empty:
            break

        // Map to group vertices by the shard responsible for them
        ShardRequests = MAP(ShardID -> List of Vertices)

        // Group all vertices in the current Frontier by their Shard ID
        for vertex in Frontier:
            shard_id = GET_SHARD_ID(vertex)
            ShardRequests[shard_id].append(vertex)

        NewNeighbors = SET()

        // 3. Batched Concurrent Query
        // Send a batch of neighbor-lookup requests to each relevant Shard
        // All requests run in parallel (ASYNC)
        AllResponses = ASYNC_BATCH_CALL(
            for ShardID, VertexList in ShardRequests:
                SHARD_RPC:GetNeighbors_Batch(VertexList, timestamp)
        )

        // 4. Process Responses and Update State
        for response in AllResponses:
            for neighbor_id in response:
                if neighbor_id is not in Visited:
                    Visited.add(neighbor_id)
                    Results.add(neighbor_id)
                    NewNeighbors.add(neighbor_id)

        // Set up the next frontier
        Frontier = NewNeighbors

    return Results
Optimized BFS (asynchronous, shard-coordinated)
High-level overview (developing this fully is part of our milestones, as it is quite complex):
- The client calls BFS(rootVertexID, radius, timestamp) on the QueryManager.
- The QueryManager processes the request and reserves a TraversalState object for it.
- The QueryManager looks up the shard responsible for rootVertexID and calls that shard's BFS function, passing the root vertex, the desired radius, and the timestamp, along with a UNIQUE request ID and the requester address (itself). After sending the request, the QueryManager adds the shard to the TraversalState's waitingFor field.
- The QueryManager then waits until the TraversalState's waitingFor field is completely empty; once it is, the QueryManager sends all the vertexIDs and VertexProps accumulated in its TraversalState's collected field back to the client.
- The Shard, upon receiving a BFS request, checks for the requestID and requester address pair:
  - If it already exists on the Shard, the Shard will:
    - Start an internal BFS search from the provided rootVertexID, sharing the Visited set with any ongoing searches for the same requestID and requester address pair.
    - Once that BFS traversal finishes (note: it may finish immediately if the rootVertexID was already visited), respond to the requester's address (the coordinating QueryManager) with the set of VertexIDs and VertexProps collected by this traversal, along with its own ShardID.
  - If it does not yet exist on the Shard, the Shard will:
    - Create a new Visited set that will be shared by all searches for this requestID and requester address pair on this shard.
    - Perform the same steps as above.
- Once the QueryManager receives a response from a shard, it will add it to the set of returned VertexID, VertexProp pairs of the request’s TraversalState, and remove the corresponding ShardID from the TraversalState’s waitingFor field.
This design puts the onus of traversing the BFS tree on the shards. If a shard encounters an edge during its traversal whose destination VertexID is not associated with its own ShardID, it creates a new BFS request for the Shard that stores that VertexID (recall that the ShardID is encoded into the VertexID, and Shards are aware of each other). Instead of the original radius, it passes the original radius minus the number of edges it took to reach that edge (since this is BFS, that count is always the minimum), and it forwards the original requestID and requester address pair (that of the QueryManager) rather than its own, so the results are processed by the QueryManager associated with the original request instead of by this Shard. A shard may do this multiple times, to multiple shards (possibly multiple times to the same shard); it keeps track of every Shard it sends requests to and includes this information in its response to the QueryManager. The QueryManager, as described above, then removes this Shard from its TraversalState's waitingFor field and adds the set of Shards that this Shard reported forwarding to. Thus, the QueryManager must wait for all outstanding requests that the Shards have handed off to other Shards to return before it knows to return the final result to the user.
We will evaluate TinyGraph across a range of conditions and design choices to understand their impact on performance, consistency, and scalability.
| Strategy | Consistency | Fault Tolerance | Read Throughput |
|---|---|---|---|
| No Replication (Baseline) | Strong | None | Low/Medium |
| Primary-Only Read/Write | Strong | High | Medium |
| Primary Write, Replica Read | Eventual | High | High |
We will compare:
- Random Partitioning: A simple baseline with minimal computational overhead.
- Heuristic Partitioning: Strategies such as Linear Deterministic Greedy (LDG) or Fennel, which are designed to reduce cross-shard edges and balance shard loads.
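As an illustration of the heuristic family, here is a minimal sketch of a Linear Deterministic Greedy (LDG) placement decision, assuming the QM tracks each shard's current size and knows which shards hold the new vertex's already-known neighbors (function and parameter names are hypothetical):
// pickShardLDG chooses a shard for a new vertex using Linear Deterministic
// Greedy: favor shards that already hold many of the vertex's neighbors,
// penalized by how full each shard is.
func pickShardLDG(neighborShards []int, shardSizes []int, capacity int) int {
    // Count how many known neighbors each shard already holds.
    neighborCount := make([]int, len(shardSizes))
    for _, s := range neighborShards {
        neighborCount[s]++
    }

    best, bestScore := 0, -1.0
    for shard, size := range shardSizes {
        // LDG score: |neighbors on shard| * (1 - load/capacity)
        score := float64(neighborCount[shard]) * (1.0 - float64(size)/float64(capacity))
        if score > bestScore {
            best, bestScore = shard, score
        }
    }
    return best
}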
We will analyze the latency and throughput of:
- Naive Synchronous BFS: Baseline using sequential RPC calls. (Initial implementation will run on the Query Manager).
- Optimized Synchronous BFS: Improves performance by batching RPCs to reduce network overhead.
- Asynchronous BFS: Explores parallelism by allowing shards to dispatch sub-searches to other shards.
We will use TLA+ Modeling to validate replication correctness and system safety under failures. Our goal is to use TLA+ to confirm that:
- All replicas apply committed operations in the same order (no divergent commits).
- The system preserves a consistent ordering of writes even under leader failures.
We will measure how the system handles larger graphs and higher ingestion/query rates by:
- Varying the number of Query Managers: Evaluate the effect of multiple coordinators on throughput, consistency, and fault tolerance.
- Varying the number of Shards: Study the trade-offs between query performance and partitioning overhead.
Here are the target achievements for each of the four phases, leading up to the final submission on December 21st.
(Ends: October 22nd, 18:00 PST - Milestone 1 Due)
The primary goal of Phase 1 is to establish the core architectural foundation and the time-traveling MVCC mechanism.
| Achievement Goal (Milestone 1) | Details |
|---|---|
| Architectural Blueprint | Finalized RPC interface definitions for the QM and Shard components. |
| MVCC Core Working | A working, single-node prototype of a Shard capable of handling add_vertex and add_edge requests while storing version history using timestamps. |
| Consistent Reads | Successful demonstration of get_prop calls that accurately retrieve the state of a vertex or edge as it existed at a specified past timestamp. |
| Single QM and simple timestamping | We deploy a single QM that also generates timestamps for operations, so no distributed timestamp coordination is needed yet. |
(Ends: November 12th, 18:00 PST - Milestone 2 Due)
The goal of Phase 2 is to achieve full distribution and implement the required consistency and search baselines.
| Achievement Goal (Milestone 2) | Details |
|---|---|
| Distributed & Replicated Storage | The Shards are fully integrated into a Raft cluster, allowing the system to operate reliably in all three proposed replication modes (Strong vs. Eventual Consistency). |
| Full Write Flow | The QM successfully routes all write operations (add/update/del) to the appropriate Shard Primary and through the consensus mechanism. |
| BFS Baseline | A fully functional, distributed, Baseline BFS is running on the Query Manager, using Random Partitioning. |
(Ends: November 26th, 18:00 PST - Milestone 3 Due)
Phase 3 is dedicated to implementing all the advanced features and optimizations required for the evaluation.
| Achievement Goal (Milestone 3) | Details |
|---|---|
| Complete Feature Set | Implementation of optimized BFS algorithms and both partitioning strategies (Random and Heuristic). |
| System Stability | The system is stable and robust enough to handle high-volume, concurrent load necessary for running benchmarks without crashing or deadlocking. |
| Benchmarking Ready | All performance logging and test configurations are set up and validated, allowing the team to immediately begin the formal evaluation phase. |
| Multiple QMs | Allow for multiple QMs, and implement timestamping strategy. |
(Ends: December 21st, 18:00 PST - Final Report Due)
The final phase is focused entirely on validation, analysis, and professional presentation of the results.
| Achievement Goal (Final Report) | Details |
|---|---|
| Comprehensive Evaluation | Completion of all benchmarking runs, including the detailed analysis of the trade-offs between replication, partitioning, and BFS algorithms. |
| Correctness Validation (TLA+) | Successful modeling and verification of the system's critical consistency and ordering properties using TLA+. |
| Final Submission | A polished Final Project Report that clearly documents the design, comprehensively presents the results (with graphs and data), and concludes with key findings and contributions. |
Our system design was inspired by SystemG, which shares the architectural pattern of separating Query Managers (coordinators) from Shards (storage/state).
Key Difference from SystemG: In our current design, TinyGraph's Query Manager is responsible for coordinating the distributed BFS execution, whereas our initial, SystemG-inspired plan had the Shards manage query execution and result collection. Centralizing this in the QM simplifies the shard implementation and follows a more typical client-server-storage pattern.