Skip to content

sgns::crdt::CrdtDatastore

Forward declaration of CRDT Set class. More...

#include <crdt_datastore.hpp>

Inherits from std::enable_shared_from_this< CrdtDatastore >

Protected Classes

Name
struct RootCIDJob
struct DagWorker

Public Types

Name
enum class JobStatus
enum class Error
using base::Buffer Buffer
using base::Logger Logger
using storage::rocksdb RocksDB
using RocksDB::QueryResult QueryResult
using pb::Delta Delta
using pb::Element Element
using ipfs_lite::ipld::IPLDNode IPLDNode
using CRDTDataFilter::ElementFilterCallback CRDTElementFilterCallback
using CRDTCallbackManager::NewDataCallback CRDTNewElementCallback
using CRDTCallbackManager::DeletedDataCallback CRDTDeletedElementCallback

Public Functions

Name
std::shared_ptr< CrdtDatastore > New(std::shared_ptr< RocksDB > aDatastore, const HierarchicalKey & aKey, std::shared_ptr< DAGSyncer > aDagSyncer, std::shared_ptr< Broadcaster > aBroadcaster, std::shared_ptr< CrdtOptions > aOptions)
Factory method to create a shared_ptr to a CrdtDatastore.
std::shared_ptr< Delta > DeltaMerge(const std::shared_ptr< Delta > & aDelta1, const std::shared_ptr< Delta > & aDelta2)
std::string GetValueSuffix()
outcome::result< std::vector< CID > > DecodeBroadcast(const Buffer & buff)
outcome::result< std::shared_ptr< Delta > > CreateDeltaToAdd(const std::string & key, const std::string & value)
void Start()
Starts the datastore threads.
void StartCIDProcessing()
void StartRebroadcastHeads()
virtual ~CrdtDatastore()
Destructor of the CRDT datastore.
outcome::result< Buffer > GetKey(const HierarchicalKey & aKey) const
outcome::result< QueryResult > QueryKeyValues(std::string_view aPrefix) const
outcome::result< QueryResult > QueryKeyValues(const std::string & prefix_base, const std::string & middle_part, const std::string & remainder_prefix) const
Queries with a middle part that can be a wildcard, negated string or normal string.
std::string GetKeysPrefix() const
outcome::result< CID > PutKey(const HierarchicalKey & aKey, const Buffer & aValue, const std::unordered_set< std::string > & topics)
Stores the given value in the CRDT store.
outcome::result< bool > HasKey(const HierarchicalKey & aKey) const
outcome::result< CID > DeleteKey(const HierarchicalKey & aKey, const std::unordered_set< std::string > & topics)
outcome::result< CID > Publish(const std::shared_ptr< Delta > & aDelta, const std::unordered_set< std::string > & topics)
Publishes a Delta. Creates a DAG node from the given Delta, merges it into the CRDT, and broadcasts the node.
outcome::result< void > PrintDAG()
outcome::result< std::shared_ptr< Delta > > CreateDeltaToRemove(const std::string & key) const
void PrintDataStore()
void Close()
void CancelAndCloseNow()
Immediately cancels CRDT work and closes all worker threads. Safe to call multiple times.
bool RegisterElementFilter(const std::string & pattern, CRDTElementFilterCallback filter)
bool RegisterNewElementCallback(const std::string & pattern, CRDTNewElementCallback callback)
bool RegisterDeletedElementCallback(const std::string & pattern, CRDTDeletedElementCallback callback)
void UnregisterElementFilter(const std::string & pattern)
void UnregisterNewElementCallback(const std::string & pattern)
void UnregisterDeletedElementCallback(const std::string & pattern)
std::shared_ptr< CRDTWorkJournal > GetWorkJournal() const
void AddTopicName(const std::string & topic)
Configure which topic this datastore should filter on.
outcome::result< CrdtHeads::CRDTListResult > GetHeadList()
outcome::result< void > RemoveHead(const CID & aCid, const std::string & topic)
outcome::result< uint64_t > GetHeadHeight(const CID & aCid, const std::string & topic)
outcome::result< void > AddHead(const CID & aCid, const std::string & topic, uint64_t priority)
outcome::result< JobStatus > GetJobStatus(const CID & cid)
outcome::result< void > BroadcastHeadsForTopics(const std::set< std::string > & topics)
Broadcast heads for the specified topics.
bool IsBroadcastEnabled() const
Query whether outgoing head broadcasts are enabled.
std::unordered_set< std::string > GetTopicNames() const
outcome::result< std::vector< std::pair< std::string, base::Buffer > > > GetILPDNodeContent(const std::string & cid_string)

