refactor main into multiple files

This commit is contained in:
Alan Hamlett 2017-02-20 12:04:07 -08:00
parent e50ce86c6e
commit c5531312da
7 changed files with 1389 additions and 1237 deletions

480
tests/test_arguments.py Normal file
View File

@ -0,0 +1,480 @@
# -*- coding: utf-8 -*-
from wakatime.main import execute
from wakatime.packages import requests
import logging
import os
import time
import shutil
import sys
import uuid
from testfixtures import log_capture
from wakatime.compat import u
from wakatime.constants import (
API_ERROR,
AUTH_ERROR,
SUCCESS,
MALFORMED_HEARTBEAT_ERROR,
)
from wakatime.packages.requests.models import Response
from . import utils
try:
from .packages import simplejson as json
except (ImportError, SyntaxError):
import json
try:
from mock import ANY, call
except ImportError:
from unittest.mock import ANY, call
class MainTestCase(utils.TestCase):
patch_these = [
'wakatime.packages.requests.adapters.HTTPAdapter.send',
'wakatime.offlinequeue.Queue.push',
['wakatime.offlinequeue.Queue.pop', None],
['wakatime.offlinequeue.Queue.connect', None],
'wakatime.session_cache.SessionCache.save',
'wakatime.session_cache.SessionCache.delete',
['wakatime.session_cache.SessionCache.get', requests.session],
['wakatime.session_cache.SessionCache.connect', None],
]
def test_help_contents(self):
args = ['--help']
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), 0)
expected_stdout = open('tests/samples/output/test_help_contents').read()
self.assertEquals(sys.stdout.getvalue(), expected_stdout)
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_argument_parsing(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
key = str(uuid.uuid4())
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--key', key, '--config', config]
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
def test_lineno_and_cursorpos(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
entity = 'tests/samples/codefiles/twolinefile.txt'
config = 'tests/samples/configs/good_config.cfg'
now = u(int(time.time()))
args = ['--entity', entity, '--config', config, '--time', now, '--lineno', '3', '--cursorpos', '4', '--verbose']
retval = execute(args)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.assertEquals(retval, API_ERROR)
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 2,
'entity': os.path.realpath(entity),
'project': os.path.basename(os.path.abspath('.')),
'cursorpos': '4',
'lineno': '3',
'branch': 'master',
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): '4',
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): '3',
u('lines'): 2,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_invalid_timeout_passed_via_command_line(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
config = 'tests/samples/configs/good_config.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--timeout', 'abc']
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), 2)
self.assertEquals(sys.stdout.getvalue(), '')
expected_stderr = open('tests/samples/output/main_test_timeout_passed_via_command_line').read()
self.assertEquals(sys.stderr.getvalue(), expected_stderr)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
@log_capture()
def test_missing_entity_file(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
entity = 'tests/samples/codefiles/missingfile.txt'
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--verbose']
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
expected = 'WakaTime DEBUG File does not exist; ignoring this heartbeat.'
self.assertEquals(log_output, expected)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
@log_capture()
def test_missing_entity_argument(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
config = 'tests/samples/configs/good_config.cfg'
args = ['--config', config]
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), 2)
self.assertEquals(sys.stdout.getvalue(), '')
expected = 'error: argument --entity is required'
self.assertIn(expected, sys.stderr.getvalue())
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
expected = ''
self.assertEquals(log_output, expected)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
@log_capture()
def test_missing_api_key(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
config = 'tests/samples/configs/missing_api_key.cfg'
args = ['--config', config]
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), AUTH_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
expected = 'error: Missing api key. Find your api key from wakatime.com/settings.'
self.assertIn(expected, sys.stderr.getvalue())
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
expected = ''
self.assertEquals(log_output, expected)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_proxy_argument(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
proxy = 'localhost:1337'
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--proxy', proxy]
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].assert_called_once_with(ANY, cert=None, proxies={'https': proxy}, stream=False, timeout=60, verify=True)
def test_write_argument(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
now = u(int(time.time()))
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--write', '--verbose',
'--config', 'tests/samples/configs/good_config.cfg', '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 0,
'entity': entity,
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
'is_write': True,
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 0,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_entity_type_domain(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
entity = 'google.com'
config = 'tests/samples/configs/good_config.cfg'
now = u(int(time.time()))
args = ['--entity', entity, '--entity-type', 'domain', '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'entity': u(entity),
'time': float(now),
'type': 'domain',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): None,
u('lineno'): None,
u('lines'): None,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(heartbeat, ANY, None)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_entity_type_app(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
entity = 'Firefox'
config = 'tests/samples/configs/good_config.cfg'
now = u(int(time.time()))
args = ['--entity', entity, '--entity-type', 'app', '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'entity': u(entity),
'time': float(now),
'type': 'app',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): None,
u('lineno'): None,
u('lines'): None,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(heartbeat, ANY, None)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_extra_heartbeats_argument(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
project1 = os.path.basename(os.path.abspath('.'))
project2 = 'xyz'
entity1 = os.path.abspath('tests/samples/codefiles/emptyfile.txt')
entity2 = os.path.abspath('tests/samples/codefiles/twolinefile.txt')
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity1, '--config', config, '--extra-heartbeats']
with utils.mock.patch('wakatime.main.sys.stdin') as mock_stdin:
now = int(time.time())
heartbeats = json.dumps([{
'timestamp': now,
'entity': entity2,
'entity_type': 'file',
'project': project2,
'is_write': True,
}])
mock_stdin.readline.return_value = heartbeats
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_has_calls([call(), call()])
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_has_calls([call(ANY), call(ANY)])
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
calls = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args_list
body = calls[0][0][0].body
data = json.loads(body)
self.assertEquals(data.get('entity'), entity1)
self.assertEquals(data.get('project'), project1)
body = calls[1][0][0].body
data = json.loads(body)
self.assertEquals(data.get('entity'), entity2)
self.assertEquals(data.get('project'), project2)
@log_capture()
def test_extra_heartbeats_with_malformed_json(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.abspath('tests/samples/codefiles/emptyfile.txt')
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--extra-heartbeats']
with utils.mock.patch('wakatime.main.sys.stdin') as mock_stdin:
heartbeats = '[{foobar}]'
mock_stdin.readline.return_value = heartbeats
retval = execute(args)
self.assertEquals(retval, MALFORMED_HEARTBEAT_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
self.assertEquals(log_output, '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()

540
tests/test_configs.py Normal file
View File

@ -0,0 +1,540 @@
# -*- coding: utf-8 -*-
from wakatime.main import execute
from wakatime.packages import requests
import logging
import os
import time
import re
import shutil
import sys
import uuid
from testfixtures import log_capture
from wakatime.compat import u, is_py3
from wakatime.constants import (
API_ERROR,
CONFIG_FILE_PARSE_ERROR,
SUCCESS,
)
from wakatime.packages.requests.models import Response
from . import utils
try:
from .packages import simplejson as json
except (ImportError, SyntaxError):
import json
try:
from mock import ANY
except ImportError:
from unittest.mock import ANY
class MainTestCase(utils.TestCase):
patch_these = [
'wakatime.packages.requests.adapters.HTTPAdapter.send',
'wakatime.offlinequeue.Queue.push',
['wakatime.offlinequeue.Queue.pop', None],
['wakatime.offlinequeue.Queue.connect', None],
'wakatime.session_cache.SessionCache.save',
'wakatime.session_cache.SessionCache.delete',
['wakatime.session_cache.SessionCache.get', requests.session],
['wakatime.session_cache.SessionCache.connect', None],
]
def test_config_file_not_passed_in_command_line_args(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
with utils.mock.patch('wakatime.configs.os.environ.get') as mock_env:
mock_env.return_value = None
with utils.mock.patch('wakatime.configs.open') as mock_open:
mock_open.side_effect = IOError('')
config = os.path.join(os.path.expanduser('~'), '.wakatime.cfg')
args = ['--file', entity]
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), CONFIG_FILE_PARSE_ERROR)
expected_stdout = u('')
expected_stderr = u("Error: Could not read from config file {0}\n").format(u(config))
self.assertEquals(sys.stdout.getvalue(), expected_stdout)
self.assertEquals(sys.stderr.getvalue(), expected_stderr)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
def test_config_file_from_env(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
config = 'tests/samples/configs/has_everything.cfg'
shutil.copy(config, os.path.join(tempdir, '.wakatime.cfg'))
config = os.path.realpath(os.path.join(tempdir, '.wakatime.cfg'))
with utils.mock.patch('wakatime.configs.os.environ.get') as mock_env:
mock_env.return_value = tempdir
args = ['--file', entity]
retval = execute(args)
self.assertEquals(retval, SUCCESS)
expected_stdout = open('tests/samples/output/main_test_good_config_file').read()
traceback_file = os.path.realpath('wakatime/arguments.py')
lineno = int(re.search(r' line (\d+),', sys.stdout.getvalue()).group(1))
self.assertEquals(sys.stdout.getvalue(), expected_stdout.format(file=traceback_file, lineno=lineno))
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
def test_missing_config_file(self):
config = 'foo'
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
args = ['--file', entity, '--config', config]
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), CONFIG_FILE_PARSE_ERROR)
expected_stdout = u('')
expected_stderr = u("Error: Could not read from config file foo\n")
self.assertEquals(sys.stdout.getvalue(), expected_stdout)
self.assertEquals(sys.stderr.getvalue(), expected_stderr)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
def test_good_config_file(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
config = 'tests/samples/configs/has_everything.cfg'
args = ['--file', entity, '--config', config]
retval = execute(args)
self.assertEquals(retval, SUCCESS)
expected_stdout = open('tests/samples/output/main_test_good_config_file').read()
traceback_file = os.path.realpath('wakatime/arguments.py')
lineno = int(re.search(r' line (\d+),', sys.stdout.getvalue()).group(1))
self.assertEquals(sys.stdout.getvalue(), expected_stdout.format(file=traceback_file, lineno=lineno))
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
def test_api_key_setting_without_underscore_accepted(self):
"""Api key in wakatime.cfg should also work without an underscore:
apikey = XXX
"""
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
config = 'tests/samples/configs/sample_alternate_apikey.cfg'
args = ['--file', entity, '--config', config]
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
def test_bad_config_file(self):
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
config = 'tests/samples/configs/bad_config.cfg'
args = ['--file', entity, '--config', config]
retval = execute(args)
self.assertEquals(retval, CONFIG_FILE_PARSE_ERROR)
self.assertIn('ParsingError', sys.stdout.getvalue())
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
def test_non_hidden_filename(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/good_config.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 2,
'entity': os.path.realpath(entity),
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 2,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_hide_all_filenames(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/paranoid.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 2,
'entity': 'HIDDEN.txt',
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 2,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_hide_all_filenames_from_cli_arg(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/good_config.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now, '--hidefilenames']
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 2,
'entity': 'HIDDEN.txt',
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 2,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_hide_matching_filenames(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/hide_file_names.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 2,
'entity': 'HIDDEN.txt',
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 2,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_does_not_hide_unmatching_filenames(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/hide_file_names.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 0,
'entity': entity,
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 0,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
@log_capture()
def test_does_not_hide_filenames_from_invalid_regex(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/invalid_hide_file_names.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
expected = u('WakaTime WARNING Regex error (unbalanced parenthesis) for include pattern: invalid(regex')
if self.isPy35OrNewer:
expected = 'WakaTime WARNING Regex error (missing ), unterminated subpattern at position 7) for include pattern: invalid(regex'
self.assertEquals(expected, log_output)
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 0,
'entity': entity,
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 0,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
@log_capture()
def test_exclude_file(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--exclude', 'empty', '--verbose']
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
expected = 'WakaTime DEBUG Skipping because matches exclude pattern: empty'
self.assertEquals(log_output, expected)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
def test_hostname_set_from_config_file(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
hostname = 'fromcfgfile'
config = 'tests/samples/configs/has_everything.cfg'
args = ['--file', entity, '--config', config, '--timeout', '15']
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
headers = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args[0][0].headers
self.assertEquals(headers.get('X-Machine-Name'), hostname.encode('utf-8') if is_py3 else hostname)

View File

@ -7,7 +7,6 @@ from wakatime.packages import requests
import logging import logging
import os import os
import time import time
import re
import shutil import shutil
import sys import sys
import uuid import uuid
@ -16,9 +15,7 @@ from wakatime.compat import u, is_py3
from wakatime.constants import ( from wakatime.constants import (
API_ERROR, API_ERROR,
AUTH_ERROR, AUTH_ERROR,
CONFIG_FILE_PARSE_ERROR,
SUCCESS, SUCCESS,
MALFORMED_HEARTBEAT_ERROR,
) )
from wakatime.packages.requests.exceptions import RequestException from wakatime.packages.requests.exceptions import RequestException
from wakatime.packages.requests.models import Response from wakatime.packages.requests.models import Response
@ -29,9 +26,9 @@ try:
except (ImportError, SyntaxError): except (ImportError, SyntaxError):
import json import json
try: try:
from mock import ANY, call from mock import ANY
except ImportError: except ImportError:
from unittest.mock import ANY, call from unittest.mock import ANY
from wakatime.packages import tzlocal from wakatime.packages import tzlocal
@ -47,586 +44,6 @@ class MainTestCase(utils.TestCase):
['wakatime.session_cache.SessionCache.connect', None], ['wakatime.session_cache.SessionCache.connect', None],
] ]
def test_help_contents(self):
args = ['--help']
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), 0)
expected_stdout = open('tests/samples/output/test_help_contents').read()
self.assertEquals(sys.stdout.getvalue(), expected_stdout)
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_argument_parsing(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
key = str(uuid.uuid4())
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--key', key, '--config', config]
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
def test_config_file_not_passed_in_command_line_args(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
with utils.mock.patch('wakatime.main.os.environ.get') as mock_env:
mock_env.return_value = None
with utils.mock.patch('wakatime.main.open') as mock_open:
mock_open.side_effect = IOError('')
config = os.path.join(os.path.expanduser('~'), '.wakatime.cfg')
args = ['--file', entity]
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), CONFIG_FILE_PARSE_ERROR)
expected_stdout = u('')
expected_stderr = u("Error: Could not read from config file {0}\n").format(u(config))
self.assertEquals(sys.stdout.getvalue(), expected_stdout)
self.assertEquals(sys.stderr.getvalue(), expected_stderr)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
def test_config_file_from_env(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
config = 'tests/samples/configs/has_everything.cfg'
shutil.copy(config, os.path.join(tempdir, '.wakatime.cfg'))
config = os.path.realpath(os.path.join(tempdir, '.wakatime.cfg'))
with utils.mock.patch('wakatime.main.os.environ.get') as mock_env:
mock_env.return_value = tempdir
args = ['--file', entity]
retval = execute(args)
self.assertEquals(retval, SUCCESS)
expected_stdout = open('tests/samples/output/main_test_good_config_file').read()
traceback_file = os.path.realpath('wakatime/main.py')
lineno = int(re.search(r' line (\d+),', sys.stdout.getvalue()).group(1))
self.assertEquals(sys.stdout.getvalue(), expected_stdout.format(file=traceback_file, lineno=lineno))
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
def test_missing_config_file(self):
config = 'foo'
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
args = ['--file', entity, '--config', config]
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), CONFIG_FILE_PARSE_ERROR)
expected_stdout = u('')
expected_stderr = u("Error: Could not read from config file foo\n")
self.assertEquals(sys.stdout.getvalue(), expected_stdout)
self.assertEquals(sys.stderr.getvalue(), expected_stderr)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
def test_good_config_file(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
config = 'tests/samples/configs/has_everything.cfg'
args = ['--file', entity, '--config', config]
retval = execute(args)
self.assertEquals(retval, SUCCESS)
expected_stdout = open('tests/samples/output/main_test_good_config_file').read()
traceback_file = os.path.realpath('wakatime/main.py')
lineno = int(re.search(r' line (\d+),', sys.stdout.getvalue()).group(1))
self.assertEquals(sys.stdout.getvalue(), expected_stdout.format(file=traceback_file, lineno=lineno))
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
def test_api_key_setting_without_underscore_accepted(self):
"""Api key in wakatime.cfg should also work without an underscore:
apikey = XXX
"""
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
config = 'tests/samples/configs/sample_alternate_apikey.cfg'
args = ['--file', entity, '--config', config]
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
def test_bad_config_file(self):
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
config = 'tests/samples/configs/bad_config.cfg'
args = ['--file', entity, '--config', config]
retval = execute(args)
self.assertEquals(retval, CONFIG_FILE_PARSE_ERROR)
self.assertIn('ParsingError', sys.stdout.getvalue())
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
def test_lineno_and_cursorpos(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
entity = 'tests/samples/codefiles/twolinefile.txt'
config = 'tests/samples/configs/good_config.cfg'
now = u(int(time.time()))
args = ['--entity', entity, '--config', config, '--time', now, '--lineno', '3', '--cursorpos', '4', '--verbose']
retval = execute(args)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.assertEquals(retval, API_ERROR)
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 2,
'entity': os.path.realpath(entity),
'project': os.path.basename(os.path.abspath('.')),
'cursorpos': '4',
'lineno': '3',
'branch': 'master',
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): '4',
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): '3',
u('lines'): 2,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_non_hidden_filename(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/good_config.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 2,
'entity': os.path.realpath(entity),
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 2,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_hide_all_filenames(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/paranoid.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 2,
'entity': 'HIDDEN.txt',
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 2,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_hide_all_filenames_from_cli_arg(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/good_config.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now, '--hidefilenames']
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 2,
'entity': 'HIDDEN.txt',
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 2,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_hide_matching_filenames(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/hide_file_names.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 2,
'entity': 'HIDDEN.txt',
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 2,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_does_not_hide_unmatching_filenames(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/hide_file_names.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 0,
'entity': entity,
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 0,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
@log_capture()
def test_does_not_hide_filenames_from_invalid_regex(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
now = u(int(time.time()))
config = 'tests/samples/configs/invalid_hide_file_names.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
expected = u('WakaTime WARNING Regex error (unbalanced parenthesis) for include pattern: invalid(regex')
if self.isPy35OrNewer:
expected = 'WakaTime WARNING Regex error (missing ), unterminated subpattern at position 7) for include pattern: invalid(regex'
self.assertEquals(expected, log_output)
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 0,
'entity': entity,
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 0,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_invalid_timeout_passed_via_command_line(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
config = 'tests/samples/configs/good_config.cfg'
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--config', config, '--timeout', 'abc']
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), 2)
self.assertEquals(sys.stdout.getvalue(), '')
expected_stderr = open('tests/samples/output/main_test_timeout_passed_via_command_line').read()
self.assertEquals(sys.stderr.getvalue(), expected_stderr)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
@log_capture()
def test_exclude_file(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--exclude', 'empty', '--verbose']
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
expected = 'WakaTime DEBUG Skipping because matches exclude pattern: empty'
self.assertEquals(log_output, expected)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
def test_500_response(self): def test_500_response(self):
response = Response() response = Response()
response.status_code = 500 response.status_code = 500
@ -880,94 +297,6 @@ class MainTestCase(utils.TestCase):
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
@log_capture()
def test_missing_entity_file(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
entity = 'tests/samples/codefiles/missingfile.txt'
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--verbose']
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
expected = 'WakaTime DEBUG File does not exist; ignoring this heartbeat.'
self.assertEquals(log_output, expected)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
@log_capture()
def test_missing_entity_argument(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
config = 'tests/samples/configs/good_config.cfg'
args = ['--config', config]
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), 2)
self.assertEquals(sys.stdout.getvalue(), '')
expected = 'error: argument --entity is required'
self.assertIn(expected, sys.stderr.getvalue())
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
expected = ''
self.assertEquals(log_output, expected)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
@log_capture()
def test_missing_api_key(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
config = 'tests/samples/configs/missing_api_key.cfg'
args = ['--config', config]
with self.assertRaises(SystemExit) as e:
execute(args)
self.assertEquals(int(str(e.exception)), AUTH_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
expected = 'error: Missing api key. Find your api key from wakatime.com/settings.'
self.assertIn(expected, sys.stderr.getvalue())
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
expected = ''
self.assertEquals(log_output, expected)
self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
@log_capture() @log_capture()
def test_invalid_api_key(self, logs): def test_invalid_api_key(self, logs):
logging.disable(logging.NOTSET) logging.disable(logging.NOTSET)
@ -998,154 +327,6 @@ class MainTestCase(utils.TestCase):
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_proxy_argument(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
proxy = 'localhost:1337'
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--proxy', proxy]
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].assert_called_once_with(ANY, cert=None, proxies={'https': proxy}, stream=False, timeout=60, verify=True)
def test_write_argument(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
now = u(int(time.time()))
key = str(uuid.uuid4())
args = ['--file', entity, '--key', key, '--write', '--verbose',
'--config', 'tests/samples/configs/good_config.cfg', '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'language': 'Text only',
'lines': 0,
'entity': entity,
'project': os.path.basename(os.path.abspath('.')),
'time': float(now),
'type': 'file',
'is_write': True,
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): u('Text only'),
u('lineno'): None,
u('lines'): 0,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None)
for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items():
self.assertEquals(heartbeat[key], val)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_entity_type_domain(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
entity = 'google.com'
config = 'tests/samples/configs/good_config.cfg'
now = u(int(time.time()))
args = ['--entity', entity, '--entity-type', 'domain', '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'entity': u(entity),
'time': float(now),
'type': 'domain',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): None,
u('lineno'): None,
u('lines'): None,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(heartbeat, ANY, None)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_entity_type_app(self):
response = Response()
response.status_code = 0
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
entity = 'Firefox'
config = 'tests/samples/configs/good_config.cfg'
now = u(int(time.time()))
args = ['--entity', entity, '--entity-type', 'app', '--config', config, '--time', now]
retval = execute(args)
self.assertEquals(retval, API_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called()
heartbeat = {
'entity': u(entity),
'time': float(now),
'type': 'app',
}
stats = {
u('cursorpos'): None,
u('dependencies'): [],
u('language'): None,
u('lineno'): None,
u('lines'): None,
}
self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(heartbeat, ANY, None)
self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1]))
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
def test_nonascii_hostname(self): def test_nonascii_hostname(self):
response = Response() response = Response()
response.status_code = 201 response.status_code = 201
@ -1178,32 +359,6 @@ class MainTestCase(utils.TestCase):
headers = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args[0][0].headers headers = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args[0][0].headers
self.assertEquals(headers.get('X-Machine-Name'), hostname.encode('utf-8') if is_py3 else hostname) self.assertEquals(headers.get('X-Machine-Name'), hostname.encode('utf-8') if is_py3 else hostname)
def test_hostname_set_from_config_file(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/emptyfile.txt'
shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt'))
hostname = 'fromcfgfile'
config = 'tests/samples/configs/has_everything.cfg'
args = ['--file', entity, '--config', config, '--timeout', '15']
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
headers = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args[0][0].headers
self.assertEquals(headers.get('X-Machine-Name'), hostname.encode('utf-8') if is_py3 else hostname)
def test_nonascii_timezone(self): def test_nonascii_timezone(self):
response = Response() response = Response()
response.status_code = 201 response.status_code = 201
@ -1328,96 +483,6 @@ class MainTestCase(utils.TestCase):
headers = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args[0][0].headers headers = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args[0][0].headers
self.assertEquals(headers.get('TimeZone'), u(timezone.zone).encode('utf-8') if is_py3 else timezone.zone) self.assertEquals(headers.get('TimeZone'), u(timezone.zone).encode('utf-8') if is_py3 else timezone.zone)
def test_extra_heartbeats_argument(self):
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
project1 = os.path.basename(os.path.abspath('.'))
project2 = 'xyz'
entity1 = os.path.abspath('tests/samples/codefiles/emptyfile.txt')
entity2 = os.path.abspath('tests/samples/codefiles/twolinefile.txt')
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity1, '--config', config, '--extra-heartbeats']
with utils.mock.patch('wakatime.main.sys.stdin') as mock_stdin:
now = int(time.time())
heartbeats = json.dumps([{
'timestamp': now,
'entity': entity2,
'entity_type': 'file',
'project': project2,
'is_write': True,
}])
mock_stdin.readline.return_value = heartbeats
retval = execute(args)
self.assertEquals(retval, SUCCESS)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_has_calls([call(), call()])
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_has_calls([call(ANY), call(ANY)])
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with()
calls = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args_list
body = calls[0][0][0].body
data = json.loads(body)
self.assertEquals(data.get('entity'), entity1)
self.assertEquals(data.get('project'), project1)
body = calls[1][0][0].body
data = json.loads(body)
self.assertEquals(data.get('entity'), entity2)
self.assertEquals(data.get('project'), project2)
@log_capture()
def test_extra_heartbeats_with_malformed_json(self, logs):
logging.disable(logging.NOTSET)
response = Response()
response.status_code = 201
self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response
with utils.TemporaryDirectory() as tempdir:
entity = 'tests/samples/codefiles/twolinefile.txt'
shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt'))
entity = os.path.abspath('tests/samples/codefiles/emptyfile.txt')
config = 'tests/samples/configs/good_config.cfg'
args = ['--file', entity, '--config', config, '--extra-heartbeats']
with utils.mock.patch('wakatime.main.sys.stdin') as mock_stdin:
heartbeats = '[{foobar}]'
mock_stdin.readline.return_value = heartbeats
retval = execute(args)
self.assertEquals(retval, MALFORMED_HEARTBEAT_ERROR)
self.assertEquals(sys.stdout.getvalue(), '')
self.assertEquals(sys.stderr.getvalue(), '')
log_output = u("\n").join([u(' ').join(x) for x in logs.actual()])
self.assertEquals(log_output, '')
self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with()
self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called()
self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY)
self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called()
self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called()
@log_capture() @log_capture()
def test_nonascii_filename(self, logs): def test_nonascii_filename(self, logs):
logging.disable(logging.NOTSET) logging.disable(logging.NOTSET)

230
wakatime/arguments.py Normal file
View File

@ -0,0 +1,230 @@
# -*- coding: utf-8 -*-
"""
wakatime.arguments
~~~~~~~~~~~~~~~~~~
Command-line arguments.
:copyright: (c) 2016 Alan Hamlett.
:license: BSD, see LICENSE for more details.
"""
from __future__ import print_function
import os
import re
import time
import traceback
from .__about__ import __version__
from .configs import parseConfigFile
from .constants import AUTH_ERROR
from .packages import argparse
class FileAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
try:
if os.path.isfile(values):
values = os.path.realpath(values)
except: # pragma: nocover
pass
setattr(namespace, self.dest, values)
def parseArguments():
"""Parse command line arguments and configs from ~/.wakatime.cfg.
Command line arguments take precedence over config file settings.
Returns instances of ArgumentParser and SafeConfigParser.
"""
# define supported command line arguments
parser = argparse.ArgumentParser(
description='Common interface for the WakaTime api.')
parser.add_argument('--entity', dest='entity', metavar='FILE',
action=FileAction,
help='absolute path to file for the heartbeat; can also be a '+
'url, domain, or app when --entity-type is not file')
parser.add_argument('--file', dest='file', action=FileAction,
help=argparse.SUPPRESS)
parser.add_argument('--key', dest='key',
help='your wakatime api key; uses api_key from '+
'~/.wakatime.cfg by default')
parser.add_argument('--write', dest='is_write',
action='store_true',
help='when set, tells api this heartbeat was triggered from '+
'writing to a file')
parser.add_argument('--plugin', dest='plugin',
help='optional text editor plugin name and version '+
'for User-Agent header')
parser.add_argument('--time', dest='timestamp', metavar='time',
type=float,
help='optional floating-point unix epoch timestamp; '+
'uses current time by default')
parser.add_argument('--lineno', dest='lineno',
help='optional line number; current line being edited')
parser.add_argument('--cursorpos', dest='cursorpos',
help='optional cursor position in the current file')
parser.add_argument('--entity-type', dest='entity_type',
help='entity type for this heartbeat. can be one of "file", '+
'"domain", or "app"; defaults to file.')
parser.add_argument('--proxy', dest='proxy',
help='optional proxy configuration. Supports HTTPS '+
'and SOCKS proxies. For example: '+
'https://user:pass@host:port or '+
'socks5://user:pass@host:port or ' +
'domain\\user:pass')
parser.add_argument('--project', dest='project',
help='optional project name')
parser.add_argument('--alternate-project', dest='alternate_project',
help='optional alternate project name; auto-discovered project '+
'takes priority')
parser.add_argument('--alternate-language', dest='alternate_language',
help='optional alternate language name; auto-detected language'+
'takes priority')
parser.add_argument('--hostname', dest='hostname', help='hostname of '+
'current machine.')
parser.add_argument('--disableoffline', dest='offline',
action='store_false',
help='disables offline time logging instead of queuing logged time')
parser.add_argument('--hidefilenames', dest='hidefilenames',
action='store_true',
help='obfuscate file names; will not send file names to api')
parser.add_argument('--exclude', dest='exclude', action='append',
help='filename patterns to exclude from logging; POSIX regex '+
'syntax; can be used more than once')
parser.add_argument('--include', dest='include', action='append',
help='filename patterns to log; when used in combination with '+
'--exclude, files matching include will still be logged; '+
'POSIX regex syntax; can be used more than once')
parser.add_argument('--ignore', dest='ignore', action='append',
help=argparse.SUPPRESS)
parser.add_argument('--extra-heartbeats', dest='extra_heartbeats',
action='store_true',
help='reads extra heartbeats from STDIN as a JSON array until EOF')
parser.add_argument('--logfile', dest='logfile',
help='defaults to ~/.wakatime.log')
parser.add_argument('--apiurl', dest='api_url',
help='heartbeats api url; for debugging with a local server')
parser.add_argument('--timeout', dest='timeout', type=int,
help='number of seconds to wait when sending heartbeats to api; '+
'defaults to 60 seconds')
parser.add_argument('--config', dest='config',
help='defaults to ~/.wakatime.cfg')
parser.add_argument('--verbose', dest='verbose', action='store_true',
help='turns on debug messages in log file')
parser.add_argument('--version', action='version', version=__version__)
# parse command line arguments
args = parser.parse_args()
# use current unix epoch timestamp by default
if not args.timestamp:
args.timestamp = time.time()
# parse ~/.wakatime.cfg file
configs = parseConfigFile(args.config)
if configs is None:
return args, configs
# update args from configs
if not args.hostname:
if configs.has_option('settings', 'hostname'):
args.hostname = configs.get('settings', 'hostname')
if not args.key:
default_key = None
if configs.has_option('settings', 'api_key'):
default_key = configs.get('settings', 'api_key')
elif configs.has_option('settings', 'apikey'):
default_key = configs.get('settings', 'apikey')
if default_key:
args.key = default_key
else:
try:
parser.error('Missing api key. Find your api key from wakatime.com/settings.')
except SystemExit:
raise SystemExit(AUTH_ERROR)
is_valid = not not re.match(r'^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$', args.key, re.I)
if not is_valid:
try:
parser.error('Invalid api key. Find your api key from wakatime.com/settings.')
except SystemExit:
raise SystemExit(AUTH_ERROR)
if not args.entity:
if args.file:
args.entity = args.file
else:
parser.error('argument --entity is required')
if not args.exclude:
args.exclude = []
if configs.has_option('settings', 'ignore'):
try:
for pattern in configs.get('settings', 'ignore').split("\n"):
if pattern.strip() != '':
args.exclude.append(pattern)
except TypeError: # pragma: nocover
pass
if configs.has_option('settings', 'exclude'):
try:
for pattern in configs.get('settings', 'exclude').split("\n"):
if pattern.strip() != '':
args.exclude.append(pattern)
except TypeError: # pragma: nocover
pass
if not args.include:
args.include = []
if configs.has_option('settings', 'include'):
try:
for pattern in configs.get('settings', 'include').split("\n"):
if pattern.strip() != '':
args.include.append(pattern)
except TypeError: # pragma: nocover
pass
if args.hidefilenames:
args.hidefilenames = ['.*']
else:
args.hidefilenames = []
if configs.has_option('settings', 'hidefilenames'):
option = configs.get('settings', 'hidefilenames')
if option.strip().lower() == 'true':
args.hidefilenames = ['.*']
elif option.strip().lower() != 'false':
for pattern in option.split("\n"):
if pattern.strip() != '':
args.hidefilenames.append(pattern)
if args.offline and configs.has_option('settings', 'offline'):
args.offline = configs.getboolean('settings', 'offline')
if not args.proxy and configs.has_option('settings', 'proxy'):
args.proxy = configs.get('settings', 'proxy')
if args.proxy:
pattern = r'^((https?|socks5)://)?([^:@]+(:([^:@])+)?@)?[^:]+(:\d+)?$'
if '\\' in args.proxy:
pattern = r'^.*\\.+$'
is_valid = not not re.match(pattern, args.proxy, re.I)
if not is_valid:
parser.error('Invalid proxy. Must be in format ' +
'https://user:pass@host:port or ' +
'socks5://user:pass@host:port or ' +
'domain\\user:pass.')
if not args.verbose and configs.has_option('settings', 'verbose'):
args.verbose = configs.getboolean('settings', 'verbose')
if not args.verbose and configs.has_option('settings', 'debug'):
args.verbose = configs.getboolean('settings', 'debug')
if not args.logfile and configs.has_option('settings', 'logfile'):
args.logfile = configs.get('settings', 'logfile')
if not args.logfile and os.environ.get('WAKATIME_HOME'):
home = os.environ.get('WAKATIME_HOME')
args.logfile = os.path.join(os.path.expanduser(home), '.wakatime.log')
if not args.api_url and configs.has_option('settings', 'api_url'):
args.api_url = configs.get('settings', 'api_url')
if not args.timeout and configs.has_option('settings', 'timeout'):
try:
args.timeout = int(configs.get('settings', 'timeout'))
except ValueError:
print(traceback.format_exc())
return args, configs

55
wakatime/configs.py Normal file
View File

@ -0,0 +1,55 @@
# -*- coding: utf-8 -*-
"""
wakatime.configs
~~~~~~~~~~~~~~~~
Config file parser.
:copyright: (c) 2016 Alan Hamlett.
:license: BSD, see LICENSE for more details.
"""
from __future__ import print_function
import os
import sys
import traceback
from .compat import u, open
from .constants import CONFIG_FILE_PARSE_ERROR
try:
import ConfigParser as configparser
except ImportError: # pragma: nocover
import configparser
def parseConfigFile(configFile=None):
"""Returns a configparser.SafeConfigParser instance with configs
read from the config file. Default location of the config file is
at ~/.wakatime.cfg.
"""
# get config file location from ENV
home = os.environ.get('WAKATIME_HOME')
if not configFile and home:
configFile = os.path.join(os.path.expanduser(home), '.wakatime.cfg')
# use default config file location
if not configFile:
configFile = os.path.join(os.path.expanduser('~'), '.wakatime.cfg')
configs = configparser.SafeConfigParser()
try:
with open(configFile, 'r', encoding='utf-8') as fh:
try:
configs.readfp(fh)
except configparser.Error:
print(traceback.format_exc())
return None
except IOError:
sys.stderr.write(u("Error: Could not read from config file {0}\n").format(u(configFile)))
raise SystemExit(CONFIG_FILE_PARSE_ERROR)
return configs

View File

@ -14,23 +14,18 @@ from __future__ import print_function
import base64 import base64
import logging import logging
import os import os
import platform
import re import re
import sys import sys
import time
import traceback import traceback
import socket import socket
try:
import ConfigParser as configparser
except ImportError: # pragma: nocover
import configparser
pwd = os.path.dirname(os.path.abspath(__file__)) pwd = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(pwd)) sys.path.insert(0, os.path.dirname(pwd))
sys.path.insert(0, os.path.join(pwd, 'packages')) sys.path.insert(0, os.path.join(pwd, 'packages'))
from .__about__ import __version__ from .__about__ import __version__
from .compat import u, open, is_py3 from .arguments import parseArguments
from .compat import u, is_py3
from .constants import ( from .constants import (
API_ERROR, API_ERROR,
AUTH_ERROR, AUTH_ERROR,
@ -41,12 +36,12 @@ from .constants import (
) )
from .logger import setup_logging from .logger import setup_logging
from .offlinequeue import Queue from .offlinequeue import Queue
from .packages import argparse
from .packages import requests from .packages import requests
from .packages.requests.exceptions import RequestException from .packages.requests.exceptions import RequestException
from .project import get_project_info from .project import get_project_info
from .session_cache import SessionCache from .session_cache import SessionCache
from .stats import get_file_stats from .stats import get_file_stats
from .utils import get_user_agent, should_exclude, format_file_path
try: try:
from .packages import simplejson as json # pragma: nocover from .packages import simplejson as json # pragma: nocover
except (ImportError, SyntaxError): # pragma: nocover except (ImportError, SyntaxError): # pragma: nocover
@ -57,287 +52,6 @@ from .packages import tzlocal
log = logging.getLogger('WakaTime') log = logging.getLogger('WakaTime')
class FileAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
try:
if os.path.isfile(values):
values = os.path.realpath(values)
except: # pragma: nocover
pass
setattr(namespace, self.dest, values)
def parseConfigFile(configFile=None):
"""Returns a configparser.SafeConfigParser instance with configs
read from the config file. Default location of the config file is
at ~/.wakatime.cfg.
"""
# get config file location from ENV
home = os.environ.get('WAKATIME_HOME')
if not configFile and home:
configFile = os.path.join(os.path.expanduser(home), '.wakatime.cfg')
# use default config file location
if not configFile:
configFile = os.path.join(os.path.expanduser('~'), '.wakatime.cfg')
configs = configparser.SafeConfigParser()
try:
with open(configFile, 'r', encoding='utf-8') as fh:
try:
configs.readfp(fh)
except configparser.Error:
print(traceback.format_exc())
return None
except IOError:
sys.stderr.write(u("Error: Could not read from config file {0}\n").format(u(configFile)))
raise SystemExit(CONFIG_FILE_PARSE_ERROR)
return configs
def parseArguments():
"""Parse command line arguments and configs from ~/.wakatime.cfg.
Command line arguments take precedence over config file settings.
Returns instances of ArgumentParser and SafeConfigParser.
"""
# define supported command line arguments
parser = argparse.ArgumentParser(
description='Common interface for the WakaTime api.')
parser.add_argument('--entity', dest='entity', metavar='FILE',
action=FileAction,
help='absolute path to file for the heartbeat; can also be a '+
'url, domain, or app when --entity-type is not file')
parser.add_argument('--file', dest='file', action=FileAction,
help=argparse.SUPPRESS)
parser.add_argument('--key', dest='key',
help='your wakatime api key; uses api_key from '+
'~/.wakatime.cfg by default')
parser.add_argument('--write', dest='is_write',
action='store_true',
help='when set, tells api this heartbeat was triggered from '+
'writing to a file')
parser.add_argument('--plugin', dest='plugin',
help='optional text editor plugin name and version '+
'for User-Agent header')
parser.add_argument('--time', dest='timestamp', metavar='time',
type=float,
help='optional floating-point unix epoch timestamp; '+
'uses current time by default')
parser.add_argument('--lineno', dest='lineno',
help='optional line number; current line being edited')
parser.add_argument('--cursorpos', dest='cursorpos',
help='optional cursor position in the current file')
parser.add_argument('--entity-type', dest='entity_type',
help='entity type for this heartbeat. can be one of "file", '+
'"domain", or "app"; defaults to file.')
parser.add_argument('--proxy', dest='proxy',
help='optional proxy configuration. Supports HTTPS '+
'and SOCKS proxies. For example: '+
'https://user:pass@host:port or '+
'socks5://user:pass@host:port or ' +
'domain\\user:pass')
parser.add_argument('--project', dest='project',
help='optional project name')
parser.add_argument('--alternate-project', dest='alternate_project',
help='optional alternate project name; auto-discovered project '+
'takes priority')
parser.add_argument('--alternate-language', dest='alternate_language',
help='optional alternate language name; auto-detected language'+
'takes priority')
parser.add_argument('--hostname', dest='hostname', help='hostname of '+
'current machine.')
parser.add_argument('--disableoffline', dest='offline',
action='store_false',
help='disables offline time logging instead of queuing logged time')
parser.add_argument('--hidefilenames', dest='hidefilenames',
action='store_true',
help='obfuscate file names; will not send file names to api')
parser.add_argument('--exclude', dest='exclude', action='append',
help='filename patterns to exclude from logging; POSIX regex '+
'syntax; can be used more than once')
parser.add_argument('--include', dest='include', action='append',
help='filename patterns to log; when used in combination with '+
'--exclude, files matching include will still be logged; '+
'POSIX regex syntax; can be used more than once')
parser.add_argument('--ignore', dest='ignore', action='append',
help=argparse.SUPPRESS)
parser.add_argument('--extra-heartbeats', dest='extra_heartbeats',
action='store_true',
help='reads extra heartbeats from STDIN as a JSON array until EOF')
parser.add_argument('--logfile', dest='logfile',
help='defaults to ~/.wakatime.log')
parser.add_argument('--apiurl', dest='api_url',
help='heartbeats api url; for debugging with a local server')
parser.add_argument('--timeout', dest='timeout', type=int,
help='number of seconds to wait when sending heartbeats to api; '+
'defaults to 60 seconds')
parser.add_argument('--config', dest='config',
help='defaults to ~/.wakatime.cfg')
parser.add_argument('--verbose', dest='verbose', action='store_true',
help='turns on debug messages in log file')
parser.add_argument('--version', action='version', version=__version__)
# parse command line arguments
args = parser.parse_args()
# use current unix epoch timestamp by default
if not args.timestamp:
args.timestamp = time.time()
# parse ~/.wakatime.cfg file
configs = parseConfigFile(args.config)
if configs is None:
return args, configs
# update args from configs
if not args.hostname:
if configs.has_option('settings', 'hostname'):
args.hostname = configs.get('settings', 'hostname')
if not args.key:
default_key = None
if configs.has_option('settings', 'api_key'):
default_key = configs.get('settings', 'api_key')
elif configs.has_option('settings', 'apikey'):
default_key = configs.get('settings', 'apikey')
if default_key:
args.key = default_key
else:
try:
parser.error('Missing api key. Find your api key from wakatime.com/settings.')
except SystemExit:
raise SystemExit(AUTH_ERROR)
is_valid = not not re.match(r'^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$', args.key, re.I)
if not is_valid:
try:
parser.error('Invalid api key. Find your api key from wakatime.com/settings.')
except SystemExit:
raise SystemExit(AUTH_ERROR)
if not args.entity:
if args.file:
args.entity = args.file
else:
parser.error('argument --entity is required')
if not args.exclude:
args.exclude = []
if configs.has_option('settings', 'ignore'):
try:
for pattern in configs.get('settings', 'ignore').split("\n"):
if pattern.strip() != '':
args.exclude.append(pattern)
except TypeError: # pragma: nocover
pass
if configs.has_option('settings', 'exclude'):
try:
for pattern in configs.get('settings', 'exclude').split("\n"):
if pattern.strip() != '':
args.exclude.append(pattern)
except TypeError: # pragma: nocover
pass
if not args.include:
args.include = []
if configs.has_option('settings', 'include'):
try:
for pattern in configs.get('settings', 'include').split("\n"):
if pattern.strip() != '':
args.include.append(pattern)
except TypeError: # pragma: nocover
pass
if args.hidefilenames:
args.hidefilenames = ['.*']
else:
args.hidefilenames = []
if configs.has_option('settings', 'hidefilenames'):
option = configs.get('settings', 'hidefilenames')
if option.strip().lower() == 'true':
args.hidefilenames = ['.*']
elif option.strip().lower() != 'false':
for pattern in option.split("\n"):
if pattern.strip() != '':
args.hidefilenames.append(pattern)
if args.offline and configs.has_option('settings', 'offline'):
args.offline = configs.getboolean('settings', 'offline')
if not args.proxy and configs.has_option('settings', 'proxy'):
args.proxy = configs.get('settings', 'proxy')
if args.proxy:
pattern = r'^((https?|socks5)://)?([^:@]+(:([^:@])+)?@)?[^:]+(:\d+)?$'
if '\\' in args.proxy:
pattern = r'^.*\\.+$'
is_valid = not not re.match(pattern, args.proxy, re.I)
if not is_valid:
parser.error('Invalid proxy. Must be in format ' +
'https://user:pass@host:port or ' +
'socks5://user:pass@host:port or ' +
'domain\\user:pass.')
if not args.verbose and configs.has_option('settings', 'verbose'):
args.verbose = configs.getboolean('settings', 'verbose')
if not args.verbose and configs.has_option('settings', 'debug'):
args.verbose = configs.getboolean('settings', 'debug')
if not args.logfile and configs.has_option('settings', 'logfile'):
args.logfile = configs.get('settings', 'logfile')
if not args.logfile and os.environ.get('WAKATIME_HOME'):
home = os.environ.get('WAKATIME_HOME')
args.logfile = os.path.join(os.path.expanduser(home), '.wakatime.log')
if not args.api_url and configs.has_option('settings', 'api_url'):
args.api_url = configs.get('settings', 'api_url')
if not args.timeout and configs.has_option('settings', 'timeout'):
try:
args.timeout = int(configs.get('settings', 'timeout'))
except ValueError:
print(traceback.format_exc())
return args, configs
def should_exclude(entity, include, exclude):
if entity is not None and entity.strip() != '':
for pattern in include:
try:
compiled = re.compile(pattern, re.IGNORECASE)
if compiled.search(entity):
return False
except re.error as ex:
log.warning(u('Regex error ({msg}) for include pattern: {pattern}').format(
msg=u(ex),
pattern=u(pattern),
))
for pattern in exclude:
try:
compiled = re.compile(pattern, re.IGNORECASE)
if compiled.search(entity):
return pattern
except re.error as ex:
log.warning(u('Regex error ({msg}) for exclude pattern: {pattern}').format(
msg=u(ex),
pattern=u(pattern),
))
return False
def get_user_agent(plugin):
ver = sys.version_info
python_version = '%d.%d.%d.%s.%d' % (ver[0], ver[1], ver[2], ver[3], ver[4])
user_agent = u('wakatime/{ver} ({platform}) Python{py_ver}').format(
ver=u(__version__),
platform=u(platform.platform()),
py_ver=python_version,
)
if plugin:
user_agent = u('{user_agent} {plugin}').format(
user_agent=user_agent,
plugin=u(plugin),
)
else:
user_agent = u('{user_agent} Unknown/0').format(
user_agent=user_agent,
)
return user_agent
def send_heartbeat(project=None, branch=None, hostname=None, stats={}, key=None, def send_heartbeat(project=None, branch=None, hostname=None, stats={}, key=None,
entity=None, timestamp=None, is_write=None, plugin=None, entity=None, timestamp=None, is_write=None, plugin=None,
offline=None, entity_type='file', hidefilenames=None, offline=None, entity_type='file', hidefilenames=None,
@ -527,17 +241,6 @@ def sync_offline_heartbeats(args, hostname):
return SUCCESS return SUCCESS
def format_file_path(filepath):
"""Formats a path as absolute and with the correct platform separator."""
try:
filepath = os.path.realpath(os.path.abspath(filepath))
filepath = re.sub(r'[/\\]', os.path.sep, filepath)
except: # pragma: nocover
pass
return filepath
def process_heartbeat(args, configs, hostname, heartbeat): def process_heartbeat(args, configs, hostname, heartbeat):
exclude = should_exclude(heartbeat['entity'], args.include, args.exclude) exclude = should_exclude(heartbeat['entity'], args.include, args.exclude)
if exclude is not False: if exclude is not False:

79
wakatime/utils.py Normal file
View File

@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
"""
wakatime.utils
~~~~~~~~~~~~~~
Utility functions.
:copyright: (c) 2016 Alan Hamlett.
:license: BSD, see LICENSE for more details.
"""
import platform
import logging
import os
import re
import sys
from .__about__ import __version__
from .compat import u
log = logging.getLogger('WakaTime')
def should_exclude(entity, include, exclude):
if entity is not None and entity.strip() != '':
for pattern in include:
try:
compiled = re.compile(pattern, re.IGNORECASE)
if compiled.search(entity):
return False
except re.error as ex:
log.warning(u('Regex error ({msg}) for include pattern: {pattern}').format(
msg=u(ex),
pattern=u(pattern),
))
for pattern in exclude:
try:
compiled = re.compile(pattern, re.IGNORECASE)
if compiled.search(entity):
return pattern
except re.error as ex:
log.warning(u('Regex error ({msg}) for exclude pattern: {pattern}').format(
msg=u(ex),
pattern=u(pattern),
))
return False
def get_user_agent(plugin):
ver = sys.version_info
python_version = '%d.%d.%d.%s.%d' % (ver[0], ver[1], ver[2], ver[3], ver[4])
user_agent = u('wakatime/{ver} ({platform}) Python{py_ver}').format(
ver=u(__version__),
platform=u(platform.platform()),
py_ver=python_version,
)
if plugin:
user_agent = u('{user_agent} {plugin}').format(
user_agent=user_agent,
plugin=u(plugin),
)
else:
user_agent = u('{user_agent} Unknown/0').format(
user_agent=user_agent,
)
return user_agent
def format_file_path(filepath):
"""Formats a path as absolute and with the correct platform separator."""
try:
filepath = os.path.realpath(os.path.abspath(filepath))
filepath = re.sub(r'[/\\]', os.path.sep, filepath)
except: # pragma: nocover
pass
return filepath