Skip to content

Commit 1afe155

Browse files
authored
Feat: Add AsyncClient, search API, and streaming bulk export (#28)
* Refactor: Extract BaseClient from Client to share logic between clients
* Feat: Add serialize_queries helper and widen Query to AbstractQuery
* Fix: Return SuccessResponse for 204 No Content instead of ErrorResponse
* Feat: Add search(), get_domain(), and streaming bulk_export_stream()
* Feat: Add AsyncClient with httpx for async API access
* Docs: Add async examples and update sync examples
* Docs: Update CHANGELOG for v0.2.0
* Fix: Sort imports in async example
1 parent 3601565 commit 1afe155

9 files changed

Lines changed: 437 additions & 98 deletions

File tree

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,26 @@ and this project adheres to
88

99
## [Unreleased]
1010

11+
### Added
12+
13+
- AsyncClient with full async/await support using httpx
14+
- Simple `search()` API accepting raw query strings
15+
- `get_domain()` method for domain lookups
16+
- Streaming `bulk_export_stream()` for memory-efficient exports
17+
- `serialize_queries()` helper to reduce query serialization duplication
18+
- Async example in `example/example_async_client.py`
19+
1120
### Changed
1221

22+
- Use `__get` in both sync and async clients for uniform internal API
23+
- Widen query type from `Query` to `AbstractQuery` to accept `RawQuery` directly
1324
- Updated l9format requirement from =1.3.2 to =1.4.0 ([ae676d9])
1425
- Updated l9format requirement from =1.4.0 to =2.0.0 ([df916e5], [#68])
1526

27+
### Fixed
28+
29+
- Return `SuccessResponse` for HTTP 204 No Content instead of `ErrorResponse`
30+
1631
### Added
1732

1833
- Add Python 3.11, 3.12, and 3.14 support ([d111628])

example/example_async_client.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""Example usage of the async LeakIX client."""
2+
3+
import asyncio
4+
5+
import decouple
6+
7+
from leakix import AsyncClient, Scope
8+
9+
API_KEY = decouple.config("API_KEY")
10+
11+
12+
async def example_search_services():
    """Query the service index using raw query-string syntax."""
    query = "+country:FR +port:22"
    async with AsyncClient(api_key=API_KEY) as client:
        result = await client.search(query, scope=Scope.SERVICE)
        assert result.status_code() == 200
        for event in result.json():
            print(f"{event.ip}:{event.port} - {event.summary}")
19+
20+
21+
async def example_search_leaks():
    """Query the leak index using raw query-string syntax."""
    query = "+plugin:GitConfigHttpPlugin"
    async with AsyncClient(api_key=API_KEY) as client:
        result = await client.search(query, scope=Scope.LEAK)
        assert result.status_code() == 200
        for event in result.json():
            print(f"{event.host} - {event.summary}")
28+
29+
30+
async def main():
    """Run each async example in sequence."""
    for example in (example_search_services, example_search_leaks):
        await example()


if __name__ == "__main__":
    asyncio.run(main())

example/example_client.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import decouple
44

5-
from leakix import Client
5+
from leakix import Client, Scope
66
from leakix.field import CountryField, Operator, PluginField, TimeField
77
from leakix.plugin import Plugin
88
from leakix.query import MustNotQuery, MustQuery, RawQuery
@@ -118,6 +118,36 @@ def example_get_subdomains():
118118
print(response.json())
119119

120120

121+
def example_search_simple():
    """Run a website-style query string search against the leak index."""
    result = CLIENT.search("+plugin:GitConfigHttpPlugin", scope=Scope.LEAK)
    for hit in result.json():
        print(hit.ip)
126+
127+
128+
def example_search_service():
    """Search the service index with several query-string filters at once."""
    result = CLIENT.search("+country:FR +port:22", scope=Scope.SERVICE)
    for hit in result.json():
        print(hit.ip, hit.port)
133+
134+
135+
def example_get_domain():
    """Fetch the services and leaks recorded for a single domain."""
    result = CLIENT.get_domain("example.com")
    if not result.is_success():
        return
    print("Services:", result.json()["services"])
    print("Leaks:", result.json()["leaks"])
141+
142+
143+
def example_bulk_export_stream():
    """Stream a bulk export so large result sets never sit in memory."""
    plugin_query = MustQuery(field=PluginField(Plugin.GitConfigHttpPlugin))
    stream = CLIENT.bulk_export_stream(queries=[plugin_query])
    for aggregation in stream:
        for event in aggregation.events:
            print(event.ip)
149+
150+
121151
if __name__ == "__main__":
122152
example_get_host_filter_plugin()
123153
example_get_service_filter_plugin()
@@ -131,3 +161,7 @@ def example_get_subdomains():
131161
example_bulk_service()
132162
example_bulk_export_last_event()
133163
example_get_subdomains()
164+
example_search_simple()
165+
example_search_service()
166+
example_get_domain()
167+
example_bulk_export_stream()

leakix/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from importlib.metadata import version
22

3+
from leakix.async_client import AsyncClient as AsyncClient
4+
from leakix.base import HostResult as HostResult
35
from leakix.client import Client as Client
4-
from leakix.client import HostResult as HostResult
56
from leakix.client import Scope as Scope
67
from leakix.domain import L9Subdomain as L9Subdomain
78
from leakix.field import (
@@ -71,6 +72,7 @@
7172

7273
__all__ = [
7374
"__version__",
75+
"AsyncClient",
7476
"Client",
7577
"HostResult",
7678
"L9Subdomain",

leakix/async_client.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
"""Async LeakIX API client using httpx."""
2+
3+
import json
4+
from collections.abc import AsyncIterator
5+
from typing import Any, cast
6+
7+
import httpx
8+
from l9format import l9format
9+
10+
from leakix.base import DEFAULT_URL, BaseClient
11+
from leakix.client import Scope
12+
from leakix.query import AbstractQuery, serialize_queries
13+
from leakix.response import (
14+
AbstractResponse,
15+
ErrorResponse,
16+
RateLimitResponse,
17+
SuccessResponse,
18+
)
19+
20+
DEFAULT_TIMEOUT = 30.0
21+
22+
23+
class AsyncClient(BaseClient):
    """Async client for the LeakIX API.

    Mirrors the sync Client API but uses httpx for async operations.
    All methods return AbstractResponse for consistency with the sync client.

    Intended to be used as an async context manager::

        async with AsyncClient(api_key=...) as client:
            await client.search("+port:22", scope=Scope.SERVICE)
    """

    def __init__(
        self,
        api_key: str | None = None,
        base_url: str | None = DEFAULT_URL,
        timeout: float = DEFAULT_TIMEOUT,
    ) -> None:
        # api_key/base_url handling lives in BaseClient; only the httpx
        # bits (timeout, lazily-created AsyncClient) are stored here.
        super().__init__(api_key=api_key, base_url=base_url)
        self.timeout = timeout
        # Created on first request (see _get_client), not here, so the
        # constructor needs no running event loop.
        self._client: httpx.AsyncClient | None = None

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the HTTP client."""
        # Recreate after close(): a closed httpx client cannot be reused.
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=self.timeout,
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client. Safe to call multiple times."""
        if self._client is not None and not self._client.is_closed:
            await self._client.aclose()
        self._client = None

    async def __aenter__(self) -> "AsyncClient":
        return self

    async def __aexit__(self, *args: Any) -> None:
        # Exception info is ignored; the underlying client is always closed.
        await self.close()

    async def __get(
        self, path: str, params: dict[str, Any] | None = None
    ) -> AbstractResponse:
        """Make a GET request and return an AbstractResponse.

        Maps status codes to response wrappers:
        200 -> SuccessResponse (empty body becomes []),
        429 -> RateLimitResponse,
        204 -> SuccessResponse with an empty list,
        anything else -> ErrorResponse carrying the JSON error body.
        """
        client = await self._get_client()
        r = await client.get(path, params=params)
        if r.status_code == 200:
            # Guard against a 200 with an empty body: r.json() would raise.
            response_json = r.json() if r.content else []
            return SuccessResponse(response=r, response_json=response_json)
        elif r.status_code == 429:
            return RateLimitResponse(response=r)
        elif r.status_code == 204:
            # 204 No Content is still a success, just with no payload.
            return SuccessResponse(response=r, response_json=[])
        else:
            # NOTE(review): assumes error bodies are JSON — r.json() raises
            # otherwise; confirm against the API's error format.
            return ErrorResponse(response=r, response_json=r.json())

    async def get(
        self,
        scope: Scope,
        queries: list[AbstractQuery] | None = None,
        page: int = 0,
    ) -> AbstractResponse:
        """Search LeakIX for services or leaks.

        :param scope: Scope.SERVICE or Scope.LEAK.
        :param queries: optional query objects, serialized via serialize_queries.
        :param page: zero-based result page; must be >= 0.
        :raises ValueError: if page is negative.
        """
        if page < 0:
            raise ValueError("Page argument must be a positive integer")
        serialized_query = serialize_queries(queries)
        return await self.__get(
            "/search",
            params={"scope": scope.value, "q": serialized_query, "page": page},
        )

    async def get_service(
        self, queries: list[AbstractQuery] | None = None, page: int = 0
    ) -> AbstractResponse:
        """Shortcut for get with scope=Scope.SERVICE; parses results to L9Event."""
        return self._parse_events(
            await self.get(Scope.SERVICE, queries=queries, page=page)
        )

    async def get_leak(
        self, queries: list[AbstractQuery] | None = None, page: int = 0
    ) -> AbstractResponse:
        """Shortcut for get with scope=Scope.LEAK; parses results to L9Event."""
        return self._parse_events(
            await self.get(Scope.LEAK, queries=queries, page=page)
        )

    async def search(
        self, query: str, scope: Scope = Scope.LEAK, page: int = 0
    ) -> AbstractResponse:
        """
        Simple search using a raw query string (same syntax as the website).

        :param query: raw query string, e.g. "+country:FR +port:22".
        :param scope: which index to search; defaults to Scope.LEAK.
        :param page: zero-based result page; must be >= 0.
        :raises ValueError: if page is negative.

        Example:
            >>> await client.search("+plugin:GitConfigHttpPlugin", scope=Scope.LEAK)
        """
        if page < 0:
            raise ValueError("Page argument must be a positive integer")
        r = await self.__get(
            "/search",
            params={"scope": scope.value, "q": query, "page": page},
        )
        # Convert raw JSON dicts to L9Event objects (shared BaseClient helper).
        return self._parse_events(r)

    async def get_host(self, ipv4: str) -> AbstractResponse:
        """Returns the list of services and associated leaks for a given host."""
        return self._parse_host_result(await self.__get(f"/host/{ipv4}"))

    async def get_domain(self, domain: str) -> AbstractResponse:
        """Returns the list of services and associated leaks for a given domain."""
        return self._parse_host_result(await self.__get(f"/domain/{domain}"))

    async def get_plugins(self) -> AbstractResponse:
        """Returns the list of plugins the authenticated user has access to."""
        return self._parse_plugins(await self.__get("/api/plugins"))

    async def get_subdomains(self, domain: str) -> AbstractResponse:
        """Returns the list of subdomains for a given domain."""
        return self._parse_subdomains(await self.__get(f"/api/subdomains/{domain}"))

    async def bulk_export(
        self, queries: list[AbstractQuery] | None = None
    ) -> AbstractResponse:
        """Bulk export leaks (Pro API feature).

        Streams the NDJSON response line by line but accumulates every
        L9Aggregation into one list; use bulk_export_stream for large exports.
        """
        serialized_query = serialize_queries(queries)
        client = await self._get_client()
        async with client.stream(
            "GET", "/bulk/search", params={"q": serialized_query}
        ) as r:
            if r.status_code == 200:
                response_json = []
                # One JSON document per non-empty line (NDJSON).
                async for line in r.aiter_lines():
                    if line:
                        json_event = json.loads(line)
                        response_json.append(
                            l9format.L9Aggregation.from_dict(json_event)
                        )
                return SuccessResponse(response=r, response_json=response_json)
            elif r.status_code == 429:
                return RateLimitResponse(response=r)
            elif r.status_code == 204:
                return SuccessResponse(response=r, response_json=[])
            else:
                # A streamed response must be fully read before .json() works.
                await r.aread()
                return ErrorResponse(response=r, response_json=r.json())

    async def bulk_export_stream(
        self, queries: list[AbstractQuery] | None = None
    ) -> AsyncIterator[l9format.L9Aggregation]:
        """
        Streaming version of bulk_export. Yields L9Aggregation objects one by one.
        More memory efficient for large result sets.

        NOTE(review): on any non-200 status the generator simply ends with no
        items and no error — callers cannot distinguish "no results" from
        "request failed"; confirm this is intentional.
        """
        serialized_query = serialize_queries(queries)
        client = await self._get_client()
        async with client.stream(
            "GET", "/bulk/search", params={"q": serialized_query}
        ) as r:
            if r.status_code != 200:
                return
            # One JSON document per non-empty line (NDJSON).
            async for line in r.aiter_lines():
                if line:
                    json_event = json.loads(line)
                    # cast: from_dict is declared on the base Model, so the
                    # type checker needs help narrowing to L9Aggregation.
                    yield cast(
                        l9format.L9Aggregation,
                        l9format.L9Aggregation.from_dict(json_event),
                    )

leakix/base.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
"""Shared logic between sync and async LeakIX clients."""
2+
3+
import dataclasses
4+
from importlib.metadata import version
5+
from typing import Any, cast
6+
7+
from l9format import l9format
8+
from l9format.l9format import Model
9+
10+
from leakix.domain import L9Subdomain
11+
from leakix.plugin import APIResult
12+
from leakix.response import AbstractResponse
13+
14+
DEFAULT_URL = "https://leakix.net"
15+
16+
17+
@dataclasses.dataclass
class HostResult(Model):
    """Deserialized payload of the host/domain lookup endpoints.

    Capitalised field names presumably mirror the JSON keys returned by the
    API so Model.from_dict can map them directly — TODO confirm against a
    live response.
    """

    # Open services observed on the host/domain.
    Services: list[l9format.L9Event] | None = None
    # Leak events associated with the host/domain.
    Leaks: list[l9format.L9Event] | None = None
21+
22+
23+
class BaseClient:
    """Common construction and response post-processing for both clients.

    Holds API key / base URL handling plus the static helpers that turn raw
    JSON payloads into typed objects on successful responses.
    """

    MAX_RESULTS_PER_PAGE = 20

    def __init__(
        self,
        api_key: str | None = None,
        base_url: str | None = DEFAULT_URL,
    ) -> None:
        self.api_key = api_key
        # Fall back to the default URL when base_url is None (or empty).
        self.base_url = base_url or DEFAULT_URL
        user_agent = f"leakix-client-python/{version('leakix')}"
        self.headers: dict[str, str] = {
            "Accept": "application/json",
            "User-agent": user_agent,
        }
        if api_key:
            # Only authenticated clients send the api-key header.
            self.headers["api-key"] = api_key

    @staticmethod
    def _parse_events(response: AbstractResponse) -> AbstractResponse:
        """Replace raw JSON dicts with L9Event objects on a success response."""
        if response.is_success():
            events = map(l9format.L9Event.from_dict, response.response_json)
            response.response_json = list(events)
        return response

    @staticmethod
    def _parse_host_result(response: AbstractResponse) -> AbstractResponse:
        """Reshape a host/domain payload into a {services, leaks} dict."""
        if not response.is_success():
            return response
        payload: dict[str, Any] = response.json()
        parsed = cast(HostResult, HostResult.from_dict(payload))
        response.response_json = {
            "services": parsed.Services,
            "leaks": parsed.Leaks,
        }
        return response

    @staticmethod
    def _parse_plugins(response: AbstractResponse) -> AbstractResponse:
        """Convert each plugin entry into an APIResult on a success response."""
        if not response.is_success():
            return response
        response.response_json = [APIResult.from_dict(d) for d in response.json()]
        return response

    @staticmethod
    def _parse_subdomains(response: AbstractResponse) -> AbstractResponse:
        """Convert each entry into an L9Subdomain on a success response."""
        if not response.is_success():
            return response
        response.response_json = [L9Subdomain.from_dict(d) for d in response.json()]
        return response

0 commit comments

Comments
 (0)