improve test coverage for parsing api bulk results

This commit is contained in:
Alan Hamlett 2017-11-22 18:54:47 -08:00
parent 9ffb677f9d
commit 8269196c88
2 changed files with 88 additions and 5 deletions

View file

@ -632,3 +632,87 @@ class OfflineQueueTestCase(utils.TestCase):
self.assertEquals(u(language), saved_heartbeat['language']) self.assertEquals(u(language), saved_heartbeat['language'])
self.assertEquals(u(project), saved_heartbeat['project']) self.assertEquals(u(project), saved_heartbeat['project'])
self.assertEquals(u(branch), saved_heartbeat['branch']) self.assertEquals(u(branch), saved_heartbeat['branch'])
@log_capture()
def test_heartbeat_saved_when_bulk_result_json_decode_error(self, logs):
logging.disable(logging.NOTSET)
with utils.NamedTemporaryFile() as fh:
with utils.mock.patch('wakatime.offlinequeue.Queue._get_db_file') as mock_db_file:
mock_db_file.return_value = fh.name
response = CustomResponse()
response.response_text = '[[{id":1}]]'
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
now = u(int(time.time()))
entity = 'tests/samples/codefiles/twolinefile.txt'
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--time', now]
execute(args)
queue = Queue(None, None)
saved_heartbeat = queue.pop()
self.assertEquals(os.path.realpath(entity), saved_heartbeat['entity'])
self.assertNothingPrinted()
expected = 'JSONDecodeError'
actual = self.getLogOutput(logs)
self.assertIn(expected, actual)
@log_capture()
def test_heartbeat_saved_from_result_type_error(self, logs):
logging.disable(logging.NOTSET)
with utils.NamedTemporaryFile() as fh:
with utils.mock.patch('wakatime.offlinequeue.Queue._get_db_file') as mock_db_file:
mock_db_file.return_value = fh.name
response = CustomResponse()
response.response_text = '[0]'
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
now = u(int(time.time()))
entity = 'tests/samples/codefiles/twolinefile.txt'
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--time', now]
execute(args)
queue = Queue(None, None)
saved_heartbeat = queue.pop()
self.assertEquals(os.path.realpath(entity), saved_heartbeat['entity'])
self.assertNothingPrinted()
expected = 'TypeError'
actual = self.getLogOutput(logs)
self.assertIn(expected, actual)
@log_capture()
def test_heartbeat_saved_from_result_index_error(self, logs):
logging.disable(logging.NOTSET)
with utils.NamedTemporaryFile() as fh:
with utils.mock.patch('wakatime.offlinequeue.Queue._get_db_file') as mock_db_file:
mock_db_file.return_value = fh.name
response = CustomResponse()
response.response_text = '[[{"id":1}]]'
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
now = u(int(time.time()))
entity = 'tests/samples/codefiles/twolinefile.txt'
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--time', now]
execute(args)
queue = Queue(None, None)
saved_heartbeat = queue.pop()
self.assertEquals(os.path.realpath(entity), saved_heartbeat['entity'])
self.assertNothingPrinted()
expected = 'IndexError'
actual = self.getLogOutput(logs)
self.assertIn(expected, actual)

View file

@ -30,7 +30,7 @@ log = logging.getLogger('WakaTime')
try: try:
from .packages import requests from .packages import requests
except ImportError: except ImportError: # pragma: nocover
log.traceback(logging.ERROR) log.traceback(logging.ERROR)
print(traceback.format_exc()) print(traceback.format_exc())
log.error('Please upgrade Python to the latest version.') log.error('Please upgrade Python to the latest version.')
@ -167,12 +167,12 @@ def _process_server_results(heartbeats, code, content, results, args, configs):
try: try:
c = results[i][1] c = results[i][1]
except: except:
log.traceback(logging.WARNING)
c = 0 c = 0
try: try:
text = json.dumps(results[i][0]) text = json.dumps(results[i][0])
except: except:
if log.isEnabledFor(logging.DEBUG): log.traceback(logging.WARNING)
log.traceback(logging.WARNING)
text = '' text = ''
if not _success(c): if not _success(c):
_handle_unsent_heartbeats([heartbeats[i]], c, text, args, configs) _handle_unsent_heartbeats([heartbeats[i]], c, text, args, configs)
@ -212,8 +212,7 @@ def _get_results(response):
try: try:
results = response.json() results = response.json()
except: except:
if log.isEnabledFor(logging.DEBUG): log.traceback(logging.WARNING)
log.traceback(logging.WARNING)
return results return results