-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathdataset_api_example.py
More file actions
346 lines (272 loc) · 11.5 KB
/
dataset_api_example.py
File metadata and controls
346 lines (272 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
"""Basalt Dataset API Example (New Exception-Based API).
This script demonstrates how to use the new exception-based Basalt Dataset API.
It showcases both synchronous and asynchronous operations with proper error handling.
Before running this script:
1. Set your API key: export BASALT_API_KEY="your-api-key"
2. Set a test dataset slug: export BASALT_TEST_DATASET_SLUG="your-dataset-slug"
3. Install dependencies: pip install basalt-sdk
Reference: See MIGRATION_GUIDE.md for details on the new exception-based API.
"""
from __future__ import annotations
import asyncio
import logging
import os
import sys
from pathlib import Path
from basalt.datasets.client import DatasetsClient
from basalt.datasets.models import Dataset
from basalt.types.exceptions import (
BasaltAPIError,
NotFoundError,
UnauthorizedError,
)
# Add repo root to path to import basalt
# NOTE(review): in this file the `basalt` imports appear above this statement,
# so the path tweak cannot affect how they were resolved -- confirm whether it
# was meant to run before those imports.
project_root = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(project_root))
# Presumably read by the Basalt SDK to select a development build/endpoint --
# TODO confirm against the SDK; nothing in this file reads it back.
os.environ["BASALT_BUILD"] = "development"
def initialize_client() -> DatasetsClient:
    """Build a ``DatasetsClient`` from the ``BASALT_API_KEY`` environment variable.

    Root logging is configured at INFO level as a side effect, but only once
    a non-empty API key has been found.

    Returns:
        DatasetsClient: Client constructed with the API key from the environment.

    Raises:
        ValueError: If ``BASALT_API_KEY`` is unset or empty.
    """
    key = os.environ.get("BASALT_API_KEY")
    if key:
        # Configure logging only after the key check, like the original flow.
        logging.basicConfig(level=logging.INFO)
        return DatasetsClient(api_key=key)
    raise ValueError("BASALT_API_KEY environment variable not set")
def example_1_list_datasets(client: DatasetsClient) -> None:
    """Example 1: List all datasets synchronously.

    Logs the slug, name and columns of at most the first three datasets
    returned by ``client.list_sync()``, then a count of the ones not shown.
    An empty result is reported explicitly so the example never finishes
    silently (this mirrors the asynchronous listing example in this file).

    Args:
        client: Datasets client used for the synchronous list call.
    """
    logging.info("Listing datasets synchronously")
    try:
        dataset_list = client.list_sync()
        if dataset_list:
            for _i, _dataset in enumerate(dataset_list[:3], 1):
                logging.info(f"{_i}. {_dataset.slug} - {_dataset.name}")
                logging.info(f" Columns: {_dataset.columns}\n")
            if len(dataset_list) > 3:
                logging.info(f" ... and {len(dataset_list) - 3} more datasets\n")
        else:
            logging.info("No datasets found\n")
    except UnauthorizedError:
        logging.error("Unauthorized access")
    except BasaltAPIError:
        # Keep the traceback: a generic API failure is hard to diagnose
        # from the one-line message alone.
        logging.error("Basalt API error occurred", exc_info=True)
def example_2_get_dataset(client: DatasetsClient) -> None:
    """Example 2: Fetch one dataset synchronously and log a short summary.

    The slug comes from the ``BASALT_TEST_DATASET_SLUG`` environment
    variable. When it is unset or empty, an error is logged and no request
    is made.

    Args:
        client: Datasets client whose ``get_sync`` method performs the lookup.
    """
    logging.info("Getting a specific dataset synchronously")

    slug = os.environ.get("BASALT_TEST_DATASET_SLUG")
    if not slug:
        logging.error("BASALT_TEST_DATASET_SLUG environment variable not set")
        return

    try:
        fetched = client.get_sync(slug)
        # One log line per field, emitted in the same order as before.
        logging.info(f"Dataset: {fetched.name}")
        logging.info(f"Slug: {fetched.slug}")
        logging.info(f"Columns: {fetched.columns}")
        logging.info(f"Rows: {len(fetched.rows)}\n")
    except NotFoundError:
        logging.error("Dataset not found")
    except UnauthorizedError:
        logging.error("Unauthorized access")
    except BasaltAPIError:
        logging.error("Basalt API error occurred")
