# Source code for davies.compass

"""
davies.compass: Module for parsing and working with Compass source files
"""

import os.path
import logging
import datetime
import codecs
from collections import OrderedDict

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Public names exported by ``from davies.compass import *``.
__all__ = (
    'Project', 'UTMLocation', 'UTMDatum',
    'DatFile', 'Survey', 'Shot', 'Exclude',
    'CompassProjectParser', 'CompassDatParser', 'ParseException',
)


# Compass OO Model


# Shot-table column order that Survey._serialize() falls back to when a
# survey carries no explicit shot header and has no backsight readings.
DEFAULT_HEADER = [
    'FROM', 'TO', 'LENGTH', 'BEARING', 'INC',
    'LEFT', 'UP', 'DOWN', 'RIGHT',
    'FLAGS', 'COMMENTS',
]

# Same columns, with the backsight readings (AZM2, INC2) placed just ahead of
# FLAGS and COMMENTS.
DEFAULT_HEADER_BACKSIGHTS = DEFAULT_HEADER[:9] + ['AZM2', 'INC2'] + DEFAULT_HEADER[9:]


class Exclude:
    """Single-character shot flags, as stored in a shot's ``FLAGS`` value."""

    # LENGTH and TOTAL are the two flags consulted by Shot.is_included and
    # Shot.is_excluded; CLOSURE and PLOT are defined here but not otherwise
    # read anywhere in this module.
    LENGTH = 'L'
    TOTAL = 'X'
    CLOSURE = 'C'
    PLOT = 'P'
class Shot(OrderedDict):
    """
    Representation of a single shot in a Compass Survey.

    A shot is an ordered mapping of column name (``FROM``, ``TO``, ``LENGTH``,
    ``BEARING``, ...) to value, plus a ``declination`` attribute that is added
    to every azimuth reported by :attr:`azm`.

    See: `Compass Survey Data File Format <http://www.fountainware.com/compass/Documents/FileFormats/SurveyDataFormat.htm>`_
    """
    # FIXME: support compass, back-compass, and tape corrections

    def __init__(self, *args, **kwargs):
        """
        :kwarg FROM: (str) from station
        :kwarg TO: (str) to station
        :kwarg BEARING: (float) forward compass in **decimal degrees**
        :kwarg AZM2: (float) back compass in **decimal degrees**
        :kwarg INC: (float) forward inclination in **decimal degrees**
        :kwarg INC2: (float) back inclination in **decimal degrees**
        :kwarg LENGTH: (float) distance in **decimal feet**
        :kwarg FLAGS: (collection of :class:`Exclude`) shot exclusion flags
        :kwarg COMMENTS: (str) text comments, up to 80 characters long
        :kwarg declination: (float) magnetic declination in decimal degrees

        :ivar declination: (float) set or get magnetic declination adjustment
        """
        # `declination` is an attribute, not a mapping entry, so it is removed
        # from the keyword arguments before they populate the dictionary.
        self.declination = kwargs.pop('declination', 0.0)
        OrderedDict.__init__(self, *args, **kwargs)

    @property
    def azm(self):
        """
        Azimuth combining foresight and backsight, plus declination.

        Returns ``None`` when neither ``BEARING`` nor ``AZM2`` is present.
        With only one reading, that reading is used (a backsight is reversed
        by 180 degrees). With both, the result is the mean direction of the
        foresight and the reversed backsight. Instrument corrections are not
        applied (see the FIXME on the class).
        """
        front = self.get('BEARING', None)
        back = self.get('AZM2', None)
        if front is None and back is None:
            return None
        if back is None:
            return front + self.declination
        reversed_back = (back + 180) % 360
        if front is None:
            return reversed_back + self.declination
        # Average along the shorter arc: a plain arithmetic mean of 359 and 1
        # would point due south (180) rather than due north (0).
        delta = (reversed_back - front + 180) % 360 - 180
        return (front + delta / 2.0) % 360 + self.declination

    @property
    def inc(self):
        """
        Inclination combining foresight and backsight.

        Returns ``None`` when neither ``INC`` nor ``INC2`` is present. A
        backsight inclination is negated before use; with both readings the
        result is their mean. Instrument corrections are not applied.
        """
        front = self.get('INC', None)
        back = self.get('INC2', None)
        if front is None and back is None:
            return None
        if back is None:
            return front
        if front is None:
            return -1 * back
        return (front - back) / 2.0

    @property
    def flags(self):
        """Shot exclusion flags as a `set` (empty when there are none)."""
        # Older data may not have a FLAGS field at all, and a present-but-None
        # value must not reach set().
        return set(self.get('FLAGS', None) or '')

    @property
    def length(self):
        """The shot's ``LENGTH`` value, or ``None``; no tape correction is applied."""
        return self.get('LENGTH', None)

    @property
    def is_included(self):
        """True unless the shot carries the LENGTH or TOTAL exclusion flag."""
        current = self.flags
        return Exclude.LENGTH not in current and Exclude.TOTAL not in current

    @property
    def is_excluded(self):
        """True when the shot carries the LENGTH or TOTAL exclusion flag."""
        current = self.flags
        return Exclude.LENGTH in current or Exclude.TOTAL in current

    def __str__(self):
        return ', '.join('%s=%s' % (key, value) for (key, value) in self.items())

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, self)
class Survey(object):
    """
    Representation of a Compass Survey object. A Survey is a container for :class:`Shot` objects.

    See: `Compass Survey Data File Format <http://www.fountainware.com/compass/Documents/FileFormats/SurveyDataFormat.htm>`_

    :ivar file_format: (str) format string which defines how Compass will view/edit underlying
                       survey data; setting this property will in turn set all the other file
                       format properties listed below; should be a string of 11 - 13 characters
    :ivar bearing_units: (chr) 'D'
    :ivar length_units: (chr)
    :ivar passage_units: (chr)
    :ivar inclination_units: (chr)
    :ivar passage_dimension_order: (list of chr)
    :ivar shot_item_order: (list of chr)
    :ivar backsight: (chr)
    :ivar lrud_association: (chr)
    """

    def __init__(self, name='', date=None, comment='', team='', declination=0.0, file_format=None,
                 corrections=(0.0,0.0,0.0), corrections2=(0.0,0.0), cave_name='', shot_header=(), shots=None):
        """
        :param name: survey name
        :param date: object with ``strftime`` (e.g. `datetime.date`), or None
        :param comment: survey comment text
        :param team: team members, either a collection of names or one string
        :param declination: magnetic declination applied to shots added via :meth:`add_shot`
        :param file_format: optional format string; see :attr:`file_format`
        :param corrections: three instrument correction values (stored, not applied)
        :param corrections2: two backsight correction values (stored, not applied)
        :param cave_name: free-text first line of the serialized survey
        :param shot_header: column names for the shot table
        :param shots: optional initial list of shots
        """
        self.name = name
        self.date = date
        self.comment = comment
        self.team = team
        self.declination = declination
        # TODO: instrument corrections not supported
        # One-shot iterables (such as a Python 3 ``map``) are materialized so
        # that _serialize() can be called more than once.
        self.corrections = self._materialize(corrections)
        self.corrections2 = self._materialize(corrections2)
        self.cave_name = cave_name
        self.shot_header = shot_header  # FIXME: this ordering is not optional!
        self.shots = shots if shots else []

        # Defaults, overridden below when a format string is supplied.
        self.bearing_units = 'D'
        self.length_units = 'D'
        self.passage_units = 'D'
        self.inclination_units = 'D'
        self.passage_dimension_order = ['U','D','L','R']
        self.shot_item_order = ['L', 'A', 'D']
        self.backsight = 'N'
        self.lrud_association = 'F'
        if file_format:
            self.file_format = file_format

    @staticmethod
    def _materialize(values):
        """Return `values` unchanged if it is a list or tuple, else as a tuple."""
        return values if isinstance(values, (list, tuple)) else tuple(values)

    @property
    def file_format(self):
        """Format string assembled from the individual format attributes."""
        return '%s%s%s%s%s%s%s%s' % \
               (self.bearing_units, self.length_units, self.passage_units, self.inclination_units,
                ''.join(self.passage_dimension_order), ''.join(self.shot_item_order),
                self.backsight, self.lrud_association)

    @file_format.setter
    def file_format(self, fmt):
        """
        Split a format string into the individual format attributes.

        :raises ValueError: if `fmt` is shorter than 11 characters
        """
        if len(fmt) < 11:
            raise ValueError(fmt)
        self.bearing_units = fmt[0]
        self.length_units = fmt[1]
        self.passage_units = fmt[2]
        self.inclination_units = fmt[3]
        self.passage_dimension_order = list(fmt[4:8])
        # Strings of 15+ characters carry a five-item shot order, shorter ones three.
        if len(fmt) < 15:
            self.shot_item_order = list(fmt[8:11])
        else:
            self.shot_item_order = list(fmt[8:13])
        # The backsight flag is the last character of a 12-character string and
        # the second-to-last of anything longer; absent from 11-character strings.
        if len(fmt) < 12:
            self.backsight = 'N'
        elif len(fmt) > 12:
            self.backsight = fmt[-2]
        else:
            self.backsight = fmt[-1]
        # The LRUD association is only present from 13 characters upward.
        if len(fmt) < 13:
            self.lrud_association = 'F'
        else:
            self.lrud_association = fmt[-1]

    def add_shot(self, shot):
        """Add a shot dictionary to :attr:`shots`, applying this survey's magnetic declination"""
        shot.declination = self.declination
        self.shots.append(shot)

    @property
    def length(self):
        """Total surveyed length, regardless of exclusion flags."""
        # A shot without a length (None) contributes nothing instead of
        # breaking the sum.
        return sum([shot.length or 0.0 for shot in self.shots])

    @property
    def included_length(self):
        """Surveyed length, not including "excluded" shots"""
        return sum([shot.length or 0.0 for shot in self.shots if shot.is_included])

    @property
    def excluded_length(self):
        """Surveyed length which does not count toward the included total"""
        return sum([shot.length or 0.0 for shot in self.shots if not shot.is_included])

    @property
    def backsights_enabled(self):
        """True if any shot has an ``AZM2`` or ``INC2`` entry."""
        return any('AZM2' in shot or 'INC2' in shot for shot in self.shots)

    def __str__(self):
        return self.name

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, self.name)

    def __len__(self):
        return len(self.shots)

    def __iter__(self):
        return iter(self.shots)

    def __contains__(self, item):
        """True if `item` is the FROM or TO station of any shot."""
        return any(item in (shot.get('FROM', None), shot.get('TO', None)) for shot in self.shots)

    def _serialize(self):
        """Return the survey as a list of text lines in .DAT layout (no line endings)."""
        date = self.date.strftime('%m %d %Y').lstrip('0') if self.date else ''
        if self.shot_header:
            header = self.shot_header
        else:
            header = DEFAULT_HEADER_BACKSIGHTS if self.backsights_enabled else DEFAULT_HEADER

        # `team` defaults to a string; joining a string would put a comma
        # between every character, so a string is written out as-is.
        if not self.team:
            team = ''
        elif isinstance(self.team, str):
            team = self.team
        else:
            team = ','.join(self.team)

        lines = [
            self.cave_name,
            'SURVEY NAME: %s' % self.name,
            'SURVEY DATE: %s COMMENT:%s' % (date, self.comment),
            'SURVEY TEAM:',
            team,
            'DECLINATION: %7.2f FORMAT: %s CORRECTIONS: %s CORRECTIONS2: %s' %
            (self.declination, self.file_format,
             '%.2f %.2f %.2f' % tuple(self.corrections), '%.2f %.2f' % tuple(self.corrections2)),
            '',
            '\t'.join(header),
            '',
        ]

        for shot in self.shots:
            vals = []
            for k in header:
                v = shot.get(k, None)
                if k in ('BEARING', 'INC', 'AZM2', 'INC2'):
                    # -999 is written in place of a missing reading
                    vals.append('%7.2f' % (v if v is not None else -999.0))
                elif k == 'LENGTH':
                    vals.append('%8.3f' % (v if v is not None else -999.0))
                elif k in ('LEFT', 'RIGHT', 'UP', 'DOWN'):
                    # missing or infinite passage dimensions are written as -9.90
                    vals.append('%7.2f' % (v if v is not None and v != float('inf') else -9.90))
                elif k in ('FROM', 'TO'):
                    v = v or ''
                    vals.append(v.rjust(6))
                elif k in ('FLAGS', 'COMMENTS'):
                    pass  # handle them together below
                else:
                    vals.append(str(v) if v is not None else '')

            # FLAGS and COMMENTS share one trailing column: "#|<flags># <comment>"
            if shot.get('FLAGS', ''):
                flags = '#|%s#' % ''.join(list(shot.get('FLAGS', ())))
                vals.append('%s %s' % (flags, shot.get('COMMENTS', '') or ''))
            else:
                vals.append((shot.get('COMMENTS', '') or '')[:80])
            lines.append('\t'.join(vals))

        return lines
class DatFile(object):
    """
    Representation of a Compass .DAT File. A DatFile is a container for :class:`Survey` objects.

    See: `Compass Survey Data File Format <http://www.fountainware.com/compass/Documents/FileFormats/SurveyDataFormat.htm>`_

    :ivar name: (string) the DatFile's "name", not necessarily related to its filename
    :ivar filename: (string) underlying .DAT file's filename
    :ivar surveys: (list of :class:`Survey`)
    """

    def __init__(self, name=None, filename=None):
        self.name = name
        self.filename = filename
        self.surveys = []

    def add_survey(self, survey):
        """Add a :class:`Survey` to :attr:`surveys`."""
        self.surveys.append(survey)

    @property
    def length(self):
        """Total surveyed length."""
        return sum([survey.length for survey in self.surveys])

    @property
    def included_length(self):
        """Surveyed length, not including "excluded" shots"""
        return sum([survey.included_length for survey in self.surveys])

    @property
    def excluded_length(self):
        """Surveyed length which does not count toward the included total"""
        return sum([survey.excluded_length for survey in self.surveys])

    def __len__(self):
        return len(self.surveys)

    def __iter__(self):
        return iter(self.surveys)

    def __contains__(self, item):
        """True if `item` is one of the surveys, or the name of one."""
        return any(item == survey.name or item == survey for survey in self.surveys)

    def __getitem__(self, item):
        """
        Return the first survey equal to `item` or named `item`.

        :raises KeyError: if no survey matches
        """
        for survey in self.surveys:
            if item == survey.name or item == survey:
                return survey
        raise KeyError(item)

    @staticmethod
    def read(fname):
        """Read a .DAT file and produce a :class:`DatFile`"""
        return CompassDatParser(fname).parse()

    def write(self, outfname=None):
        """
        Write or overwrite this :class:`DatFile` to the specified .DAT file.

        :param outfname: target filename; defaults to :attr:`filename`
        :raises ValueError: if neither `outfname` nor :attr:`filename` is set
        """
        outfname = outfname or self.filename
        if not outfname:
            # Fail with a clear message rather than a TypeError from codecs.open(None)
            raise ValueError('Unable to write DAT file without a filename')
        with codecs.open(outfname, 'wb', 'windows-1252') as outf:
            for survey in self.surveys:
                outf.write('\r\n'.join(survey._serialize()))
                outf.write('\r\n'+'\f'+'\r\n')  # ASCII "form feed" ^L
            outf.write('\x1A')  # ASCII "sub" ^Z marks EOF
class UTMDatum:
    """Names of common geographic datums, for use as a location's ``datum``."""

    NAD27 = 'North American 1927'
    NAD83 = 'North American 1983'
    WGS84 = 'WGS 1984'
class UTMLocation(object):
    """A UTM coordinate (easting, northing, elevation) used for fixed stations."""

    def __init__(self, easting, northing, elevation=0.0, zone=0, datum=None, convergence=0.0):
        """
        Numeric arguments are coerced: coordinates and convergence with
        ``float()``, the zone with ``int()``. `datum` is stored unchanged.
        """
        self.easting, self.northing, self.elevation = (
            float(easting), float(northing), float(elevation))
        self.zone = int(zone)
        self.convergence = float(convergence)
        self.datum = datum

    @property
    def __geo_interface__(self):
        """Mapping with ``type`` 'Point' and the (easting, northing, elevation) coordinates."""
        position = (self.easting, self.northing, self.elevation)
        return {'type': 'Point', 'coordinates': position}

    def __str__(self):
        fields = (self.datum, self.zone, self.easting, self.northing, self.elevation)
        return "<%s UTM Zone %s %0.1fE %0.1fN %0.1f>" % fields
class Project(object):
    """
    Representation of a Compass .MAK Project file. A Project is a container for :class:`DatFile` objects.

    See: `Compass Project File Format <http://www.fountainware.com/compass/Documents/FileFormats/ProjectFileFormat.htm>`_

    :ivar name: (string)
    :ivar filename: (string)
    :ivar base_location: (:class:`UTMLocation`)
    :ivar linked_files: (list of :class:`DatFile`)
    :ivar fixed_stations: (map of :class:`DatFile` -> station -> :class:`UTMLocation`)
    """

    def __init__(self, name=None, filename=None):
        self.name = name
        self.filename = filename
        self.linked_files = []
        self.fixed_stations = {}  # datfile -> station -> UTMLocation
        self.base_location = None
        self._utm_zone = None
        self._utm_datum = None
        self._utm_convergence = None
        # TODO: file parameters
        self.override_lrud = False
        self.lrud_association = 'FROM'  # or 'TO'

    def set_base_location(self, location):
        """
        Configure the project's base location.

        :param location: (:class:`UTMLocation`) base location, or None to
                         clear it; clearing leaves the project's UTM zone,
                         datum and convergence as they were
        """
        self.base_location = location
        if location is None:
            # Nothing to copy the UTM settings from.
            return
        self._utm_zone = location.zone
        self._utm_datum = location.datum
        self._utm_convergence = location.convergence

    def add_linked_file(self, datfile):
        """Add a :class:`DatFile` to :attr:`linked_files`."""
        self.linked_files.append(datfile)

    def add_linked_station(self, datfile, station, location=None):
        """
        Add a linked or fixed station.

        :param datfile: key under which the station is recorded in :attr:`fixed_stations`
        :param station: station name
        :param location: (:class:`UTMLocation`) fixed position, or None for a plain link
        """
        self.fixed_stations.setdefault(datfile, {})[station] = location
        # With no base location set, a located station supplies the UTM settings.
        if location and not self.base_location:
            self._utm_zone = location.zone
            self._utm_datum = location.datum
            self._utm_convergence = location.convergence

    def __len__(self):
        return len(self.linked_files)

    def __iter__(self):
        return iter(self.linked_files)

    def __getitem__(self, item):
        """
        Return the first linked file equal to `item` or named `item`.

        :raises KeyError: if no linked file matches
        """
        for datfile in self.linked_files:
            if item == datfile.name or item == datfile:
                return datfile
        raise KeyError(item)

    @staticmethod
    def read(fname):
        """Read a .MAK file and produce a `Project`"""
        return CompassProjectParser(fname).parse()

    def _serialize(self):
        """
        Return the project as a list of .MAK text lines (no line endings).

        :raises ValueError: if a linked file's filename does not end in ``.dat``
        """
        lines = []
        if self.base_location:
            loc = self.base_location
            lines.append('@%.3f,%.3f,%.3f,%d,%.3f;' % (loc.easting, loc.northing, loc.elevation, loc.zone, loc.convergence))
            lines.append('&%s;' % loc.datum)

        override = 'O' if self.override_lrud else 'o'
        association = 't' if self.lrud_association == 'FROM' else 'T'
        lines.append('!%s%s;' % (override, association))

        if self._utm_zone:
            lines.append('$%d;' % self._utm_zone)
        if self._utm_datum:
            lines.append('&%s;' % self._utm_datum)
        if self._utm_convergence:
            lines.append('%%%.2f;' % self._utm_convergence)

        for dat in self.linked_files:
            # A linked file may be a DatFile (or subclass) or a bare filename string.
            datfname = dat.filename if isinstance(dat, DatFile) else dat
            if not datfname.lower().endswith('.dat'):
                # NOTE(review): the message says "MAK" although the check is on a DAT filename.
                raise ValueError('Unable to locate MAK file "%s"' % dat)

            if dat not in self.fixed_stations:
                lines.append('#%s;' % datfname)
            else:
                lines.append('#%s,' % datfname)
                for station, loc in self.fixed_stations[dat].items():
                    if loc:
                        lines.append('\t%s[m,%.3f,%.3f,%.3f,],' % (station, loc.easting, loc.northing, loc.elevation))
                    else:
                        lines.append('\t%s,' % station)
                lines.append(';')

        return lines

    def write(self, outfilename=None):
        """
        Write or overwrite this .MAK file.

        :param outfilename: target filename; defaults to :attr:`filename`
        :raises ValueError: if neither `outfilename` nor :attr:`filename` is set
        """
        outfilename = outfilename or self.filename
        if not outfilename:
            raise ValueError('Unable to write MAK file without a filename')
        with codecs.open(outfilename, 'wb', 'windows-1252') as outf:
            outf.write('\r\n'.join(self._serialize()))
