better test coverage for offline queue

This commit is contained in:
Alan Hamlett 2015-09-27 03:32:28 -07:00
parent db02b82d95
commit d6faae47d0
2 changed files with 35 additions and 3 deletions

View File

@ -109,7 +109,7 @@ class OfflineQueueTestCase(utils.TestCase):
self.assertEquals(sys.stderr.getvalue(), '')
self.assertEquals(os.path.realpath(entity), saved_heartbeat['entity'])
def test_handles_connection_exception(self):
def test_get_handles_connection_exception(self):
with tempfile.NamedTemporaryFile() as fh:
with utils.mock.patch('wakatime.offlinequeue.Queue.get_db_file') as mock_db_file:
mock_db_file.return_value = fh.name
@ -138,3 +138,35 @@ class OfflineQueueTestCase(utils.TestCase):
queue = Queue()
saved_heartbeat = queue.pop()
self.assertEquals(os.path.realpath(entity), saved_heartbeat['entity'])
def test_push_handles_connection_exception(self):
with tempfile.NamedTemporaryFile() as fh:
with utils.mock.patch('wakatime.offlinequeue.Queue.get_db_file') as mock_db_file:
mock_db_file.return_value = fh.name
response = Response()
response.status_code = 500
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
now = u(int(time.time()))
entity = 'tests/samples/codefiles/twolinefile.txt'
config = 'tests/samples/configs/sample.cfg'
with utils.mock.patch('wakatime.offlinequeue.Queue.connect') as mock_connect:
mock_connect.side_effect = sqlite3.Error('')
args = ['--file', entity, '--config', config, '--time', now]
execute(args)
response.status_code = 201
execute(args)
queue = Queue()
saved_heartbeat = queue.pop()
self.assertEquals(None, saved_heartbeat)
def test_get_db_file(self):
queue = Queue()
db_file = queue.get_db_file()
expected = os.path.join(os.path.expanduser('~'), '.wakatime.db')
self.assertEquals(db_file, expected)

View File

@ -97,12 +97,12 @@ class Queue(object):
if row[index] is not None:
clauses.append('{0}=?'.format(row_name))
values.append(row[index])
else:
else: # pragma: nocover
clauses.append('{0} IS NULL'.format(row_name))
index += 1
if len(values) > 0:
c.execute('DELETE FROM {0} WHERE {1}'.format(self.table_name, ' AND '.join(clauses)), values)
else:
else: # pragma: nocover
c.execute('DELETE FROM {0} WHERE {1}'.format(self.table_name, ' AND '.join(clauses)))
conn.commit()
if row is not None: