"""
davies.pockettopo: Module for parsing and working with exported PocketTopo survey data
"""
from __future__ import division
from __future__ import print_function
import re
import codecs
import logging
from datetime import datetime
from collections import OrderedDict, defaultdict
log = logging.getLogger(__name__)
__all__ = 'TxtFile', 'Survey', 'MergingSurvey', 'Shot', 'PocketTopoTxtParser'
# TODO: properly handle zero-length shots with both from/to (station equivalence)
# TODO: older versions didn't specify units?
[docs]class Shot(OrderedDict):
"""
Representation of a single shot in a PocketTopo Survey.
:kwarg FROM: (str) from station
:kwarg TO: (str) optional to station
:kwarg LENGTH: (float) distance
:kwarg AZM: (float) compass
:kwarg INC: (float) inclination
:kwarg COMMENT: (str)
:kwarg declination: (float) optional
:ivar declination: (float) set or get the applied magnetic declination for the shot
"""
def __init__(self, *args, **kwargs):
self.declination = kwargs.pop('declination', 0.0)
OrderedDict.__init__(self, *args, **kwargs)
self.dupe_count = 1 # denotes averaged backsights (2) and triple-shots (3)
@property
def azm(self):
"""Corrected azimuth, taking into account declination."""
return self.get('AZM', -0.0) + self.declination
@property
def inc(self):
"""Corrected inclination."""
return self.get('INC', -0.0)
@property
def length(self):
"""Corrected distance."""
return self.get('LENGTH', -0.0)
@property
def is_splay(self):
"""Is this shot a "splay shot"?"""
return self.get('TO', None) in (None, '')
def __str__(self):
return ', '.join('%s=%s' % (k,v) for (k,v) in self.items())
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self)
[docs]class Survey(object):
"""
Representation of a PocketTopo Survey object. A Survey is a container for :class:`Shot` objects.
"""
def __init__(self, name=None, date=None, comment=None, declination=0.0, cave_name=None, length_units='m', angle_units=360, shots=None):
self.name = name
self.date = date
self.comment = comment
self.declination = declination
self.cave_name = cave_name
self.length_units = length_units
self.angle_units = angle_units
self.shots = []
self.splays = defaultdict(list)
if shots:
[self.add_shot(shot) for shot in shots]
[docs] def add_shot(self, shot):
"""Add a Shot to :attr:`shots`, applying our survey's :attr:`declination` to it."""
shot.declination = self.declination
if shot.is_splay:
self.splays[shot['FROM']].append(shot)
self.shots.append(shot)
@property
def length(self):
"""Total surveyed cave length, not including splays."""
return sum([shot.length for shot in self.shots if not shot.is_splay])
@property
def total_length(self):
"""Total surveyed length including splays."""
return sum([shot.length for shot in self.shots])
def __len__(self):
return len(self.shots)
def __iter__(self):
for shot in self.shots:
yield shot
def __contains__(self, item):
for shot in self.shots:
if item in (shot.get('FROM', None), shot.get('TO', None)):
return True
return False
def __str__(self):
return self.name
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self.name)
# def _serialize(self):
# return []
[docs]class MergingSurvey(Survey):
"""
Representation of a PocketTopo Survey object. A Survey is a container for :class:`Shot` objects.
This Survey implementation merges "duplicate" shots into a single averaged shot.
PocketTopo (and DistoX) convention is to use triple forward shots for mainline survey. When
adding a new shot to this class with `add_shot()`, if we detect that the previous shot was
between the same two stations, we average values and merge the two together instead of appending
the duplicate shot. We use a "running" mean algorithm, so that this feature works for any number
of subsequent duplicate shots (two, three, four...).
"""
# For performance, we only look backwards at the immediately preceding shots!
def _inverse_azm(self, azm):
"""Convert forward AZM to back AZM and vice versa"""
return (azm + self.angle_units/2) % self.angle_units
def _inverse_inc(self, inc):
"""Convert forward INC to back INC and vice versa"""
return -1 * inc
[docs] def add_shot(self, shot):
"""
Add a shot dictionary to :attr:`shots`, applying our survey's :attr:`declination`, and
optionally averaging and merging with duplicate previous shot.
"""
if not self.shots or not shot.get('TO', None) or not self.shots[-1].get('TO', None):
return super(MergingSurvey, self).add_shot(shot)
from_, to = shot['FROM'], shot['TO']
prev_shot = self.shots[-1]
prev_from, prev_to = prev_shot['FROM'], prev_shot['TO']
if from_ == prev_from and to == prev_to:
# dupe shot! calculate iterative "running" mean and merge into the previous shot
total_count = prev_shot.dupe_count + 1
log.debug('Merging %d shots "%s" <- "%s"', total_count, prev_shot, shot)
if abs(shot['AZM'] - prev_shot['AZM']) > 2.0:
log.warning('Merged forward AZM disagreement of %0.1f for "%s" <- "%s"', abs(shot['AZM'] - prev_shot['AZM']), prev_shot, shot)
if abs(shot['INC'] - prev_shot['INC']) > 2.0:
log.warning('Merged forward INC disagreement of %0.1f for "%s" <- "%s"', abs(shot['INC'] - prev_shot['INC']), prev_shot, shot)
if abs(shot['LENGTH'] - prev_shot['LENGTH']) > 1.0:
log.warning('Merged forward LENGTH disagreement of %0.1f for "%s" <- "%s"', abs(shot['LENGTH'] - prev_shot['LENGTH']), prev_shot, shot)
avg_length = (prev_shot['LENGTH'] * prev_shot.dupe_count + shot['LENGTH']) / total_count
avg_azm = (prev_shot['AZM'] * prev_shot.dupe_count + shot['AZM']) / total_count
avg_inc = (prev_shot['INC'] * prev_shot.dupe_count + shot['INC']) / total_count
merged_comments = ('%s %s' % (prev_shot.get('COMMENT', '') or '', shot.get('COMMENT', '') or '')).strip() or None
prev_shot['LENGTH'], prev_shot['AZM'], prev_shot['INC'], prev_shot['COMMENT'] = avg_length, avg_azm, avg_inc, merged_comments
prev_shot.dupe_count += 1
elif from_ == prev_to and to == prev_from:
# backsight! we do the same iterative "running" mean rather than assuming a single forward and single back
total_count = prev_shot.dupe_count + 1
inv_azm, inv_inc = self._inverse_azm(shot['AZM']), self._inverse_inc(shot['INC'])
log.debug('Merging %d backsights "%s" <- "%s"', total_count, prev_shot, shot)
if abs(inv_azm - prev_shot['AZM']) > 2.0:
log.warning('Backsight AZM disagreement of %0.1f for "%s" <- "%s"', abs(inv_azm - prev_shot['AZM']), prev_shot, shot)
if abs(inv_inc - prev_shot['INC']) > 2.0:
log.warning('Backsight INC disagreement of %0.1f for "%s" <- "%s"', abs(inv_inc - prev_shot['INC']), prev_shot, shot)
if abs(shot['LENGTH'] - prev_shot['LENGTH']) > 1.0:
log.warning('Backsight LENGTH disagreement of %0.1f for "%s" <- "%s"', abs(shot['LENGTH'] - prev_shot['LENGTH']), prev_shot, shot)
avg_length = (prev_shot['LENGTH'] * prev_shot.dupe_count + shot['LENGTH']) / total_count
avg_azm = (prev_shot['AZM'] * prev_shot.dupe_count + inv_azm) / total_count
avg_inc = (prev_shot['INC'] * prev_shot.dupe_count + inv_inc) / total_count
merged_comments = ('%s %s' % (prev_shot.get('COMMENT', '') or '', shot.get('COMMENT', '') or '')).strip() or None
prev_shot['LENGTH'], prev_shot['AZM'], prev_shot['INC'], prev_shot['COMMENT'] = avg_length, avg_azm, avg_inc, merged_comments
prev_shot.dupe_count += 1
else:
# a new, different shot; no merge
return super(MergingSurvey, self).add_shot(shot)
class UTMLocation(object):
"""
Represents a UTM-based coordinate for Reference Point.
Note that PocketTopo doesn't support UTM Zones.
:ivar easting: (float)
:ivar northing: (float)
:ivar elevation: (float) meters
:ivar comment: (str)
"""
def __init__(self, easting, northing, elevation=0.0, comment=None):
self.easting = easting
self.northing = northing
self.elevation = elevation
self.altitude = elevation # alias
self.comment = comment
@property
def __geo_interface__(self):
return {'type': 'Point', 'coordinates': (self.easting, self.northing, self.elevation)}
def __str__(self):
return "<UTM %0.1fE %0.1fN %0.1fm>" % (self.easting, self.northing, self.elevation)
[docs]class TxtFile(object):
"""
Representation of a PocketTopo .TXT File. A TxtFile is a container for :class:`Survey` objects.
:ivar name: (string) the TxtFile's "name"
:ivar length_units: (string) `m` (default) or `feet`
:ivar angle_units: (int) `360` for degrees (default) or `400` for grads
:ivar surveys: (list of :class:`Survey`)
:ivar reference_points: (dict of :class:`UTMLocation` by station)
"""
def __init__(self, name=None, length_units='m', angle_units=360):
self.name = name
if length_units not in ('m', 'feet'):
raise Exception('Length units must be either \'m\' for meters (default) or \'feet\' for feet')
self.length_units = length_units
if angle_units not in (360, '360', 400, '400'):
raise Exception('Angle units must be either `360` for degrees (default) or `400` for grads')
self.angle_units = int(angle_units)
self.surveys = []
self.reference_points = OrderedDict()
[docs] def add_survey(self, survey):
"""Add a :class:`Survey` to :attr:`surveys`."""
survey.length_units = self.length_units
survey.angle_units = self.angle_units
self.surveys.append(survey)
[docs] def add_reference_point(self, station, utm_location):
"""Add a :class:`UTMLocation` to :attr:`reference_points`."""
self.reference_points[station] = utm_location
@property
def length(self):
"""Total surveyed length."""
return sum([survey.length for survey in self.surveys])
def __len__(self):
return len(self.surveys)
def __iter__(self):
for survey in self.surveys:
yield survey
def __contains__(self, item):
for survey in self.surveys:
if item == survey.name or item == survey:
return True
return False
def __getitem__(self, item):
for survey in self.surveys:
if item == survey.name or item == survey:
return survey
raise KeyError(item)
@staticmethod
[docs] def read(fname, merge_duplicate_shots=False, encoding='windows-1252'):
"""Read a PocketTopo .TXT file and produce a `TxtFile` object which represents it"""
return PocketTopoTxtParser(fname, merge_duplicate_shots, encoding).parse()
# def write(self, outf):
# """Write a `Survey` to the specified .DAT file"""
# with codecs.open(outf, 'wb', 'windows-1252') as outf:
# for survey in self.surveys:
# outf.write('\r\n'.join(survey._serialize()))
[docs]class PocketTopoTxtParser(object):
"""Parses the PocketTopo .TXT file format"""
def __init__(self, txtfilename, merge_duplicate_shots=False, encoding='windows-1252'):
self.txtfilename = txtfilename
self.merge_duplicate_shots = merge_duplicate_shots
self.encoding = encoding
[docs] def parse(self):
"""Produce a `TxtFile` object from the .TXT file"""
log.debug('Parsing PocketTopo .TXT file %s ...', self.txtfilename)
SurveyClass = MergingSurvey if self.merge_duplicate_shots else Survey
txtobj = None
with codecs.open(self.txtfilename, 'rb', self.encoding) as txtfile:
lines = txtfile.read().splitlines()
# first line is cave name and units
first_line_re = re.compile(r'^([\w\s]*)\(([\w\s]*),([\w\s]*)')
first_line = lines.pop(0)
cave_name, length_units, angle_units = first_line_re.search(first_line).groups()
cave_name, angle_units = cave_name.strip(), int(angle_units)
txtobj = TxtFile(cave_name, length_units, angle_units)
while not lines[0]:
lines.pop(0) # skip blanks
# next block identifies surveys (trip) metadata
while lines[0].startswith('['):
toks = lines.pop(0).split(None, 3)
id, date, declination = toks[:3]
id = id.strip('[]:')
date = datetime.strptime(date, '%Y/%m/%d').date()
declination = float(declination)
comment = toks[3].strip('"') if len(toks) == 4 else ''
survey = SurveyClass(id, date, comment, declination, cave_name)
txtobj.add_survey(survey)
while not lines[0]:
lines.pop(0) # skip blanks
# finally actual survey data
while lines:
line = lines.pop(0).strip()
if not line:
continue
if '"' in line:
line, comment = line.split('"', 1)
comment = comment.rstrip('"')
else:
comment = None
if '[' not in line:
# this is either a Reference Point or a zero-length fake shot
toks = line.split()
if len(toks) != 4: # ??
log.debug('Skipping unrecognized shot: %s %s', line, '"%s"' % comment if comment else '')
continue
station, vals = toks[0], list(map(float, toks[1:]))
if vals[0] == 0.0: # fake shot
log.debug('Skipping zero-length shot: %s %s', line, '"%s"' % comment if comment else '')
else: # reference point
easting, northing, altitude = vals
reference_point = UTMLocation(easting, northing, altitude, comment)
log.debug('Reference point: %s', reference_point)
txtobj.add_reference_point(station, reference_point)
continue
line, survey_id = line.split('[')
survey_id = survey_id.rstrip().rstrip(']')
toks = line.split()
from_to, (length, azm, inc) = toks[:-3], (float(tok) for tok in toks[-3:])
if len(from_to) == 2:
from_, to = tuple(from_to) # shot
elif len(from_to) == 1:
from_, to = from_to[0], None # splay
elif not from_to and length == 0.0:
continue # skip junk zero-length placeholder shots
else:
raise Exception()
shot = Shot([('FROM',from_), ('TO',to), ('LENGTH',length), ('AZM',azm), ('INC',inc), ('COMMENT',comment)])
txtobj[survey_id].add_shot(shot)
return txtobj
if __name__ == '__main__':
import sys
logging.basicConfig(level=logging.DEBUG)
for fname in sys.argv[1:]:
txtfile = PocketTopoTxtParser(fname, merge_duplicate_shots=True).parse()
print('%s (%s, %d)' % (txtfile.name, txtfile.length_units, txtfile.angle_units))
for survey in txtfile:
print('\t', '[%s] %s (%0.1f %s)' % (survey.name, survey.comment, survey.length, txtfile.length_units))
for shot in survey:
print('\t\t', shot)