Skip to content

account/GeniusNode.cpp

More...

Namespaces

Name
sgns

Functions

Name
OUTCOME_CPP_DEFINE_CATEGORY_3(sgns , GeniusNode::Error , e )
base::Logger GeniusNodeLogger()
std::string generate_uuid_with_ipfs_id(const std::string & ipfs_id)

Detailed Description

Date: 2024-04-18. Author: Henrique A. Klein (hklein@gnus.ai)

Functions Documentation

function OUTCOME_CPP_DEFINE_CATEGORY_3

OUTCOME_CPP_DEFINE_CATEGORY_3(
    sgns ,
    GeniusNode::Error ,
    e 
)

function GeniusNodeLogger

base::Logger GeniusNodeLogger()

function generate_uuid_with_ipfs_id

std::string generate_uuid_with_ipfs_id(
    const std::string & ipfs_id
)

Source code

#include <chrono>
#include <stdexcept>
#include <thread>
#include <memory>
#include <exception>
#include <random>
#include <filesystem>
#include <set>

#include <boost/format.hpp>
#include <boost/multiprecision/cpp_int.hpp>
#include <boost/uuid/uuid.hpp>
#include <fstream>
#include <sstream>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/dll.hpp>
#include <boost/json.hpp>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <ipfs_lite/ipfs/graphsync/impl/network/network.hpp>
#include <ipfs_lite/ipfs/graphsync/impl/local_requests.hpp>
#include <libp2p/basic/scheduler/asio_scheduler_backend.hpp>
#include <WalletCore/HDWallet.h>
#include <WalletCore/Coin.h>

#include "account/GeniusAccount.hpp"
#include "base/sgns_version.hpp"
#include "account/TokenAmount.hpp"
#include "account/GeniusNode.hpp"
#include "account/ChainRpcEndpointProvider.hpp"
#include "account/MigrationManager.hpp"
#include "crdt/globaldb/keypair_file_storage.hpp"
#include "upnp.hpp"
#include "processing/processing_tasksplit.hpp"
#include <eth/abi_decoder.hpp>
#include <base/parse_utility.hpp>
#include <eth/rpc_http_transport.hpp>
#include <eth/json_rpc.hpp>
#include <eth/event_filter.hpp>
#include <eth/eth_watch_cli.hpp>     // event_registry().params_for() — Bridge V2 ABI decode (D-13)
#include <eth/secp256k1_utility.hpp> // DecompressXOnlyPubkey() — X-only key decompression (D-13)
#include "processing/processing_subtask_enqueuer_impl.hpp"
#include "processing/impl/TaskQueueImpl.hpp"
#include "outcome/outcome.hpp"
#include "blockchain/ValidatorRegistry.hpp"
#include <Generators.hpp>

namespace
{
    /// @brief Derives a deterministic port number from a base port and a seed string.
    ///
    /// The seed is folded into a 32-bit polynomial hash (multiplier 31) that seeds a
    /// Mersenne Twister; a single draw picks an offset that is added to @p base.
    /// The same (base, seed) pair always yields the same port on a given standard
    /// library implementation.
    ///
    /// @param base Lowest port that may be returned.
    /// @param seed Arbitrary string that selects the offset.
    /// @return A port in [base, base + 300], clamped so it never exceeds 65535.
    uint16_t GenerateRandomPort( uint16_t base, const std::string &seed )
    {
        constexpr uint32_t kMaxPort   = 65535;
        constexpr uint32_t kMaxOffset = 300;

        uint32_t seed_hash = 0;
        for ( const char c : seed )
        {
            seed_hash = seed_hash * 31 + static_cast<uint8_t>( c );
        }

        // The sum used to be truncated to 16 bits, so a base above 65235 could wrap
        // around to a low port. Shrink the offset window to the available headroom
        // instead; the window is unchanged whenever base + 300 still fits.
        const uint32_t headroom   = kMaxPort - base;
        const auto     max_offset = static_cast<uint16_t>( headroom < kMaxOffset ? headroom : kMaxOffset );

        std::mt19937                            rng( seed_hash );
        std::uniform_int_distribution<uint16_t> dist( 0, max_offset );

        return static_cast<uint16_t>( base + dist( rng ) );
    }

    /// @brief Returns the enumerator name of a GeniusNode state for use in log messages.
    /// @param state State to describe.
    /// @return A static string with the enumerator name, or "UNKNOWN" for any value
    ///         that is not one of the listed enumerators.
    const char *NodeStateToString( sgns::GeniusNode::NodeState state )
    {
        using NodeState = sgns::GeniusNode::NodeState;

        const char *label = "UNKNOWN";
        switch ( state )
        {
            case NodeState::CREATING:
                label = "CREATING";
                break;
            case NodeState::MIGRATING_DATABASE:
                label = "MIGRATING_DATABASE";
                break;
            case NodeState::INITIALIZING_DATABASE:
                label = "INITIALIZING_DATABASE";
                break;
            case NodeState::INITIALIZING_PROCESSING:
                label = "INITIALIZING_PROCESSING";
                break;
            case NodeState::INITIALIZING_BLOCKCHAIN:
                label = "INITIALIZING_BLOCKCHAIN";
                break;
            case NodeState::INITIALIZING_TRANSACTIONS:
                label = "INITIALIZING_TRANSACTIONS";
                break;
            case NodeState::READY:
                label = "READY";
                break;
        }
        return label;
    }
}

// Human-readable messages for sgns::GeniusNode::Error, registered with the
// outcome error-category machinery through OUTCOME_CPP_DEFINE_CATEGORY_3.
OUTCOME_CPP_DEFINE_CATEGORY_3( sgns, GeniusNode::Error, e )
{
    using NodeError = sgns::GeniusNode::Error;

    switch ( e )
    {
        // Funds / storage
        case NodeError::INSUFFICIENT_FUNDS:
            return "Insufficient funds for the transaction";
        case NodeError::DATABASE_WRITE_ERROR:
            return "Error writing data into the database";

        // Input validation
        case NodeError::INVALID_TRANSACTION_HASH:
            return "Input transaction hash is invalid";
        case NodeError::INVALID_CHAIN_ID:
            return "Chain ID is invalid";
        case NodeError::INVALID_TOKEN_ID:
            return "Token ID is invalid";
        case NodeError::TOKEN_ID_MISMATCH:
            return "Informed Token ID doesn't match initialized ID";

        // Processing job description
        case NodeError::PROCESS_COST_ERROR:
            return "The calculated Processing cost was negative";
        case NodeError::PROCESS_INFO_MISSING:
            return "Processing information missing on JSON file";
        case NodeError::INVALID_JSON:
            return "Json cannot be parsed";
        case NodeError::INVALID_BLOCK_PARAMETERS:
            return "Json missing block params";
        case NodeError::NO_PROCESSOR:
            return "Json missing processor";
        case NodeError::NO_PRICE:
            return "Could not get a price";

        // Transaction lifecycle
        case NodeError::TRANSACTIONS_NOT_READY:
            return "Transaction manager is not ready";
        case NodeError::TRANSACTION_NOT_FINALIZED:
            return "Requested transaction not finalized within timeout";
        case NodeError::TRANSACTION_FAILED:
            return "Requested transaction failed";
    }

    // Reached only for values outside the enumerators handled above.
    return "Unknown error";
}

using namespace boost::multiprecision;

namespace sgns
{
    /// @brief Creates the logger registered under the name "GeniusNode".
    /// @return The logger returned by base::createLogger.
    base::Logger GeniusNodeLogger()
    {
        auto genius_node_logger = base::createLogger( "GeniusNode" );
        return genius_node_logger;
    }

    std::shared_ptr<GeniusNode> GeniusNode::New( const DevConfig_st &dev_config,
                                                 bool                autodht,
                                                 bool                isprocessor,
                                                 uint16_t            base_port,
                                                 bool                is_full_node )
    {
        auto account = GeniusAccount::New( dev_config.TokenID, dev_config.BaseWritePath, is_full_node );
        if ( account == nullptr )
        {
            return nullptr;
        }
        auto instance = std::shared_ptr<GeniusNode>(
            new GeniusNode( dev_config, std::move( account ), autodht, isprocessor, base_port, is_full_node ) );

        if ( instance )
        {
            instance->BeginDBInitialization();
        }

        return instance;
    }

    std::shared_ptr<GeniusNode> GeniusNode::New( const DevConfig_st &dev_config,
                                                 const char         *eth_private_key,
                                                 bool                autodht,
                                                 bool                isprocessor,
                                                 uint16_t            base_port,
                                                 bool                is_full_node )
    {
        auto account = GeniusAccount::New( dev_config.TokenID,
                                           eth_private_key,
                                           dev_config.BaseWritePath,
                                           is_full_node );
        if ( account == nullptr )
        {
            return nullptr;
        }

        auto instance = std::shared_ptr<GeniusNode>(
            new GeniusNode( dev_config, std::move( account ), autodht, isprocessor, base_port, is_full_node ) );

        if ( instance )
        {
            instance->BeginDBInitialization();
        }

        return instance;
    }

    std::shared_ptr<GeniusNode> GeniusNode::NewFromMnemonic( const DevConfig_st &dev_config,
                                                             const std::string  &mnemonic,
                                                             bool                autodht,
                                                             bool                isprocessor,
                                                             uint16_t            base_port,
                                                             bool                is_full_node )
    {
        try
        {
            auto account = GeniusAccount::NewFromMnemonic( dev_config.TokenID,
                                                           mnemonic,
                                                           dev_config.BaseWritePath,
                                                           is_full_node );

            if ( account == nullptr )
            {
                return nullptr;
            }

            auto instance = std::shared_ptr<GeniusNode>(
                new GeniusNode( dev_config, std::move( account ), autodht, isprocessor, base_port, is_full_node ) );

            if ( instance )
            {
                instance->BeginDBInitialization();
            }

            return instance;
        }
        catch ( const std::invalid_argument &err )
        {
            std::cerr << "Failed to generate address from mnemonic: " << err.what() << '\n';
        }
        return nullptr;
    }

    /// @brief Builds a node: sets up logging, OpenSSL, networking and the IO thread pool.
    ///
    /// The state machine is NOT started here; the New()/NewFromMnemonic() factories
    /// call BeginDBInitialization() once the object is owned by a shared_ptr.
    ///
    /// @param dev_config   Developer configuration; BaseWritePath becomes the node's write path.
    /// @param account      Account the node operates on (ownership is shared with the node).
    /// @param autodht      Stored in autodht_.
    /// @param isprocessor  Stored in isprocessor_; StateTransition() starts processing when it is set.
    /// @param base_port    Passed to InitNetwork().
    /// @param is_full_node Stored in is_full_node_ and passed to InitNetwork().
    /// @throws std::runtime_error when InitLoggers() or InitNetwork() reports failure.
    GeniusNode::GeniusNode( const DevConfig_st            &dev_config,
                            std::shared_ptr<GeniusAccount> account,
                            bool                           autodht,
                            bool                           isprocessor,
                            uint16_t                       base_port,
                            bool                           is_full_node ) :
        write_base_path_( dev_config.BaseWritePath ),
        account_( std::move( account ) ),
        io_( std::make_shared<boost::asio::io_context>() ),
        // Work guard on io_: per Boost.Asio, run() does not return for lack of work while it is alive.
        io_work_guard_( boost::asio::make_work_guard( *io_ ) ),
        autodht_( autodht ),
        isprocessor_( isprocessor ),
        is_full_node_( is_full_node ),
        dev_config_( dev_config ),
        processing_channel_topic_( std::string( PROCESSING_CHANNEL ) ),
        processing_grid_chanel_topic_( std::string( PROCESSING_GRID_CHANNEL ) ),
        // Back-dated by one interval — presumably so the first API call is not throttled (TODO confirm).
        m_lastApiCall( std::chrono::system_clock::now() - MIN_API_CALL_INTERVAL ),
        scheduler_( std::make_shared<libp2p::basic::SchedulerImpl>(
            std::make_shared<libp2p::basic::AsioSchedulerBackend>( io_ ),
            libp2p::basic::Scheduler::Config{ std::chrono::milliseconds( 100 ) } ) ),
        generator_( std::make_shared<ipfs_lite::ipfs::graphsync::RequestIdGenerator>() ),
        // Single-threaded pool dedicated to processing callbacks.
        processing_callback_pool_( std::make_unique<boost::asio::thread_pool>( 1 ) )
    {
        // Rotate log files before initializing logging system
        RotateLogFiles( write_base_path_ );
        InitOpenSSL();

        // node_logger_ is used right after this call, so logging must be configured
        // before anything below runs.
        if ( !InitLoggers( write_base_path_ ) )
        {
            throw std::runtime_error( "Could not configure loggers" );
        }

        node_logger_->info( sgns::version::SuperGeniusVersionText() );

        if ( !InitNetwork( base_port, is_full_node_ ) )
        {
            throw std::runtime_error( "Network initialization error" );
        }
        node_logger_->debug( "Account Address {}", account_->GetAddress() );

        // Initializes the thread pool for IO context
        io_threads_.reserve( io_thread_count_ );
        for ( unsigned i = 0; i < io_thread_count_; ++i )
        {
            // Each worker captures its own shared_ptr copy of the io_context and runs it.
            io_threads_.emplace_back( [ctx = io_] { ctx->run(); } );
        }

        // Reads <write_base_path_>/crdt_config.json, falling back to defaults.
        LoadCrdtConfig();
    }

    void GeniusNode::LoadCrdtConfig()
    {
        crdt_backup_config_ = crdt::GlobalDB::BackupOptions{ true, 15, 12, true };

        const std::string config_path = write_base_path_ + "/crdt_config.json";
        std::ifstream     config_file( config_path );
        if ( !config_file.good() )
        {
            node_logger_->info( "crdt_config.json not found at {}, using defaults", config_path );
            return;
        }

        std::stringstream buffer;
        buffer << config_file.rdbuf();

        rapidjson::Document config_json;
        config_json.Parse( buffer.str().c_str() );
        if ( config_json.HasParseError() || !config_json.IsObject() )
        {
            node_logger_->warn( "Invalid crdt_config.json at {}, using defaults", config_path );
            return;
        }

        if ( config_json.HasMember( "backup_enabled" ) && config_json["backup_enabled"].IsBool() )
        {
            crdt_backup_config_.enabled = config_json["backup_enabled"].GetBool();
        }
        if ( config_json.HasMember( "backup_interval_minutes" ) && config_json["backup_interval_minutes"].IsUint() )
        {
            crdt_backup_config_.interval_minutes = config_json["backup_interval_minutes"].GetUint();
        }
        if ( config_json.HasMember( "backup_keep_count" ) && config_json["backup_keep_count"].IsUint() )
        {
            crdt_backup_config_.keep_count = config_json["backup_keep_count"].GetUint();
        }
        if ( config_json.HasMember( "backup_auto_restore_on_repair_failure" ) &&
             config_json["backup_auto_restore_on_repair_failure"].IsBool() )
        {
            crdt_backup_config_.auto_restore_on_repair_failure = config_json["backup_auto_restore_on_repair_failure"]
                                                                     .GetBool();
        }

        if ( crdt_backup_config_.interval_minutes == 0 )
        {
            crdt_backup_config_.interval_minutes = 15;
        }
        if ( crdt_backup_config_.keep_count == 0 )
        {
            crdt_backup_config_.keep_count = 12;
        }

        node_logger_->info(
            "CRDT backup config loaded: enabled={}, interval_minutes={}, keep_count={}, auto_restore={}",
            crdt_backup_config_.enabled,
            crdt_backup_config_.interval_minutes,
            crdt_backup_config_.keep_count,
            crdt_backup_config_.auto_restore_on_repair_failure );
    }

    /// @brief Applies per-logger level overrides from `<write_base_path_>/log_config.json`.
    ///
    /// Expected shape: `{ "loggers": { "<spdlog logger name>": "<level name>", ... } }`.
    /// A missing file is silently ignored. A malformed file, a non-string value,
    /// an unregistered logger name or an unknown level name is reported with a
    /// warning and skipped. For every accepted entry the logger's level is set
    /// and, unless the level is `off`, flush-on is set to the same level.
    void GeniusNode::LoadLogConfig()
    {
        const std::string config_path = write_base_path_ + "/log_config.json";
        std::ifstream     config_file( config_path );
        if ( !config_file.good() )
        {
            return;
        }

        std::stringstream buffer;
        buffer << config_file.rdbuf();

        rapidjson::Document config_json;
        config_json.Parse( buffer.str().c_str() );
        if ( config_json.HasParseError() || !config_json.IsObject() )
        {
            node_logger_->warn( "Invalid log_config.json at {}, using defaults", config_path );
            return;
        }

        if ( !config_json.HasMember( "loggers" ) || !config_json["loggers"].IsObject() )
        {
            node_logger_->warn( "log_config.json missing 'loggers' object at {}", config_path );
            return;
        }

        // Look the object up once instead of on every loop iteration.
        const auto &loggers = config_json["loggers"];
        for ( auto it = loggers.MemberBegin(); it != loggers.MemberEnd(); ++it )
        {
            if ( !it->value.IsString() )
            {
                node_logger_->warn( "log_config.json: logger '{}' value is not a string, skipping",
                                    it->name.GetString() );
                continue;
            }

            const std::string logger_name  = it->name.GetString();
            const std::string level_string = it->value.GetString();

            auto logger = spdlog::get( logger_name );
            if ( !logger )
            {
                node_logger_->warn( "log_config.json: logger '{}' not found, skipping", logger_name );
                continue;
            }

            const auto level = spdlog::level::from_str( level_string );
            // from_str() returns level::off for any name it does not recognise, so a
            // typo would silently mute the logger. Only an explicit "off" may do that.
            if ( level == spdlog::level::off && level_string != "off" )
            {
                node_logger_->warn( "log_config.json: logger '{}' has unknown level '{}', skipping",
                                    logger_name,
                                    level_string );
                continue;
            }

            logger->set_level( level );
            if ( level != spdlog::level::off )
            {
                logger->flush_on( level );
            }
            node_logger_->info( "log_config override: {} -> {}", logger_name, level_string );
        }
    }

    /// @brief Starts the node's state machine by entering the database-migration state.
    void GeniusNode::BeginDBInitialization()
    {
        const auto first_state = NodeState::MIGRATING_DATABASE;
        StateTransition( first_state );
    }

    /// @brief Records @p next_state as the current state and runs that state's entry actions.
    ///
    /// Entry actions per state:
    ///   - MIGRATING_DATABASE: adds bootstrap full nodes to pubsub, initialises the
    ///     account messenger and starts MigrateDatabase(); its callback moves on to
    ///     INITIALIZING_DATABASE, or schedules a retry on BLOCKCHAIN_INIT_FAILED.
    ///   - INITIALIZING_DATABASE: InitDatabase(), wires the account to the database
    ///     and moves on to INITIALIZING_BLOCKCHAIN.
    ///   - INITIALIZING_BLOCKCHAIN: creates the Blockchain if needed and starts it;
    ///     its callback posts the move to INITIALIZING_TRANSACTIONS onto io_.
    ///   - INITIALIZING_TRANSACTIONS: creates and starts the TransactionManager,
    ///     wires the slot-hash populator, the EthWatchService and the BridgeRelayer,
    ///     then posts bridge initialisation onto io_. No further transition is made
    ///     from here; presumably TransactionStateChanged() drives it (TODO confirm).
    ///   - INITIALIZING_PROCESSING: rebuilds the processing modules and service,
    ///     optionally starts processing, then moves on to READY.
    ///   - READY / CREATING: no actions besides logging for READY.
    ///
    /// On a failed InitDatabase() or InitProcessingModules() the error is logged and
    /// the node stays in the state that failed.
    ///
    /// All callbacks stored in other components hold a weak pointer to this node so
    /// that they become no-ops once the node has been destroyed.
    ///
    /// @param next_state State to enter.
    void GeniusNode::StateTransition( NodeState next_state )
    {
        state_.store( next_state );
        node_logger_->debug( "Transitioning to state {}", NodeStateToString( next_state ) );

        switch ( next_state )
        {
            case NodeState::MIGRATING_DATABASE:
            {
                if ( !bootstrap_fullnodes_.empty() )
                {
                    pubsub_->AddPeers( bootstrap_fullnodes_ );
                    node_logger_->info( "Added {} bootstrap fullnodes", bootstrap_fullnodes_.size() );
                }
                account_->InitMessenger( pubsub_ );
                MigrateDatabase(
                    [weak_self( weak_from_this() )]( outcome::result<void> result )
                    {
                        if ( auto strong = weak_self.lock() )
                        {
                            if ( result.has_error() )
                            {
                                strong->node_logger_->error( "Database migration error: {}", result.error().message() );
                                // Only a blockchain-init failure is retried; any other
                                // migration error leaves the node in MIGRATING_DATABASE.
                                if ( result.error() == MigrationManager::Error::BLOCKCHAIN_INIT_FAILED )
                                {
                                    strong->node_logger_->info( "Scheduling blockchain retry after failure" );
                                    strong->ScheduleMigrationRetry();
                                }
                                return;
                            }
                            strong->StateTransition( NodeState::INITIALIZING_DATABASE );
                        }
                    } );
                break;
            }
            case NodeState::INITIALIZING_DATABASE:
            {
                if ( !InitDatabase() )
                {
                    node_logger_->error( "GlobalDB initialization error" );
                    return;
                }
                account_->ConfigureDatabaseDependencies( tx_globaldb_ );
                tx_globaldb_->AddListenTopic( processing_channel_topic_ );
                StateTransition( NodeState::INITIALIZING_BLOCKCHAIN );
                break;
            }
            case NodeState::INITIALIZING_BLOCKCHAIN:
            {
                // The Blockchain object is created once; re-entering this state only
                // calls Start() again on the existing instance.
                if ( !blockchain_ )
                {
                    auto weak_self = weak_from_this();
                    blockchain_    = Blockchain::New(
                        tx_globaldb_,
                        account_,
                        pubsub_,
                        [weak_self]( outcome::result<void> result )
                        {
                            if ( auto strong = weak_self.lock() )
                            {
                                if ( result.has_error() )
                                {
                                    strong->node_logger_->error( "Error starting blockchain: {}",
                                                                 result.error().message() );
                                    strong->node_logger_->info( "Scheduling blockchain retry after failure" );
                                    strong->ScheduleBlockchainRetry();
                                    return;
                                }
                                // Ignore a start notification that arrives after the node
                                // has already left INITIALIZING_BLOCKCHAIN.
                                auto current_state = strong->state_.load();
                                if ( current_state != NodeState::INITIALIZING_BLOCKCHAIN )
                                {
                                    strong->node_logger_->debug(
                                        "Skipping transaction initialization, unexpected state: {}",
                                        NodeStateToString( current_state ) );
                                    return;
                                }
                                strong->node_logger_->debug(
                                    "Blockchain started successfully, starting transaction manager" );
                                if ( strong->is_full_node_ )
                                {
                                    strong->node_logger_->debug(
                                        "Full node: Setting blockchain to grab other account creation blocks" );
                                    strong->blockchain_->SetFullNodeMode();
                                }

                                // Move transaction initialization off the AccountMessenger worker thread.
                                boost::asio::post(
                                    *strong->io_,
                                    [weak_self]
                                    {
                                        if ( auto strong = weak_self.lock() )
                                        {
                                            // Re-check: the state may have changed between
                                            // posting and running this handler.
                                            auto current_state = strong->state_.load();
                                            if ( current_state != NodeState::INITIALIZING_BLOCKCHAIN )
                                            {
                                                strong->node_logger_->debug(
                                                    "Skipping transaction initialization, unexpected state: {}",
                                                    NodeStateToString( current_state ) );
                                                return;
                                            }
                                            strong->StateTransition( NodeState::INITIALIZING_TRANSACTIONS );
                                        }
                                    } );
                            }
                        } );
                }
                blockchain_->Start();
                InitBootstrapReconnect();
                StartBootstrapHealthCheck();
                break;
            }

            case NodeState::INITIALIZING_TRANSACTIONS:
            {
                transaction_manager_ = TransactionManager::New( tx_globaldb_,
                                                                io_,
                                                                account_,
                                                                std::make_shared<crypto::HasherImpl>(),
                                                                blockchain_,
                                                                is_full_node_ );

                transaction_manager_->RegisterStateChangeCallback(
                    [weak_self = weak_from_this()]( TransactionManager::State old_state,
                                                    TransactionManager::State new_state )
                    {
                        if ( auto strong = weak_self.lock() )
                        {
                            strong->TransactionStateChanged( old_state, new_state );
                        }
                    } );
                transaction_manager_->Start();
                // TS-01: Wire configurable timestamp tolerance from DevConfig_st
                // to TransactionManager's CheckTransactionTimestamp via SetTimeFrameToleranceMs.
                // Default: 300000ms (±5 minutes), overridable via DevConfig_st aggregate init.
                // NOTE(review): the value passed here is kDefaultTimestampToleranceMs, not a
                // field read from dev_config_ — confirm where the override is applied.
                transaction_manager_->SetTimeFrameToleranceMs( kDefaultTimestampToleranceMs );

                // Phase 6 (D-01..D-10): Wire slot-hash populator bridging
                // PublicChainInputValidator -> ConsensusManager::CreateVote, so
                // each signed vote commits to its RPC endpoint slot hashes.
                // Single-chain resolution: use the first configured chain id.
                // Multi-chain (resolve chain_id from the vote's proposal subject)
                // is a future enhancement.
                if ( blockchain_ )
                {
                    // The populator is stored inside blockchain_, which may outlive this
                    // node, so it holds a weak pointer (like the other callbacks in this
                    // function) instead of a raw `this`.
                    blockchain_->SetSlotHashPopulator(
                        [weak_self = weak_from_this()]( sgns::ConsensusVote &vote )
                        {
                            auto strong = weak_self.lock();
                            if ( !strong || !strong->transaction_manager_ )
                            {
                                return;
                            }
                            auto &validator = strong->transaction_manager_->GetPublicChainInputValidator();
                            const auto chain_id = validator.GetFirstConfiguredChainId();
                            if ( !chain_id.has_value() )
                            {
                                // No RPC endpoints configured: leave slots empty (abstain, D-05).
                                strong->node_logger_->debug( "SlotHashPopulator: no configured chain; abstaining" );
                                return;
                            }
                            const auto slot0 = validator.GetSlotHash( 0, chain_id.value() );
                            const auto slot1 = validator.GetSlotHash( 1, chain_id.value() );
                            const auto slot2 = validator.GetSlotHash( 2, chain_id.value() );
                            // Only non-empty hashes are written into the vote.
                            if ( !slot0.empty() )
                            {
                                vote.set_slot_0_hash( slot0.data(), slot0.size() );
                            }
                            if ( !slot1.empty() )
                            {
                                vote.set_slot_1_hash( slot1.data(), slot1.size() );
                            }
                            if ( !slot2.empty() )
                            {
                                vote.set_slot_2_hash( slot2.data(), slot2.size() );
                            }
                            strong->node_logger_->debug(
                                "SlotHashPopulator: populated chain_id={} slot0={} slot1={} slot2={}",
                                chain_id.value(),
                                !slot0.empty(),
                                !slot1.empty(),
                                !slot2.empty() );
                        } );
                }

                // Initialize shared EthWatchService for EVM event detection
                eth_watch_service_ = std::make_shared<eth::EthWatchService>();

                // Initialize bridge relayer — wires evmrelay burn events → MintFunds
                bridge_relayer_ = BridgeRelayer::Create( std::weak_ptr<TransactionManager>( transaction_manager_ ),
                                                         eth_watch_service_ );

                // D-04: Launch async bridge initialization as NON-BLOCKING post.
                // ChainRpcEndpointProvider::Initialize() runs on the io_context
                // independently; observers (BridgeRelayer, catch-up scan) are
                // notified synchronously within Initialize. The node state
                // machine proceeds through INITIALIZING_PROCESSING → READY without waiting.
                boost::asio::post( *io_,
                                   [weak_self = weak_from_this()]
                                   {
                                       if ( auto strong = weak_self.lock() )
                                       {
                                           strong->InitializeAndStartBridge();
                                       }
                                   } );

                break;
            }
            case NodeState::INITIALIZING_PROCESSING:
            {
                ResetProcessingMembers();

                if ( !InitProcessingModules() )
                {
                    node_logger_->error( "Processing modules initialization error" );
                    return;
                }

                // Payouts go to the account's own address unless a "payout_address"
                // entry exists in secure storage.
                auto payout_address = account_->GetAddress();

                if ( auto address = account_->LoadFromSecureStorage( "payout_address" ); address.has_value() )
                {
                    payout_address = std::move( address.value() );
                    node_logger_->debug( "Using address {:.8} for payout", payout_address );
                }

                processing_service_ = std::make_shared<processing::ProcessingServiceImpl>(
                    pubsub_,
                    MAX_NODES_COUNT,
                    std::make_shared<processing::SubTaskEnqueuerImpl>( task_queue_ ),
                    task_result_storage_,
                    processing_core_,
                    [weak_self = weak_from_this()]( const std::string &var, const SGProcessing::TaskResult &taskresult )
                    {
                        if ( auto strong = weak_self.lock() )
                        {
                            strong->ProcessingDone( var, taskresult );
                        }
                    },
                    [weak_self = weak_from_this()]( const std::string &var )
                    {
                        if ( auto strong = weak_self.lock() )
                        {
                            strong->ProcessingError( var );
                        }
                    },
                    payout_address );

                processing_service_->SetChannelListRequestTimeout( boost::posix_time::milliseconds( 3000 ) );
                if ( isprocessor_ )
                {
                    StartProcessing();
                }
                StateTransition( NodeState::READY );
                break;
            }

            case NodeState::READY:
            {
                node_logger_->info( "GeniusNode READY" );
                break;
            }
            case NodeState::CREATING:
            default:
                break;
        }
    }

    /// @brief Performs process-wide OpenSSL initialisation.
    ///
    /// Called from the GeniusNode constructor before the loggers and the network
    /// are set up.
    /// NOTE(review): these are the legacy (pre-1.1.0) initialisation entry points;
    /// confirm against the OpenSSL version this project links before changing them.
    void GeniusNode::InitOpenSSL()
    {
        // Initialise the SSL library.
        SSL_library_init();
        // Load the human-readable error strings.
        SSL_load_error_strings();
        // Register the available ciphers and digests.
        OpenSSL_add_all_algorithms();
    }

    bool GeniusNode::InitLoggers( const std::string &base_path )
    {
        logging_system_ = std::make_shared<soralog::LoggingSystem>( std::make_shared<soralog::ConfiguratorFromYAML>(
            // Original LibP2P logging config
            std::make_shared<libp2p::log::Configurator>(),
            // Additional logging config for application
            GetLoggingSystem( base_path ) ) );
        auto result     = logging_system_->configure();
        if ( result.has_error )
        {
            std::cerr << "Logger init error: " << result.message;
            return false;
        }

        libp2p::log::setLoggingSystem( logging_system_ );
        libp2p::log::setLevelOfGroup( "SuperGeniusDemo", soralog::Level::ERROR_ );

        std::string logdir = "";
#ifndef SGNS_DEBUGLOGS
        logdir = base_path + "/sgnslog2.log";
#endif
#ifdef SGNS_DEBUGLOGS
        // Debug mode
        node_logger_              = ConfigureLogger( "SuperGeniusNode", logdir, spdlog::level::debug );
        auto loggerGeniusNode     = ConfigureLogger( "GeniusNode", logdir, spdlog::level::debug );
        auto loggerGlobalDB       = ConfigureLogger( "GlobalDB", logdir, spdlog::level::err );
        auto loggerDAGSyncer      = ConfigureLogger( "GraphsyncDAGSyncer", logdir, spdlog::level::err );
        auto loggerGraphsync      = ConfigureLogger( "graphsync", logdir, spdlog::level::err );
        auto loggerBroadcaster    = ConfigureLogger( "PubSubBroadcasterExt", logdir, spdlog::level::err );
        auto loggerDataStore      = ConfigureLogger( "CrdtDatastore", logdir, spdlog::level::err );
        auto loggerCRDTHeads      = ConfigureLogger( "CrdtHeads", logdir, spdlog::level::err );
        auto loggerTransactions   = ConfigureLogger( "TransactionManager", logdir, spdlog::level::debug );
        auto loggerMigration      = ConfigureLogger( "MigrationManager", logdir, spdlog::level::err );
        auto loggerMigrationStep  = ConfigureLogger( "MigrationStep", logdir, spdlog::level::err );
        auto loggerQueue          = ConfigureLogger( "TaskQueueImpl", logdir, spdlog::level::trace );
        auto loggerRocksDB        = ConfigureLogger( "rocksdb", logdir, spdlog::level::err );
        auto logkad               = ConfigureLogger( "Kademlia", logdir, spdlog::level::err );
        auto logNoise             = ConfigureLogger( "Noise", logdir, spdlog::level::err );
        auto logProcessingEngine  = ConfigureLogger( "ProcessingEngine", logdir, spdlog::level::err );
        auto loggerSubQueue       = ConfigureLogger( "ProcessingSubTaskQueueAccessorImpl", logdir, spdlog::level::err );
        auto loggerProcServ       = ConfigureLogger( "ProcessingService", logdir, spdlog::level::err );
        auto loggerProcqm         = ConfigureLogger( "ProcessingSubTaskQueueManager", logdir, spdlog::level::err );
        auto loggerUPNP           = ConfigureLogger( "UPNP", logdir, spdlog::level::err );
        auto loggerProcessingNode = ConfigureLogger( "ProcessingNode", logdir, spdlog::level::err );
        auto loggerGossipPubsub   = ConfigureLogger( "GossipPubSub", logdir, spdlog::level::err );
        auto loggerAccountMessenger = ConfigureLogger( "AccountMessenger", logdir, spdlog::level::err );
        auto loggerGeniusAccount    = ConfigureLogger( "GeniusAccount", logdir, spdlog::level::err );
        auto loggerKeyPair          = ConfigureLogger( "KeyPairFileStorage", logdir, spdlog::level::err );
        auto loggerBlockchain       = ConfigureLogger( "Blockchain", logdir, spdlog::level::err );
        auto loggerValidator        = ConfigureLogger( "ValidatorRegistry", logdir, spdlog::level::debug );
        auto loggerProcMgr          = ConfigureLogger( "SGProcessingManager", logdir, spdlog::level::err );
        auto loggerProcessor        = ConfigureLogger( "SGProcessor", logdir, spdlog::level::err );
        auto loggerCrdtCallback     = ConfigureLogger( "CRDTCallbackManager", logdir, spdlog::level::err );
        auto loggerCoinPrices       = ConfigureLogger( "CoinPrices", logdir, spdlog::level::err );
        auto loggerUTXOManager      = ConfigureLogger( "UTXOManager", logdir, spdlog::level::err );
        auto loggerConsensusManager = ConfigureLogger( "ConsensusManager", logdir, spdlog::level::debug );
        auto loggerCRDTSet          = ConfigureLogger( "CRDTSet", logdir, spdlog::level::err );
        auto loggerInputValidator   = ConfigureLogger( "InputValidator", logdir, spdlog::level::trace );
        // AsyncIOManager loggers
        auto asioFileCommon  = ConfigureLogger( "FILECommon", logdir, spdlog::level::err );
        auto asioFileManager = ConfigureLogger( "FileManager", logdir, spdlog::level::err );
        auto asioHttpCommon  = ConfigureLogger( "HTTPCommon", logdir, spdlog::level::err );
        auto asioIpfsCommon  = ConfigureLogger( "IPFSCommon", logdir, spdlog::level::err );
        auto asioIpfsLoader  = ConfigureLogger( "IPFSLoader", logdir, spdlog::level::err );
        auto asioFileLoader  = ConfigureLogger( "MNNLoader", logdir, spdlog::level::err );
        auto asioWSCommon    = ConfigureLogger( "WSCommon", logdir, spdlog::level::err );
        // libp2p loggers
        libp2p::log::setLevelOfGroup( "*", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "Gossip", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "crypto", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "identify", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "kademlia", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "libp2p", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "mplex", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "muxer", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "plaintext", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "protocols", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "secio", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "security", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "yamux", soralog::Level::DEBUG );
#else
        // Release mode
        node_logger_              = ConfigureLogger( "SuperGeniusNode", logdir, spdlog::level::err );
        auto loggerGeniusNode     = ConfigureLogger( "GeniusNode", logdir, spdlog::level::err );
        auto loggerGlobalDB       = ConfigureLogger( "GlobalDB", logdir, spdlog::level::err );
        auto loggerDAGSyncer      = ConfigureLogger( "GraphsyncDAGSyncer", logdir, spdlog::level::err );
        auto loggerGraphsync      = ConfigureLogger( "graphsync", logdir, spdlog::level::err );
        auto loggerBroadcaster    = ConfigureLogger( "PubSubBroadcasterExt", logdir, spdlog::level::err );
        auto loggerDataStore      = ConfigureLogger( "CrdtDatastore", logdir, spdlog::level::err );
        auto loggerCRDTHeads      = ConfigureLogger( "CrdtHeads", logdir, spdlog::level::err );
        auto loggerTransactions   = ConfigureLogger( "TransactionManager", logdir, spdlog::level::err );
        auto loggerMigration      = ConfigureLogger( "MigrationManager", logdir, spdlog::level::err );
        auto loggerMigrationStep  = ConfigureLogger( "MigrationStep", logdir, spdlog::level::err );
        auto loggerQueue          = ConfigureLogger( "TaskQueueImpl", logdir, spdlog::level::err );
        auto loggerRocksDB        = ConfigureLogger( "rocksdb", logdir, spdlog::level::err );
        auto logkad               = ConfigureLogger( "Kademlia", logdir, spdlog::level::err );
        auto logNoise             = ConfigureLogger( "Noise", logdir, spdlog::level::err );
        auto logProcessingEngine  = ConfigureLogger( "ProcessingEngine", logdir, spdlog::level::err );
        auto loggerSubQueue       = ConfigureLogger( "ProcessingSubTaskQueueAccessorImpl", logdir, spdlog::level::err );
        auto loggerProcServ       = ConfigureLogger( "ProcessingService", logdir, spdlog::level::err );
        auto loggerProcqm         = ConfigureLogger( "ProcessingSubTaskQueueManager", logdir, spdlog::level::err );
        auto loggerUPNP           = ConfigureLogger( "UPNP", logdir, spdlog::level::err );
        auto loggerProcessingNode = ConfigureLogger( "ProcessingNode", logdir, spdlog::level::err );
        auto loggerGossipPubsub   = ConfigureLogger( "GossipPubSub", logdir, spdlog::level::err );
        auto loggerAccountMessenger = ConfigureLogger( "AccountMessenger", logdir, spdlog::level::err );
        auto loggerGeniusAccount    = ConfigureLogger( "GeniusAccount", logdir, spdlog::level::err );
        auto loggerKeyPair          = ConfigureLogger( "KeyPairFileStorage", logdir, spdlog::level::err );
        auto loggerBlockchain       = ConfigureLogger( "Blockchain", logdir, spdlog::level::err );
        auto loggerValidator        = ConfigureLogger( "ValidatorRegistry", logdir, spdlog::level::err );
        auto loggerProcMgr          = ConfigureLogger( "SGProcessingManager", logdir, spdlog::level::err );
        auto loggerProcessor        = ConfigureLogger( "SGProcessor", logdir, spdlog::level::err );
        auto loggerCrdtCallback     = ConfigureLogger( "CRDTCallbackManager", logdir, spdlog::level::err );
        auto loggerCoinPrices       = ConfigureLogger( "CoinPrices", logdir, spdlog::level::err );
        auto loggerUTXOManager      = ConfigureLogger( "UTXOManager", logdir, spdlog::level::err );
        auto loggerConsensusManager = ConfigureLogger( "ConsensusManager", logdir, spdlog::level::err );
        auto loggerCRDTSet          = ConfigureLogger( "CRDTSet", logdir, spdlog::level::err );
        auto loggerInputValidator   = ConfigureLogger( "InputValidator", logdir, spdlog::level::err );

        //AsyncIOManager Loggers
        auto asioFileCommon  = ConfigureLogger( "FILECommon", logdir, spdlog::level::err );
        auto asioFileManager = ConfigureLogger( "FileManager", logdir, spdlog::level::err );
        auto asioHttpCommon  = ConfigureLogger( "HTTPCommon", logdir, spdlog::level::err );
        auto asioIpfsCommon  = ConfigureLogger( "IPFSCommon", logdir, spdlog::level::err );
        auto asioIpfsLoader  = ConfigureLogger( "IPFSLoader", logdir, spdlog::level::err );
        auto asioFileLoader  = ConfigureLogger( "MNNLoader", logdir, spdlog::level::err );
        auto asioWSCommon    = ConfigureLogger( "WSCommon", logdir, spdlog::level::err );
#endif

        LoadLogConfig();

        // Logger initialization complete
        node_logger_->info( "Logger initialized successfully" );

        return true;
    }

