diff --git a/linesman/backends/postgres.py b/linesman/backends/postgres.py new file mode 100644 index 0000000..32aedb5 --- /dev/null +++ b/linesman/backends/postgres.py @@ -0,0 +1,172 @@ +import cPickle +import logging +import psycopg2 +import time +import StringIO + +from linesman.backends.base import Backend + +try: + # Python 2.7+ + from collections import OrderedDict +except ImportError: + # Python 2.4+ + from ordereddict import OrderedDict + +log = logging.getLogger(__name__) + + +class PostgresBackend(Backend): + """ + Stores sessions in a PostgreSQL database. + """ + + def __init__(self, dbname='linesman', user='user', password='password'): + """ + Opens up a connection to a PostgreSQL database. + There is no "connection close" method at + linesman.backends.base.Backend... + """ + + self.conn = psycopg2.connect( + dbname=dbname, user=user, password=password) + + def setup(self): + """ + Creates table for Linesman, if it doesn't already exist. + """ + + query = """ + CREATE TABLE sessions ( + uuid uuid PRIMARY KEY, + timestamp FLOAT, + session BYTEA + ); + """ + cur = self.conn.cursor() + try: + cur.execute(query) + self.conn.commit() + except psycopg2.ProgrammingError: + log.debug("Table already exists.") + self.conn.rollback() + finally: + cur.close() + + def add(self, session): + """ + Insert a new session into the database. + """ + uuid = session.uuid + if session.timestamp: + timestamp = time.mktime(session.timestamp.timetuple()) + else: + timestamp = None + + pickled_session = psycopg2.Binary(cPickle.dumps(session, -1)) + + query = "INSERT INTO sessions VALUES ('{}', {}, {});".format( + uuid, timestamp, pickled_session + ) + cur = self.conn.cursor() + try: + cur.execute(query) + self.conn.commit() + except: + self.conn.rollback() + finally: + cur.close() + + def delete(self, session_uuid): + """ + Remove the session. + """ + query = "DELETE FROM sessions WHERE uuid = '{}';".format(session_uuid) + + cur = self.conn.cursor() + try: + cur.execute(query) + self.conn.commit() + except: + self.conn.rollback() + finally: + cur.close() + + def delete_many(self, session_uuids): + """ + Remove the sessions. + """ + query = "DELETE FROM sessions WHERE uuid IN ('{}');".format( + "', '".join(session_uuids)) + cur = self.conn.cursor() + deleted_rows = 0 + try: + cur.execute(query) + self.conn.commit() + deleted_rows = cur.rowcount + except: + self.conn.rollback() + finally: + cur.close() + return deleted_rows + + def delete_all(self): + """ + Truncate the database. + """ + query = "DELETE FROM sessions;" + + cur = self.conn.cursor() + try: + cur.execute(query) + self.conn.commit() + except: + self.conn.rollback() + finally: + cur.close() + + def get(self, session_uuid): + """ + Retrieves the session from the database. + """ + query = "SELECT session FROM sessions WHERE uuid = '{}';".format( + session_uuid) + + cur = self.conn.cursor() + result = None + try: + cur.execute(query) + self.conn.commit() + session = cur.fetchone() + result = cPickle.load(StringIO.StringIO(session[0])) + except: + self.conn.rollback() + finally: + cur.close() + + return result if result else None + + def get_all(self): + """ + Generates a dictionary of the data based on the contents of the DB. + """ + query = "SELECT uuid, session FROM sessions ORDER BY timestamp;" + + cur = self.conn.cursor() + result = [] + try: + cur.execute(query) + self.conn.commit() + result = cur.fetchall() + except: + self.conn.rollback() + finally: + cur.close() + + unpickled_result = [] + + for (uuid, session) in result: + unpickled_session = cPickle.load(StringIO.StringIO(session)) + unpickled_result.append((uuid, unpickled_session)) + + return OrderedDict(unpickled_result) diff --git a/linesman/tests/backends/test_backend_postgres.py b/linesman/tests/backends/test_backend_postgres.py new file mode 100644 index 0000000..77e5fed --- /dev/null +++ b/linesman/tests/backends/test_backend_postgres.py @@ -0,0 +1,152 @@ +import psycopg2 +import cPickle +import StringIO +from linesman.backends.postgres import PostgresBackend +from linesman.tests import (create_mock_session, SPECIFIC_DATE_EPOCH) +from linesman.tests.backends import TestBackend + + +class TestBackendPostgres(TestBackend): + + def setUp(self): + # This must be configurable + self.db = 'linesman_test' + self.user = 'linesman_test' + self.password = 'linesman_test' + self.backend = PostgresBackend(dbname=self.db, + user=self.user, + password=self.password) + self.backend.setup() + + def tearDown(self): + conn = psycopg2.connect(dbname=self.db, user=self.user, + password=self.password) + query = 'DROP TABLE sessions;' + cur = conn.cursor() + cur.execute(query) + conn.commit() + cur.close() + conn.close() + + def test_setup(self): + """ Test that setup() creates a new table with the correct columns. """ + c = self.backend.conn.cursor() + c.execute("SELECT * FROM sessions") + self.backend.conn.commit() + description = c.description + c.close() + self.assertTrue('uuid', description[0].name) + self.assertTrue('timestamp', description[1].name) + self.assertTrue('session', description[2].name) + + def test_duplicate_setup(self): + """ Test that running setup() twice (duplicate tables) won't fail. """ + self.backend.setup() + + def test_add_session(self): + """ Test that adding a session inserts it into the database. """ + mock_session = create_mock_session() + self.backend.add(mock_session) + + query = "SELECT uuid, timestamp, session FROM sessions " \ + "WHERE uuid = '{}';".format(mock_session.uuid) + + c = self.backend.conn.cursor() + c.execute(query) + self.backend.conn.commit() + actual_uuid, actual_timestamp, result = c.fetchone() + c.close() + actual_session = cPickle.load(StringIO.StringIO(result)) + + # Assure the meta columns are equal + self.assertEquals(mock_session.uuid, actual_uuid) + self.assertEquals(SPECIFIC_DATE_EPOCH, actual_timestamp) + + # Also insure that the session we put in is intact! + self.assertSessionsEqual(mock_session, actual_session) + + def test_delete(self): + """ Test that removing an added session removes it from the DB. """ + mock_session = create_mock_session() + self.backend.add(mock_session) + self.backend.delete(mock_session.uuid) + + query = "SELECT * FROM sessions WHERE uuid = '{}';".format( + mock_session.uuid) + + # Verify that no rows are matched + c = self.backend.conn.cursor() + c.execute(query) + self.backend.conn.commit() + self.assertEquals(c.fetchone(), None) + c.close() + + def test_delete_many(self): + """ Test that delete_many removes the correct sessions. """ + sessions = [] + for i in range(10): + mock_session = create_mock_session() + self.backend.add(mock_session) + sessions.append(mock_session.uuid) + + delete_count = self.backend.delete_many(sessions[0:5]) + self.assertEquals(delete_count, 5) + + c = self.backend.conn.cursor() + c.execute("SELECT COUNT(*) FROM sessions;") + self.backend.conn.commit() + self.assertEquals(c.fetchone(), (5,)) + c.close() + + def test_delete_all(self): + """ Test that deleting all session removes them all from the DB """ + # Add a few new session profiles + for i in range(1, 5): + self.backend.add(create_mock_session()) + + # TODO Check to make sure rows were added? + + # Then delete them all. + self.backend.delete_all() + + query = "SELECT * FROM sessions;" + + # Verify that no rows are matched + c = self.backend.conn.cursor() + c.execute(query) + self.backend.conn.commit() + self.assertEquals(c.fetchone(), None) + c.close() + + def test_get(self): + """ Test that a session can be received using get(). """ + mock_session = create_mock_session() + self.backend.add(mock_session) + + actual_session = self.backend.get(mock_session.uuid) + self.assertSessionsEqual(mock_session, actual_session) + + def test_get_no_results(self): + """ Test that when no sessions are available, get returns None """ + actual_session = self.backend.get("not a real uuid") + self.assertEqual(None, actual_session) + + def test_get_all(self): + """ Test that all sessions are retrieved when using get_all() """ + expected_sessions = {} + for i in range(1, 5): + mock_session = create_mock_session() + self.backend.add(mock_session) + expected_sessions[mock_session.uuid] = mock_session + + actual_sessions = self.backend.get_all() + for (actual_uuid, actual_session) in actual_sessions.items(): + expected_session = expected_sessions.get(actual_uuid) + self.assertTrue(expected_session is not None, + "UUID `%s' not found in results." % actual_uuid) + self.assertSessionsEqual(actual_session, expected_session) + + def test_get_all_no_results(self): + """ Test that an empty dict is returned when no sessions exist. """ + actual_sessions = self.backend.get_all() + self.assertFalse(len(actual_sessions)) diff --git a/setup.py b/setup.py index 1b19344..de3d1de 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ import sys -install_requires = ["mako", "networkx==1.7", "pillow", "pygraphviz", 'Paste', 'WebOb'] +install_requires = ["mako", "networkx==1.7", "pillow", "pygraphviz", 'Paste', 'WebOb', 'psycopg2'] # ordereddict is required for versions < 2.7; its included in collections in # versions 2.7+ and 3.0+