#!/usr/bin/python
"""
USAGE with Python 2.6
python ofx2n3.py --n3 < foo.ofx > foo.rdf
"""
__version__ = "$Id: ofx2n3.py Exp $"
# from swap.myStore import load, Namespace
# from swap.diag import chatty_flag, progress
import sys, re, os
def main(argv):
    """Convert every OFX file named in *argv*, or standard input if none is.

    argv is the full argument vector, script name included at index 0.
    Arguments that start with "-" are treated as options: they are not
    interpreted here, only passed through to contentLines() unchanged.
    Empty arguments are ignored.
    """
    filenames = [arg for arg in argv[1:] if arg and not arg.startswith("-")]
    if not filenames:
        fyi("Reading OFX document")
        doc = sys.stdin.read()
        fyi("Parsing STDIN OFX document")
        contentLines(doc, argv)
        return
    for fn in filenames:
        # Close the file before processing: contentLines() may rename it.
        with open(fn, "r") as f:
            doc = f.read()
        fyi("Parsing OFX document %s" % fn)
        contentLines(doc, argv, fn)
def fyi(s):
    """Progress-message hook; currently a deliberate no-op.

    Uncomment the write below to send *s* to standard error.
    """
    # sys.stderr.write(s+"\n")
    return None
# Line-ending and whitespace characters.
CR = chr(13)
LF = chr(10)
CRLF = CR + LF
SPACE = chr(32)
TAB = chr(9)

# See qfx2n3.sed
# Each dtN is [compiled pattern, %-format applied to the pattern's groups].

# Date time maps to \1-\2-\3T\4:\5:\6
dt1 = [re.compile(r'([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])'), "%s-%s-%sT%s:%s:%s"]
# Date maps to \1-\2-\3
dt2 = [re.compile(r'([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])'), "%s-%s-%s"]
# Date with Timezone -- maps to \1-\2-\3T\4:\5:\6\70\800
# Like 20100317075059[-7:PDT]
dt3 = [re.compile(r'([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])\[([-+])([0-9]):[A-Z]*\]'), "%s-%s-%sT%s:%s:%s%s0%s00"]
# Like 20100317075059.000[-7:PDT]
#dt4 = [re.compile('([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9]).[0-9][0-9][0-9]\[([-+])([0-9]):[A-Z]*\]'), "%s-%s-%sT%s:%s:%s%s0%s00"]
# The dot is escaped so that only a literal ".000" fraction is accepted.
dt4 = [re.compile(r'([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])\.000\[([-+])([0-9]):[A-Z]*\]'), "%s-%s-%sT%s:%s:%s%s0%s00"]

# Most complex first.  The patterns are applied with .search(), so the
# 14-digit date-time (dt1) must be tried before the 8-digit date (dt2);
# otherwise dt2 matches the leading digits and the time of day is dropped.
dtcases = [dt4, dt3, dt1, dt2]
def sanitize(tag):
    """Return *tag* with every "." and "-" replaced by "_"."""
    return "".join("_" if ch in ".-" else ch for ch in tag)
def de_escapeXML(st0):
    """Return *st0* with the entities &lt;, &gt; and &amp; decoded.

    &amp; is decoded last so that text such as "&amp;lt;" yields the
    literal "&lt;" rather than being unescaped twice into "<".
    """
    return st0.replace('&lt;', '<').replace('&gt;', '>').replace('&amp;', '&')
def _n3_escape(text):
    """Escape backslashes and double quotes for use inside an N3 "..." literal."""
    return text.replace("\\", "\\\\").replace('"', '\\"')