Protected Functions

Name
void HandleCIDBroadcast()
Handles a received CID broadcast. If the CID is not known, triggers HandleRootCIDBlock.
outcome::result< void > HandleRootCIDBlock(const CID & aCid)
Handles a root CID block by creating a job to fetch and process its content.
outcome::result< RootCIDJob > CreateRootJob(const CID & aRootCID)
Creates a RootCIDJob for the given root CID.
outcome::result< std::set< CID > > GetLinksToFetch(const RootCIDJob & job)
Gets the links to fetch for a given node in a job.
outcome::result< void > FetchNodes(const RootCIDJob & aRootJob, const std::set< CID > & aLinks)
Fetches the nodes for the given links and root job.
outcome::result< Delta > GetDeltaFromNode(const IPLDNode & aNode, bool created_by_self)
Gets the Delta from a given IPLD node, filtering it if it wasn't created by self.
outcome::result< void > MergeDataFromDelta(const CID & node_cid, const Delta & aDelta)
Merges the data from a given Delta into the CRDT set.
outcome::result< void > ProcessJobIteration(const RootCIDJob & job_to_process)
Processes a root CID job.
outcome::result< void > Sync(const HierarchicalKey & aKey)
outcome::result< void > PrintDAGRec(const CID & aCID, uint64_t aDepth, std::vector< CID > & aSet)
void RebroadcastHeads()
outcome::result< void > Broadcast(const std::set< CID > & cids, const std::string & topic, boost::optional< libp2p::peer::PeerInfo > peerInfo =boost::none)
Broadcasts a set of CIDs. Encodes and broadcasts the provided list of CIDs.
outcome::result< Buffer > EncodeBroadcast(const std::set< CID > & heads)
outcome::result< std::shared_ptr< IPLDNode > > CreateIPLDNode(const std::vector< std::pair< CID, std::string > > & aHeads, const std::shared_ptr< Delta > & aDelta, const std::unordered_set< std::string > & topics) const
outcome::result< std::shared_ptr< IPLDNode > > CreateDAGNode(const std::shared_ptr< Delta > & aDelta, const std::unordered_set< std::string > & topics)
outcome::result< CID > AddDAGNode(const std::shared_ptr< CrdtDatastore::IPLDNode > & node)
outcome::result< void > SyncDatastore(const std::vector< HierarchicalKey > & aKeyList)
void PutElementsCallback(const std::string & key, const Buffer & value, const std::string & cid)
void DeleteElementsCallback(const std::string & key, const std::string & cid)
void UpdateCRDTHeads(const CID & rootCID, uint64_t rootPriority, bool add_topics_to_broadcast)
bool EnqueueRootCID(const CID & cid)
outcome::result< CID > WaitForJob(const CID & cid)
outcome::result< Buffer > EncodeBroadcastStatic(const std::set< CID > & heads)

Friends

Name
class PubSubBroadcasterExt

Detailed Description

class sgns::crdt::CrdtDatastore;

Forward declaration of CRDT Set class.

CRDT datastore class based on https://github.com/ipfs/go-ds-crdt

Public Types Documentation

enum JobStatus

Enumerator Value Description
PENDING
COMPLETED
FAILED

