Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 81 additions & 14 deletions build/lib/hibp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from enum import Enum
import re
import time

try:
import requests
Expand All @@ -16,8 +17,27 @@
monkey.patch_all(thread=False, select=False)

# global variables
# Endpoint template: the "{api_version}" placeholder is filled in with
# str.format() by each request builder before the endpoint path is appended.
BASE_URL = "https://haveibeenpwned.com/api/{api_version}/"
HEADERS = {"User-Agent": "hibp-python",}
# API version used when a request builder is called without api_version.
DEFAULT_API_VERSION = "v2"

# Minimum delay, in seconds, between two API calls (passed to time.sleep).
API_CALL_DELAY = 1.601

# decorator for api min-delay calls
def api_min_delay(base_func):
    '''Wrap base_func so that every call lasts at least API_CALL_DELAY seconds.

    The wrapper times the call and sleeps for whatever is left of the
    minimum delay, so that back-to-back calls do not exceed the API rate.

    Args:
        - base_func -> callable to wrap; positional and keyword arguments
                       are passed through unchanged
    Returns:
        - wrapper function returning whatever base_func returns. Exceptions
          raised by base_func propagate after the delay has been applied.
    '''
    # Local import keeps this block self-contained.
    import functools

    # Preserve the wrapped function's name/docstring on the wrapper.
    @functools.wraps(base_func)
    def func(*args, **kwargs):
        start = time.time()
        try:
            return base_func(*args, **kwargs)
        finally:
            # Pad the call up to the minimum delay even when base_func
            # raises: a failed request still counts against the rate limit,
            # so an immediate retry must not be allowed to fire early.
            elapsed_time = time.time() - start
            remaining_time = max(API_CALL_DELAY - elapsed_time, 0)
            time.sleep(remaining_time)
    return func


# enumerate the types of services that are callable
class Services(Enum):
Expand All @@ -26,6 +46,7 @@ class Services(Enum):
Breach = "breach"
AllBreaches = "allbreaches"
DataClasses = "dataclasses"
PasteAccount = "pasteaccount"

# generic HIBP class
class HIBP(object):
Expand All @@ -45,89 +66,123 @@ def __init__(self):
self.response = None

@classmethod
def get_account_breaches(cls, account, api_version=DEFAULT_API_VERSION):
    '''
    Setup request to retrieve all breaches on a particular account

    Args:
        - account -> account you want to query. can be email or username to
                     anything
        - api_version -> the server's requested api version: e.g v1 or v2
    Returns:
        - HIBP object with updated url, service, and param attributes
    '''
    req = cls()
    # Fill in the API version first, then append the endpoint path.
    req.url = BASE_URL.format(api_version=api_version) + \
        "breachedaccount/{}".format(account)
    req.service = Services.AccountBreach
    req.param = account
    return req

@classmethod
def get_domain_breaches(cls, domain, api_version=DEFAULT_API_VERSION):
    '''
    Setup request to retrieve all breaches on a particular domain

    Args:
        - domain -> domain you want to query. must be valid domain,
                    according to RFC 1035
        - api_version -> the server's requested api version: e.g v1 or v2
    Returns:
        - HIBP object with updated url, service, and param attributes
    Raises:
        - ValueError if domain is not made of two or more non-empty,
          dot-separated labels of at most 63 letters, digits or hyphens
    '''
    req = cls()
    # Anchor the pattern at the end (re.match only anchors the start) and
    # require non-empty labels, so inputs such as "a.b?x=y", ".com" or
    # "a..b" are rejected instead of being spliced into the query string.
    domain_regex = re.compile(
        r"[a-zA-Z\d-]{1,63}(\.[a-zA-Z\d-]{1,63})+\Z")
    if not domain_regex.match(domain):
        raise ValueError("{} is an invalid domain.".format(domain))
    req.url = BASE_URL.format(api_version=api_version) + \
        "breaches?domain={}".format(domain)
    req.service = Services.DomainBreach
    req.param = domain
    return req

@classmethod
def get_breach(cls, name, api_version=DEFAULT_API_VERSION):
    '''
    Setup request to retrieve a specific breach.

    Args:
        - name -> name of breach you want to query. To get a list of
                  all breach names, run HIBP.get_all_breaches()
        - api_version -> the server's requested api version: e.g v1 or v2
    Returns:
        - HIBP object with updated url, service, and param attributes
    '''
    req = cls()
    # Fill in the API version first, then append the endpoint path.
    req.url = BASE_URL.format(api_version=api_version) + \
        "breach/{}".format(name)
    req.service = Services.Breach
    req.param = name
    return req

