Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ see the [quick start](#quick-start).
- [Unit Tests](#unit-tests)
- [Compliance Tests](#compliance-tests)
- [Fuzzing Tests](#fuzzing-tests)
- [Generating Mock Files Using Mockery](./mocks/README.md)
- [Connection Methods](#connection-methods)
- [Plain TCP Connection](#plain-tcp-connection)
- [Secured TCP Connection (TLS)](#secured-tcp-connection-tls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/hex"
"fmt"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
"go.opentelemetry.io/otel/trace"
"strings"
"time"

Expand All @@ -31,7 +32,6 @@ import (

btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/metadata"
otelgo "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/otel"
"github.com/datastax/go-cassandra-native-protocol/message"
"go.uber.org/zap"
)
Expand All @@ -56,17 +56,17 @@ func NewBigtableClient(clients *types.BigtableClientManager, logger *zap.Logger,
func (btc *BigtableAdapter) Execute(ctx context.Context, query types.IExecutableQuery) (message.Message, error) {
switch q := query.(type) {
case *types.BoundDeleteQuery:
return btc.DeleteRow(ctx, q)
return btc.deleteRow(ctx, q)
case *types.BigtableWriteMutation:
return btc.mutateRow(ctx, q)
case *types.ExecutableSelectQuery:
return btc.ExecutePreparedStatement(ctx, q)
return btc.executePreparedStatement(ctx, q)
case *types.CreateTableStatementMap:
return btc.schemaManager.CreateTable(ctx, q)
case *types.AlterTableStatementMap:
return btc.schemaManager.AlterTable(ctx, q)
case *types.TruncateTableStatementMap:
err := btc.DropAllRows(ctx, q)
err := btc.dropAllRows(ctx, q)
return emptyRowsResult(), err
case *types.DropTableQuery:
return btc.schemaManager.DropTable(ctx, q)
Expand All @@ -88,11 +88,9 @@ func (btc *BigtableAdapter) Execute(ctx context.Context, query types.IExecutable
// Returns:
// - error: Error if the mutation fails.
func (btc *BigtableAdapter) mutateRow(ctx context.Context, input *types.BigtableWriteMutation) (message.Message, error) {
otelgo.AddAnnotation(ctx, applyingBigtableMutation)
span := trace.SpanFromContext(ctx)
mut := bigtable.NewMutation()

btc.Logger.Info("mutating row", zap.String("key", hex.EncodeToString([]byte(input.RowKey()))))

client, err := btc.clients.GetClient(input.Keyspace())
if err != nil {
return nil, err
Expand All @@ -119,7 +117,7 @@ func (btc *BigtableAdapter) mutateRow(ctx context.Context, input *types.Bigtable
}

err := tbl.Apply(ctx, string(input.RowKey()), conditionalMutation, bigtable.GetCondMutationResult(&matched))
otelgo.AddAnnotation(ctx, bigtableMutationApplied)
span.AddEvent(bigtableMutationApplied)
if err != nil {
return nil, err
}
Expand All @@ -129,7 +127,7 @@ func (btc *BigtableAdapter) mutateRow(ctx context.Context, input *types.Bigtable

// If no conditions, apply the mutation directly
err = tbl.Apply(ctx, string(input.RowKey()), mut)
otelgo.AddAnnotation(ctx, bigtableMutationApplied)
span.AddEvent(bigtableMutationApplied)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -185,13 +183,13 @@ func (btc *BigtableAdapter) buildMutation(ctx context.Context, table *bigtable.T
return nil
}

func (btc *BigtableAdapter) DropAllRows(ctx context.Context, data *types.TruncateTableStatementMap) error {
func (btc *BigtableAdapter) dropAllRows(ctx context.Context, data *types.TruncateTableStatementMap) error {
_, err := btc.schemaManager.Schemas().GetTableSchema(data.Keyspace(), data.Table())
if err != nil {
return err
}

// performance optimization because DropAllRows can be slow
// performance optimization because dropAllRows can be slow
hasRows, err := btc.hasAnyRows(ctx, data.Keyspace(), data.Table())
if err != nil {
return err
Expand Down Expand Up @@ -263,20 +261,21 @@ func (btc *BigtableAdapter) InsertRow(ctx context.Context, input *types.Bigtable
return btc.mutateRow(ctx, input)
}

// UpdateRow - Updates a row in the specified bigtable table.
// updateRow - Updates a row in the specified bigtable table.
//
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - data: PreparedUpdateQuery object containing the table, row key, columns, values, and DeleteColumnFamilies.
//
// Returns:
// - error: Error if the update fails.
func (btc *BigtableAdapter) UpdateRow(ctx context.Context, input *types.BigtableWriteMutation) (message.Message, error) {
func (btc *BigtableAdapter) updateRow(ctx context.Context, input *types.BigtableWriteMutation) (message.Message, error) {
return btc.mutateRow(ctx, input)
}

func (btc *BigtableAdapter) DeleteRow(ctx context.Context, deleteQueryData *types.BoundDeleteQuery) (message.Message, error) {
otelgo.AddAnnotation(ctx, applyingDeleteMutation)
func (btc *BigtableAdapter) deleteRow(ctx context.Context, deleteQueryData *types.BoundDeleteQuery) (message.Message, error) {
span := trace.SpanFromContext(ctx)
span.AddEvent(applyingDeleteMutation)
client, err := btc.clients.GetClient(deleteQueryData.Keyspace())
if err != nil {
return nil, err
Expand Down Expand Up @@ -308,7 +307,7 @@ func (btc *BigtableAdapter) DeleteRow(ctx context.Context, deleteQueryData *type
return nil, err
}
}
otelgo.AddAnnotation(ctx, deleteMutationApplied)
span.AddEvent(deleteMutationApplied)
return &message.VoidResult{}, nil
}

Expand Down Expand Up @@ -345,6 +344,7 @@ func (btc *BigtableAdapter) buildDeleteMutation(ctx context.Context, table *bigt
// - BulkOperationResponse: Response indicating the result of the bulk operation.
// - error: Error if the bulk mutation fails.
func (btc *BigtableAdapter) ApplyBulkMutation(ctx context.Context, keyspace types.Keyspace, tableName types.TableName, mutationData []types.IBigtableMutation) (BulkOperationResponse, error) {
span := trace.SpanFromContext(ctx)
client, err := btc.clients.GetClient(keyspace)
if err != nil {
return BulkOperationResponse{
Expand Down Expand Up @@ -385,9 +385,10 @@ func (btc *BigtableAdapter) ApplyBulkMutation(ctx context.Context, keyspace type
}, err
}
default:
err := fmt.Errorf("unhandled bulk mutation type %T", md)
return BulkOperationResponse{
FailedRows: fmt.Sprintf("All Rows are failed because: unsupported bulk operation: %T", v),
}, fmt.Errorf("unhandled bulk mutation type %T", md)
}, err
}
}
// create mutations from mutation data
Expand All @@ -398,7 +399,7 @@ func (btc *BigtableAdapter) ApplyBulkMutation(ctx context.Context, keyspace type
mutations = append(mutations, mutation)
rowKeys = append(rowKeys, string(key))
}
otelgo.AddAnnotation(ctx, applyingBulkMutation)
span.AddEvent(applyingBulkMutation)

errs, err := table.ApplyBulk(ctx, rowKeys, mutations)
if err != nil {
Expand All @@ -423,7 +424,7 @@ func (btc *BigtableAdapter) ApplyBulkMutation(ctx context.Context, keyspace type
FailedRows: "",
}
}
otelgo.AddAnnotation(ctx, bulkMutationApplied)
span.AddEvent(bulkMutationApplied)
return res, nil
}

Expand Down Expand Up @@ -510,8 +511,8 @@ func (btc *BigtableAdapter) PrepareStatement(ctx context.Context, query types.IP
return nil, nil
}

selectQuery, ok := query.(*types.PreparedSelectQuery)
if !ok {
selectQuery, isType := query.(*types.PreparedSelectQuery)
if !isType {
// only select queries can be prepared in Bigtable at this time
return nil, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/constants"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/metadata"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/responsehandler"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/third_party/datastax/proxycore"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/utilities"
Expand All @@ -32,7 +33,7 @@ import (
"time"
)

// ExecutePreparedStatement - Executes a prepared statement on Bigtable and returns the result.
// executePreparedStatement - Executes a prepared statement on Bigtable and returns the result.
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - query: rh.QueryMetadata containing the query and parameters.
Expand All @@ -42,7 +43,7 @@ import (
// - *message.RowsResult: The result of the select statement.
// - time.Duration: The total elapsed time for the operation.
// - error: Error if the statement preparation or execution fails.
func (btc *BigtableAdapter) ExecutePreparedStatement(ctx context.Context, query *types.ExecutableSelectQuery) (*message.RowsResult, error) {
func (btc *BigtableAdapter) executePreparedStatement(ctx context.Context, query *types.ExecutableSelectQuery) (*message.RowsResult, error) {
if query.CachedBTPrepare == nil {
return nil, fmt.Errorf("cannot execute select query because prepared bigtable query is nil")
}
Expand All @@ -59,14 +60,19 @@ func (btc *BigtableAdapter) ExecutePreparedStatement(ctx context.Context, query
return nil, fmt.Errorf("failed to bind parameters: %w", err)
}

table, err := btc.schemaManager.Schemas().GetTableSchema(query.Keyspace(), query.Table())
if err != nil {
return nil, err
}

var processingErr error
var rows []types.GoRow
executeErr := boundStmt.Execute(ctx, func(resultRow bigtable.ResultRow) bool {
r, convertErr := btc.convertResultRow(resultRow, query) // Call the implemented helper
r, convertErr := btc.convertResultRow(resultRow, query, table)
if convertErr != nil {
btc.Logger.Error("Failed to convert result row", zap.Error(convertErr), zap.String("btql", query.TranslatedQuery))
processingErr = convertErr // Capture the error
return false // Stop execution
processingErr = convertErr
return false // Stop execution
}
rows = append(rows, r)
return true // Continue processing
Expand All @@ -82,12 +88,7 @@ func (btc *BigtableAdapter) ExecutePreparedStatement(ctx context.Context, query
return responsehandler.BuildRowsResultResponse(query, rows, query.ProtocolVersion)
}

func (btc *BigtableAdapter) convertResultRow(resultRow bigtable.ResultRow, query *types.ExecutableSelectQuery) (types.GoRow, error) {
table, err := btc.schemaManager.Schemas().GetTableSchema(query.Keyspace(), query.Table())
if err != nil {
return nil, err
}

func (btc *BigtableAdapter) convertResultRow(resultRow bigtable.ResultRow, query *types.ExecutableSelectQuery, table *metadata.TableSchema) (types.GoRow, error) {
result := make(types.GoRow)
for i, colMeta := range resultRow.Metadata.Columns {
var val any
Expand Down Expand Up @@ -140,10 +141,6 @@ func (btc *BigtableAdapter) convertResultRow(resultRow bigtable.ResultRow, query
return nil, fmt.Errorf("result already set for column `%s`", key)
}

if key == "list_text" {
btc.Logger.Log(zap.InfoLevel, "list_text", zap.Any("value", val))
}

goValue, err := rowValueToGoValue(val, expectedType)
if err != nil {
return nil, fmt.Errorf("failed to convert result for '%s': %w", key, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestDeleteRow(t *testing.T) {
require.NoError(t, err)

deleteQuery := types.NewBoundDeleteQuery(keyspace, tableName, "", rowKey, false, nil)
_, err = btc.DeleteRow(t.Context(), deleteQuery)
_, err = btc.deleteRow(t.Context(), deleteQuery)
require.NoError(t, err)

// Verify deletion
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestMutateRowDeleteColumnFamily(t *testing.T) {
// Delete cf2
updateData := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{}, types.QueryTypeUpdate, key)
updateData.AddMutations(types.NewDeleteCellsOp("tags"))
_, err = btc.UpdateRow(t.Context(), updateData)
_, err = btc.updateRow(t.Context(), updateData)
require.NoError(t, err)

// Verify deletion by reading the row
Expand All @@ -259,7 +259,7 @@ func TestMutateRowDeleteQualifiers(t *testing.T) {
// Delete col1
updateData := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{}, types.QueryTypeUpdate, key)
updateData.AddMutations(types.NewDeleteColumnOp(types.BigtableColumn{Family: "cf1", Column: "col1"}))
_, err = btc.UpdateRow(t.Context(), updateData)
_, err = btc.updateRow(t.Context(), updateData)
require.NoError(t, err)

// Verify deletion by reading the row
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestMutateRowIfExists(t *testing.T) {
// Update the row when it exists
updateData := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{IfExists: true}, types.QueryTypeUpdate, key1)
updateData.AddMutations(types.NewWriteCellOp("cf1", "col1", []byte("v2")))
res, err := btc.UpdateRow(t.Context(), updateData)
res, err := btc.updateRow(t.Context(), updateData)
require.NoError(t, err)
assert.True(t, wasApplied(res))

Expand All @@ -305,7 +305,7 @@ func TestMutateRowIfExists(t *testing.T) {
// Attempt to update a non-existent row
updateDataNonExistent := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{IfExists: true}, types.QueryTypeUpdate, key2)
updateDataNonExistent.AddMutations(types.NewWriteCellOp("cf1", "col1", []byte("v2")))
res, err = btc.UpdateRow(t.Context(), updateDataNonExistent)
res, err = btc.updateRow(t.Context(), updateDataNonExistent)
require.NoError(t, err)
assert.False(t, wasApplied(res))

Expand Down Expand Up @@ -350,7 +350,7 @@ func TestMutateRowInvalidKeyspace(t *testing.T) {

updateData := types.NewBigtableWriteMutation("invalid-keyspace", "any-table", "", types.IfSpec{}, types.QueryTypeUpdate, "row1")
updateData.AddMutations(types.NewWriteCellOp("cf1", "col1", []byte("value")))
_, err := localBtc.UpdateRow(t.Context(), updateData)
_, err := localBtc.updateRow(t.Context(), updateData)
require.Error(t, err)
assert.Contains(t, err.Error(), "bigtable client not found for keyspace 'invalid-keyspace'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ otel:
metrics:
# Collector service endpoint
endpoint: METRICS_SERVICE_ENDPOINT
# Set to true to export metrics directly to Google Cloud Monitoring.
# If true, 'endpoint' will be ignored, and metrics will be sent to the project specified in traces.projectId.
gcpMetricsEnabled: False

traces:
# Collector service endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,27 @@ listeners:
grpcChannels: 4

otel:
# Set enabled to true or false for OTEL metrics/traces/logs.
enabled: False
# Name of the collector service to be setup as a sidecar
serviceName: YOUR_OTEL_COLLECTOR_SERVICE_NAME

healthcheck:
# Enable/Disable Health Check for OTEL, Default 'False'.
enabled: False
# Health check endpoint for the OTEL collector service
endpoint: localhost:13133
metrics:
# Collector service endpoint
endpoint: localhost:4317
# Set to true to export metrics directly to Google Cloud Monitoring.
# If true, 'endpoint' will be ignored, and metrics will be sent to the project specified in traces.projectId.
gcpMetricsEnabled: False

traces:
# Collector service endpoint
endpoint: localhost:4317
# Project ID to use for exporting traces to Google Cloud Trace.
projectId: YOUR_GCP_PROJECT_ID
#Sampling ratio should be between 0 and 1. Here 0.05 means 5/100 Sampling ratio.
samplingRatio: 1
Loading