detect Windows UNC prefix for remote network drives

This commit is contained in:
Alan Hamlett 2018-10-03 00:28:16 -07:00
parent eef70499e7
commit 6b678942f0
6 changed files with 136 additions and 9 deletions

7
tests/samples/netuse/v1 Normal file
View file

@ -0,0 +1,7 @@
New connections will be remembered.
Status Local Remote Network
-------------------------------------------------------------------------------
C: \\vboxsrv\Projects VirtualBox Shared Folders
The command completed successfully.

8
tests/samples/netuse/v2 Normal file
View file

@ -0,0 +1,8 @@
New connections will be remembered.
Status Local Remote Network
-------------------------------------------------------------------------------
C: \\vboxsrv\Projects VirtualBox Shared Folders
OK D: \\192.0.0.1\work VirtualBox Shared Folders
The command completed successfully.

View file

@ -6,7 +6,7 @@ from wakatime.heartbeat import Heartbeat
import os import os
import logging import logging
from testfixtures import log_capture from testfixtures import log_capture
from .utils import TestCase from .utils import DynamicIterable, TestCase, mock
class HeartbeatTestCase(TestCase): class HeartbeatTestCase(TestCase):
@ -116,3 +116,31 @@ class HeartbeatTestCase(TestCase):
self.assertNothingPrinted() self.assertNothingPrinted()
self.assertNothingLogged(logs) self.assertNothingLogged(logs)
def test_parsing(self):
class Args(object):
hide_file_names = ['.*']
hide_project_names = []
plugin = None
samples = [
('v1', 'C:\\v1\\file.txt', '\\\\vboxsrv\\Projects\\v1\\file.txt'),
('v2', 'D:\\stuff\\v2\\file.py', '\\\\192.0.0.1\\work\\stuff\\v2\\file.py'),
]
for sample, filepath, expected in samples:
with mock.patch('wakatime.heartbeat.Popen') as mock_popen:
class MockCommunicate(object):
pass
stdout = open('tests/samples/netuse/' + sample).read()
mock_communicate = MockCommunicate()
mock_communicate.communicate = mock.MagicMock(return_value=DynamicIterable((stdout, ''), max_calls=1))
mock_popen.return_value = mock_communicate
heartbeat = Heartbeat({'user_agent': 'test'}, Args(), None, _clone=True)
result = heartbeat._to_unc_path(filepath)
self.assertEquals(expected, result)
self.assertNothingPrinted()

View file

@ -11,6 +11,7 @@
import codecs import codecs
import os
import platform import platform
import subprocess import subprocess
import sys import sys
@ -115,4 +116,7 @@ class Popen(subprocess.Popen):
except AttributeError: except AttributeError:
pass pass
kwargs['startupinfo'] = startupinfo kwargs['startupinfo'] = startupinfo
super(Popen, self).__init__(*args, **kwargs) if 'env' not in kwargs:
kwargs['env'] = os.environ.copy()
kwargs['env']['LANG'] = 'en-US' if is_win else 'en_US.UTF-8'
subprocess.Popen.__init__(self, *args, **kwargs)

View file

@ -10,8 +10,9 @@
import os import os
import logging import logging
import re import re
from subprocess import PIPE
from .compat import u, json from .compat import u, json, is_win, Popen
from .exceptions import SkipHeartbeat from .exceptions import SkipHeartbeat
from .project import get_project_info from .project import get_project_info
from .stats import get_file_stats from .stats import get_file_stats
@ -85,6 +86,7 @@ class Heartbeat(object):
return return
if self.type == 'file': if self.type == 'file':
self.entity = format_file_path(self.entity) self.entity = format_file_path(self.entity)
self._format_local_file()
if not self._file_exists(): if not self._file_exists():
self.skip = u('File does not exist; ignoring this heartbeat.') self.skip = u('File does not exist; ignoring this heartbeat.')
return return
@ -236,6 +238,84 @@ class Heartbeat(object):
return (self.entity and os.path.isfile(self.entity) or return (self.entity and os.path.isfile(self.entity) or
self.args.local_file and os.path.isfile(self.args.local_file)) self.args.local_file and os.path.isfile(self.args.local_file))
def _format_local_file(self):
"""When args.local_file empty on Windows, tries to map args.entity to a
unc path.
Updates args.local_file in-place without returning anything.
"""
if self.type != 'file':
return
if not is_win:
return
if self._file_exists():
return
self.args.local_file = self._to_unc_path(self.entity)
def _to_unc_path(self, filepath):
drive, rest = self._splitdrive(filepath)
if not drive:
return filepath
stdout = None
try:
stdout, stderr = Popen(['net', 'use'], stdout=PIPE, stderr=PIPE).communicate()
except OSError:
pass
else:
if stdout:
cols = None
for line in stdout.strip().splitlines()[1:]:
line = u(line)
if not line.strip():
continue
if not cols:
cols = self._unc_columns(line)
continue
start, end = cols.get('local', (0, 0))
if not start and not end:
break
local = line[start:end].strip().split(':')[0].upper()
if not local.isalpha():
continue
if local == drive:
start, end = cols.get('remote', (0, 0))
if not start and not end:
break
remote = line[start:end].strip()
return remote + rest
return filepath
def _unc_columns(self, line):
cols = {}
current_col = u('')
newcol = False
start, end = 0, 0
for char in line:
if char.isalpha():
if newcol:
cols[current_col.strip().lower()] = (start, end)
current_col = u('')
start = end
newcol = False
current_col += u(char)
else:
newcol = True
end += 1
if start != end and current_col:
cols[current_col.strip().lower()] = (start, -1)
return cols
def _splitdrive(self, filepath):
if filepath[1:2] != ':' or not filepath[0].isalpha():
return None, filepath
return filepath[0].upper(), filepath[2:]
def _excluded_by_pattern(self): def _excluded_by_pattern(self):
return should_exclude(self.entity, self.args.include, self.args.exclude) return should_exclude(self.entity, self.args.include, self.args.exclude)

View file

@ -70,17 +70,17 @@ class Subversion(BaseProject):
if not self._is_mac() or self._has_xcode_tools(): if not self._is_mac() or self._has_xcode_tools():
stdout = None stdout = None
try: try:
os.environ['LANG'] = 'en_US' stdout, stderr = Popen(
stdout, stderr = Popen([ [self._find_binary(), 'info', os.path.realpath(path)],
self._find_binary(), 'info', os.path.realpath(path) stdout=PIPE,
], stdout=PIPE, stderr=PIPE).communicate() stderr=PIPE,
).communicate()
except OSError: except OSError:
pass pass
else: else:
if stdout: if stdout:
for line in stdout.splitlines(): for line in stdout.splitlines():
line = u(line) line = u(line).split(': ', 1)
line = line.split(': ', 1)
if len(line) == 2: if len(line) == 2:
info[line[0]] = line[1] info[line[0]] = line[1]
return info return info