NAME EV::Pg - asynchronous PostgreSQL client using libpq and EV SYNOPSIS use v5.10; use EV; use EV::Pg; my $pg = EV::Pg->new( conninfo => 'dbname=mydb', on_error => sub { die "PG error: $_[0]\n" }, ); $pg->on_connect(sub { $pg->query_params( 'select $1::int + $2::int', [10, 20], sub { my ($rows, $err) = @_; die $err if $err; say $rows->[0][0]; # 30 EV::break; }, ); }); EV::run; DESCRIPTION EV::Pg is a non-blocking PostgreSQL client that integrates with the EV event loop. It drives the libpq async API ("PQsendQuery", "PQconsumeInput", "PQgetResult") via "ev_io" watchers on the libpq socket, so the event loop never blocks on database I/O. Features: parameterized queries, prepared statements, pipeline mode, single-row mode, chunked rows (libpq >= 17), COPY IN/OUT, LISTEN/NOTIFY, async cancel (libpq >= 17), structured error fields, protocol tracing, and notice handling. CALLBACKS Query callbacks receive "($result)" on success, "(undef, $error)" on error: SELECT / single-row mode "(\@rows)" where each row is an arrayref of column values. "undef" columns map to Perl "undef". INSERT / UPDATE / DELETE "($cmd_tuples)" -- the string returned by "PQcmdTuples" (e.g. "1", "0"). Describe "(\%meta)" with keys "nfields", "nparams", and (when non-zero) "fields" (arrayref of "{name, type}" hashes) and "paramtypes" (arrayref of OIDs). COPY "("COPY_IN")", "("COPY_OUT")", or "("COPY_BOTH")". Pipeline sync "(1)". Error "(undef, $error_message)". Exceptions thrown inside callbacks are caught and emitted as warnings. CONSTRUCTOR new my $pg = EV::Pg->new(%args); Arguments: conninfo libpq connection string. If provided, "connect" is called immediately. conninfo_params Hashref of connection parameters (e.g. "{ host => 'localhost', dbname => 'mydb', port => '5432' }"). Alternative to "conninfo". If provided, "connect_params" is called immediately. expand_dbname If true and "conninfo_params" is used, the "dbname" value is parsed as a connection string (allowing "dbname => 'postgresql://...'"). on_connect Callback invoked (with no arguments) when the connection is established. on_error Callback invoked as "($error_message)" on connection-level errors. Defaults to "sub { die @_ }". on_notify Callback invoked as "($channel, $payload, $backend_pid)" on LISTEN/NOTIFY messages. on_notice Callback invoked as "($message)" on PostgreSQL notice/warning messages. on_drain Callback invoked (with no arguments) when the send buffer has been flushed during COPY IN. Useful for resuming "put_copy_data" after it returns 0. loop An EV loop object. Defaults to "EV::default_loop". CONNECTION METHODS connect $pg->connect($conninfo); Initiates an asynchronous connection. The "on_connect" handler fires on success; "on_error" fires on failure. connect_params $pg->connect_params(\%params); $pg->connect_params(\%params, $expand_dbname); Initiates an asynchronous connection using keyword/value parameters instead of a connection string. $expand_dbname allows the "dbname" parameter to contain a full connection URI. reset $pg->reset; Drops the current connection and reconnects using the original conninfo. Pending callbacks receive "(undef, "connection reset")". Alias: "reconnect". finish $pg->finish; Closes the connection. Pending callbacks receive "(undef, "connection finished")". Alias: "disconnect". is_connected my $bool = $pg->is_connected; Returns 1 if connected and ready for queries. status my $st = $pg->status; Returns the libpq connection status ("CONNECTION_OK" or "CONNECTION_BAD"). QUERY METHODS query $pg->query($sql, sub { my ($result, $err) = @_; }); Sends a simple query. Not allowed in pipeline mode -- use "query_params" instead. Multi-statement strings (e.g. "SELECT 1; SELECT 2") are supported but only the last result is delivered to the callback. PostgreSQL stops executing after the first error, so errors always appear as the last result. query_params $pg->query_params($sql, \@params, sub { my ($result, $err) = @_; }); Sends a parameterized query. Parameters are referenced in SQL as $1, $2, etc. "undef" values are sent as SQL NULL. prepare $pg->prepare($name, $sql, sub { my ($result, $err) = @_; }); Creates a prepared statement. The callback receives an empty string ("") on success. Alias: "prep". query_prepared $pg->query_prepared($name, \@params, sub { my ($result, $err) = @_; }); Executes a prepared statement. Alias: "qx". describe_prepared $pg->describe_prepared($name, sub { my ($meta, $err) = @_; }); Describes a prepared statement. The callback receives a hashref with keys "nfields" and "nparams". When "nfields" is non-zero, a "fields" key is also present (arrayref of "{name, type}" hashes). When "nparams" is non-zero, a "paramtypes" key is also present (arrayref of OIDs). describe_portal $pg->describe_portal($name, sub { my ($meta, $err) = @_; }); Describes a portal. The callback receives the same hashref structure as "describe_prepared". set_single_row_mode my $ok = $pg->set_single_row_mode; Switches the most recently sent query to single-row mode. Returns 1 on success, 0 on failure (e.g. no query pending). The callback fires once per row with "(\@rows)" where @rows is an arrayref containing a single row (e.g. "[[$col1, $col2, ...]]"), then a final empty "(\@rows)" (where @rows has zero elements) for the completion. set_chunked_rows_mode my $ok = $pg->set_chunked_rows_mode($chunk_size); Switches the most recently sent query to chunked rows mode, delivering up to $chunk_size rows at a time (requires libpq >= 17). Like single-row mode, but with lower per-callback overhead for large result sets. Returns 1 on success, 0 on failure. close_prepared $pg->close_prepared($name, sub { my ($result, $err) = @_; }); Closes (deallocates) a prepared statement at protocol level (requires libpq >= 17). The callback receives an empty string ("") on success. Works in pipeline mode, unlike "DEALLOCATE" SQL. close_portal $pg->close_portal($name, sub { my ($result, $err) = @_; }); Closes a portal at protocol level (requires libpq >= 17). The callback receives an empty string ("") on success. cancel my $err = $pg->cancel; Sends a cancel request using the legacy "PQcancel" API. This is a blocking call. Returns "undef" on success, an error string on failure. cancel_async $pg->cancel_async(sub { my ($err) = @_; }); Sends an asynchronous cancel request using the "PQcancelConn" API (requires libpq >= 17). The callback receives no arguments on success, or an error string on failure. Croaks if libpq was built without async cancel support ("LIBPQ_HAS_ASYNC_CANCEL"). pending_count my $n = $pg->pending_count; Returns the number of callbacks in the queue. skip_pending $pg->skip_pending; Cancels all pending callbacks, invoking each with "(undef, "skipped")". PIPELINE METHODS enter_pipeline $pg->enter_pipeline; Enters pipeline mode. Queries are batched and sent without waiting for individual results. exit_pipeline $pg->exit_pipeline; Exits pipeline mode. Croaks if the pipeline is not idle (has pending queries). pipeline_sync $pg->pipeline_sync(sub { my ($ok) = @_; }); Sends a pipeline sync point. The callback fires with "(1)" when all preceding queries have completed. Alias: "sync". send_pipeline_sync $pg->send_pipeline_sync(sub { my ($ok) = @_; }); Like "pipeline_sync" but does not flush the send buffer (requires libpq >= 17). Useful for batching multiple sync points before a single manual flush via "send_flush_request". send_flush_request $pg->send_flush_request; Sends a flush request, asking the server to deliver results for queries sent so far. Alias: "flush". pipeline_status my $st = $pg->pipeline_status; Returns "PQ_PIPELINE_OFF", "PQ_PIPELINE_ON", or "PQ_PIPELINE_ABORTED". COPY METHODS put_copy_data my $ok = $pg->put_copy_data($data); Sends data to the server during a COPY IN operation. Returns 1 on success (data flushed or flush scheduled), 0 if the send buffer is full (wait for writability and retry), or -1 on error. put_copy_end my $ok = $pg->put_copy_end; my $ok = $pg->put_copy_end($errmsg); Ends a COPY IN operation. Pass an error message to abort the COPY. Returns 1 on success, 0 if the send buffer is full (retry after writability), or -1 on error. get_copy_data my $row = $pg->get_copy_data; Retrieves a row during COPY OUT. Returns the row data as a string, -1 when the COPY is complete, or "undef" if no data is available yet. HANDLER METHODS Each handler method is a getter/setter. Called with an argument, it sets the handler and returns the new value (or "undef" if cleared). Called without arguments, it returns the current handler. on_connect Called with no arguments on successful connection. on_error Called as "($error_message)" on connection-level errors. on_notify Called as "($channel, $payload, $backend_pid)" on LISTEN/NOTIFY. on_notice Called as "($message)" on server notice/warning messages. on_drain Called with no arguments when the libpq send buffer has been fully flushed during a COPY IN operation. Use this to resume sending data after "put_copy_data" returns 0 (buffer full). CONNECTION INFO String accessors ("db", "user", "host", "port", "error_message", "parameter_status", "ssl_attribute") return "undef" when not connected. Integer accessors return a default value (typically 0 or -1). Methods that require an active connection ("client_encoding", "set_client_encoding", "set_error_verbosity", "set_error_context_visibility", "conninfo") croak when not connected. error_message Last error message. Alias: "errstr". transaction_status Returns "PQTRANS_IDLE", "PQTRANS_ACTIVE", "PQTRANS_INTRANS", "PQTRANS_INERROR", or "PQTRANS_UNKNOWN". Alias: "txn_status". parameter_status my $val = $pg->parameter_status($name); Returns a server parameter (e.g. "server_version", "client_encoding"). backend_pid Backend process ID. Alias: "pid". server_version Server version as an integer (e.g. 180000 for 18.0). protocol_version Protocol version (typically 3). db Database name. user Connected user name. host Server host. hostaddr Server IP address. port Server port. socket The underlying file descriptor. ssl_in_use Returns 1 if the connection uses SSL. ssl_attribute my $val = $pg->ssl_attribute($name); Returns an SSL attribute (e.g. "protocol", "cipher"). ssl_attribute_names my $names = $pg->ssl_attribute_names; Returns an arrayref of available SSL attribute names, or "undef" if the connection does not use SSL. client_encoding Returns the current client encoding name. set_client_encoding $pg->set_client_encoding($encoding); Sets the client encoding (e.g. "UTF8", "SQL_ASCII"). This is a synchronous (blocking) call that stalls the event loop for one server round trip. Best called right after "on_connect" fires, before any queries are dispatched. Croaks if there are pending queries or on failure. set_error_verbosity my $old = $pg->set_error_verbosity($level); Sets error verbosity. Returns the previous setting. set_error_context_visibility my $old = $pg->set_error_context_visibility($level); Sets error context visibility. $level is one of "PQSHOW_CONTEXT_NEVER", "PQSHOW_CONTEXT_ERRORS" (default), or "PQSHOW_CONTEXT_ALWAYS". Returns the previous setting. error_fields my $fields = $pg->error_fields; Returns a hashref of structured error fields from the most recent "PGRES_FATAL_ERROR" result, or "undef" if no error has occurred. Keys (present only when non-NULL in the server response): sqlstate severity primary detail hint position context schema table column datatype constraint internal_position internal_query source_file source_line source_function result_meta my $meta = $pg->result_meta; Returns a hashref of metadata from the most recent query result, or "undef" if no result has been delivered. Keys: nfields number of columns cmd_status command status string (e.g. "SELECT 3", "INSERT 0 1") inserted_oid OID of inserted row (only present when valid) fields arrayref of column metadata hashrefs: name, type (OID), ftable (OID), ftablecol, fformat (0=text, 1=binary), fsize, fmod conninfo my $info = $pg->conninfo; Returns a hashref of the connection parameters actually used by the live connection (keyword => value pairs). connection_used_password my $bool = $pg->connection_used_password; Returns 1 if the connection authenticated with a password. connection_used_gssapi my $bool = $pg->connection_used_gssapi; Returns 1 if the connection used GSSAPI authentication. connection_needs_password my $bool = $pg->connection_needs_password; Returns 1 if the server requested a password during authentication. trace $pg->trace($filename); Enables libpq protocol tracing to the specified file. Useful for debugging wire-level issues. untrace $pg->untrace; Disables protocol tracing and closes the trace file. set_trace_flags $pg->set_trace_flags($flags); Sets trace output flags (requires libpq >= 14). $flags is a bitmask of "PQTRACE_SUPPRESS_TIMESTAMPS" and "PQTRACE_REGRESS_MODE". UTILITY METHODS escape_literal my $quoted = $pg->escape_literal($string); Returns a string literal escaped for use in SQL. Alias: "quote". escape_identifier my $quoted = $pg->escape_identifier($string); Returns an identifier escaped for use in SQL. Alias: "quote_id". escape_bytea my $escaped = $pg->escape_bytea($binary); Escapes binary data for use in a bytea column. encrypt_password my $hash = $pg->encrypt_password($password, $user); my $hash = $pg->encrypt_password($password, $user, $algorithm); Encrypts a password for use with "ALTER ROLE ... PASSWORD". $algorithm is optional; defaults to the server's "password_encryption" setting (typically "scram-sha-256"). unescape_bytea my $binary = EV::Pg->unescape_bytea($escaped); Class method. Unescapes bytea data. lib_version my $ver = EV::Pg->lib_version; Class method. Returns the libpq version as an integer. conninfo_parse my $params = EV::Pg->conninfo_parse($conninfo); Class method. Parses a connection string and returns a hashref of the recognized keyword/value pairs. Croaks if the string is invalid. Useful for validating connection strings before connecting. ALIASES Short aliases for common methods: q query qp query_params qx query_prepared prep prepare reconnect reset disconnect finish flush send_flush_request sync pipeline_sync quote escape_literal quote_id escape_identifier errstr error_message txn_status transaction_status pid backend_pid EXPORT TAGS :status PGRES_* result status constants :conn CONNECTION_OK, CONNECTION_BAD :transaction PQTRANS_* transaction status constants :pipeline PQ_PIPELINE_* pipeline status constants :verbosity PQERRORS_* verbosity constants :context PQSHOW_CONTEXT_* context visibility constants :trace PQTRACE_* trace flag constants :all all of the above BENCHMARK 500k queries over Unix socket, PostgreSQL 18, libpq 18: Workload EV::Pg sequential EV::Pg pipeline DBD::Pg sync DBD::Pg async+EV SELECT 73,109 q/s 124,092 q/s 56,496 q/s 48,744 q/s INSERT 58,534 q/s 84,467 q/s 39,068 q/s 41,559 q/s UPSERT 26,342 q/s 34,223 q/s 28,134 q/s 27,155 q/s Sequential mode uses prepared statements (parse once, bind+execute per call). Pipeline mode batches queries with "pipeline_sync" every 1000 queries. See bench/bench.pl to reproduce. REQUIREMENTS libpq >= 14 (PostgreSQL client library) and EV. Some features (chunked rows, close prepared/portal, no-flush pipeline sync, async cancel) require libpq >= 17. SEE ALSO EV, DBD::Pg, Mojo::Pg, AnyEvent::Pg LICENSE This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.