def contentLines(doc, argv, fn=None):
    """Process one OFX document, held in the string *doc*, as a single buffer.

    Flags are looked up in *argv*:
      --n3      print the header block and the element tree to stdout as N3.
      --rename  build a file name from DTSTART, ACCTTYPE and ACCTID and
                rename *fn* to it; if "--no" is also given the "mv" line
                is printed but the file is left alone.

    fn is the path *doc* was read from (None when it came from stdin).
    Returns None.

    Raises SyntaxError for malformed headers, tags, nesting or dates;
    AssertionError if the ENCODING header is not USASCII; KeyError if the
    ENCODING header is absent or --rename finds no DTSTART/ACCTID; and
    ValueError if an actual rename is requested while fn is None.
    """
    n3 = "--n3" in argv
    makeName = "--rename" in argv
    version = "$Id: ofx2n3.py,v 1.6 2013-10-14 Exp $"[1:-1]
    if n3:
        print("""# Generated by %s""" % version)
        print("""@prefix ofx: <http://www.w3.org/2000/10/swap/pim/ofx#>.
@prefix ofxh: <http://www.w3.org/2000/10/swap/pim/ofx-headers#>.
<> ofxh:headers [
""")

    # Find the delimiter used in the file: the first CR or LF seen decides,
    # and a CR is widened to CRLF when the document contains LF as well.
    # Default to LF so an empty or single-line document is still split sanely.
    delimiter = LF
    for ch in doc:
        if ch in CRLF:
            delimiter = ch
            break
    if delimiter == CR and LF in doc:
        delimiter = CRLF
    lines = doc.split(delimiter)

    header = {}
    stack = []          # Names of the currently open aggregate tags
    filenamebits = {}   # Values collected for --rename

    # ---- Header block: "NAME:value" lines up to the first empty line.
    # NOTE(review): ln is incremented before first use, so lines[0] is never
    # examined -- presumably intentional, confirm against real input files.
    ln = 0
    while 1:
        ln = ln + 1
        if ln >= len(lines):
            raise SyntaxError(
                "Unexpected end of document in header, line %i" % ln)
        line = lines[ln]
        colon = line.find(":")
        if colon < 0:
            if line == "":
                break
            if "<OFX>" in line:  # NatWest OFX error - missing gap line
                ln = ln - 1      # Back up so the body loop re-reads this line
                break
            raise SyntaxError("No colon in header line, line %i: %s" % (
                ln, line))
        hname, value = line[:colon], line[colon+1:]
        hname = hname.replace(" ", "")  # Header names are emitted without spaces
        # fyi("Header line %s:%s" % (hname, value))
        if n3:
            print(" ofxh:%s \"%s\";" % (hname, _n3_escape(value)))
        header[hname] = value
    if n3:
        print("];\n")

    assert header["ENCODING"] == "USASCII"  # Our assumption

    # ---- Body: one tag per line, either <TAG>, </TAG> or <TAG>data.
    while ln + 1 < len(lines):
        ln = ln + 1
        # Strip leading blanks, and trailing blanks and returns
        line = lines[ln].lstrip(" \t").rstrip(" \t\r")
        if line == "":
            continue  # Possible on last line
        if line[0] != "<":
            raise SyntaxError("No < on line %i: %s" % (ln, line))
        i = line.find(">")
        if i < 0:
            raise SyntaxError("No > on line %i: %s" % (ln, line))
        tag = sanitize(line[1:i])

        if line[1] == "/":  # End tag
            tag = tag[1:]
            if not stack:
                raise SyntaxError(
                    "Found </%s> with no open tag on line %i: %s" %
                    (tag, ln, line))
            tag2 = stack.pop()
            if tag != tag2:
                raise SyntaxError(
                    "Found </%s> when </%s> expected.\nStack: %s" %
                    (tag, tag2, stack))
            if n3:
                print("%s]; # %s" % (" " * len(stack), tag))
        elif line[i+1:] == "":  # Start tag
            if n3:
                print("%s ofx:%s [" % (" " * len(stack), tag))
            stack.append(tag)
        else:  # Data tag
            e = line.find('</')
            if e > 0:
                line = line[:e]  # Strip off a closing tag on the same line
            value = de_escapeXML(line[i+1:])
            if tag[:2] == "DT":  # Datetimes
                for re_fmt in dtcases:
                    m = re_fmt[0].search(value)
                    if m:
                        value = re_fmt[1] % m.groups()
                        break
                else:  # No pattern in dtcases matched
                    raise SyntaxError(
                        "Unexpected date format on line %i: %s" % (ln, line))
            if n3:
                print("%s ofx:%s \"%s\";" % (
                    " " * len(stack), tag, _n3_escape(value)))
            if tag in ("ACCTID", "DTSTART", "DTEND", "ACCTTYPE"):
                filenamebits[tag] = value

    if stack:
        raise SyntaxError("Unclosed tags: %s" % stack)
    if n3:
        print(".")

    if makeName:
        missing = [key for key in ("DTSTART", "ACCTID")
                   if key not in filenamebits]
        if missing:
            raise KeyError("Cannot suggest a file name: no %s in document" %
                           " or ".join(missing))
        # Not always present but on old BBoA a/c needed to differentiate between
        # checking and savings accounts of SAME ACCOUNT NUMBER!
        at = filenamebits.get("ACCTTYPE", 'ac').lower()
        name = (filenamebits["DTSTART"][:10] + "-on-" + at + "-"
                + filenamebits["ACCTID"][-4:] + ".ofx")
        if name == fn:
            print("Name is already as suggested. Not renamed: %s" % fn)
        else:
            print("mv %s %s" % (fn, name))
            # Honour --no from either the argv passed in or the process argv.
            dry_run = "--no" in argv[1:] or "--no" in sys.argv[1:]
            if not dry_run:
                if fn is None:
                    raise ValueError(
                        "--rename needs a file name; input was not read "
                        "from a named file")
                os.rename(fn, name)
