Friday, March 28, 2014

ofx2n3.py - OFX data to N3

#!/usr/bin/python
"""
USAGE with Python 2.6
  python ofx2n3.py --n3 < foo.ofx > foo.rdf
"""
__version__ = "$Id: ofx2n3.py Exp $"

# from swap.myStore import load, Namespace
# from swap.diag import chatty_flag, progress

import sys, re, os


def main(argv):
    """Convert the OFX files named in *argv*, or standard input if none are.

    argv -- full argument list, with the script name at index 0.  Arguments
            starting with "-" are treated as options and handed on to
            contentLines() untouched; every other argument is taken to be
            the name of an input file.
    """
    filenames = [arg for arg in argv[1:] if not arg.startswith("-")]
    if not filenames:
        fyi("Reading OFX document")
        doc = sys.stdin.read()
        fyi("Parsing STDIN OFX document")
        contentLines(doc, argv)
        return
    for fn in filenames:
        # Close the file as soon as it has been read: contentLines() may
        # rename it when --rename is given.
        with open(fn, "r") as f:
            doc = f.read()
        fyi("Parsing OFX document %s" % fn)
        contentLines(doc, argv, fn)

def fyi(s):
    """Progress-message hook; currently does nothing with *s*.

    Enable the stderr write below to see the messages.
    """
    # sys.stderr.write(s+"\n")
    return None
  
# Whitespace and line-terminator characters used while splitting the input.
CR = "\r"
LF = "\n"
CRLF = "\r\n"
SPACE = " "
TAB = "\t"


# See qfx2n3.sed
# Each dtN entry is [compiled pattern, %-format applied to the match groups].
_DT_DATE = r'([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])'
_DT_TIME = r'([0-9][0-9])([0-9][0-9])([0-9][0-9])'
# Only a single-digit hour offset is recognised, e.g. [-7:PDT].
_DT_ZONE = r'\[([-+])([0-9]):[A-Z]*\]'

# Date time maps to \1-\2-\3T\4:\5:\6
dt1 = [re.compile(_DT_DATE + _DT_TIME), "%s-%s-%sT%s:%s:%s"]

# Date maps to \1-\2-\3
dt2 = [re.compile(_DT_DATE), "%s-%s-%s"]

# Date with Timezone  -- maps to \1-\2-\3T\4:\5:\6\70\800
# Like 20100317075059[-7:PDT]
dt3 = [re.compile(_DT_DATE + _DT_TIME + _DT_ZONE), "%s-%s-%sT%s:%s:%s%s0%s00"]

# Like 20100317075059.000[-7:PDT]
# The dot is escaped so that only a literal ".000" fraction is accepted.
#dt4 = [re.compile('([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9]).[0-9][0-9][0-9]\[([-+])([0-9]):[A-Z]*\]'), "%s-%s-%sT%s:%s:%s%s0%s00"]
dt4 = [re.compile(_DT_DATE + _DT_TIME + r'\.000' + _DT_ZONE), "%s-%s-%sT%s:%s:%s%s0%s00"]

# Most complex first.  The patterns are applied with search(), so the
# date-only pattern matches the front of any date-time and must come last,
# otherwise dt1 can never be reached and the time of day is dropped.
dtcases = [dt4, dt3, dt1, dt2]

def sanitize(tag):
    """Return *tag* with every "." and "-" replaced by "_".

    contentLines() passes each OFX tag name through this before using it
    as the local part of an ``ofx:`` name in the N3 output.
    """
    return "".join("_" if ch in ".-" else ch for ch in tag)
  
def de_escapeXML(st0):
    """Return *st0* with the entities &lt;, &gt; and &amp; decoded.

    &amp; is decoded last so that already-escaped text such as "&amp;lt;"
    yields the literal "&lt;" instead of being decoded a second time to "<".
    """
    return st0.replace('&lt;', '<').replace('&gt;', '>').replace('&amp;', '&')