@classmethod
def get_paste_account(cls, account, api_version=DEFAULT_API_VERSION):
    '''
    Setup request to retrieve all pastes recorded on HIBP for a given
    account.

    Args:
        - account -> account you want to query. can be email or username to
                     anything
        - api_version -> the server's requested api version: e.g v1 or v2

    Returns:
        - HIBP object with updated url, service, and param attributes
    '''
    req = cls()
    # Fill in the API version first, then append the endpoint path.
    req.url = BASE_URL.format(api_version=api_version) + \
        "pasteaccount/{}".format(account)
    req.service = Services.PasteAccount
    req.param = account
    return req

@classmethod
def get_all_breaches(cls, api_version=DEFAULT_API_VERSION):
    '''
    Setup request to retrieve all breaches recorded on HIBP.com so far.

    Args:
        - api_version -> the server's requested api version: e.g v1 or v2

    Returns:
        - HIBP object with updated url and service attributes (this
          request takes no query parameter, so param is not set here)
    '''
    req = cls()
    # Fill in the API version first, then append the endpoint path.
    req.url = BASE_URL.format(api_version=api_version) + "breaches"
    req.service = Services.AllBreaches
    return req

@classmethod
def get_dataclasses(cls, api_version=DEFAULT_API_VERSION):
    '''
    Setup request to retrieve all dataclasses on HIBP.

    Args:
        - api_version -> the server's requested api version: e.g v1 or v2

    Returns:
        - HIBP object with updated url and service attributes (this
          request takes no query parameter, so param is not set here)
    '''
    req = cls()
    # Fill in the API version first, then append the endpoint path.
    req.url = BASE_URL.format(api_version=api_version) + "dataclasses"
    req.service = Services.DataClasses
    return req

def execute(self):
'''
Execute a GET request on HIBP REST API service based on request
object setup with one of the query services above.
If many queries are to be executed in batch, use @execute_min_delay
instead.

Returns:
- If query parameter is pwned:
Expand All @@ -151,12 +206,24 @@ def execute(self):
elif response.text == "[]" and self.service == Services.DomainBreach:
self.response = "object has not been pwned."
return self
elif response.status_code == 404 and self.service == Services.PasteAccount:
self.response = "object has not been pwned."
return self
elif response.status_code == 404 and self.service == Services.Breach:
raise ValueError("invalid breach name {}.".format(self.param))
elif response.status_code == 429 and self.service == Services.AccountBreach:
raise ValueError("Rate limit error {}.".format(self.param))
else:
self.response = response.json()
return self

def execute_min_delay(self):
    '''Run execute() through the api_min_delay wrapper.

    Returns whatever execute() returns, but only once the minimal delay
    between two api calls has elapsed.
    '''
    return api_min_delay(self.execute)()


class AsyncHIBP(object):
'''
Generic AsyncHIBP object. Use this object to do concurrent HIBP requests
Expand All @@ -178,6 +245,7 @@ def __init__(self):
self.url = None
self.response = None

@api_min_delay
def send(self,hibp_obj):
'''
Spawns gevent/pool threads that will run the execute method on each
Expand Down Expand Up @@ -208,6 +276,5 @@ def imap(self,hibp_objs):
Attributes:
- hibp_objs - list of HIBP objects
'''
for hibp_obj in self.pool.imap_unordered(HIBP.execute, hibp_objs):
yield hibp_obj.response
self.pool.join()
for hibp_obj in hibp_objs:
yield HIBP.execute_min_delay(hibp_obj)
6 changes: 4 additions & 2 deletions build/lib/hibp/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@
names = ['adobe','ashleymadison', 'naughtyamerica', 'myspace']
accounts = ["ssgrn", "pegasos1","barackobama"]
domains = ['twitter.com', 'facebook.com','github.com','adobe.com']
paste_accounts = ['test@example.com']

# setup HIBP objects for request executions
reqs = [HIBP.get_breach(x) for x in names] \
+ [HIBP.get_account_breaches(x) for x in accounts] \
+ [HIBP.get_domain_breaches(x) for x in domains]
+ [HIBP.get_domain_breaches(x) for x in domains] \
+ [HIBP.get_paste_account(x) for x in paste_accounts]

### SERIAL
start_time = time.time()
for req in reqs:
req.execute()
req.execute_min_delay()
elapsed_time = time.time() - start_time
logging.info("serial impl took %.2f seconds" % elapsed_time)

Expand Down
Loading