If you use an unsupported system things may work (because, for instance, the
database may use the same wire protocol as PostgreSQL) but we cannot guarantee
correct behaviour or a smooth ride.
This will install a self-contained package with all the libraries needed. You
will need at least pip 20.3: please run pip install --upgrade pip to update it
beforehand.
The above package should work in most situations. It will not work in some
cases though. If your platform is not supported you should proceed to a local
installation or a pure Python installation.
See also
Did Psycopg 3 install ok? Great! You can now move on to the
basic
module usage
to learn how it works.
Keep on reading if the above method didn’t work and you need a different
way to install Psycopg 3.
For further information about the differences between the packages see
pq module implementations
A "Local installation" results in a performing and maintainable library. The
library will include the speed-up C module and will be linked to the system
libraries (
libpq
libssl
…) so that system upgrade of libraries will
upgrade the libraries used by Psycopg 3 too. This is the preferred way to
install Psycopg for a production site.
In order to perform a local installation you need some prerequisites: among
other things, you must be able to troubleshoot an extension build, for instance
you must be able to read your compiler’s error messages. If you are not, please
don’t try this and follow the binary installation instead.
If your build prerequisites are in place you can run:
pip install "psycopg[c]"
If you install the package without the [c] or [binary] extras you will obtain a
pure Python implementation. This is particularly handy to debug and hack, but it
still requires the system libpq to operate (which will be imported dynamically
via ctypes).
In order to use the pure Python installation you will need the libpq installed
in the system: for instance on a Debian system you will probably need:
sudo apt install libpq5
You can also install psql, the PostgreSQL command line client, to connect to
the database. On most systems, installing psql will install the libpq too as a
dependency.
If you are not able to fulfill this requirement please follow the binary
installation instead.
If your project is a library, you can require a dependency on the generic
psycopg package: this makes sure your library gets a package with the right
interface and leaves the possibility of choosing a specific implementation
to the end user of your library.
If your project is a final application (e.g. a service running on a server)
you can require a specific implementation, for instance
psycopg[c]
after you have made sure that the prerequisites are met (e.g. the depending
libraries and tools are installed in the host machine).
In both cases you can specify which version of Psycopg to use using
requirement specifiers.
If you want to make sure that a specific implementation is used you can
specify the PSYCOPG_IMPL environment variable: importing the library
will fail if the implementation specified is not available. See
pq module implementations.
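As a quick check, you can verify at runtime which implementation has been loaded; a minimal sketch (the __impl__ attribute is exposed by the psycopg.pq module):

import psycopg

# Reports the libpq wrapper in use: "binary", "c" or "python"
print(psycopg.pq.__impl__)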
import psycopg

# Connect to an existing database
with psycopg.connect("dbname=test user=postgres") as conn:

    # Open a cursor to perform database operations
    with conn.cursor() as cur:

        # Execute a command: this creates a new table
        cur.execute("""
            CREATE TABLE test (
                id serial PRIMARY KEY,
                num integer,
                data text)
            """)

        # Pass data to fill the query placeholders and let Psycopg perform
        # the correct conversion (no SQL injections!)
        cur.execute(
            "INSERT INTO test (num, data) VALUES (%s, %s)",
            (100, "abc'def"))

        # Query the database and obtain data as Python objects.
        cur.execute("SELECT * FROM test")
        cur.fetchone()
        # will return (1, 100, "abc'def")

        # You can use `cur.fetchmany()`, `cur.fetchall()` to return a list
        # of several records, or even iterate on the cursor
        for record in cur:
            print(record)

        # Make the changes to the database persistent
        conn.commit()
Using the connection and the cursor as context managers (i.e. using with) will
make sure to close them and free their resources at the end of the block
(notice that this is different from psycopg2).
When the block is exited, if there is a transaction open, it will be
committed. If an exception is raised within the block the transaction is
rolled back. In both cases the connection is closed. It is roughly the
equivalent of:
conn = psycopg.connect()
try:
    ...  # use the connection
except BaseException:
    conn.rollback()
else:
    conn.commit()
finally:
    conn.close()
In psycopg2 a connection could instead be used in several with statements to
manage different transactions. This behaviour has been considered non-standard
and surprising, so it has been replaced by the more explicit transaction()
block.
Note that, while the above pattern is what most people would use,
connect()
doesn’t enter a block itself, but returns an "un-entered" connection, so that
it is still possible to use a connection regardless of the code scope and the
developer is free to use (and responsible for calling)
commit()
rollback()
close()
as and where needed.
Warning
If a connection is just left to go out of scope, the way it will behave
with or without the use of a with block is different.
Use the with block when your intention is just to execute a set of operations
and then commit the result, which is the most usual thing to do with a
connection. If your connection life cycle and transaction pattern is different,
and you want more control over it, the use without with might be more
convenient.
See Transactions management for more information.
The above pattern only shows the default behaviour of the adapter. Psycopg can
be customised in several ways, to allow the smoothest integration between your
Python program and your PostgreSQL database:
If you want to customise the objects that the cursor returns, instead of
receiving tuples, you can specify your row factories.
If you want to customise how Python values and PostgreSQL types are mapped
into each other, beside the basic type mapping, you can configure your types.
Parameters are passed to a query using %s placeholders in the SQL statement,
and passing a sequence of values as the second argument of the function. For
example the Python function call:
cur.execute("""
INSERT INTO some_table (id, created_at, last_name)
VALUES (%s, %s, %s);
""",
(10, datetime.date(2020, 11, 18), "O'Reilly"))
Note that the parameters will not really be merged into the query: the query
and the parameters are sent to the server separately; see Server-side binding
for details.
Named arguments are supported too, using %(name)s placeholders in the query
and specifying the values in a mapping. Using named arguments allows you to
specify the values in any order and to repeat the same value in several places
in the query:
cur.execute("""
INSERT INTO some_table (id, created_at, updated_at, last_name)
VALUES (%(id)s, %(created)s, %(created)s, %(name)s);
""",
{'id': 10, 'name': "O'Reilly", 'created': datetime.date(2020, 11, 18)})
While the mechanism resembles regular Python string manipulation, there are a
few subtle differences you should care about when passing parameters to a
query. For instance, the arguments must always be passed as a sequence, even
if the query takes a single parameter (remember that a single-element tuple
needs a trailing comma):
cur.execute("INSERT INTO foo VALUES (%s)", "bar") # WRONG
cur.execute("INSERT INTO foo VALUES (%s)", ("bar")) # WRONG
cur.execute("INSERT INTO foo VALUES (%s)", ("bar",)) # correct
cur.execute("INSERT INTO foo VALUES (%s)", ["bar"]) # correct
The placeholder must always be a %s, even if a different placeholder (such as
%d for integers or %f for floats) may look more appropriate for the type. You
may find other placeholders used in Psycopg queries (%b and %t) but they are
not related to the type of the argument: see Binary parameters and results
if you want to read more:
cur.execute("INSERT INTO numbers VALUES (%d)", (10,)) # WRONG
cur.execute("INSERT INTO numbers VALUES (%s)", (10,)) # correct
Only query values should be bound via this method: it shouldn’t be used to
merge table or field names to the query. If you need to generate SQL queries
dynamically (for instance choosing a table name at runtime) you can use the
functionalities provided in the
psycopg.sql
module:
cur.execute("INSERT INTO %s VALUES (%s)", ('numbers', 10)) # WRONG
cur.execute( # correct
SQL("INSERT INTO {} VALUES (%s)").format(Identifier('numbers')),
(10,))
The SQL representation of many data types is often different from their Python
string representation. The typical example is with single quotes in strings:
in SQL single quotes are used as string literal delimiters, so the ones
appearing inside the string itself must be escaped, whereas in Python single
quotes can be left unescaped if the string is delimited by double quotes.
Because of the difference, sometimes subtle, between the data types
representations, a naïve approach to query strings composition, such as using
Python strings concatenation, is a recipe for
terrible
problems:
SQL = "INSERT INTO authors (name) VALUES ('%s')" # NEVER DO THIS
data = ("O'Reilly", )
cur.execute(SQL % data) # THIS WILL FAIL MISERABLY
# SyntaxError: syntax error at or near "Reilly"
If the variables containing the data to send to the database come from an
untrusted source (such as data coming from a form on a web site) an attacker
could easily craft a malformed string, either gaining access to unauthorized
data or performing destructive operations on the database. This form of attack
is called
SQL injection
and is known to be one of the most widespread forms
of attack on database systems. Before continuing, please print
this page
as a memo and hang it onto your desk.
Psycopg can
automatically convert Python objects to SQL
values
: using this feature your code will be more robust
and reliable. We must stress this point:
Warning
Don’t manually merge values to a query: hackers from a foreign country
will break into your computer and steal not only your disks, but also
your cds, leaving you only with the three most embarrassing records you
ever bought. On cassette tapes.
If you use the % operator to merge values to a query, con artists
will seduce your cat, who will run away taking your credit card
and your sunglasses with them.
If you use + to merge a textual value to a string, bad guys in
balaclava will find their way to your fridge, drink all your beer, and
leave your toilet seat up and your toilet paper in the wrong orientation.
SQL = "INSERT INTO authors (name) VALUES (%s)" # Note: no quotes
data = ("O'Reilly", )
cur.execute(SQL, data) # Note: no % operator
Python static code checkers are not quite there yet, but, in the future,
it will be possible to check your code for improper use of string
expressions in queries. See
Checking literal strings in queries
for details.
PostgreSQL has two formats to exchange data with the client: TEXT, always
available, and BINARY, available most of the times but not always. Usually the
binary format is more efficient to use.
Psycopg can support both formats for each data type. Whenever a value
is passed to a query using the normal
placeholder, the best format
available is chosen (often, but not always, the binary format is picked as the
best choice).
If you have a reason to select explicitly the binary format or the text format
for a value you can use respectively a %b placeholder or a %t placeholder
instead of the normal %s. execute() will fail if a Dumper for the right data
type and format is not available.
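For example, assuming a hypothetical blobs table with a bytea column, the format can be forced per parameter; a minimal sketch:

# %b forces the binary format, %t the text format; %s lets Psycopg choose.
cur.execute("INSERT INTO blobs (data) VALUES (%b)", (b"\x00\x01\x02",))
cur.execute("INSERT INTO blobs (data) VALUES (%t)", (b"\x00\x01\x02",))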
The same two formats, text or binary, are used by PostgreSQL to return data
from a query to the client. Unlike with parameters, where you can choose the
format value-by-value, all the columns returned by a query will have the same
format. Every type returned by the query should have a Loader configured,
otherwise the data will be returned as an unparsed string (for text results)
or buffer (for binary results).
The pg_type table defines which format is supported for each PostgreSQL data
type. Text input/output is managed by the functions declared in the typinput
and typoutput fields (always present); binary input/output is managed by the
typsend and typreceive functions (which are optional).
Because not every PostgreSQL type supports binary output, by default the data
will be returned in text format. In order to return data in binary format you
can create the cursor using Connection.cursor(binary=True) or pass binary=True
to the execute methods. A case in which requesting binary results is a clear
winner is when you have large binary data in the database, such as images:
cur.execute(
    "SELECT image_data FROM images WHERE id = %s", [image_id], binary=True)
data = cur.fetchone()[0]
Many standard Python types are adapted into SQL and returned as Python
objects when a query is executed.
Converting the following data types between Python and PostgreSQL works
out-of-the-box and doesn’t require any configuration. In case you need to
customise the conversion you should take a look at
Data adaptation configuration
Python int values can be converted to PostgreSQL smallint, integer, bigint, or
numeric, according to their numeric value. Psycopg will choose the smallest
data type available, because PostgreSQL can automatically cast a type up
(e.g. passing a smallint where PostgreSQL expects an integer is gladly
accepted) but will not cast down automatically (e.g. if a function has an
integer argument, passing it a bigint value will fail, even if the value is 1).
conn = psycopg.connect()

conn.execute(
    "INSERT INTO menu (id, entry) VALUES (%s, %s)",
    (1, "Crème Brûlée at 4.99€"))

conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
'Crème Brûlée at 4.99€'
PostgreSQL databases have an encoding, and the session has an encoding too,
exposed in the Connection.info.encoding attribute. If your database and
connection are in UTF-8 encoding you will likely have no problem, otherwise
you will have to make sure that your application only deals with the non-ASCII
chars that the database can handle; failing to do so may result in
encoding/decoding errors:
# The encoding is set at connection time according to the db configuration
conn.info.encoding
'utf-8'
# The Latin-9 encoding can manage some European accented letters
# and the Euro symbol
conn.execute("SET client_encoding TO LATIN9")
conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
'Crème Brûlée at 4.99€'
# The Latin-1 encoding doesn't have a representation for the Euro symbol
conn.execute("SET client_encoding TO LATIN1")
conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
# Traceback (most recent call last)
# ...
# UntranslatableCharacter: character with byte sequence 0xe2 0x82 0xac
# in encoding "UTF8" has no equivalent in encoding "LATIN1"
conn.execute("SET client_encoding TO SQL_ASCII")
conn.execute("SELECT entry FROM menu WHERE id = 1").fetchone()[0]
b'Cr\xc3\xa8me Br\xc3\xbbl\xc3\xa9e at 4.99\xe2\x82\xac'
If you are storing large binary data in bytea fields (such as binary documents
or images) you should probably use the binary format to pass and return
values, otherwise binary data will undergo
ASCII escaping
, taking some CPU
time and more bandwidth. See
Binary parameters and results
for details.
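For example, a sketch of passing a file's content with the %b placeholder, so that it skips the text escaping round trip (the files table is hypothetical):

with open("photo.jpg", "rb") as f:
    conn.execute("INSERT INTO files (data) VALUES (%b)", [f.read()])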
The session TimeZone is exposed, as a Python tzinfo object, in the
Connection.info.timezone attribute:
>>> conn.info.timezone
zoneinfo.ZoneInfo(key='Europe/London')
>>> conn.execute("select '2048-07-08 12:00'::timestamptz").fetchone()[0]
datetime.datetime(2048, 7, 8, 12, 0, tzinfo=zoneinfo.ZoneInfo(key='Europe/London'))
Note that PostgreSQL timestamptz doesn’t store "a timestamp with a timezone
attached": it stores a timestamp always in UTC, which is converted, on
output, to the connection TimeZone setting:
>>> conn.execute("SET TIMEZONE to 'Europe/Rome'") # UTC+2 in summer
>>> conn.execute("SELECT '2042-07-01 12:00Z'::timestamptz").fetchone()[0] # UTC input
datetime.datetime(2042, 7, 1, 14, 0, tzinfo=zoneinfo.ZoneInfo(key='Europe/Rome'))
Because several Python objects could be considered JSON (dicts, lists,
scalars, even date/time if using a dumps function customised to use them),
Psycopg requires you to wrap the object to dump as JSON into a wrapper:
either psycopg.types.json.Json or psycopg.types.json.Jsonb.
from psycopg.types.json import Jsonb
thing = {"foo": ["bar", 42]}
conn.execute("INSERT INTO mytable VALUES (%s)", [Jsonb(thing)])
By default Psycopg uses the standard library json.dumps and json.loads
functions to serialize and de-serialize Python objects to JSON. If you want to
customise how serialization happens, for instance changing serialization
parameters or using a different JSON library, you can specify your own
functions using the psycopg.types.json.set_json_dumps() and set_json_loads()
functions, to apply either globally or to a specific context (connection or
cursor).
import json
from decimal import Decimal
from functools import partial
from psycopg.types.json import Jsonb, set_json_dumps, set_json_loads
import ujson

# Use a faster dump function
set_json_dumps(ujson.dumps)

# Return floating point values as Decimal, just in one connection
set_json_loads(partial(json.loads, parse_float=Decimal), conn)

conn.execute("SELECT %s", [Jsonb({"value": 123.45})]).fetchone()[0]
# {'value': Decimal('123.45')}
If you need an even more specific dump customisation only for certain objects
(including different configurations in the same query) you can specify a
dumps
parameter in the
Jsonb
wrapper, which will
take precedence over what is specified by
set_json_dumps()
import json
from functools import partial
from uuid import UUID, uuid4
from psycopg.types.json import Json

class UUIDEncoder(json.JSONEncoder):
    """A JSON encoder which can dump UUID."""
    def default(self, obj):
        if isinstance(obj, UUID):
            return str(obj)
        return json.JSONEncoder.default(self, obj)

uuid_dumps = partial(json.dumps, cls=UUIDEncoder)
obj = {"uuid": uuid4()}
conn.execute("INSERT INTO objs VALUES (%s)", [Json(obj, dumps=uuid_dumps)])
# will insert: {'uuid': '0a40799d-3980-4c65-8315-2956b18ab0e1'}
Python lists are adapted to PostgreSQL arrays and back. Only lists containing
objects of the same type can be dumped to PostgreSQL (but the list may contain
None elements).
If you have a list of values which you want to use with the IN operator…
don’t. It won’t work (neither with a list nor with a tuple):
>>> conn.execute("SELECT * FROM mytable WHERE id IN %s", [[10,20,30]])
Traceback (most recent call last):
File "", line 1, in
psycopg.errors.SyntaxError: syntax error at or near "$1"
LINE 1: SELECT * FROM mytable WHERE id IN $1
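What works instead is the = ANY() construct with a Python list, which is adapted to a PostgreSQL array (the same tip is discussed again in the section about the differences from psycopg2):

>>> conn.execute("SELECT * FROM mytable WHERE id = ANY(%s)", [[10, 20, 30]])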
>>> conn.execute("select gen_random_uuid()").fetchone()[0]
UUID('97f0dd62-3bd2-459e-89b8-a5e36ea3c16c')
>>> from uuid import uuid4
>>> conn.execute("select gen_random_uuid() = %s", [uuid4()]).fetchone()[0]
False # long shot
>>> conn.execute("select '192.168.0.1'::inet, '192.168.0.1/24'::inet").fetchone()
(IPv4Address('192.168.0.1'), IPv4Interface('192.168.0.1/24'))
>>> conn.execute("select '::ffff:1.2.3.0/120'::cidr").fetchone()[0]
IPv6Network('::ffff:102:300/120')
Pure Python Enum classes are dumped as normal strings, using their member
names as value. The unknown oid is used, so PostgreSQL should be able to
use this string in most contexts (such as an enum or a text field).
Changed in version 3.1:
In previous versions dumping pure enums was not supported and raised a
"cannot adapt" error.
After registering, fetching data of the registered enum will cast
PostgreSQL enum labels into corresponding Python enum members.
If no enum is specified, a new Enum is created based on the PostgreSQL enum
labels.
>>> from enum import Enum, auto
>>> from psycopg.types.enum import EnumInfo, register_enum
>>> class UserRole(Enum):
... ADMIN = auto()
... EDITOR = auto()
... GUEST = auto()
>>> conn.execute("CREATE TYPE user_role AS ENUM ('ADMIN', 'EDITOR', 'GUEST')")
>>> info = EnumInfo.fetch(conn, "user_role")
>>> register_enum(info, conn, UserRole)
>>> some_editor = info.enum.EDITOR
>>> some_editor
<UserRole.EDITOR: 2>
>>> conn.execute(
...     "SELECT pg_typeof(%(editor)s), %(editor)s",
...     {"editor": some_editor}
... ).fetchone()
('user_role', <UserRole.EDITOR: 2>)
>>> conn.execute(
...     "SELECT ARRAY[%s, %s]",
...     [UserRole.ADMIN, UserRole.GUEST]
... ).fetchone()
[<UserRole.ADMIN: 1>, <UserRole.GUEST: 3>]
If the Python and the PostgreSQL enum don’t match 1:1 (for instance if members
have a different name, or if more than one Python enum should map to the same
PostgreSQL enum, or vice versa), you can specify the exceptions using the
mapping parameter.
mapping should be a dictionary with Python enum members as keys and the
matching PostgreSQL enum labels as values, or a list of (member, label)
pairs with the same meaning (useful when some members are repeated). Order
matters: if an element on either side is specified more than once, the last
pair in the sequence will take precedence:
# Legacy roles, defined in medieval times.
>>> conn.execute(
... "CREATE TYPE abbey_role AS ENUM ('ABBOT', 'SCRIBE', 'MONK', 'GUEST')")
>>> info = EnumInfo.fetch(conn, "abbey_role")
>>> register_enum(info, conn, UserRole, mapping=[
... (UserRole.ADMIN, "ABBOT"),
... (UserRole.EDITOR, "SCRIBE"),
... (UserRole.EDITOR, "MONK")])
>>> conn.execute("SELECT '{ABBOT,SCRIBE,MONK,GUEST}'::abbey_role[]").fetchone()[0]
[<UserRole.ADMIN: 1>,
<UserRole.EDITOR: 2>,
<UserRole.EDITOR: 2>,
<UserRole.GUEST: 3>]
>>> conn.execute("SELECT %s::text[]", [list(UserRole)]).fetchone()[0]
['ABBOT', 'MONK', 'GUEST']
... "CREATE TYPE lowercase_role AS ENUM ('admin', 'editor', 'guest')")
>>> info = EnumInfo.fetch(conn, "lowercase_role")
>>> register_enum(
... info, conn, LowercaseRole, mapping={m: m.value for m in LowercaseRole})
>>> conn.execute("SELECT 'editor'::lowercase_role").fetchone()[0]
Registering the adapters doesn’t affect objects already created, even
if they are children of the registered context. For instance,
registering the adapter globally doesn’t affect already existing
connections.
After registering, fetching data of the registered composite will invoke
factory
to create corresponding Python objects.
If no factory is specified, a
namedtuple
is created and used
to return data.
If the
factory
is a type (and not a generic callable), then dumpers for
that type are created and registered too, so that passing objects of that
type to a query will adapt them to the registered type.
>>> from psycopg.types.composite import CompositeInfo, register_composite
>>> conn.execute("CREATE TYPE card AS (value int, suit text)")
>>> info = CompositeInfo.fetch(conn, "card")
>>> register_composite(info, conn)
>>> my_card = info.python_type(8, "hearts")
>>> my_card
card(value=8, suit='hearts')
>>> conn.execute(
... "SELECT pg_typeof(%(card)s), (%(card)s).suit", {"card": my_card}
... ).fetchone()
('card', 'hearts')
>>> conn.execute("SELECT (%s, %s)::card", [1, "spades"]).fetchone()[0]
card(value=1, suit='spades')
>>> conn.execute("CREATE TYPE card_back AS (face card, back text)")
>>> info2 = CompositeInfo.fetch(conn, "card_back")
>>> register_composite(info2, conn)
>>> conn.execute("SELECT ((8, 'hearts'), 'blue')::card_back").fetchone()[0]
card_back(face=card(value=8, suit='hearts'), back='blue')
Range types are a family of data types representing a range of values between
two elements. The type of the element is called the range subtype. PostgreSQL
offers a few built-in range types and allows the definition of custom ones.
All the PostgreSQL range types are loaded as the
Range
Python type, which is a
Generic
type and can hold bounds of
different types.
This Python type is only used to pass and retrieve range values to and
from PostgreSQL and doesn’t attempt to replicate the PostgreSQL range
features: it doesn’t perform normalization and doesn’t implement all the
operators
supported by the database.
PostgreSQL will perform normalisation on Range objects used as query
parameters, so, when they are fetched back, they will be found in the
normal form (for instance ranges on integers will have [) bounds).
Range objects are immutable, hashable, and support the in operator
(checking if an element is within the range). They can be tested for
equivalence. Empty ranges evaluate to False in a boolean context,
nonempty ones evaluate to True.
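A minimal sketch of this client-side behaviour:

>>> from psycopg.types.range import Range
>>> r = Range(10, 20)        # bounds default to '[)'
>>> 15 in r
True
>>> bool(Range(empty=True))  # empty ranges are falsy
False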
Registering the adapters doesn’t affect objects already created, even
if they are children of the registered context. For instance,
registering the adapter globally doesn’t affect already existing
connections.
>>> from psycopg.types.range import Range, RangeInfo, register_range
>>> conn.execute("CREATE TYPE strrange AS RANGE (SUBTYPE = text)")
>>> info = RangeInfo.fetch(conn, "strrange")
>>> register_range(info, conn)
>>> conn.execute("SELECT pg_typeof(%s)", [Range("a", "z")]).fetchone()[0]
'strrange'
>>> conn.execute("SELECT '[a,z]'::strrange").fetchone()[0]
Range('a', 'z', '[]')
A multirange is a type representing a disjoint set of ranges. A multirange
type is automatically available for every range type, built-in and
user-defined.
All the PostgreSQL multirange types are loaded as the Multirange Python type,
which is a mutable sequence of Range elements.
This Python type is only used to pass and retrieve multirange values to
and from PostgreSQL and doesn’t attempt to replicate the PostgreSQL
multirange features: overlapping items are not merged, empty ranges are
not discarded, the items are not ordered, the behaviour of
multirange
operators
is not replicated in Python.
PostgreSQL will perform normalisation on
Multirange
objects used as
query parameters, so, when they are fetched back, they will be found
ordered, with overlapping ranges merged, etc.
Registering the adapters doesn’t affect objects already created, even
if they are children of the registered context. For instance,
registering the adapter globally doesn’t affect already existing
connections.
>>> from psycopg.types.multirange import \
... Multirange, MultirangeInfo, register_multirange
>>> from psycopg.types.range import Range
>>> conn.execute("CREATE TYPE strrange AS RANGE (SUBTYPE = text)")
>>> info = MultirangeInfo.fetch(conn, "strmultirange")
>>> register_multirange(info, conn)
>>> rec = conn.execute(
... "SELECT pg_typeof(%(mr)s), %(mr)s",
... {"mr": Multirange([Range("a", "q"), Range("l", "z")])}).fetchone()
>>> rec[0]
'strmultirange'
>>> rec[1]
Multirange([Range('a', 'z', '[)')])
The hstore data type is a key-value store embedded in PostgreSQL. It
supports GiST or GIN indexes allowing search by keys or key/value pairs as
well as regular BTree indexes for equality, uniqueness etc.
Psycopg can convert Python dict objects to and from hstore structures.
Only dictionaries with string keys and values are supported. None is also
allowed as a value but not as a key.
In order to use the
hstore
data type it is necessary to load it in a
database using:
=# CREATE EXTENSION hstore;
Registering the adapters doesn’t affect objects already created, even
if they are children of the registered context. For instance,
registering the adapter globally doesn’t affect already existing
connections.
>>> from psycopg.types import TypeInfo
>>> from psycopg.types.hstore import register_hstore
>>> info = TypeInfo.fetch(conn, "hstore")
>>> register_hstore(info, conn)
>>> conn.execute("SELECT pg_typeof(%s)", [{"a": "b"}]).fetchone()[0]
'hstore'
>>> conn.execute("SELECT 'foo => bar'::hstore").fetchone()[0]
{'foo': 'bar'}
When using the PostGIS extension, you may want to retrieve geometry values as
Shapely shape instances. Likewise, you may want to store such instances in the
database and have the conversion happen automatically.
Warning
Psycopg doesn’t have a dependency on the
shapely
package: you should
install the library as an additional dependency of your project.
After invoking this function on an adapter, the queries retrieving
PostGIS geometry objects will return Shapely’s shape object instances
both in text and binary mode.
Similarly, shape objects can be sent to the database.
This requires the Shapely library to be installed.
Registering the adapters doesn’t affect objects already created, even
if they are children of the registered context. For instance,
registering the adapter globally doesn’t affect already existing
connections.
>>> from psycopg.types import TypeInfo
>>> from psycopg.types.shapely import register_shapely
>>> from shapely.geometry import Point
>>> info = TypeInfo.fetch(conn, "geometry")
>>> register_shapely(info, conn)
>>> conn.execute("SELECT pg_typeof(%s)", [Point(1.2, 3.4)]).fetchone()[0]
'geometry'
>>> conn.execute("""
... SELECT ST_GeomFromGeoJSON('{
... "type":"Point",
... "coordinates":[-48.23456,20.12345]}')
... """).fetchone()[0]
Notice that, on a connection where the Shapely adapters are not registered,
the raw geometry representation is returned instead:
>>> conn2 = psycopg.connect(CONN_STR)
>>> conn2.execute("""
... SELECT ST_GeomFromGeoJSON('{
... "type":"Point",
... "coordinates":[-48.23456,20.12345]}')
... """).fetchone()[0]
'0101000020E61000009279E40F061E48C0F2B0506B9A1F3440'
Psycopg has a behaviour that may seem surprising compared to psycopg2: by
default, any database operation will start a new
transaction. As a consequence, changes made by any cursor of the connection
will not be visible until
Connection.commit()
is called, and will be
discarded by
Connection.rollback()
. The following operation on the same
connection will start a new transaction.
If a database operation fails, the server will refuse further commands, until
rollback()
is called.
If the cursor is closed with a transaction open, no COMMIT command is sent to
the server, which will then discard the connection. Certain middleware (such
as PgBouncer) will also discard a connection left in transaction state, so, if
possible you will want to commit or rollback a connection before finishing
working with it.
An example of what will happen, the first time you will use Psycopg (and to be
disappointed by it), is likely:
conn = psycopg.connect()
# Creating a cursor doesn't start a transaction or affect the connection
# in any way.
cur = conn.cursor()
cur.execute("SELECT count(*) FROM my_table")
# This function call executes:
# - BEGIN
# - SELECT count(*) FROM my_table
# So now a transaction has started.
# If your program spends a long time in this state, the server will keep
# a connection "idle in transaction", which is likely something undesired
cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
# This statement is executed inside the transaction
conn.close()
# No COMMIT was sent: the INSERT was discarded.
If a connection is used as a context manager, Psycopg will instead commit the
connection at the end of the block (or roll it back if the block is exited
with an exception):
The code modified using a connection context will result in the following
sequence of database statements:
with psycopg.connect() as conn:
    cur = conn.cursor()

    cur.execute("SELECT count(*) FROM my_table")
    # This function call executes:
    # - BEGIN
    # - SELECT count(*) FROM my_table
    # So now a transaction has started.

    cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
    # This statement is executed inside the transaction

# No exception at the end of the block:
# COMMIT is executed.
This way the database operations actually have a persistent effect. The code might
still do something you don’t expect: keep a transaction from the first
operation to the connection closure. You can have a finer control over the
transactions using an
autocommit transaction
and/or
transaction contexts
Warning
By default even a simple
SELECT
will start a transaction: in
long-running programs, if no further action is taken, the session will
remain
idle in transaction
, an undesirable condition for several
reasons (locks are held by the session, tables bloat…). For long lived
scripts, either make sure to terminate a transaction as soon as possible or
use an
autocommit
connection.
If a database operation fails with an error message such as
InFailedSqlTransaction: current transaction is aborted, commands ignored
until end of transaction block
, it means that
a previous operation
failed
and the database session is in a state of error. You need to call
rollback()
if you want to keep on using the same connection.
An autocommit connection can be obtained by passing autocommit=True to the
connect() function or by setting the Connection.autocommit attribute. This may
be required to run operations that cannot be executed inside a transaction,
such as CREATE DATABASE, VACUUM, or CALL on stored procedures using
transaction control.
With an autocommit transaction, the above sequence of operation results in:
with psycopg.connect(autocommit=True) as conn:
    cur = conn.cursor()

    cur.execute("SELECT count(*) FROM my_table")
    # This function call now only executes:
    # - SELECT count(*) FROM my_table
    # and no transaction starts.

    cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
    # The result of this statement is persisted immediately by the database

# The connection is closed at the end of the block but, because it is not
# in a transaction state, no COMMIT is executed.
An autocommit connection behaves as most people would expect. This also has a
beneficial performance effect, because fewer queries are sent and fewer
operations are performed by the database. The statements,
however, are not executed in an atomic transaction; if you need to execute
certain operations inside a transaction, you can achieve that with an
autocommit connection too, using an explicit
transaction block
A more transparent way to make sure that transactions are finalised at the
right time is to use Connection.transaction() to create a
transaction context. When the context is entered, a transaction is started;
when leaving the context the transaction is committed, or it is rolled back if
an exception is raised inside the block.
Continuing the example above, if you want to use an autocommit connection but
still wrap selected groups of commands inside an atomic transaction, you can
use a
transaction()
context:
with psycopg.connect(autocommit=True) as conn:
    cur = conn.cursor()

    cur.execute("SELECT count(*) FROM my_table")
    # The connection is autocommit, so no BEGIN executed.

    with conn.transaction():
        # BEGIN is executed, a transaction started

        cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))
        cur.execute("INSERT INTO times VALUES (now())")
        # These two operation run atomically in the same transaction

    # COMMIT is executed at the end of the block.
    # The connection is in idle state again.

# The connection is closed at the end of the block.
Note that connection blocks can also be used with non-autocommit connections:
in this case you still need to pay attention to eventual transactions started
automatically. If an operation starts an implicit transaction, a
transaction()
block will only manage
a savepoint sub-transaction
, leaving the caller to deal with the main transaction,
as explained in
Transactions management
conn = psycopg.connect()
cur = conn.cursor()

cur.execute("SELECT count(*) FROM my_table")
# This function call executes:
# - BEGIN
# - SELECT count(*) FROM my_table
# So now a transaction has started.

with conn.transaction():
    # The block starts with a transaction already open, so it will execute
    # - SAVEPOINT

    cur.execute("INSERT INTO data VALUES (%s)", ("Hello",))

# The block was executing a sub-transaction so on exit it will only run:
# - RELEASE SAVEPOINT
# The transaction is still on.

conn.close()
# No COMMIT was sent: the INSERT was discarded.
If a transaction() block starts when no transaction is active then it will
manage a proper transaction. In essence, a transaction context tries to leave
a connection in the state it found it, and leaves you to deal with the wider
context.
The interaction between non-autocommit transactions and transaction
contexts is probably surprising. Although the non-autocommit default is
what’s demanded by the DBAPI, the personal preference of several experienced
developers is to use autocommit connections and manage transactions explicitly
with transaction() blocks where needed.
Transaction blocks can also be nested (inner blocks are implemented with
SAVEPOINT): an exception raised inside an inner block has a chance of being
handled and not completely fail outer operations. The following is an example
where a series of operations interact with the database: operations are
allowed to fail; at the end we also want to store the number of operations
successfully processed.
with conn.transaction() as tx1:
    num_ok = 0
    for operation in operations:
        try:
            with conn.transaction() as tx2:
                unreliable_operation(conn, operation)
        except Exception:
            logger.exception(f"{operation} failed")
        else:
            num_ok += 1

    save_number_of_successes(conn, num_ok)
If the inner block causes an error, including an operation causing a
database error, all its changes will be reverted. The exception bubbles up
outside the block: in the example it is intercepted by the try … except so
that the loop can complete. The outermost block is unaffected (unless other
errors happen there).
You can also write code to explicitly roll back any currently active
transaction block, by raising the
Rollback
exception. The exception "jumps"
to the end of a transaction block, rolling back its transaction but allowing
the program execution to continue from there. By default the exception rolls
back the innermost transaction block, but any current block can be specified
as the target. In the following example, a hypothetical
CancelCommand
may stop the processing and cancel any operation previously performed,
but not entirely committed yet.
from psycopg import Rollback

with conn.transaction() as outer_tx:
    for command in commands():
        with conn.transaction() as inner_tx:
            if isinstance(command, CancelCommand):
                raise Rollback(outer_tx)
            process_command(command)

# If `Rollback` is raised, it would propagate only up to this block,
# and the program would continue from here with no exception.
You can set transaction characteristics for the transactions that Psycopg
handles. They affect the transactions started implicitly by non-autocommit
connections and the ones started explicitly by Connection.transaction(), for
both autocommit and non-autocommit connections. Leaving these parameters as
None will use the server’s default behaviour (which is controlled by server
settings such as default_transaction_isolation).
In order to set these parameters you can use the connection attributes
isolation_level
read_only
deferrable
. For async connections you must use the equivalent
set_isolation_level()
method and similar. The parameters
can only be changed if there isn’t a transaction already active on the
connection.
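A minimal sketch of setting these attributes on a sync connection, before any transaction has started:

conn = psycopg.connect()
conn.isolation_level = psycopg.IsolationLevel.SERIALIZABLE
conn.read_only = True
with conn.transaction():
    conn.execute("SELECT 1")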
Warning
Applications running at
REPEATABLE_READ
SERIALIZABLE
isolation level are exposed to serialization
failures.
In certain concurrent update cases
, PostgreSQL will raise an
exception looking like:
psycopg.errors.SerializationFailure: could not serialize access
due to concurrent update
For a particular global transaction, the first two components will be the same
for all the resources. Every resource will be assigned a different branch
qualifier.
According to the DBAPI specification, a transaction ID is created using the
Connection.xid()
method. Once you have a transaction id, a distributed
transaction can be started with
Connection.tpc_begin()
, prepared using
tpc_prepare()
and completed using
tpc_commit()
tpc_rollback()
. Transaction IDs can also be retrieved from the
database using
tpc_recover()
and completed using the above
tpc_commit()
tpc_rollback()
PostgreSQL doesn’t follow the XA standard though, and the ID for a PostgreSQL
prepared transaction can be any string up to 200 characters long. Psycopg’s
objects can represent both XA-style transactions IDs (such as the ones
created by the
xid()
method) and PostgreSQL transaction IDs identified by
an unparsed string.
The format in which the Xids are converted into strings passed to the
database is the same employed by the
PostgreSQL JDBC driver
: this should
allow interoperation between tools written in Python and in Java. For example
a recovery tool written in Python would be able to recognize the components of
transactions produced by a Java program.
For further details see the documentation for the
Two-Phase Commit support methods
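A minimal sketch of the flow described above (the accounts table and the transaction identifiers are hypothetical):

xid = conn.xid(format_id=1, gtrid="transfer-42", bqual="branch-1")
conn.tpc_begin(xid)
conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = %s", (1,))
conn.tpc_prepare()
# ...later, possibly after a tpc_recover() on another connection:
conn.tpc_commit()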
Using COPY is one of the most efficient ways to load data into the database
(and to modify it, with some SQL creativity).
Copy is supported using the Cursor.copy() method, passing it a query of the
form COPY ... FROM STDIN or COPY ... TO STDOUT, and managing the resulting
Copy object in a with block:
with cursor.copy("COPY table_name (col1, col2) FROM STDIN") as copy:
# pass data to the 'copy' object using write()/write_row()
with cursor.copy(
sql.SQL("COPY {} TO STDOUT").format(sql.Identifier("table_name"))
) as copy:
# read data from the 'copy' object using read()/read_row()
with cur.copy("COPY (SELECT * FROM table_name LIMIT %s) TO STDOUT", (3,)) as copy:
# expect no more than three records
The connection is subject to the usual transaction behaviour, so, unless the
connection is in autocommit, at the end of the COPY operation you will still
have to commit the pending changes and you can still roll them back. See
Transactions management
for details.
Using a copy operation you can load data into the database from any Python
iterable (a list of tuples, or any iterable of sequences): the Python values
are adapted as they would be in normal querying. To perform such operation use
a COPY ... FROM STDIN query with Cursor.copy() and use write_row() on the
resulting object in a with block. On exiting the block the operation will be
concluded:
records = [(10, 20, "hello"), (40, None, "world")]

with cursor.copy("COPY sample (col1, col2, col3) FROM STDIN") as copy:
    for record in records:
        copy.write_row(record)
If an exception is raised inside the block, the operation is interrupted and
the records inserted so far are discarded.
In order to read or write from Copy row-by-row you must not specify COPY
options such as FORMAT CSV, DELIMITER, NULL: please leave these details alone,
thank you :). However, reading rows this way is not something you may want to
do normally: usually the normal query process will be easier to use.
PostgreSQL, currently, doesn’t give complete type information on COPY, so the
rows returned will have unparsed data, as strings or bytes, according to the
format.
with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
for row in copy.rows():
print(row) # return unparsed data: ('10', '2046-12-24')
with cur.copy("COPY (VALUES (10::int, current_date)) TO STDOUT") as copy:
copy.set_types(["int4", "date"])
for row in copy.rows():
print(row) # (10, datetime.date(2046, 12, 24))
If data is already formatted in a way suitable for copy (for instance because
it is coming from a file resulting from a previous COPY TO operation) it can
be loaded into the database using Copy.write() instead.
with open("data", "r") as f:
with cursor.copy("COPY data FROM STDIN") as copy:
while data := f.read(BLOCK_SIZE):
copy.write(data)
with open("data.out", "wb") as f:
with cursor.copy("COPY table_name TO STDOUT") as copy:
for data in copy:
f.write(data)
Binary copy is supported by specifying FORMAT BINARY in the COPY statement.
If you want to write rows using write_row(), all the types passed to the
database must have a binary dumper registered; this is not necessary if the
data is copied block-by-block using write().
Warning
PostgreSQL is particularly finicky when loading data in binary mode and
will apply
no cast rules
. This means, for example, that passing the
value 100 to an
integer
column
will fail
, because Psycopg will pass
it as a
smallint
value, and the server will reject it because its size
doesn’t match what is expected.
You can work around the problem using the
set_types()
method of the Copy object and specifying carefully the types to load.
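For example, a sketch of a binary COPY declaring the column types explicitly, so that plain Python int values are sent as int4 (the sample table is hypothetical):

records = [(1, "foo"), (2, "bar")]

with cur.copy("COPY sample (id, name) FROM STDIN (FORMAT BINARY)") as copy:
    copy.set_types(["int4", "text"])
    for record in records:
        copy.write_row(record)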
Asynchronous copy is supported using the same patterns on the objects obtained
by an AsyncConnection; for instance, if f is an object with an asynchronous
read() method returning COPY data, a fully-async copy operation could be:

async with cursor.copy("COPY data FROM STDIN") as copy:
    while data := await f.read():
        await copy.write(data)
In order to copy a table, or a portion of a table, across servers, you can use
two COPY operations on two different connections, reading from the first and
writing to the second.
with psycopg.connect(dsn_src) as conn1, psycopg.connect(dsn_tgt) as conn2:
    with conn1.cursor().copy("COPY src TO STDOUT (FORMAT BINARY)") as copy1:
        with conn2.cursor().copy("COPY tgt FROM STDIN (FORMAT BINARY)") as copy2:
            for data in copy1:
                copy2.write(data)
Psycopg 3 uses the common DBAPI structure of many other database adapters and
tries to behave as close as possible to
psycopg2
. There are however a few
differences to be aware of.
Most of the time, the workarounds suggested here will work with both
Psycopg 2 and 3, which could be useful if you are porting a program or
writing a program that should work with both Psycopg 2 and 3.
Psycopg 3 sends the query and the parameters to the server separately, instead
of merging them on the client side. Server-side binding works for normal
SELECT
and data manipulation statements (
INSERT
UPDATE
DELETE
), but it doesn’t work with many other statements. For instance,
it doesn’t work with SET or with NOTIFY:
>>> conn.execute("SET TimeZone TO %s", ["UTC"])
Traceback (most recent call last):
psycopg.errors.SyntaxError: syntax error at or near "$1"
LINE 1: SET TimeZone TO $1
>>> conn.execute("NOTIFY %s, %s", ["chan", 42])
Traceback (most recent call last):
psycopg.errors.SyntaxError: syntax error at or near "$1"
LINE 1: NOTIFY $1, $2
>>> conn.execute("CREATE TABLE foo (id int DEFAULT %s)", [42])
Traceback (most recent call last):
psycopg.errors.UndefinedParameter: there is no parameter $1
LINE 1: CREATE TABLE foo (id int DEFAULT $1)
If you need to include dynamic values in such statements you can compose the
query on the client side using the psycopg.sql module:
>>> from psycopg import sql
# This will quote the user and the password using the right quotes
>>> conn.execute(
... sql.SQL("ALTER USER {} SET PASSWORD {}")
... .format(sql.Identifier(username), password))
Server-side binding also doesn’t allow executing more than one statement in
the same execute() call:
>>> conn.execute(
... "INSERT INTO foo VALUES (%s); INSERT INTO foo VALUES (%s)",
... (10, 20))
Traceback (most recent call last):
psycopg.errors.SyntaxError: cannot insert multiple commands into a prepared statement
There is no such limitation if no parameters are used. As a consequence, you
can compose multiple queries on the client side and run them all in the same
execute() call, using the psycopg.sql objects:
>>> from psycopg import sql
>>> conn.execute(
... sql.SQL("INSERT INTO foo VALUES ({}); INSERT INTO foo values ({})"
... .format(10, 20))
Alternatively you can use a ClientCursor, which merges the parameters on the
client side the way psycopg2 used to do, and therefore accepts multiple
statements:
>>> cur = psycopg.ClientCursor(conn)
>>> cur.execute(
... "INSERT INTO foo VALUES (%s); INSERT INTO foo VALUES (%s)",
... (10, 20))
Note that statements which cannot run inside a transaction (such as CREATE
DATABASE) can never be executed in batch with other statements, even if the
connection is in autocommit mode:
>>> conn.autocommit = True
>>> conn.execute("CREATE DATABASE foo; SELECT 1")
Traceback (most recent call last):
psycopg.errors.ActiveSqlTransaction: CREATE DATABASE cannot run inside a transaction block
This happens because PostgreSQL itself will wrap multiple statements in a
transaction, which is different from how psql behaves (psql will split the
queries on semicolons and send them separately). This is not new in Psycopg 3:
the same limitation is present in psycopg2 too.
>>> conn.execute("SELECT json_build_array(%s, %s)", ["foo", "bar"])
Traceback (most recent call last):
psycopg.errors.IndeterminateDatatype: could not determine data type of parameter $1
In such cases you can add a cast to the placeholder to specify the type
expected (for instance %s::text).
You also cannot use IN %s with a tuple:
>>> conn.execute("SELECT * FROM foo WHERE id IN %s", [(10,20,30)])
Traceback (most recent call last):
psycopg.errors.SyntaxError: syntax error at or near "$1"
LINE 1: SELECT * FROM foo WHERE id IN $1
You can use instead the = ANY() construct and pass the candidate values as a
list instead of a tuple, which will be adapted to a PostgreSQL array:
>>> conn.execute("SELECT * FROM foo WHERE id = ANY(%s)", [[10,20,30]])
Note that ANY() can be used with psycopg2 too, and has the advantage of also
accepting an empty list of values as argument, which is not supported by the
IN operator.
The adaptation system has been completely rewritten, in order to address
server-side parameters adaptation, but also to consider performance,
flexibility, ease of customization.
The default behaviour with builtin data should be
what you would expect
. If you have customised the way to adapt data, or if you
are managing your own extension types, you should look at the
adaptation system
See also
The psycopg2 copy_from() and copy_to() methods are replaced. Their file-based
interface doesn’t make it easy to load dynamically-generated data into a
database.
There is now a single copy() method, which is similar to the psycopg2
copy_expert() in rough terms, and returns an object to read/write data,
block-wise or record-wise. The different usage pattern also enables COPY to be
used in async interactions.
See also
Using COPY TO and COPY FROM
for the details.
In psycopg2, using the syntax with connection, only the transaction is closed,
not the connection. This behaviour is surprising for people used to several
other Python classes wrapping resources, such as files.
In Psycopg 3, using with connection will close the connection at the end of
the with block, making handling the connection resources more familiar.
In order to manage transactions as blocks you can use the
Connection.transaction()
method, which allows for finer control, for
instance to use nested transactions.
See also
Transaction contexts
for details.
The callproc() method is not implemented. The method has simplistic semantics
which don’t account for PostgreSQL positional parameters, procedures,
set-returning functions… Use a normal execute() with SELECT function_name(...)
or CALL procedure_name(...) instead.
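For example, with a hypothetical function my_func and procedure my_proc:

cur.execute("SELECT my_func(%s)", (42,))  # instead of cur.callproc("my_func", [42])
cur.execute("CALL my_proc(%s)", (42,))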
PostgreSQL can represent a much wider range of dates and timestamps than
Python. While Python dates are limited to the years between 1 and 9999
(represented by constants such as datetime.date.min and datetime.date.max),
PostgreSQL dates extend to BC dates and past the year
10K. Furthermore PostgreSQL can also represent symbolic dates "infinity", in
both directions.
In psycopg2, by default,
infinity dates and timestamps map to ‘date.max’
and similar constants. This has the problem of creating a non-bijective
mapping (two Postgres dates, infinity and 9999-12-31, both map to the same
Python date). There is also the perversity that valid Postgres dates, greater
than Python
date.max
but arguably lesser than infinity, will still
overflow.
In Psycopg 3, every date greater than year 9999 will overflow, including
infinity. If you would like to customize this mapping (for instance flattening
every date past Y10K on
date.max
) you can subclass and adapt the
appropriate loaders: take a look at
this example
to see how.
The design of the asynchronous objects is pretty much the same as that of the
sync ones: in order to use them you will only have to scatter the await keyword
await
keyword
here and there.
async with await psycopg.AsyncConnection.connect(
        "dbname=test user=postgres") as aconn:
    async with aconn.cursor() as acur:
        await acur.execute(
            "INSERT INTO test (num, data) VALUES (%s, %s)",
            (100, "abc'def"))
        await acur.execute("SELECT * FROM test")
        await acur.fetchone()
        # will return (1, 100, "abc'def")
        async for record in acur:
            print(record)
with psycopg.connect("dbname=test user=postgres") as conn:
with conn.cursor() as cur:
cur.execute(...)
# the cursor is closed upon leaving the context
# the transaction is committed, the connection closed
As in the sync case, connect() returns an "un-entered" connection rather than
entering the block itself. That’s because there are several use cases where
it’s useful to handle the objects manually and only call close() when required.
As a consequence you cannot write async with connect(): you have to do it in
two steps instead, as in:
aconn = await psycopg.AsyncConnection.connect()
async with aconn:
    async with aconn.cursor() as cur:
        await cur.execute(...)

which can be condensed into:

async with await psycopg.AsyncConnection.connect() as aconn:
    async with aconn.cursor() as cur:
        await cur.execute(...)
If a long running operation is interrupted by a Ctrl-C on a normal connection
running in the main thread, the operation will be cancelled and the connection
will be put in error state, from which it can be recovered with a normal
rollback().
If the query is running in an async connection, a Ctrl-C will likely be
intercepted by the async loop and interrupt the whole program. In order to
emulate what normally happens with blocking connections, you can use asyncio’s
add_signal_handler() to call Connection.cancel():
import asyncio
import signal

async with await psycopg.AsyncConnection.connect() as conn:
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, conn.cancel)
    ...
PostgreSQL can send, together with the query results, informative messages
about the operation just performed, such as warnings or debug information.
Notices may be raised even if the operations are successful and don’t indicate
an error. You are probably familiar with some of them, because they are
reported by psql:
$ psql
=# ROLLBACK;
WARNING: there is no transaction in progress
ROLLBACK
Messages can also be generated by procedural languages (for instance by a
PL/pgSQL RAISE statement at a level lower than EXCEPTION, otherwise an
appropriate DatabaseError will be raised). The level of the messages received
can be controlled using the client_min_messages setting.
By default, the messages received are ignored. If you want to process them on
the client you can use the
Connection.add_notice_handler()
function to
register a function that will be invoked whenever a message is received. The
message is passed to the callback as a Diagnostic instance, containing all the
information passed by the server, such as the message text and the severity.
The object is the same found on the diag attribute of the errors raised by the
server:
>>> import psycopg
>>> def log_notice(diag):
... print(f"The server says: {diag.severity} - {diag.message_primary}")
>>> conn = psycopg.connect(autocommit=True)
>>> conn.add_notice_handler(log_notice)
>>> cur = conn.execute("ROLLBACK")
The server says: WARNING - there is no transaction in progress
>>> print(cur.statusmessage)
ROLLBACK
The Diagnostic object received by the callback should not be used after
the callback function terminates, because its data is deallocated after
the callbacks have been processed. If you need to use the information
later please extract the attributes requested and forward them instead of
forwarding the whole Diagnostic object.
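For example, a sketch of a handler that copies only the attributes it needs:

messages = []

def save_notice(diag):
    # Keep plain values, not the Diagnostic object itself
    messages.append((diag.severity, diag.message_primary))

conn.add_notice_handler(save_notice)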
Psycopg allows asynchronous interaction with other database sessions using the
facilities offered by the PostgreSQL commands LISTEN and NOTIFY. Please refer
to the PostgreSQL documentation for examples about how to use this form of
communication.
Because of the way sessions interact with notifications (see
NOTIFY
documentation), you should keep the connection in
autocommit
mode if you wish to receive or send notifications in a timely manner.
Notifications are received as instances of
Notify
. If you are reserving a
connection only to receive notifications, the simplest way is to consume the
Connection.notifies
generator. The generator can be stopped using
close()
You don’t need an
AsyncConnection
to handle notifications: a normal
blocking
Connection
is perfectly valid.
The following example will print notifications and stop when one containing
the "stop" message is received.
import psycopg

conn = psycopg.connect("", autocommit=True)
conn.execute("LISTEN mychan")
gen = conn.notifies()
for notify in gen:
    print(notify)
    if notify.payload == "stop":
        gen.close()

print("there, I stopped")
Notify(channel='mychan', payload='hello', pid=961823)
Notify(channel='mychan', payload='hey', pid=961823)
Notify(channel='mychan', payload='stop', pid=961823)
there, I stopped
Alternatively you can use add_notify_handler() to register a callback
function, which will be invoked whenever a notification is received, during
the normal query processing; you will then be able to use the connection
normally. Please note that in this case notifications will not be received
immediately, but only during a connection operation, such as a query.
conn.add_notify_handler(lambda n: print(f"got this: {n}"))
# meanwhile in psql...
# =# NOTIFY mychan, 'hey';
# NOTIFY
print(conn.execute("SELECT 1").fetchone())
# got this: Notify(channel='mychan', payload='hey', pid=961823)
# (1,)
Sometimes it is useful to detect immediately when the connection with the
database is lost. One brutal way to do so is to poll a connection in a loop
running an endless stream of
SELECT
Don’t
do so: polling is
out of fashion. Besides, it is inefficient (unless what you really want is a
client-server generator of ones), it generates useless traffic and will only
detect a disconnection with an average delay of half the polling time.
A more efficient and timely way to detect a server disconnection is to create
an additional connection and wait for a notification from the OS that this
connection has something to say: only then you can run some checks. You
can dedicate a thread (or an asyncio task) to wait on this connection: such a
thread will perform no activity until awakened by the OS.
In a normal (non asyncio) program you can use the
selectors
module. Because
Connection
implements a
fileno()
method you can just
register it as a file-like object. You can run such code in a dedicated thread
(and using a dedicated connection) if the rest of the program happens to have
something else to do too.
import selectors

sel = selectors.DefaultSelector()
sel.register(conn, selectors.EVENT_READ)
while True:
    if not sel.select(timeout=60.0):
        continue  # No FD activity detected in one minute

    # Activity detected. Is the connection still ok?
    try:
        conn.execute("SELECT 1")
    except psycopg.OperationalError:
        # You were disconnected: do something useful such as panicking
        logger.error("we lost our database!")
        sys.exit(1)
ev = asyncio.Event()
loop = asyncio.get_event_loop()
loop.add_reader(conn.fileno(), ev.set)

while True:
    try:
        await asyncio.wait_for(ev.wait(), 60.0)
    except asyncio.TimeoutError:
        continue  # No FD activity detected in one minute

    # Activity detected. Is the connection still ok?
    try:
        await conn.execute("SELECT 1")
    except psycopg.OperationalError:
        # Guess what happened
        ...
If your application is checked using Mypy too you can make use of Psycopg
types to validate the correct use of Psycopg objects and of the data returned
by the database.
Generic types
Psycopg
Connection
Cursor
objects are
Generic
objects and support a Row parameter which is the type of the records returned.
By default methods such as
Cursor.fetchall()
return normal tuples of unknown
size and content. As such, the connect() function returns an object of type
psycopg.Connection[Tuple[Any, ...]] and Connection.cursor() returns an object
of type psycopg.Cursor[Tuple[Any, ...]]. If you are writing generic plumbing
code it might be practical to use annotations such as Connection[Any] and
Cursor[Any].
conn = psycopg.connect() # type is psycopg.Connection[Tuple[Any, ...]]
cur = conn.cursor() # type is psycopg.Cursor[Tuple[Any, ...]]
rec = cur.fetchone() # type is Optional[Tuple[Any, ...]]
recs = cur.fetchall() # type is List[Tuple[Any, ...]]
If you want to use connections and cursors returning your data as different
types, for instance as dictionaries, you can use the
row_factory
argument
of the
connect()
and the
cursor()
method, which
will control what type of record is returned by the fetch methods of the
cursors and annotate the returned objects accordingly. See
Row factories
for more details.
from psycopg.rows import dict_row

dconn = psycopg.connect(row_factory=dict_row)
# dconn type is psycopg.Connection[Dict[str, Any]]
dcur = conn.cursor(row_factory=dict_row)
dcur = dconn.cursor()
# dcur type is psycopg.Cursor[Dict[str, Any]] in both cases
drec = dcur.fetchone()
# drec type is Optional[Dict[str, Any]]
Using a Pydantic model as row factory it is possible to enforce static typing
at runtime: the code can be checked statically using Mypy and querying the
database will raise an exception if the rows returned are not compatible with
the model.
The following example can be checked with mypy --strict without reporting any
issue. Pydantic will also raise a runtime error in case the Person class is
used with a query that returns incompatible data.
from datetime import date
from typing import Optional

import psycopg
from psycopg.rows import class_row
from pydantic import BaseModel

class Person(BaseModel):
    id: int
    first_name: str
    last_name: str
    dob: Optional[date]

def fetch_person(id: int) -> Person:
    with psycopg.connect() as conn:
        with conn.cursor(row_factory=class_row(Person)) as cur:
            cur.execute(
                """
                SELECT id, first_name, last_name, dob
                FROM (VALUES
                    (1, 'John', 'Doe', '2000-01-01'::date),
                    (2, 'Jane', 'White', NULL)
                ) AS data (id, first_name, last_name, dob)
                WHERE id = %(id)s;
                """,
                {"id": id},
            )
            obj = cur.fetchone()

            # reveal_type(obj) would return 'Optional[Person]' here
            if not obj:
                raise KeyError(f"person {id} not found")

            # reveal_type(obj) would return 'Person' here
            return obj

for id in [1, 2]:
    p = fetch_person(id)
    if p.dob:
        print(f"{p.first_name} was born in {p.dob.year}")
    else:
        print(f"Who knows when {p.first_name} was born")
The query should be a literal string: it should come from a literal string in your code, not from an arbitrary string expression.
For instance, passing an argument to the query should be done via the second argument to execute(), not by string composition:
def get_record(conn: psycopg.Connection[Any], id: int) -> Any:
    cur = conn.execute("SELECT * FROM my_table WHERE id = %s" % id)  # BAD!
    return cur.fetchone()

# the function should be implemented as:

def get_record(conn: psycopg.Connection[Any], id: int) -> Any:
    cur = conn.execute("SELECT * FROM my_table WHERE id = %s", (id,))
    return cur.fetchone()
def count_records(conn: psycopg.Connection[Any], table: str) -> int:
    query = "SELECT count(*) FROM %s" % table  # BAD!
    return conn.execute(query).fetchone()[0]

# the function should be implemented as:

def count_records(conn: psycopg.Connection[Any], table: str) -> int:
    query = sql.SQL("SELECT count(*) FROM {}").format(sql.Identifier(table))
    return conn.execute(query).fetchone()[0]
The cursors' fetch* methods, by default, return the records received from the database as tuples. This can be changed to better suit the needs of the programmer by using custom row factories.
The module
psycopg.rows
exposes several row factories ready to be used. For
instance, if you want to return your records as dictionaries, you can use
dict_row
>>> from psycopg.rows import dict_row
>>> conn = psycopg.connect(DSN, row_factory=dict_row)
>>> conn.execute("select 'John Doe' as name, 33 as age").fetchone()
{'name': 'John Doe', 'age': 33}
>>> cur = conn.cursor(row_factory=dict_row)
>>> cur.execute("select 'John Doe' as name, 33 as age").fetchone()
{'name': 'John Doe', 'age': 33}
>>> from psycopg.rows import namedtuple_row
>>> cur.row_factory = namedtuple_row
>>> cur.execute("select 'John Doe' as name, 33 as age").fetchone()
Row(name='John Doe', age=33)
>>> from psycopg.rows import class_row
>>> cur = conn.cursor(row_factory=class_row(Person))
>>> cur.execute("select 'John Doe' as name, 33 as age").fetchone()
Person(name='John Doe', age=33, weight=None)
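For reference, class_row() works with any class whose constructor accepts the column names as keyword arguments; a minimal Person class suitable for the example above could be sketched as (this definition is an assumption, not part of the original example):

from dataclasses import dataclass
from typing import Optional

@dataclass
class Person:
    name: str
    age: int
    weight: Optional[int] = None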
A row factory is called once per query, when the columns metadata (e.g. the column names) are available on the cursor, and returns a callable which is efficient to call repeatedly (because, for instance, the names of the columns are extracted, sanitised, and stored in local variables).
Formally, these objects are represented by the RowFactory and RowMaker protocols.
from typing import Any, Sequence
from psycopg import Cursor
from psycopg.rows import RowMaker

class DictRowFactory:
    def __init__(self, cursor: Cursor[Any]):
        self.fields = [c.name for c in cursor.description]

    def __call__(self, values: Sequence[Any]) -> dict[str, Any]:
        return dict(zip(self.fields, values))

def dict_row_factory(cursor: Cursor[Any]) -> RowMaker[dict[str, Any]]:
    fields = [c.name for c in cursor.description]

    def make_row(values: Sequence[Any]) -> dict[str, Any]:
        return dict(zip(fields, values))

    return make_row
conn = psycopg.connect(row_factory=DictRowFactory)
cur = conn.execute("SELECT first_name, last_name, age FROM persons")
person = cur.fetchone()
print(f"{person['first_name']} {person['last_name']}")
A connection pool is an object managing a set of connections and allowing their use in functions needing one. Because the time to establish a new connection can be relatively long, keeping connections open can reduce latency.
This page explains a few basic concepts of Psycopg connection pool’s behaviour. Please refer to the ConnectionPool object API for details about the pool operations.
The connection pool objects are distributed in a package separate from the main psycopg package: use pip install "psycopg[pool]" or pip install psycopg_pool to make the psycopg_pool package available. See Installing the connection pool.
A simple way to use the pool is to create a single instance of it, as a
global object, and to use this object in the rest of the program, allowing
other functions, modules, threads to use it:
# module db.py in your program
from psycopg_pool import ConnectionPool

pool = ConnectionPool(conninfo, **kwargs)
# the pool starts connecting immediately.

# in another module
from .db import pool

def my_function():
    with pool.connection() as conn:
        conn.execute(...)
Failing to close the pool at the end of the program is not terribly bad: probably it will just result in some warnings printed on stderr. However, if you think that it’s sloppy, you can use the atexit module to have close() called at the end of the program, as in the sketch below.
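A minimal sketch of the atexit approach, assuming the global pool defined in db.py above:

import atexit

from .db import pool

# Close the pool (and its connections) when the interpreter exits.
atexit.register(pool.close)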
If you want to avoid starting to connect to the database at import time, and
want to wait for the application to be ready, you can create the pool using
open=False
, and call the
open()
close()
methods when the conditions are right. Certain
frameworks provide callbacks triggered when the program is started and stopped
(for instance
FastAPI startup/shutdown events
): they are perfect to
initiate and terminate the pool operations:
pool = ConnectionPool(conninfo, open=False, **kwargs)

@app.on_event("startup")
def open_pool():
    pool.open()

@app.on_event("shutdown")
def close_pool():
    pool.close()
Creating a single pool as a global variable is not mandatory: your program can create more than one pool, which might be useful to connect to more than one database, or to provide different types of connections, for instance to provide separate read/write and read-only connections. The pool also acts as a context manager and is opened and closed, if necessary, on entering and exiting the context block:
from psycopg_pool import ConnectionPool
with ConnectionPool(conninfo, **kwargs) as pool:
run_app(pool)
# the pool is now closed
The pool constructor (or the open() method) returns immediately. This allows the program some leeway to start before the target database is up and running. However, if your application is misconfigured, or the network is down, it means that the program will be able to start, but the threads requesting a connection will fail with a PoolTimeout only after the timeout on connection() has expired. If this behaviour is not desirable (and you prefer your program to crash hard and fast, if the surrounding conditions are not right, because something else will respawn it) you should call the wait() method after creating the pool, or call open(wait=True): these methods will block until the pool is full, or will raise a PoolTimeout exception if the pool isn’t ready within the allocated time.
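For instance, a fail-fast startup might look like the following sketch (the conninfo string and the timeouts are assumptions):

from psycopg_pool import ConnectionPool, PoolTimeout

pool = ConnectionPool(conninfo, min_size=4)
try:
    pool.wait(timeout=30.0)  # block until min_size connections are ready
except PoolTimeout:
    raise SystemExit("the database is not available, giving up")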
Each new connection is passed to the configure callback, if provided, after which it is put in the pool (or passed to a client requesting it, if someone is already knocking at the door). If a connection expires (it passes max_lifetime), or is returned to the pool in broken state, or is found closed by check(), then the pool will dispose of it and will start a new connection attempt in the background.
The pool can be used to request connections from multiple threads or concurrent tasks - it is hardly useful otherwise! If more connections than the ones available in the pool are requested, the requesting threads are queued and are served a connection as soon as one is available, either because another client has finished using it or because the pool is allowed to grow (when max_size is greater than min_size).
The connection() context behaves like the Connection object context: at the end of the block, if there is a transaction open, it will be committed, or rolled back if the context is exited with an exception.
At the end of the block the connection is returned to the pool and shouldn’t be used anymore by the code which obtained it. If a reset() function is specified in the pool constructor, it is called on the connection before returning it to the pool. Note that the reset() function is called in a worker thread, so that the thread which used the connection can keep its execution without being slowed down by it.
If an attempt to create a connection fails, a new attempt will be made soon
after, using an exponential backoff to increase the time between attempts,
until a maximum of
reconnect_timeout
is reached. When that happens, the pool
will call the
reconnect_failed()
function, if provided to the pool, and just
start a new connection attempt. You can use this function either to send
alerts or to interrupt the program and allow the rest of your infrastructure
to restart it.
If more than
min_size
connections are requested concurrently, new ones are
created, up to
max_size
. Note that the connections are always created by the
background workers, not by the thread asking for the connection: if a client
requests a new connection, and a previous client terminates its job before the
new connection is ready, the waiting client will be served the existing
connection. This is especially useful in scenarios where the time to establish
a connection dominates the time for which the connection is used (see
analysis
, for instance).
If a pool grows above
min_size
, but its usage decreases afterwards, a number
of connections are eventually closed: one every time a connection is unused
after the
max_idle
time specified in the pool constructor.
What’s the right size for the pool?
Big question. Who knows. However, probably not as large as you imagine. Please
take a look at
this analysis
for some ideas.
Something useful you can do is probably to use the
get_stats()
method and monitor the behaviour of your program
to tune the configuration parameters. The size of the pool can also be changed
at runtime using the
resize()
method.
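A sketch of how the stats could be polled periodically (the pool, logger and interval names are assumptions, not part of the pool API):

import threading
import time

def report_pool_stats(pool, logger, interval=60.0):
    def worker():
        while True:
            # pop_stats() returns the counters and resets them
            logger.info("pool stats: %s", pool.pop_stats())
            time.sleep(interval)

    threading.Thread(target=worker, daemon=True).start()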
Sometimes you may want to leave the choice of using or not using a connection pool as a configuration parameter of your application. For instance, you might want to use a pool if you are deploying a "large instance" of your application and can dedicate it a handful of connections; conversely you might not want to use it if you deploy the application in several instances, behind a load balancer, and/or using an external connection pool process such as PgBouncer.
Switching between using or not using a pool requires some code change, because the ConnectionPool API is different from the normal connect() function and because the pool can perform additional connection configuration (in the configure parameter) that, if the pool is removed, should be performed in some different code path of your application.
The psycopg_pool 3.1 package introduces the NullConnectionPool class. This class has the same interface, and largely the same behaviour, as the ConnectionPool, but doesn’t create any connection beforehand. When a connection is returned, unless there are other clients already waiting, it is closed immediately and not kept in the pool state.
A null pool is not only a configuration convenience, but can also be used to regulate the access to the server by a client program. If max_size is set to a value greater than 0, the pool will make sure that no more than max_size connections are created at any given time. If more clients ask for further connections, they will be queued and served a connection as soon as a previous client has finished using it, like for the basic pool. Other mechanisms to throttle client requests (such as timeout and max_waiting) are respected.
Queued clients will be handed an already established connection, as soon
as a previous client has finished using it (and after the pool has
returned it to idle state and called
reset()
on it, if necessary).
Because normally (i.e. unless queued) every client will be served a new
connection, the time to obtain the connection is paid by the waiting client;
background workers are not normally involved in obtaining new connections.
The state of the connection is verified when a connection is returned to the
pool: if a connection is broken during its usage it will be discarded on
return and a new connection will be created.
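As an illustration (a sketch, assuming psycopg_pool 3.1 and a conninfo string), a null pool used purely to cap the number of server connections:

from psycopg_pool import NullConnectionPool

pool = NullConnectionPool(conninfo, max_size=10)  # never more than 10 connections

def handle_request():
    with pool.connection() as conn:
        return conn.execute("SELECT 1").fetchone()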
Warning
The health of the connection is not checked when the pool gives it to a
client.
Why not? Because doing so would require an extra network roundtrip: we want to save you from its latency. Before getting too angry about it, just think that the connection can be lost at any moment while your program is using it. As your program should already be able to cope with a loss of a connection during its process, it should be able to tolerate being served a broken connection: unpleasant but not the end of the world.
Warning
The health of the connection is not checked when the connection is in the
pool.
Does the pool keep a watchful eye on the quality of the connections inside it?
No, it doesn’t. Why not? Because you will do it for us! Your program is only
a big ruse to make sure the connections are still alive…
Not (entirely) trolling: if you are using a connection pool, we assume that
you are using and returning connections at a good pace. If the pool had to
check for the quality of a broken connection before your program notices it,
it should be polling each connection even faster than your program uses them.
Your database server wouldn’t be amused…
Can you do something better than that? Of course you can, there is always a
better way than polling. You can use the same recipe of
Detecting disconnections
reserving a connection and using a thread to monitor for any activity
happening on it. If any activity is detected, you can call the pool
check()
method, which will run a quick check on each
connection in the pool, removing the ones found in broken state, and using the
background workers to replace them with fresh ones.
If you set up a similar check in your program, in case the database connection
is temporarily lost, we cannot do anything for the threads which had taken
already a connection from the pool, but no other thread should be served a
broken connection, because
check()
would empty the pool and refill it with
working connections, as soon as they are available.
Faster than you can say poll. Or pool.
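A possible sketch of that recipe (names and timeouts are assumptions; it keeps one pool connection reserved for monitoring):

import selectors
import threading

def monitor_pool(pool):
    def worker():
        with pool.connection() as conn:
            sel = selectors.DefaultSelector()
            sel.register(conn, selectors.EVENT_READ)
            while True:
                if sel.select(timeout=60.0):
                    # Unexpected activity (e.g. EOF) on an idle connection:
                    # verify the pool and replace broken connections.
                    pool.check()
                    return

    threading.Thread(target=worker, daemon=True).start()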
The pool can return information about its usage using the get_stats() or pop_stats() methods. Both methods return the same values, but the latter resets the counters after its use. The values can be sent to a monitoring system such as Graphite or Prometheus.
The following values should be provided, but please don’t consider them as a rigid interface: it is possible that they might change in the future. Keys whose value is 0 may not be returned.
Psycopg can manage two kinds of "cursors", which differ in where the state of a query being processed is stored: client-side cursors and server-side cursors.
When a query is executed with a client-side cursor, the server replies by transferring to the client the whole set of results requested, which is stored in the state of the same cursor and from where it can be read from Python code (using methods such as fetchone() and siblings).
This querying process is very scalable because, after a query result has been
transmitted to the client, the server doesn’t keep any state. Because the
results are already in the client memory, iterating its rows is very quick.
The downside of this querying method is that the entire result has to be
transmitted completely to the client (with a time proportional to its size)
and the client needs enough memory to hold it, so it is only suitable for
reasonably small result sets.
Client-side cursors, by default, send the query and the parameters separately to the server. This is the most efficient way to process parametrised queries and allows building several features and optimizations. However, not all types of queries can be bound server-side; in particular no Data Definition Language query can. See Server-side binding for the description of these problems.
ClientCursor (and its AsyncClientCursor async counterpart) merge the query on the client and send the query and the parameters merged together to the server. This allows parametrizing any type of PostgreSQL statement, not only queries (SELECT) and Data Manipulation statements (INSERT, UPDATE, DELETE).
Using ClientCursor, Psycopg 3 behaviour will be more similar to psycopg2 (which only implements client-side binding) and could be useful to port Psycopg 2 programs more easily to Psycopg 3. The objects in the sql module allow for greater flexibility (for instance to parametrize a table name too, not only values); however, for simple cases, a ClientCursor could be the right object.
In order to obtain ClientCursor objects from a connection, you can set its cursor_factory (at init time or by changing the attribute afterwards):

from psycopg import connect, ClientCursor

conn = connect(DSN, cursor_factory=ClientCursor)
cur = conn.cursor()
The best use for client-side binding cursors is probably to port large
Psycopg 2 code to Psycopg 3, especially for programs making wide use of
Data Definition Language statements.
psycopg.sql
module allows for more generic client-side query
composition, to mix client- and server-side parameters binding, and allows
to parametrize tables and fields names too, or entirely generic SQL
snippets.
PostgreSQL also has its own concept of cursor (sometimes called portal). When a database cursor is created, the query is not necessarily completely processed: the server might be able to produce results only as they are needed. Only the results requested are transmitted to the client: if the query result is very large but the client only needs the first few records it is possible to transmit only them.
The downside is that the server needs to keep track of the partially processed results, so it uses more memory and resources on the server.
Psycopg allows the use of server-side cursors using the classes ServerCursor and AsyncServerCursor. They are usually created by passing the name parameter to the cursor() method (reason for which, in psycopg2, they are usually called named cursors). The use of these classes is similar to their client-side counterparts: their interface is the same, but behind the scenes they send commands to control the state of the cursor on the server (for instance when fetching new records or when moving using scroll()).
Using a server-side cursor it is possible to process datasets larger than what
would fit in the client’s memory. However for small queries they are less
efficient because it takes more commands to receive their result, so you
should use them only if you need to process huge results or if only a partial
result is needed.
See also
Server-side cursors are created and managed by ServerCursor using SQL commands such as DECLARE and FETCH. The PostgreSQL documentation gives a good idea of what is possible to do with them.
CREATE FUNCTION reffunc(refcursor) RETURNS refcursor AS $$
BEGIN
    OPEN $1 FOR SELECT col FROM test;
    RETURN $1;
END;
$$ LANGUAGE plpgsql;

cur = conn.cursor('curname')
# no cur.execute()
for record in cur:  # or cur.fetchone(), cur.fetchmany()...
    # do something with record
    pass
The adaptation system is at the core of Psycopg and allows customising the way Python objects are converted to PostgreSQL when a query is performed and how PostgreSQL values are converted to Python objects when query results are returned.
For a high-level view of the conversion of types between Python and
PostgreSQL please look at
Passing parameters to SQL queries
. Using the objects
described in this page is useful if you intend to
customise
adaptation rules.
Every context object derived from another context inherits its adapters
mapping: cursors created from a connection inherit the connection’s
configuration.
By default, connections obtain an adapters map from the global map exposed as psycopg.adapters: changing the content of this object will affect every connection created afterwards. You may specify a different template adapters map using the context parameter on connect(). Changing this mapping (e.g. writing and registering your own adapters, or using a different configuration of builtin adapters) affects how types are converted between Python and PostgreSQL.
Dumpers (objects implementing the Dumper protocol) are the objects used to perform the conversion from a Python object to a bytes sequence in a format understood by PostgreSQL. The string returned shouldn’t be quoted: the value will be passed to the database using functions such as PQexecParams() so quoting and quotes escaping is not necessary. The dumper usually also suggests to the server what type to use, via its oid attribute.
Changes to an adapters map only affect objects created afterwards; the objects already created are not affected. For instance, changing the global context will only change newly created connections, not the ones already existing.
Psycopg doesn’t provide adapters for the XML data type, because there are just too many ways of handling XML in Python. Creating a loader to parse the PostgreSQL xml type into ElementTree objects is very simple, using the psycopg.adapt.Loader base class and implementing the load() method:
>>> import xml.etree.ElementTree as ET
>>> from psycopg.adapt import Loader
>>> # Create a class implementing the `load()` method.
>>> class XmlLoader(Loader):
... def load(self, data):
... return ET.fromstring(data)
>>> # Register the loader on the adapters of a context.
>>> conn.adapters.register_loader("xml", XmlLoader)
>>> # Now just query the database returning XML data.
>>> cur = conn.execute(
... """select XMLPARSE (DOCUMENT '
... Manual...')
... """)
>>> elem = cur.fetchone()[0]
>>> from psycopg.adapt import Dumper

>>> class XmlDumper(Dumper):
...     # Setting an OID is not necessary but can be helpful
...     oid = psycopg.adapters.types["xml"].oid
...
...     def dump(self, elem):
...         return ET.tostring(elem)

>>> # Register the dumper on the adapters of a context
>>> conn.adapters.register_dumper(ET.Element, XmlDumper)
>>> # Now, in that context, it is possible to use ET.Element objects as parameters
>>> conn.execute("SELECT xpath('//title/text()', %s)", [elem]).fetchone()[0]
['Manual']
PostgreSQL numeric values are loaded as Python Decimal instances, because both types allow fixed-precision arithmetic and are not subject to rounding.
Sometimes, however, you may want to perform floating-point math on numeric values, and Decimal may get in the way (maybe because it is slower, or maybe because mixing float and Decimal values causes Python errors).
If you are fine with the potential loss of precision and you simply want to receive numeric values as Python float, you can register on numeric the same Loader class used to load float4 and float8 values. Because the PostgreSQL textual representation of both floats and decimal is the same, the two loaders are compatible.
conn = psycopg.connect()
conn.execute("SELECT 123.45").fetchone()[0]
# Decimal('123.45')
conn.adapters.register_loader("numeric", psycopg.types.numeric.FloatLoader)
conn.execute("SELECT 123.45").fetchone()[0]
# 123.45
>>> conn.execute("SELECT 'infinity'::date").fetchone()
Traceback (most recent call last):
DataError: date too large (after year 10K): 'infinity'
as PostgreSQL
infinity. For this, let’s create a subclass for the dumper and the loader and
register them in the working scope (globally or just on a connection or
cursor):
from datetime import date

# Subclass existing adapters so that the base case is handled normally.
from psycopg.types.datetime import DateLoader, DateDumper

class InfDateDumper(DateDumper):
    def dump(self, obj):
        if obj == date.max:
            return b"infinity"
        elif obj == date.min:
            return b"-infinity"
        else:
            return super().dump(obj)

class InfDateLoader(DateLoader):
    def load(self, data):
        if data == b"infinity":
            return date.max
        elif data == b"-infinity":
            return date.min
        else:
            return super().load(data)

# The new classes can be registered globally, on a connection, on a cursor
cur.adapters.register_dumper(date, InfDateDumper)
cur.adapters.register_loader("date", InfDateLoader)

cur.execute("SELECT %s::text, %s::text", [date(2020, 12, 31), date.max]).fetchone()
# ('2020-12-31', 'infinity')
cur.execute("SELECT '2020-12-31'::date, 'infinity'::date").fetchone()
# (datetime.date(2020, 12, 31), datetime.date(9999, 12, 31))
Registering dumpers and loaders will instruct Psycopg to use them
in the queries to follow, in the context where they have been registered.
When a query is performed on a
Cursor
Transformer
object is created as a local context to manage
adaptation during the query, instantiating the required dumpers and loaders
and dispatching the values to perform the wanted conversions from Python to
Postgres and back.
Sometimes, just looking at the Python type is not enough to decide the best PostgreSQL type to use (for instance the PostgreSQL type of a Python list depends on the objects it contains, whether to use an integer or bigint depends on the number size…). In these cases the mechanism provided by get_key() and upgrade() is used to create more specific dumpers.
As a consequence it is possible to perform certain choices only once per query
(e.g. looking up the connection encoding) and then call a fast-path operation
for each value to convert.
Querying will fail if a Python object for which there isn’t a
Dumper
registered (for the right
Format
) is used as query parameter.
If the query returns a data type whose OID doesn’t have a
Loader
, the
value will be returned as a string (or bytes string for binary types).
Psycopg uses an automatic system to manage prepared statements. When a query is prepared, its parsing and planning is stored in the server session, so that further executions of the same query on the same connection (even with different parameters) are optimised.
A query is prepared automatically after it is executed more than
prepare_threshold
times on a connection.
psycopg
will make
sure that no more than
prepared_max
statements are planned: if
further queries are executed, the least recently used ones are deallocated and
the associated resources freed.
Statement preparation can be controlled in several ways: passing prepare=True to execute() will prepare the query immediately, while prepare=False will avoid preparing the query, regardless of the number of times it is executed. The default for the parameter is None, meaning that the query is prepared if the conditions described above are met.
The PostgreSQL documentation contains plenty of details about prepared statements in PostgreSQL. Note however that Psycopg doesn’t use SQL statements such as PREPARE and EXECUTE, but protocol-level commands such as the ones exposed by PQsendPrepare and PQsendQueryPrepared.
Using external connection poolers, such as PgBouncer, is not compatible with prepared statements, because the same client connection may change the server session it refers to. If such middleware is used you should disable prepared statements, by setting the Connection.prepare_threshold attribute to None.
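A minimal sketch of that configuration (the DSN is an assumption):

import psycopg

conn = psycopg.connect("dbname=test")
conn.prepare_threshold = None  # never prepare statements automatically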
Pipeline mode allows PostgreSQL client applications to send a query without having to read the result of the previously sent query. Taking advantage of the pipeline mode, a client will wait less for the server, since multiple queries/results can be sent/received in a single network roundtrip. Pipeline mode can provide a significant performance boost to the application.
Pipeline mode is most useful when the server is distant, i.e., network latency
("ping time") is high, and also when many small operations are being performed
in rapid succession. There is usually less benefit in using pipelined commands
when each query takes many multiples of the client/server round-trip time to
execute. A 100-statement operation run on a server 300 ms round-trip-time away
would take 30 seconds in network latency alone without pipelining; with
pipelining it may spend as little as 0.3 s waiting for results from the
server.
The server executes statements, and returns results, in the order the client
sends them. The server will begin executing the commands in the pipeline
immediately, not waiting for the end of the pipeline. Note that results are
buffered on the server side; the server flushes that buffer when a
synchronization point
is established.
See also
The PostgreSQL documentation about:
In order to understand better how the pipeline mode works, we should take a
closer look at the
PostgreSQL client-server message flow
During normal querying, each statement is transmitted by the client to the
server as a stream of request messages, terminating with a
message to
tell it that it should process the messages sent so far. The server will
execute the statement and describe the results back as a stream of messages,
terminating with a
ReadyForQuery
, telling the client that it may now send a
new query.
For example, the two statements:

conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"])
conn.execute("SELECT data FROM mytable WHERE id = %s", [1])

sent consecutively, pay the communication overhead four times, once per leg.
The pipeline mode allows the client to combine several operations in longer
streams of messages to the server, then to receive more than one response in a
single batch. If we execute the two operations above in a pipeline:
with conn.pipeline():
    conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"])
    conn.execute("SELECT data FROM mytable WHERE id = %s", [1])
>>> with conn.pipeline():
...     conn.execute("INSERT INTO mytable VALUES (%s)", ["hello"])
...     with conn.cursor() as cur:
...         cur.execute("INSERT INTO othertable VALUES (%s)", ["world"])
...         cur.executemany(
...             "INSERT INTO elsewhere VALUES (%s)",
...             [("one",), ("two",), ("four",)])
Unlike in normal mode, Psycopg will not wait for the server to receive the result of each query; the client will receive results in batches when the server flushes its output buffer.
When a flush (or a sync) is performed, all pending results are sent back to
the cursors which executed them. If a cursor had run more than one query, it
will receive more than one result; results after the first will be available,
in their execution order, using
nextset()
>>> with conn.pipeline():
...     with conn.cursor() as cur:
...         cur.execute("INSERT INTO mytable (data) VALUES (%s) RETURNING *", ["hello"])
...         cur.execute("INSERT INTO mytable (data) VALUES (%s) RETURNING *", ["world"])
...         while True:
...             print(cur.fetchall())
...             if not cur.nextset():
...                 break
[(1, 'hello')]
[(2, 'world')]
If any statement encounters an error, the server aborts the current transaction and will not execute any subsequent command in the queue until the next synchronization point; a PipelineAborted exception is raised for each such command. Query processing resumes after the synchronization point.
Warning
Certain features are not available in pipeline mode, including COPY operations.
The executemany() method makes use internally of the pipeline mode; as a consequence there is no need to handle a pipeline block just to call executemany() once.
The server might perform a flush on its own initiative, for instance when the
output buffer is full.
Note that, even in
autocommit
, the server wraps the
statements sent in pipeline mode in an implicit transaction, which will be
only committed when the Sync is received. As such, a failure in a group of
statements will probably invalidate the effect of statements executed after
the previous Sync, and will propagate to the following Sync.
For example, in the following block:
>>> with psycopg.connect(autocommit=True) as conn:
...     with conn.pipeline() as p, conn.cursor() as cur:
...         try:
...             cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["one"])
...             cur.execute("INSERT INTO no_such_table (data) VALUES (%s)", ["two"])
...             conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["three"])
...             p.sync()
...         except psycopg.errors.UndefinedTable:
...             pass
...         cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["four"])
The exact Python statement where an exception caused by a server error is
raised is somewhat arbitrary: it depends on when the server flushes its
buffered result.
If you want to make sure that a group of statements is applied atomically
by the server, do make use of transaction methods such as
commit()
transaction()
: these methods will
also sync the pipeline and raise an exception if there was any error in
the commands executed so far.
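For instance, a possible sketch (assuming an open conn and the mytable table used above):

with conn.pipeline():
    with conn.transaction():
        conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["one"])
        conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["two"])
    # leaving the transaction block syncs the pipeline and raises if any
    # of the statements above failed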
The pipeline mode is a relatively new feature: its behaviour, especially around error conditions and concurrency, hasn’t been explored as much as the normal request-response messages pattern, and its async nature makes it inherently more complex. As we gain more experience and feedback (which is welcome), we might find bugs and shortcomings forcing us to change the current interface or behaviour.
The pipeline mode is available on any currently supported PostgreSQL version,
but, in order to make use of it, the client must use a libpq from PostgreSQL
14 or higher. You can use
Pipeline.is_supported()
to make sure your client
has the right library.
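A small sketch of such a guard (assuming an open conn):

import psycopg

if psycopg.Pipeline.is_supported():
    with conn.pipeline():
        conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"])
else:
    # fall back to plain execution with an older libpq
    conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"])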
These exceptions are exposed both by the psycopg module and by the psycopg.errors module. The latter also exposes more specific exceptions, mapping to the database error states (see SQLSTATE exceptions).

Exception
|__ Warning
|__ Error
    |__ InterfaceError
    |__ DatabaseError
        |__ DataError
        |__ OperationalError
        |__ IntegrityError
        |__ InternalError
        |__ ProgrammingError
        |__ NotSupportedError
The default adapters map establishing how Python and PostgreSQL types are
converted into each other.
This map is used as a template when new connections are created, using
psycopg.connect()
. Its
types
attribute is a
TypesRegistry
containing information about every
PostgreSQL builtin type, useful for adaptation customisation (see
Data adaptation configuration
>>> psycopg.adapters.types["int4"]
The Connection and AsyncConnection classes are the main wrappers for a PostgreSQL database session. You can imagine them similar to a psql session.
One of the differences compared to psql is that a Connection usually handles a transaction automatically: other sessions will not be able to see the changes until you have committed them, more or less explicitly. Take a look at Transactions management for the details.
Connection
class
Connections behave as context managers: on block exit, the current
transaction will be committed (or rolled back, in case of exception) and
the connection will be closed.
kwargs - Further parameters specifying the connection string. They override the ones specified in conninfo.
name - If not specified, create a client-side cursor; if specified, create a server-side cursor. See Cursor types for details.
binary - If True, return binary values from the database. All the types returned by the query must have a binary loader. See Binary parameters and results for details.
binary - If True, the cursor will return binary values from the database. All the types returned by the query must have a binary loader. See Binary parameters and results for details.
At the end of the block, a synchronization point is established and
the connection returns in normal mode.
You can call the method recursively from within a pipeline block.
Innermost blocks will establish a synchronization point on exit, but
pipeline mode will be kept until the outermost block exits.
Pipeline mode support
for details.
When called with no arguments, a single phase commit is performed. A transaction manager may choose to do this if only a single resource is participating in the global transaction.
When called with a transaction ID
, the database commits the
given transaction. If an invalid transaction ID is provided, a
ProgrammingError
will be raised. This form should be called outside
of a transaction, and is intended for use in recovery.
On return, the TPC transaction is ended.
See also
COMMIT
PREPARED
PostgreSQL command.
When called with a transaction ID, the database rolls back the given transaction; if an invalid transaction ID is provided, a ProgrammingError is raised. This form should be called outside of a transaction, and is intended for use in recovery.
On return, the TPC transaction is ended.
See also
ROLLBACK
PREPARED
PostgreSQL command.
Transactions prepared outside of Psycopg are returned with the gtrid set to the PostgreSQL transaction ID: such Xids are still usable for recovery. Psycopg uses the same algorithm of the PostgreSQL JDBC driver to encode a XA triple in a string, so transactions initiated by a program using such driver should be unpacked correctly.
Xids returned by tpc_recover() also have extra attributes prepared, owner and database, populated with the values read from the server.
See also
pg_prepared_xacts
system view.
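A minimal two-phase commit sketch (the table and the transaction identifiers are assumptions):

xid = conn.xid(1, "gtrid-example", "bqual-example")
conn.tpc_begin(xid)
conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["tpc"])
conn.tpc_prepare()
# ... later, possibly during recovery, the prepared transaction can be
# committed (or rolled back):
conn.tpc_commit()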
This class implements a DBAPI-inspired interface, with all the blocking
methods implemented as coroutines. Unless specified otherwise,
non-blocking methods are shared with the
Connection
class.
The following methods have the same behaviour of the matching
Connection
methods, but should be called using the
await
keyword.
Automatically resolve domain names asynchronously. In previous versions, name resolution blocked, unless the hostaddr parameter was specified, or the resolve_hostaddr_async() function was used.
You can use `async with` to close the connection automatically when the block is exited, but be careful about the async quirkness: see with async connections for details.
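A minimal async usage sketch (the DSN is an assumption):

import asyncio
import psycopg

async def main():
    async with await psycopg.AsyncConnection.connect("dbname=test") as aconn:
        async with aconn.cursor() as acur:
            await acur.execute("SELECT 1")
            print(await acur.fetchone())

asyncio.run(main())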
The Cursor and AsyncCursor classes are the main objects used to send commands to a PostgreSQL database session. They are normally created by the connection’s cursor() method.
Using the name parameter on cursor() will create a ServerCursor or AsyncServerCursor, which can be used to retrieve partial results from a database.
A Connection can create several cursors, but only one at a time can perform operations, so they are not the best way to achieve parallelism (you may want to operate with several connections instead). All the cursors on the same connection have a view of the same session, so they can see each other’s uncommitted data.
Cursor
class
Cursors behave as context managers: on block exit they are closed and
further operation will not be possible. Closing a cursor will not
terminate a transaction or a session though.
Return the cursor itself, so that it will be possible to chain a fetch operation after the call. See Passing parameters to SQL queries for all the details about executing queries.
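For instance, because execute() returns the cursor itself, a query and a fetch can be chained (a small sketch, assuming an open conn):

row = conn.cursor().execute("SELECT now()").fetchone()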
This command is similar to execute + iter; however it supports endless data streams. The feature is not available in PostgreSQL itself, but some implementations exist: Materialize, and CockroachDB CHANGEFEED, for instance.
The feature, and the API supporting it, are still experimental. Beware… 👀
The parameters are the same as execute().
Warning
Failing to consume the iterator entirely will result in a
connection left in
transaction_status
with closing(cur.stream("select generate_series(1, 10000)")) as gen:
    for rec in gen:
        something(rec)  # might fail
, depending on whether the server
managed to send the entire resultset to the client. An autocommit
connection will be
instead.
The fetch methods will raise an exception if used with operations that don’t return a result, such as an INSERT with no RETURNING or an ALTER TABLE.
Cursors are iterable objects, so just using the for record in cursor: syntax will iterate over the records in the query result.
The result returned by the last query and currently exposed by the cursor, if available, else None.
It can be used to obtain low-level info about the last query result and to access features not currently wrapped by Psycopg.
This attribute is exposed because it might be helpful to debug problems when the communication between Python and PostgreSQL doesn’t work as expected. For this reason, the attribute is available when a query fails too.
Warning
You shouldn’t consider it part of the public interface of the
object: it might change without warnings.
Except this warning, I guess.
If you would like to build reliable features using this object, please get in touch so we can try and design a useful interface for it.
Among the properties currently exposed by this object:
The ClientCursor subclass has exactly the same interface as its parent class, but, instead of sending query and parameters separately to the server, it merges them on the client and sends them as a non-parametric query to the server. This allows, for instance, executing parametrized data definition statements and other problematic queries.
The ServerCursor class is usually created by passing the name parameter to cursor(). Using this object results in the creation of an equivalent PostgreSQL cursor in the server. DBAPI-extension methods (such as copy() or stream()) are not implemented on this object: use a normal Cursor instead.
Most attributes and methods behave exactly like in Cursor; only the differences are documented here:
Closing a server-side cursor is more important than
closing a client-side one because it also releases the resources
on the server, which otherwise might remain allocated until the
end of the session (memory, locks). Using the pattern:
with conn.cursor() as cur:
    ...

helps to make sure the cursor is closed at the end of the block.

If using execute() is not appropriate (for instance because the cursor is returned by calling a stored procedure) you can avoid using execute(), create the cursor in other ways, and use the fetch*() methods directly instead. See "Stealing" an existing cursor for an example.
Using
execute()
more than once will close the previous cursor and
open a new one with the same name.
The method will raise IndexError in case a scroll operation would leave the result set; in this case the position will not change.
This method uses the MOVE SQL statement to move the current position in the server-side cursor, which will affect following fetch*() operations. If you need to scroll backwards you should probably call cursor() using scrollable=True.
Note that PostgreSQL doesn’t provide a reliable way to report when a cursor moves out of bounds, so the method might not raise IndexError when it happens, but it might rather stop at the cursor boundary.
This class implements a DBAPI-inspired interface, with all the blocking
methods implemented as coroutines. Unless specified otherwise,
non-blocking methods are shared with the
Cursor
class.
The following methods have the same behaviour of the matching
Cursor
methods, but should be called using the
await
keyword.
The Copy and AsyncCopy classes present the main interface to exchange data during a COPY operation. These objects are normally obtained by the methods Cursor.copy() and AsyncCursor.copy(); however, they can also be created directly, for instance to write to a destination which is not a database (e.g. using a FileWriter). See Using COPY TO and COPY FROM for details.
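A minimal COPY FROM sketch (assuming a cursor cur and a table mytable(data text)):

with cur.copy("COPY mytable (data) FROM STDIN") as copy:
    copy.write_row(("hello",))
    copy.write_row(("world",))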
Main Copy objects
The format doesn't need to be specified when the Copy object is obtained from a COPY operation, because the operation result describes the format too. The parameter is useful when a Copy object is created manually and no operation is performed on the cursor, such as when using writer=FileWriter.
set_types() allows choosing what types the database expects. This is especially useful in binary copy, because PostgreSQL will apply no cast rule.
If records is a list of tuples containing data to save in COPY format to a file (e.g. for later import), it can be used as:

with open("target-file.pgcopy", "wb") as f:
    with Copy(cur, writer=FileWriter(f)) as copy:
        for record in records:
            copy.write_row(record)
The string contains all the parameters set to a non-default value,
which might come either from the connection string and parameters
passed to
connect()
or from environment variables. The
password is never returned (you can read it using the
password
attribute).
get_parameters()
method returns the same information
as a dict.
The number is formed by converting the major, minor, and revision numbers into two-decimal-digit numbers and appending them together. Starting from PostgreSQL 10 the minor version was dropped, so the second group of digits is always 00. For example, version 9.3.5 is returned as 90305, version 10.2 as 100002.
This can be a host name, an IP address, or a directory path if the connection is via Unix socket. (The path case can be distinguished because it will always be an absolute path, beginning with /.)
If a transaction context is specified in the constructor, rollback enclosing transaction contexts up to and including the one specified. It can be used, for instance, as in the sketch below.
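A small sketch (assuming an open conn and a mytable table):

from psycopg import Rollback

with conn.transaction() as outer_tx:
    with conn.transaction():
        conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["maybe"])
        # roll back up to and including the outer transaction block
        raise Rollback(outer_tx)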
The module contains objects and functions useful to generate SQL dynamically,
in a convenient and safe way. SQL identifiers (e.g. names of tables and
fields) cannot be passed to the
execute()
method like query
arguments:
# This will not work
table_name = 'my_table'
cur.execute("INSERT INTO %s VALUES (%s, %s)", [table_name, 10, 20])
table_name = 'my_table'
cur.execute(
    "INSERT INTO %s VALUES (%%s, %%s)" % table_name,
    [10, 20])
This sort of works, but it is an accident waiting to happen: the table name
may be an invalid SQL literal and need quoting; even more serious is the
security problem in case the table name comes from an untrusted source. The
name should be escaped using
escape_identifier()
from psycopg.pq import Escaping
# This works, but it is not optimal
table_name = 'my_table'
cur.execute(
    "INSERT INTO %s VALUES (%%s, %%s)" % Escaping.escape_identifier(table_name),
    [10, 20])
This is now safe, but it is somewhat ad-hoc. In case, for some reason, it is necessary to include a value in the query string (as opposed to passing it as a parameter) the merging rule is still different. It is also still relatively dangerous: if escape_identifier() is forgotten somewhere, the program will usually work, but will eventually crash in the presence of a table or field name containing characters to escape, or will present a potentially exploitable weakness.
The objects exposed by the
psycopg.sql
module allow generating SQL
statements on the fly, separating clearly the variable parts of the statement
from the query parameters:
from psycopg import sql
cur.execute(
    sql.SQL("INSERT INTO {} VALUES (%s, %s)")
        .format(sql.Identifier('my_table')),
    [10, 20])

query = sql.SQL("SELECT {field} FROM {table} WHERE {pkey} = %s").format(
    field=sql.Identifier('my_name'),
    table=sql.Identifier('some_table'),
    pkey=sql.Identifier('id'))

query = sql.SQL("SELECT {fields} FROM {table}").format(
    fields=sql.SQL(',').join([
        sql.Identifier('field1'),
        sql.Identifier('field2'),
        sql.Identifier('field3'),
    ]),
    table=sql.Identifier('some_table'))
The SQL class exposes join() and format() methods useful to create a template where to merge variable parts of a query (for instance field or table names).
The string doesn’t undergo any form of escaping, so it is not suitable to represent variable identifiers or values: you should only use it to pass constant strings representing templates or snippets of SQL statements; use other objects such as Identifier or Literal to represent variable parts.
Example:
>>> query = sql.SQL("SELECT {0} FROM {1}").format(
... sql.SQL(', ').join([sql.Identifier('foo'), sql.Identifier('bar')]),
... sql.Identifier('table'))
>>> print(query.as_string(conn))
SELECT "foo", "bar" FROM "table"
The format() method works similarly to the Python str.format() method: the template supports auto-numbered ({}), numbered ({0}, {1}…) and named placeholders ({name}), with positional arguments replacing the numbered placeholders and keywords replacing the named ones. However placeholder modifiers ({0!r}, {0:<10}) are not supported.
If a Composable object is passed to the template it will be merged according to its as_string() method. If any other Python object is passed, it will be wrapped in a Literal object and so escaped according to SQL rules.
Example:
>>> print(sql.SQL("SELECT * FROM {} WHERE {} = %s")
... .format(sql.Identifier('people'), sql.Identifier('id'))
... .as_string(conn))
SELECT * FROM "people" WHERE "id" = %s
>>> print(sql.SQL("SELECT * FROM {tbl} WHERE name = {name}")
... .format(tbl=sql.Identifier('people'), name="O'Rourke"))
... .as_string(conn))
SELECT * FROM "people" WHERE name = 'O''Rourke'
>>> snip = sql.SQL(', ').join(
... sql.Identifier(n) for n in ['foo', 'bar', 'baz'])
>>> print(snip.as_string(conn))
"foo", "bar", "baz"
Identifiers usually represent names of database objects, such as tables or
fields. PostgreSQL identifiers follow
different rules
than SQL string
literals for escaping (e.g. they use double quotes instead of single).
Example:
>>> t1 = sql.Identifier("foo")
>>> t2 = sql.Identifier("ba'r")
>>> t3 = sql.Identifier('ba"z')
>>> print(sql.SQL(', ').join([t1, t2, t3]).as_string(conn))
"foo", "ba'r", "ba""z"
Multiple strings can be passed to the object to represent a qualified name,
i.e. a dot-separated sequence of identifiers.
Example:
>>> query = sql.SQL("SELECT {} FROM {}").format(
... sql.Identifier("table", "field"),
... sql.Identifier("schema", "table"))
>>> print(query.as_string(conn))
SELECT "table"."field" FROM "schema"."table"
Usually you will want to include placeholders in the query and pass values as execute() arguments. If however you really really need to include a literal value in the query you can use the Literal object.
The string returned by as_string() follows the normal adaptation rules for Python objects.
Example:
>>> s1 = sql.Literal("fo'o")
>>> s2 = sql.Literal(42)
>>> s3 = sql.Literal(date(2000, 1, 1))
>>> print(sql.SQL(', ').join([s1, s2, s3]).as_string(conn))
'fo''o', 42, '2000-01-01'::date
Changed in version 3.1:
Add a type cast to the representation if useful in ambiguous context (e.g. '2000-01-01'::date).
>>> names = ['foo', 'bar', 'baz']
>>> q1 = sql.SQL("INSERT INTO my_table ({}) VALUES ({})").format(
... sql.SQL(', ').join(map(sql.Identifier, names)),
... sql.SQL(', ').join(sql.Placeholder() * len(names)))
>>> print(q1.as_string(conn))
INSERT INTO my_table ("foo", "bar", "baz") VALUES (%s, %s, %s)
>>> q2 = sql.SQL("INSERT INTO my_table ({}) VALUES ({})").format(
... sql.SQL(', ').join(map(sql.Identifier, names)),
... sql.SQL(', ').join(map(sql.Placeholder, names)))
>>> print(q2.as_string(conn))
INSERT INTO my_table ("foo", "bar", "baz") VALUES (%(foo)s, %(bar)s, %(baz)s)
>>> comp = sql.Composed(
... [sql.SQL("INSERT INTO "), sql.Identifier("table")])
>>> print(comp.as_string(conn))
INSERT INTO "table"
>>> fields = sql.Identifier('foo') + sql.Identifier('bar') # a Composed
>>> print(fields.join(', ').as_string(conn))
"foo", "bar"
Use this function only if you absolutely want to convert a Python string to an SQL quoted literal, e.g. to generate batch SQL, and you won’t have a connection available when you will need to use it.
This function is relatively inefficient, because it doesn’t cache the adaptation rules. If you pass a context you can tweak the adaptation rules used, otherwise only global rules are used.
Return type: str
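A small sketch of generating standalone SQL without a connection (the table and column names are assumptions):

from psycopg import sql

value = sql.quote("O'Rourke")  # "'O''Rourke'"
stmt = f"INSERT INTO mytable (name) VALUES ({value})"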
The psycopg.rows module exposes a few generic RowFactory implementations, which can be used to retrieve data from the database in more complex structures than the basic tuples.
Check out Row factories for information about how to use these objects.
cur = conn.cursor(row_factory=class_row(Person))
cur.execute("select 'John' as first_name, 'Smith' as last_name").fetchone()
# Person(first_name='John', last_name='Smith', age=None)
These objects can be used to describe your own row adapters for static typing checks.
The sequence of values is what is returned from a database query, already adapted to the right Python types. The return value is the object that your program would like to receive: by default (tuple_row()) it is a simple tuple, but it may be any type of object.
Typically, RowMaker callables are returned by RowFactory callables.
In compliance with the DB-API, all the exceptions raised by Psycopg
derive from the following classes:
Exception
|__ Warning
|__ Error
    |__ InterfaceError
    |__ DatabaseError
        |__ DataError
        |__ OperationalError
        |__ IntegrityError
        |__ InternalError
        |__ ProgrammingError
        |__ NotSupportedError
These classes are exposed both by this module and the root
psycopg
module.
Exception that is the base class of all other error exceptions. You can
use this to catch all errors with one single
except
statement.
This exception is guaranteed to be picklable.
These errors are not necessarily under the control of the programmer, e.g.
an unexpected disconnect occurs, the data source name is not found, a
transaction could not be processed, a memory allocation error occurred
during processing, etc.
if not available.
The attribute value is available only for errors sent by the server:
not all the fields are available for all the errors and for all the
server versions.
Errors coming from a database server (as opposed to the ones generated client-side, such as a failed connection) usually have a 5-letter error code called SQLSTATE (available in the sqlstate attribute of the error's diag attribute).
Psycopg exposes a different class for each SQLSTATE value, allowing you to write idiomatic error handling code according to specific conditions happening in the database:
try:
cur.execute("LOCK TABLE mytable IN ACCESS EXCLUSIVE MODE NOWAIT")
except psycopg.errors.LockNotAvailable:
locked = True
The exception names are generated from the PostgreSQL source code and include classes for every error defined by PostgreSQL in versions between 9.6 and 14. Every class in the module is named after what is referred to as the "condition name" in the documentation, converted to CamelCase: e.g. the error 22012, division_by_zero, is exposed by this module as the class DivisionByZero. There is a handful of… exceptions to this rule, required to disambiguate name clashes: please refer to the table below for all the classes defined.
Every exception class is a subclass of one of the standard DB-API exceptions, thus exposing the Error interface. You can also look up an exception class by SQLSTATE or by name using the lookup() function:
try:
cur.execute("LOCK TABLE mytable IN ACCESS EXCLUSIVE MODE NOWAIT")
except psycopg.errors.lookup("UNDEFINED_TABLE"):
missing = True
except psycopg.errors.lookup("55P03"):
locked = True
A connection pool is an object used to create and maintain a limited amount of
PostgreSQL connections, reducing the time requested by the program to obtain a
working connection and allowing an arbitrary large number of concurrent
threads or tasks to use a controlled amount of resources on the server. See
Connection pools
for more details and usage pattern.
This package exposes a few connection pool classes. For instance, NullConnectionPool is a ConnectionPool subclass exposing the same interface as its parent, but not keeping any unused connection in its state. See Null connection pools for details about related use cases.
The NullConnectionPool is a ConnectionPool subclass which doesn’t create connections preemptively and doesn’t keep unused connections in its state. See Null connection pools for further details.
The interface of the object is entirely compatible with its parent class. Its behaviour is similar, with the following differences:
AsyncNullConnectionPool is, similarly, an AsyncConnectionPool subclass with the same behaviour as the NullConnectionPool.
These objects are useful if you need to configure data adaptation, i.e.
if you need to change the default way that Psycopg converts between types or
if you want to adapt custom data types and objects. You don’t need this object
in the normal use of Psycopg.
Data adaptation configuration
for an overview of the Psycopg adaptation system.
Dumpers and loaders
value quoted and sanitised, so
that the result can be used to build a SQL string. This works well
for most types and you won’t likely have to implement this method in a
subclass.
Return type
passed in the constructor.
Subclasses needing to specialise the PostgreSQL type according to the value of the object dumped (not only according to its type) should override this class.
Return type
Each context has its own adapters attribute: changing such map allows customising adaptation in a context without changing separate contexts.
When a context is created from another context (for instance when a Cursor is created from a Connection), the parent's adapters are used as a template for the child's adapters, so that every cursor created from the same connection uses the connection's types configuration, but separate connections have independent mappings.
Once created, AdaptersMap instances are independent. This means that objects already created are not affected if a wider scope (e.g. the global one) is changed.
The connections' adapters are initialised using a global AdaptersMap template, exposed as psycopg.adapters: changing such mapping allows customising the type mapping for every connection created afterwards.
The object can start empty or copy from another object of the same class. Copies are copy-on-write: a map is actually copied only when it is updated. This way, extending e.g. the global map from a connection, or a connection map from a cursor, is cheap: a copy is only made on customisation.
See also
If both a text and a binary dumper are registered for the same type, the last one registered will be chosen when the query doesn't specify a format (i.e. when the value is used with a "%s" placeholder).
Parameters
If cls is specified as a string it will be lazy-loaded, so that it will be possible to register it without importing it before. In this case it should be the fully qualified name of the object (e.g. "uuid.UUID").
If cls is None, only use the dumper when looking up using get_dumper_by_oid(), which happens when we know the Postgres type to adapt to, but not the Python type that will be adapted (e.g. in COPY after using set_types()).
The life cycle of the object is the query, so it is assumed that attributes such as the server version or the connection encoding will not change. The object has its own state, so adapting several values of the same type can be optimised.
Parameters
object describes simple information about a PostgreSQL data
type, such as its name, oid and array oid.
TypeInfo
subclasses may hold more
information, for instance the components of a composite type.
You can use
TypeInfo.fetch()
to query information from a database catalog,
which is then used by helper functions, such as
register_hstore()
, to register adapters on types whose
OID is not known upfront or to create more specialised adapters.
TypeInfo
object doesn’t instruct Psycopg to convert a PostgreSQL type
into a Python type: this is the role of a
Loader
. However it
can extend the behaviour of other adapters: if you create a loader for
MyType
, using the
TypeInfo
information, Psycopg will be able to manage
seamlessly arrays of
MyType
or ranges and composite types using
MyType
as a subtype.
See also
t.register(conn)
for record in conn.execute("SELECT mytypearray FROM mytable"):
# records will return lists of "mytype" as string
class MyTypeLoader(Loader):
def load(self, data):
# parse the data and return a MyType instance
conn.adapters.register_loader("mytype", MyTypeLoader)
for record in conn.execute("SELECT mytypearray FROM mytable"):
# records will return lists of MyType instances
Registering a TypeInfo object in a context allows the adapters of that context to look up type information: for instance it allows recognising automatically arrays of that type and loading them from the database as a list of the base type.
or None) - The context where the transformation is performed. If not
specified the conversion might be inaccurate, for instance it will not
be possible to know the connection encoding or the server date format.
This method only makes sense for text dumpers; the result of calling
it on a binary dumper is undefined. It might scratch your car, or burn
your cake. Don’t tell me I didn’t warn you.
If the OID is not specified, PostgreSQL will try to infer the type from the context, but this may fail in some contexts and may require a cast (e.g. specifying a ::type cast for its placeholder).
You can use the psycopg.adapters.types registry to find the OID of builtin PostgreSQL types.
A Python int could be stored as several Postgres types: int2, int4,
int8, numeric. If a type too small is used, it may result in an
overflow. If a type too large is used, PostgreSQL may not want to
cast it to a smaller type.
Python lists should be dumped according to the type they contain, in order to
convert them to e.g. an array of strings or an array of ints (and which
size of int?…)
and return a new
class, or sequence of classes, that can be used to identify the same
dumper again. If the mechanism is not needed, the method should return
the same
object passed in the constructor.
If a dumper implements get_key() it should also implement upgrade() (a sketch
of both methods is shown below).
Return type
or None) - The context where the transformation is performed. If not
specified, the conversion might be inaccurate: for instance it will not be
possible to know the connection encoding or the server date format.
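As a rough sketch of the get_key()/upgrade() mechanism described above: the
dumper classes below are hypothetical, written only for illustration (they are
not the ones Psycopg actually uses), but the method signatures are the
documented ones.

import psycopg
from psycopg import postgres
from psycopg.adapt import Dumper

class Int4Dumper(Dumper):
    oid = postgres.types["int4"].oid

    def dump(self, obj):
        return str(obj).encode()

class Int8Dumper(Int4Dumper):
    oid = postgres.types["int8"].oid

class SmartIntDumper(Int4Dumper):
    def get_key(self, obj, format):
        # values not fitting in an int4 are identified by a different key
        return Int8Dumper if abs(obj) > 0x7FFFFFFF else self.cls

    def upgrade(self, obj, format):
        # return a dumper instance able to handle obj
        return Int8Dumper(self.cls) if abs(obj) > 0x7FFFFFFF else self

# register it for Python ints, for example on the global map
psycopg.adapters.register_dumper(int, SmartIntDumper)

The key returned by get_key() only identifies which dumper variant to use, so
that the upgraded dumper can be reused for further values of the same kind.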
, the PostgreSQL client library, which
performs most of the network communications and returns query results in C
structures.
The low-level functions of the library are exposed by the objects in the
psycopg.pq
module.
implementation, however it requires development packages installed on the
client machine. It can be installed using the c extra, i.e. running
pip install "psycopg[c]".
: a pre-compiled C implementation, bundled with all the required libraries.
It is the easiest option to deal with, fast to install, and it should require
no development tools or client libraries; however, it may not be available
for every platform. You can install it using the binary extra, i.e. running
pip install "psycopg[binary]".
At import time, Psycopg 3 will try to use the best implementation available
and will fail if none is usable. You can force the use of a specific
implementation by exporting the PSYCOPG_IMPL environment variable.
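For example, a quick way to check which implementation was actually loaded is
to look at psycopg.pq.__impl__. The sketch below sets the variable from Python
only for convenience; you would normally export it in the shell before
starting the program.

import os

# must be set before psycopg is imported for the first time
os.environ["PSYCOPG_IMPL"] = "binary"

import psycopg   # raises ImportError if the requested implementation is missing

print(psycopg.pq.__impl__)   # -> "binary"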
>>> enc = conn.info.encoding
>>> encrypted = conn.pgconn.encrypt_password(password.encode(enc), rolename.encode(enc))
b'SCRAM-SHA-256$4096:...
>>> conn.pgconn.trace(sys.stderr.fileno())
>>> conn.pgconn.set_trace_flags(pq.Trace.SUPPRESS_TIMESTAMPS | pq.Trace.REGRESS_MODE)
>>> conn.execute("select now()")
F 13 Parse "" "BEGIN" 0
F 14 Bind "" "" 0 0 1 0
F 6 Describe P ""
F 9 Execute "" 0
F 4 Sync
B 4 ParseComplete
B 4 BindComplete
B 4 NoData
B 10 CommandComplete "BEGIN"
B 5 ReadyForQuery T
F 17 Query "select now()"
B 28 RowDescription 1 "now" NNNN 0 NNNN 8 -1 0
B 39 DataRow 1 29 '2022-09-14 14:12:16.648035+02'
B 13 CommandComplete "SELECT 1"
B 5 ReadyForQuery T
>>> conn.pgconn.untrace()
is a distributed database using the same frontend-backend protocol as
PostgreSQL. As such, Psycopg can be used to write Python programs interacting
with CockroachDB.
Opening a connection to a CRDB database using psycopg.connect() provides a
largely working object. However, if you use the psycopg.crdb.connect()
function instead, Psycopg will create more specialised objects and provide a
type mapping tweaked to the CockroachDB data model.
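For instance, a minimal sketch (the connection string, with CockroachDB's
default port 26257, is only illustrative):

from psycopg import crdb

# crdb.connect() returns a connection specialised for CockroachDB
with crdb.connect("host=localhost port=26257 dbname=defaultdb user=root") as conn:
    print(conn.execute("SELECT version()").fetchone()[0])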
: please refer to the
database documentation for details. These are some of the main differences
affecting Psycopg behaviour:
is only populated from CockroachDB 22.1. Note however that you cannot use the
PID to terminate the session; use SHOW session_id to find the id of a session,
which you may terminate with CANCEL SESSION in lieu of PostgreSQL's
pg_terminate_backend().
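For instance, the lookup side of this from Python (conn is assumed to be an
existing CockroachDB connection; the session would then be cancelled with
CANCEL SESSION from another connection):

# find the id of the current CockroachDB session
session_id = conn.execute("SHOW session_id").fetchone()[0]
print(session_id)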
The default adapters map, establishing how Python and CockroachDB types are
converted into each other. The map is used as a template when new connections
are created, using psycopg.crdb.connect() (similarly to the way
psycopg.adapters is used as a template for new PostgreSQL connections).
This registry contains only the types and adapters supported by CockroachDB.
Several PostgreSQL types and adapters are missing or behave differently,
among which:
This module is experimental and its interface could change in the future,
without warning or respect for the version scheme. It is provided here to
allow experimentation before making it more stable.
package. The package is currently
not installed automatically as a Psycopg dependency and must be installed
manually:
$ pip install "dnspython >= 2.1"
If lookup is successful, return a params dict with hosts and ports replaced
with the looked-up entries.
Raise
OperationalError
if no lookup is successful and no host
(looked up or unchanged) could be returned.
In addition to the rules defined by RFC 2782 about the host name pattern,
perform SRV lookup also if the port is the string "SRV" (case insensitive).
Warning
This is experimental functionality.
import psycopg
import psycopg._dns  # not imported automatically

class SrvCognizantConnection(psycopg.Connection):
    @classmethod
    def _get_connection_params(cls, conninfo, **kwargs):
        params = super()._get_connection_params(conninfo, **kwargs)
        params = psycopg._dns.resolve_srv(params)
        return params

# The name will be resolved to db1.example.com
cnn = SrvCognizantConnection.connect("host=_postgres._tcp.db.psycopg.org")
This method is a subclass hook allowing you to manipulate the connection
parameters before performing the connection. Make sure to call the super()
implementation before any further manipulation of the arguments: