The TRUF.NETWORK SDK provides a comprehensive toolkit for developers to interact with decentralized data streams. It enables seamless creation, management, and consumption of economic and financial data streams.
- Stream creation and management
- Primitive and composed stream support
- Flexible data retrieval
- Advanced permission management
- Secure, blockchain-backed data streams
The SDK is structured around several key interfaces:
- Client: Primary entry point for network interactions
- Stream: Core stream operations and access control
- Primitive Stream: Raw data stream management
- Composed Stream: Aggregated data stream handling and taxonomy management
- Transaction Actions: Query transaction history, fees, and distributions
- Attestation Actions: Request and parse cryptographically signed attestations for on-chain verification
- Primitive Streams: Direct data sources with raw data points
- Composed Streams: Aggregated streams combining multiple data sources
- Secure, immutable data recording
- Flexible querying and indexing
- Granular access control
The SDK supports transparent caching through an optional useCache parameter on data retrieval methods (GetRecord, GetIndex, GetFirstRecord, GetIndexChange). When enabled, queries can leverage node-side caching for improved performance, with detailed cache metadata returned in all responses.
package main
import (
"context"
"github.com/trufnetwork/sdk-go/core/tnclient"
"github.com/trufnetwork/sdk-go/core/types"
"github.com/trufnetwork/sdk-go/core/util"
)
func main() {
ctx := context.Background()
// Initialize client with mainnet endpoint
tnClient, err := tnclient.NewClient(
ctx,
"https://gateway.mainnet.truf.network",
tnclient.WithSigner(mySigner),
)
if err != nil {
// Handle client initialization error
}
// Deploy a primitive stream
streamId := util.GenerateStreamId("my-economic-stream")
deployTx, err := tnClient.DeployStream(
ctx,
streamId,
types.StreamTypePrimitive,
)
// Handle deployment and further stream operations
}- Install the SDK
- Configure your network endpoint
- Initialize a client
- Create and manage streams
The Client Interface is the primary entry point for interacting with the TRUF.NETWORK ecosystem. It provides a comprehensive set of methods for managing streams, handling transactions, and interfacing with the underlying blockchain infrastructure.
- Stream lifecycle management
- Transaction handling
- Network interaction
- Address and identity management
Create a client connection to the TRUF.NETWORK:
tnClient, err := tnclient.NewClient(
ctx,
"https://gateway.mainnet.truf.network",
tnclient.WithSigner(mySigner),
// Optional configuration options
)The SDK provides flexible configuration through functional options:
client, err := tnclient.NewClient(
ctx,
"https://gateway.mainnet.truf.network",
tnclient.WithSigner(signer), // Required: Authentication signer
tnclient.WithLogger(logger), // Optional: Custom logger
)WithTransport - Use custom transport implementation:
// For specialized environments (e.g., Chainlink Runtime Environment)
customTransport, err := NewCustomTransport(...)
if err != nil {
return err
}
client, err := tnclient.NewClient(ctx, endpoint,
tnclient.WithTransport(customTransport),
tnclient.WithSigner(signer),
)Use cases for custom transports:
- Chainlink Runtime Environment (CRE) workflows
- Testing with mock transports
- Custom HTTP client requirements
- Alternative communication protocols
The SDK provides specialized options for Chainlink Runtime Environment workflows.
Configures the client to use CRE's HTTP client instead of standard net/http.
func WithCRETransport(runtime cre.NodeRuntime, endpoint string) OptionParameters:
runtime(cre.NodeRuntime): The NodeRuntime fromcre.RunInNodeMode()endpoint(string): TRUF.NETWORK gateway URL
Build Requirement: Must use //go:build wasip1 tag
Example:
//go:build wasip1
package main
import (
"github.com/smartcontractkit/cre-sdk-go/cre"
"github.com/trufnetwork/sdk-go/core/tnclient"
)
func onTrigger(config *Config, runtime cre.Runtime) (*Result, error) {
return cre.RunInNodeMode(config, runtime,
func(config *Config, nodeRuntime cre.NodeRuntime) (*Result, error) {
client, err := tnclient.NewClient(ctx, config.Endpoint,
tnclient.WithCRETransport(nodeRuntime, config.Endpoint),
)
if err != nil {
return nil, err
}
// All read operations work
streams, err := client.ListStreams(ctx, types.ListStreamsInput{})
actions, err := client.LoadActions()
records, err := actions.GetRecord(ctx, getRecordInput)
return &Result{Records: records}, nil
},
cre.ConsensusAggregationFromTags[*Result](),
).Await()
}When to use:
- CRE workflows requiring read-only access
- Listing streams
- Reading records
- Querying data
Convenience function combining signer and CRE transport configuration for write operations.
func WithCRETransportAndSigner(runtime cre.NodeRuntime, endpoint string, signer auth.Signer) OptionParameters:
runtime(cre.NodeRuntime): The NodeRuntime fromcre.RunInNodeMode()endpoint(string): TRUF.NETWORK gateway URLsigner(auth.Signer): Cryptographic signer for transactions
Build Requirement: Must use //go:build wasip1 tag
Example:
//go:build wasip1
package main
import (
"github.com/smartcontractkit/cre-sdk-go/cre"
"github.com/trufnetwork/kwil-db/core/crypto/auth"
"github.com/trufnetwork/sdk-go/core/tnclient"
)
func onTrigger(config *Config, runtime cre.Runtime) (*Result, error) {
return cre.RunInNodeMode(config, runtime,
func(config *Config, nodeRuntime cre.NodeRuntime) (*Result, error) {
// Create signer
signer := &auth.EthPersonalSigner{Key: privateKey}
// Create client with both transport and signer
client, err := tnclient.NewClient(ctx, config.Endpoint,
tnclient.WithCRETransportAndSigner(nodeRuntime, config.Endpoint, signer),
)
if err != nil {
return nil, err
}
// Now you can perform write operations
actions, err := client.LoadActions()
// Insert records
txHash, err := actions.InsertRecords(ctx, types.InsertRecordsInput{
DataProvider: config.DataProvider,
StreamId: config.StreamId,
Records: [][]interface{}{
{"value1", "value2"},
},
})
// Wait for transaction confirmation
result, err := client.WaitTx(ctx, txHash, 2*time.Second)
return &Result{TxHash: txHash}, nil
},
cre.ConsensusAggregationFromTags[*Result](),
).Await()
}When to use:
- CRE workflows requiring write access
- Inserting records
- Deploying streams
- Any operation requiring transaction signing
Equivalent to:
client, err := tnclient.NewClient(ctx, endpoint,
tnclient.WithSigner(signer),
tnclient.WithCRETransport(nodeRuntime, endpoint),
)All CRE-specific code must include the build tag:
//go:build wasip1
package mainCompilation:
# Build for CRE (WASM)
GOOS=wasip1 GOARCH=wasm go build -o workflow.wasm
# Regular build (excludes CRE code)
go build- Build tag required: All files using CRE transport must have
//go:build wasip1 - No net/http: Standard HTTP client not available in WASM
- Context handling: Use
context.WithTimeoutfor all operations - Error handling: Implement robust error handling for network operations
📖 Complete Guide: CRE Integration Guide
🎯 Working Example: examples/truf-cre-demo/
🔗 CRE Documentation: docs.chain.link/cre
GetKwilClient() - Access underlying GatewayClient (HTTP transport only):
// For advanced use cases requiring low-level control
if gwClient := client.GetKwilClient(); gwClient != nil {
// Direct GatewayClient access for advanced scenarios
result, err := gwClient.Call(ctx, "", "custom_action", args)
}
// Returns nil for non-HTTP transportsImportant:
GetKwilClient()is provided for advanced use cases that require direct low-level access. For most scenarios, prefer using the high-level Client methods which are transport-agnostic.
The SDK uses a pluggable transport layer that allows different communication implementations:
- HTTPTransport (default): Standard
net/httpcommunication with the TRUF.NETWORK - Custom transports: For specialized runtime environments (e.g., Chainlink CRE)
- Mock transports: For testing without network dependencies
This abstraction enables the SDK to work in various runtime environments while maintaining a consistent, high-level API. All Client methods work transparently with any transport implementation.
Critical: All TN operations are asynchronous by default. They return success when transactions enter the mempool, NOT when they're executed on-chain.
📚 Complete Example: See
examples/transaction-lifecycle-example/main.gofor a comprehensive demonstration of safe transaction patterns with detailed explanations.
Common Race Condition:
// ❌ DANGEROUS - Race condition
deployTx, _ := tnClient.DeployStream(ctx, streamId, types.StreamTypePrimitive)
insertTx, _ := primitiveActions.InsertRecord(ctx, input) // Might fail!Two Solutions for Safe Operations:
Waits for a transaction to be mined and confirmed. Always check the Result.Code to detect failures:
import (
kwiltypes "github.com/trufnetwork/kwil-db/core/types"
// ... other imports
)
// Deploy stream
deployTx, err := tnClient.DeployStream(ctx, streamId, types.StreamTypePrimitive)
if err != nil {
return err
}
// Wait for deployment to complete
txResponse, err := tnClient.WaitForTx(ctx, deployTx, time.Second*5)
if err != nil {
return err
} else if txResponse.Result.Code != uint32(kwiltypes.CodeOk) {
return fmt.Errorf("deployment failed: %d", txResponse.Result.Code)
}
// Now safe to proceed
insertTx, err := primitiveActions.InsertRecord(ctx, input)For operations that support TxOpt parameters, use WithSyncBroadcast(true):
import (
client "github.com/trufnetwork/kwil-db/core/client/types"
// ... other imports
)
// Synchronous record insertion (waits for mining)
insertTx, err := primitiveActions.InsertRecord(ctx, input,
client.WithSyncBroadcast(true))
// Synchronous taxonomy update (waits for mining)
taxonomyTx, err := composedActions.InsertTaxonomy(ctx, taxonomy,
client.WithSyncBroadcast(true))Note: DeployStream and DestroyStream don't support TxOpt, so use WaitForTx with them.
Deploy a new stream (primitive or composed):
streamId := util.GenerateStreamId("my-economic-stream")
txHash, err := tnClient.DeployStream(
ctx,
streamId,
types.StreamTypePrimitive
)Remove an existing stream:
txHash, err := tnClient.DestroyStream(ctx, streamId)Load an existing primitive stream:
primitiveStream, err := tnClient.LoadPrimitiveStream(
tnClient.OwnStreamLocator(streamId)
)Load an existing composed stream:
composedStream, err := tnClient.LoadComposedStream(
tnClient.OwnStreamLocator(streamId)
)Generate a stream locator using the current client's address:
streamLocator := tnClient.OwnStreamLocator(streamId)Retrieve the client's Ethereum address:
clientAddress := tnClient.Address()
addressString := clientAddress.String()func createAndManageStream(ctx context.Context, tnClient *tnclient.Client) error {
// Generate unique stream ID
streamId := util.GenerateStreamId("market-data-stream")
// Deploy stream
deployTx, err := tnClient.DeployStream(
ctx,
streamId,
types.StreamTypePrimitive,
)
if err != nil {
return fmt.Errorf("stream deployment failed: %v", err)
}
// Wait for deployment confirmation
txRes, err := tnClient.WaitForTx(ctx, deployTx, time.Second * 5)
if err != nil {
return fmt.Errorf("deployment confirmation failed: %v", err)
} else if txRes.Result.Code != uint32(kwiltypes.CodeOk) {
return fmt.Errorf("deployment failed: %s", txRes.Result.Log)
}
// Load the stream
primitiveStream, err := tnClient.LoadPrimitiveStream(
tnClient.OwnStreamLocator(streamId)
)
if err != nil {
return fmt.Errorf("stream loading failed: %v", err)
}
// Perform stream operations...
return nil
}- Always handle errors
- Use appropriate context timeouts
- Log important transactions
- Implement retry mechanisms
- Ensure proper error handling and logging
The Stream Interface is the core abstraction for data streams in the TRUF.NETWORK ecosystem. It provides a comprehensive set of methods for managing stream lifecycle, visibility, and access control.
- Immutable Data: Streams store data points that cannot be altered once recorded
- Visibility Control: Fine-grained access management
- Flexible Querying: Multiple methods for data retrieval
- Permissions Management: Granular control over stream access
- Unified Data Types: All stream data operations return
StreamResultorActionResultfor consistency
The SDK uses a unified approach for all stream data operations:
- StreamResult: Core data structure with
EventTimeandValuefields - ActionResult: Contains an array of
StreamResultplusCacheMetadata - All data retrieval methods (
GetRecord,GetIndex,GetIndexChange) returnActionResult - This unified approach eliminates the need for separate
StreamIndexandStreamIndexChangetypes, and provides cache metadata by default
All stream data operations return cache metadata that provides insights into query performance and cache behavior:
type CacheMetadata struct {
// Cache hit/miss statistics
CacheHit bool `json:"cache_hit"` // Whether the query hit the cache
CacheDisabled bool `json:"cache_disabled"` // Whether caching is disabled
// Cache height information
CacheHeight *int64 `json:"cache_height"` // Block height when data was cached
// Query context (populated by SDK)
StreamId string `json:"stream_id"` // Stream identifier
DataProvider string `json:"data_provider"` // Data provider address
From *int64 `json:"from"` // Query start time
To *int64 `json:"to"` // Query end time
FrozenAt *int64 `json:"frozen_at"` // Time-travel timestamp
RowsServed int `json:"rows_served"` // Number of rows returned
}Use cache metadata to optimize query performance:
result, err := primitiveActions.GetRecord(ctx, types.GetRecordInput{
DataProvider: provider,
StreamId: streamId,
From: &from,
To: &to,
UseCache: &[]bool{true}[0],
})
if err != nil {
return err
}
// Analyze cache performance
if result.Metadata.CacheHit {
fmt.Printf("Cache hit! Served %d rows from cache\n", result.Metadata.RowsServed)
if result.Metadata.CacheHeight != nil {
fmt.Printf("Cache height: %d\n", *result.Metadata.CacheHeight)
}
} else {
fmt.Printf("Cache miss - data retrieved from database\n")
}For batch operations, use AggregateCacheMetadata to analyze overall cache performance:
// Collect metadata from multiple queries
var metadataList []types.CacheMetadata
// ... perform multiple queries and collect metadata ...
// Aggregate statistics
aggregated := types.AggregateCacheMetadata(metadataList)
fmt.Printf("Cache hit rate: %.2f%% (%d hits / %d queries)\n",
aggregated.CacheHitRate * 100,
aggregated.CacheHits,
aggregated.TotalQueries)
fmt.Printf("Total rows served: %d\n", aggregated.TotalRowsServed)GetRecord(ctx context.Context, input types.GetRecordInput) (types.ActionResult, error)Retrieves the raw time-series data for the specified stream, including cache metadata. Internally the SDK calls the on-chain action get_record, which automatically delegates to either get_record_primitive or get_record_composed depending on the type of the stream.
Returns types.ActionResult:
Results: Array ofStreamResultcontaining the actual dataMetadata: Cache performance and hit/miss statistics
Usage Example:
// Basic usage without caching
result, err := primitiveActions.GetRecord(ctx, types.GetRecordInput{
DataProvider: provider,
StreamId: streamId,
From: &from,
To: &to,
})
if err != nil {
return err
}
// Access the results
for _, record := range result.Results {
fmt.Printf("Time: %d, Value: %s\n", record.EventTime, record.Value.String())
}
// Access cache metadata
fmt.Printf("Cache Hit: %v\n", result.Metadata.CacheHit)
fmt.Printf("Rows Served: %d\n", result.Metadata.RowsServed)Cache-Optimized Usage:
// Enable caching for improved performance
useCache := true
result, err := primitiveActions.GetRecord(ctx, types.GetRecordInput{
DataProvider: provider,
StreamId: streamId,
From: &from,
To: &to,
UseCache: &useCache,
})
if err != nil {
return err
}
// Performance analysis
if result.Metadata.CacheHit {
fmt.Printf("✓ Cache hit! Query served in optimized time\n")
if result.Metadata.CacheHeight != nil {
fmt.Printf("Cache height: %d\n", *result.Metadata.CacheHeight)
}
} else {
fmt.Printf("○ Cache miss - data retrieved from source\n")
}
Behaviour
- If both
FromandToarenil, the latest data-point (LOCF-filled for composed streams) is returned. - Gap-filling logic is applied to primitive streams so that the value immediately preceding
Fromis included—this guarantees that visualisations can safely draw a continuous line. - For composed streams, the value is calculated recursively by aggregating the weighted values of all child primitives at each point in time. All permission checks (
read,compose) are enforced inside the SQL action.
Input fields (types.GetRecordInput):
DataProvider(string) Owner address of the stream.StreamId(string) ID of the stream (stxxxxxxxxxxxxxxxxxxxxxxxxxxxx).From,To(*int) Unix timestamp range (inclusive). Passnilto make the bound open-ended.FrozenAt(*int) Time-travel flag. Only events created on or before this block-timestamp are considered.BaseDate(*int) Base date for index calculations. If not provided, defaults to the stream'sdefault_base_timemetadata.Prefix(*string) Optional prefix filter for stream operations.UseCache(*bool) Enable/disable caching for this query. Whennil, defaults tofalse. Whentrue, enables server-side caching for improved performance on repeated queries.
Returned slice: each StreamResult contains
EventTime(int) Unix timestamp of the point.Value(apd.Decimal) Raw numeric value.
GetIndex(ctx context.Context, input types.GetIndexInput) ([]types.StreamResult, error)Returns a rebased index of the stream where the value at BaseDate (defaults to metadata key default_base_time) is normalised to 100.
Mathematically:
index(t) = 100 × value(t) / value(baseDate)
The same recursive aggregation, gap-filling and permission rules described in GetRecord apply here; the only difference is the final normalisation step.
Important details
- If
BaseDateisnilthe function will fall back to the first available record for the stream. - Division-by-zero protection is enforced in the SQL action—an error is thrown when the base value is 0.
- For single-point queries (
From==To==nil) only the latest indexed value is returned.
The returned types.ActionResult has the same structure as GetRecord but semantically represents an index instead of raw values, with each record's Value field containing the indexed data. Access the results via the Results field.
GetIndexChange(ctx context.Context, input types.GetIndexChangeInput) (types.ActionResult, error)Computes the percentage change of the index over a fixed time interval. Internally the SDK obtains the indexed series via get_index and then, for every returned row whose timestamp is t, finds the closest index value at or before t − timeInterval.
Formula:
Δindex(t) = ( index(t) − index(t − Δ ) ) / index(t − Δ ) × 100
where Δ = timeInterval (in seconds).
Only rows for which a matching previous value exists and is non-zero are emitted. This is performed server-side by the SQL action get_index_change, ensuring minimal bandwidth usage.
Typical use-cases:
- Day-over-day change: pass
86400seconds. - Year-on-year change: pass
31 536 000seconds.
Input fields (types.GetIndexChangeInput):
All fields from GetIndexInput plus:
TimeInterval(int) Interval in seconds used for the delta computation (mandatory).UseCache(*bool) Enable/disable caching for this query. Whennil, defaults tofalse. Whentrue, enables server-side caching for improved performance on repeated queries.
Return value: Returns types.ActionResult where each Value in the Results array represents percentage change, e.g. 2.5 means +2.5 %.
GetFirstRecord(ctx context.Context, input types.GetFirstRecordInput) (types.ActionResult, error)Retrieves the first record from a stream, optionally after a specified timestamp.
Parameters:
ctx: The context for the operationinput: GetFirstRecordInput containing query parameters
Input fields (types.GetFirstRecordInput):
DataProvider(string): Owner address of the streamStreamId(string): ID of the streamAfter(*int): Optional timestamp to search after. If provided, returns the first record after this timeFrozenAt(*int): Time-travel flag. Only events created on or before this block-timestamp are consideredUseCache(*bool): Enable/disable caching for this query. Whennil, defaults tofalse
Returns types.ActionResult:
Results: Array containing a singleStreamResultwith the first recordMetadata: Cache performance and hit/miss statistics
Usage Example:
// Get the very first record in a stream
result, err := primitiveActions.GetFirstRecord(ctx, types.GetFirstRecordInput{
DataProvider: provider,
StreamId: streamId,
UseCache: &[]bool{true}[0],
})
if err != nil {
return err
}
if len(result.Results) > 0 {
firstRecord := result.Results[0]
fmt.Printf("First record: Time=%d, Value=%s\n",
firstRecord.EventTime, firstRecord.Value.String())
}
// Get the first record after a specific timestamp
after := int(time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC).Unix())
result, err = primitiveActions.GetFirstRecord(ctx, types.GetFirstRecordInput{
DataProvider: provider,
StreamId: streamId,
After: &after,
UseCache: &[]bool{true}[0],
})SetReadVisibility(ctx context.Context, visibility util.VisibilityEnum) (transactions.TxHash, error)Sets the read visibility of the stream.
Parameters:
ctx: The context for the operation.visibility: The visibility setting (Public,Private).
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
SetComposeVisibility(ctx context.Context, visibility util.VisibilityEnum) (transactions.TxHash, error)Sets the compose visibility of the stream.
Parameters:
ctx: The context for the operation.visibility: The visibility setting (Public,Private).
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
AllowReadWallet(ctx context.Context, wallet util.EthereumAddress) (transactions.TxHash, error)Allows a wallet to read the stream.
Parameters:
ctx: The context for the operation.wallet: The Ethereum address of the wallet.
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
DisableReadWallet(ctx context.Context, wallet util.EthereumAddress) (transactions.TxHash, error)Disables a wallet from reading the stream.
Parameters:
ctx: The context for the operation.wallet: The Ethereum address of the wallet.
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
AllowComposeStream(ctx context.Context, locator StreamLocator) (transactions.TxHash, error)Allows a stream to use this stream as a child.
Parameters:
ctx: The context for the operation.locator: The locator of the composed stream.
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
DisableComposeStream(ctx context.Context, locator StreamLocator) (transactions.TxHash, error)Disables a stream from using this stream as a child.
Parameters:
ctx: The context for the operation.- `locator": The locator of the composed stream.
Returns:
transactions.TxHash: The transaction hash for the operation.error: An error if the operation fails.
CallProcedure(ctx context.Context, procedure string, args []any) (*kwiltypes.QueryResult, error)Invokes a read-only stored procedure on the underlying database and returns a QueryResult that you can inspect or decode into typed structs using contractsapi.DecodeCallResult[T].
Parameters:
ctx: Operation context.procedure: The name of the stored procedure to execute.args: A positional slice ([]any) containing the arguments expected by the procedure. Usenilfor optional parameters you wish to skip.
Returns:
*kwiltypes.QueryResult: The raw query result.error: An error if the call fails.
// Load the generic Action API
actions, _ := tnClient.LoadActions()
// Prepare arguments
from := int(time.Now().AddDate(0, 0, -7).Unix())
to := int(time.Now().Unix())
args := []any{from, to, nil, nil, 31_536_000} // 1-year interval
// Call the procedure
result, err := actions.CallProcedure(ctx, "get_divergence_index_change", args)
if err != nil {
return err
}
fmt.Println("Columns:", result.ColumnNames)
for _, row := range result.Values {
fmt.Println(row)
}The SDK provides intelligent caching to optimize query performance:
When to Use Caching:
- Repeated queries with identical parameters
- Dashboard or monitoring applications
- Data visualization with frequent refreshes
- Batch processing where data consistency is acceptable
Cache Behavior:
- Cache is pre-configured for specific streams by node operators
- No automatic invalidation when new data arrives - cache refreshes periodically based on operator configuration
- When
FrozenAtorBaseDateparameters are specified, cache is bypassed - Cache date is returned allowing users to determine acceptable data freshness
- Users can contact node operators for additional cached streams or host their own node
Performance Tips:
// 1. Use caching for repeated queries
useCache := true
result, err := stream.GetRecord(ctx, types.GetRecordInput{
DataProvider: provider,
StreamId: streamId,
UseCache: &useCache,
})
// 2. Monitor cache hit rates (batch example)
if aggregated.CacheHitRate < 0.5 {
log.Printf("Low cache hit rate: %.2f%%", aggregated.CacheHitRate*100)
}
// 3. Analyze cache height for data consistency
if result.Metadata.CacheHeight != nil {
fmt.Printf("Data cached at block height: %d\n", *result.Metadata.CacheHeight)
}For multiple stream queries, leverage batch operations and cache aggregation:
// Batch cache analysis
var allMetadata []types.CacheMetadata
// Perform multiple queries
for _, streamId := range streamIds {
result, err := stream.GetRecord(ctx, types.GetRecordInput{
DataProvider: provider,
StreamId: streamId,
UseCache: &[]bool{true}[0],
})
if err != nil {
continue
}
allMetadata = append(allMetadata, result.Metadata)
}
// Analyze overall performance
aggregated := types.AggregateCacheMetadata(allMetadata)
fmt.Printf("Overall cache performance: %.2f%% hit rate\n",
aggregated.CacheHitRate*100)Time Range Queries:
- Use specific time ranges instead of open-ended queries when possible
- Note:
FrozenAtparameter bypasses cache - use for consistent historical data when cache freshness is not suitable - Consider pagination for large datasets
Index Operations:
- Note:
BaseDateparameter bypasses cache - use when precise index calculations are required - For frequently accessed base dates, consider working with node operators to ensure proper caching
- Monitor cache metadata to understand data freshness for your use case
- Always handle errors
- Use context with appropriate timeouts
- Validate wallet addresses
- Log permission changes
- Implement retry mechanisms
- Use caching strategically for improved performance
- Monitor cache hit rates and data freshness
- Aggregate cache metadata for batch operations
- Visibility changes are blockchain transactions
- Cache metadata is always returned, even when caching is disabled
- Cache refresh intervals are configured by node operators
- Cache is bypassed when
FrozenAtorBaseDateparameters are used
Primitive streams are the foundational data sources in the TRUF.NETWORK ecosystem. They represent raw, unprocessed data points that can be used directly or as components in more complex composed streams.
- Direct data input mechanism
- Immutable record storage
InsertRecords(ctx context.Context, inputs []types.InsertRecordInput) (transactions.TxHash, error)Allows insertion of one or multiple records into a primitive stream.
type InsertRecordInput struct {
DataProvider string // Address of the data provider
StreamId string // Unique stream identifier
EventTime int // Unix timestamp of the record
Value float64 // Numeric value of the record
}// Insert a single record
records := []types.InsertRecordInput{
{
DataProvider: myAddress,
StreamId: "my-economic-stream",
EventTime: int(time.Now().Unix()),
Value: 105.75, // Economic indicator value
},
}
txHash, err := primitiveStream.InsertRecords(ctx, records)-
Consistent Timestamps
- Use UTC timestamps
- Handle potential time zone complexities
-
Data Validation
- Validate input values before insertion
-
Error Handling
- Implement retry mechanisms
- Log insertion failures
- Batch record insertions when possible
The Composed Stream interface provides advanced capabilities for creating and managing aggregated data streams in the TRUF.NETWORK ecosystem.
A taxonomy defines how multiple primitive or composed streams are combined to create a new, more complex stream. Key components include:
- Parent Stream: The new composed stream being created
- Child Streams: Source streams used for aggregation
- Weights: Relative importance of each child stream
taxonomy := types.Taxonomy{
ParentStream: composedStreamLocator,
TaxonomyItems: []types.TaxonomyItem{
{
ChildStream: primitiveStream1Locator,
Weight: 0.6, // 60% contribution
},
{
ChildStream: primitiveStream2Locator,
Weight: 0.4, // 40% contribution
},
},
StartDate: &startTimestamp,
}DescribeTaxonomies(ctx context.Context, params types.DescribeTaxonomiesParams) ([]types.TaxonomyItem, error)Retrieves the current taxonomy configuration for a composed stream. This is the key method for discovering how composed streams aggregate their child streams.
Parameters:
ctx: Operation contextparams: Taxonomy description parametersStream: Stream locator (identifies the composed stream)LatestVersion: Flag to return only the most recent taxonomy version
Returns:
- List of
TaxonomyItemobjects containing:ChildStream: Locator of each child streamWeight: Weight/contribution of each child stream (0.0 to 1.0)
- Error if retrieval fails
Example Usage:
// Get the latest taxonomy for a composed stream
params := types.DescribeTaxonomiesParams{
Stream: tnClient.OwnStreamLocator(composedStreamId),
LatestVersion: true,
}
taxonomyItems, err := composedActions.DescribeTaxonomies(ctx, params)
if err != nil {
log.Printf("Failed to describe taxonomies: %v", err)
return
}
fmt.Printf("Taxonomy for stream %s:\n", composedStreamId.String())
for _, item := range taxonomyItems {
fmt.Printf(" Child: %s (Weight: %.2f)\n",
item.ChildStream.StreamId.String(), item.Weight)
}SetTaxonomy(ctx context.Context, taxonomies []types.TaxonomyItem) (kwiltypes.Hash, error)Configures or updates the taxonomy for a composed stream.
Parameters:
ctx: Operation contexttaxonomies: Taxonomy configuration
Returns:
- Transaction hash
- Error if setting taxonomy fails
- Carefully design taxonomy weights
Always check for errors when working with composed streams:
- Validate taxonomy before setting
- Handle potential child stream access issues
- Manage weight distribution carefully
// Create a composed stream aggregating market sentiment and economic indicators
composedStreamId := util.GenerateStreamId("market-composite-index")
err := tnClient.DeployStream(ctx, composedStreamId, types.StreamTypeComposed)
composedActions, err := tnClient.LoadComposedActions()
taxonomyTx, err := composedActions.InsertTaxonomy(ctx, types.Taxonomy{
ParentStream: tnClient.OwnStreamLocator(composedStreamId),
TaxonomyItems: []types.TaxonomyItem{
{
ChildStream: sentimentStreamLocator,
Weight: 0.6,
},
{
ChildStream: economicIndicatorLocator,
Weight: 0.4,
},
},
})The Transaction Actions Interface provides methods for querying transaction history, fees, and distributions from the TRUF.NETWORK ledger. This interface is essential for auditing, analytics, and tracking fee distributions across the network.
- Query detailed transaction information by hash
- List transactions by wallet with flexible filtering
- Track fee distributions to validators and proposers
- Pagination support for large result sets
- Filter by transaction type (paid, received, or both)
func (c *Client) LoadTransactionActions() (*contractsapi.TransactionActions, error)Initializes the transaction actions interface for querying transaction data.
Returns:
*TransactionActions: Interface for transaction querieserror: Error if initialization fails
Example:
txActions, err := client.LoadTransactionActions()
if err != nil {
log.Fatalf("Failed to load transaction actions: %v", err)
}func (a *TransactionActions) GetTransactionEvent(
ctx context.Context,
input types.GetTransactionEventInput,
) (*types.TransactionEvent, error)Retrieves detailed information about a specific transaction by its hash.
Parameters:
ctx: Context for the operationinput: Input containing:TxID: Transaction hash (with or without0xprefix)
Returns:
*TransactionEvent: Complete transaction details including:TxID: Transaction hash (0x-prefixed)BlockHeight: Block number where transaction was includedMethod: Method name (e.g., "deployStream", "insertRecords")Caller: Ethereum address of the caller (lowercase, 0x-prefixed)FeeAmount: Total fee amount as string (handles large numbers)FeeRecipient: Primary fee recipient address (nullable)Metadata: Optional metadata JSON (nullable)FeeDistributions: Array of fee distributions showing who received what amount
error: Error if query fails or transaction not found
Example:
txEvent, err := txActions.GetTransactionEvent(ctx, types.GetTransactionEventInput{
TxID: "0xabcdef123456...",
})
if err != nil {
log.Fatalf("Failed to get transaction: %v", err)
}
fmt.Printf("Method: %s\n", txEvent.Method)
fmt.Printf("Caller: %s\n", txEvent.Caller)
fmt.Printf("Fee: %s wei\n", txEvent.FeeAmount)
fmt.Printf("Block: %d\n", txEvent.BlockHeight)
// Check fee distributions
for _, dist := range txEvent.FeeDistributions {
fmt.Printf(" → %s: %s wei\n", dist.Recipient, dist.Amount)
}func (a *TransactionActions) ListTransactionFees(
ctx context.Context,
input types.ListTransactionFeesInput,
) ([]types.TransactionFeeEntry, error)Lists transactions filtered by wallet address and mode, with pagination support.
Parameters:
ctx: Context for the operationinput: Input containing:Wallet: Ethereum address to query (required)Mode: Filter mode - one of:types.TransactionFeeModePaid: Transactions where wallet paid feestypes.TransactionFeeModeReceived: Transactions where wallet received fee distributionstypes.TransactionFeeModeBoth: All transactions involving the wallet
Limit: Maximum results to return (optional, default: 20, max: 1000)Offset: Pagination offset (optional, default: 0)
Returns:
[]TransactionFeeEntry: Array of transaction entries, each containing:TxID: Transaction hashBlockHeight: Block numberMethod: Method nameCaller: Caller addressTotalFee: Total fee amountFeeRecipient: Primary recipient (nullable)Metadata: Optional metadata (nullable)DistributionSequence: Distribution index (for multiple distributions)DistributionRecipient: Recipient address for this distribution (nullable)DistributionAmount: Amount for this distribution (nullable)
error: Error if query fails
Note: This method returns one row per fee distribution. If a transaction has multiple distributions, it will appear multiple times with different DistributionSequence values.
Example - List Fees Paid:
wallet := client.Address().Address()
limit := 10
entries, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: wallet,
Mode: types.TransactionFeeModePaid,
Limit: &limit,
})
if err != nil {
log.Fatalf("Failed to list fees: %v", err)
}
for _, entry := range entries {
fmt.Printf("%s: %s wei (block %d)\n",
entry.Method, entry.TotalFee, entry.BlockHeight)
}Example - Pagination:
limit := 20
offset := 0
// Get first page
page1, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: wallet,
Mode: types.TransactionFeeModeBoth,
Limit: &limit,
Offset: &offset,
})
// Get second page
offset = 20
page2, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: wallet,
Mode: types.TransactionFeeModeBoth,
Limit: &limit,
Offset: &offset,
})Example - Fees Received:
// Track fee distributions received by a validator
entries, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: validatorAddress,
Mode: types.TransactionFeeModeReceived,
Limit: &limit,
})
totalReceived := big.NewInt(0)
for _, entry := range entries {
if entry.DistributionAmount != nil {
amount, _ := new(big.Int).SetString(*entry.DistributionAmount, 10)
totalReceived.Add(totalReceived, amount)
}
}
fmt.Printf("Total fees received: %s wei\n", totalReceived.String())type TransactionEvent struct {
TxID string
BlockHeight int64
Method string
Caller string
FeeAmount string
FeeRecipient *string
Metadata *string
FeeDistributions []FeeDistribution
}type FeeDistribution struct {
Recipient string `json:"recipient"`
Amount string `json:"amount"`
}type TransactionFeeEntry struct {
TxID string
BlockHeight int64
Method string
Caller string
TotalFee string
FeeRecipient *string
Metadata *string
DistributionSequence int
DistributionRecipient *string
DistributionAmount *string
}type TransactionFeeMode string
const (
TransactionFeeModePaid TransactionFeeMode = "paid"
TransactionFeeModeReceived TransactionFeeMode = "received"
TransactionFeeModeBoth TransactionFeeMode = "both"
)// Calculate total fees paid by wallet in last 30 days
entries, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: myWallet,
Mode: types.TransactionFeeModePaid,
})
totalSpent := big.NewInt(0)
for _, entry := range entries {
amount, _ := new(big.Int).SetString(entry.TotalFee, 10)
totalSpent.Add(totalSpent, amount)
}
fmt.Printf("Total spent: %s wei\n", totalSpent.String())// Analyze transaction types and their costs
methodCounts := make(map[string]int)
methodCosts := make(map[string]*big.Int)
entries, err := txActions.ListTransactionFees(ctx, types.ListTransactionFeesInput{
Wallet: myWallet,
Mode: types.TransactionFeeModePaid,
})
for _, entry := range entries {
methodCounts[entry.Method]++
if _, ok := methodCosts[entry.Method]; !ok {
methodCosts[entry.Method] = big.NewInt(0)
}
amount, _ := new(big.Int).SetString(entry.TotalFee, 10)
methodCosts[entry.Method].Add(methodCosts[entry.Method], amount)
}
for method, count := range methodCounts {
avgCost := new(big.Int).Div(methodCosts[method], big.NewInt(int64(count)))
fmt.Printf("%s: %d calls, avg cost %s wei\n", method, count, avgCost.String())
}// Monitor where your fees are going
txEvent, err := txActions.GetTransactionEvent(ctx, types.GetTransactionEventInput{
TxID: deployTxHash,
})
fmt.Printf("Transaction: %s\n", txEvent.TxID)
fmt.Printf("Total Fee: %s wei\n", txEvent.FeeAmount)
fmt.Println("\nFee Distributions:")
for i, dist := range txEvent.FeeDistributions {
fmt.Printf(" %d. %s: %s wei\n", i+1, dist.Recipient, dist.Amount)
}-
Error Handling: Always check for errors, especially for transaction not found
txEvent, err := txActions.GetTransactionEvent(ctx, input) if err != nil { if strings.Contains(err.Error(), "not found") { // Handle missing transaction } return err }
-
Pagination: Use reasonable page sizes to avoid overwhelming the API
limit := 100 // Good balance between API calls and memory
-
Large Numbers: Use
big.Intfor fee calculations to avoid overflowamount, ok := new(big.Int).SetString(entry.TotalFee, 10) if !ok { return fmt.Errorf("invalid fee amount: %s", entry.TotalFee) }
-
Context Timeout: Set reasonable timeouts for large queries
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel()
Common errors and how to handle them:
entries, err := txActions.ListTransactionFees(ctx, input)
if err != nil {
switch {
case strings.Contains(err.Error(), "invalid wallet"):
// Handle invalid wallet address
case strings.Contains(err.Error(), "invalid mode"):
// Handle invalid mode value
case strings.Contains(err.Error(), "limit"):
// Handle limit out of range
default:
// Handle other errors
}
}The Attestation Actions Interface enables users to request cryptographically signed attestations of query results from TRUF.NETWORK validators. These signed attestations can be verified on-chain (e.g., in EVM smart contracts) to trustlessly prove that specific data existed at a particular block height.
- Cryptographic Verification: Signed by network validators using secp256k1
- Tamper-Proof: Immutable attestations linked to specific block heights
- EVM-Compatible: Can be verified in Solidity smart contracts
- Payload Parsing: Decode attestation data including timestamps and values
- Signature Recovery: Extract validator addresses from signatures
- DeFi Protocols: Verify off-chain data on-chain (oracle alternative)
- Prediction Markets: Settle bets with cryptographically verified results
- Insurance: Trigger payouts based on attested data
- Auditing: Prove data provenance and integrity
- Cross-Chain Bridges: Verify state across networks
Creates an attestation action handler for requesting and retrieving signed attestations.
Signature:
func (c *TNClient) LoadAttestationActions() (types.IAttestationAction, error)Returns:
types.IAttestationAction: Attestation action handlererror: Error if initialization fails
Example:
attestationActions, err := tnClient.LoadAttestationActions()
if err != nil {
log.Fatalf("Failed to load attestation actions: %v", err)
}The following methods are part of the types.IAttestationAction interface returned by LoadAttestationActions(). Call these methods on the attestation action handler.
Requests a signed attestation for a specific query. The validator will execute the query at the current block height and sign the results.
Signature:
func RequestAttestation(ctx context.Context, input types.RequestAttestationInput) (*types.RequestAttestationResult, error)Parameters:
types.RequestAttestationInput:
DataProvider(string): Data provider address (0x-prefixed, 42 chars)StreamID(string): Stream identifier (32 characters)ActionName(string): Action to attest (e.g., "get_record")Args([]any): Action arguments (will be canonically encoded)EncryptSig(bool): Must befalse(encryption not supported in MVP)MaxFee(string): Maximum fee willing to pay in wei (NUMERIC(78,0) as string)
Returns:
types.RequestAttestationResult:
RequestTxID(string): Transaction ID for this attestation request
Example:
// Request attestation for AI Index data from last 7 days
now := time.Now()
weekAgo := now.AddDate(0, 0, -7)
result, err := attestationActions.RequestAttestation(ctx, types.RequestAttestationInput{
DataProvider: "0x4710a8d8f0d845da110086812a32de6d90d7ff5c",
StreamID: "stai0000000000000000000000000000",
ActionName: "get_record",
Args: []any{
"0x4710a8d8f0d845da110086812a32de6d90d7ff5c",
"stai0000000000000000000000000000",
int64(weekAgo.Unix()),
int64(now.Unix()),
nil, // frozen_at (optional)
false, // use_cache (forced to false for attestations)
},
EncryptSig: false,
MaxFee: "100000000000000000000", // 100 TRUF
})
if err != nil {
log.Fatalf("Failed to request attestation: %v", err)
}
fmt.Printf("Request TX ID: %s\n", result.RequestTxID)Notes:
- Attestation requests require sufficient TRUF balance for fees
- The validator signs asynchronously (typically 1-2 blocks)
- Use
GetSignedAttestation()to retrieve the signed payload
Retrieves a complete signed attestation payload for a previous attestation request.
Signature:
func GetSignedAttestation(ctx context.Context, input types.GetSignedAttestationInput) (*types.SignedAttestationResult, error)Parameters:
types.GetSignedAttestationInput:
RequestTxID(string): Transaction ID fromRequestAttestation()
Returns:
types.SignedAttestationResult:
Payload([]byte): Canonical payload + 65-byte secp256k1 signature
Payload Format:
The payload consists of:
-
Canonical Fields (variable length):
- Version (1 byte)
- Algorithm (1 byte, 0 = secp256k1)
- Block Height (8 bytes, big-endian uint64)
- Data Provider (length-prefixed, big-endian uint32 + bytes)
- Stream ID (length-prefixed, big-endian uint32 + UTF-8)
- Action ID (2 bytes, big-endian uint16)
- Arguments (length-prefixed, big-endian uint32 + canonical encoding)
- Result (length-prefixed, big-endian uint32 + ABI-encoded data)
-
Signature (last 65 bytes):
- R component (32 bytes)
- S component (32 bytes)
- V recovery ID (1 byte, typically 27 or 28)
Example:
// Poll for signed attestation (max 30 seconds)
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
var signedResult *types.SignedAttestationResult
for {
select {
case <-ctx.Done():
log.Println("Timeout waiting for signature")
goto afterPoll
case <-ticker.C:
signed, err := attestationActions.GetSignedAttestation(ctx, types.GetSignedAttestationInput{
RequestTxID: result.RequestTxID,
})
if err == nil && signed != nil && len(signed.Payload) > 0 {
signedResult = signed
goto afterPoll
}
}
}
afterPoll:
if signedResult != nil {
fmt.Printf("Payload size: %d bytes\n", len(signedResult.Payload))
}Returns metadata for attestations, optionally filtered by requester address.
Signature:
func ListAttestations(ctx context.Context, input types.ListAttestationsInput) ([]types.AttestationMetadata, error)Parameters:
types.ListAttestationsInput:
Requester([]byte, optional): Filter by requester address (20 bytes)Limit(*int, optional): Max results (default/max 5000)Offset(*int, optional): Pagination offset (default 0)OrderBy(*string, optional): Sort order (see below)
Valid OrderBy values:
"created_height ASC"/"created_height DESC""signed_height ASC"/"signed_height DESC"
Returns:
Array of types.AttestationMetadata:
RequestTxID(string): Transaction ID of the attestation requestAttestationHash([]byte): Hash of the attestationRequester([]byte): Address that requested the attestation (20 bytes)CreatedHeight(int64): Block height when requestedSignedHeight(*int64): Block height when signed (nil if not yet signed)EncryptSig(bool): Whether signature is encrypted
Example:
// List recent attestations for current wallet
myAddress := tnClient.Address()
addressBytes, _ := hex.DecodeString(myAddress.Address()[2:])
limit := 10
attestations, err := attestationActions.ListAttestations(ctx, types.ListAttestationsInput{
Requester: addressBytes,
Limit: &limit,
OrderBy: strPtr("created_height desc"),
})
if err != nil {
log.Fatalf("Failed to list attestations: %v", err)
}
fmt.Printf("Found %d attestations\n", len(attestations))
for i, att := range attestations {
status := "unsigned"
if att.SignedHeight != nil {
status = fmt.Sprintf("signed at height %d", *att.SignedHeight)
}
fmt.Printf("%d. TX: %s, Status: %s\n", i+1, att.RequestTxID, status)
}Parses a canonical attestation payload (without signature) into structured data.
Package: github.com/trufnetwork/sdk-go/core/contractsapi
Signature:
func ParseAttestationPayload(payload []byte) (*types.ParsedAttestationPayload, error)Parameters:
payload([]byte): Canonical payload without the 65-byte signature
Returns:
types.ParsedAttestationPayload:
Version(uint8): Payload format versionAlgorithm(uint8): Signature algorithm (0 = secp256k1)BlockHeight(uint64): Block height when attestedDataProvider(string): Data provider address (0x-prefixed hex)StreamID(string): Stream identifierActionID(uint16): Action identifierArguments([]any): Decoded action argumentsResult([]types.DecodedRow): Decoded query results
types.DecodedRow:
Values([]any): Array of decoded column values- For attestation results:
Values[0]is timestamp (string),Values[1]is value (string)
- For attestation results:
Example:
import (
"crypto/sha256"
"github.com/trufnetwork/kwil-db/core/crypto"
"github.com/trufnetwork/sdk-go/core/contractsapi"
)
// Split payload into canonical part and signature
signedPayload := signedResult.Payload
canonicalPayload := signedPayload[:len(signedPayload)-65]
signature := signedPayload[len(signedPayload)-65:]
// Parse the canonical payload
parsed, err := contractsapi.ParseAttestationPayload(canonicalPayload)
if err != nil {
log.Fatalf("Failed to parse payload: %v", err)
}
// Access parsed fields
fmt.Printf("Version: %d\n", parsed.Version)
fmt.Printf("Block Height: %d\n", parsed.BlockHeight)
fmt.Printf("Data Provider: %s\n", parsed.DataProvider)
fmt.Printf("Stream ID: %s\n", parsed.StreamID)
// Access query results
fmt.Printf("Found %d rows:\n", len(parsed.Result))
for i, row := range parsed.Result {
timestamp := row.Values[0] // Unix timestamp as string
value := row.Values[1] // 18-decimal value as string
fmt.Printf("Row %d: Timestamp=%v, Value=%v\n", i+1, timestamp, value)
}To verify the attestation signature and recover the validator's address:
import (
"crypto/sha256"
"github.com/trufnetwork/kwil-db/core/crypto"
)
// Extract canonical payload and signature
canonicalPayload := signedPayload[:len(signedPayload)-65]
signature := signedPayload[len(signedPayload)-65:]
// Hash the canonical payload with SHA256
hash := sha256.Sum256(canonicalPayload)
// Adjust signature format for recovery
// Attestation signatures use Ethereum format (V=27/28)
// kwil-db expects raw format (V=0-3)
adjustedSig := make([]byte, 65)
copy(adjustedSig, signature)
if signature[64] >= 27 {
adjustedSig[64] = signature[64] - 27
}
// Recover validator public key
pubKey, err := crypto.RecoverSecp256k1KeyFromSigHash(hash[:], adjustedSig)
if err != nil {
log.Fatalf("Failed to recover public key: %v", err)
}
// Derive Ethereum address
validatorAddr := crypto.EthereumAddressFromPubKey(pubKey)
fmt.Printf("Validator Address: 0x%x\n", validatorAddr)Important Notes:
- Attestation signatures use Ethereum format with V=27/28
- kwil-db's
RecoverSecp256k1KeyFromSigHashexpects V=0-3 (raw format) - You must subtract 27 from V before calling the recovery function
- The recovered address identifies which validator signed the attestation
Decoded attestation payload structure.
type ParsedAttestationPayload struct {
Version uint8 `json:"version"`
Algorithm uint8 `json:"algorithm"` // 0 = secp256k1
BlockHeight uint64 `json:"blockHeight"`
DataProvider string `json:"dataProvider"` // 0x-prefixed hex
StreamID string `json:"streamId"`
ActionID uint16 `json:"actionId"`
Arguments []any `json:"arguments"`
Result []DecodedRow `json:"result"`
}Represents a decoded row from attestation query results.
type DecodedRow struct {
Values []any `json:"values"`
}For attestation results:
Values[0]: Unix timestamp as string (e.g., "1704067200")Values[1]: 18-decimal fixed-point value as string (e.g., "77.051806494788211665")
Attestation results use ABI encoding (Ethereum format):
abi.encode(uint256[] timestamps, int256[] values)Details:
timestamps: Array of Unix timestamps as uint256values: Array of 18-decimal fixed-point integers as int256- Negative values are properly handled (two's complement)
Example decoded result:
// Raw ABI bytes → Decoded rows
[
{Values: ["1704067200", "77.051806494788211665"]},
{Values: ["1704153600", "80.0"]},
{Values: ["1704240000", "75.5"]},
]See examples/attestation_example/main.go for a complete working example demonstrating:
- Request Attestation: Submit attestation request for AI Index data
- Poll for Signature: Wait for validator to sign (1-2 blocks)
- Retrieve Payload: Get the complete signed attestation
- Verify Signature: Recover validator address from signature
- Parse Payload: Decode attestation fields and query results
- Display Results: Show all attested datapoints with full precision
Key Code Snippets:
// 1. Request attestation
result, err := attestationActions.RequestAttestation(ctx, types.RequestAttestationInput{
DataProvider: "0x4710a8d8f0d845da110086812a32de6d90d7ff5c",
StreamID: "stai0000000000000000000000000000",
ActionName: "get_record",
Args: args,
EncryptSig: false,
MaxFee: "100000000000000000000",
})
// 2. Wait for signing (poll with timeout)
signed, err := attestationActions.GetSignedAttestation(ctx, types.GetSignedAttestationInput{
RequestTxID: result.RequestTxID,
})
// 3. Split payload
canonicalPayload := signed.Payload[:len(signed.Payload)-65]
signature := signed.Payload[len(signed.Payload)-65:]
// 4. Verify signature
hash := sha256.Sum256(canonicalPayload)
adjustedSig := make([]byte, 65)
copy(adjustedSig, signature)
if signature[64] >= 27 {
adjustedSig[64] = signature[64] - 27
}
pubKey, _ := crypto.RecoverSecp256k1KeyFromSigHash(hash[:], adjustedSig)
validatorAddr := crypto.EthereumAddressFromPubKey(pubKey)
// 5. Parse payload
parsed, _ := contractsapi.ParseAttestationPayload(canonicalPayload)
// 6. Display results
for i, row := range parsed.Result {
fmt.Printf("Row %d: Timestamp=%v, Value=%v\n",
i+1, row.Values[0], row.Values[1])
}To verify attestations in Solidity smart contracts:
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;
contract AttestationVerifier {
address public validatorAddress;
function verifyAttestation(
bytes memory canonicalPayload,
bytes memory signature
) public view returns (bool) {
require(signature.length == 65, "Invalid signature length");
// Hash the canonical payload
bytes32 digest = sha256(canonicalPayload);
// Extract r, s, v from signature using assembly
bytes32 r;
bytes32 s;
uint8 v;
assembly {
r := mload(add(signature, 32))
s := mload(add(signature, 64))
v := byte(0, mload(add(signature, 96)))
}
// Recover signer address
address signer = ecrecover(digest, v, r, s);
// Verify it matches the known validator
return signer == validatorAddress;
}
function parseValue(bytes memory payload) public pure returns (uint256) {
// Parse and extract specific fields from canonical payload
// Implementation depends on your use case
}
}Usage Pattern:
- User requests attestation off-chain
- Validator signs the query results
- User submits signed payload to smart contract
- Contract verifies signature using
ecrecover - Contract parses payload to extract attested data
- Contract executes logic based on verified data
-
Always Verify Signatures
- Never trust attestation payloads without verifying the validator signature
- Check that the recovered address matches a known validator
-
Handle Async Signing
- Poll with timeout (typically 30 seconds is sufficient)
- Check for errors during polling (attestation may fail)
-
Fee Management
- Ensure sufficient TRUF balance before requesting attestations
- Set reasonable
MaxFeevalues to avoid overpaying
-
Parse Results Carefully
- Timestamps are Unix seconds as strings
- Values are 18-decimal fixed-point as strings
- Convert to appropriate types for your use case
-
Store Request IDs
- Keep track of
RequestTxIDfor later retrieval - Use
ListAttestations()to view attestation history
- Keep track of
-
Test Locally First
- Use local node for development
- Test with mainnet only when ready
Common errors and how to handle them:
// Requesting attestation
result, err := attestationActions.RequestAttestation(ctx, input)
if err != nil {
switch {
case strings.Contains(err.Error(), "Insufficient balance"):
// User needs more TRUF tokens
log.Println("Please fund your wallet with TRUF tokens")
case strings.Contains(err.Error(), "invalid"):
// Input validation failed
log.Println("Check input parameters")
default:
log.Printf("Attestation request failed: %v", err)
}
}
// Retrieving signed attestation
signed, err := attestationActions.GetSignedAttestation(ctx, input)
if err != nil || len(signed.Payload) < 66 {
// Attestation not ready or invalid
log.Println("Attestation not yet signed, try again later")
}
// Parsing payload
parsed, err := contractsapi.ParseAttestationPayload(canonicalPayload)
if err != nil {
switch {
case strings.Contains(err.Error(), "too short"):
// Payload truncated or invalid
log.Println("Invalid payload format")
case strings.Contains(err.Error(), "version"):
// Unsupported payload version
log.Println("Unsupported attestation version")
default:
log.Printf("Parse error: %v", err)
}
}
// Signature verification
pubKey, err := crypto.RecoverSecp256k1KeyFromSigHash(hash[:], adjustedSig)
if err != nil {
// Invalid signature or tampering detected
log.Println("Signature verification failed - payload may be tampered")
}- Attestation Latency: Typically 1-2 blocks (2-4 seconds) for signing
- Payload Size: Varies with result data (typically 1KB-100KB)
- Fee Costs: Depends on query complexity and data size
- Polling Frequency: Recommended 2-second intervals to balance latency and API load
- Signature Verification: Always verify signatures before trusting attestation data
- Replay Protection: Check block height to prevent replay attacks
- Validator Trust: Only accept attestations from known validator addresses
- Payload Integrity: Hash payload before verification; detect tampering
- Fee Limits: Set appropriate
MaxFeeto prevent unexpected charges