enum Error

Enumerator Value Description
INVALID_PARAM 0
FETCH_ROOT_NODE
NODE_DESERIALIZATION
FETCHING_GRAPH
NODE_CREATION
GET_NODE
INVALID_JOB

using Buffer

using sgns::crdt::CrdtDatastore::Buffer = base::Buffer;

using Logger

using sgns::crdt::CrdtDatastore::Logger = base::Logger;

using RocksDB

using sgns::crdt::CrdtDatastore::RocksDB = storage::rocksdb;

using QueryResult

using sgns::crdt::CrdtDatastore::QueryResult = RocksDB::QueryResult;

using Delta

using sgns::crdt::CrdtDatastore::Delta = pb::Delta;

using Element

using sgns::crdt::CrdtDatastore::Element = pb::Element;

using IPLDNode

using sgns::crdt::CrdtDatastore::IPLDNode = ipfs_lite::ipld::IPLDNode;

using CRDTElementFilterCallback

using sgns::crdt::CrdtDatastore::CRDTElementFilterCallback = CRDTDataFilter::ElementFilterCallback;

using CRDTNewElementCallback

using sgns::crdt::CrdtDatastore::CRDTNewElementCallback = CRDTCallbackManager::NewDataCallback;

using CRDTDeletedElementCallback

using sgns::crdt::CrdtDatastore::CRDTDeletedElementCallback = CRDTCallbackManager::DeletedDataCallback;

Public Functions Documentation

function New

static std::shared_ptr< CrdtDatastore > New(
    std::shared_ptr< RocksDB > aDatastore,
    const HierarchicalKey & aKey,
    std::shared_ptr< DAGSyncer > aDagSyncer,
    std::shared_ptr< Broadcaster > aBroadcaster,
    std::shared_ptr< CrdtOptions > aOptions
)

Factory method to create a shared_ptr to a CrdtDatastore.

Parameters:

  • aDatastore The underlying database where CRDT is stored
  • aKey The namespace key on the database where CRDT's variables will be stored
  • aDagSyncer The MerkleDAG syncer to request content of CIDs
  • aBroadcaster The broadcaster to publish CIDs
  • aOptions Options to construct the object

Return: A new instance of CrdtDatastore

function DeltaMerge

static std::shared_ptr< Delta > DeltaMerge(
    const std::shared_ptr< Delta > & aDelta1,
    const std::shared_ptr< Delta > & aDelta2
)

Parameters:

Return: pointer to merged delta

Static function to merge delta elements and tombstones; uses the highest priority for the resulting delta.

function GetValueSuffix

static std::string GetValueSuffix()

Return: value suffix

Get value suffix used in set, e.g. /v

function DecodeBroadcast

static outcome::result< std::vector< CID > > DecodeBroadcast(
    const Buffer & buff
)

Parameters:

Return: vector of CIDs or outcome::failure on error

DecodeBroadcast decodes CRDT broadcast data

function CreateDeltaToAdd

static outcome::result< std::shared_ptr< Delta > > CreateDeltaToAdd(
    const std::string & key,
    const std::string & value
)

Parameters:

  • key - delta key to add to datastore
  • value - delta value to add to datastore

Return: pointer to new delta or outcome::failure on error

Returns a new delta-set adding the given key/value.

function Start

void Start()

Starts the datastore threads.

function StartCIDProcessing

void StartCIDProcessing()

function StartRebroadcastHeads

void StartRebroadcastHeads()

function ~CrdtDatastore

virtual ~CrdtDatastore()

Destructor of the CRDT datastore.

function GetKey

outcome::result< Buffer > GetKey(
    const HierarchicalKey & aKey
) const

Parameters:

  • aKey Hierarchical key to get

Return: value as a Buffer

Get the value of an element not tombstoned from the CRDT set by key

function QueryKeyValues

outcome::result< QueryResult > QueryKeyValues(
    std::string_view aPrefix
) const