    bool GeniusNode::InitNetwork( uint16_t base_port, bool is_full_node )
    {
        bool                ret         = true;
        std::string         config_path = write_base_path_ + "/network_config.json";
        rapidjson::Document config_json;
        std::string         pubsub_bind_address = "0.0.0.0";
        std::string         authorized_full_node;
        bool                upnp_enabled = true;
        int                 high_water   = is_full_node ? 400 : 300;
        int                 low_water    = is_full_node ? 200 : 150;
        std::string         port_str;
        uint16_t            config_port = 0;

        bootstrap_peers_.clear();
        bootstrap_fullnodes_.clear();

        // Try to read config file
        std::ifstream config_file( config_path );
        if ( config_file.good() )
        {
            std::stringstream buffer;
            buffer << config_file.rdbuf();
            config_json.Parse( buffer.str().c_str() );
            if ( !config_json.HasParseError() && config_json.IsObject() )
            {
                if ( config_json.HasMember( "pubsub_port" ) && config_json["pubsub_port"].IsString() )
                {
                    port_str = config_json["pubsub_port"].GetString();
                    if ( !port_str.empty() )
                    {
                        try
                        {
                            config_port = static_cast<uint16_t>( std::stoi( port_str ) );
                        }
                        catch ( ... )
                        {
                            node_logger_->warn( "Invalid pubsub_port in config, using default" );
                        }
                    }
                }
                if ( config_json.HasMember( "pubsub_bind_address" ) && config_json["pubsub_bind_address"].IsString() )
                {
                    pubsub_bind_address = config_json["pubsub_bind_address"].GetString();
                }
                if ( config_json.HasMember( "bootstrap_addresses" ) && config_json["bootstrap_addresses"].IsArray() )
                {
                    for ( auto &v : config_json["bootstrap_addresses"].GetArray() )
                    {
                        if ( v.IsString() )
                        {
                            bootstrap_peers_.push_back( v.GetString() );
                        }
                    }
                }
                if ( config_json.HasMember( "bootstrap_fullnodes" ) && config_json["bootstrap_fullnodes"].IsArray() )
                {
                    for ( auto &v : config_json["bootstrap_fullnodes"].GetArray() )
                    {
                        if ( v.IsString() )
                        {
                            bootstrap_fullnodes_.push_back( v.GetString() );
                        }
                    }
                }

                if ( config_json.HasMember( "upnp_enabled" ) && config_json["upnp_enabled"].IsBool() )
                {
                    upnp_enabled = config_json["upnp_enabled"].GetBool();
                }
                if ( config_json.HasMember( "high_water" ) && config_json["high_water"].IsInt() )
                {
                    high_water = config_json["high_water"].GetInt();
                }
                if ( config_json.HasMember( "low_water" ) && config_json["low_water"].IsInt() )
                {
                    low_water = config_json["low_water"].GetInt();
                }
                if ( config_json.HasMember( "authorized_full_node" ) && config_json["authorized_full_node"].IsString() )
                {
                    authorized_full_node = config_json["authorized_full_node"].GetString();
                }

                // ── Parse reconnect config ──
                if ( config_json.HasMember( "bootstrap_reconnect_base_delay_sec" ) &&
                     config_json["bootstrap_reconnect_base_delay_sec"].IsInt() )
                {
                    reconnect_config_.base_delay = std::chrono::seconds(
                        config_json["bootstrap_reconnect_base_delay_sec"].GetInt() );
                }
                if ( config_json.HasMember( "bootstrap_reconnect_max_delay_sec" ) &&
                     config_json["bootstrap_reconnect_max_delay_sec"].IsInt() )
                {
                    reconnect_config_.max_delay = std::chrono::seconds(
                        config_json["bootstrap_reconnect_max_delay_sec"].GetInt() );
                }
                if ( config_json.HasMember( "bootstrap_health_check_interval_sec" ) &&
                     config_json["bootstrap_health_check_interval_sec"].IsInt() )
                {
                    reconnect_config_.health_check_interval = std::chrono::seconds(
                        config_json["bootstrap_health_check_interval_sec"].GetInt() );
                }
                if ( config_json.HasMember( "bootstrap_health_check_disconnected_interval_sec" ) &&
                     config_json["bootstrap_health_check_disconnected_interval_sec"].IsInt() )
                {
                    reconnect_config_.health_check_disconnected_interval = std::chrono::seconds(
                        config_json["bootstrap_health_check_disconnected_interval_sec"].GetInt() );
                }
                if ( config_json.HasMember( "bootstrap_background_multiplier" ) &&
                     config_json["bootstrap_background_multiplier"].IsDouble() )
                {
                    reconnect_config_.background_multiplier = config_json["bootstrap_background_multiplier"]
                                                                  .GetDouble();
                }
            }
        }

        // ── Parse bootstrap fullnode multiaddrs into PeerInfo cache for reconnection ──
        bootstrap_fullnode_infos_.clear();
        bootstrap_fullnode_ids_.clear();
        for ( const auto &addr : bootstrap_fullnodes_ )
        {
            auto peer_info = ParsePeerInfoFromString( addr );
            if ( peer_info )
            {
                bootstrap_fullnode_infos_.push_back( peer_info.value() );
                bootstrap_fullnode_ids_.insert( peer_info->id );
            }
            else
            {
                node_logger_->warn( "Failed to parse bootstrap fullnode multiaddr: {}", addr );
            }
        }
        if ( !bootstrap_fullnode_infos_.empty() )
        {
            node_logger_->info( "Parsed {} bootstrap fullnode(s) for reconnection tracking",
                                bootstrap_fullnode_infos_.size() );
        }

        // ── Parse bootstrap peer multiaddrs into PeerInfo cache for reconnection ──
        bootstrap_peer_infos_.clear();
        bootstrap_peer_ids_.clear();
        for ( const auto &addr : bootstrap_peers_ )
        {
            auto peer_info = ParsePeerInfoFromString( addr );
            if ( peer_info )
            {
                bootstrap_peer_infos_.push_back( peer_info.value() );
                bootstrap_peer_ids_.insert( peer_info->id );
            }
            else
            {
                node_logger_->warn( "Failed to parse bootstrap peer multiaddr: {}", addr );
            }
        }
        if ( !bootstrap_peer_infos_.empty() )
        {
            node_logger_->info( "Parsed {} bootstrap peer(s) for reconnection tracking", bootstrap_peer_infos_.size() );
        }

        // Port selection logic
        if ( config_port != 0 )
        {
            pubsubport_ = config_port;
        }
        else
        {
            pubsubport_ = GenerateRandomPort( base_port, account_->GetAddress() );
        }

        do
        {
            // Never block node construction on UPnP/IGD discovery.
            // RefreshUPNP() runs on its own thread and will try immediately.
            if ( upnp_enabled )
            {
                //ret = InitUPNP();
                (void) InitUPNP(); // Ignore UPNP init result for now
            }

            // Make a base58 out of our address
            std::string                tempaddress = account_->GetAddress();
            std::vector<unsigned char> inputBytes( tempaddress.begin(), tempaddress.end() );
            std::vector<unsigned char> hash( SHA256_DIGEST_LENGTH );
            SHA256( inputBytes.data(), inputBytes.size(), hash.data() );

            auto key          = libp2p::multi::ContentIdentifierCodec::encodeCIDV0( hash.data(), hash.size() );
            auto acc_cid      = libp2p::multi::ContentIdentifierCodec::decode( key );
            auto maybe_base58 = libp2p::multi::ContentIdentifierCodec::toString( acc_cid.value() );
            if ( !maybe_base58 )
            {
                ret = false;
                node_logger_->error( "We couldn't convert the account {} to base58", account_->GetAddress() );
                break;
            }
            base58key_ = maybe_base58.value();

            gnus_network_full_path_ = std::string( GNUS_NETWORK_PATH ) + version::GetNetAndVersionAppendix() +
                                      base58key_;
            auto pubsubKeyPath = gnus_network_full_path_ + "/pubs_processor";

            //Set a pubsub config, use no signing because we can verify with proof and dag structure
            libp2p::protocol::gossip::Config config;
            config.echo_forward_mode       = false;
            config.sign_messages           = false;
            config.seen_cache_limit        = 10;
            config.heartbeat_interval_msec = std::chrono::milliseconds{ 500 };
            config.rw_timeout_msec         = std::chrono::seconds{ 30 };

            pubsub_ = std::make_shared<ipfs_pubsub::GossipPubSub>(
                crdt::KeyPairFileStorage( write_base_path_ + pubsubKeyPath ).GetKeyPair().value(),
                config );

            auto pubs = pubsub_->Start( pubsubport_, bootstrap_peers_, pubsub_bind_address, {} );
            pubs.wait();
            node_logger_->info( "PubSub started at address: {}", pubsub_->GetInterfaceAddress() );

            if ( upnp_enabled )
            {
                RefreshUPNP( pubsubport_ );
            }

            pubsub_->GetHost()->getConnectionManagerConfig().high_water = high_water;
            pubsub_->GetHost()->getConnectionManagerConfig().low_water  = low_water;

            graphsyncnetwork_ = std::make_shared<ipfs_lite::ipfs::graphsync::Network>( pubsub_->GetHost(), scheduler_ );

            // Initialize DHT early so peer discovery works during database migration
            if ( autodht_ )
            {
                DHTInit();
            }
        } while ( 0 );
        return ret;
    }

    /**
     * @brief Tries to map a TCP port on the UPnP gateway, starting at pubsubport_.
     *
     * Up to MAX_ATTEMPTS consecutive ports are tried. A port already mapped to this machine's
     * LAN address is re-requested; a port mapped to another owner is skipped. On success
     * pubsubport_ is updated to the mapped port.
     *
     * @return true when no IGD is found (nothing to map) or a port was mapped; false when an
     *         IGD exists but no candidate port could be opened.
     */
    bool GeniusNode::InitUPNP()
    {
        auto upnp = std::make_shared<upnp::UPNP>();
        if ( !upnp->GetIGD() )
        {
            return true;
        }

        const std::string wanip = upnp->GetWanIP();
        const std::string lanip = upnp->GetLocalIP();
        node_logger_->info( "Wan IP: {}", wanip );
        node_logger_->info( "Lan IP: {}", lanip );

        constexpr uint16_t MAX_ATTEMPTS = 10;
        constexpr uint32_t MAX_PORT     = 65535;

        // pubsubport_ only changes on success (immediately followed by a break), so the
        // starting port can be captured once.
        const auto  first_port = static_cast<uint32_t>( pubsubport_ );
        std::string owner;
        bool        ret = false;

        for ( uint16_t i = 0; i < MAX_ATTEMPTS; ++i )
        {
            // Compute the candidate in a wider type: narrowing pubsubport_ + i directly to
            // uint16_t would wrap around to port 0 and upwards near the top of the range.
            const uint32_t wide_candidate = first_port + i;
            if ( wide_candidate > MAX_PORT )
            {
                node_logger_->warn( "Stopping UPnP port search: candidate port {} is outside the valid range",
                                    wide_candidate );
                break;
            }
            const auto candidate_port = static_cast<uint16_t>( wide_candidate );

            if ( upnp->CheckIfPortInUse( candidate_port, "TCP", owner ) )
            {
                if ( owner == lanip )
                {
                    node_logger_->info( "Port {} is already mapped by this device. Try using it.", candidate_port );
                    if ( upnp->OpenPort( candidate_port, candidate_port, "TCP", 3600 ) )
                    {
                        ret         = true;
                        pubsubport_ = candidate_port;
                        break;
                    }

                    node_logger_->error(
                        "Port {} is already mapped by this device. We tried using it, but could not. Will try other ports.",
                        candidate_port );
                    continue;
                }
                node_logger_->error( "Port {} already in use by {}", candidate_port, owner );
                continue;
            }

            if ( upnp->OpenPort( candidate_port, candidate_port, "TCP", 3600 ) )
            {
                node_logger_->info( "Successfully opened port {}", candidate_port );
                ret         = true;
                pubsubport_ = candidate_port;
                break;
            }
            node_logger_->warn( "Failed to open port {}", candidate_port );
        }

        if ( !ret )
        {
            node_logger_->error( "Unable to open a usable UPnP port after {} attempts", MAX_ATTEMPTS );
        }

        return ret;
    }

    bool GeniusNode::InitDatabase()
    {
        bool ret = false;
        do
        {
            auto global_db_ret = crdt::GlobalDB::New( io_,
                                                      write_base_path_ + gnus_network_full_path_,
                                                      pubsub_,
                                                      crdt::CrdtOptions::DefaultOptions(),
                                                      graphsyncnetwork_,
                                                      scheduler_,
                                                      generator_,
                                                      nullptr,
                                                      crdt_backup_config_ );
            if ( global_db_ret.has_error() )
            {
                node_logger_->error( "Error creating GlobalDB: {}", global_db_ret.error().message() );
                break;
            }
            tx_globaldb_ = std::move( global_db_ret.value() );

            tx_globaldb_->Start();

            ret = true;
        } while ( 0 );
        return ret;
    }

    /**
     * @brief Builds the task queue, processing core and sub-task result storage on top of tx_globaldb_.
     *
     * @return Always true; none of the constructions below is checked for failure.
     */
    bool GeniusNode::InitProcessingModules()
    {
        // The queue must exist first: the processing core is constructed from it.
        task_queue_ = processing::TaskQueueImpl::New( tx_globaldb_, processing_channel_topic_ );

        processing_core_ = processing::ProcessingCoreImpl::New( task_queue_, 1, dev_config_.TokenID );

        task_result_storage_ =
            std::make_shared<processing::SubTaskResultStorageImpl>( tx_globaldb_, processing_channel_topic_ );

        return true;
    }

    void GeniusNode::MigrateDatabase( std::function<void( outcome::result<void> )> callback )
    {
        auto mgr = sgns::MigrationManager::New( io_,               // ioContext
                                                pubsub_,           // pubSub
                                                graphsyncnetwork_, // graphsync
                                                scheduler_,        // scheduler
                                                generator_,        // generator
                                                write_base_path_,  // writeBasePath
                                                base58key_,        // base58key
                                                account_,
                                                is_full_node_ );

        // We store it to query migration progress later.
        {
            std::lock_guard<std::mutex> lock( migration_mutex_ );
            migration_manager_ = mgr;
        }

        std::thread migration_thread(
            [manager = std::move( mgr ), cb = std::move( callback )]
            {
                auto migrationResult = manager->Migrate();
                if ( cb )
                {
                    cb( migrationResult );
                }
            } );
        migration_thread.detach();
    }

    void GeniusNode::ScheduleMigrationRetry()
    {
        std::thread(
            [weak_self = weak_from_this()]
            {
                std::this_thread::sleep_for( std::chrono::seconds( 5 ) );
                if ( auto strong = weak_self.lock() )
                {
                    strong->StateTransition( NodeState::MIGRATING_DATABASE );
                }
            } )
            .detach();
    }

    void GeniusNode::ScheduleBlockchainRetry()
    {
        std::thread(
            [weak_self = weak_from_this()]
            {
                std::this_thread::sleep_for( std::chrono::seconds( 5 ) );
                if ( auto strong = weak_self.lock() )
                {
                    auto current_state = strong->state_.load();
                    if ( current_state != NodeState::INITIALIZING_BLOCKCHAIN )
                    {
                        strong->node_logger_->debug( "Skipping blockchain retry, unexpected state: {}",
                                                     NodeStateToString( current_state ) );
                        return;
                    }
                    strong->StateTransition( NodeState::INITIALIZING_BLOCKCHAIN );
                }
            } )
            .detach();
    }

    /**
     * @brief Creates (or fetches) the logger for @p tag and applies @p level to it.
     *
     * @param tag    Logger name passed to base::createLogger().
     * @param logdir Log target passed to base::createLogger().
     * @param level  Level to set; unless it is spdlog::level::off it is also used as the flush threshold.
     * @return The configured logger.
     */
    base::Logger GeniusNode::ConfigureLogger( const std::string        &tag,
                                              const std::string        &logdir,
                                              spdlog::level::level_enum level )
    {
        auto configured = base::createLogger( tag, logdir );
        configured->set_level( level );

        // A disabled logger gets no flush threshold.
        if ( level == spdlog::level::off )
        {
            return configured;
        }

        configured->flush_on( level );
        return configured;
    }

    /**
     * @brief First teardown phase: stops bootstrap monitoring and shuts the GlobalDB down.
     *
     * Only the first call does any work; later calls return immediately because
     * shutdown_started_ has already been set.
     */
    void GeniusNode::ShutdownForDestruction()
    {
        // exchange() returns the previous value: true means another call already ran the shutdown.
        const bool already_started = shutdown_started_.exchange( true );
        if ( already_started )
        {
            return;
        }

        node_logger_->info( "GeniusNode shutdown start" );

        // Stop the bootstrap health check timer, if one is armed.
        if ( health_check_handle_ )
        {
            health_check_handle_->cancel();
            health_check_handle_.reset();
        }

        // Drop the bootstrap disconnect event subscription, if any.
        if ( bootstrap_disconnect_subscription_ )
        {
            bootstrap_disconnect_subscription_->unsubscribe();
            bootstrap_disconnect_subscription_.reset();
        }

        if ( tx_globaldb_ )
        {
            tx_globaldb_->ShutdownNow();
        }

        node_logger_->info( "GeniusNode shutdown phase CRDT/GlobalDB complete" );
    }

    /**
     * @brief Tears the node down: GlobalDB/bootstrap shutdown, pubsub, io_context, io threads, UPnP thread.
     *
     * The steps run in the order written below; the io threads and the UPnP thread are joined
     * before the destructor returns.
     */
    GeniusNode::~GeniusNode()
    {
        node_logger_->debug( "~GeniusNode CALLED" );

        // Runs at most once overall; a no-op if the shutdown was already triggered earlier.
        ShutdownForDestruction();

        if ( pubsub_ )
        {
            pubsub_->Stop();
            pubsub_.reset();
        }
        // Release the work guard, then stop the io_context — presumably so the io threads
        // joined below can return.
        io_work_guard_.reset();
        if ( io_ )
        {
            io_->stop(); // Stop our io_context
        }
        for ( auto &t : io_threads_ )
        {
            if ( t.joinable() )
            {
                t.join();
            }
        }
        io_threads_.clear();
        // Ask the UPnP refresh loop to exit, then wait for its thread.
        stop_upnp = true;
        if ( upnp_thread.joinable() )
        {
            upnp_thread.join();
        }
        // NOTE(review): the purpose of this fixed 50 ms delay is not visible from this function.
        std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
        node_logger_->debug( "~GeniusNode FINISHED" );
    }

    /**
     * @brief (Re)starts the background thread that periodically renews the UPnP port mapping.
     *
     * Any previously running refresh thread is signalled through stop_upnp and joined first.
     * The new thread polls stop_upnp every 100 ms and, once per refresh interval, asks the
     * gateway to map @p pubsubport (TCP) again. The first renewal happens one full interval
     * after the thread starts.
     *
     * NOTE(review): the mapping lease (3600 s) equals the refresh interval (60 min), so the
     * mapping may lapse briefly before it is renewed — consider refreshing earlier.
     *
     * @param pubsubport Port to keep mapped, used as both external and internal port.
     */
    void GeniusNode::RefreshUPNP( uint16_t pubsubport )
    {
        if ( upnp_thread.joinable() )
        {
            stop_upnp = true;   // Signal the existing thread to stop
            upnp_thread.join(); // Wait for it to finish
        }

        stop_upnp = false; // Reset the stop flag for the new thread

        upnp_thread = std::thread(
            [this, pubsubport]()
            {
                constexpr auto refresh_interval = std::chrono::minutes( 60 );
                constexpr auto poll_interval    = std::chrono::milliseconds( 100 );

                auto next_refresh_time = std::chrono::steady_clock::now() + refresh_interval;
                // Owned by this thread for its whole lifetime, so it is used directly.
                auto upnp_client = std::make_shared<upnp::UPNP>();

                while ( !stop_upnp )
                {
                    if ( std::chrono::steady_clock::now() >= next_refresh_time )
                    {
                        if ( upnp_client->GetIGD() )
                        {
                            const auto opened = upnp_client->OpenPort( pubsubport, pubsubport, "TCP", 3600 );
                            if ( opened )
                            {
                                GeniusNodeLogger()->info( "Open Ports Success pubsub: {} ", pubsubport );
                            }
                            else
                            {
                                GeniusNodeLogger()->error( "Failed to open port" );
                            }
                        }
                        else
                        {
                            GeniusNodeLogger()->info( "No IGD" );
                        }

                        next_refresh_time = std::chrono::steady_clock::now() + refresh_interval;
                    }

                    // Short sleeps keep the thread responsive to stop_upnp.
                    std::this_thread::sleep_for( poll_interval );
                }
            } );
    }

    /**
     * @brief Hands a single peer address to the pubsub layer.
     *
     * @param peer Peer address string, forwarded as a one-element list to AddPeers().
     *             NOTE(review): pubsub_ is dereferenced unchecked — presumably callers only
     *             use this after InitNetwork() succeeded; confirm.
     */
    void GeniusNode::AddPeer( const std::string &peer )
    {
        pubsub_->AddPeers( { peer } );
    }

    /**
     * @brief Starts the DHT, announces this node as a provider of the grid channel CID and starts peer lookup.
     *
     * The CID is a CIDv0 built from the SHA-256 of the processing grid channel topic
     * concatenated with the network/version appendix.
     */
    void GeniusNode::DHTInit()
    {
        // Raw bytes of the topic string (plus net/version appendix) are what gets hashed.
        const std::string                topic = processing_grid_chanel_topic_ + sgns::version::GetNetAndVersionAppendix();
        const std::vector<unsigned char> topic_bytes( topic.begin(), topic.end() );

        std::vector<unsigned char> hash( SHA256_DIGEST_LENGTH );
        SHA256( topic_bytes.data(), topic_bytes.size(), hash.data() );

        // Provide CID
        auto key = libp2p::multi::ContentIdentifierCodec::encodeCIDV0( hash.data(), hash.size() );
        pubsub_->GetDHT()->Start();
        pubsub_->ProvideCID( key );

        // The textual CID is only needed for logging, so a decode/format failure must not
        // abort DHT initialisation; check each result before calling value().
        bool cid_logged  = false;
        auto decoded_cid = libp2p::multi::ContentIdentifierCodec::decode( key );
        if ( decoded_cid )
        {
            auto cid_string = libp2p::multi::ContentIdentifierCodec::toString( decoded_cid.value() );
            if ( cid_string )
            {
                node_logger_->info( "CID Test:: {}", cid_string.value() );
                cid_logged = true;
            }
        }
        if ( !cid_logged )
        {
            node_logger_->warn( "Could not render the grid channel CID as a string for logging" );
        }

        // Also Find providers
        pubsub_->StartFindingPeers( key );
    }

    /**
     * @brief Generates a UUID string seeded from an IPFS id and the current time.
     * @param ipfs_id Identifier whose hash is mixed into the seed.
     * @return Textual form of the generated UUID.
     *
     * The seed is the XOR of std::hash of @p ipfs_id and a nanosecond timestamp. Both 32-bit
     * halves of that value are fed to the engine through std::seed_seq, because constructing
     * std::mt19937 from a single integer keeps only its low 32 bits.
     */
    std::string generate_uuid_with_ipfs_id( const std::string &ipfs_id )
    {
        // Hash the IPFS ID
        const uint64_t id_hash = std::hash<std::string>{}( ipfs_id );

        // Get a high-resolution timestamp
        const auto now       = std::chrono::high_resolution_clock::now();
        const auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>( now.time_since_epoch() ).count();

        // Combine the IPFS ID hash and timestamp to create a seed
        const uint64_t seed = id_hash ^ static_cast<uint64_t>( timestamp );

        // Seed the PRNG with the full 64 bits (low word, then high word)
        std::seed_seq seed_words{ static_cast<uint32_t>( seed ), static_cast<uint32_t>( seed >> 32 ) };
        std::mt19937  gen( seed_words );
        boost::uuids::basic_random_generator<std::mt19937> uuid_gen( gen );

        // Generate UUID
        return boost::uuids::to_string( uuid_gen() );
    }

    /**
     * @brief Lists the accounts that GeniusAccount reports for this node's write base path.
     * @return The list produced by GeniusAccount::GetAvailableAccounts.
     */
    std::vector<std::string> GeniusNode::GetAvailableAccounts()
    {
        std::vector<std::string> known_accounts = GeniusAccount::GetAvailableAccounts( write_base_path_ );
        return known_accounts;
    }

    /**
     * @brief Creates an account from a private key via GeniusAccount::New.
     * @param private_key C string holding the private key; must not be null.
     * @return Success when an account object was created, std::errc::invalid_argument when
     *         @p private_key is null or GeniusAccount::New returned nullptr.
     */
    outcome::result<void> GeniusNode::AddAccountWithKey( const char *private_key )
    {
        // Reject a null pointer here rather than handing it to the account factory.
        if ( private_key == nullptr )
        {
            node_logger_->error( "{}: private key is null", __func__ );
            return outcome::failure( std::errc::invalid_argument );
        }

        auto new_account = GeniusAccount::New( this->GetTokenID(), private_key, write_base_path_, is_full_node_ );
        if ( new_account == nullptr )
        {
            return outcome::failure( std::errc::invalid_argument );
        }
        return outcome::success();
    }

    /**
     * @brief Creates an account from a mnemonic via GeniusAccount::NewFromMnemonic.
     * @param mnemonic Mnemonic phrase forwarded to the account factory.
     * @return Success when an account object was created, std::errc::invalid_argument otherwise.
     */
    outcome::result<void> GeniusNode::AddAccountWithMnemonic( const std::string &mnemonic )
    {
        const auto created_account =
            GeniusAccount::NewFromMnemonic( this->GetTokenID(), mnemonic, write_base_path_, is_full_node_ );

        if ( created_account != nullptr )
        {
            return outcome::success();
        }
        return outcome::failure( std::errc::invalid_argument );
    }

    /**
     * @brief Switches the node to another account that was previously added.
     * @param public_address Address of the account to activate; it is normalized first.
     * @return Success after the switch; std::errc::address_in_use when the address is already
     *         active; std::errc::address_not_available when the address is unknown or the
     *         account object could not be created; or the error from stopping the blockchain.
     *
     * Processing members, the transaction manager, the bridge relayer, the eth watch service
     * and the blockchain are torn down before the account is replaced. When the transaction
     * database already exists it is reused and the node re-enters INITIALIZING_BLOCKCHAIN.
     */
    outcome::result<void> GeniusNode::SelectAccount( std::string_view public_address )
    {
        // Keep an owned copy of the normalized address (as TransferAccount does) so the view
        // used below never refers to a temporary.
        const std::string normalized_address( GeniusAccount::NormalizeAddress( public_address ) );
        public_address = normalized_address;

        if ( public_address == GetAddress() )
        {
            node_logger_->warn( "Address already active" );
            return std::errc::address_in_use;
        }
        auto addresses = GeniusAccount::GetAvailableAccounts( write_base_path_ );

        if ( std::find( addresses.cbegin(), addresses.cend(), public_address ) == addresses.cend() )
        {
            node_logger_->error( "Could not find requested address" );
            return std::errc::address_not_available;
        }

        auto account = GeniusAccount::NewFromPublicKey( GetTokenID(), public_address, is_full_node_ );

        if ( account == nullptr )
        {
            node_logger_->error( "Account not created" );
            return std::errc::address_not_available;
        }

        ResetProcessingMembers();

        auto tx_manager_result = GetTransactionManager();
        if ( tx_manager_result.has_value() )
        {
            auto manager = std::move( tx_manager_result.value() );
            manager->Stop();
        }
        transaction_manager_.reset();

        bridge_relayer_.reset();

        eth_watch_service_.reset();

        if ( blockchain_ )
        {
            BOOST_OUTCOME_TRY( this->blockchain_->Stop() );
        }
        blockchain_.reset();

        if ( account_ )
        {
            // Detach the outgoing account from the database only when there is one.
            account_->DeconfigureDatabaseDependencies();
            account_.swap( account );
        }
        else
        {
            account_ = account;
        }
        // After a swap this releases the previous account; otherwise it drops the extra reference.
        account.reset();

        if ( this->tx_globaldb_ )
        {
            // Database is already initialized (keyed by node ID, not account).
            // Keep it alive, configure it for the new account, and restart the
            // account-dependent layers. We must replicate what MIGRATING_DATABASE
            // and INITIALIZING_DATABASE do for a new account, without recreating
            // the database itself.
            this->account_->InitMessenger( this->pubsub_ );
            this->account_->ConfigureDatabaseDependencies( this->tx_globaldb_ );
            this->tx_globaldb_->AddListenTopic( processing_channel_topic_ );
            StateTransition( NodeState::INITIALIZING_BLOCKCHAIN );
        }

        return outcome::success();
    }

    /**
     * @brief Releases the processing-related members held by this node.
     *
     * Resets the processing service, the task result storage, the processing core and the
     * task queue, in that order.
     */
    void GeniusNode::ResetProcessingMembers()
    {
        // NOTE(review): the order looks deliberate (service first, queue last) — confirm
        // before reordering.
        processing_service_.reset();
        task_result_storage_.reset();
        processing_core_.reset();
        task_queue_.reset();
    }

    /**
     * @brief Moves the active account's balance to another added account, then activates it.
     * @param public_address Destination account address; it is normalized first.
     * @return Result of SelectAccount on the destination; std::errc::address_in_use when the
     *         destination is already active; std::errc::address_not_available when it was not
     *         added to this node; or the error from the funds transfer.
     */
    outcome::result<void> GeniusNode::TransferAccount( std::string_view public_address )
    {
        const std::string target_address( GeniusAccount::NormalizeAddress( public_address ) );

        if ( target_address == GetAddress() )
        {
            node_logger_->warn( "Address already active" );
            return std::errc::address_in_use;
        }

        const auto known_accounts = GeniusAccount::GetAvailableAccounts( write_base_path_ );
        const auto match          = std::find( known_accounts.cbegin(), known_accounts.cend(), target_address );
        if ( match == known_accounts.cend() )
        {
            node_logger_->error( "Tried to transfer to account that was not added to GeniusNode" );
            return std::errc::address_not_available;
        }

        const auto token_id = GetTokenID();
        const auto balance  = account_->GetUTXOManager().GetBalance( token_id );

        // Nothing is sent when the balance is zero.
        if ( balance > 0 )
        {
            BOOST_OUTCOME_TRY( auto transfer_outcome,
                               TransferFunds( balance, target_address, token_id, TIMEOUT_TRANSFER ) );
            node_logger_->debug( "Transferred account balance {} to {:.8} in transaction {} after {} ms",
                                 balance,
                                 target_address,
                                 transfer_outcome.first,
                                 transfer_outcome.second );
        }

        return SelectAccount( target_address );
    }

    /**
     * @brief Deletes a stored account, refusing to delete the one currently active.
     * @param public_address Address of the account to delete.
     * @return Result of GeniusAccount::DeleteAccount, or std::errc::address_not_available when
     *         the address refers to the active account.
     *
     * The active-account guard compares both the address as given and its normalized form,
     * matching how SelectAccount and TransferAccount compare against the active address.
     */
    outcome::result<void> GeniusNode::DeleteAccount( std::string_view public_address )
    {
        const auto        active_address = GetAddress();
        const std::string normalized_address( GeniusAccount::NormalizeAddress( public_address ) );

        if ( public_address == active_address || normalized_address == active_address )
        {
            node_logger_->error( "Can't delete active account" );
            return std::errc::address_not_available;
        }

        // NOTE(review): the address is forwarded as given; confirm whether
        // GeniusAccount::DeleteAccount expects the normalized form.
        return GeniusAccount::DeleteAccount( public_address, write_base_path_ );
    }

    /**
     * @brief Transfers the active account into another account and deletes the old one.
     * @param public_address Account that receives the balance and becomes active.
     * @return Result of deleting the previously active account, or the transfer error.
     */
    outcome::result<void> GeniusNode::MergeAccount( std::string_view public_address )
    {
        // Remember the current address before TransferAccount switches accounts.
        const auto source_address = GetAddress();

        BOOST_OUTCOME_TRY( TransferAccount( public_address ) );

        auto removal_result = DeleteAccount( source_address );
        return removal_result;
    }

    /**
     * @brief Stores a payout address for the account and re-enters processing initialization.
     * @param payout_address Address to store; must pass GeniusAccount::IsValidPublicKey.
     * @return Success, std::errc::bad_address for an invalid address, or the storage error.
     */
    outcome::result<void> GeniusNode::SetPayoutAddress( std::string_view payout_address )
    {
        if ( !GeniusAccount::IsValidPublicKey( payout_address ) )
        {
            return outcome::failure( std::errc::bad_address );
        }

        // Saved under the "payout_address" key of the account's secure storage.
        std::string address_to_store( payout_address );
        BOOST_OUTCOME_TRY( account_->SaveInSecureStorage( "payout_address", std::move( address_to_store ) ) );

        this->StateTransition( NodeState::INITIALIZING_PROCESSING );
        return outcome::success();
    }

    /**
     * @brief Builds a processing task from a JSON job description, holds escrow and enqueues it.
     * @param jsondata JSON text handed to ProcessingManager::Create.
     * @return Id of the escrow transaction on success. Fails with TRANSACTIONS_NOT_READY,
     *         PROCESS_COST_ERROR, INSUFFICIENT_FUNDS, INVALID_JSON or DATABASE_WRITE_ERROR, or
     *         with the error from manager creation, cut parsing, the escrow hold or the CRDT
     *         transaction creation.
     *
     * The task queue is checked before escrow is held, so a node without a queue fails
     * before any funds are committed.
     */
    outcome::result<std::string> GeniusNode::ProcessImage( const std::string &jsondata )
    {
        if ( GetTransactionManagerState() != TransactionManager::State::READY )
        {
            return outcome::failure( Error::TRANSACTIONS_NOT_READY );
        }
        BOOST_OUTCOME_TRY( auto procmgr, sgns::sgprocessing::ProcessingManager::Create( jsondata ) );

        // GetProcessCost reports every failure as a zero cost.
        const auto funds = GetProcessCost( procmgr );
        if ( funds == 0 )
        {
            return outcome::failure( Error::PROCESS_COST_ERROR );
        }

        if ( account_->GetUTXOManager().GetBalance() < funds )
        {
            return outcome::failure( Error::INSUFFICIENT_FUNDS );
        }

        if ( !task_queue_ )
        {
            node_logger_->error( "{}: Task queue is not initialized", __func__ );
            return outcome::failure( Error::DATABASE_WRITE_ERROR );
        }

        // The peer id is needed for the task uuid and for every subtask split; fetch it once.
        const auto peer_id    = pubsub_->GetHost()->getId().toBase58();
        const auto uuidstring = generate_uuid_with_ipfs_id( peer_id );

        //Get Processing Data
        auto procdata = procmgr->GetProcessingData();

        //Make a small json to insert without extra indentation and spacing.
        json smalljson;
        sgns::to_json( smalljson, procdata );

        SGProcessing::Task task;
        task.set_ipfs_block_id( uuidstring );
        task.set_json_data( smalljson.dump( -1 ) );
        task.set_random_seed( 0 );
        task.set_results_channel( ( boost::format( "RESULT_CHANNEL_ID_%1%" ) % ( 1 ) ).str() );

        //Split into subtasks
        processing::ProcessTaskSplitter  taskSplitter;
        std::list<SGProcessing::SubTask> subTasks;
        //Make Copies, trying to use references for passes/input nodes may cause problems.
        // NOTE(review): the .value() calls below are unchecked; confirm that
        // ProcessingManager::Create validates these fields.
        auto passes = procdata.get_passes();
        for ( const auto &pass : passes )
        {
            auto input_nodes = pass.get_model().value().get_input_nodes();
            for ( auto &model : input_nodes )
            {
                json modeljson;
                sgns::to_json( modeljson, model );
                auto   index = procmgr->GetInputIndex( model.get_source().value() );
                size_t nChunks =
                    procdata.get_inputs()[index.value()].get_dimensions().value().get_chunk_count().value();

                taskSplitter.SplitTask( task, subTasks, modeljson.dump( -1 ), nChunks, false, peer_id );
            }
        }
        if ( subTasks.empty() )
        {
            return outcome::failure( Error::INVALID_JSON );
        }
        auto cut = sgns::TokenAmount::ParseMinions( dev_config_.Cut );
        if ( !cut )
        {
            return outcome::failure( cut.error() );
        }

        BOOST_OUTCOME_TRY( auto manager, GetTransactionManager() );
        BOOST_OUTCOME_TRY( auto result_pair,
                           manager->HoldEscrow( funds, std::string( dev_config_.Addr ), cut.value(), uuidstring ) );

        //TODO - Make it async to post the job data in case the transaction gets confirmed.
        auto [tx_id, escrow_data_pair] = result_pair;

        auto [escrow_path, escrow_data] = escrow_data_pair;

        task.set_escrow_path( escrow_path );

        BOOST_OUTCOME_TRY( auto crdt_transaction,
                           CreateEscrowInfoCRDTTransaction( escrow_path, std::move( escrow_data ) ) );

        auto enqueue_task_return = task_queue_->EnqueueTask( task, subTasks, crdt_transaction );
        if ( enqueue_task_return.has_failure() )
        {
            return outcome::failure( Error::DATABASE_WRITE_ERROR );
        }

        return tx_id;
    }

