-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_usage.py
More file actions
163 lines (130 loc) · 4.69 KB
/
async_usage.py
File metadata and controls
163 lines (130 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""Async usage example for the ZipTax SDK.
Note: This example demonstrates concurrent operations using asyncio with the
synchronous client: each blocking SDK call is run in a worker thread so the
event loop can overlap several requests. Full async HTTP support (using
aiohttp) can be added as a future enhancement for better performance.
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
from ziptax import ZipTaxClient
async def get_tax_by_address_async(client, address):
    """Async wrapper for GetSalesTaxByAddress.

    Hands the synchronous ``client.request.GetSalesTaxByAddress`` call to the
    event loop's default executor and awaits its result, so the event loop
    stays free to run other tasks while the lookup is in flight.

    Args:
        client: ZipTaxClient instance
        address: Address to query

    Returns:
        V60Response object

    Raises:
        Any exception raised by the underlying client call is re-raised here
        when the result is awaited.
    """
    # get_running_loop() is the preferred way to reach the loop from inside a
    # coroutine. Passing None reuses the loop's shared default executor
    # instead of building and shutting down a thread pool on every call.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        client.request.GetSalesTaxByAddress,
        address,
    )
async def get_tax_by_location_async(client, lat, lng):
    """Async wrapper for GetSalesTaxByGeoLocation.

    Hands the synchronous ``client.request.GetSalesTaxByGeoLocation`` call to
    the event loop's default executor and awaits its result, so the event
    loop stays free to run other tasks while the lookup is in flight.

    Args:
        client: ZipTaxClient instance
        lat: Latitude
        lng: Longitude

    Returns:
        V60Response object

    Raises:
        Any exception raised by the underlying client call is re-raised here
        when the result is awaited.
    """
    # get_running_loop() is the preferred way to reach the loop from inside a
    # coroutine. Passing None reuses the loop's shared default executor
    # instead of building and shutting down a thread pool on every call.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        client.request.GetSalesTaxByGeoLocation,
        lat,
        lng,
    )
async def get_account_metrics_async(client):
    """Async wrapper for GetAccountMetrics.

    Hands the synchronous ``client.request.GetAccountMetrics`` call to the
    event loop's default executor and awaits its result, so the event loop
    stays free to run other tasks while the request is in flight.

    Args:
        client: ZipTaxClient instance

    Returns:
        V60AccountMetrics object

    Raises:
        Any exception raised by the underlying client call is re-raised here
        when the result is awaited.
    """
    # get_running_loop() is the preferred way to reach the loop from inside a
    # coroutine. Passing None reuses the loop's shared default executor
    # instead of building and shutting down a thread pool on every call.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        client.request.GetAccountMetrics,
    )
async def main():
    """Run three demos of issuing ZipTax lookups concurrently.

    Example 1 gathers several address lookups, Example 2 gathers calls to
    three different endpoints, and Example 3 prints location lookups in the
    order they finish. The client is closed when the demos end, whether or
    not one of them raised.
    """
    rule = "=" * 60

    # Created outside the try so close() is only attempted on a client that
    # was actually constructed.
    client = ZipTaxClient.api_key("your-api-key-here")
    try:
        # ---- Example 1: the same endpoint, several addresses at once ----
        print(rule)
        print("Example 1: Concurrent Address Lookups")
        print(rule)

        addresses = [
            "200 Spectrum Center Drive, Irvine, CA 92618",
            "1 Apple Park Way, Cupertino, CA 95014",
            "1600 Amphitheatre Parkway, Mountain View, CA 94043",
        ]

        # gather() returns results in the order the awaitables were passed.
        address_results = await asyncio.gather(
            *(get_tax_by_address_async(client, street) for street in addresses)
        )

        for result in address_results:
            print(f"\nAddress: {result.address_detail.normalized_address}")
            for summary in result.tax_summaries or ():
                print(f" {summary.summary_name}: {summary.rate * 100:.2f}%")

        # ---- Example 2: three different endpoints in one gather ----
        print("\n" + rule)
        print("Example 2: Mixed Concurrent API Calls")
        print(rule)

        by_address, by_location, account = await asyncio.gather(
            get_tax_by_address_async(
                client,
                "123 Main St, Los Angeles, CA 90001",
            ),
            get_tax_by_location_async(
                client,
                "34.0522",
                "-118.2437",
            ),
            get_account_metrics_async(client),
        )

        print(f"\nAddress lookup: {by_address.address_detail.normalized_address}")
        print(f"Location lookup: {by_location.address_detail.normalized_address}")
        print(f"Account metrics: {account.request_count:,} requests")

        # ---- Example 3: handle each lookup as soon as it finishes ----
        print("\n" + rule)
        print("Example 3: Process Results as They Complete")
        print(rule)

        coordinates = [
            ("33.6489", "-117.8386"),
            ("34.0522", "-118.2437"),
            ("37.7749", "-122.4194"),
        ]
        pending = [
            get_tax_by_location_async(client, lat, lng)
            for lat, lng in coordinates
        ]

        # as_completed() yields awaitables in completion order, which may
        # differ from the order of `coordinates`.
        for next_done in asyncio.as_completed(pending):
            finished = await next_done
            print(f"Completed: {finished.address_detail.normalized_address}")
            summaries = finished.tax_summaries
            if summaries:
                print(f" Tax rate: {summaries[0].rate * 100:.2f}%")
    finally:
        # Release the client even if one of the demos raised.
        client.close()
if __name__ == "__main__":
    # Script entry point: asyncio.run() drives the main() coroutine to
    # completion.
    asyncio.run(main())