Parameters:

  • aPrefix prefix to search, if empty string, return all

Return: list of key-value pairs matching the prefix

Query CRDT set key-value pairs by prefix; if the prefix is empty, return all elements that are not tombstoned.

function QueryKeyValues

outcome::result< QueryResult > QueryKeyValues(
    const std::string & prefix_base,
    const std::string & middle_part,
    const std::string & remainder_prefix
) const

Queries with a middle part that can be a wildcard, negated string or normal string.

Parameters:

  • prefix_base The base prefix to query
  • middle_part Either a string (normal query), '*' or !string
  • remainder_prefix The remainder part of the query prefix

Return: A list of key value pairs

function GetKeysPrefix

std::string GetKeysPrefix() const

Return: key prefix

Get key prefix used in set, e.g. /namespace/s/k/

function PutKey

outcome::result< CID > PutKey(
    const HierarchicalKey & aKey,
    const Buffer & aValue,
    const std::unordered_set< std::string > & topics
)

Stores the given value in the CRDT store.

Parameters:

  • aKey Hierarchical key to put
  • aValue Value to be stored
  • topics Topics to publish to

Return: outcome::success if stored and broadcast successfully, or outcome::failure otherwise.

function HasKey

outcome::result< bool > HasKey(
    const HierarchicalKey & aKey
) const

Parameters:

Return: true if key found or false if not found or outcome::failure on error

HasKey returns whether the key is mapped to a value in set

function DeleteKey

outcome::result< CID > DeleteKey(
    const HierarchicalKey & aKey,
    const std::unordered_set< std::string > & topics
)

Parameters:

Return: outcome::failure on error or success otherwise

Delete removes the value for given key.

function Publish

outcome::result< CID > Publish(
    const std::shared_ptr< Delta > & aDelta,
    const std::unordered_set< std::string > & topics
)

Publishes a Delta. Creates a DAG node from the given Delta, merges it into the CRDT, and broadcasts the node.

Parameters:

  • aDelta Delta to publish
  • topics Topics to publish to

Return: returns outcome::success on success or outcome::failure otherwise

function PrintDAG

outcome::result< void > PrintDAG()

Return: returns outcome::success on success or outcome::failure otherwise

PrintDAG pretty prints the current Merkle-DAG using the given printFunc

function CreateDeltaToRemove

outcome::result< std::shared_ptr< Delta > > CreateDeltaToRemove(
    const std::string & key
) const

Parameters:

  • key - delta key to remove from datastore

Return: pointer to delta or outcome::failure on error

Returns a new delta-set removing the given keys with prefix /namespace/s/key

function PrintDataStore

void PrintDataStore()

function Close

void Close()

Close shuts down the CRDT datastore and worker threads. It should not be used afterwards.

function CancelAndCloseNow

void CancelAndCloseNow()

Immediately cancels CRDT work and closes all worker threads. Safe to call multiple times.

function RegisterElementFilter

bool RegisterElementFilter(
    const std::string & pattern,
    CRDTElementFilterCallback filter
)

function RegisterNewElementCallback

bool RegisterNewElementCallback(
    const std::string & pattern,
    CRDTNewElementCallback callback
)

function RegisterDeletedElementCallback

bool RegisterDeletedElementCallback(
    const std::string & pattern,
    CRDTDeletedElementCallback callback
)

function UnregisterElementFilter

void UnregisterElementFilter(
    const std::string & pattern
)

function UnregisterNewElementCallback

void UnregisterNewElementCallback(
    const std::string & pattern
)

function UnregisterDeletedElementCallback

void UnregisterDeletedElementCallback(
    const std::string & pattern
)

function GetWorkJournal

std::shared_ptr< CRDTWorkJournal > GetWorkJournal() const

function AddTopicName

void AddTopicName(
    const std::string & topic
)

Configure which topic this datastore should filter on.