    /**
     * @brief Computes the cost, in minions, of the job held by a processing manager.
     * @param procmgr Processing manager whose block size is priced.
     * @return Cost in minions, or 0 when the block size, the GNUS price or the cost
     *         calculation is unavailable (each failure is logged).
     */
    uint64_t GeniusNode::GetProcessCost( std::shared_ptr<sgns::sgprocessing::ProcessingManager> &procmgr )
    {
        const auto block_size = procmgr->ParseBlockSize();
        if ( !block_size )
        {
            node_logger_->error( "ParseBlockSize failed" );
            return 0;
        }
        node_logger_->trace( "Parsed totalBytes: {}", block_size.value() );

        const auto price_lookup = GetGNUSPrice();
        if ( !price_lookup )
        {
            node_logger_->error( "GetGNUSPrice failed" );
            return 0;
        }
        const double usd_per_genius = price_lookup.value();
        node_logger_->trace( "Retrieved GNUS price (USD/genius): {}", usd_per_genius );

        const auto cost_lookup = TokenAmount::CalculateCostMinions( block_size.value(), usd_per_genius );
        if ( !cost_lookup )
        {
            node_logger_->error( "TokenAmount::CalculateCostMinions failed" );
            return 0;
        }
        const uint64_t cost_in_minions = cost_lookup.value();
        node_logger_->trace( "Raw cost in minions: {}", cost_in_minions );

        return cost_in_minions;
    }

    /**
     * @brief Looks up the current price of the "genius-ai" token.
     * @return The price reported by GetCoinprice, or Error::NO_PRICE when the lookup fails or
     *         the returned map has no entry for the token.
     *
     * The map is searched with find() rather than operator[], which would insert a
     * value-initialized 0.0 for a missing token and report it as a valid price.
     */
    outcome::result<double> GeniusNode::GetGNUSPrice()
    {
        const std::string gnus_token_id = "genius-ai";

        auto prices = GetCoinprice( { gnus_token_id } );
        if ( !prices )
        {
            return outcome::failure( Error::NO_PRICE );
        }

        const auto &price_map = prices.value();
        const auto  entry     = price_map.find( gnus_token_id );
        if ( entry == price_map.end() )
        {
            return outcome::failure( Error::NO_PRICE );
        }
        return entry->second;
    }

    /**
     * @brief Returns the full SuperGenius version string.
     * @return Value of sgns::version::SuperGeniusVersionFullString().
     */
    std::string GeniusNode::GetVersion()
    {
        std::string full_version = sgns::version::SuperGeniusVersionFullString();
        return full_version;
    }

    /**
     * @brief Submits a mint transaction without waiting for it to be finalized.
     * @param amount           Amount to mint.
     * @param transaction_hash Hash forwarded to the transaction manager.
     * @param chainid          Chain id forwarded to the transaction manager.
     * @param tokenid          Token to mint.
     * @param destination      Receiving address; the active account's address when empty.
     * @return Id of the submitted mint transaction, Error::TRANSACTIONS_NOT_READY when the
     *         transaction manager is not READY, or the error from the manager.
     */
    outcome::result<std::string> GeniusNode::MintTokens( uint64_t           amount,
                                                         const std::string &transaction_hash,
                                                         const std::string &chainid,
                                                         TokenID            tokenid,
                                                         std::string        destination )
    {
        const bool manager_ready = GetTransactionManagerState() == TransactionManager::State::READY;
        if ( !manager_ready )
        {
            node_logger_->error( "{}: Transaction manager not ready", __func__ );
            return outcome::failure( Error::TRANSACTIONS_NOT_READY );
        }

        // Default the recipient to the active account.
        if ( destination.empty() )
        {
            destination = account_->GetAddress();
        }

        BOOST_OUTCOME_TRY( auto tx_manager, GetTransactionManager() );
        BOOST_OUTCOME_TRY( auto mint_tx_id,
                           tx_manager->MintFunds( amount, transaction_hash, chainid, tokenid, destination ) );

        node_logger_->debug( "{}: Mint transaction {} sent ", __func__, mint_tx_id );
        return mint_tx_id;
    }

    /**
     * @brief Submits a mint transaction and waits for it to be finalized.
     * @param amount           Amount to mint.
     * @param transaction_hash Hash forwarded to the transaction manager.
     * @param chainid          Chain id forwarded to the transaction manager.
     * @param tokenid          Token to mint.
     * @param destination      Receiving address; the active account's address when empty.
     * @param timeout          Maximum time to wait for finalization.
     * @return Pair of (transaction id, wait duration in ms) when the transaction is CONFIRMED;
     *         Error::TRANSACTION_FAILED for any other final status; otherwise the error from
     *         submission or from waiting.
     */
    outcome::result<std::pair<std::string, uint64_t>> GeniusNode::MintTokens( uint64_t           amount,
                                                                              const std::string &transaction_hash,
                                                                              const std::string &chainid,
                                                                              TokenID            tokenid,
                                                                              std::string        destination,
                                                                              std::chrono::milliseconds timeout )
    {
        BOOST_OUTCOME_TRY( auto mint_tx_id,
                           MintTokens( amount, transaction_hash, chainid, tokenid, std::move( destination ) ) );

        BOOST_OUTCOME_TRY( auto finalization, WaitForFinalized( mint_tx_id, timeout ) );

        const auto final_status = finalization.first;
        const auto elapsed_ms   = finalization.second;

        if ( final_status == TransactionManager::TransactionStatus::CONFIRMED )
        {
            node_logger_->debug( "{}: transaction {} sent in {} ms", __func__, mint_tx_id, elapsed_ms );
            return std::make_pair( mint_tx_id, elapsed_ms );
        }

        node_logger_->error( "{}: transaction {} failed after {} ms", __func__, mint_tx_id, elapsed_ms );
        return outcome::failure( Error::TRANSACTION_FAILED );
    }

    [[nodiscard]] std::pair<float, std::string> GeniusNode::GetInitializationStatus() const
    {
        auto node_state = state_.load();

        // Note: these weights are arbitrary and may be changed if some stage is taking too long
        switch ( node_state )
        {
            case NodeState::CREATING:
                return { 0.0f, "Creating node and initializing services" };

            case NodeState::MIGRATING_DATABASE:
            {
                std::lock_guard<std::mutex> lock( migration_mutex_ );
                if ( migration_manager_ )
                {
                    auto total   = migration_manager_->GetTotalSteps();
                    auto current = migration_manager_->GetCurrentStepIndex();
                    if ( total > 0 && current > 0 )
                    {
                        // Subdivide the 0.05 -- 0.30 range across migration steps
                        float pct = 0.05f + 0.25f * ( static_cast<float>( current ) / static_cast<float>( total ) );
                        return { pct, migration_manager_->GetCurrentStepDescription() };
                    }
                    return { 0.05f, "Preparing migration steps" };
                }
                return { 0.30f, "Migrating database" };
            }

            case NodeState::INITIALIZING_DATABASE:
                return { 0.40f, "Initializing CRDT database" };

            case NodeState::INITIALIZING_BLOCKCHAIN:
                return { 0.525f, "Initializing blockchain service" };

            case NodeState::INITIALIZING_TRANSACTIONS:
            {
                // 0.60 -- 0.90 range with sub-progress from TransactionManager state
                switch ( GetTransactionManagerState() )
                {
                    case TransactionManager::State::CREATING:
                        return { 0.60f, "Creating transaction manager" };
                    case TransactionManager::State::INITIALIZING:
                        return { 0.70f, "Initializing transaction manager" };
                    case TransactionManager::State::SYNCING:
                        return { 0.80f, "Syncing transactions" };
                    case TransactionManager::State::READY:
                        return { 0.90f, "Finalizing transaction manager" };
                }
                return { 0.60f, "Initializing transactions" };
            }

            case NodeState::INITIALIZING_PROCESSING:
                return { 0.945f, "Initializing processing modules" };

            case NodeState::READY:
                return { 1.0f, "Ready" };
        }

        return { 0.0f, "Unknown state" };
    }

    /**
     * @brief Sends funds and waits for the transfer to be finalized.
     * @param amount      Amount to send.
     * @param destination Receiving address.
     * @param token_id    Token to send.
     * @param timeout     Maximum time to wait for finalization.
     * @return Pair of (transaction id, wait duration in ms) when the transaction is CONFIRMED;
     *         Error::TRANSACTION_FAILED for any other final status; otherwise the error from
     *         submission or from waiting.
     */
    outcome::result<std::pair<std::string, uint64_t>> GeniusNode::TransferFunds( uint64_t                  amount,
                                                                                 const std::string        &destination,
                                                                                 TokenID                   token_id,
                                                                                 std::chrono::milliseconds timeout )
    {
        BOOST_OUTCOME_TRY( auto &&sent_tx_id, TransferFunds( amount, destination, token_id ) );

        BOOST_OUTCOME_TRY( auto finalization, WaitForFinalized( sent_tx_id, timeout ) );

        const auto final_status = finalization.first;
        const auto elapsed_ms   = finalization.second;

        if ( final_status == TransactionManager::TransactionStatus::CONFIRMED )
        {
            node_logger_->debug( "{}: transaction {} sent in {} ms", __func__, sent_tx_id, elapsed_ms );
            return std::make_pair( sent_tx_id, elapsed_ms );
        }

        node_logger_->error( "{}: transaction {} failed after {} ms", __func__, sent_tx_id, elapsed_ms );
        return outcome::failure( Error::TRANSACTION_FAILED );
    }

    /**
     * @brief Submits a funds transfer without waiting for it to be finalized.
     * @param amount      Amount to send.
     * @param destination Receiving address.
     * @param token_id    Token to send.
     * @return Id of the submitted transaction; Error::TRANSACTIONS_NOT_READY when the
     *         transaction manager is not READY; Error::INSUFFICIENT_FUNDS when the local
     *         balance for @p token_id is below @p amount; or the error from the manager.
     */
    outcome::result<std::string> GeniusNode::TransferFunds( uint64_t           amount,
                                                            const std::string &destination,
                                                            TokenID            token_id )
    {
        const bool manager_ready = GetTransactionManagerState() == TransactionManager::State::READY;
        if ( !manager_ready )
        {
            node_logger_->error( "{}: Transaction Manager is not ready", __func__ );
            return outcome::failure( Error::TRANSACTIONS_NOT_READY );
        }

        // Check the local balance before asking the manager to build the transaction.
        const auto spendable = account_->GetUTXOManager().GetBalance( token_id );
        if ( spendable < amount )
        {
            node_logger_->error( "{}: insufficient local funds: requested={}, available={}",
                                 __func__,
                                 amount,
                                 spendable );
            return outcome::failure( Error::INSUFFICIENT_FUNDS );
        }

        BOOST_OUTCOME_TRY( auto tx_manager, GetTransactionManager() );
        BOOST_OUTCOME_TRY( auto sent_tx_id, tx_manager->TransferFunds( amount, destination, token_id ) );

        node_logger_->debug( "{}: transaction {} sent", __func__, sent_tx_id );
        return sent_tx_id;
    }

    /**
     * @brief Sends funds to the developer address from the dev configuration.
     * @param amount   Amount to send.
     * @param token_id Token to send.
     * @return Result of TransferFunds to dev_config_.Addr.
     */
    outcome::result<std::string> GeniusNode::PayDev( uint64_t amount, TokenID token_id )
    {
        const std::string dev_address( dev_config_.Addr );
        return TransferFunds( amount, dev_address, token_id );
    }

    /**
     * @brief Sends funds to the developer address and waits for finalization.
     * @param amount   Amount to send.
     * @param token_id Token to send.
     * @param timeout  Maximum time to wait for finalization.
     * @return Result of the waiting TransferFunds overload for dev_config_.Addr.
     */
    outcome::result<std::pair<std::string, uint64_t>> GeniusNode::PayDev( uint64_t                  amount,
                                                                          TokenID                   token_id,
                                                                          std::chrono::milliseconds timeout )
    {
        const std::string dev_address( dev_config_.Addr );
        return TransferFunds( amount, dev_address, token_id, timeout );
    }

    /**
     * @brief Polls until a transaction reaches a final status or the timeout elapses.
     * @param tx_id   Transaction to watch.
     * @param timeout Maximum time to keep polling.
     * @return Pair of (final status, elapsed ms) once IsFinalized reports a status;
     *         Error::TRANSACTIONS_NOT_READY when the transaction manager is not READY;
     *         Error::TRANSACTION_NOT_FINALIZED when the timeout elapses first.
     *
     * The status is checked at least once, even for a zero timeout, and polled every 100 ms.
     */
    outcome::result<std::pair<TransactionManager::TransactionStatus, uint64_t>> GeniusNode::WaitForFinalized(
        const std::string        &tx_id,
        std::chrono::milliseconds timeout )
    {
        if ( GetTransactionManagerState() != TransactionManager::State::READY )
        {
            node_logger_->error( "{}: Transaction Manager is not ready", __func__ );

            return outcome::failure( Error::TRANSACTIONS_NOT_READY );
        }

        const auto started_at = std::chrono::steady_clock::now();

        for ( ;; )
        {
            if ( const auto status = IsFinalized( tx_id ); status.has_value() )
            {
                const auto finished_at = std::chrono::steady_clock::now();
                const auto elapsed_ms =
                    std::chrono::duration_cast<std::chrono::milliseconds>( finished_at - started_at ).count();
                node_logger_->debug( "{}: Transaction finalized with status {} and duration of {} ms",
                                     __func__,
                                     static_cast<int>( status.value() ),
                                     elapsed_ms );

                return std::make_pair( status.value(), elapsed_ms );
            }

            std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );

            if ( std::chrono::steady_clock::now() - started_at >= timeout )
            {
                break;
            }
        }

        node_logger_->error( "{}: Transaction not finalized within timeout of {} ms", __func__, timeout.count() );