# Disabled tail of Project.write(), kept from the original source:
#outf.write('\x1A') # ASCII "sub" ^Z marks EOF


# File Parsing Utilities

# Shot columns that CompassSurveyParser._coerce converts to float.
_FLOAT_KEYS = [
    'LENGTH', 'BEARING', 'AZM2', 'INC', 'INC2',
    'LEFT', 'RIGHT', 'UP', 'DOWN',
]
# Passage-dimension columns whose sentinel values are read as infinity.
_INF_KEYS = ['LEFT', 'RIGHT', 'UP', 'DOWN']


def name_from_filename(fname):
    """Return the file's base name without extension, underscores turned into spaces."""
    base = os.path.basename(fname)
    stem, _extension = os.path.splitext(base)
    return stem.replace('_', ' ')
class ParseException(Exception):
    """Raised when a Compass file cannot be parsed."""
class CompassSurveyParser(object):
    """Parser for a Compass survey string."""

    def __init__(self, survey_str):
        """:param survey_str: string multiline representation of survey as found in .DAT file"""
        self.survey_str = survey_str

    @staticmethod
    def _coerce(key, val):
        """
        Convert one raw shot-table token to its Python value.

        ``'-999.00'`` becomes ``None`` (no data); the passage sentinels
        ``'-9.90'`` and ``'-9999.00'`` become infinity for LRUD columns; other
        numeric columns become ``float``. Anything else, including a numeric
        column that cannot be converted, is returned unchanged.
        """
        if val == '-999.00':  # no data
            return None

        if key in _INF_KEYS and val in ('-9.90', '-9999.00'):  # passage
            return float('inf')

        if key in _FLOAT_KEYS:
            try:
                return float(val)
            except (TypeError, ValueError):
                # float() raises ValueError for malformed text and TypeError
                # for non-string/non-number input; both are reported here.
                log.warning("Unable to coerce to float %s=%s (%s)", key, val, type(val))

        return val

    @staticmethod
    def _parse_date(datestr):
        """
        Parse a "month day year" date with a four- or two-digit year.

        :raises ParseException: if neither form matches
        """
        datestr = datestr.strip()
        for fmt in ['%m %d %Y', '%m %d %y']:
            try:
                return datetime.datetime.strptime(datestr, fmt).date()
            except ValueError:
                pass
        raise ParseException("Unable to parse SURVEY DATE: %s" % datestr)

    @staticmethod
    def _parse_declination_line(line):
        """
        Parse the ``DECLINATION: ... FORMAT: ... CORRECTIONS: ...`` line.

        :returns: tuple ``(declination, format, corrections, corrections2)``;
                  missing parts keep their defaults of ``0.0``, ``''``,
                  ``(0.0, 0.0, 0.0)`` and ``(0.0, 0.0)``
        """
        declination, fmt, corrections, corrections2 = 0.0, '', (0.0, 0.0, 0.0), (0.0, 0.0)
        toks = line.strip().split()
        for i, tok in enumerate(toks):
            if tok == 'DECLINATION:':
                declination = float(toks[i+1])
            elif tok == 'FORMAT:':
                fmt = toks[i+1]
            elif tok == 'CORRECTIONS:':
                # tuple(), not a bare map(): on Python 3 a map object can be
                # consumed only once.
                corrections = tuple(float(t) for t in toks[i+1:i+4])
            elif tok == 'CORRECTIONS2:':
                corrections2 = tuple(float(t) for t in toks[i+1:i+3])
        return declination, fmt, corrections, corrections2

    @staticmethod
    def _after_label(line, label):
        """Return the text following `label` in `line`, or raise :exc:`ParseException`."""
        parts = line.split(label, 1)
        if len(parts) < 2:
            raise ParseException("Expected '%s' but found: %s" % (label, line))
        return parts[1]

    def parse(self):
        """Parse our string and return a Survey object, None, or raise :exc:`ParseException`"""
        if not self.survey_str:
            return None
        lines = self.survey_str.splitlines()
        if len(lines) < 10:
            raise ParseException("Expected at least 10 lines in a Compass Survey, only found %d!\nlines=%s" % (len(lines), lines))

        # undelimited Cave Name may be empty string and "skipped"
        first_line = lines.pop(0).strip()
        if first_line.startswith('SURVEY NAME:'):
            cave_name = ''
            # Slice the label off; str.strip('SURVEY NAME:') would treat the
            # label as a set of characters and also eat letters of the name.
            name = first_line[len('SURVEY NAME:'):].strip()
        else:
            cave_name = first_line
            name = self._after_label(lines.pop(0), 'SURVEY NAME:').strip()

        # Date and Comment on one line, Comment may be missing
        date_comment_toks = self._after_label(lines.pop(0), 'SURVEY DATE:').split('COMMENT:')
        date = CompassSurveyParser._parse_date(date_comment_toks[0])
        comment = date_comment_toks[1].strip() if len(date_comment_toks) > 1 else ''

        lines.pop(0)  # SURVEY TEAM:\n (actual team members are on the next line)
        team = [member.strip() for member in lines.pop(0).split(',')]  # We're already decoding from windows-1252 codec so we have unicode for names like 'Tanya Pietra\xdf'

        # TODO: implement format (units!), instrument correction(s)
        dec_fmt_corr = lines.pop(0)
        declination, fmt, corrections, corrections2 = CompassSurveyParser._parse_declination_line(dec_fmt_corr)

        lines.pop(0)  # blank line ahead of the column header
        shot_header = lines.pop(0).split()
        val_count = len(shot_header) - 2 if 'FLAGS' in shot_header else len(shot_header)  # 1998 vintage data has no FLAGS, COMMENTS at end
        lines.pop(0)  # blank line after the column header

        survey = Survey(name=name, date=date, comment=comment, team=team, cave_name=cave_name,
                        shot_header=shot_header, declination=declination,
                        file_format=fmt, corrections=corrections, corrections2=corrections2)

        for shot_line in lines:
            if not shot_line.strip():
                continue  # a blank line is not a shot

            shot_vals = shot_line.split(None, val_count)

            if len(shot_vals) > val_count:  # last two spare columns are FLAGS and COMMENTS, either value may be missing
                flags_comment = shot_vals.pop()
                if not flags_comment.startswith('#|'):
                    flags, shot_comment = '', flags_comment
                else:
                    try:
                        flags, shot_comment = flags_comment.split('#|', 1)[1].split('#', 1)
                    except ValueError:
                        raise ParseException('Invalid flags in %s survey: %s' % (name, flags_comment))  # A 2013 bug in Compass inserted corrupt binary garbage into FLAGS column, causes parse to barf
                shot_vals += [flags, shot_comment.strip()]

            shot_vals = [(header, self._coerce(header, val)) for (header, val) in zip(shot_header, shot_vals)]
            survey.add_shot(Shot(shot_vals))

        #log.debug("Survey: name=%s shots=%d length=%0.1f date=%s team=%s\n%s", name, len(survey.shots), survey.length, date, team, '\n'.join([str(shot) for shot in survey.shots]))
        return survey