def example_3_list_dataset_rows(client: DatasetsClient) -> None:
    """Example 3: Log a preview of the rows stored in a dataset.

    Fetches the dataset named by ``BASALT_TEST_DATASET_SLUG`` and logs up to
    three rows (name, values and, when truthy, the ideal output), followed by
    a count of the rows that were not shown.

    Args:
        client: Datasets client whose ``get_sync`` method performs the lookup.
    """
    logging.info("Listing dataset rows")

    slug = os.environ.get("BASALT_TEST_DATASET_SLUG")
    if not slug:
        logging.error("BASALT_TEST_DATASET_SLUG environment variable not set")
        return

    try:
        rows = client.get_sync(slug).rows
        if not rows:
            logging.info("Dataset has no rows\n")
        else:
            for position, entry in enumerate(rows[:3], 1):
                label = entry.name or "Unnamed"
                logging.info(f"{position}. Row: {label}")
                logging.info(f" Values: {entry.values}")
                if entry.ideal_output:
                    logging.info(f" Ideal Output: {entry.ideal_output}")
                # Empty record acts as a visual separator between rows.
                logging.info("")
            hidden = len(rows) - 3
            if hidden > 0:
                logging.info(f" ... and {hidden} more rows\n")
    except NotFoundError:
        logging.error("Dataset not found", exc_info=True)
    except UnauthorizedError:
        logging.error("Unauthorized access", exc_info=True)
    except BasaltAPIError:
        logging.error("Basalt API error occurred", exc_info=True)
def example_4_add_dataset_row(client: DatasetsClient) -> None:
    """Example 4: Append a row to a dataset synchronously.

    The dataset named by ``BASALT_TEST_DATASET_SLUG`` is fetched first so
    that one placeholder value can be generated per column; the row is then
    added together with a name, an ideal output and metadata.

    Args:
        client: Datasets client providing ``get_sync`` and ``add_row_sync``.
    """
    logging.info("Adding a row to a dataset")

    slug = os.environ.get("BASALT_TEST_DATASET_SLUG")
    if not slug:
        logging.error("BASALT_TEST_DATASET_SLUG environment variable not set")
        return

    try:
        # The column list of the existing dataset drives the payload shape.
        schema = client.get_sync(slug)
        payload = {column: f"test_value_{column}" for column in schema.columns}

        created = client.add_row_sync(
            slug=slug,
            values=payload,
            name="Example Row",
            ideal_output="expected_output",
            metadata={"source": "example_script"},
        )
        logging.info("Row added successfully")
        logging.info(f"Row values: {created.values}\n")
    except NotFoundError:
        logging.error("Dataset not found", exc_info=True)
    except UnauthorizedError:
        logging.error("Unauthorized access", exc_info=True)
    except BasaltAPIError:
        logging.error("Basalt API error occurred", exc_info=True)
def example_5_get_dataset_metadata(client: DatasetsClient) -> None:
    """Example 5: Get dataset metadata.

    Fetches the dataset named by ``BASALT_TEST_DATASET_SLUG`` and logs its
    name, slug, column names and row count.

    Args:
        client: Datasets client whose ``get_sync`` method performs the lookup.
    """
    logging.info("Getting dataset metadata")
    dataset_slug = os.getenv("BASALT_TEST_DATASET_SLUG")
    if not dataset_slug:
        logging.error("BASALT_TEST_DATASET_SLUG environment variable not set")
        return
    try:
        dataset = client.get_sync(dataset_slug)
        # The other examples in this file use each column directly as a
        # string (dict key / f-string value), so do not assume a ``.name``
        # attribute: use it when present, otherwise fall back to the column
        # itself. This avoids an uncaught AttributeError on plain strings.
        column_names = ", ".join(
            str(getattr(col, "name", col)) for col in dataset.columns
        )
        logging.info(f"Dataset Name: {dataset.name}")
        logging.info(f"Dataset Slug: {dataset.slug}")
        logging.info(f"Columns: {column_names}")
        logging.info(f"Total Rows: {len(dataset.rows)}\n")
    except NotFoundError:
        logging.error("Dataset not found", exc_info=True)
    except BasaltAPIError:
        logging.error("Basalt API error occurred", exc_info=True)
async def example_6_async_list_datasets(client: DatasetsClient) -> None:
    """Example 6: List datasets through the asynchronous API.

    Logs slug and name for at most the first three datasets, then a count of
    the remaining ones; an empty result is reported explicitly.

    Args:
        client: Datasets client whose awaitable ``list`` method is called.
    """
    logging.info("Listing datasets asynchronously")
    try:
        found = await client.list()
        if not found:
            logging.info("No datasets found\n")
        else:
            for position, item in enumerate(found[:3], 1):
                logging.info(f"{position}. {item.slug} - {item.name}")
            hidden = len(found) - 3
            if hidden > 0:
                logging.info(f" ... and {hidden} more datasets\n")
    except UnauthorizedError:
        logging.error("Unauthorized access")
    except BasaltAPIError:
        logging.error("Basalt API error occurred", exc_info=True)
async def example_7_async_get_dataset(client: DatasetsClient) -> None:
    """Example 7: Fetch one dataset through the asynchronous API.

    The slug comes from ``BASALT_TEST_DATASET_SLUG``; when it is unset or
    empty an error is logged and no request is made.

    Args:
        client: Datasets client whose awaitable ``get`` method is called.
    """
    logging.info("Getting a dataset asynchronously")

    slug = os.environ.get("BASALT_TEST_DATASET_SLUG")
    if not slug:
        logging.error("BASALT_TEST_DATASET_SLUG environment variable not set")
        return

    try:
        fetched = await client.get(slug)
        logging.info(f"Retrieved dataset: {fetched.name}")
        logging.info(f"Rows: {len(fetched.rows)}\n")
    except NotFoundError:
        logging.error("Dataset not found", exc_info=True)
    except BasaltAPIError:
        logging.error("Basalt API error occurred", exc_info=True)
async def example_8_async_add_row(client: DatasetsClient) -> None:
    """Example 8: Add a row to a dataset asynchronously.

    The dataset named by ``BASALT_TEST_DATASET_SLUG`` is fetched first so
    that one placeholder value can be generated per column; the row is then
    added with a name and metadata.

    Args:
        client: Datasets client providing awaitable ``get`` and ``add_row``.
    """
    logging.info("Adding a row asynchronously")
    dataset_slug = os.getenv("BASALT_TEST_DATASET_SLUG")
    if not dataset_slug:
        logging.error("BASALT_TEST_DATASET_SLUG environment variable not set")
        return
    try:
        # First get the dataset so the payload matches its columns.
        dataset = await client.get(dataset_slug)
        values = {column: f"async_test_{column}" for column in dataset.columns}
        # The returned row is not used by this example, so it is not bound.
        await client.add_row(
            slug=dataset_slug,
            values=values,
            name="Async Example Row",
            metadata={"source": "async_example"},
        )
        logging.info("Row added asynchronously\n")
    except NotFoundError:
        logging.error("Dataset not found", exc_info=True)
    except BasaltAPIError:
        logging.error("Basalt API error occurred", exc_info=True)
async def example_9_concurrent_operations(client: DatasetsClient) -> None:
    """Example 9: Execute multiple async operations concurrently.

    Runs ``client.list()`` and ``client.get(slug)`` together with
    ``asyncio.gather(..., return_exceptions=True)`` and reports each outcome
    independently, so a failure of one call does not hide the result of the
    other.

    Args:
        client: Datasets client providing awaitable ``list`` and ``get``.
    """
    logging.info("Executing concurrent operations")
    dataset_slug = os.getenv("BASALT_TEST_DATASET_SLUG")
    if not dataset_slug:
        logging.error("BASALT_TEST_DATASET_SLUG environment variable not set")
        return
    try:
        # With return_exceptions=True failures are returned as values in
        # positional order rather than raised.
        list_result, get_result = await asyncio.gather(
            client.list(),
            client.get(dataset_slug),
            return_exceptions=True,
        )

        # Check against BaseException: a cancelled child is reported as
        # asyncio.CancelledError, which does not derive from Exception and
        # would otherwise be misreported as an "unexpected result type".
        if isinstance(list_result, BaseException):
            logging.error(f"List operation failed: {type(list_result).__name__}: {list_result}")
        elif isinstance(list_result, list):
            logging.info(f"Retrieved {len(list_result)} datasets")
        else:
            logging.warning(f"Unexpected list result type: {type(list_result)}")

        if isinstance(get_result, BaseException):
            logging.error(f"Get operation failed: {type(get_result).__name__}: {get_result}")
        elif isinstance(get_result, Dataset):
            logging.info(f"Retrieved dataset: {get_result.name} ({len(get_result.rows)} rows)")
        else:
            logging.warning(f"Unexpected get result type: {type(get_result)}")
    except BasaltAPIError as e:
        logging.error(f"Basalt API error during concurrent operations: {e}", exc_info=True)
    except Exception as e:
        logging.error(
            f"Unexpected error during concurrent operations: {type(e).__name__}: {e}", exc_info=True
        )
async def run_async_examples(client: DatasetsClient) -> None:
    """Await each asynchronous example in turn, sharing one client.

    The examples run sequentially, in numeric order; an exception raised by
    one of them propagates and stops the remaining ones.

    Args:
        client: Datasets client handed to every example coroutine.
    """
    async_examples = (
        example_6_async_list_datasets,
        example_7_async_get_dataset,
        example_8_async_add_row,
        example_9_concurrent_operations,
    )
    for run_example in async_examples:
        await run_example(client)
def main() -> None:
    """Run all examples.

    Builds the client, runs the synchronous examples in order, then runs the
    asynchronous ones inside a single ``asyncio.run`` call. The process exits
    with status 1 when the client cannot be initialised or when any example
    raises an unexpected exception.
    """
    try:
        # Only client initialisation is covered by the ValueError handler:
        # a ValueError raised later by an example is not a missing API key
        # and must not be reported as one.
        try:
            client = initialize_client()
        except ValueError:
            logging.error("API key not set in environment variables")
            # SystemExit is not an Exception subclass, so it passes through
            # the outer handler untouched.
            sys.exit(1)

        # Run synchronous examples
        example_1_list_datasets(client)
        example_2_get_dataset(client)
        example_3_list_dataset_rows(client)
        example_4_add_dataset_row(client)
        example_5_get_dataset_metadata(client)

        # Run asynchronous examples
        asyncio.run(run_async_examples(client))
    except Exception:
        logging.error("An unexpected error occurred", exc_info=True)
        sys.exit(1)
# Run the examples only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()