parse response from bulk endpoint, only saving unsent heartbeats to offline cache

This commit is contained in:
Alan Hamlett 2017-11-09 09:03:27 -08:00
parent 2b2c48b2d7
commit cbd67fb73c
4 changed files with 216 additions and 23 deletions

View file

@ -147,7 +147,7 @@ class LoggingTestCase(utils.TestCase):
self.assertEquals(output[1], expected)
self.assertEquals(output[2], u('WakaTime DEBUG Sending heartbeats to api at https://api.wakatime.com/api/v1/users/current/heartbeats.bulk'))
self.assertIn('Python', output[3])
self.assertIn('response_code', output[4])
self.assertIn('response_code', output[5])
@log_capture()
def test_exception_traceback_logged_in_debug_mode(self, logs):

View file

@ -8,13 +8,15 @@ from wakatime.packages import requests
import logging
import os
import sqlite3
import shutil
import time
import uuid
from testfixtures import log_capture
from wakatime.compat import u
from wakatime.constants import SUCCESS
from wakatime.packages.requests.models import Response
from . import utils
from .utils import json
from .utils import ANY, json
class OfflineQueueTestCase(utils.TestCase):
@ -159,6 +161,163 @@ class OfflineQueueTestCase(utils.TestCase):
self.assertEquals(data.get('project'), project2)
self.assertEquals(u(int(data.get('time'))), now2)
@log_capture()
def test_heartbeats_sent_not_saved_from_bulk_response(self, logs):
logging.disable(logging.NOTSET)
with utils.NamedTemporaryFile() as fh:
with utils.mock.patch('wakatime.offlinequeue.Queue._get_db_file') as mock_db_file:
mock_db_file.return_value = fh.name
entities = [
'emptyfile.txt',
'twolinefile.txt',
'python.py',
'go.go',
]
with utils.TemporaryDirectory() as tempdir:
for entity in entities:
shutil.copy(os.path.join('tests/samples/codefiles', entity), os.path.join(tempdir, entity))
now = u(int(time.time()))
key = str(uuid.uuid4())
args = ['--file', os.path.join(tempdir, entities[0]), '--key', key, '--config', 'tests/samples/configs/good_config.cfg', '--time', now, '--extra-heartbeats']
class CustomResponse(Response):
@property
def status_code(self):
return 202
@status_code.setter
def status_code(self, value):
pass
@property
def text(self):
return '[[{"id":1},201], [{"error":"error 2"},500], [{"id":3},201], [{"error":4},500]]'
response = CustomResponse()
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.mock.patch('wakatime.main.sys.stdin') as mock_stdin:
heartbeats = json.dumps([{
'timestamp': now,
'entity': os.path.join(tempdir, entity),
'entity_type': 'file',
'is_write': False,
} for entity in entities[1:]])
mock_stdin.readline.return_value = heartbeats
with utils.mock.patch('wakatime.offlinequeue.Queue.pop') as mock_pop:
mock_pop.return_value = None
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertNothingPrinted()
heartbeat = {
'entity': os.path.realpath(os.path.join(tempdir, entities[0])),
'language': ANY,
'lines': ANY,
'project': ANY,
'time': ANY,
'type': 'file',
'is_write': ANY,
'user_agent': ANY,
'dependencies': ANY,
}
extra_heartbeats = [{
'entity': os.path.realpath(os.path.join(tempdir, entity)),
'language': ANY,
'lines': ANY,
'project': ANY,
'branch': ANY,
'time': ANY,
'is_write': ANY,
'type': 'file',
'dependencies': ANY,
'user_agent': ANY,
} for entity in entities[1:]]
self.assertHeartbeatSent(heartbeat, extra_heartbeats=extra_heartbeats)
self.assertSessionCacheSaved()
queue = Queue(None, None)
self.assertEquals(queue._get_db_file(), fh.name)
saved_heartbeats = queue.pop_many()
self.assertNothingPrinted()
self.assertNothingLogged(logs)
# make sure only heartbeats with error code responses were saved
self.assertEquals(len(saved_heartbeats), 2)
self.assertEquals(saved_heartbeats[0].entity, os.path.realpath(os.path.join(tempdir, entities[1])))
self.assertEquals(saved_heartbeats[1].entity, os.path.realpath(os.path.join(tempdir, entities[3])))
@log_capture()
def test_offline_heartbeats_sent_after_partial_success_from_bulk_response(self, logs):
logging.disable(logging.NOTSET)
with utils.NamedTemporaryFile() as fh:
with utils.mock.patch('wakatime.offlinequeue.Queue._get_db_file') as mock_db_file:
mock_db_file.return_value = fh.name
entities = [
'emptyfile.txt',
'twolinefile.txt',
'python.py',
'go.go',
]
with utils.TemporaryDirectory() as tempdir:
for entity in entities:
shutil.copy(os.path.join('tests/samples/codefiles', entity), os.path.join(tempdir, entity))
now = u(int(time.time()))
key = str(uuid.uuid4())
args = ['--file', os.path.join(tempdir, entities[0]), '--key', key, '--config', 'tests/samples/configs/good_config.cfg', '--time', now, '--extra-heartbeats']
class CustomResponse(Response):
@property
def status_code(self):
return 202
@status_code.setter
def status_code(self, value):
pass
@property
def text(self):
return '[[{"id":1},201], [{"error":"error 2"},500], [{"id":3},201], [{"error":4},500]]'
response = CustomResponse()
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.mock.patch('wakatime.main.sys.stdin') as mock_stdin:
heartbeats = json.dumps([{
'timestamp': now,
'entity': os.path.join(tempdir, entity),
'entity_type': 'file',
'is_write': False,
} for entity in entities[1:]])
mock_stdin.readline.return_value = heartbeats
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertNothingPrinted()
queue = Queue(None, None)
self.assertEquals(queue._get_db_file(), fh.name)
saved_heartbeats = queue.pop_many()
self.assertNothingPrinted()
self.assertNothingLogged(logs)
# make sure all offline heartbeats were sent, so queue should only have 1 heartbeat left from the second 500 response
self.assertEquals(len(saved_heartbeats), 1)
def test_auth_error_when_sending_offline_heartbeats(self):
with utils.NamedTemporaryFile() as fh:
with utils.mock.patch('wakatime.offlinequeue.Queue._get_db_file') as mock_db_file:

View file

@ -91,13 +91,16 @@ class TestCase(unittest.TestCase):
if isinstance(heartbeat.get(key), list):
self.assertListsEqual(heartbeat.get(key), body[0].get(key), u('Expected heartbeat to be sent with {0}={1}, instead {0}={2}').format(u(key), u(heartbeat.get(key)), u(body[0].get(key))))
else:
self.assertEquals(heartbeat.get(key), body[0].get(key), u('Expected heartbeat to be sent with {0}={1}, instead {0}={2}').format(u(key), u(heartbeat.get(key)), u(body[0].get(key))))
self.assertEquals(heartbeat.get(key), body[0].get(key), u('Expected heartbeat to be sent with {1} {0}={2}, instead {3} {0}={4}').format(u(key), type(heartbeat.get(key)), u(heartbeat.get(key)), type(body[0].get(key)), u(body[0].get(key))))
if extra_heartbeats:
for i in range(len(extra_heartbeats)):
keys = list(body[i + 1].keys()) + list(extra_heartbeats[i].keys())
for key in keys:
self.assertEquals(extra_heartbeats[i].get(key), body[i + 1].get(key), u('Expected extra heartbeat {3} to be sent with {0}={1}, instead {0}={2}').format(u(key), u(extra_heartbeats[i].get(key)), u(body[i + 1].get(key)), i))
if isinstance(extra_heartbeats[i].get(key), list):
self.assertListsEqual(extra_heartbeats[i].get(key), body[i + 1].get(key), u('Expected extra heartbeat {3} to be sent with {0}={1}, instead {0}={2}').format(u(key), u(extra_heartbeats[i].get(key)), u(body[i + 1].get(key)), i))
else:
self.assertEquals(extra_heartbeats[i].get(key), body[i + 1].get(key), u('Expected extra heartbeat {5} to be sent with {1} {0}={2}, instead {3} {0}={4}').format(u(key), type(extra_heartbeats[i].get(key)), u(extra_heartbeats[i].get(key)), type(body[i + 1].get(key)), u(body[i + 1].get(key)), i))
def assertSessionCacheUntouched(self):
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()

View file

@ -138,16 +138,50 @@ def send_heartbeats(heartbeats, args, configs, use_ntlm_proxy=False):
else:
code = response.status_code if response is not None else None
content = response.text if response is not None else None
try:
results = response.json() if response is not None else []
except:
if log.isEnabledFor(logging.DEBUG):
log.traceback(logging.WARNING)
results = []
if code == requests.codes.created or code == requests.codes.accepted:
log.debug({
'response_code': code,
})
for i in range(len(results)):
if len(heartbeats) <= i:
log.debug('Results from server do not match heartbeats sent.')
break
try:
c = results[i][1]
except:
c = 0
try:
text = json.dumps(results[i][0])
except:
if log.isEnabledFor(logging.DEBUG):
log.traceback(logging.WARNING)
text = ''
handle_result([heartbeats[i]], c, text, args, configs)
session_cache.save(session)
return SUCCESS
if should_try_ntlm:
return send_heartbeats(heartbeats, args, configs, use_ntlm_proxy=True)
else:
handle_result(heartbeats, code, content, args, configs)
session_cache.delete()
return AUTH_ERROR if code == 401 else API_ERROR
def handle_result(h, code, content, args, configs):
if code == requests.codes.created or code == requests.codes.accepted:
return
if args.offline:
if code == 400:
log.error({
@ -161,12 +195,9 @@ def send_heartbeats(heartbeats, args, configs, use_ntlm_proxy=False):
'response_content': content,
})
queue = Queue(args, configs)
queue.push_many(heartbeats)
queue.push_many(h)
else:
log.error({
'response_code': code,
'response_content': content,
})
session_cache.delete()
return AUTH_ERROR if code == 401 else API_ERROR