class CompassDatParser(object):
    """Parser for Compass .DAT data files"""

    def __init__(self, datfilename):
        """:param datfilename: (string) filename"""
        self.datfilename = datfilename

    def parse(self):
        """Parse our data file and return a :class:`DatFile` or raise :exc:`ParseException`."""
        path = self.datfilename
        log.debug("Parsing Compass .DAT file %s ...", path)
        result = DatFile(name_from_filename(path), filename=path)

        with codecs.open(path, 'rb', 'windows-1252') as handle:
            text = handle.read()

        # Surveys are separated by ASCII form feed characters.
        chunks = []
        for raw in text.split('\x0C'):
            chunks.append(raw.strip())
        if chunks[-1] == '\x1A':
            chunks.pop()  # Compass may place a "soft EOF" with ASCII SUB char

        log.debug("Parsed %d raw surveys from Compass .DAT file %s.", len(chunks), path)
        for chunk in chunks:
            if chunk:
                result.add_survey(CompassSurveyParser(chunk).parse())

        return result
class CompassProjectParser(object):
    """Parser for Compass .MAK project files."""

    def __init__(self, projectfile):
        """:param projectfile: (string) filename"""
        self.makfilename = projectfile

    @staticmethod
    def _parse_linked_file(value):
        """Return the .DAT filename from the body of a ``#`` record."""
        # Everything after the first comma (link/fixed stations) is ignored.
        # TODO: implement link stations and fixed stations
        return value.rstrip(';').split(',', 1)[0]

    def parse(self):
        """Parse our project file and return :class:`Project` object or raise :exc:`ParseException`."""
        log.debug("Parsing Compass .MAK file %s ...", self.makfilename)

        base_location = None
        # Datum / convergence records seen without a preceding '@' record.
        utm_datum = None
        utm_convergence = None
        linked_files = []

        with codecs.open(self.makfilename, 'rb', 'windows-1252') as makfile:
            prev = None  # accumulated text of a '#' record that spans several lines

            for line in makfile:
                line = line.strip()
                if not line:
                    continue

                if prev:
                    if line.endswith(';'):
                        record = prev + line.rstrip(';')
                        log.debug("Parsing linked file: %s", record)
                        linked_files.append(self._parse_linked_file(record))
                        prev = None
                    else:
                        # Keep the whole continuation line; it has no record
                        # type character to drop.
                        prev += line
                    continue

                header, value = line[0], line[1:]

                if header == '/':
                    pass  # comment

                elif header == '@':
                    fields = [float(v) for v in value.rstrip(';').split(',')]
                    # Project._serialize writes the '@' record as easting,
                    # northing, elevation, zone, convergence. UTMLocation's
                    # fifth positional parameter is `datum`, so convergence
                    # is assigned by name instead.
                    base_location = UTMLocation(*fields[:4])
                    if len(fields) > 4:
                        base_location.convergence = fields[4]

                elif header == '&':
                    datum = value.rstrip(';')
                    if base_location is not None:
                        base_location.datum = datum
                    else:
                        utm_datum = datum

                elif header == '%':
                    convergence = float(value.rstrip(';'))
                    if base_location is not None:
                        base_location.convergence = convergence
                    else:
                        utm_convergence = convergence

                elif header == '!':
                    pass  # TODO: file parameters are not parsed yet

                elif header == '#':
                    if value.endswith(';'):
                        log.debug("Parsing linked file: %s", value)
                        linked_files.append(self._parse_linked_file(value))
                        prev = None
                    else:
                        prev = value

        log.debug("Project: base_loc=%s linked_files=%s", base_location, linked_files)

        project = Project(name_from_filename(self.makfilename), filename=self.makfilename)
        if base_location is not None:
            project.set_base_location(base_location)
        else:
            # No '@' record: keep whatever datum / convergence the file gave.
            project._utm_datum = utm_datum
            project._utm_convergence = utm_convergence

        for linked_file in linked_files:
            # TODO: we need to support case-insensitive path resolution on case-sensitive filesystems
            linked_file_path = os.path.join(os.path.dirname(self.makfilename), os.path.normpath(linked_file.replace('\\', '/')))
            datfile = CompassDatParser(linked_file_path).parse()
            project.add_linked_file(datfile)

        return project