def _test():
    """Legacy self-test hook, run when the script is invoked with --test.

    NOTE(review): this looks stale.  It imports a `fromOFX` module, calls
    contentLines() with only an open file, and uses findComponents, which
    is not defined in the visible source -- confirm before relying on it.
    """
    import sys
    from pprint import pprint
    import doctest
    import fromOFX
    doctest.testmod(fromOFX)
    remaining = contentLines(open(sys.argv[1]))
    # print remaining
    components, remaining = findComponents(remaining)
    assert remaining == []
    pprint(components)
    # unittest.main()
# Script entry point: --help/-help prints the module docstring, a leading
# --test runs the self-test, anything else is handed to main().
if __name__ == '__main__':
    import sys
    if any(flag in sys.argv[1:] for flag in ("--help", "-help")):
        print(__doc__)
    elif len(sys.argv) > 1 and sys.argv[1] == '--test':
        del sys.argv[1]  # Drop the flag so _test() sees its own arguments
        _test()
    else:
        main(sys.argv)
"""
USAGE with Python 2.6
python ofx2n3.py --n3 < foo.ofx > foo.rdf
"""
__version__ = "$Id: ofx2n3.py Exp $"
# from swap.myStore import load, Namespace
# from swap.diag import chatty_flag, progress
import sys, re, os
def main(argv):
    """Convert every OFX file named in *argv*, or standard input if none is.

    argv is the full argument vector, script name included at index 0.
    Arguments that start with "-" are treated as options: they are not
    interpreted here, only passed through to contentLines() unchanged.
    Empty arguments are ignored.
    """
    filenames = [arg for arg in argv[1:] if arg and not arg.startswith("-")]
    if not filenames:
        fyi("Reading OFX document")
        doc = sys.stdin.read()
        fyi("Parsing STDIN OFX document")
        contentLines(doc, argv)
        return
    for fn in filenames:
        # Close the file before processing: contentLines() may rename it.
        with open(fn, "r") as f:
            doc = f.read()
        fyi("Parsing OFX document %s" % fn)
        contentLines(doc, argv, fn)
def fyi(s):
    """Progress-message hook; currently a deliberate no-op.

    Uncomment the write below to send *s* to standard error.
    """
    # sys.stderr.write(s+"\n")
    return None
# Line-ending and whitespace characters.
CR = chr(13)
LF = chr(10)
CRLF = CR + LF
SPACE = chr(32)
TAB = chr(9)

# See qfx2n3.sed
# Each dtN is [compiled pattern, %-format applied to the pattern's groups].

# Date time maps to \1-\2-\3T\4:\5:\6
dt1 = [re.compile(r'([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])'), "%s-%s-%sT%s:%s:%s"]
# Date maps to \1-\2-\3
dt2 = [re.compile(r'([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])'), "%s-%s-%s"]
# Date with Timezone -- maps to \1-\2-\3T\4:\5:\6\70\800
# Like 20100317075059[-7:PDT]
dt3 = [re.compile(r'([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])\[([-+])([0-9]):[A-Z]*\]'), "%s-%s-%sT%s:%s:%s%s0%s00"]
# Like 20100317075059.000[-7:PDT]
#dt4 = [re.compile('([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9]).[0-9][0-9][0-9]\[([-+])([0-9]):[A-Z]*\]'), "%s-%s-%sT%s:%s:%s%s0%s00"]
# The dot is escaped so that only a literal ".000" fraction is accepted.
dt4 = [re.compile(r'([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])([0-9][0-9])\.000\[([-+])([0-9]):[A-Z]*\]'), "%s-%s-%sT%s:%s:%s%s0%s00"]

