From c5531312da54b2be3bc48f1680304570316b88f9 Mon Sep 17 00:00:00 2001 From: Alan Hamlett Date: Mon, 20 Feb 2017 12:04:07 -0800 Subject: [PATCH] refactor main into multiple files --- tests/test_arguments.py | 480 ++++++++++++++++++++ tests/test_configs.py | 540 +++++++++++++++++++++++ tests/test_main.py | 939 +--------------------------------------- wakatime/arguments.py | 230 ++++++++++ wakatime/configs.py | 55 +++ wakatime/main.py | 303 +------------ wakatime/utils.py | 79 ++++ 7 files changed, 1389 insertions(+), 1237 deletions(-) create mode 100644 tests/test_arguments.py create mode 100644 tests/test_configs.py create mode 100644 wakatime/arguments.py create mode 100644 wakatime/configs.py create mode 100644 wakatime/utils.py diff --git a/tests/test_arguments.py b/tests/test_arguments.py new file mode 100644 index 0000000..04ab105 --- /dev/null +++ b/tests/test_arguments.py @@ -0,0 +1,480 @@ +# -*- coding: utf-8 -*- + + +from wakatime.main import execute +from wakatime.packages import requests + +import logging +import os +import time +import shutil +import sys +import uuid +from testfixtures import log_capture +from wakatime.compat import u +from wakatime.constants import ( + API_ERROR, + AUTH_ERROR, + SUCCESS, + MALFORMED_HEARTBEAT_ERROR, +) +from wakatime.packages.requests.models import Response +from . import utils + +try: + from .packages import simplejson as json +except (ImportError, SyntaxError): + import json +try: + from mock import ANY, call +except ImportError: + from unittest.mock import ANY, call + + +class MainTestCase(utils.TestCase): + patch_these = [ + 'wakatime.packages.requests.adapters.HTTPAdapter.send', + 'wakatime.offlinequeue.Queue.push', + ['wakatime.offlinequeue.Queue.pop', None], + ['wakatime.offlinequeue.Queue.connect', None], + 'wakatime.session_cache.SessionCache.save', + 'wakatime.session_cache.SessionCache.delete', + ['wakatime.session_cache.SessionCache.get', requests.session], + ['wakatime.session_cache.SessionCache.connect', None], + ] + + def test_help_contents(self): + args = ['--help'] + with self.assertRaises(SystemExit) as e: + execute(args) + + self.assertEquals(int(str(e.exception)), 0) + expected_stdout = open('tests/samples/output/test_help_contents').read() + self.assertEquals(sys.stdout.getvalue(), expected_stdout) + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + def test_argument_parsing(self): + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/twolinefile.txt' + shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) + key = str(uuid.uuid4()) + config = 'tests/samples/configs/good_config.cfg' + + args = ['--file', entity, '--key', key, '--config', config] + + retval = execute(args) + self.assertEquals(retval, SUCCESS) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() + + def test_lineno_and_cursorpos(self): + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + entity = 'tests/samples/codefiles/twolinefile.txt' + config = 'tests/samples/configs/good_config.cfg' + now = u(int(time.time())) + + args = ['--entity', entity, '--config', config, '--time', now, '--lineno', '3', '--cursorpos', '4', '--verbose'] + retval = execute(args) + + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.assertEquals(retval, API_ERROR) + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + heartbeat = { + 'language': 'Text only', + 'lines': 2, + 'entity': os.path.realpath(entity), + 'project': os.path.basename(os.path.abspath('.')), + 'cursorpos': '4', + 'lineno': '3', + 'branch': 'master', + 'time': float(now), + 'type': 'file', + } + stats = { + u('cursorpos'): '4', + u('dependencies'): [], + u('language'): u('Text only'), + u('lineno'): '3', + u('lines'): 2, + } + + self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) + for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): + self.assertEquals(heartbeat[key], val) + self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + def test_invalid_timeout_passed_via_command_line(self): + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/twolinefile.txt' + shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) + config = 'tests/samples/configs/good_config.cfg' + key = str(uuid.uuid4()) + args = ['--file', entity, '--key', key, '--config', config, '--timeout', 'abc'] + + with self.assertRaises(SystemExit) as e: + execute(args) + + self.assertEquals(int(str(e.exception)), 2) + self.assertEquals(sys.stdout.getvalue(), '') + expected_stderr = open('tests/samples/output/main_test_timeout_passed_via_command_line').read() + self.assertEquals(sys.stderr.getvalue(), expected_stderr) + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() + + @log_capture() + def test_missing_entity_file(self, logs): + logging.disable(logging.NOTSET) + + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + entity = 'tests/samples/codefiles/missingfile.txt' + + config = 'tests/samples/configs/good_config.cfg' + args = ['--file', entity, '--config', config, '--verbose'] + retval = execute(args) + self.assertEquals(retval, SUCCESS) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) + expected = 'WakaTime DEBUG File does not exist; ignoring this heartbeat.' + self.assertEquals(log_output, expected) + + self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() + + @log_capture() + def test_missing_entity_argument(self, logs): + logging.disable(logging.NOTSET) + + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + config = 'tests/samples/configs/good_config.cfg' + args = ['--config', config] + + with self.assertRaises(SystemExit) as e: + execute(args) + + self.assertEquals(int(str(e.exception)), 2) + self.assertEquals(sys.stdout.getvalue(), '') + expected = 'error: argument --entity is required' + self.assertIn(expected, sys.stderr.getvalue()) + + log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) + expected = '' + self.assertEquals(log_output, expected) + + self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + @log_capture() + def test_missing_api_key(self, logs): + logging.disable(logging.NOTSET) + + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + config = 'tests/samples/configs/missing_api_key.cfg' + args = ['--config', config] + + with self.assertRaises(SystemExit) as e: + execute(args) + + self.assertEquals(int(str(e.exception)), AUTH_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + expected = 'error: Missing api key. Find your api key from wakatime.com/settings.' + self.assertIn(expected, sys.stderr.getvalue()) + + log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) + expected = '' + self.assertEquals(log_output, expected) + + self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + def test_proxy_argument(self): + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + proxy = 'localhost:1337' + + config = 'tests/samples/configs/good_config.cfg' + args = ['--file', entity, '--config', config, '--proxy', proxy] + retval = execute(args) + self.assertEquals(retval, SUCCESS) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() + + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].assert_called_once_with(ANY, cert=None, proxies={'https': proxy}, stream=False, timeout=60, verify=True) + + def test_write_argument(self): + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + now = u(int(time.time())) + key = str(uuid.uuid4()) + + args = ['--file', entity, '--key', key, '--write', '--verbose', + '--config', 'tests/samples/configs/good_config.cfg', '--time', now] + + retval = execute(args) + self.assertEquals(retval, API_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + heartbeat = { + 'language': 'Text only', + 'lines': 0, + 'entity': entity, + 'project': os.path.basename(os.path.abspath('.')), + 'time': float(now), + 'type': 'file', + 'is_write': True, + } + stats = { + u('cursorpos'): None, + u('dependencies'): [], + u('language'): u('Text only'), + u('lineno'): None, + u('lines'): 0, + } + + self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) + for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): + self.assertEquals(heartbeat[key], val) + self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + def test_entity_type_domain(self): + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + entity = 'google.com' + config = 'tests/samples/configs/good_config.cfg' + now = u(int(time.time())) + + args = ['--entity', entity, '--entity-type', 'domain', '--config', config, '--time', now] + retval = execute(args) + + self.assertEquals(retval, API_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + heartbeat = { + 'entity': u(entity), + 'time': float(now), + 'type': 'domain', + } + stats = { + u('cursorpos'): None, + u('dependencies'): [], + u('language'): None, + u('lineno'): None, + u('lines'): None, + } + + self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(heartbeat, ANY, None) + self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + def test_entity_type_app(self): + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + entity = 'Firefox' + config = 'tests/samples/configs/good_config.cfg' + now = u(int(time.time())) + + args = ['--entity', entity, '--entity-type', 'app', '--config', config, '--time', now] + retval = execute(args) + + self.assertEquals(retval, API_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + heartbeat = { + 'entity': u(entity), + 'time': float(now), + 'type': 'app', + } + stats = { + u('cursorpos'): None, + u('dependencies'): [], + u('language'): None, + u('lineno'): None, + u('lines'): None, + } + + self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(heartbeat, ANY, None) + self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + def test_extra_heartbeats_argument(self): + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/twolinefile.txt' + shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) + + project1 = os.path.basename(os.path.abspath('.')) + project2 = 'xyz' + entity1 = os.path.abspath('tests/samples/codefiles/emptyfile.txt') + entity2 = os.path.abspath('tests/samples/codefiles/twolinefile.txt') + config = 'tests/samples/configs/good_config.cfg' + args = ['--file', entity1, '--config', config, '--extra-heartbeats'] + + with utils.mock.patch('wakatime.main.sys.stdin') as mock_stdin: + now = int(time.time()) + heartbeats = json.dumps([{ + 'timestamp': now, + 'entity': entity2, + 'entity_type': 'file', + 'project': project2, + 'is_write': True, + }]) + mock_stdin.readline.return_value = heartbeats + + retval = execute(args) + + self.assertEquals(retval, SUCCESS) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_has_calls([call(), call()]) + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_has_calls([call(ANY), call(ANY)]) + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() + + calls = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args_list + + body = calls[0][0][0].body + data = json.loads(body) + self.assertEquals(data.get('entity'), entity1) + self.assertEquals(data.get('project'), project1) + + body = calls[1][0][0].body + data = json.loads(body) + self.assertEquals(data.get('entity'), entity2) + self.assertEquals(data.get('project'), project2) + + @log_capture() + def test_extra_heartbeats_with_malformed_json(self, logs): + logging.disable(logging.NOTSET) + + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/twolinefile.txt' + shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) + + entity = os.path.abspath('tests/samples/codefiles/emptyfile.txt') + config = 'tests/samples/configs/good_config.cfg' + args = ['--file', entity, '--config', config, '--extra-heartbeats'] + + with utils.mock.patch('wakatime.main.sys.stdin') as mock_stdin: + heartbeats = '[{foobar}]' + mock_stdin.readline.return_value = heartbeats + + retval = execute(args) + + self.assertEquals(retval, MALFORMED_HEARTBEAT_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) + self.assertEquals(log_output, '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() diff --git a/tests/test_configs.py b/tests/test_configs.py new file mode 100644 index 0000000..5f0ecbb --- /dev/null +++ b/tests/test_configs.py @@ -0,0 +1,540 @@ +# -*- coding: utf-8 -*- + + +from wakatime.main import execute +from wakatime.packages import requests + +import logging +import os +import time +import re +import shutil +import sys +import uuid +from testfixtures import log_capture +from wakatime.compat import u, is_py3 +from wakatime.constants import ( + API_ERROR, + CONFIG_FILE_PARSE_ERROR, + SUCCESS, +) +from wakatime.packages.requests.models import Response +from . import utils + +try: + from .packages import simplejson as json +except (ImportError, SyntaxError): + import json +try: + from mock import ANY +except ImportError: + from unittest.mock import ANY + + +class MainTestCase(utils.TestCase): + patch_these = [ + 'wakatime.packages.requests.adapters.HTTPAdapter.send', + 'wakatime.offlinequeue.Queue.push', + ['wakatime.offlinequeue.Queue.pop', None], + ['wakatime.offlinequeue.Queue.connect', None], + 'wakatime.session_cache.SessionCache.save', + 'wakatime.session_cache.SessionCache.delete', + ['wakatime.session_cache.SessionCache.get', requests.session], + ['wakatime.session_cache.SessionCache.connect', None], + ] + + def test_config_file_not_passed_in_command_line_args(self): + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + + with utils.mock.patch('wakatime.configs.os.environ.get') as mock_env: + mock_env.return_value = None + + with utils.mock.patch('wakatime.configs.open') as mock_open: + mock_open.side_effect = IOError('') + + config = os.path.join(os.path.expanduser('~'), '.wakatime.cfg') + args = ['--file', entity] + + with self.assertRaises(SystemExit) as e: + execute(args) + + self.assertEquals(int(str(e.exception)), CONFIG_FILE_PARSE_ERROR) + expected_stdout = u('') + expected_stderr = u("Error: Could not read from config file {0}\n").format(u(config)) + self.assertEquals(sys.stdout.getvalue(), expected_stdout) + self.assertEquals(sys.stderr.getvalue(), expected_stderr) + self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() + + def test_config_file_from_env(self): + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + config = 'tests/samples/configs/has_everything.cfg' + shutil.copy(config, os.path.join(tempdir, '.wakatime.cfg')) + config = os.path.realpath(os.path.join(tempdir, '.wakatime.cfg')) + + with utils.mock.patch('wakatime.configs.os.environ.get') as mock_env: + mock_env.return_value = tempdir + + args = ['--file', entity] + retval = execute(args) + self.assertEquals(retval, SUCCESS) + expected_stdout = open('tests/samples/output/main_test_good_config_file').read() + traceback_file = os.path.realpath('wakatime/arguments.py') + lineno = int(re.search(r' line (\d+),', sys.stdout.getvalue()).group(1)) + self.assertEquals(sys.stdout.getvalue(), expected_stdout.format(file=traceback_file, lineno=lineno)) + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() + + def test_missing_config_file(self): + config = 'foo' + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + + args = ['--file', entity, '--config', config] + with self.assertRaises(SystemExit) as e: + execute(args) + + self.assertEquals(int(str(e.exception)), CONFIG_FILE_PARSE_ERROR) + expected_stdout = u('') + expected_stderr = u("Error: Could not read from config file foo\n") + self.assertEquals(sys.stdout.getvalue(), expected_stdout) + self.assertEquals(sys.stderr.getvalue(), expected_stderr) + + self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() + + def test_good_config_file(self): + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + + config = 'tests/samples/configs/has_everything.cfg' + args = ['--file', entity, '--config', config] + retval = execute(args) + self.assertEquals(retval, SUCCESS) + expected_stdout = open('tests/samples/output/main_test_good_config_file').read() + traceback_file = os.path.realpath('wakatime/arguments.py') + lineno = int(re.search(r' line (\d+),', sys.stdout.getvalue()).group(1)) + self.assertEquals(sys.stdout.getvalue(), expected_stdout.format(file=traceback_file, lineno=lineno)) + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() + + def test_api_key_setting_without_underscore_accepted(self): + """Api key in wakatime.cfg should also work without an underscore: + apikey = XXX + """ + + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + + config = 'tests/samples/configs/sample_alternate_apikey.cfg' + args = ['--file', entity, '--config', config] + retval = execute(args) + self.assertEquals(retval, SUCCESS) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() + + def test_bad_config_file(self): + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + + config = 'tests/samples/configs/bad_config.cfg' + args = ['--file', entity, '--config', config] + retval = execute(args) + self.assertEquals(retval, CONFIG_FILE_PARSE_ERROR) + self.assertIn('ParsingError', sys.stdout.getvalue()) + self.assertEquals(sys.stderr.getvalue(), '') + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + def test_non_hidden_filename(self): + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/twolinefile.txt' + shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) + now = u(int(time.time())) + config = 'tests/samples/configs/good_config.cfg' + key = str(uuid.uuid4()) + + args = ['--file', entity, '--key', key, '--config', config, '--time', now] + + retval = execute(args) + self.assertEquals(retval, API_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + heartbeat = { + 'language': 'Text only', + 'lines': 2, + 'entity': os.path.realpath(entity), + 'project': os.path.basename(os.path.abspath('.')), + 'time': float(now), + 'type': 'file', + } + stats = { + u('cursorpos'): None, + u('dependencies'): [], + u('language'): u('Text only'), + u('lineno'): None, + u('lines'): 2, + } + + self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) + for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): + self.assertEquals(heartbeat[key], val) + self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + def test_hide_all_filenames(self): + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/twolinefile.txt' + shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) + now = u(int(time.time())) + config = 'tests/samples/configs/paranoid.cfg' + key = str(uuid.uuid4()) + + args = ['--file', entity, '--key', key, '--config', config, '--time', now] + + retval = execute(args) + self.assertEquals(retval, API_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + heartbeat = { + 'language': 'Text only', + 'lines': 2, + 'entity': 'HIDDEN.txt', + 'project': os.path.basename(os.path.abspath('.')), + 'time': float(now), + 'type': 'file', + } + stats = { + u('cursorpos'): None, + u('dependencies'): [], + u('language'): u('Text only'), + u('lineno'): None, + u('lines'): 2, + } + + self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) + for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): + self.assertEquals(heartbeat[key], val) + self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + def test_hide_all_filenames_from_cli_arg(self): + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/twolinefile.txt' + shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) + now = u(int(time.time())) + config = 'tests/samples/configs/good_config.cfg' + key = str(uuid.uuid4()) + + args = ['--file', entity, '--key', key, '--config', config, '--time', now, '--hidefilenames'] + + retval = execute(args) + self.assertEquals(retval, API_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + heartbeat = { + 'language': 'Text only', + 'lines': 2, + 'entity': 'HIDDEN.txt', + 'project': os.path.basename(os.path.abspath('.')), + 'time': float(now), + 'type': 'file', + } + stats = { + u('cursorpos'): None, + u('dependencies'): [], + u('language'): u('Text only'), + u('lineno'): None, + u('lines'): 2, + } + + self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) + for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): + self.assertEquals(heartbeat[key], val) + self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + def test_hide_matching_filenames(self): + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/twolinefile.txt' + shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) + now = u(int(time.time())) + config = 'tests/samples/configs/hide_file_names.cfg' + key = str(uuid.uuid4()) + + args = ['--file', entity, '--key', key, '--config', config, '--time', now] + + retval = execute(args) + self.assertEquals(retval, API_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + heartbeat = { + 'language': 'Text only', + 'lines': 2, + 'entity': 'HIDDEN.txt', + 'project': os.path.basename(os.path.abspath('.')), + 'time': float(now), + 'type': 'file', + } + stats = { + u('cursorpos'): None, + u('dependencies'): [], + u('language'): u('Text only'), + u('lineno'): None, + u('lines'): 2, + } + + self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) + for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): + self.assertEquals(heartbeat[key], val) + self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + def test_does_not_hide_unmatching_filenames(self): + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + now = u(int(time.time())) + config = 'tests/samples/configs/hide_file_names.cfg' + key = str(uuid.uuid4()) + + args = ['--file', entity, '--key', key, '--config', config, '--time', now] + + retval = execute(args) + self.assertEquals(retval, API_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + heartbeat = { + 'language': 'Text only', + 'lines': 0, + 'entity': entity, + 'project': os.path.basename(os.path.abspath('.')), + 'time': float(now), + 'type': 'file', + } + stats = { + u('cursorpos'): None, + u('dependencies'): [], + u('language'): u('Text only'), + u('lineno'): None, + u('lines'): 0, + } + + self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) + for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): + self.assertEquals(heartbeat[key], val) + self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + @log_capture() + def test_does_not_hide_filenames_from_invalid_regex(self, logs): + logging.disable(logging.NOTSET) + + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + now = u(int(time.time())) + config = 'tests/samples/configs/invalid_hide_file_names.cfg' + key = str(uuid.uuid4()) + + args = ['--file', entity, '--key', key, '--config', config, '--time', now] + + retval = execute(args) + self.assertEquals(retval, API_ERROR) + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) + expected = u('WakaTime WARNING Regex error (unbalanced parenthesis) for include pattern: invalid(regex') + if self.isPy35OrNewer: + expected = 'WakaTime WARNING Regex error (missing ), unterminated subpattern at position 7) for include pattern: invalid(regex' + self.assertEquals(expected, log_output) + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + heartbeat = { + 'language': 'Text only', + 'lines': 0, + 'entity': entity, + 'project': os.path.basename(os.path.abspath('.')), + 'time': float(now), + 'type': 'file', + } + stats = { + u('cursorpos'): None, + u('dependencies'): [], + u('language'): u('Text only'), + u('lineno'): None, + u('lines'): 0, + } + + self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) + for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): + self.assertEquals(heartbeat[key], val) + self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) + self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() + + @log_capture() + def test_exclude_file(self, logs): + logging.disable(logging.NOTSET) + + response = Response() + response.status_code = 0 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + config = 'tests/samples/configs/good_config.cfg' + + args = ['--file', entity, '--config', config, '--exclude', 'empty', '--verbose'] + retval = execute(args) + self.assertEquals(retval, SUCCESS) + + self.assertEquals(sys.stdout.getvalue(), '') + self.assertEquals(sys.stderr.getvalue(), '') + + log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) + expected = 'WakaTime DEBUG Skipping because matches exclude pattern: empty' + self.assertEquals(log_output, expected) + + self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() + + def test_hostname_set_from_config_file(self): + response = Response() + response.status_code = 201 + self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response + + with utils.TemporaryDirectory() as tempdir: + entity = 'tests/samples/codefiles/emptyfile.txt' + shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) + entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) + + hostname = 'fromcfgfile' + config = 'tests/samples/configs/has_everything.cfg' + args = ['--file', entity, '--config', config, '--timeout', '15'] + retval = execute(args) + self.assertEquals(retval, SUCCESS) + + self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() + self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() + self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) + + self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() + self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() + + headers = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args[0][0].headers + self.assertEquals(headers.get('X-Machine-Name'), hostname.encode('utf-8') if is_py3 else hostname) diff --git a/tests/test_main.py b/tests/test_main.py index 4730ec4..9e420cd 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -7,7 +7,6 @@ from wakatime.packages import requests import logging import os import time -import re import shutil import sys import uuid @@ -16,9 +15,7 @@ from wakatime.compat import u, is_py3 from wakatime.constants import ( API_ERROR, AUTH_ERROR, - CONFIG_FILE_PARSE_ERROR, SUCCESS, - MALFORMED_HEARTBEAT_ERROR, ) from wakatime.packages.requests.exceptions import RequestException from wakatime.packages.requests.models import Response @@ -29,9 +26,9 @@ try: except (ImportError, SyntaxError): import json try: - from mock import ANY, call + from mock import ANY except ImportError: - from unittest.mock import ANY, call + from unittest.mock import ANY from wakatime.packages import tzlocal @@ -47,586 +44,6 @@ class MainTestCase(utils.TestCase): ['wakatime.session_cache.SessionCache.connect', None], ] - def test_help_contents(self): - args = ['--help'] - with self.assertRaises(SystemExit) as e: - execute(args) - - self.assertEquals(int(str(e.exception)), 0) - expected_stdout = open('tests/samples/output/test_help_contents').read() - self.assertEquals(sys.stdout.getvalue(), expected_stdout) - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - def test_argument_parsing(self): - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/twolinefile.txt' - shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) - key = str(uuid.uuid4()) - config = 'tests/samples/configs/good_config.cfg' - - args = ['--file', entity, '--key', key, '--config', config] - - retval = execute(args) - self.assertEquals(retval, SUCCESS) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() - - def test_config_file_not_passed_in_command_line_args(self): - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - - with utils.mock.patch('wakatime.main.os.environ.get') as mock_env: - mock_env.return_value = None - - with utils.mock.patch('wakatime.main.open') as mock_open: - mock_open.side_effect = IOError('') - - config = os.path.join(os.path.expanduser('~'), '.wakatime.cfg') - args = ['--file', entity] - - with self.assertRaises(SystemExit) as e: - execute(args) - - self.assertEquals(int(str(e.exception)), CONFIG_FILE_PARSE_ERROR) - expected_stdout = u('') - expected_stderr = u("Error: Could not read from config file {0}\n").format(u(config)) - self.assertEquals(sys.stdout.getvalue(), expected_stdout) - self.assertEquals(sys.stderr.getvalue(), expected_stderr) - self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() - - def test_config_file_from_env(self): - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - config = 'tests/samples/configs/has_everything.cfg' - shutil.copy(config, os.path.join(tempdir, '.wakatime.cfg')) - config = os.path.realpath(os.path.join(tempdir, '.wakatime.cfg')) - - with utils.mock.patch('wakatime.main.os.environ.get') as mock_env: - mock_env.return_value = tempdir - - args = ['--file', entity] - retval = execute(args) - self.assertEquals(retval, SUCCESS) - expected_stdout = open('tests/samples/output/main_test_good_config_file').read() - traceback_file = os.path.realpath('wakatime/main.py') - lineno = int(re.search(r' line (\d+),', sys.stdout.getvalue()).group(1)) - self.assertEquals(sys.stdout.getvalue(), expected_stdout.format(file=traceback_file, lineno=lineno)) - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() - - def test_missing_config_file(self): - config = 'foo' - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - - args = ['--file', entity, '--config', config] - with self.assertRaises(SystemExit) as e: - execute(args) - - self.assertEquals(int(str(e.exception)), CONFIG_FILE_PARSE_ERROR) - expected_stdout = u('') - expected_stderr = u("Error: Could not read from config file foo\n") - self.assertEquals(sys.stdout.getvalue(), expected_stdout) - self.assertEquals(sys.stderr.getvalue(), expected_stderr) - - self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() - - def test_good_config_file(self): - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - - config = 'tests/samples/configs/has_everything.cfg' - args = ['--file', entity, '--config', config] - retval = execute(args) - self.assertEquals(retval, SUCCESS) - expected_stdout = open('tests/samples/output/main_test_good_config_file').read() - traceback_file = os.path.realpath('wakatime/main.py') - lineno = int(re.search(r' line (\d+),', sys.stdout.getvalue()).group(1)) - self.assertEquals(sys.stdout.getvalue(), expected_stdout.format(file=traceback_file, lineno=lineno)) - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() - - def test_api_key_setting_without_underscore_accepted(self): - """Api key in wakatime.cfg should also work without an underscore: - apikey = XXX - """ - - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - - config = 'tests/samples/configs/sample_alternate_apikey.cfg' - args = ['--file', entity, '--config', config] - retval = execute(args) - self.assertEquals(retval, SUCCESS) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() - - def test_bad_config_file(self): - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - - config = 'tests/samples/configs/bad_config.cfg' - args = ['--file', entity, '--config', config] - retval = execute(args) - self.assertEquals(retval, CONFIG_FILE_PARSE_ERROR) - self.assertIn('ParsingError', sys.stdout.getvalue()) - self.assertEquals(sys.stderr.getvalue(), '') - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - def test_lineno_and_cursorpos(self): - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - entity = 'tests/samples/codefiles/twolinefile.txt' - config = 'tests/samples/configs/good_config.cfg' - now = u(int(time.time())) - - args = ['--entity', entity, '--config', config, '--time', now, '--lineno', '3', '--cursorpos', '4', '--verbose'] - retval = execute(args) - - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.assertEquals(retval, API_ERROR) - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - heartbeat = { - 'language': 'Text only', - 'lines': 2, - 'entity': os.path.realpath(entity), - 'project': os.path.basename(os.path.abspath('.')), - 'cursorpos': '4', - 'lineno': '3', - 'branch': 'master', - 'time': float(now), - 'type': 'file', - } - stats = { - u('cursorpos'): '4', - u('dependencies'): [], - u('language'): u('Text only'), - u('lineno'): '3', - u('lines'): 2, - } - - self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) - for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): - self.assertEquals(heartbeat[key], val) - self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - def test_non_hidden_filename(self): - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/twolinefile.txt' - shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) - now = u(int(time.time())) - config = 'tests/samples/configs/good_config.cfg' - key = str(uuid.uuid4()) - - args = ['--file', entity, '--key', key, '--config', config, '--time', now] - - retval = execute(args) - self.assertEquals(retval, API_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - heartbeat = { - 'language': 'Text only', - 'lines': 2, - 'entity': os.path.realpath(entity), - 'project': os.path.basename(os.path.abspath('.')), - 'time': float(now), - 'type': 'file', - } - stats = { - u('cursorpos'): None, - u('dependencies'): [], - u('language'): u('Text only'), - u('lineno'): None, - u('lines'): 2, - } - - self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) - for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): - self.assertEquals(heartbeat[key], val) - self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - def test_hide_all_filenames(self): - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/twolinefile.txt' - shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) - now = u(int(time.time())) - config = 'tests/samples/configs/paranoid.cfg' - key = str(uuid.uuid4()) - - args = ['--file', entity, '--key', key, '--config', config, '--time', now] - - retval = execute(args) - self.assertEquals(retval, API_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - heartbeat = { - 'language': 'Text only', - 'lines': 2, - 'entity': 'HIDDEN.txt', - 'project': os.path.basename(os.path.abspath('.')), - 'time': float(now), - 'type': 'file', - } - stats = { - u('cursorpos'): None, - u('dependencies'): [], - u('language'): u('Text only'), - u('lineno'): None, - u('lines'): 2, - } - - self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) - for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): - self.assertEquals(heartbeat[key], val) - self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - def test_hide_all_filenames_from_cli_arg(self): - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/twolinefile.txt' - shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) - now = u(int(time.time())) - config = 'tests/samples/configs/good_config.cfg' - key = str(uuid.uuid4()) - - args = ['--file', entity, '--key', key, '--config', config, '--time', now, '--hidefilenames'] - - retval = execute(args) - self.assertEquals(retval, API_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - heartbeat = { - 'language': 'Text only', - 'lines': 2, - 'entity': 'HIDDEN.txt', - 'project': os.path.basename(os.path.abspath('.')), - 'time': float(now), - 'type': 'file', - } - stats = { - u('cursorpos'): None, - u('dependencies'): [], - u('language'): u('Text only'), - u('lineno'): None, - u('lines'): 2, - } - - self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) - for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): - self.assertEquals(heartbeat[key], val) - self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - def test_hide_matching_filenames(self): - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/twolinefile.txt' - shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) - now = u(int(time.time())) - config = 'tests/samples/configs/hide_file_names.cfg' - key = str(uuid.uuid4()) - - args = ['--file', entity, '--key', key, '--config', config, '--time', now] - - retval = execute(args) - self.assertEquals(retval, API_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - heartbeat = { - 'language': 'Text only', - 'lines': 2, - 'entity': 'HIDDEN.txt', - 'project': os.path.basename(os.path.abspath('.')), - 'time': float(now), - 'type': 'file', - } - stats = { - u('cursorpos'): None, - u('dependencies'): [], - u('language'): u('Text only'), - u('lineno'): None, - u('lines'): 2, - } - - self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) - for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): - self.assertEquals(heartbeat[key], val) - self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - def test_does_not_hide_unmatching_filenames(self): - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - now = u(int(time.time())) - config = 'tests/samples/configs/hide_file_names.cfg' - key = str(uuid.uuid4()) - - args = ['--file', entity, '--key', key, '--config', config, '--time', now] - - retval = execute(args) - self.assertEquals(retval, API_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - heartbeat = { - 'language': 'Text only', - 'lines': 0, - 'entity': entity, - 'project': os.path.basename(os.path.abspath('.')), - 'time': float(now), - 'type': 'file', - } - stats = { - u('cursorpos'): None, - u('dependencies'): [], - u('language'): u('Text only'), - u('lineno'): None, - u('lines'): 0, - } - - self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) - for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): - self.assertEquals(heartbeat[key], val) - self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - @log_capture() - def test_does_not_hide_filenames_from_invalid_regex(self, logs): - logging.disable(logging.NOTSET) - - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - now = u(int(time.time())) - config = 'tests/samples/configs/invalid_hide_file_names.cfg' - key = str(uuid.uuid4()) - - args = ['--file', entity, '--key', key, '--config', config, '--time', now] - - retval = execute(args) - self.assertEquals(retval, API_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) - expected = u('WakaTime WARNING Regex error (unbalanced parenthesis) for include pattern: invalid(regex') - if self.isPy35OrNewer: - expected = 'WakaTime WARNING Regex error (missing ), unterminated subpattern at position 7) for include pattern: invalid(regex' - self.assertEquals(expected, log_output) - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - heartbeat = { - 'language': 'Text only', - 'lines': 0, - 'entity': entity, - 'project': os.path.basename(os.path.abspath('.')), - 'time': float(now), - 'type': 'file', - } - stats = { - u('cursorpos'): None, - u('dependencies'): [], - u('language'): u('Text only'), - u('lineno'): None, - u('lines'): 0, - } - - self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) - for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): - self.assertEquals(heartbeat[key], val) - self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - def test_invalid_timeout_passed_via_command_line(self): - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/twolinefile.txt' - shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) - config = 'tests/samples/configs/good_config.cfg' - key = str(uuid.uuid4()) - args = ['--file', entity, '--key', key, '--config', config, '--timeout', 'abc'] - - with self.assertRaises(SystemExit) as e: - execute(args) - - self.assertEquals(int(str(e.exception)), 2) - self.assertEquals(sys.stdout.getvalue(), '') - expected_stderr = open('tests/samples/output/main_test_timeout_passed_via_command_line').read() - self.assertEquals(sys.stderr.getvalue(), expected_stderr) - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() - - @log_capture() - def test_exclude_file(self, logs): - logging.disable(logging.NOTSET) - - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - config = 'tests/samples/configs/good_config.cfg' - - args = ['--file', entity, '--config', config, '--exclude', 'empty', '--verbose'] - retval = execute(args) - self.assertEquals(retval, SUCCESS) - - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) - expected = 'WakaTime DEBUG Skipping because matches exclude pattern: empty' - self.assertEquals(log_output, expected) - - self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() - def test_500_response(self): response = Response() response.status_code = 500 @@ -880,94 +297,6 @@ class MainTestCase(utils.TestCase): self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - @log_capture() - def test_missing_entity_file(self, logs): - logging.disable(logging.NOTSET) - - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - entity = 'tests/samples/codefiles/missingfile.txt' - - config = 'tests/samples/configs/good_config.cfg' - args = ['--file', entity, '--config', config, '--verbose'] - retval = execute(args) - self.assertEquals(retval, SUCCESS) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) - expected = 'WakaTime DEBUG File does not exist; ignoring this heartbeat.' - self.assertEquals(log_output, expected) - - self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() - - @log_capture() - def test_missing_entity_argument(self, logs): - logging.disable(logging.NOTSET) - - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - config = 'tests/samples/configs/good_config.cfg' - args = ['--config', config] - - with self.assertRaises(SystemExit) as e: - execute(args) - - self.assertEquals(int(str(e.exception)), 2) - self.assertEquals(sys.stdout.getvalue(), '') - expected = 'error: argument --entity is required' - self.assertIn(expected, sys.stderr.getvalue()) - - log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) - expected = '' - self.assertEquals(log_output, expected) - - self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - @log_capture() - def test_missing_api_key(self, logs): - logging.disable(logging.NOTSET) - - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - config = 'tests/samples/configs/missing_api_key.cfg' - args = ['--config', config] - - with self.assertRaises(SystemExit) as e: - execute(args) - - self.assertEquals(int(str(e.exception)), AUTH_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - expected = 'error: Missing api key. Find your api key from wakatime.com/settings.' - self.assertIn(expected, sys.stderr.getvalue()) - - log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) - expected = '' - self.assertEquals(log_output, expected) - - self.patched['wakatime.session_cache.SessionCache.get'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - @log_capture() def test_invalid_api_key(self, logs): logging.disable(logging.NOTSET) @@ -998,154 +327,6 @@ class MainTestCase(utils.TestCase): self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - def test_proxy_argument(self): - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - proxy = 'localhost:1337' - - config = 'tests/samples/configs/good_config.cfg' - args = ['--file', entity, '--config', config, '--proxy', proxy] - retval = execute(args) - self.assertEquals(retval, SUCCESS) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() - - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].assert_called_once_with(ANY, cert=None, proxies={'https': proxy}, stream=False, timeout=60, verify=True) - - def test_write_argument(self): - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - now = u(int(time.time())) - key = str(uuid.uuid4()) - - args = ['--file', entity, '--key', key, '--write', '--verbose', - '--config', 'tests/samples/configs/good_config.cfg', '--time', now] - - retval = execute(args) - self.assertEquals(retval, API_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - heartbeat = { - 'language': 'Text only', - 'lines': 0, - 'entity': entity, - 'project': os.path.basename(os.path.abspath('.')), - 'time': float(now), - 'type': 'file', - 'is_write': True, - } - stats = { - u('cursorpos'): None, - u('dependencies'): [], - u('language'): u('Text only'), - u('lineno'): None, - u('lines'): 0, - } - - self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(ANY, ANY, None) - for key, val in self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][0].items(): - self.assertEquals(heartbeat[key], val) - self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - def test_entity_type_domain(self): - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - entity = 'google.com' - config = 'tests/samples/configs/good_config.cfg' - now = u(int(time.time())) - - args = ['--entity', entity, '--entity-type', 'domain', '--config', config, '--time', now] - retval = execute(args) - - self.assertEquals(retval, API_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - heartbeat = { - 'entity': u(entity), - 'time': float(now), - 'type': 'domain', - } - stats = { - u('cursorpos'): None, - u('dependencies'): [], - u('language'): None, - u('lineno'): None, - u('lines'): None, - } - - self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(heartbeat, ANY, None) - self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - - def test_entity_type_app(self): - response = Response() - response.status_code = 0 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - entity = 'Firefox' - config = 'tests/samples/configs/good_config.cfg' - now = u(int(time.time())) - - args = ['--entity', entity, '--entity-type', 'app', '--config', config, '--time', now] - retval = execute(args) - - self.assertEquals(retval, API_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.save'].assert_not_called() - - heartbeat = { - 'entity': u(entity), - 'time': float(now), - 'type': 'app', - } - stats = { - u('cursorpos'): None, - u('dependencies'): [], - u('language'): None, - u('lineno'): None, - u('lines'): None, - } - - self.patched['wakatime.offlinequeue.Queue.push'].assert_called_once_with(heartbeat, ANY, None) - self.assertEquals(stats, json.loads(self.patched['wakatime.offlinequeue.Queue.push'].call_args[0][1])) - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - def test_nonascii_hostname(self): response = Response() response.status_code = 201 @@ -1178,32 +359,6 @@ class MainTestCase(utils.TestCase): headers = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args[0][0].headers self.assertEquals(headers.get('X-Machine-Name'), hostname.encode('utf-8') if is_py3 else hostname) - def test_hostname_set_from_config_file(self): - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/emptyfile.txt' - shutil.copy(entity, os.path.join(tempdir, 'emptyfile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'emptyfile.txt')) - - hostname = 'fromcfgfile' - config = 'tests/samples/configs/has_everything.cfg' - args = ['--file', entity, '--config', config, '--timeout', '15'] - retval = execute(args) - self.assertEquals(retval, SUCCESS) - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() - - headers = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args[0][0].headers - self.assertEquals(headers.get('X-Machine-Name'), hostname.encode('utf-8') if is_py3 else hostname) - def test_nonascii_timezone(self): response = Response() response.status_code = 201 @@ -1328,96 +483,6 @@ class MainTestCase(utils.TestCase): headers = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args[0][0].headers self.assertEquals(headers.get('TimeZone'), u(timezone.zone).encode('utf-8') if is_py3 else timezone.zone) - def test_extra_heartbeats_argument(self): - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/twolinefile.txt' - shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) - - project1 = os.path.basename(os.path.abspath('.')) - project2 = 'xyz' - entity1 = os.path.abspath('tests/samples/codefiles/emptyfile.txt') - entity2 = os.path.abspath('tests/samples/codefiles/twolinefile.txt') - config = 'tests/samples/configs/good_config.cfg' - args = ['--file', entity1, '--config', config, '--extra-heartbeats'] - - with utils.mock.patch('wakatime.main.sys.stdin') as mock_stdin: - now = int(time.time()) - heartbeats = json.dumps([{ - 'timestamp': now, - 'entity': entity2, - 'entity_type': 'file', - 'project': project2, - 'is_write': True, - }]) - mock_stdin.readline.return_value = heartbeats - - retval = execute(args) - - self.assertEquals(retval, SUCCESS) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_has_calls([call(), call()]) - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_has_calls([call(ANY), call(ANY)]) - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_called_once_with() - - calls = self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].call_args_list - - body = calls[0][0][0].body - data = json.loads(body) - self.assertEquals(data.get('entity'), entity1) - self.assertEquals(data.get('project'), project1) - - body = calls[1][0][0].body - data = json.loads(body) - self.assertEquals(data.get('entity'), entity2) - self.assertEquals(data.get('project'), project2) - - @log_capture() - def test_extra_heartbeats_with_malformed_json(self, logs): - logging.disable(logging.NOTSET) - - response = Response() - response.status_code = 201 - self.patched['wakatime.packages.requests.adapters.HTTPAdapter.send'].return_value = response - - with utils.TemporaryDirectory() as tempdir: - entity = 'tests/samples/codefiles/twolinefile.txt' - shutil.copy(entity, os.path.join(tempdir, 'twolinefile.txt')) - entity = os.path.realpath(os.path.join(tempdir, 'twolinefile.txt')) - - entity = os.path.abspath('tests/samples/codefiles/emptyfile.txt') - config = 'tests/samples/configs/good_config.cfg' - args = ['--file', entity, '--config', config, '--extra-heartbeats'] - - with utils.mock.patch('wakatime.main.sys.stdin') as mock_stdin: - heartbeats = '[{foobar}]' - mock_stdin.readline.return_value = heartbeats - - retval = execute(args) - - self.assertEquals(retval, MALFORMED_HEARTBEAT_ERROR) - self.assertEquals(sys.stdout.getvalue(), '') - self.assertEquals(sys.stderr.getvalue(), '') - - log_output = u("\n").join([u(' ').join(x) for x in logs.actual()]) - self.assertEquals(log_output, '') - - self.patched['wakatime.session_cache.SessionCache.get'].assert_called_once_with() - self.patched['wakatime.session_cache.SessionCache.delete'].assert_not_called() - self.patched['wakatime.session_cache.SessionCache.save'].assert_called_once_with(ANY) - - self.patched['wakatime.offlinequeue.Queue.push'].assert_not_called() - self.patched['wakatime.offlinequeue.Queue.pop'].assert_not_called() - @log_capture() def test_nonascii_filename(self, logs): logging.disable(logging.NOTSET) diff --git a/wakatime/arguments.py b/wakatime/arguments.py new file mode 100644 index 0000000..988987b --- /dev/null +++ b/wakatime/arguments.py @@ -0,0 +1,230 @@ +# -*- coding: utf-8 -*- +""" + wakatime.arguments + ~~~~~~~~~~~~~~~~~~ + + Command-line arguments. + + :copyright: (c) 2016 Alan Hamlett. + :license: BSD, see LICENSE for more details. +""" + + +from __future__ import print_function + + +import os +import re +import time +import traceback +from .__about__ import __version__ +from .configs import parseConfigFile +from .constants import AUTH_ERROR +from .packages import argparse + + +class FileAction(argparse.Action): + + def __call__(self, parser, namespace, values, option_string=None): + try: + if os.path.isfile(values): + values = os.path.realpath(values) + except: # pragma: nocover + pass + setattr(namespace, self.dest, values) + + +def parseArguments(): + """Parse command line arguments and configs from ~/.wakatime.cfg. + Command line arguments take precedence over config file settings. + Returns instances of ArgumentParser and SafeConfigParser. + """ + + # define supported command line arguments + parser = argparse.ArgumentParser( + description='Common interface for the WakaTime api.') + parser.add_argument('--entity', dest='entity', metavar='FILE', + action=FileAction, + help='absolute path to file for the heartbeat; can also be a '+ + 'url, domain, or app when --entity-type is not file') + parser.add_argument('--file', dest='file', action=FileAction, + help=argparse.SUPPRESS) + parser.add_argument('--key', dest='key', + help='your wakatime api key; uses api_key from '+ + '~/.wakatime.cfg by default') + parser.add_argument('--write', dest='is_write', + action='store_true', + help='when set, tells api this heartbeat was triggered from '+ + 'writing to a file') + parser.add_argument('--plugin', dest='plugin', + help='optional text editor plugin name and version '+ + 'for User-Agent header') + parser.add_argument('--time', dest='timestamp', metavar='time', + type=float, + help='optional floating-point unix epoch timestamp; '+ + 'uses current time by default') + parser.add_argument('--lineno', dest='lineno', + help='optional line number; current line being edited') + parser.add_argument('--cursorpos', dest='cursorpos', + help='optional cursor position in the current file') + parser.add_argument('--entity-type', dest='entity_type', + help='entity type for this heartbeat. can be one of "file", '+ + '"domain", or "app"; defaults to file.') + parser.add_argument('--proxy', dest='proxy', + help='optional proxy configuration. Supports HTTPS '+ + 'and SOCKS proxies. For example: '+ + 'https://user:pass@host:port or '+ + 'socks5://user:pass@host:port or ' + + 'domain\\user:pass') + parser.add_argument('--project', dest='project', + help='optional project name') + parser.add_argument('--alternate-project', dest='alternate_project', + help='optional alternate project name; auto-discovered project '+ + 'takes priority') + parser.add_argument('--alternate-language', dest='alternate_language', + help='optional alternate language name; auto-detected language'+ + 'takes priority') + parser.add_argument('--hostname', dest='hostname', help='hostname of '+ + 'current machine.') + parser.add_argument('--disableoffline', dest='offline', + action='store_false', + help='disables offline time logging instead of queuing logged time') + parser.add_argument('--hidefilenames', dest='hidefilenames', + action='store_true', + help='obfuscate file names; will not send file names to api') + parser.add_argument('--exclude', dest='exclude', action='append', + help='filename patterns to exclude from logging; POSIX regex '+ + 'syntax; can be used more than once') + parser.add_argument('--include', dest='include', action='append', + help='filename patterns to log; when used in combination with '+ + '--exclude, files matching include will still be logged; '+ + 'POSIX regex syntax; can be used more than once') + parser.add_argument('--ignore', dest='ignore', action='append', + help=argparse.SUPPRESS) + parser.add_argument('--extra-heartbeats', dest='extra_heartbeats', + action='store_true', + help='reads extra heartbeats from STDIN as a JSON array until EOF') + parser.add_argument('--logfile', dest='logfile', + help='defaults to ~/.wakatime.log') + parser.add_argument('--apiurl', dest='api_url', + help='heartbeats api url; for debugging with a local server') + parser.add_argument('--timeout', dest='timeout', type=int, + help='number of seconds to wait when sending heartbeats to api; '+ + 'defaults to 60 seconds') + parser.add_argument('--config', dest='config', + help='defaults to ~/.wakatime.cfg') + parser.add_argument('--verbose', dest='verbose', action='store_true', + help='turns on debug messages in log file') + parser.add_argument('--version', action='version', version=__version__) + + # parse command line arguments + args = parser.parse_args() + + # use current unix epoch timestamp by default + if not args.timestamp: + args.timestamp = time.time() + + # parse ~/.wakatime.cfg file + configs = parseConfigFile(args.config) + if configs is None: + return args, configs + + # update args from configs + if not args.hostname: + if configs.has_option('settings', 'hostname'): + args.hostname = configs.get('settings', 'hostname') + if not args.key: + default_key = None + if configs.has_option('settings', 'api_key'): + default_key = configs.get('settings', 'api_key') + elif configs.has_option('settings', 'apikey'): + default_key = configs.get('settings', 'apikey') + if default_key: + args.key = default_key + else: + try: + parser.error('Missing api key. Find your api key from wakatime.com/settings.') + except SystemExit: + raise SystemExit(AUTH_ERROR) + + is_valid = not not re.match(r'^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$', args.key, re.I) + if not is_valid: + try: + parser.error('Invalid api key. Find your api key from wakatime.com/settings.') + except SystemExit: + raise SystemExit(AUTH_ERROR) + + if not args.entity: + if args.file: + args.entity = args.file + else: + parser.error('argument --entity is required') + if not args.exclude: + args.exclude = [] + if configs.has_option('settings', 'ignore'): + try: + for pattern in configs.get('settings', 'ignore').split("\n"): + if pattern.strip() != '': + args.exclude.append(pattern) + except TypeError: # pragma: nocover + pass + if configs.has_option('settings', 'exclude'): + try: + for pattern in configs.get('settings', 'exclude').split("\n"): + if pattern.strip() != '': + args.exclude.append(pattern) + except TypeError: # pragma: nocover + pass + if not args.include: + args.include = [] + if configs.has_option('settings', 'include'): + try: + for pattern in configs.get('settings', 'include').split("\n"): + if pattern.strip() != '': + args.include.append(pattern) + except TypeError: # pragma: nocover + pass + if args.hidefilenames: + args.hidefilenames = ['.*'] + else: + args.hidefilenames = [] + if configs.has_option('settings', 'hidefilenames'): + option = configs.get('settings', 'hidefilenames') + if option.strip().lower() == 'true': + args.hidefilenames = ['.*'] + elif option.strip().lower() != 'false': + for pattern in option.split("\n"): + if pattern.strip() != '': + args.hidefilenames.append(pattern) + if args.offline and configs.has_option('settings', 'offline'): + args.offline = configs.getboolean('settings', 'offline') + if not args.proxy and configs.has_option('settings', 'proxy'): + args.proxy = configs.get('settings', 'proxy') + if args.proxy: + pattern = r'^((https?|socks5)://)?([^:@]+(:([^:@])+)?@)?[^:]+(:\d+)?$' + if '\\' in args.proxy: + pattern = r'^.*\\.+$' + is_valid = not not re.match(pattern, args.proxy, re.I) + if not is_valid: + parser.error('Invalid proxy. Must be in format ' + + 'https://user:pass@host:port or ' + + 'socks5://user:pass@host:port or ' + + 'domain\\user:pass.') + if not args.verbose and configs.has_option('settings', 'verbose'): + args.verbose = configs.getboolean('settings', 'verbose') + if not args.verbose and configs.has_option('settings', 'debug'): + args.verbose = configs.getboolean('settings', 'debug') + if not args.logfile and configs.has_option('settings', 'logfile'): + args.logfile = configs.get('settings', 'logfile') + if not args.logfile and os.environ.get('WAKATIME_HOME'): + home = os.environ.get('WAKATIME_HOME') + args.logfile = os.path.join(os.path.expanduser(home), '.wakatime.log') + if not args.api_url and configs.has_option('settings', 'api_url'): + args.api_url = configs.get('settings', 'api_url') + if not args.timeout and configs.has_option('settings', 'timeout'): + try: + args.timeout = int(configs.get('settings', 'timeout')) + except ValueError: + print(traceback.format_exc()) + + return args, configs diff --git a/wakatime/configs.py b/wakatime/configs.py new file mode 100644 index 0000000..a7eb3b4 --- /dev/null +++ b/wakatime/configs.py @@ -0,0 +1,55 @@ +# -*- coding: utf-8 -*- +""" + wakatime.configs + ~~~~~~~~~~~~~~~~ + + Config file parser. + + :copyright: (c) 2016 Alan Hamlett. + :license: BSD, see LICENSE for more details. +""" + + +from __future__ import print_function + +import os +import sys +import traceback + +from .compat import u, open +from .constants import CONFIG_FILE_PARSE_ERROR + + +try: + import ConfigParser as configparser +except ImportError: # pragma: nocover + import configparser + + +def parseConfigFile(configFile=None): + """Returns a configparser.SafeConfigParser instance with configs + read from the config file. Default location of the config file is + at ~/.wakatime.cfg. + """ + + # get config file location from ENV + home = os.environ.get('WAKATIME_HOME') + if not configFile and home: + configFile = os.path.join(os.path.expanduser(home), '.wakatime.cfg') + + # use default config file location + if not configFile: + configFile = os.path.join(os.path.expanduser('~'), '.wakatime.cfg') + + configs = configparser.SafeConfigParser() + try: + with open(configFile, 'r', encoding='utf-8') as fh: + try: + configs.readfp(fh) + except configparser.Error: + print(traceback.format_exc()) + return None + except IOError: + sys.stderr.write(u("Error: Could not read from config file {0}\n").format(u(configFile))) + raise SystemExit(CONFIG_FILE_PARSE_ERROR) + return configs diff --git a/wakatime/main.py b/wakatime/main.py index 706e9e9..0f7a035 100644 --- a/wakatime/main.py +++ b/wakatime/main.py @@ -14,23 +14,18 @@ from __future__ import print_function import base64 import logging import os -import platform import re import sys -import time import traceback import socket -try: - import ConfigParser as configparser -except ImportError: # pragma: nocover - import configparser pwd = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.dirname(pwd)) sys.path.insert(0, os.path.join(pwd, 'packages')) from .__about__ import __version__ -from .compat import u, open, is_py3 +from .arguments import parseArguments +from .compat import u, is_py3 from .constants import ( API_ERROR, AUTH_ERROR, @@ -41,12 +36,12 @@ from .constants import ( ) from .logger import setup_logging from .offlinequeue import Queue -from .packages import argparse from .packages import requests from .packages.requests.exceptions import RequestException from .project import get_project_info from .session_cache import SessionCache from .stats import get_file_stats +from .utils import get_user_agent, should_exclude, format_file_path try: from .packages import simplejson as json # pragma: nocover except (ImportError, SyntaxError): # pragma: nocover @@ -57,287 +52,6 @@ from .packages import tzlocal log = logging.getLogger('WakaTime') -class FileAction(argparse.Action): - - def __call__(self, parser, namespace, values, option_string=None): - try: - if os.path.isfile(values): - values = os.path.realpath(values) - except: # pragma: nocover - pass - setattr(namespace, self.dest, values) - - -def parseConfigFile(configFile=None): - """Returns a configparser.SafeConfigParser instance with configs - read from the config file. Default location of the config file is - at ~/.wakatime.cfg. - """ - - # get config file location from ENV - home = os.environ.get('WAKATIME_HOME') - if not configFile and home: - configFile = os.path.join(os.path.expanduser(home), '.wakatime.cfg') - - # use default config file location - if not configFile: - configFile = os.path.join(os.path.expanduser('~'), '.wakatime.cfg') - - configs = configparser.SafeConfigParser() - try: - with open(configFile, 'r', encoding='utf-8') as fh: - try: - configs.readfp(fh) - except configparser.Error: - print(traceback.format_exc()) - return None - except IOError: - sys.stderr.write(u("Error: Could not read from config file {0}\n").format(u(configFile))) - raise SystemExit(CONFIG_FILE_PARSE_ERROR) - return configs - - -def parseArguments(): - """Parse command line arguments and configs from ~/.wakatime.cfg. - Command line arguments take precedence over config file settings. - Returns instances of ArgumentParser and SafeConfigParser. - """ - - # define supported command line arguments - parser = argparse.ArgumentParser( - description='Common interface for the WakaTime api.') - parser.add_argument('--entity', dest='entity', metavar='FILE', - action=FileAction, - help='absolute path to file for the heartbeat; can also be a '+ - 'url, domain, or app when --entity-type is not file') - parser.add_argument('--file', dest='file', action=FileAction, - help=argparse.SUPPRESS) - parser.add_argument('--key', dest='key', - help='your wakatime api key; uses api_key from '+ - '~/.wakatime.cfg by default') - parser.add_argument('--write', dest='is_write', - action='store_true', - help='when set, tells api this heartbeat was triggered from '+ - 'writing to a file') - parser.add_argument('--plugin', dest='plugin', - help='optional text editor plugin name and version '+ - 'for User-Agent header') - parser.add_argument('--time', dest='timestamp', metavar='time', - type=float, - help='optional floating-point unix epoch timestamp; '+ - 'uses current time by default') - parser.add_argument('--lineno', dest='lineno', - help='optional line number; current line being edited') - parser.add_argument('--cursorpos', dest='cursorpos', - help='optional cursor position in the current file') - parser.add_argument('--entity-type', dest='entity_type', - help='entity type for this heartbeat. can be one of "file", '+ - '"domain", or "app"; defaults to file.') - parser.add_argument('--proxy', dest='proxy', - help='optional proxy configuration. Supports HTTPS '+ - 'and SOCKS proxies. For example: '+ - 'https://user:pass@host:port or '+ - 'socks5://user:pass@host:port or ' + - 'domain\\user:pass') - parser.add_argument('--project', dest='project', - help='optional project name') - parser.add_argument('--alternate-project', dest='alternate_project', - help='optional alternate project name; auto-discovered project '+ - 'takes priority') - parser.add_argument('--alternate-language', dest='alternate_language', - help='optional alternate language name; auto-detected language'+ - 'takes priority') - parser.add_argument('--hostname', dest='hostname', help='hostname of '+ - 'current machine.') - parser.add_argument('--disableoffline', dest='offline', - action='store_false', - help='disables offline time logging instead of queuing logged time') - parser.add_argument('--hidefilenames', dest='hidefilenames', - action='store_true', - help='obfuscate file names; will not send file names to api') - parser.add_argument('--exclude', dest='exclude', action='append', - help='filename patterns to exclude from logging; POSIX regex '+ - 'syntax; can be used more than once') - parser.add_argument('--include', dest='include', action='append', - help='filename patterns to log; when used in combination with '+ - '--exclude, files matching include will still be logged; '+ - 'POSIX regex syntax; can be used more than once') - parser.add_argument('--ignore', dest='ignore', action='append', - help=argparse.SUPPRESS) - parser.add_argument('--extra-heartbeats', dest='extra_heartbeats', - action='store_true', - help='reads extra heartbeats from STDIN as a JSON array until EOF') - parser.add_argument('--logfile', dest='logfile', - help='defaults to ~/.wakatime.log') - parser.add_argument('--apiurl', dest='api_url', - help='heartbeats api url; for debugging with a local server') - parser.add_argument('--timeout', dest='timeout', type=int, - help='number of seconds to wait when sending heartbeats to api; '+ - 'defaults to 60 seconds') - parser.add_argument('--config', dest='config', - help='defaults to ~/.wakatime.cfg') - parser.add_argument('--verbose', dest='verbose', action='store_true', - help='turns on debug messages in log file') - parser.add_argument('--version', action='version', version=__version__) - - # parse command line arguments - args = parser.parse_args() - - # use current unix epoch timestamp by default - if not args.timestamp: - args.timestamp = time.time() - - # parse ~/.wakatime.cfg file - configs = parseConfigFile(args.config) - if configs is None: - return args, configs - - # update args from configs - if not args.hostname: - if configs.has_option('settings', 'hostname'): - args.hostname = configs.get('settings', 'hostname') - if not args.key: - default_key = None - if configs.has_option('settings', 'api_key'): - default_key = configs.get('settings', 'api_key') - elif configs.has_option('settings', 'apikey'): - default_key = configs.get('settings', 'apikey') - if default_key: - args.key = default_key - else: - try: - parser.error('Missing api key. Find your api key from wakatime.com/settings.') - except SystemExit: - raise SystemExit(AUTH_ERROR) - - is_valid = not not re.match(r'^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$', args.key, re.I) - if not is_valid: - try: - parser.error('Invalid api key. Find your api key from wakatime.com/settings.') - except SystemExit: - raise SystemExit(AUTH_ERROR) - - if not args.entity: - if args.file: - args.entity = args.file - else: - parser.error('argument --entity is required') - if not args.exclude: - args.exclude = [] - if configs.has_option('settings', 'ignore'): - try: - for pattern in configs.get('settings', 'ignore').split("\n"): - if pattern.strip() != '': - args.exclude.append(pattern) - except TypeError: # pragma: nocover - pass - if configs.has_option('settings', 'exclude'): - try: - for pattern in configs.get('settings', 'exclude').split("\n"): - if pattern.strip() != '': - args.exclude.append(pattern) - except TypeError: # pragma: nocover - pass - if not args.include: - args.include = [] - if configs.has_option('settings', 'include'): - try: - for pattern in configs.get('settings', 'include').split("\n"): - if pattern.strip() != '': - args.include.append(pattern) - except TypeError: # pragma: nocover - pass - if args.hidefilenames: - args.hidefilenames = ['.*'] - else: - args.hidefilenames = [] - if configs.has_option('settings', 'hidefilenames'): - option = configs.get('settings', 'hidefilenames') - if option.strip().lower() == 'true': - args.hidefilenames = ['.*'] - elif option.strip().lower() != 'false': - for pattern in option.split("\n"): - if pattern.strip() != '': - args.hidefilenames.append(pattern) - if args.offline and configs.has_option('settings', 'offline'): - args.offline = configs.getboolean('settings', 'offline') - if not args.proxy and configs.has_option('settings', 'proxy'): - args.proxy = configs.get('settings', 'proxy') - if args.proxy: - pattern = r'^((https?|socks5)://)?([^:@]+(:([^:@])+)?@)?[^:]+(:\d+)?$' - if '\\' in args.proxy: - pattern = r'^.*\\.+$' - is_valid = not not re.match(pattern, args.proxy, re.I) - if not is_valid: - parser.error('Invalid proxy. Must be in format ' + - 'https://user:pass@host:port or ' + - 'socks5://user:pass@host:port or ' + - 'domain\\user:pass.') - if not args.verbose and configs.has_option('settings', 'verbose'): - args.verbose = configs.getboolean('settings', 'verbose') - if not args.verbose and configs.has_option('settings', 'debug'): - args.verbose = configs.getboolean('settings', 'debug') - if not args.logfile and configs.has_option('settings', 'logfile'): - args.logfile = configs.get('settings', 'logfile') - if not args.logfile and os.environ.get('WAKATIME_HOME'): - home = os.environ.get('WAKATIME_HOME') - args.logfile = os.path.join(os.path.expanduser(home), '.wakatime.log') - if not args.api_url and configs.has_option('settings', 'api_url'): - args.api_url = configs.get('settings', 'api_url') - if not args.timeout and configs.has_option('settings', 'timeout'): - try: - args.timeout = int(configs.get('settings', 'timeout')) - except ValueError: - print(traceback.format_exc()) - - return args, configs - - -def should_exclude(entity, include, exclude): - if entity is not None and entity.strip() != '': - for pattern in include: - try: - compiled = re.compile(pattern, re.IGNORECASE) - if compiled.search(entity): - return False - except re.error as ex: - log.warning(u('Regex error ({msg}) for include pattern: {pattern}').format( - msg=u(ex), - pattern=u(pattern), - )) - for pattern in exclude: - try: - compiled = re.compile(pattern, re.IGNORECASE) - if compiled.search(entity): - return pattern - except re.error as ex: - log.warning(u('Regex error ({msg}) for exclude pattern: {pattern}').format( - msg=u(ex), - pattern=u(pattern), - )) - return False - - -def get_user_agent(plugin): - ver = sys.version_info - python_version = '%d.%d.%d.%s.%d' % (ver[0], ver[1], ver[2], ver[3], ver[4]) - user_agent = u('wakatime/{ver} ({platform}) Python{py_ver}').format( - ver=u(__version__), - platform=u(platform.platform()), - py_ver=python_version, - ) - if plugin: - user_agent = u('{user_agent} {plugin}').format( - user_agent=user_agent, - plugin=u(plugin), - ) - else: - user_agent = u('{user_agent} Unknown/0').format( - user_agent=user_agent, - ) - return user_agent - - def send_heartbeat(project=None, branch=None, hostname=None, stats={}, key=None, entity=None, timestamp=None, is_write=None, plugin=None, offline=None, entity_type='file', hidefilenames=None, @@ -527,17 +241,6 @@ def sync_offline_heartbeats(args, hostname): return SUCCESS -def format_file_path(filepath): - """Formats a path as absolute and with the correct platform separator.""" - - try: - filepath = os.path.realpath(os.path.abspath(filepath)) - filepath = re.sub(r'[/\\]', os.path.sep, filepath) - except: # pragma: nocover - pass - return filepath - - def process_heartbeat(args, configs, hostname, heartbeat): exclude = should_exclude(heartbeat['entity'], args.include, args.exclude) if exclude is not False: diff --git a/wakatime/utils.py b/wakatime/utils.py new file mode 100644 index 0000000..5ae657a --- /dev/null +++ b/wakatime/utils.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +""" + wakatime.utils + ~~~~~~~~~~~~~~ + + Utility functions. + + :copyright: (c) 2016 Alan Hamlett. + :license: BSD, see LICENSE for more details. +""" + + +import platform +import logging +import os +import re +import sys + +from .__about__ import __version__ +from .compat import u + + +log = logging.getLogger('WakaTime') + + +def should_exclude(entity, include, exclude): + if entity is not None and entity.strip() != '': + for pattern in include: + try: + compiled = re.compile(pattern, re.IGNORECASE) + if compiled.search(entity): + return False + except re.error as ex: + log.warning(u('Regex error ({msg}) for include pattern: {pattern}').format( + msg=u(ex), + pattern=u(pattern), + )) + for pattern in exclude: + try: + compiled = re.compile(pattern, re.IGNORECASE) + if compiled.search(entity): + return pattern + except re.error as ex: + log.warning(u('Regex error ({msg}) for exclude pattern: {pattern}').format( + msg=u(ex), + pattern=u(pattern), + )) + return False + + +def get_user_agent(plugin): + ver = sys.version_info + python_version = '%d.%d.%d.%s.%d' % (ver[0], ver[1], ver[2], ver[3], ver[4]) + user_agent = u('wakatime/{ver} ({platform}) Python{py_ver}').format( + ver=u(__version__), + platform=u(platform.platform()), + py_ver=python_version, + ) + if plugin: + user_agent = u('{user_agent} {plugin}').format( + user_agent=user_agent, + plugin=u(plugin), + ) + else: + user_agent = u('{user_agent} Unknown/0').format( + user_agent=user_agent, + ) + return user_agent + + +def format_file_path(filepath): + """Formats a path as absolute and with the correct platform separator.""" + + try: + filepath = os.path.realpath(os.path.abspath(filepath)) + filepath = re.sub(r'[/\\]', os.path.sep, filepath) + except: # pragma: nocover + pass + return filepath