相关文章推荐

I’m sure everybody who worked with Python and a PostgreSQL database is familiar or definitely heard about the psycopg2 library. It is the most popular PostgreSQL database adapter for the Python programming language. In my work, I come in contact with this library every day and execute hundreds of automated statements. As always, I try to improve my code and execution speed, because, in the cloud, time is money. The longer your code runs the more it will cost you. So in this post, I will go over some ways on how you can decrease your database inserts and executions up to 10 times and if you run your code in the cloud, save you some money!

With Psycopg2 we have four ways to execute a command for a (large) list of items:

  • execute()
  • executemany()
  • execute_batch()
  • building a custom string
  • Now let’s go over each of these methods and see how much time it takes to insert 10'000, 100'000, and 1'000'000 items. Before each run, we will truncate the table to make sure we are working under the same conditions.

    execute()

    The execute() method is the standard function to execute a database operation. To insert a large number of rows, we have to loop over each row in our dataset and call the execute method.

        def method_execute(self, values):
            """Loop over the dataset and insert every row separately"""
            for value in values:
                self.cursor.execute("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), value)
            self.connection.commit()

    As we know, loops are quite slow, so it’s not surprising if we look at the results:

    method_execute running time is 0.739 seconds for inserting 10'000 rows.
    method_execute running time is 7.2657 seconds for inserting 100'000 rows.
    method_execute running time is 75.9589 seconds for inserting 1'000'000 rows.

    As we can see the increase in time is almost linear.

    executemany()

    Psycopg2 offers another execution method called executemany() . One would think that this would be some kind of improvement over the simple execute() method. But if we look at the official documentation a warning says otherwise:

    Warning

    In its current implementation this method is not faster than executing execute() in a loop. For better performance you can use the functions described in Fast execution helpers .

    @measure_time 
    def method_execute_many(self, values):
            self.cursor.executemany("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), values)
            self.connection.commit()

    method_execute_many running time is 0.7091 seconds for inserting 10'000 rows.
    method_execute_many running time is 6.895 seconds for inserting 100'000 rows.
    method_execute_many running time is 74.4555 seconds for inserting 1'000'000 rows.

    As we can see this method is somehow a little bit faster than a simple execute() statement loop. But I think we can do better.

    execute_batch()

    Another approach offered by the Psycopg2 library is execute_batch(). It reduces the number of server roundtrips, improving the performance in contrast to the executemany() function. The method achieves this, by joining the statements together until the page_size is reached (usually 8kB in Postgres). Let’s check the performance.

    @measure_time
        def method_execute_batch(self, values):
            psycopg2.extras.execute_batch(self.cursor, "INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME),
                                          values)
            self.connection.commit()

    method_execute_batch running time is 0.2085 seconds for inserting 10'000 rows.
    method_execute_batch running time is 1.83 seconds for inserting 100'000 rows.
    method_execute_batch running time is 18.6885 seconds for inserting 1'000'000 rows.

    Look at that! We are already almost 4! times faster!

    Building a custom string

    In our last approach, we are going to build a custom string, which we will then run in a single execute() statement.

        @measure_time
        def method_string_building(self, values):
            argument_string = ",".join("('%s', '%s')" % (x, y) for (x, y) in values)
            self.cursor.execute("INSERT INTO {table} VALUES".format(table=TABLE_NAME) + argument_string)
            self.connection.commit()

    method_string_building running time is 0.0456 seconds for inserting 10'000 rows.
    method_string_building running time is 0.5535 seconds for inserting 100'000 rows.
    method_string_building running time is 6.0406 seconds for inserting 1'000'000 rows.

    As you can we successfully reduced the amount of time needed by another factor of 4! In comparison to the loop execute() method we reduced the insertion time for 10'000 rows by a factor of 13!

    Conclusion

    Here is the final code used:

    import time
    import psycopg2
    import psycopg2.extras
    TABLE_NAME = 'TestTable'
    def measure_time(func):
        def time_it(*args, **kwargs):
            time_started = time.time()
            func(*args, **kwargs)
            time_elapsed = time.time()
            print("{execute} running time is {sec} seconds for inserting {rows} rows.".format(execute=func.__name__,
                                                                                              sec=round(
                                                                                                  time_elapsed - time_started,
                                                                                                  4), rows=len(
                    kwargs.get('values'))))
        return time_it
    class PsycopgTest():
        def __init__(self, num_rows):
            self.num_rows = num_rows
        def create_dummy_data(self):
            values = []
            for i in range(self.num_rows):
                values.append((i + 1, 'test'))
            return values
        def connect(self):
            conn_string = "host={0} user={1} dbname={2} password={3}".format('localhost',
                                                                             'postgres',
                                                                             'psycopg2_test', '')
            self.connection = psycopg2.connect(conn_string)
            self.cursor = self.connection.cursor()
        def create_table(self):
            self.cursor.execute(
                "CREATE TABLE IF NOT EXISTS {table} (id INT PRIMARY KEY, NAME text)".format(table=TABLE_NAME))
            self.connection.commit()
        def truncate_table(self):
            self.cursor.execute("TRUNCATE TABLE {table} RESTART IDENTITY".format(table=TABLE_NAME))
            self.connection.commit()
        @measure_time
        def method_execute(self, values):
            """Loop over the dataset and insert every row separately"""
            for value in values:
                self.cursor.execute("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), value)
            self.connection.commit()
        @measure_time
        def method_execute_many(self, values):
            self.cursor.executemany("INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME), values)
            self.connection.commit()
        @measure_time
        def method_execute_batch(self, values):
            psycopg2.extras.execute_batch(self.cursor, "INSERT INTO {table} VALUES (%s, %s)".format(table=TABLE_NAME),
                                          values)
            self.connection.commit()
        @measure_time
        def method_string_building(self, values):
            argument_string = ",".join("('%s', '%s')" % (x, y) for (x, y) in values)
            self.cursor.execute("INSERT INTO {table} VALUES".format(table=TABLE_NAME) + argument_string)
            self.connection.commit()
    def main():
        psyco = PsycopgTest(10000)
        psyco.connect()
        values = psyco.create_dummy_data()
        psyco.create_table()
        psyco.truncate_table()
        psyco.method_execute(values=values)
        # psyco.method_execute_many(values=values)
        # psyco.method_execute_batch(values=values)
        # psyco.method_string_building(values=values)
    if __name__ == '__main__':
        main()


    As we saw there are huge performance gaps in the different execution methods for Psycopg2. If you are working with small amounts of data, it won’t matter that much. But as the size of the data grows, it will definitely get more interesting to explore and use these alternative methods to speed up the process up to 13 times!

    Written by

    Silas Stulz

    Data Engineer. I write about Machine Learning, Big Data, and Web Technologies.

     
    推荐文章