Monday, April 7, 2014

Creating PostgreSQL bulk import files with Python

PostgreSQL supports COPY FROM for bulk loading tables from CSV and binary files on the server's filesystem, and the equivalent \copy in psql, which reads the file on the client side instead. Drop your indexes and constraints and keep the load out of the WAL (for example by loading into a freshly created or unlogged table), and COPY FROM is pretty fast. I don't know if it's the fastest way to bulk load into Postgres, but it's fast enough for me - we loaded 49 million records totalling over a terabyte in about 8 hours. It's reasonably portable across versions, too, especially if you minimize the use of binary import files.

Generating the CSV and binary formats from Python is easy enough. Here are samples for each.

CSV format

You can use code like the following to generate a tab-separated CSV where the first row is a header. The header line is skipped by PostgreSQL (that's what the HEADER option is for), so make sure the column ordering matches up with the table definition.
    import csv

    f_table1 = open(filebase + '_table1.csv', 'wb')
    table1 = csv.DictWriter(f_table1, [
        'col1', 'col2'],
        dialect=csv.excel_tab)
    # writeheader() was added in Python 2.7; for 2.6, see StackOverflow
    table1.writeheader()

    # Sample - one row
    table1.writerow({
        'col1': 1,
        'col2': 2,
    })

    f_table1.close()

The code above uses a writeheader() method added to csv.DictWriter in Python 2.7. If you're stuck with 2.6, see this Stack Overflow post.

To load the CSV into psql:
\copy table1 FROM 'yyyymm_table1.csv' CSV HEADER DELIMITER E'\t';
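
If you'd rather drive the load from Python instead of going through psql, psycopg2's copy_expert() streams the same client-side file through COPY FROM STDIN, which is what \copy does under the hood. A minimal sketch, assuming the table1 above; the connection string is hypothetical:

    import psycopg2

    conn = psycopg2.connect('dbname=mydb')  # hypothetical connection string
    cur = conn.cursor()
    with open(filebase + '_table1.csv', 'rb') as f:
        cur.copy_expert(
            "COPY table1 FROM STDIN WITH CSV HEADER DELIMITER E'\\t'", f)
    conn.commit()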

Timestamps in CSV format

For timestamp and timestamptz columns PostgreSQL is smart enough to parse a string like "2014-02-28 12:34:45.123456". For timestamptz it interprets the string in the session's time zone unless the string carries an explicit UTC offset. To format the timestamp string you can use something like:
row['timestamp'] = "%04d-%02d-%02d %02d:%02d:%09.6f" % (
                    year, month, day, hour, minute, second)
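
If you already have a datetime.datetime in hand, note that its strftime() supports %f for zero-padded microseconds, so you can produce the same string directly (dt here is an assumed datetime value):

    # dt is a datetime.datetime; %f expands to zero-padded microseconds
    row['timestamp'] = dt.strftime('%Y-%m-%d %H:%M:%S.%f')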

Binary format

The PostgreSQL binary format is partly documented, but once you're past the header and into the field values you're essentially on your own - the advice given is to read the source under src/backend/utils/adt, and to an outsider it looks like the field formats are heavily dependent on compile-time options. I normally wouldn't touch it with a ten-foot pole, but it's the most efficient way to load tables with large bytea columns.

Luckily, the tables I had to worry about only have three columns: a varchar, a bytea and a timestamptz. The binary format of each is straightforward:

  • in all cases the field is preceded by a 32-bit field length; a length of -1 marks a NULL, with no data bytes following
  • varchar and other text fields, and bytea, are encoded as a run of bytes
  • timestamptz could be a 64-bit signed integer or double-precision floating point depending on compiler options, but realistically it's always a signed integer: the number of seconds since 2000-01-01 00:00:00 UTC, multiplied by one million, plus microseconds (see the worked check below)
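
To make the timestamptz rule concrete, here's a quick check of the example value used in the code comments further down (2014-03-12 22:48:41.267917-04, which is 2014-03-13 02:48:41.267917 UTC):

    import calendar
    import time

    # Unix time of 2014-03-13 02:48:41 UTC
    utc_seconds = calendar.timegm(
        time.strptime('2014-03-13 02:48:41', '%Y-%m-%d %H:%M:%S'))
    # Rebase to the PostgreSQL epoch, convert to microseconds, add the fraction
    pgtime = (utc_seconds - 946684800) * 1000000 + 267917
    print '%x' % pgtime  # prints 19772c275aacd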
Here's a sample:
    f_table2 = open(filebase + '_table2.bin', 'wb')
    table2 = pgbinwriter(f_table2)
    table2.writeheader()

    # Columns: varchar, bytea, timestamptz
    table2.writerow([
        pgbintext("abcdefg"),
        pgbinbytea(some_bytearray),
        pgbintimestamptz(
            string_to_pythontime('2014-02-28 12:34:45.123456')),
    ])

    table2.writetrailer()
    f_table2.close()
And the underlying functions:
############################################################################
# PostgreSQL binary format routines
# http://56bytes.blogspot.ca/2014/04/creating-postgresql-bulk-import-files.html
import time

PGCOPY_HEADER = "PGCOPY\n\377\r\n\0"

# 2000-01-01 00:00:00 UTC in Unix (Python) time
PG_DATE_EPOCH = 946684800.0

class pgbinwriter(object):
    def __init__(self, fileobj):
        self.fileobj = fileobj

    def writeheader(self):
        self.fileobj.write(PGCOPY_HEADER)

        # Write flags
        self.fileobj.write(bytearray([0, 0, 0, 0]))

        # Header extension area length
        self.fileobj.write(bytearray([0, 0, 0, 0]))

    def writerow(self, fields):
        # 16 bits: number of fields in the tuple
        self._write16(len(fields))
        for field in fields:
            self.fileobj.write(field.encode())

    def writetrailer(self):
        self._write16(-1)

    # Write a 16-bit integer in network byte order
    def _write16(self, u16):
        self.fileobj.write(bytearray([(u16 >> 8) & 0xff, u16 & 0xff]))

class pgbinfield(object):
    def __init__(self, length):
        self.length = length

    def encode(self):
        return self._encode32(self.length) + self._encode_data()

    # Write a 32-bit integer in network byte order
    def _encode32(self, length):
        return bytearray([
            (length >> 24) & 0xff,
            (length >> 16) & 0xff,
            (length >> 8) & 0xff,
            length & 0xff])

    def _encode_data(self):
        raise NotImplementedError

class pgbinnull(pgbinfield):
    def __init__(self):
        super(pgbinnull, self).__init__(-1)

    def _encode_data(self):
        return bytearray()

class pgbintext(pgbinfield):
    # Expects an already-encoded byte string (a Python 2 str): the field
    # length must be a byte count, not a character count.
    def __init__(self, text):
        super(pgbintext, self).__init__(len(text))
        self.text = text

    def _encode_data(self):
        return bytearray(self.text)

class pgbinbytea(pgbinfield):
    def __init__(self, bytearr):
        super(pgbinbytea, self).__init__(len(bytearr))
        self.bytearr = bytearr

    def _encode_data(self):
        return self.bytearr

# Depending on compiler options, PostgreSQL either uses 64-bit signed integers
# or double precision floating point numbers - default 64-bit ints.
# Values are stored as number of seconds since 2000-01-01 00:00:00 UTC,
# multiplied by one million, plus microseconds.
# Example: 2014-03-12 22:48:41.267917-04 -> 0x19772c275aacd
class pgbintimestamptz(pgbinfield):
    def __init__(self, pythontime):
        super(pgbintimestamptz, self).__init__(8)
        self.pythontime = pythontime

    def _encode_data(self):
        pgtime = long((self.pythontime - PG_DATE_EPOCH) * 1000000.0)
        return bytearray([
            (pgtime >> 56) & 0xff,
            (pgtime >> 48) & 0xff,
            (pgtime >> 40) & 0xff,
            (pgtime >> 32) & 0xff,
            (pgtime >> 24) & 0xff,
            (pgtime >> 16) & 0xff,
            (pgtime >> 8) & 0xff,
            pgtime & 0xff])

# Given a local-time string like "2014-02-28 12:34:45.123456", returns a
# floating point number of seconds since the Unix epoch
# (1970-01-01 00:00:00 UTC).
def string_to_pythontime(s):
    # time.mktime() interprets the struct_time as local time and is smart
    # enough to figure out daylight saving time. To get time.strptime() to
    # work we just have to strip off the fractional part of the second.
    i = s.index('.')
    nofrac = s[0:i]
    frac = float("0." + s[i+1:])
    return time.mktime(time.strptime(nofrac, "%Y-%m-%d %H:%M:%S")) + frac
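
Incidentally, the hand-rolled byte shifting above is equivalent to struct.pack() with the network byte order ('!') prefix; a minimal sketch of the same three encodings:

    import struct

    struct.pack('!h', -1)               # 16-bit trailer word, as in _write16
    struct.pack('!i', -1)               # 32-bit NULL field length, as in _encode32
    struct.pack('!q', 447994121267917)  # 64-bit timestamptz payload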
To load into psql:
\copy table2 FROM 'yyyymm_table2.bin' BINARY;
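
As a quick sanity check before loading, you can verify that a generated file at least starts with the expected 11-byte signature:

    f = open('yyyymm_table2.bin', 'rb')
    assert f.read(11) == "PGCOPY\n\377\r\n\0"
    f.close()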