        return outcome::failure( Error::TRANSACTION_NOT_FINALIZED );
    }

    /**
     * @brief Reports the final status of an outgoing transaction, if it has one.
     * @param tx_id Transaction to query.
     * @return The status when it is CONFIRMED, FAILED or INVALID; std::nullopt for any other
     *         status or when the transaction manager cannot be obtained.
     */
    std::optional<TransactionManager::TransactionStatus> GeniusNode::IsFinalized( const std::string &tx_id )
    {
        auto manager_lookup = GetTransactionManager();
        if ( manager_lookup.has_failure() )
        {
            node_logger_->error( "{}: Failed to get Transaction Manager: {}",
                                 __func__,
                                 manager_lookup.error().message() );
            return std::nullopt;
        }

        auto tx_manager = manager_lookup.value();
        auto status     = tx_manager->GetOutgoingStatusByTxId( tx_id );

        const bool not_final = status != TransactionManager::TransactionStatus::CONFIRMED &&
                               status != TransactionManager::TransactionStatus::FAILED &&
                               status != TransactionManager::TransactionStatus::INVALID;
        if ( not_final )
        {
            return std::nullopt;
        }
        return status;
    }

    /**
     * @brief Pays out an escrow for a finished task and waits for the outgoing transaction.
     * @param escrow_path      Escrow path forwarded to the transaction manager.
     * @param taskresult       Task result forwarded to the transaction manager.
     * @param crdt_transaction CRDT transaction moved into the transaction manager call.
     * @param timeout          Maximum time to wait for the outgoing transaction.
     * @return Pair of (transaction id, elapsed ms) when the transaction is CONFIRMED;
     *         Error::TRANSACTIONS_NOT_READY when the transaction manager is not READY; a
     *         timed_out error code for any other final status; or the manager's error.
     */
    outcome::result<std::pair<std::string, uint64_t>> GeniusNode::PayEscrow(
        const std::string                       &escrow_path,
        const SGProcessing::TaskResult          &taskresult,
        std::shared_ptr<crdt::AtomicTransaction> crdt_transaction,
        std::chrono::milliseconds                timeout )
    {
        if ( GetTransactionManagerState() != TransactionManager::State::READY )
        {
            return outcome::failure( Error::TRANSACTIONS_NOT_READY );
        }

        // The reported duration covers both the submission and the wait.
        const auto started_at = std::chrono::steady_clock::now();

        BOOST_OUTCOME_TRY( auto tx_manager, GetTransactionManager() );
        BOOST_OUTCOME_TRY( auto escrow_tx_id,
                           tx_manager->PayEscrow( escrow_path, taskresult, std::move( crdt_transaction ) ) );

        const auto final_status = tx_manager->WaitForTransactionOutgoing( escrow_tx_id, timeout );

        const auto finished_at = std::chrono::steady_clock::now();
        const auto elapsed_ms =
            std::chrono::duration_cast<std::chrono::milliseconds>( finished_at - started_at ).count();

        if ( final_status == TransactionManager::TransactionStatus::CONFIRMED )
        {
            node_logger_->debug( "Pay escrow transaction {} completed in {} ms", escrow_tx_id, elapsed_ms );
            return std::make_pair( escrow_tx_id, elapsed_ms );
        }

        node_logger_->error( "Pay escrow transaction {} failed after {} ms", escrow_tx_id, elapsed_ms );
        return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::timed_out ) );
    }

    /**
     * @brief Returns the balance reported by the active account's UTXO manager.
     * @return Value of GetBalance() on the UTXO manager.
     */
    uint64_t GeniusNode::GetBalance()
    {
        auto &&utxo_manager = account_->GetUTXOManager();
        return utxo_manager.GetBalance();
    }

    /**
     * @brief Returns the balance of one token from the active account's UTXO manager.
     * @param token_id Token whose balance is requested.
     * @return Value of GetBalance( token_id ) on the UTXO manager.
     */
    uint64_t GeniusNode::GetBalance( const TokenID token_id )
    {
        auto &&utxo_manager = account_->GetUTXOManager();
        return utxo_manager.GetBalance( token_id );
    }

    /**
     * @brief Returns the balance the active account's UTXO manager reports for an address.
     * @param address Address whose balance is requested.
     * @return Value of GetBalance( address ) on the UTXO manager.
     */
    uint64_t GeniusNode::GetBalance( const std::string &address )
    {
        auto &&utxo_manager = account_->GetUTXOManager();
        return utxo_manager.GetBalance( address );
    }

    /**
     * @brief Returns the balance of one token for an address from the UTXO manager.
     * @param token_id Token whose balance is requested.
     * @param address  Address whose balance is requested.
     * @return Value of GetBalance( token_id, address ) on the UTXO manager.
     */
    uint64_t GeniusNode::GetBalance( const TokenID token_id, const std::string &address )
    {
        auto &&utxo_manager = account_->GetUTXOManager();
        return utxo_manager.GetBalance( token_id, address );
    }

    /**
     * @brief Handles a successfully processed task: completes it in the queue and pays escrow.
     * @param task_id    Id of the finished task.
     * @param taskresult Result of the finished task.
     *
     * The work is posted to the processing callback pool and only runs if the node is still
     * alive. It stops early (with a log line) when the task queue is missing, the task is
     * already completed, transactions are not READY, the task is not in the DB, completing
     * the task fails, or the escrow payment fails.
     */
    void GeniusNode::ProcessingDone( const std::string &task_id, const SGProcessing::TaskResult &taskresult )
    {
        static constexpr std::string_view FUNC = __func__;
        boost::asio::post( *processing_callback_pool_,
                           [weak_self( weak_from_this() ), task_id, taskresult]()
                           {
                               auto strong = weak_self.lock();
                               if ( !strong )
                               {
                                   return;
                               }

                               // Log prefix: first 8 characters of the active address, computed once.
                               const auto address_tag = strong->account_->GetAddress().substr( 0, 8 );

                               strong->node_logger_->info( "[{}]{}: SUCCESS PROCESSING TASK {}",
                                                           address_tag,
                                                           FUNC,
                                                           task_id );

                               // The queue is released by ResetProcessingMembers; do not dereference it then.
                               if ( !strong->task_queue_ )
                               {
                                   strong->node_logger_->error( "[{}]{}: Task queue is not initialized",
                                                                address_tag,
                                                                FUNC );
                                   return;
                               }

                               if ( strong->task_queue_->IsTaskCompleted( task_id ) )
                               {
                                   strong->node_logger_->info( "[{}]{}: Task Already completed!", address_tag, FUNC );
                                   return;
                               }
                               if ( strong->GetTransactionManagerState() != TransactionManager::State::READY )
                               {
                                   strong->node_logger_->info( "[{}]{}: Transactions are not ready",
                                                               address_tag,
                                                               FUNC );
                                   return;
                               }
                               strong->node_logger_->info( "[{}]{}: Transactions READY", address_tag, FUNC );

                               auto maybe_task = strong->task_queue_->GetTask( task_id );
                               if ( maybe_task.has_failure() )
                               {
                                   strong->node_logger_->info( "[{}]{}: Task id {} not found in DB",
                                                               address_tag,
                                                               FUNC,
                                                               task_id );
                                   return;
                               }

                               auto escrow_path          = maybe_task.value().escrow_path();
                               auto complete_task_result = strong->task_queue_->CompleteTask( task_id, taskresult );
                               if ( complete_task_result.has_failure() )
                               {
                                   strong->node_logger_->error( "[{}]{}: Unable to complete task: {} ",
                                                                address_tag,
                                                                FUNC,
                                                                task_id );
                                   return;
                               }

                               strong->node_logger_->info( "[{}]{}: Creating the payout transactions",
                                                           address_tag,
                                                           FUNC );

                               auto pay_result = strong->PayEscrow( escrow_path,
                                                                    taskresult,
                                                                    std::move( complete_task_result.value() ) );
                               if ( pay_result.has_failure() )
                               {
                                   strong->node_logger_->error( "[{}]{}: Escrow not paid for task: {} ",
                                                                address_tag,
                                                                FUNC,
                                                                task_id );
                                   return;
                               }

                               strong->node_logger_->info( "[{}]{}: Paid for task: {}", address_tag, FUNC, task_id );
                           } );
    }

    /**
     * @brief Logs a processing error for a task on the processing callback pool.
     * @param task_id Id passed by the caller; it is included in the log line.
     *
     * The log call is posted to the pool and only runs if the node is still alive.
     */
    void GeniusNode::ProcessingError( const std::string &task_id )
    {
        boost::asio::post( *processing_callback_pool_,
                           [weak_self( weak_from_this() ), task_id]()
                           {
                               if ( auto strong = weak_self.lock() )
                               {
                                   // Two placeholders: the address prefix and the id being reported.
                                   strong->node_logger_->error( "[ {} ] ERROR PROCESSING SUBTASK {}",
                                                                strong->account_->GetAddress().substr( 0, 8 ),
                                                                task_id );
                               }
                           } );
    }

    /**
     * @brief Asks the global database to print its data store, if the database exists.
     *
     * Logs an error instead when the database has not been initialized.
     */
    void GeniusNode::PrintDataStore() const
    {
        if ( !tx_globaldb_ )
        {
            node_logger_->error( "{}: GlobalDB is not initialized", __func__ );
            return;
        }

        tx_globaldb_->PrintDataStore();
    }

    /**
     * @brief Stops the processing service, if it exists.
     *
     * Logs an error instead when the processing service has not been initialized.
     */
    void GeniusNode::StopProcessing()
    {
        if ( !processing_service_ )
        {
            node_logger_->error( "{}: Processing service is not initialized", __func__ );
            return;
        }

        processing_service_->StopProcessing();
    }

    /**
     * @brief Starts the processing service on the processing grid channel topic, if it exists.
     *
     * Logs an error instead when the processing service has not been initialized.
     */
    void GeniusNode::StartProcessing()
    {
        if ( !processing_service_ )
        {
            node_logger_->error( "{}: Processing service is not initialized", __func__ );
            return;
        }

        processing_service_->StartProcessing( processing_grid_chanel_topic_ );
    }

    /// Returns current prices for the requested tokens, served from the price cache when possible.
    ///
    /// Cache entries younger than `m_cacheValidityDuration` are returned directly. The remaining
    /// tokens are fetched through `CoinGeckoPriceRetriever::getCurrentPrices`, but only when at
    /// least `MIN_API_CALL_INTERVAL` has elapsed since `m_lastApiCall`; otherwise the fetch is
    /// skipped and only the cached subset is returned.
    ///
    /// @param tokenIds Token identifiers to look up.
    /// @return Map of token id to price. May contain fewer entries than requested (rate limited,
    ///         or the fetch failed while some cached prices were available). Returns the fetch
    ///         error only when the fetch failed and no cached price could be served at all.
    outcome::result<std::map<std::string, double>> GeniusNode::GetCoinprice( const std::vector<std::string> &tokenIds )
    {
        const auto                    currentTime = std::chrono::system_clock::now();
        std::map<std::string, double> result;
        std::vector<std::string>      tokensToFetch;
        tokensToFetch.reserve( tokenIds.size() );

        // Split the request into "fresh in cache" and "needs fetching".
        for ( const auto &tokenId : tokenIds )
        {
            const auto it = m_tokenPriceCache.find( tokenId );

            if ( it != m_tokenPriceCache.end() && ( currentTime - it->second.lastUpdate ) < m_cacheValidityDuration )
            {
                result[tokenId] = it->second.price;
            }
            else
            {
                tokensToFetch.push_back( tokenId );
            }
        }

        if ( tokensToFetch.empty() )
        {
            return result;
        }

        if ( ( currentTime - m_lastApiCall ) < MIN_API_CALL_INTERVAL )
        {
            // Rate limited: serve whatever the cache provided and make the omission visible.
            node_logger_->debug( "{}: rate limited, skipping price fetch for {} token(s)",
                                 __func__,
                                 tokensToFetch.size() );
            return result;
        }

        sgns::CoinGeckoPriceRetriever retriever;
        auto                          newPricesResult = retriever.getCurrentPrices( tokensToFetch );

        if ( !newPricesResult )
        {
            if ( result.empty() )
            {
                // Nothing cached to fall back on: surface the fetch error to the caller.
                return newPricesResult.error();
            }
            // Partial data is still useful, but the failure must not be silent.
            node_logger_->warn( "{}: failed to fetch prices for {} token(s): {}",
                                __func__,
                                tokensToFetch.size(),
                                newPricesResult.error().message() );
            return result;
        }

        m_lastApiCall = currentTime;

        // Refresh the cache and the response with the newly fetched prices.
        for ( const auto &[token, price] : newPricesResult.value() )
        {
            m_tokenPriceCache[token] = { price, currentTime };
            result[token]            = price;
        }

        return result;
    }

    /// Looks up historical prices for the given tokens at the given timestamps.
    ///
    /// Thin wrapper: builds a `CoinGeckoPriceRetriever` and returns the result of
    /// `getHistoricalPrices` unchanged.
    ///
    /// @param tokenIds   Token identifiers to query.
    /// @param timestamps Timestamps to query; forwarded as-is to the retriever.
    /// @return Whatever `getHistoricalPrices` returns (token id -> timestamp -> price, or an error).
    outcome::result<std::map<std::string, std::map<int64_t, double>>> GeniusNode::GetCoinPriceByDate(
        const std::vector<std::string> &tokenIds,
        const std::vector<int64_t>     &timestamps )
    {
        sgns::CoinGeckoPriceRetriever price_source;
        auto                          historical = price_source.getHistoricalPrices( tokenIds, timestamps );
        return historical;
    }

    /// Looks up historical prices for the given tokens over the range [`from`, `to`].
    ///
    /// Thin wrapper: builds a `CoinGeckoPriceRetriever` and returns the result of
    /// `getHistoricalPriceRange` unchanged.
    ///
    /// @param tokenIds Token identifiers to query.
    /// @param from     Range start; forwarded as-is to the retriever.
    /// @param to       Range end; forwarded as-is to the retriever.
    /// @return Whatever `getHistoricalPriceRange` returns (token id -> timestamp -> price, or an error).
    outcome::result<std::map<std::string, std::map<int64_t, double>>> GeniusNode::GetCoinPricesByDateRange(
        const std::vector<std::string> &tokenIds,
        int64_t                         from,
        int64_t                         to )
    {
        sgns::CoinGeckoPriceRetriever price_source;
        auto                          ranged = price_source.getHistoricalPriceRange( tokenIds, from, to );
        return ranged;
    }

    /// Formats a raw amount as a display string for the given token.
    ///
    /// GNUS amounts go through `TokenAmount::FormatMinions`. Amounts in the token configured in
    /// `dev_config_.TokenID` go through `TokenAmount::ConvertToChildToken` using
    /// `dev_config_.TokenValueInGNUS`.
    ///
    /// @param amount  Raw amount to format.
    /// @param tokenId Token the amount is denominated in.
    /// @return The formatted string, the conversion error, or `TOKEN_ID_MISMATCH` when `tokenId`
    ///         is neither GNUS nor the configured token.
    outcome::result<std::string> GeniusNode::FormatTokens( uint64_t amount, TokenID tokenId )
    {
        if ( tokenId.IsGNUS() )
        {
            return TokenAmount::FormatMinions( amount );
        }

        if ( !tokenId.Equals( dev_config_.TokenID ) )
        {
            return outcome::failure( make_error_code( GeniusNode::Error::TOKEN_ID_MISMATCH ) );
        }

        auto converted = TokenAmount::ConvertToChildToken( amount, dev_config_.TokenValueInGNUS );
        if ( converted )
        {
            return converted.value();
        }
        return outcome::failure( converted.error() );
    }

    /// Parses a display string into a raw amount for the given token.
    ///
    /// GNUS strings go through `TokenAmount::ParseMinions`. Strings in the token configured in
    /// `dev_config_.TokenID` go through `TokenAmount::ConvertFromChildToken` using
    /// `dev_config_.TokenValueInGNUS`.
    ///
    /// @param str     Text to parse.
    /// @param tokenId Token the text is denominated in.
    /// @return The parsed amount, the parser's error, or `TOKEN_ID_MISMATCH` when `tokenId` is
    ///         neither GNUS nor the configured token.
    outcome::result<uint64_t> GeniusNode::ParseTokens( const std::string &str, TokenID tokenId )
    {
        if ( tokenId.IsGNUS() )
        {
            return TokenAmount::ParseMinions( str );
        }

        if ( !tokenId.Equals( dev_config_.TokenID ) )
        {
            return outcome::failure( make_error_code( GeniusNode::Error::TOKEN_ID_MISMATCH ) );
        }

        return TokenAmount::ConvertFromChildToken( str, dev_config_.TokenValueInGNUS );
    }

    /// Returns the transaction manager's incoming transactions.
    ///
    /// @return The result of `TransactionManager::GetInTransactions`, or an empty vector when
    ///         the transaction manager is not available.
    std::vector<std::vector<uint8_t>> GeniusNode::GetInTransactions() const
    {
        if ( auto manager = GetTransactionManager(); manager.has_value() )
        {
            return manager.value()->GetInTransactions();
        }
        return {};
    }

    /// Returns the transaction manager's outgoing transactions.
    ///
    /// @return The result of `TransactionManager::GetOutTransactions`, or an empty vector when
    ///         the transaction manager is not available.
    std::vector<std::vector<uint8_t>> GeniusNode::GetOutTransactions() const
    {
        if ( auto manager = GetTransactionManager(); manager.has_value() )
        {
            return manager.value()->GetOutTransactions();
        }
        return {};
    }

    /// Returns the transaction manager's transactions for the given status filter.
    ///
    /// @param tx_status Optional status filter, forwarded unchanged to
    ///                  `TransactionManager::GetTransactions`.
    /// @return The manager's result, or an empty vector when the transaction manager is not
    ///         available.
    const std::vector<std::vector<uint8_t>> GeniusNode::GetTransactions(
        std::optional<TransactionManager::TransactionStatus> tx_status ) const
    {
        if ( auto manager = GetTransactionManager(); manager.has_value() )
        {
            return manager.value()->GetTransactions( tx_status );
        }
        return {};
    }

    /// Returns this node's account address.
    ///
    /// @return `account_->GetAddress()` when an account is set; otherwise the placeholder
    ///         string "UNVAILABLE" (spelling kept as-is, since callers may compare against it).
    std::string GeniusNode::GetAddress() const
    {
        if ( !account_ )
        {
            return "UNVAILABLE";
        }
        return account_->GetAddress();
    }

    /// Waits, up to `timeout`, for an outgoing transaction to be processed.
    ///
    /// @param txId    Identifier of the outgoing transaction.
    /// @param timeout Maximum time to wait; forwarded to the transaction manager.
    /// @return The status reported by `TransactionManager::WaitForTransactionOutgoing`, or
    ///         `INVALID` when the transaction manager is not available.
    TransactionManager::TransactionStatus GeniusNode::WaitForTransactionOutgoing( const std::string        &txId,
                                                                                  std::chrono::milliseconds timeout )
    {
        if ( auto manager = GetTransactionManager(); manager.has_value() )
        {
            return manager.value()->WaitForTransactionOutgoing( txId, timeout );
        }
        return TransactionManager::TransactionStatus::INVALID;
    }

    /// Waits, up to `timeout`, for an incoming transaction to be processed.
    ///
    /// @param txId    Identifier of the incoming transaction.
    /// @param timeout Maximum time to wait; forwarded to the transaction manager.
    /// @return The status reported by `TransactionManager::WaitForTransactionIncoming`, or
    ///         `INVALID` when the transaction manager is not available.
    TransactionManager::TransactionStatus GeniusNode::WaitForTransactionIncoming( const std::string        &txId,
                                                                                  std::chrono::milliseconds timeout )
    {
        if ( auto manager = GetTransactionManager(); manager.has_value() )
        {
            return manager.value()->WaitForTransactionIncoming( txId, timeout );
        }
        return TransactionManager::TransactionStatus::INVALID;
    }

    /// Waits, up to `timeout`, for the release of the escrow identified by `originalEscrowId`.
    ///
    /// @param originalEscrowId Identifier of the original escrow transaction.
    /// @param timeout          Maximum time to wait; forwarded to the transaction manager.
    /// @return The status reported by `TransactionManager::WaitForEscrowRelease`, or `INVALID`
    ///         when the transaction manager is not available.
    TransactionManager::TransactionStatus GeniusNode::WaitForEscrowRelease( const std::string        &originalEscrowId,
                                                                            std::chrono::milliseconds timeout )
    {
        if ( auto manager = GetTransactionManager(); manager.has_value() )
        {
            return manager.value()->WaitForEscrowRelease( originalEscrowId, timeout );
        }
        return TransactionManager::TransactionStatus::INVALID;
    }

    /// Gives access to the transaction manager, if one has been created.
    ///
    /// @return The shared `transaction_manager_`, or `Error::TRANSACTIONS_NOT_READY` when it is
    ///         not set.
    outcome::result<std::shared_ptr<TransactionManager>> GeniusNode::GetTransactionManager() const
    {
        if ( transaction_manager_ )
        {
            return transaction_manager_;
        }
        return outcome::failure( Error::TRANSACTIONS_NOT_READY );
    }

    /// Opens a CRDT transaction on the transaction GlobalDB and stages one key/value write.
    ///
    /// The transaction is returned to the caller after the `Put`; this function does not
    /// commit it.
    ///
    /// @param path  Key path, wrapped in a `crdt::HierarchicalKey`.
    /// @param value Buffer stored under that key; moved into the transaction.
    /// @return The open transaction holding the staged write, or the error returned by `Put`.
    ///
    /// NOTE(review): `tx_globaldb_` is dereferenced without a null check here, unlike
    /// `PrintDataStore` — confirm callers only reach this once the GlobalDB exists.
    outcome::result<std::shared_ptr<crdt::AtomicTransaction>> GeniusNode::CreateEscrowInfoCRDTTransaction(
        std::string        path,
        sgns::base::Buffer value )
    {
        auto atomic_tx = tx_globaldb_->BeginTransaction();

        BOOST_OUTCOME_TRY( atomic_tx->Put( sgns::crdt::HierarchicalKey( path ), std::move( value ) ) );

        return atomic_tx;
    }

    /// Reports the transaction manager's current state.
    ///
    /// @return `TransactionManager::GetState()`, or `State::CREATING` when the transaction
    ///         manager is not available.
    TransactionManager::State GeniusNode::GetTransactionManagerState() const
    {
        if ( auto manager = GetTransactionManager(); manager.has_value() )
        {
            return manager.value()->GetState();
        }
        return TransactionManager::State::CREATING;
    }

    /// Queues a transaction together with its proof on the transaction manager.
    ///
    /// Logs an error and drops the request when the transaction manager is not available.
    ///
    /// @param tx    Transaction to enqueue.
    /// @param proof Proof bytes that accompany the transaction.
    void GeniusNode::SendTransactionAndProof( std::shared_ptr<GeniusTransaction> tx, std::vector<uint8_t> proof )
    {
        auto manager_result = GetTransactionManager();
        if ( !manager_result.has_value() )
        {
            node_logger_->error( "{}: Transactions not ready", __func__ );
            return;
        }

        // Both parameters are by-value sinks: move them into the pair rather than copying the
        // proof buffer and bumping the shared_ptr reference count a second time.
        manager_result.value()->EnqueueTransaction( std::make_pair( std::move( tx ), std::move( proof ) ) );
    }

    /// Applies the time-frame tolerance and the mutability window to the transaction manager.
    ///
    /// Logs an error and changes nothing when the transaction manager is not available.
    ///
    /// @param timeframe_limit_ms   Passed to `SetTimeFrameToleranceMs`.
    /// @param mutability_window_ms Passed to `SetMutabilityWindowMs`.
    void GeniusNode::ConfigureTransactionFilterTimeoutsMs( uint64_t timeframe_limit_ms, uint64_t mutability_window_ms )
    {
        if ( auto manager_result = GetTransactionManager(); manager_result.has_value() )
        {
            const auto &tx_manager = manager_result.value();
            tx_manager->SetTimeFrameToleranceMs( timeframe_limit_ms );
            tx_manager->SetMutabilityWindowMs( mutability_window_ms );
            return;
        }

        node_logger_->error( "{}: Transactions not ready", __func__ );
    }

    /// Rotates the node's log files inside `base_path`.
    ///
    /// For each of `sgnslog.log` and `sgnslog2.log` that exists, any previous `*.old.log` backup
    /// is deleted and the current log is renamed to take its place. Progress is written to
    /// `std::cout`; filesystem errors are written to `std::cerr` and never propagate.
    ///
    /// Each file is rotated independently, so a failure on one log does not prevent the other
    /// from being rotated.
    ///
    /// @param base_path Directory that contains the log files.
    void GeniusNode::RotateLogFiles( const std::string &base_path )
    {
        const std::filesystem::path basePath( base_path );

        // Rotates one `<current_name>` -> `<backup_name>` pair, containing its own failures.
        const auto rotate_one = [&basePath]( const char *current_name, const char *backup_name )
        {
            const std::filesystem::path current_log = basePath / current_name;
            const std::filesystem::path backup_log  = basePath / backup_name;

            try
            {
                if ( !std::filesystem::exists( current_log ) )
                {
                    return;
                }

                // Delete old backup if it exists
                if ( std::filesystem::exists( backup_log ) )
                {
                    std::filesystem::remove( backup_log );
                    std::cout << "Deleted old backup: " << backup_log << std::endl;
                }

                // Rename current log to backup
                std::filesystem::rename( current_log, backup_log );
                std::cout << "Rotated log: " << current_log << " -> " << backup_log << std::endl;
            }
            catch ( const std::filesystem::filesystem_error &e )
            {
                // Don't let a log rotation failure stop the application or the other rotation.
                std::cerr << "Log rotation error: " << e.what() << std::endl;
            }
        };

        rotate_one( "sgnslog.log", "sgnslog.old.log" );
        rotate_one( "sgnslog2.log", "sgnslog2.old.log" );
    }

    /// Installs the RPC endpoints for one chain on the public-chain input validator.
    ///
    /// Logs a warning and does nothing when the transaction manager is not yet available.
    ///
    /// @param chain_id  Chain the endpoints belong to.
    /// @param endpoints Weighted endpoints; moved into the validator.
    void GeniusNode::ConfigureRpcEndpoint( const std::string &chain_id, std::vector<WeightedRpcEndpoint> endpoints )
    {
        if ( !transaction_manager_ )
        {
            node_logger_->warn( "ConfigureRpcEndpoint called before transaction manager is ready" );
            return;
        }

        // Capture the count before the vector is moved away: reading size() on a moved-from
        // vector yields an unspecified value (typically 0), which made the log line misleading.
        const auto endpoint_count = endpoints.size();

        transaction_manager_->GetPublicChainInputValidator().SetRpcEndpoints( chain_id, std::move( endpoints ) );
        node_logger_->info( "Configured {} RPC endpoint(s) for chain {}", endpoint_count, chain_id );
    }

    /// Picks the location of `bridge_chains_config.json`.
    ///
    /// Candidates are tried in this order, and the first one that exists wins:
    ///   1. `dev_config_.BaseWritePath` (when non-empty),
    ///   2. the directory of the running binary,
    ///   3. the current working directory.
    ///
    /// @return The resolved path. When no candidate exists, the working-directory path is
    ///         returned anyway, so the result is not guaranteed to exist.
    std::filesystem::path GeniusNode::ResolveBridgeChainsConfigPath() const
    {
        namespace fs = std::filesystem;

        fs::path resolved;

        // True while no existing config file has been selected yet.
        const auto still_unresolved = [&resolved]() { return resolved.empty() || !fs::exists( resolved ); };

        // Primary: use DevConfig_st BaseWritePath (writable on all platforms including Android)
        if ( !dev_config_.BaseWritePath.empty() )
        {
            resolved = fs::path( dev_config_.BaseWritePath ) / "bridge_chains_config.json";
        }

        // Fallback: binary directory (finds CMake-installed or copied default)
        if ( still_unresolved() )
        {
            try
            {
                const auto exe_dir       = boost::dll::program_location().parent_path();
                fs::path   beside_binary = fs::path( exe_dir.string() ) / "bridge_chains_config.json";
                if ( fs::exists( beside_binary ) )
                {
                    resolved = std::move( beside_binary );
                }
            }
            catch ( const std::exception &e )
            {
                node_logger_->warn( "ResolveBridgeChainsConfigPath: cannot determine binary location ({}), "
                                    "falling back to CWD",
                                    e.what() );
            }
        }

        // Final fallback: current working directory
        if ( still_unresolved() )
        {
            resolved = fs::current_path() / "bridge_chains_config.json";
        }

        return resolved;
    }

    /// Observer callback: stores the chains reported by the RPC endpoint provider and, if the
    /// catch-up scan has not run yet, schedules it.
    ///
    /// The scan is posted to `io_` rather than run inline, and is only scheduled when the
    /// received chain list is non-empty.
    ///
    /// @param chains Chain/contract pairs; moved into `catchup_chains_`.
    void GeniusNode::OnRpcEndpointsReady( std::vector<ChainContractPair> chains )
    {
        node_logger_->info( "GeniusNode: received {} chain(s) from provider — stored for catch-up scan",
                            chains.size() );
        catchup_chains_ = std::move( chains );

        // P2: READY may have fired before the chains were populated; in that case the scan was
        // deferred and has to be triggered from here. Nothing to do if it already ran or if
        // there is still nothing to scan.
        if ( catchup_scan_done_ || catchup_chains_.empty() )
        {
            return;
        }

        catchup_scan_done_ = true;
        node_logger_->info( "GeniusNode: chains arrived — triggering deferred catchup scan" );

        auto run_scan = [weak_self = weak_from_this()]
        {
            if ( auto strong = weak_self.lock() )
            {
                strong->PerformStartupCatchupScan();
            }
        };
        boost::asio::post( *io_, std::move( run_scan ) );
    }

    /// Sets up the chain RPC endpoint provider and schedules its initialization.
    ///
    /// Steps:
    ///   1. resolve the bridge chain config path,
    ///   2. create `rpc_endpoint_provider_`,
    ///   3. register this node (and `bridge_relayer_`, when set) as observers,
    ///   4. post `ChainRpcEndpointProvider::Initialize` to `io_` so this call does not block.
    ///
    /// The posted handler does nothing if the node has been destroyed in the meantime, and logs
    /// a warning instead of dereferencing a null pointer if the transaction manager or the
    /// provider is not available when it runs.
    void GeniusNode::InitializeAndStartBridge()
    {
        node_logger_->info( "InitializeAndStartBridge: thin orchestrator (D-01, D-03)" );

        // 1. Resolve config path (stays in GeniusNode per D-01)
        auto config_path = ResolveBridgeChainsConfigPath();
        node_logger_->info( "InitializeAndStartBridge: loading bridge chain config from {}",
                            config_path.string() );

        // 2. Construct provider
        rpc_endpoint_provider_ = std::make_unique<ChainRpcEndpointProvider>();

        // 3. Subscribe observers BEFORE post (D-03 ordering)
        rpc_endpoint_provider_->AddObserver( *this );
        if ( bridge_relayer_ )
        {
            rpc_endpoint_provider_->AddObserver( *bridge_relayer_ );
        }

        // 4. Post Initialize() to io_context — non-blocking
        boost::asio::post( *io_,
                           [weak_self = weak_from_this(),
                            config_path = std::move( config_path )]()
                           {
                               const auto strong = weak_self.lock();
                               if ( !strong )
                               {
                                   return;
                               }

                               // Other entry points in this class treat a missing transaction
                               // manager as a recoverable condition; do the same here rather
                               // than dereferencing a null pointer on the io thread.
                               if ( !strong->transaction_manager_ || !strong->rpc_endpoint_provider_ )
                               {
                                   strong->node_logger_->warn(
                                       "InitializeAndStartBridge: transaction manager or RPC endpoint provider "
                                       "not available — skipping provider initialization" );
                                   return;
                               }

                               auto &validator = strong->transaction_manager_->GetPublicChainInputValidator();
                               strong->rpc_endpoint_provider_->Initialize( config_path, validator );
                           } );
    }

    /// Scans recent blocks on every chain in `catchup_chains_` for bridge burn events and
    /// calls `MintFunds` for those not yet reflected in the UTXO set.
    ///
    /// Per chain, the function:
    ///   1. looks up the first RPC URL for the chain from the public-chain input validator,
    ///   2. queries `eth_blockNumber` and derives `from_block` as the current block minus
    ///      `kBridgeCatchupScanDepth` (clamped at 0),
    ///   3. issues two `eth_getLogs` requests against the bridge contract — one for the v1
    ///      `BridgeSourceBurned` topic0 and one for the v2 `BridgeOutInitiated` topic0,
    ///   4. for every returned log: drops tx hashes already seen in this chain's scan, drops
    ///      outpoints (tx hash, index 0) that the UTXO manager reports as consumed or reserved,
    ///      decodes the destination key for v2 logs, and calls `TransactionManager::MintFunds`.
    ///
    /// Everything is best-effort: RPC, parse, decode and mint failures are logged and the scan
    /// moves on. Totals are logged once at the end. Returns early (with a warning) when
    /// `transaction_manager_` is not set.
    ///
    /// NOTE(review): the RPC calls look synchronous (`transport.call` returns the response
    /// directly), so this presumably blocks the calling thread for the duration of the scan —
    /// confirm against `RpcHttpTransport`.
    void GeniusNode::PerformStartupCatchupScan()
    {
        node_logger_->info( "CatchUpScan: starting historical burn scan (D-20)" );

        if ( !transaction_manager_ )
        {
            node_logger_->warn( "CatchUpScan: transaction_manager_ not available — skipping" );
            return;
        }

        // D-11/D-12: catch-up scan queries BOTH v1 (BridgeSourceBurned) and v2
        // (BridgeOutInitiated) topic0 hashes.  EventFilter topics are
        // single-value-per-position (no OR-array support in serialization), so
        // two separate eth_getLogs calls are made per chain and the results are
        // merged with tx_hash deduplication.
        const std::string event_sig     = "BridgeSourceBurned(address,uint256,uint256,uint256,uint256,bytes)";
        auto              topic0_hash   = eth::abi::event_signature_hash( event_sig );
        std::string       topic0_hex    = rlp::base::parse::hex_bytes( topic0_hash.data(), topic0_hash.size() );

        // Bridge V2: bytes32 sgnsDestination + bool destinationYOdd (parity bit).
        const std::string event_sig_v2  = "BridgeOutInitiated(address,uint256,uint256,uint256,uint256,bytes32,bool)";
        auto              topic0_hash_v2 = eth::abi::event_signature_hash( event_sig_v2 );
        std::string       topic0_hex_v2  = rlp::base::parse::hex_bytes( topic0_hash_v2.data(), topic0_hash_v2.size() );

        // D-20: Cap scan depth — default 10,000 blocks
        const uint64_t scan_depth = kBridgeCatchupScanDepth;

        // Running totals across all chains, reported in the final summary log line.
        size_t total_backfilled = 0;
        size_t total_skipped    = 0;
        size_t chains_scanned   = 0;

        // Iterate chains discovered via OnRpcEndpointsReady observer callback (D-02, D-03)
        // Uses catchup_chains_ populated by the observer — no hardcoded maps.
        auto &validator = transaction_manager_->GetPublicChainInputValidator();

        for ( const auto &chain_entry : catchup_chains_ )
        {
            auto rpc_url = validator.GetFirstRpcUrl( std::to_string( chain_entry.chain_id ) );
            if ( !rpc_url.has_value() )
            {
                node_logger_->debug( "CatchUpScan: no RPC endpoint for chain {} (id={}) — skipping",
                                     chain_entry.chain_name,
                                     chain_entry.chain_id );
                continue;
            }

            const std::string &contract_addr_str = chain_entry.contract_address;

            // RPC transport with 10-second timeout per T-05-13
            eth::rpc::RpcHttpTransportOptions opts;
            opts.timeout = std::chrono::seconds( 10 );
            eth::rpc::RpcHttpTransport transport( *rpc_url, opts );

            // NOTE(review): the counter is bumped before the address/topic/block-number checks
            // below, so chains that are subsequently skipped are still reported as "scanned".
            ++chains_scanned;

            // Parse contract address (eth::Address = rlp::Address = array<uint8_t, 20>)
            rlp::Address contract_addr{};
            if ( !rlp::base::parse::hex_array( contract_addr_str, contract_addr ) )
            {
                node_logger_->warn( "CatchUpScan: invalid bridge address {} for chain {}",
                                    contract_addr_str,
                                    chain_entry.chain_name );
                continue;
            }

            // Parse topic0 hex to Hash256 for EventFilter
            // NOTE(review): a failure here skips the whole chain, including the v2 query,
            // whereas a v2 topic parse failure further down only skips the v2 query.
            rlp::Hash256 topic0_hash256{};
            if ( !rlp::base::parse::hex_array( topic0_hex, topic0_hash256 ) )
            {
                node_logger_->warn( "CatchUpScan: invalid topic0 {} for chain {}", topic0_hex, chain_entry.chain_name );
                continue;
            }

            // Build EventFilter for eth_getLogs (v1 topic0)
            eth::EventFilter filter;
            filter.addresses.push_back( contract_addr );
            filter.topics.push_back( topic0_hash256 );

            // D-20: Query current block number to compute scan start
            // Scan from (current_block - scan_depth) to latest
            constexpr uint64_t kBlockNumberRequestId = 99;
            auto              block_number_req = eth::rpc::make_json_rpc_request( "eth_blockNumber",
                                                                                   boost::json::array{},
                                                                                   kBlockNumberRequestId );
            auto              block_number_resp = transport.call( block_number_req );
            uint64_t          current_block     = 0;

            if ( block_number_resp.has_value() )
            {
                auto parsed_block = eth::rpc::parse_block_number_response( *block_number_resp );
                if ( parsed_block.has_value() )
                {
                    current_block = *parsed_block;
                }
            }

            // 0 doubles as the "query failed" sentinel, so a chain genuinely at block 0 is
            // skipped as well.
            if ( current_block == 0 )
            {
                node_logger_->warn( "CatchUpScan: failed to query block number for chain {} — skipping",
                                    chain_entry.chain_name );
                continue;
            }

            // from_block: cap at scan_depth blocks from latest (D-20)
            // to_block: 0 = "latest"
            const uint64_t from_block = current_block > scan_depth ? current_block - scan_depth : 0;

            node_logger_->debug( "CatchUpScan: scanning chain {} from block {} to latest (current={}, depth={})",
                                 chain_entry.chain_name,
                                 from_block,
                                 current_block,
                                 scan_depth );

            // D-12: Shared tx_hash deduplication set across v1 and v2 query
            // results. A burn appearing in both queries (e.g. contract-version
            // overlap) is processed only once.
            // The set is declared inside the chain loop, so deduplication is per chain.
            std::set<std::string> seen_tx_hashes;

            // Helper: process one batch of logs (v1 or v2) through the dedup +
            // UTXO-check + MintFunds pipeline. `is_v2` selects the destination
            // construction path (D-13).
            // Captures everything by reference; it is only invoked synchronously within this
            // loop iteration, while the captured locals are still alive.
            auto process_logs = [&]( const std::vector<eth::rpc::RpcLog> &rpc_logs, bool is_v2 ) {
                // Insert burn UTXO as READY with UTXO_BRIDGE type (D-20)
                // MintFunds will later transition it to RESERVED → CONSUMED
                // NOTE(review): `account_` is dereferenced without a null check — confirm it is
                // always set by the time the scan runs.
                auto &utxo_mgr = account_->GetUTXOManager();

                for ( const auto &rpc_log : rpc_logs )
                {
                    std::string tx_hash_hex = rlp::base::parse::hex_bytes( rpc_log.tx_hash.data(),
                                                                           rpc_log.tx_hash.size() );

                    // Deduplicate across v1 and v2 query results (D-12).
                    if ( !seen_tx_hashes.insert( tx_hash_hex ).second )
                    {
                        ++total_skipped;
                        node_logger_->debug( "CatchUpScan: burn tx {} already seen this scan — skipping",
                                             tx_hash_hex );
                        continue;
                    }

                    // Parse tx_hash_hex to Hash256 for UTXO query
                    base::Hash256 burn_tx_hash;
                    if ( !rlp::base::parse::hex_array( tx_hash_hex, burn_tx_hash ) )
                    {
                        ++total_skipped;
                        continue;
                    }

                    // D-20: Check UTXO set (not in-memory TransactionManager state)
                    // Check if burn outpoint (tx_hash, output_idx=0) is already consumed or reserved
                    if ( utxo_mgr.IsOutPointConsumed( burn_tx_hash, 0 ) )
                    {
                        ++total_skipped;
                        node_logger_->debug( "CatchUpScan: burn tx {} already CONSUMED — skipping", tx_hash_hex );
                        continue;
                    }
                    if ( utxo_mgr.IsOutPointReserved( burn_tx_hash, 0 ) )
                    {
                        ++total_skipped;
                        node_logger_->debug( "CatchUpScan: burn tx {} already RESERVED — skipping", tx_hash_hex );
                        continue;
                    }

                    // D-13: For v2 logs, decompress the 32-byte X-only key from
                    // the event data before calling MintFunds. V1 logs keep the
                    // existing bare-bones empty destination (unchanged path).
                    std::string destination;
                    if ( is_v2 )
                    {
                        // BridgeOutInitiated(address,uint256,uint256,uint256,uint256,bytes32,bool):
                        // sender is indexed (topics[1]); the remaining 6 params
                        // are ABI-encoded in log.data. Non-indexed layout:
                        //   [0] id (uint256) [1] amount (uint256)
                        //   [2] srcChainID (uint256) [3] destChainID (uint256)
                        //   [4] sgnsDestination (bytes32 — the X-only key)
                        //   [5] destinationYOdd (bool — Y parity: false=0x02, true=0x03)
                        // NOTE(review): this literal duplicates `event_sig_v2` declared at the
                        // top of the function; the two must be kept in sync.
                        static const std::string kEventSigV2 =
                            "BridgeOutInitiated(address,uint256,uint256,uint256,uint256,bytes32,bool)";
                        const auto all_params = eth::cli::event_registry().params_for( kEventSigV2 );

                        // Only the non-indexed parameters are present in log.data.
                        std::vector<eth::abi::AbiParam> non_indexed_params;
                        non_indexed_params.reserve( all_params.size() );
                        for ( const auto &p : all_params )
                        {
                            if ( !p.indexed )
                            {
                                non_indexed_params.push_back( p );
                            }
                        }

                        auto decoded = eth::abi::decode_log_data( rpc_log.log.data, non_indexed_params );
                        // Expect 6 non-indexed params (id, amount, srcChainID,
                        // destChainID, sgnsDestination, destinationYOdd).
                        static constexpr size_t kExpectedV2DataParams = 6;
                        static constexpr size_t kSgnsDestinationIndex  = 4;
                        static constexpr size_t kDestinationYOddIndex  = 5;
                        if ( !decoded.has_value() || decoded.value().size() < kExpectedV2DataParams )
                        {
                            ++total_skipped;
                            node_logger_->warn( "CatchUpScan v2: failed to decode log data for tx {} — skipping",
                                                tx_hash_hex );
                            continue;
                        }

                        // sgnsDestination is bytes32 → codec::Hash256 variant.
                        if ( !std::holds_alternative<eth::codec::Hash256>(
                                decoded.value()[kSgnsDestinationIndex] ) )
                        {
                            ++total_skipped;
                            node_logger_->warn( "CatchUpScan v2: sgnsDestination not bytes32 for tx {} — skipping",
                                                tx_hash_hex );
                            continue;
                        }
                        const auto &x_bytes = std::get<eth::codec::Hash256>(
                            decoded.value()[kSgnsDestinationIndex] );

                        // destinationYOdd is bool.
                        if ( !std::holds_alternative<bool>( decoded.value()[kDestinationYOddIndex] ) )
                        {
                            ++total_skipped;
                            node_logger_->warn( "CatchUpScan v2: destinationYOdd not bool for tx {} — skipping",
                                                tx_hash_hex );
                            continue;
                        }
                        const bool destination_y_odd = std::get<bool>( decoded.value()[kDestinationYOddIndex] );

                        auto dest_opt = eth::DecompressXOnlyPubkey( x_bytes, destination_y_odd );
                        if ( !dest_opt.has_value() )
                        {
                            ++total_skipped;
                            node_logger_->warn( "CatchUpScan v2: X-only decompression failed for tx {} — skipping",
                                                tx_hash_hex );
                            continue;
                        }
                        destination = std::move( *dest_opt );
                    }

                    // MintFunds failures (error result or exception) are counted as skipped and
                    // logged at debug level; they never abort the scan.
                    try
                    {
                        auto result = transaction_manager_->MintFunds(
                            0,           // amount — derived from burn event data on full decode
                            tx_hash_hex, // source-chain tx hash
                            std::to_string( chain_entry.chain_id ),
                            dev_config_.TokenID,
                            destination );

                        if ( result.has_value() )
                        {
                            ++total_backfilled;
                            node_logger_->info( "CatchUpScan: backfilled historical burn {} on chain {}",
                                                tx_hash_hex,
                                                chain_entry.chain_name );
                        }
                        else
                        {
                            node_logger_->debug( "CatchUpScan: MintFunds returned no value for tx {} — "
                                                 "likely already processed",
                                                 tx_hash_hex );
                            ++total_skipped;
                        }
                    }
                    catch ( const std::exception &e )
                    {
                        node_logger_->debug( "CatchUpScan: MintFunds threw for tx {}: {} — skipping",
                                             tx_hash_hex,
                                             e.what() );
                        ++total_skipped;
                    }
                }
            };

            // ── v1 query (BridgeSourceBurned) ───────────────────────────────
            // Arguments: filter, from_block, to_block (0 = "latest"), JSON-RPC request id (1).
            auto v1_request  = eth::rpc::make_get_logs_request( filter, from_block, 0, 1 );
            auto v1_response = transport.call( v1_request );

            if ( !v1_response.has_value() )
            {
                // T-05-13: RPC timeout or failure — log and continue (best-effort).
                // A failed v1 query does not block the v2 query or other chains.
                node_logger_->warn( "CatchUpScan: v1 RPC call failed for chain {} (timeout/refused)",
                                    chain_entry.chain_name );
            }
            else
            {
                auto v1_logs = eth::rpc::parse_get_logs_response( *v1_response );
                if ( !v1_logs.has_value() )
                {
                    node_logger_->warn( "CatchUpScan: failed to parse v1 getLogs response for chain {}",
                                        chain_entry.chain_name );
                }
                else
                {
                    process_logs( v1_logs.value(), /*is_v2=*/false );
                }
            }

            // ── v2 query (BridgeOutInitiated) ───────────────────────────────
            // Same contract address + block range, v2 topic0 hash (D-12).
            rlp::Hash256 topic0_hash256_v2{};
            if ( !rlp::base::parse::hex_array( topic0_hex_v2, topic0_hash256_v2 ) )
            {
                node_logger_->warn( "CatchUpScan: invalid v2 topic0 {} for chain {}",
                                    topic0_hex_v2,
                                    chain_entry.chain_name );
            }
            else
            {
                eth::EventFilter filter_v2;
                filter_v2.addresses.push_back( contract_addr );
                filter_v2.topics.push_back( topic0_hash256_v2 );

                // Unique request id so v2 does not collide with v1 (id=1) or
                // blockNumber (id=99).
                constexpr uint64_t kV2LogsRequestId = 2;
                auto v2_request  = eth::rpc::make_get_logs_request( filter_v2, from_block, 0, kV2LogsRequestId );
                auto v2_response = transport.call( v2_request );

                if ( !v2_response.has_value() )
                {
                    node_logger_->warn( "CatchUpScan: v2 RPC call failed for chain {} (timeout/refused)",
                                        chain_entry.chain_name );
                }
                else
                {
                    auto v2_logs = eth::rpc::parse_get_logs_response( *v2_response );
                    if ( !v2_logs.has_value() )
                    {
                        node_logger_->warn( "CatchUpScan: failed to parse v2 getLogs response for chain {}",
                                            chain_entry.chain_name );
                    }
                    else
                    {
                        process_logs( v2_logs.value(), /*is_v2=*/true );
                    }
                }
            }
        }

        // NOTE(review): `total_skipped` also counts decode/parse/mint failures, so the
        // "(already processed)" wording in the summary overstates what the number means.
        node_logger_->info( "CatchUpScan: scanned {} chains — {} historical burns backfilled, "
                            "{} skipped (already processed)",
                            chains_scanned,
                            total_backfilled,
                            total_skipped );
    }

    /**
     * @brief Looks up the status of a transaction by its id.
     *
     * The outgoing status is queried first; the incoming status is only
     * consulted when the outgoing lookup reports INVALID.
     *
     * @param txId Identifier of the transaction to look up.
     * @return The status reported by the transaction manager, or INVALID when
     *         the transaction manager is not available.
     */
    TransactionManager::TransactionStatus GeniusNode::GetTransactionStatus( const std::string &txId ) const
    {
        using Status = TransactionManager::TransactionStatus;

        auto tx_manager = GetTransactionManager();
        if ( !tx_manager.has_value() )
        {
            node_logger_->error( "{}: Transactions not ready", __func__ );
            return Status::INVALID;
        }

        const auto &manager  = tx_manager.value();
        const auto  outgoing = manager->GetOutgoingStatusByTxId( txId );
        if ( outgoing != Status::INVALID )
        {
            return outgoing;
        }

        // Not known as an outgoing transaction: fall back to the incoming side.
        return manager->GetIncomingStatusByTxId( txId );
    }

    /**
     * @brief Reacts to a state transition of the transaction manager.
     *
     * - READY: posts the startup catch-up scan to io_ (at most once, and only
     *   when catchup_chains_ is non-empty), then either moves the node to
     *   INITIALIZING_PROCESSING (no processing service yet) or starts
     *   processing when this node is a processor.
     * - INITIALIZING / SYNCING: stops processing when this node is a processor.
     * - Any other state (including CREATING): no action.
     *
     * @param old_state State the transaction manager is leaving (logged only).
     * @param new_state State the transaction manager has entered.
     */
    void GeniusNode::TransactionStateChanged( TransactionManager::State old_state, TransactionManager::State new_state )
    {
        using TxState = TransactionManager::State;

        node_logger_->info( "Transaction Manager state changed from {} to {}", old_state, new_state );

        if ( new_state == TxState::READY )
        {
            // D-20: run the startup catch-up scan once after CRDT sync completes.
            // It is posted to io_ so this handler does not block on it.
            // P2: the scan is only marked done when chains are already populated;
            // otherwise a READY that fires before OnRpcEndpointsReady fills
            // catchup_chains_ would cause the scan to be skipped permanently.
            const bool scan_pending = !catchup_scan_done_ && !catchup_chains_.empty();
            if ( scan_pending )
            {
                catchup_scan_done_ = true;
                boost::asio::post( *io_,
                                   [weak_self = weak_from_this()]
                                   {
                                       auto self = weak_self.lock();
                                       if ( self )
                                       {
                                           self->PerformStartupCatchupScan();
                                       }
                                   } );
            }

            if ( processing_service_ == nullptr )
            {
                StateTransition( NodeState::INITIALIZING_PROCESSING );
            }
            else if ( isprocessor_ )
            {
                StartProcessing();
            }
            return;
        }

        // INITIALIZING and SYNCING both mean the manager is not ready; every
        // remaining state (CREATING, unknown values) is intentionally ignored.
        const bool manager_not_ready = ( new_state == TxState::INITIALIZING ) || ( new_state == TxState::SYNCING );
        if ( manager_not_ready && isprocessor_ )
        {
            StopProcessing();
        }
    }

    /**
     * @brief Sets the authorized full node address and (re)starts the blockchain.
     *
     * The address is forwarded to Blockchain::SetAuthorizedFullNodeAddress.
     * When a blockchain instance is present, its Start() is invoked afterwards.
     *
     * @param pub_address Public address of the authorized full node.
     */
    void GeniusNode::SetAuthorizedFullNodeAddress( const std::string &pub_address )
    {
        Blockchain::SetAuthorizedFullNodeAddress( pub_address );

        if ( !blockchain_ )
        {
            return;
        }
        blockchain_->Start();
    }

    /**
     * @brief Returns the authorized full node address.
     * @return The value reported by Blockchain::GetAuthorizedFullNodeAddress.
     */
    const std::string &GeniusNode::GetAuthorizedFullNodeAddress() const
    {
        const std::string &authorized_address = Blockchain::GetAuthorizedFullNodeAddress();
        return authorized_address;
    }

    // ── Bootstrap Fullnode Reconnection ──

    /**
     * @brief Builds a PeerInfo from a textual multiaddress.
     *
     * The string must parse as a multiaddress, carry a peer id component, and
     * that peer id must decode from base58. The resulting PeerInfo holds the
     * decoded peer id and the parsed multiaddress as its single address.
     *
     * @param multiaddr_str Multiaddress in string form.
     * @return The parsed PeerInfo, or boost::none when the string is empty or
     *         any of the parsing steps fails.
     */
    boost::optional<libp2p::peer::PeerInfo> GeniusNode::ParsePeerInfoFromString( const std::string &multiaddr_str )
    {
        if ( multiaddr_str.empty() )
        {
            return boost::none;
        }

        auto parsed_address = libp2p::multi::Multiaddress::create( multiaddr_str );
        if ( !parsed_address )
        {
            return boost::none;
        }
        auto address = std::move( parsed_address.value() );

        // The peer id is read before the address is moved into the result vector.
        const auto encoded_peer_id = address.getPeerId();
        if ( !encoded_peer_id )
        {
            return boost::none;
        }

        auto decoded_peer_id = libp2p::peer::PeerId::fromBase58( *encoded_peer_id );
        if ( !decoded_peer_id )
        {
            return boost::none;
        }

        std::vector<libp2p::multi::Multiaddress> addresses;
        addresses.push_back( std::move( address ) );

        return libp2p::peer::PeerInfo{ decoded_peer_id.value(), std::move( addresses ) };
    }

    void GeniusNode::InitBootstrapReconnect()
    {
        if ( bootstrap_fullnode_ids_.empty() && bootstrap_peer_ids_.empty() )
        {
            node_logger_->debug( "No bootstrap peers configured, skipping reconnect subscription" );
            return;
        }

        auto host = pubsub_->GetHost();
        bootstrap_disconnect_subscription_.emplace(
            host->getBus().getChannel<libp2p::event::network::OnPeerDisconnectedChannel>().subscribe(
                [weak_self = weak_from_this()]( const libp2p::peer::PeerId &peer_id )
                {
                    if ( auto strong = weak_self.lock() )
                    {
                        if ( strong->shutdown_started_.load() )
                        {
                            return;
                        }
                        bool is_bootstrap = strong->bootstrap_fullnode_ids_.count( peer_id ) ||
                                            strong->bootstrap_peer_ids_.count( peer_id );
                        if ( is_bootstrap )
                        {
                            const char *kind = strong->bootstrap_fullnode_ids_.count( peer_id ) ? "fullnode" : "peer";
                            strong->node_logger_->info( "Bootstrap {} {} disconnected, scheduling reconnect",
                                                        kind,
                                                        peer_id.toBase58() );
                            unsigned attempt = 0;
                            {
                                std::lock_guard<std::mutex> lock( strong->reconnect_mutex_ );
                                auto                        it = strong->reconnect_attempts_.find( peer_id );
                                if ( it != strong->reconnect_attempts_.end() )
                                {
                                    attempt = it->second;
                                }
                            }
                            strong->ScheduleBootstrapReconnect( peer_id, attempt );
                        }
                    }
                } ) );

        node_logger_->info( "Subscribed to disconnect events for {} bootstrap fullnode(s) + {} peer(s)",
                            bootstrap_fullnode_ids_.size(),
                            bootstrap_peer_ids_.size() );
    }

    /**
     * @brief Starts the periodic bootstrap health check.
     *
     * When at least one bootstrap fullnode or bootstrap peer is tracked, the
     * first health check is scheduled and the configuration is logged;
     * otherwise only a debug message is emitted.
     */
    void GeniusNode::StartBootstrapHealthCheck()
    {
        const bool has_targets = !bootstrap_fullnode_infos_.empty() || !bootstrap_peer_infos_.empty();
        if ( !has_targets )
        {
            node_logger_->debug( "No bootstrap peers to health-check" );
            return;
        }

        ScheduleNextHealthCheck();

        const auto interval_count = reconnect_config_.health_check_interval.count();
        const auto fullnode_count = bootstrap_fullnode_infos_.size();
        const auto peer_count     = bootstrap_peer_infos_.size();
        node_logger_->info( "Bootstrap health check started (interval: {}s, tracking {} fullnodes + {} peers)",
                            interval_count,
                            fullnode_count,
                            peer_count );
    }

    void GeniusNode::ScheduleNextHealthCheck()
    {
        if ( shutdown_started_.load() )
        {
            return;
        }

        auto interval = reconnect_config_.health_check_interval;
        {
            std::lock_guard<std::mutex> lock( reconnect_mutex_ );
            if ( !reconnect_attempts_.empty() )
            {
                interval = reconnect_config_.health_check_disconnected_interval;
            }
        }

        auto weak_self = weak_from_this();
        health_check_handle_.emplace( scheduler_->scheduleWithHandle(
            [weak_self]()
            {
                if ( auto strong = weak_self.lock() )
                {
                    strong->PerformHealthCheck();
                }
            },
            interval ) );
    }

    void GeniusNode::PerformHealthCheck()
    {
        if ( shutdown_started_.load() )
        {
            return;
        }

        auto host = pubsub_->GetHost();

        // Check both fullnodes and peers
        for ( const auto &infos : { &bootstrap_fullnode_infos_, &bootstrap_peer_infos_ } )
        {
            for ( const auto &peer_info : *infos )
            {
                auto connectedness = host->connectedness( peer_info );
                if ( connectedness == libp2p::Host::Connectedness::NOT_CONNECTED ||
                     connectedness == libp2p::Host::Connectedness::CAN_NOT_CONNECT )
                {
                    node_logger_->debug( "Health check: bootstrap peer {} is {}",
                                         peer_info.id.toBase58(),
                                         connectedness == libp2p::Host::Connectedness::NOT_CONNECTED
                                             ? "NOT_CONNECTED"
                                             : "CAN_NOT_CONNECT" );

                    unsigned attempt = 0;
                    {
                        std::lock_guard<std::mutex> lock( reconnect_mutex_ );
                        auto                        it = reconnect_attempts_.find( peer_info.id );
                        if ( it != reconnect_attempts_.end() )
                        {
                            attempt = it->second;
                        }
                    }
                    ScheduleBootstrapReconnect( peer_info.id, attempt );
                }
            }
        }

        ScheduleNextHealthCheck();
    }

    /**
     * @brief Schedules a delayed reconnect attempt to a bootstrap peer.
     *
     * The delay follows an exponential backoff: base_delay * 2^attempt, where
     * the exponent is clamped to 10 and the result is capped at max_delay.
     * The recorded attempt count for the peer is set to attempt + 1 before the
     * timer is armed. Nothing happens once shutdown has started.
     *
     * NOTE(review): the log text says "fullnode", but the health check passes
     * ids from the bootstrap peer list as well - wording may be worth revisiting.
     *
     * @param peer_id Peer to reconnect to.
     * @param attempt Number of attempts made so far (0 for the first one).
     */
    void GeniusNode::ScheduleBootstrapReconnect( const libp2p::peer::PeerId &peer_id, unsigned attempt )
    {
        if ( shutdown_started_.load() )
        {
            return;
        }

        // Exponent is clamped so the shift stays small regardless of the attempt count.
        const unsigned exponent    = std::min( attempt, 10u );
        const auto     multiplier  = 1ull << exponent;
        auto           backoff_sec = reconnect_config_.base_delay.count() * multiplier;
        if ( backoff_sec > static_cast<uint64_t>( reconnect_config_.max_delay.count() ) )
        {
            backoff_sec = reconnect_config_.max_delay.count();
        }
        const std::chrono::seconds delay( backoff_sec );

        const unsigned next_attempt = attempt + 1;
        {
            std::lock_guard<std::mutex> guard( reconnect_mutex_ );
            reconnect_attempts_[peer_id] = next_attempt;
        }

        node_logger_->info( "Scheduling reconnect to bootstrap fullnode {} in {}s (attempt {})",
                            peer_id.toBase58(),
                            delay.count(),
                            next_attempt );

        auto reconnect_task = [weak_self = weak_from_this(), peer_id]()
        {
            auto self = weak_self.lock();
            if ( self )
            {
                self->DoReconnectToBootstrapPeer( peer_id );
            }
        };
        scheduler_->schedule( std::move( reconnect_task ), delay );
    }

    /**
     * @brief Performs one reconnect attempt to a bootstrap peer.
     *
     * The peer's PeerInfo is looked up among the bootstrap fullnodes first and
     * the bootstrap peers second. If the host already reports the peer as
     * CONNECTED, its attempt counter is cleared and no connection is started.
     * Otherwise a connect with a 15 second timeout is issued; on success the
     * attempt counter is cleared, on failure another reconnect is scheduled
     * using the currently recorded attempt count. Nothing happens once
     * shutdown has started.
     *
     * @param peer_id Peer to reconnect to.
     */
    void GeniusNode::DoReconnectToBootstrapPeer( const libp2p::peer::PeerId &peer_id )
    {
        if ( shutdown_started_.load() )
        {
            return;
        }

        // Returns the first entry of `group` with a matching id, or nullptr.
        const auto find_in = [&peer_id]( const auto &group ) -> const libp2p::peer::PeerInfo *
        {
            for ( const auto &candidate : group )
            {
                if ( candidate.id == peer_id )
                {
                    return &candidate;
                }
            }
            return nullptr;
        };

        const libp2p::peer::PeerInfo *target = find_in( bootstrap_fullnode_infos_ );
        if ( target == nullptr )
        {
            target = find_in( bootstrap_peer_infos_ );
        }
        if ( target == nullptr )
        {
            node_logger_->error( "Cannot reconnect: PeerInfo not found for {}", peer_id.toBase58() );
            return;
        }

        auto host = pubsub_->GetHost();
        if ( host->connectedness( *target ) == libp2p::Host::Connectedness::CONNECTED )
        {
            node_logger_->info( "Bootstrap fullnode {} already connected, resetting attempt counter",
                                peer_id.toBase58() );
            std::lock_guard<std::mutex> guard( reconnect_mutex_ );
            reconnect_attempts_.erase( peer_id );
            return;
        }

        node_logger_->info( "Attempting reconnect to bootstrap fullnode {}...", peer_id.toBase58() );

        auto on_result = [weak_self = weak_from_this(), peer_id]( auto result )
        {
            auto self = weak_self.lock();
            if ( !self )
            {
                return;
            }

            if ( result.has_value() )
            {
                self->node_logger_->info( "Successfully reconnected to bootstrap fullnode {}", peer_id.toBase58() );
                std::lock_guard<std::mutex> guard( self->reconnect_mutex_ );
                self->reconnect_attempts_.erase( peer_id );
                return;
            }

            self->node_logger_->warn( "Reconnect to bootstrap fullnode {} failed: {}",
                                      peer_id.toBase58(),
                                      result.error().message() );

            // Continue the backoff from the attempt count already recorded, if any.
            unsigned attempts_so_far = 0;
            {
                std::lock_guard<std::mutex> guard( self->reconnect_mutex_ );
                const auto                  entry = self->reconnect_attempts_.find( peer_id );
                if ( entry != self->reconnect_attempts_.end() )
                {
                    attempts_so_far = entry->second;
                }
            }
            self->ScheduleBootstrapReconnect( peer_id, attempts_so_far );
        };

        host->connect( *target, std::move( on_result ), std::chrono::seconds( 15 ) );
    }
}

Updated on 2026-06-28 at 18:54:57 -0700