Parameters:

  • topic The topic name to use when filtering links. Only links whose IPLDLinkImpl::getName() equals this string will be processed.

When processing or rebroadcasting Merkle-DAG links, only those whose name exactly matches the topic set via this call will be considered.

function GetHeadList

outcome::result< CrdtHeads::CRDTListResult > GetHeadList()

function RemoveHead

outcome::result< void > RemoveHead(
    const CID & aCid,
    const std::string & topic
)

function GetHeadHeight

outcome::result< uint64_t > GetHeadHeight(
    const CID & aCid,
    const std::string & topic
)

function AddHead

outcome::result< void > AddHead(
    const CID & aCid,
    const std::string & topic,
    uint64_t priority
)

function GetJobStatus

outcome::result< JobStatus > GetJobStatus(
    const CID & cid
)

function BroadcastHeadsForTopics

outcome::result< void > BroadcastHeadsForTopics(
    const std::set< std::string > & topics
)

Broadcast heads for the specified topics.

Parameters:

  • topics Set of topic names to broadcast heads for

Return: outcome::success on success, or outcome::failure on error

function IsBroadcastEnabled

bool IsBroadcastEnabled() const

Query whether outgoing head broadcasts are enabled.

Return: true when broadcasts are enabled.

function GetTopicNames

std::unordered_set< std::string > GetTopicNames() const

function GetILPDNodeContent

outcome::result< std::vector< std::pair< std::string, base::Buffer > > > GetILPDNodeContent(
    const std::string & cid_string
)

Protected Functions Documentation

function HandleCIDBroadcast

void HandleCIDBroadcast()

Handles a received CID broadcast. If the CID is not known, triggers HandleRootCIDBlock.

function HandleRootCIDBlock

outcome::result< void > HandleRootCIDBlock(
    const CID & aCid
)

Handles a root CID block by creating a job to fetch and process its content.

Parameters:

  • aCid The root CID to be handled

Return: Success if the Root Job was created, or failure otherwise

function CreateRootJob

outcome::result< RootCIDJob > CreateRootJob(
    const CID & aRootCID
)

Creates a RootCIDJob for the given root CID.

Parameters:

  • aRootCID The root CID to create the job for

Return: Success if Root Job created, or failure otherwise

function GetLinksToFetch

outcome::result< std::set< CID > > GetLinksToFetch(
    const RootCIDJob & job
)

Gets the links to fetch for a given node in a job.

Parameters:

  • job The root job of the current links to fetch

Return: List of CIDs to fetch, or failure otherwise

function FetchNodes

outcome::result< void > FetchNodes(
    const RootCIDJob & aRootJob,
    const std::set< CID > & aLinks
)

Fetches the nodes for the given links and root job.

Parameters:

  • aRootJob The root job of the current links to fetch
  • aLinks The links to fetch

Return: Success if the nodes were fetched, or failure otherwise

function GetDeltaFromNode

outcome::result< Delta > GetDeltaFromNode(
    const IPLDNode & aNode,
    bool created_by_self
)

Gets the Delta from a given IPLD node, filtering it if it wasn't created by self.

Parameters:

  • aNode The IPLD node to get the Delta from
  • created_by_self True if the node was created by self, false otherwise

Return: The Delta contained in the node, or failure otherwise

function MergeDataFromDelta

outcome::result< void > MergeDataFromDelta(
    const CID & node_cid,
    const Delta & aDelta
)

Merges the data from a given Delta into the CRDT set.

Parameters:

  • node_cid The CID of the node from which the Delta was obtained
  • aDelta The Delta to be merged

Return: Success if the Delta was merged, or failure otherwise

function ProcessJobIteration

outcome::result< void > ProcessJobIteration(
    const RootCIDJob & job_to_process
)

Processes a root CID job.

Parameters:

Return: Success if the job was processed, or failure otherwise

function Sync

outcome::result< void > Sync(
    const HierarchicalKey & aKey
)