# Most complex first.  The patterns are applied with .search(), so the
# 14-digit date-time (dt1) must be tried before the 8-digit date (dt2);
# otherwise dt2 matches the leading digits and the time of day is dropped.
dtcases = [dt4, dt3, dt1, dt2]
def sanitize(tag):
    """Return *tag* with every "." and "-" replaced by "_"."""
    return "".join("_" if ch in ".-" else ch for ch in tag)
def de_escapeXML(st0):
    """Return *st0* with the entities &lt;, &gt; and &amp; decoded.

    &amp; is decoded last so that text such as "&amp;lt;" yields the
    literal "&lt;" rather than being unescaped twice into "<".
    """
    return st0.replace('&lt;', '<').replace('&gt;', '>').replace('&amp;', '&')
def _n3_escape(text):
    """Escape backslashes and double quotes for use inside an N3 "..." literal."""
    return text.replace("\\", "\\\\").replace('"', '\\"')


def contentLines(doc, argv, fn=None):
    """Process one OFX document, held in the string *doc*, as a single buffer.

    Flags are looked up in *argv*:
      --n3      print the header block and the element tree to stdout as N3.
      --rename  build a file name from DTSTART, ACCTTYPE and ACCTID and
                rename *fn* to it; if "--no" is also given the "mv" line
                is printed but the file is left alone.

    fn is the path *doc* was read from (None when it came from stdin).
    Returns None.

    Raises SyntaxError for malformed headers, tags, nesting or dates;
    AssertionError if the ENCODING header is not USASCII; KeyError if the
    ENCODING header is absent or --rename finds no DTSTART/ACCTID; and
    ValueError if an actual rename is requested while fn is None.
    """
    n3 = "--n3" in argv
    makeName = "--rename" in argv
    version = "$Id: ofx2n3.py,v 1.6 2013-10-14 Exp $"[1:-1]
    if n3:
        print("""# Generated by %s""" % version)
        print("""@prefix ofx: <http://www.w3.org/2000/10/swap/pim/ofx#>.
@prefix ofxh: <http://www.w3.org/2000/10/swap/pim/ofx-headers#>.
<> ofxh:headers [
""")

    # Find the delimiter used in the file: the first CR or LF seen decides,
    # and a CR is widened to CRLF when the document contains LF as well.
    # Default to LF so an empty or single-line document is still split sanely.
    delimiter = LF
    for ch in doc:
        if ch in CRLF:
            delimiter = ch
            break
    if delimiter == CR and LF in doc:
        delimiter = CRLF
    lines = doc.split(delimiter)

    header = {}
    stack = []          # Names of the currently open aggregate tags
    filenamebits = {}   # Values collected for --rename

    # ---- Header block: "NAME:value" lines up to the first empty line.
    # NOTE(review): ln is incremented before first use, so lines[0] is never
    # examined -- presumably intentional, confirm against real input files.
    ln = 0
    while 1:
        ln = ln + 1
        if ln >= len(lines):
            raise SyntaxError(
                "Unexpected end of document in header, line %i" % ln)
        line = lines[ln]
        colon = line.find(":")
        if colon < 0:
            if line == "":
                break
            if "<OFX>" in line:  # NatWest OFX error - missing gap line
                ln = ln - 1      # Back up so the body loop re-reads this line
                break
            raise SyntaxError("No colon in header line, line %i: %s" % (
                ln, line))
        hname, value = line[:colon], line[colon+1:]
        hname = hname.replace(" ", "")  # Header names are emitted without spaces
        # fyi("Header line %s:%s" % (hname, value))
        if n3:
            print(" ofxh:%s \"%s\";" % (hname, _n3_escape(value)))
        header[hname] = value
    if n3:
        print("];\n")

    assert header["ENCODING"] == "USASCII"  # Our assumption

    # ---- Body: one tag per line, either <TAG>, </TAG> or <TAG>data.
    while ln + 1 < len(lines):
        ln = ln + 1
        # Strip leading blanks, and trailing blanks and returns
        line = lines[ln].lstrip(" \t").rstrip(" \t\r")
        if line == "":
            continue  # Possible on last line
        if line[0] != "<":
            raise SyntaxError("No < on line %i: %s" % (ln, line))
        i = line.find(">")
        if i < 0:
            raise SyntaxError("No > on line %i: %s" % (ln, line))
        tag = sanitize(line[1:i])

        if line[1] == "/":  # End tag
            tag = tag[1:]
            if not stack:
                raise SyntaxError(
                    "Found </%s> with no open tag on line %i: %s" %
                    (tag, ln, line))
            tag2 = stack.pop()
            if tag != tag2:
                raise SyntaxError(
                    "Found </%s> when </%s> expected.\nStack: %s" %
                    (tag, tag2, stack))
            if n3:
                print("%s]; # %s" % (" " * len(stack), tag))
        elif line[i+1:] == "":  # Start tag
            if n3:
                print("%s ofx:%s [" % (" " * len(stack), tag))
            stack.append(tag)
        else:  # Data tag
            e = line.find('</')
            if e > 0:
                line = line[:e]  # Strip off a closing tag on the same line
            value = de_escapeXML(line[i+1:])
            if tag[:2] == "DT":  # Datetimes
                for re_fmt in dtcases:
                    m = re_fmt[0].search(value)
                    if m:
                        value = re_fmt[1] % m.groups()
                        break
                else:  # No pattern in dtcases matched
                    raise SyntaxError(
                        "Unexpected date format on line %i: %s" % (ln, line))
            if n3:
                print("%s ofx:%s \"%s\";" % (
                    " " * len(stack), tag, _n3_escape(value)))
            if tag in ("ACCTID", "DTSTART", "DTEND", "ACCTTYPE"):
                filenamebits[tag] = value

    if stack:
        raise SyntaxError("Unclosed tags: %s" % stack)
    if n3:
        print(".")

    if makeName:
        missing = [key for key in ("DTSTART", "ACCTID")
                   if key not in filenamebits]
        if missing:
            raise KeyError("Cannot suggest a file name: no %s in document" %
                           " or ".join(missing))
        # Not always present but on old BBoA a/c needed to differentiate between
        # checking and savings accounts of SAME ACCOUNT NUMBER!
        at = filenamebits.get("ACCTTYPE", 'ac').lower()
        name = (filenamebits["DTSTART"][:10] + "-on-" + at + "-"
                + filenamebits["ACCTID"][-4:] + ".ofx")
        if name == fn:
            print("Name is already as suggested. Not renamed: %s" % fn)
        else:
            print("mv %s %s" % (fn, name))
            # Honour --no from either the argv passed in or the process argv.
            dry_run = "--no" in argv[1:] or "--no" in sys.argv[1:]
            if not dry_run:
                if fn is None:
                    raise ValueError(
                        "--rename needs a file name; input was not read "
                        "from a named file")
                os.rename(fn, name)
def _test():
    """Legacy self-test hook, run when the script is invoked with --test.

    NOTE(review): this looks stale.  It imports a `fromOFX` module, calls
    contentLines() with only an open file, and uses findComponents, which
    is not defined in the visible source -- confirm before relying on it.
    """
    import sys
    from pprint import pprint
    import doctest
    import fromOFX
    doctest.testmod(fromOFX)
    remaining = contentLines(open(sys.argv[1]))
    # print remaining
    components, remaining = findComponents(remaining)
    assert remaining == []
    pprint(components)
    # unittest.main()
# Script entry point: --help/-help prints the module docstring, a leading
# --test runs the self-test, anything else is handed to main().
if __name__ == '__main__':
    import sys
    if any(flag in sys.argv[1:] for flag in ("--help", "-help")):
        print(__doc__)
    elif len(sys.argv) > 1 and sys.argv[1] == '--test':
        del sys.argv[1]  # Drop the flag so _test() sees its own arguments
        _test()
    else:
        main(sys.argv)