object

amqp

Portable AMQP 0-9-1 (Advanced Message Queuing Protocol) client. Uses the sockets library for TCP communication.

Availability:
logtalk_load(amqp(loader))
Author: Paulo Moura
Version: 1:0:0
Date: 2026-02-19
Compilation flags:
static, context_switching_calls
Remarks:
  • Supported backends: ECLiPSe, GNU Prolog, SICStus Prolog, SWI-Prolog, Trealla Prolog, and XVM (same as the sockets library).

  • Protocol version: Implements the AMQP 0-9-1 specification.

  • Binary protocol: AMQP is a binary protocol with typed frame encoding.

  • Channels: Supports multiple concurrent channels over a single connection.

  • Exchanges and queues: Full support for declaring exchanges, queues, and bindings.

  • Content: Supports message publishing and consuming with content headers.

  • Transactions: Supports AMQP transactions with tx.select, tx.commit, and tx.rollback.

  • Publisher confirms: Supports enabling publisher confirms on a channel with confirm_select/1 (RabbitMQ extension).

  • Heartbeat: Supports heartbeat negotiation to keep connections alive.

  • Reconnection: Automatic reconnection with configurable retry attempts and delays.

Inherited public predicates:
(none)

Public predicates

connect/4

Connects to an AMQP 0-9-1 server and performs the protocol handshake. Returns a connection handle for subsequent operations. Supports automatic reconnection on connection failures.

Compilation flags:
static
Template:
connect(Host,Port,Connection,Options)
Mode and number of proofs:
connect(+atom,+integer,--compound,+list) - one_or_error
Exceptions:
Connection refused or network error:
amqp_error(connection_failed)
Server rejected connection:
amqp_error(protocol_error(Message))
Authentication failed:
amqp_error(auth_failed)
All reconnection attempts failed:
amqp_error(reconnect_failed)
Remarks:
  • Option username(Username): Username for authentication. Default is guest.

  • Option password(Password): Password for authentication. Default is guest.

  • Option virtual_host(VHost): Virtual host name. Default is /.

  • Option heartbeat(Seconds): Heartbeat interval in seconds. Default is 60.

  • Option channel_max(Max): Maximum number of channels. Default is 0 (no limit).

  • Option frame_max(Max): Maximum frame size in bytes. Default is 131072.

  • Option reconnect(Boolean): Enable automatic reconnection on connection failure. Default is false.

  • Option reconnect_attempts(N): Maximum number of reconnection attempts. Default is 3. Only used when reconnect(true).

  • Option reconnect_delay(Seconds): Delay between reconnection attempts in seconds. Default is 1. Only used when reconnect(true).
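
As a rough illustration of how the options above combine, the following sketch connects to a broker that is assumed to be listening on localhost at the standard AMQP port 5672, enables automatic reconnection, and then closes the connection. The object name and the run/0 predicate are hypothetical, and passing the credentials as atoms is an assumption. The handler catches any term because this page lists the amqp_error/1 terms but does not say whether they are wrapped in a standard error/2 term.

```logtalk
% assumes the library is already loaded: logtalk_load(amqp(loader))
:- object(amqp_connect_example).

	:- public(run/0).

	run :-
		Options = [
			username(guest), password(guest),
			heartbeat(30),
			% retry up to 5 times, waiting 2 seconds between attempts
			reconnect(true), reconnect_attempts(5), reconnect_delay(2)
		],
		catch(
			amqp::connect(localhost, 5672, Connection, Options),
			Error,
			(write(connect_failed(Error)), nl, fail)
		),
		amqp::close(Connection).

:- end_object.
```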


close/1

Gracefully closes the AMQP connection. Closes all channels and the connection itself.

Compilation flags:
static
Template:
close(Connection)
Mode and number of proofs:
close(+compound) - one_or_error

close/3

Closes the AMQP connection with a specific reply code and reason.

Compilation flags:
static
Template:
close(Connection,ReplyCode,ReplyText)
Mode and number of proofs:
close(+compound,+integer,+atom) - one_or_error

connection_alive/1

Checks if the connection is still open and valid.

Compilation flags:
static
Template:
connection_alive(Connection)
Mode and number of proofs:
connection_alive(+compound) - zero_or_one

channel_open/3

Opens a new channel on the connection. Returns a channel handle.

Compilation flags:
static
Template:
channel_open(Connection,ChannelNumber,Channel)
Mode and number of proofs:
channel_open(+compound,+integer,--compound) - one_or_error
Exceptions:
Channel already open:
amqp_error(channel_error(Message))
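
A minimal sketch of opening two channels over one connection, assuming a broker on localhost at port 5672 that accepts the default credentials. The object name, the run/0 predicate, and the channel numbers are made up for illustration.

```logtalk
% assumes the library is already loaded: logtalk_load(amqp(loader))
:- object(amqp_channel_example).

	:- public(run/0).

	run :-
		amqp::connect(localhost, 5672, Connection, []),
		% channel numbers are chosen by the caller; 1 and 2 are arbitrary
		amqp::channel_open(Connection, 1, Publishing),
		amqp::channel_open(Connection, 2, Consuming),
		% ... use the two channel handles here ...
		amqp::channel_close(Consuming),
		amqp::channel_close(Publishing),
		amqp::close(Connection).

:- end_object.
```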

channel_close/1

Closes a channel.

Compilation flags:
static
Template:
channel_close(Channel)
Mode and number of proofs:
channel_close(+compound) - one_or_error

channel_close/3

Closes a channel with a specific reply code and reason.

Compilation flags:
static
Template:
channel_close(Channel,ReplyCode,ReplyText)
Mode and number of proofs:
channel_close(+compound,+integer,+atom) - one_or_error

exchange_declare/3

Declares an exchange on the server.

Compilation flags:
static
Template:
exchange_declare(Channel,Exchange,Options)
Mode and number of proofs:
exchange_declare(+compound,+atom,+list) - one_or_error
Remarks:
  • Option type(Type): Exchange type: direct, fanout, topic, or headers. Default is direct.

  • Option durable(Boolean): Survive server restart. Default is false.

  • Option auto_delete(Boolean): Delete when unused. Default is false.

  • Option internal(Boolean): Internal exchange. Default is false.

  • Option arguments(Arguments): Additional arguments as key-value pairs.


exchange_delete/3

Deletes an exchange.

Compilation flags:
static
Template:
exchange_delete(Channel,Exchange,Options)
Mode and number of proofs:
exchange_delete(+compound,+atom,+list) - one_or_error
Remarks:
  • Option if_unused(Boolean): Only delete if unused. Default is false.


exchange_bind/4

Binds an exchange to another exchange.

Compilation flags:
static
Template:
exchange_bind(Channel,Destination,Source,Options)
Mode and number of proofs:
exchange_bind(+compound,+atom,+atom,+list) - one_or_error
Remarks:
  • Option routing_key(Key): Routing key for binding. Default is empty.

  • Option arguments(Arguments): Additional arguments.


exchange_unbind/4

Unbinds an exchange from another exchange.

Compilation flags:
static
Template:
exchange_unbind(Channel,Destination,Source,Options)
Mode and number of proofs:
exchange_unbind(+compound,+atom,+atom,+list) - one_or_error

queue_declare/3

Declares a queue on the server. If Queue is a variable, the server generates a unique name.

Compilation flags:
static
Template:
queue_declare(Channel,Queue,Options)
Mode and number of proofs:
queue_declare(+compound,?atom,+list) - one_or_error
Remarks:
  • Option durable(Boolean): Survive server restart. Default is false.

  • Option exclusive(Boolean): Exclusive to this connection. Default is false.

  • Option auto_delete(Boolean): Delete when unused. Default is false.

  • Option arguments(Arguments): Additional arguments (e.g., message TTL, dead letter exchange).
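
The server-named case might be used as sketched below, for example to create a private reply queue. The object and predicate names are hypothetical. That Queue is bound to the generated name on return is inferred from the `?atom` mode and the description above, not stated explicitly.

```logtalk
% assumes the library is already loaded: logtalk_load(amqp(loader))
:- object(amqp_reply_queue_example).

	:- public(declare/2).

	% declare(+Channel, -Queue)
	declare(Channel, Queue) :-
		% Queue is left unbound so that the server picks the name
		amqp::queue_declare(Channel, Queue, [exclusive(true), auto_delete(true)]),
		write(declared(Queue)), nl.

:- end_object.
```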


queue_delete/3

Deletes a queue.

Compilation flags:
static
Template:
queue_delete(Channel,Queue,Options)
Mode and number of proofs:
queue_delete(+compound,+atom,+list) - one_or_error
Remarks:
  • Option if_unused(Boolean): Only delete if unused. Default is false.

  • Option if_empty(Boolean): Only delete if empty. Default is false.


queue_bind/4

Binds a queue to an exchange.

Compilation flags:
static
Template:
queue_bind(Channel,Queue,Exchange,Options)
Mode and number of proofs:
queue_bind(+compound,+atom,+atom,+list) - one_or_error
Remarks:
  • Option routing_key(Key): Routing key for binding. Default is empty.

  • Option arguments(Arguments): Additional arguments.
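
Declaring an exchange, declaring a queue, and binding the two usually go together. The sketch below shows one possible sequence on an already open channel. The exchange name, queue name, and routing key pattern are invented for the example, and passing the routing key as an atom is an assumption.

```logtalk
% assumes the library is already loaded: logtalk_load(amqp(loader))
:- object(amqp_topology_example).

	:- public(declare/1).

	% declare(+Channel)
	declare(Channel) :-
		amqp::exchange_declare(Channel, logs, [type(topic), durable(true)]),
		amqp::queue_declare(Channel, error_logs, [durable(true)]),
		% route every message whose key ends in ".error" to the queue
		amqp::queue_bind(Channel, error_logs, logs, [routing_key('*.error')]).

:- end_object.
```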


queue_unbind/4

Unbinds a queue from an exchange.

Compilation flags:
static
Template:
queue_unbind(Channel,Queue,Exchange,Options)
Mode and number of proofs:
queue_unbind(+compound,+atom,+atom,+list) - one_or_error

queue_purge/2

Purges all messages from a queue.

Compilation flags:
static
Template:
queue_purge(Channel,Queue)
Mode and number of proofs:
queue_purge(+compound,+atom) - one_or_error

basic_publish/4

Publishes a message to an exchange.

Compilation flags:
static
Template:
basic_publish(Channel,Exchange,Body,Options)
Mode and number of proofs:
basic_publish(+compound,+atom,+term,+list) - one_or_error
Remarks:
  • Option routing_key(Key): Routing key for message. Default is empty.

  • Option mandatory(Boolean): Return the message to the publisher if it cannot be routed to a queue. Default is false.

  • Option immediate(Boolean): Return the message to the publisher if it cannot be delivered to a consumer immediately. Default is false (not supported by RabbitMQ 3.0 and later).

  • Option content_type(Type): MIME content type.

  • Option content_encoding(Enc): Content encoding.

  • Option correlation_id(Id): Correlation identifier.

  • Option reply_to(Queue): Reply queue name.

  • Option expiration(Ms): Message TTL in milliseconds.

  • Option message_id(Id): Application message identifier.

  • Option timestamp(Ts): Message timestamp.

  • Option type(Type): Message type name.

  • Option user_id(Id): Creating user ID.

  • Option app_id(Id): Creating application ID.

  • Option delivery_mode(Mode): 1 for non-persistent, 2 for persistent.

  • Option priority(P): Message priority (0-9).

  • Option headers(H): Application headers as key-value pairs.
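
A small sketch of publishing a persistent message through the AMQP default exchange (the empty name), which routes to the queue named by the routing key. The queue name task_queue is invented, and using atoms for the body and the content type is an assumption, since the body is only documented as a term.

```logtalk
% assumes the library is already loaded: logtalk_load(amqp(loader))
:- object(amqp_publish_example).

	:- public(publish/1).

	% publish(+Channel)
	publish(Channel) :-
		amqp::basic_publish(Channel, '', 'Hello, World!', [
			routing_key(task_queue),
			content_type('text/plain'),
			% 2 marks the message as persistent
			delivery_mode(2)
		]).

:- end_object.
```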


basic_consume/3

Starts consuming messages from a queue.

Compilation flags:
static
Template:
basic_consume(Channel,Queue,Options)
Mode and number of proofs:
basic_consume(+compound,+atom,+list) - one_or_error
Remarks:
  • Option consumer_tag(Tag): Consumer identifier. The server generates one if not provided.

  • Option no_local(Boolean): Do not receive own messages. Default is false.

  • Option no_ack(Boolean): No acknowledgment required. Default is false.

  • Option exclusive(Boolean): Exclusive consumer. Default is false.

  • Option arguments(Arguments): Additional arguments.


basic_cancel/3

Cancels a consumer.

Compilation flags:
static
Template:
basic_cancel(Channel,ConsumerTag,Options)
Mode and number of proofs:
basic_cancel(+compound,+atom,+list) - one_or_error

basic_get/3

Synchronously gets a message from a queue.

Compilation flags:
static
Template:
basic_get(Channel,Queue,Options)
Mode and number of proofs:
basic_get(+compound,+atom,+list) - one_or_error
Remarks:
  • Option no_ack(Boolean): No acknowledgment required. Default is false.


basic_ack/3

Acknowledges a message.

Compilation flags:
static
Template:
basic_ack(Channel,DeliveryTag,Options)
Mode and number of proofs:
basic_ack(+compound,+integer,+list) - one_or_error
Remarks:
  • Option multiple(Boolean): Acknowledge all messages up to and including this delivery tag. Default is false.


basic_nack/3

Negatively acknowledges a message (RabbitMQ extension).

Compilation flags:
static
Template:
basic_nack(Channel,DeliveryTag,Options)
Mode and number of proofs:
basic_nack(+compound,+integer,+list) - one_or_error
Remarks:
  • Option multiple(Boolean): Reject all messages up to and including this delivery tag. Default is false.

  • Option requeue(Boolean): Requeue the message. Default is true.


basic_reject/3

Rejects a message.

Compilation flags:
static
Template:
basic_reject(Channel,DeliveryTag,Options)
Mode and number of proofs:
basic_reject(+compound,+integer,+list) - one_or_error
Remarks:
  • Option requeue(Boolean): Requeue the message. Default is true.
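
One way to choose between acknowledging and rejecting is sketched below: the message is acknowledged when a handler succeeds and rejected without requeueing otherwise. The object name, process/2, and the placeholder handle/1 are hypothetical. Message is assumed to be a delivered message obtained from receive/3.

```logtalk
% assumes the library is already loaded: logtalk_load(amqp(loader))
:- object(amqp_ack_example).

	:- public(process/2).

	% process(+Channel, +Message)
	process(Channel, Message) :-
		amqp::message_delivery_tag(Message, Tag),
		amqp::message_body(Message, Body),
		(	catch(handle(Body), _, fail) ->
			amqp::basic_ack(Channel, Tag, [])
		;	% handler failed or raised an error: drop instead of requeueing
			amqp::basic_reject(Channel, Tag, [requeue(false)])
		).

	% placeholder handler: accepts any body other than the empty atom
	handle(Body) :-
		Body \== ''.

:- end_object.
```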


basic_qos/2

Sets quality of service (prefetch) settings.

Compilation flags:
static
Template:
basic_qos(Channel,Options)
Mode and number of proofs:
basic_qos(+compound,+list) - one_or_error
Remarks:
  • Option prefetch_size(Size): Prefetch window size in bytes. Default is 0 (no limit).

  • Option prefetch_count(Count): Prefetch window in messages. Default is 0 (no limit).

  • Option global(Boolean): Apply to entire connection. Default is false.
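
A prefetch limit is typically set before starting a consumer. The sketch below asks the server to hold back further deliveries until the previous message is acknowledged. The object and predicate names are hypothetical.

```logtalk
% assumes the library is already loaded: logtalk_load(amqp(loader))
:- object(amqp_worker_example).

	:- public(start/2).

	% start(+Channel, +Queue)
	start(Channel, Queue) :-
		% allow at most one unacknowledged message at a time
		amqp::basic_qos(Channel, [prefetch_count(1)]),
		% acknowledgments are required, so the prefetch limit takes effect
		amqp::basic_consume(Channel, Queue, [no_ack(false)]).

:- end_object.
```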


basic_recover/2

Asks the server to redeliver unacknowledged messages.

Compilation flags:
static
Template:
basic_recover(Channel,Options)
Mode and number of proofs:
basic_recover(+compound,+list) - one_or_error
Remarks:
  • Option requeue(Boolean): Requeue messages. Default is false.


receive/3

Receives a message or method from the server. Blocks until data is available or the timeout expires.

Compilation flags:
static
Template:
receive(Channel,Message,Options)
Mode and number of proofs:
receive(+compound,-compound,+list) - zero_or_one_or_error
Remarks:
  • Option timeout(Milliseconds): Timeout in milliseconds. 0 for non-blocking, -1 for infinite. Default is -1.
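
The following sketch drains a channel by calling receive/3 repeatedly with a one-second timeout. It rests on two assumptions that this page does not confirm: that receive/3 fails when the timeout expires (the `zero_or_one_or_error` mode allows failure), and that every term received is a content message rather than a method. The object and predicate names are hypothetical.

```logtalk
% assumes the library is already loaded: logtalk_load(amqp(loader))
:- object(amqp_receive_example).

	:- public(drain/1).

	% drain(+Channel): prints message bodies until no message arrives
	% within one second; assumes a consumer was already started on the
	% channel with no_ack(true), so no acknowledgments are sent here
	drain(Channel) :-
		(	amqp::receive(Channel, Message, [timeout(1000)]) ->
			amqp::message_body(Message, Body),
			write(Body), nl,
			drain(Channel)
		;	true
		).

:- end_object.
```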


tx_select/1

Enables transaction mode on a channel.

Compilation flags:
static
Template:
tx_select(Channel)
Mode and number of proofs:
tx_select(+compound) - one_or_error

tx_commit/1

Commits the current transaction.

Compilation flags:
static
Template:
tx_commit(Channel)
Mode and number of proofs:
tx_commit(+compound) - one_or_error

tx_rollback/1

Rolls back the current transaction.

Compilation flags:
static
Template:
tx_rollback(Channel)
Mode and number of proofs:
tx_rollback(+compound) - one_or_error
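
The three transaction predicates might be combined as in the sketch below, which publishes a list of bodies atomically and rolls back if any publish raises an error. The object and predicate names are hypothetical. The rollback may itself fail if the error already closed the channel, a case this sketch does not handle.

```logtalk
% assumes the library is already loaded: logtalk_load(amqp(loader))
:- object(amqp_tx_example).

	:- public(publish_all/4).

	% publish_all(+Channel, +Exchange, +RoutingKey, +Bodies)
	publish_all(Channel, Exchange, Key, Bodies) :-
		amqp::tx_select(Channel),
		catch(
			(	publish_each(Bodies, Channel, Exchange, Key),
				amqp::tx_commit(Channel)
			),
			Error,
			% undo the uncommitted publishes, then report the error
			(amqp::tx_rollback(Channel), throw(Error))
		).

	publish_each([], _, _, _).
	publish_each([Body| Bodies], Channel, Exchange, Key) :-
		amqp::basic_publish(Channel, Exchange, Body, [routing_key(Key)]),
		publish_each(Bodies, Channel, Exchange, Key).

:- end_object.
```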

confirm_select/1

Enables publisher confirms on a channel (RabbitMQ extension).

Compilation flags:
static
Template:
confirm_select(Channel)
Mode and number of proofs:
confirm_select(+compound) - one_or_error

send_heartbeat/1

Sends a heartbeat frame to the server.

Compilation flags:
static
Template:
send_heartbeat(Connection)
Mode and number of proofs:
send_heartbeat(+compound) - one_or_error

message_body/2

Extracts the body from a message.

Compilation flags:
static
Template:
message_body(Message,Body)
Mode and number of proofs:
message_body(+compound,-term) - one

message_properties/2

Extracts the properties from a message as a list.

Compilation flags:
static
Template:
message_properties(Message,Properties)
Mode and number of proofs:
message_properties(+compound,-list) - one

message_property/3

Extracts a specific property from a message. Fails if the property is not present.

Compilation flags:
static
Template:
message_property(Message,PropertyName,Value)
Mode and number of proofs:
message_property(+compound,+atom,-term) - zero_or_one

message_delivery_tag/2

Extracts the delivery tag from a message.

Compilation flags:
static
Template:
message_delivery_tag(Message,DeliveryTag)
Mode and number of proofs:
message_delivery_tag(+compound,-integer) - one

message_exchange/2

Extracts the exchange name from a message.

Compilation flags:
static
Template:
message_exchange(Message,Exchange)
Mode and number of proofs:
message_exchange(+compound,-atom) - one

message_routing_key/2

Extracts the routing key from a message.

Compilation flags:
static
Template:
message_routing_key(Message,RoutingKey)
Mode and number of proofs:
message_routing_key(+compound,-atom) - one
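
The message accessors can be combined to summarize a delivered message, as in the sketch below. The object name and describe/1 are hypothetical. Using content_type as the property name assumes that property names match the basic_publish/4 option names.

```logtalk
% assumes the library is already loaded: logtalk_load(amqp(loader))
:- object(amqp_message_example).

	:- public(describe/1).

	% describe(+Message)
	describe(Message) :-
		amqp::message_exchange(Message, Exchange),
		amqp::message_routing_key(Message, RoutingKey),
		amqp::message_delivery_tag(Message, Tag),
		(	amqp::message_property(Message, content_type, Type) ->
			true
		;	% message_property/3 fails when the property is absent
			Type = unknown
		),
		write(message(Exchange, RoutingKey, Tag, Type)), nl.

:- end_object.
```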

encode_frame/2

Encodes an AMQP frame to a list of bytes.

Compilation flags:
static
Template:
encode_frame(Frame,Bytes)
Mode and number of proofs:
encode_frame(+compound,-list) - one

decode_frame/2

Decodes a list of bytes to an AMQP frame.

Compilation flags:
static
Template:
decode_frame(Bytes,Frame)
Mode and number of proofs:
decode_frame(+list,-compound) - one_or_error

Protected predicates

(no local declarations; see entity ancestors if any)

Private predicates

(no local declarations; see entity ancestors if any)

Operators

(none)