Return: returns outcome::success on success or outcome::failure otherwise

Sync ensures that all the data under the given prefix is flushed to disk in the underlying datastore

function PrintDAGRec

outcome::result< void > PrintDAGRec(
    const CID & aCID,
    uint64_t aDepth,
    std::vector< CID > & aSet
)

Parameters:

  • aCID CID of DAG record
  • aDepth depth used for indenting printed records
  • aSet set of CIDs to print

Return: returns outcome::success on success or outcome::failure otherwise

Helper function to print Merkle-DAG records

function RebroadcastHeads

void RebroadcastHeads()

Regularly send out a list of heads that we have not recently seen

function Broadcast

outcome::result< void > Broadcast(
    const std::set< CID > & cids,
    const std::string & topic,
    boost::optional< libp2p::peer::PeerInfo > peerInfo =boost::none
)

Broadcasts a set of CIDs. Encodes and broadcasts the provided list of CIDs.

Parameters:

  • cids The list of CIDs to broadcast.
  • topic The topic to broadcast to.
  • peerInfo Optional peer info to avoid repeated GetPeerInfo calls.

Return: outcome::success on success, or outcome::failure if an error occurs.

function EncodeBroadcast

outcome::result< Buffer > EncodeBroadcast(
    const std::set< CID > & heads
)

Parameters:

  • heads list of CIDs

Return: data encoded into Buffer data or outcome::failure on error

EncodeBroadcast encodes list of CIDs to CRDT broadcast data

function CreateIPLDNode

outcome::result< std::shared_ptr< IPLDNode > > CreateIPLDNode(
    const std::vector< std::pair< CID, std::string > > & aHeads,
    const std::shared_ptr< Delta > & aDelta,
    const std::unordered_set< std::string > & topics
) const

Parameters:

  • aHeads list of CIDs to add to node as IPLD links
  • aDelta Delta to serialize into IPLD node
  • topics Topics to add as links

Return: IPLD node or outcome::failure on error

CreateIPLDNode add block node to DAGSyncer

function CreateDAGNode

outcome::result< std::shared_ptr< IPLDNode > > CreateDAGNode(
    const std::shared_ptr< Delta > & aDelta,
    const std::unordered_set< std::string > & topics
)

function AddDAGNode

outcome::result< CID > AddDAGNode(
    const std::shared_ptr< CrdtDatastore::IPLDNode > & node
)

Parameters:

  • node Node to add and process

Return: CID or outcome::failure on error

AddDAGNode adds node to DAGSyncer and processes new blocks.

function SyncDatastore

outcome::result< void > SyncDatastore(
    const std::vector< HierarchicalKey > & aKeyList
)

Parameters:

  • aKeyList all heads and the set entries related to the given prefix

Return: returns outcome::success on success or outcome::failure otherwise

SyncDatastore syncs the heads and the set datastore

function PutElementsCallback

void PutElementsCallback(
    const std::string & key,
    const Buffer & value,
    const std::string & cid
)

function DeleteElementsCallback

void DeleteElementsCallback(
    const std::string & key,
    const std::string & cid
)

function UpdateCRDTHeads

void UpdateCRDTHeads(
    const CID & rootCID,
    uint64_t rootPriority,
    bool add_topics_to_broadcast
)

function EnqueueRootCID

bool EnqueueRootCID(
    const CID & cid
)

function WaitForJob

outcome::result< CID > WaitForJob(
    const CID & cid
)

function EncodeBroadcastStatic

static outcome::result< Buffer > EncodeBroadcastStatic(
    const std::set< CID > & heads
)

Parameters:

  • heads list of CIDs

Return: data encoded into Buffer data or outcome::failure on error

EncodeBroadcastStatic encodes list of CIDs to CRDT broadcast data

Friends

friend PubSubBroadcasterExt

friend class PubSubBroadcasterExt;

Updated on 2026-06-28 at 18:54:57 -0700