def contentLines(doc, argv, fn=None):
    """Process the content of one OFX document as a single buffer.

    The text is split into lines, the ``NAME:value`` header lines are read
    up to the first blank line (or up to a line containing ``<OFX>``), and
    the tagged body that follows is checked for balanced start/end tags.

    doc  -- complete text of the OFX document
    argv -- argument list, searched for the option flags below
    fn   -- name of the file *doc* was read from; required for --rename

    Options looked up in *argv*:
      --n3      write the headers and body to standard output as N3
      --rename  rename *fn* to
                <first 10 chars of DTSTART>-on-<ACCTTYPE>-<last 4 of ACCTID>.ofx
      --no      with --rename, print the "mv" line but do not rename

    Raises SyntaxError for malformed input (missing header terminator,
    header line without a colon, ENCODING other than USASCII, line not
    starting with a tag, mismatched or unclosed tags, unrecognised date
    value) and ValueError when --rename is requested without *fn*.

    All output uses the single-argument print(...) form, which behaves the
    same as a Python 2 print statement and as the Python 3 function.
    """
    n3 = "--n3" in argv
    makeName = "--rename" in argv

    version = "$Id: ofx2n3.py,v 1.6 2013-10-14 Exp $"[1:-1]
    if n3:
        print("# Generated by %s" % version)
        print("@prefix ofx: <http://www.w3.org/2000/10/swap/pim/ofx#>.\n"
              "@prefix ofxh: <http://www.w3.org/2000/10/swap/pim/ofx-headers#>.\n"
              "\n"
              "<> ofxh:headers [\n")

    # Find the delimiter used in the file: whichever of CR / LF comes first,
    # upgraded to CRLF when a leading CR is accompanied by LFs.  Fall back to
    # LF when the text contains no line terminator at all.
    delimiter = LF
    for ch in doc:
        if ch in CRLF:
            delimiter = ch
            break
    if delimiter == CR and LF in doc:
        delimiter = CRLF
    lines = doc.split(delimiter)

    header = {}
    stack = []          # names of the currently open aggregate tags
    filenamebits = {}   # values collected for --rename

    # ln is always the index of the line most recently consumed.
    # NOTE(review): the scan pre-increments from 0, so lines[0] (normally the
    # OFXHEADER line) is never recorded or emitted -- confirm this is intended.
    ln = 0
    while True:
        ln += 1
        if ln >= len(lines):
            raise SyntaxError(
                "Unexpected end of file in header, after line %i" % (ln - 1))
        line = lines[ln]
        colon = line.find(":")
        if colon < 0:
            if line == "":
                break  # Blank line ends the header block
            if "<OFX>" in line:  # NatWest OFX error - missing gap line
                ln -= 1  # Back up so the body loop reads this line again
                break
            raise SyntaxError("No colon in header line, line %i: %s" % (
                ln, line))
        hname = line[:colon].replace(" ", "")  # header names carry no spaces
        value = line[colon + 1:]
        if n3:
            print("  ofxh:%s \"%s\";" % (hname, value))  #@@ do n3 escaping
        header[hname] = value
    if n3:
        print("];\n")

    # Our assumption; raised explicitly so it still holds under "python -O".
    encoding = header.get("ENCODING")
    if encoding != "USASCII":
        raise SyntaxError(
            "Unsupported ENCODING header %r: only USASCII is handled"
            % (encoding,))

    while ln + 1 < len(lines):
        ln += 1
        # Strip leading space, and trailing space and returns
        line = lines[ln].lstrip(" \t").rstrip(" \t\r")
        if line == "":
            continue  # Possible on last line
        if line[0] != "<":
            raise SyntaxError("No < on line %i: %s" % (ln, line))
        i = line.find(">")
        if i < 0:
            raise SyntaxError("No > on line %i: %s" % (ln, line))
        tag = sanitize(line[1:i])

        if line[1] == "/":  # End tag
            tag = tag[1:]
            if not stack:
                raise SyntaxError(
                    "Found </%s> with no open tag on line %i" % (tag, ln))
            tag2 = stack.pop()
            if tag != tag2:
                raise SyntaxError(
                    "Found </%s> when </%s> expected.\nStack: %s" %
                    (tag, tag2, stack))
            if n3:
                print("%s];  # %s" % ("  " * len(stack), tag))
        elif line[i + 1:] == "":  # Start tag
            if n3:
                print("%s ofx:%s [" % ("  " * len(stack), tag))
            stack.append(tag)
        else:  # Data tag
            e = line.find('</')
            if e > 0:
                line = line[:e]  # Strip off a closing tag on the same line
            value = de_escapeXML(line[i + 1:])
            if tag[:2] == "DT":  # Datetimes
                for re_fmt in dtcases:
                    m = re_fmt[0].search(value)
                    if m:
                        value = re_fmt[1] % m.groups()
                        break
                else:
                    raise SyntaxError(
                        "Unexpected date format on line %i: %s" % (ln, line))

            if n3:
                print("%s ofx:%s \"%s\";" % ("  " * len(stack), tag, value))
            if tag in ["ACCTID", "DTSTART", "DTEND", "ACCTTYPE"]:
                filenamebits[tag] = value

    if stack:
        raise SyntaxError("Unclosed tags: %s" % stack)
    if n3:
        print(".")

    if makeName:
        if fn is None:
            raise ValueError(
                "--rename needs a file name; cannot rename standard input")
        # Not always present but on old BBoA a/c needed to differentiate
        # between checking and savings accounts of SAME ACCOUNT NUMBER!
        at = filenamebits.get("ACCTTYPE", 'ac').lower()
        name = (filenamebits["DTSTART"][:10] + "-on-" + at + "-"
                + filenamebits["ACCTID"][-4:] + ".ofx")
        if name == fn:
            print("Name is already as suggested. Not renamed: %s" % fn)
        else:
            print("mv %s %s" % (fn, name))
            # Honour --no from the argv we were given as well as from the
            # process command line, so a dry run is never turned into a
            # real rename.
            dry_run = "--no" in argv or "--no" in sys.argv[1:]
            if not dry_run:
                os.rename(fn, name)
  

def _test():
    """Self-test entry point, run when the script is started with --test.

    NOTE(review): this looks stale relative to the rest of the visible
    source and should be confirmed before relying on it:
      * it imports and doctests a module named fromOFX, not this one;
      * it calls contentLines() with a single file object, although
        contentLines() is defined with two required parameters (doc, argv);
      * findComponents is not defined anywhere in the visible source.
    """
    import sys
    from pprint import pprint
    import doctest, fromOFX
    # Run the doctests found in the fromOFX module.
    doctest.testmod(fromOFX)

    # The first remaining command-line argument is taken as the input file.
    lines = contentLines(open(sys.argv[1]))
    #print lines
    c, lines = findComponents(lines)
    # Expects nothing to be left over once the components are extracted.
    assert lines == []
    pprint(c)
    #unittest.main()
    #unittest.main()

# Script entry point: --help / -help prints the usage text, a leading --test
# runs the self-test, anything else is handed to main().
if __name__ == '__main__':
    import sys
    cli_args = sys.argv[1:]
    wants_help = "--help" in cli_args or "-help" in cli_args
    if wants_help:
        print(__doc__)
    elif cli_args[:1] == ['--test']:
        # Drop the --test flag so _test() sees the file name at sys.argv[1].
        del sys.argv[1]
        _test()
    else:
        main(sys.argv)