.. index:: single: amqp .. _amqp/0: .. rst-class:: right **object** ``amqp`` ======== Portable AMQP 0-9-1 (Advanced Message Queuing Protocol) client. Uses the sockets library for TCP communication. | **Availability:** | ``logtalk_load(amqp(loader))`` | **Author:** Paulo Moura | **Version:** 1:0:0 | **Date:** 2026-02-19 | **Compilation flags:** | ``static, context_switching_calls`` | **Uses:** | :ref:`list ` | :ref:`os ` | :ref:`socket ` | :ref:`term_io ` | **Remarks:** - Supported backends: ECLiPSe, GNU Prolog, SICStus Prolog, SWI-Prolog, Trealla Prolog, and XVM (same as the sockets library). - Protocol version: Implements AMQP 0-9-1 specification. - Binary protocol: AMQP is a binary protocol with typed frame encoding. - Channels: Supports multiple concurrent channels over a single connection. - Exchanges and queues: Full support for declaring exchanges, queues, and bindings. - Content: Supports message publishing and consuming with content headers. - Transactions: Supports AMQP transactions with tx.select, tx.commit, and tx.rollback. - Publisher confirms: Support for publisher confirms can be added. - Heartbeat: Supports heartbeat negotiation to keep connections alive. - Reconnection: Automatic reconnection with configurable retry attempts and delays. | **Inherited public predicates:** | (none) .. contents:: :local: :backlinks: top Public predicates ----------------- .. index:: connect/4 .. _amqp/0::connect/4: ``connect/4`` ^^^^^^^^^^^^^ Connects to an AMQP 0-9-1 server and performs the protocol handshake. Returns a connection handle for subsequent operations. Supports automatic reconnection on connection failures. | **Compilation flags:** | ``static`` | **Template:** | ``connect(Host,Port,Connection,Options)`` | **Mode and number of proofs:** | ``connect(+atom,+integer,--compound,+list)`` - ``one_or_error`` | **Exceptions:** | Connection refused or network error: | ``amqp_error(connection_failed)`` | Server rejected connection: | ``amqp_error(protocol_error(Message))`` | Authentication failed: | ``amqp_error(auth_failed)`` | All reconnection attempts failed: | ``amqp_error(reconnect_failed)`` | **Remarks:** - Option username(Username): Username for authentication. Default is guest. - Option password(Password): Password for authentication. Default is guest. - Option virtual_host(VHost): Virtual host name. Default is /. - Option heartbeat(Seconds): Heartbeat interval in seconds. Default is 60. - Option channel_max(Max): Maximum number of channels. Default is 0 (no limit). - Option frame_max(Max): Maximum frame size. Default is 131072. - Option reconnect(Boolean): Enable automatic reconnection on connection failure. Default is false. - Option reconnect_attempts(N): Maximum number of reconnection attempts. Default is 3. Only used when reconnect(true). - Option reconnect_delay(Seconds): Delay between reconnection attempts in seconds. Default is 1. Only used when reconnect(true). ------------ .. index:: close/1 .. _amqp/0::close/1: ``close/1`` ^^^^^^^^^^^ Gracefully closes the AMQP connection. Closes all channels and the connection itself. | **Compilation flags:** | ``static`` | **Template:** | ``close(Connection)`` | **Mode and number of proofs:** | ``close(+compound)`` - ``one_or_error`` ------------ .. index:: close/3 .. _amqp/0::close/3: ``close/3`` ^^^^^^^^^^^ Closes the AMQP connection with a specific reply code and reason. | **Compilation flags:** | ``static`` | **Template:** | ``close(Connection,ReplyCode,ReplyText)`` | **Mode and number of proofs:** | ``close(+compound,+integer,+atom)`` - ``one_or_error`` ------------ .. index:: connection_alive/1 .. _amqp/0::connection_alive/1: ``connection_alive/1`` ^^^^^^^^^^^^^^^^^^^^^^ Checks if the connection is still open and valid. | **Compilation flags:** | ``static`` | **Template:** | ``connection_alive(Connection)`` | **Mode and number of proofs:** | ``connection_alive(+compound)`` - ``zero_or_one`` ------------ .. index:: channel_open/3 .. _amqp/0::channel_open/3: ``channel_open/3`` ^^^^^^^^^^^^^^^^^^ Opens a new channel on the connection. Returns a channel handle. | **Compilation flags:** | ``static`` | **Template:** | ``channel_open(Connection,ChannelNumber,Channel)`` | **Mode and number of proofs:** | ``channel_open(+compound,+integer,--compound)`` - ``one_or_error`` | **Exceptions:** | Channel already open: | ``amqp_error(channel_error(Message))`` ------------ .. index:: channel_close/1 .. _amqp/0::channel_close/1: ``channel_close/1`` ^^^^^^^^^^^^^^^^^^^ Closes a channel. | **Compilation flags:** | ``static`` | **Template:** | ``channel_close(Channel)`` | **Mode and number of proofs:** | ``channel_close(+compound)`` - ``one_or_error`` ------------ .. index:: channel_close/3 .. _amqp/0::channel_close/3: ``channel_close/3`` ^^^^^^^^^^^^^^^^^^^ Closes a channel with a specific reply code and reason. | **Compilation flags:** | ``static`` | **Template:** | ``channel_close(Channel,ReplyCode,ReplyText)`` | **Mode and number of proofs:** | ``channel_close(+compound,+integer,+atom)`` - ``one_or_error`` ------------ .. index:: exchange_declare/3 .. _amqp/0::exchange_declare/3: ``exchange_declare/3`` ^^^^^^^^^^^^^^^^^^^^^^ Declares an exchange on the server. | **Compilation flags:** | ``static`` | **Template:** | ``exchange_declare(Channel,Exchange,Options)`` | **Mode and number of proofs:** | ``exchange_declare(+compound,+atom,+list)`` - ``one_or_error`` | **Remarks:** - Option type(Type): Exchange type: direct, fanout, topic, headers. Default is direct. - Option durable(Boolean): Survive server restart. Default is false. - Option auto_delete(Boolean): Delete when unused. Default is false. - Option internal(Boolean): Internal exchange. Default is false. - Option arguments(Arguments): Additional arguments as key-value pairs. ------------ .. index:: exchange_delete/3 .. _amqp/0::exchange_delete/3: ``exchange_delete/3`` ^^^^^^^^^^^^^^^^^^^^^ Deletes an exchange. | **Compilation flags:** | ``static`` | **Template:** | ``exchange_delete(Channel,Exchange,Options)`` | **Mode and number of proofs:** | ``exchange_delete(+compound,+atom,+list)`` - ``one_or_error`` | **Remarks:** - Option if_unused(Boolean): Only delete if unused. Default is false. ------------ .. index:: exchange_bind/4 .. _amqp/0::exchange_bind/4: ``exchange_bind/4`` ^^^^^^^^^^^^^^^^^^^ Binds an exchange to another exchange. | **Compilation flags:** | ``static`` | **Template:** | ``exchange_bind(Channel,Destination,Source,Options)`` | **Mode and number of proofs:** | ``exchange_bind(+compound,+atom,+atom,+list)`` - ``one_or_error`` | **Remarks:** - Option routing_key(Key): Routing key for binding. Default is empty. - Option arguments(Arguments): Additional arguments. ------------ .. index:: exchange_unbind/4 .. _amqp/0::exchange_unbind/4: ``exchange_unbind/4`` ^^^^^^^^^^^^^^^^^^^^^ Unbinds an exchange from another exchange. | **Compilation flags:** | ``static`` | **Template:** | ``exchange_unbind(Channel,Destination,Source,Options)`` | **Mode and number of proofs:** | ``exchange_unbind(+compound,+atom,+atom,+list)`` - ``one_or_error`` ------------ .. index:: queue_declare/3 .. _amqp/0::queue_declare/3: ``queue_declare/3`` ^^^^^^^^^^^^^^^^^^^ Declares a queue on the server. If Queue is a variable, the server generates a unique name. | **Compilation flags:** | ``static`` | **Template:** | ``queue_declare(Channel,Queue,Options)`` | **Mode and number of proofs:** | ``queue_declare(+compound,?atom,+list)`` - ``one_or_error`` | **Remarks:** - Option durable(Boolean): Survive server restart. Default is false. - Option exclusive(Boolean): Exclusive to this connection. Default is false. - Option auto_delete(Boolean): Delete when unused. Default is false. - Option arguments(Arguments): Additional arguments (e.g., message TTL, dead letter exchange). ------------ .. index:: queue_delete/3 .. _amqp/0::queue_delete/3: ``queue_delete/3`` ^^^^^^^^^^^^^^^^^^ Deletes a queue. | **Compilation flags:** | ``static`` | **Template:** | ``queue_delete(Channel,Queue,Options)`` | **Mode and number of proofs:** | ``queue_delete(+compound,+atom,+list)`` - ``one_or_error`` | **Remarks:** - Option if_unused(Boolean): Only delete if unused. Default is false. - Option if_empty(Boolean): Only delete if empty. Default is false. ------------ .. index:: queue_bind/4 .. _amqp/0::queue_bind/4: ``queue_bind/4`` ^^^^^^^^^^^^^^^^ Binds a queue to an exchange. | **Compilation flags:** | ``static`` | **Template:** | ``queue_bind(Channel,Queue,Exchange,Options)`` | **Mode and number of proofs:** | ``queue_bind(+compound,+atom,+atom,+list)`` - ``one_or_error`` | **Remarks:** - Option routing_key(Key): Routing key for binding. Default is empty. - Option arguments(Arguments): Additional arguments. ------------ .. index:: queue_unbind/4 .. _amqp/0::queue_unbind/4: ``queue_unbind/4`` ^^^^^^^^^^^^^^^^^^ Unbinds a queue from an exchange. | **Compilation flags:** | ``static`` | **Template:** | ``queue_unbind(Channel,Queue,Exchange,Options)`` | **Mode and number of proofs:** | ``queue_unbind(+compound,+atom,+atom,+list)`` - ``one_or_error`` ------------ .. index:: queue_purge/2 .. _amqp/0::queue_purge/2: ``queue_purge/2`` ^^^^^^^^^^^^^^^^^ Purges all messages from a queue. | **Compilation flags:** | ``static`` | **Template:** | ``queue_purge(Channel,Queue)`` | **Mode and number of proofs:** | ``queue_purge(+compound,+atom)`` - ``one_or_error`` ------------ .. index:: basic_publish/4 .. _amqp/0::basic_publish/4: ``basic_publish/4`` ^^^^^^^^^^^^^^^^^^^ Publishes a message to an exchange. | **Compilation flags:** | ``static`` | **Template:** | ``basic_publish(Channel,Exchange,Body,Options)`` | **Mode and number of proofs:** | ``basic_publish(+compound,+atom,+term,+list)`` - ``one_or_error`` | **Remarks:** - Option routing_key(Key): Routing key for message. Default is empty. - Option mandatory(Boolean): Return if not routable. Default is false. - Option immediate(Boolean): Return if not deliverable. Default is false (deprecated in RabbitMQ). - Option content_type(Type): MIME content type. - Option content_encoding(Enc): Content encoding. - Option correlation_id(Id): Correlation identifier. - Option reply_to(Queue): Reply queue name. - Option expiration(Ms): Message TTL in milliseconds. - Option message_id(Id): Application message identifier. - Option timestamp(Ts): Message timestamp. - Option type(Type): Message type name. - Option user_id(Id): Creating user ID. - Option app_id(Id): Creating application ID. - Option delivery_mode(Mode): 1 for non-persistent, 2 for persistent. - Option priority(P): Message priority (0-9). - Option headers(H): Application headers as key-value pairs. ------------ .. index:: basic_consume/3 .. _amqp/0::basic_consume/3: ``basic_consume/3`` ^^^^^^^^^^^^^^^^^^^ Starts consuming messages from a queue. | **Compilation flags:** | ``static`` | **Template:** | ``basic_consume(Channel,Queue,Options)`` | **Mode and number of proofs:** | ``basic_consume(+compound,+atom,+list)`` - ``one_or_error`` | **Remarks:** - Option consumer_tag(Tag): Consumer identifier. Server generates if not provided. - Option no_local(Boolean): Do not receive own messages. Default is false. - Option no_ack(Boolean): No acknowledgment required. Default is false. - Option exclusive(Boolean): Exclusive consumer. Default is false. - Option arguments(Arguments): Additional arguments. ------------ .. index:: basic_cancel/3 .. _amqp/0::basic_cancel/3: ``basic_cancel/3`` ^^^^^^^^^^^^^^^^^^ Cancels a consumer. | **Compilation flags:** | ``static`` | **Template:** | ``basic_cancel(Channel,ConsumerTag,Options)`` | **Mode and number of proofs:** | ``basic_cancel(+compound,+atom,+list)`` - ``one_or_error`` ------------ .. index:: basic_get/3 .. _amqp/0::basic_get/3: ``basic_get/3`` ^^^^^^^^^^^^^^^ Synchronously gets a message from a queue. | **Compilation flags:** | ``static`` | **Template:** | ``basic_get(Channel,Queue,Options)`` | **Mode and number of proofs:** | ``basic_get(+compound,+atom,+list)`` - ``one_or_error`` | **Remarks:** - Option no_ack(Boolean): No acknowledgment required. Default is false. ------------ .. index:: basic_ack/3 .. _amqp/0::basic_ack/3: ``basic_ack/3`` ^^^^^^^^^^^^^^^ Acknowledges a message. | **Compilation flags:** | ``static`` | **Template:** | ``basic_ack(Channel,DeliveryTag,Options)`` | **Mode and number of proofs:** | ``basic_ack(+compound,+integer,+list)`` - ``one_or_error`` | **Remarks:** - Option multiple(Boolean): Acknowledge all up to this tag. Default is false. ------------ .. index:: basic_nack/3 .. _amqp/0::basic_nack/3: ``basic_nack/3`` ^^^^^^^^^^^^^^^^ Negatively acknowledges a message (RabbitMQ extension). | **Compilation flags:** | ``static`` | **Template:** | ``basic_nack(Channel,DeliveryTag,Options)`` | **Mode and number of proofs:** | ``basic_nack(+compound,+integer,+list)`` - ``one_or_error`` | **Remarks:** - Option multiple(Boolean): Reject all up to this tag. Default is false. - Option requeue(Boolean): Requeue the message. Default is true. ------------ .. index:: basic_reject/3 .. _amqp/0::basic_reject/3: ``basic_reject/3`` ^^^^^^^^^^^^^^^^^^ Rejects a message. | **Compilation flags:** | ``static`` | **Template:** | ``basic_reject(Channel,DeliveryTag,Options)`` | **Mode and number of proofs:** | ``basic_reject(+compound,+integer,+list)`` - ``one_or_error`` | **Remarks:** - Option requeue(Boolean): Requeue the message. Default is true. ------------ .. index:: basic_qos/2 .. _amqp/0::basic_qos/2: ``basic_qos/2`` ^^^^^^^^^^^^^^^ Sets quality of service (prefetch) settings. | **Compilation flags:** | ``static`` | **Template:** | ``basic_qos(Channel,Options)`` | **Mode and number of proofs:** | ``basic_qos(+compound,+list)`` - ``one_or_error`` | **Remarks:** - Option prefetch_size(Size): Prefetch window size in bytes. Default is 0 (no limit). - Option prefetch_count(Count): Prefetch window in messages. Default is 0 (no limit). - Option global(Boolean): Apply to entire connection. Default is false. ------------ .. index:: basic_recover/2 .. _amqp/0::basic_recover/2: ``basic_recover/2`` ^^^^^^^^^^^^^^^^^^^ Asks the server to redeliver unacknowledged messages. | **Compilation flags:** | ``static`` | **Template:** | ``basic_recover(Channel,Options)`` | **Mode and number of proofs:** | ``basic_recover(+compound,+list)`` - ``one_or_error`` | **Remarks:** - Option requeue(Boolean): Requeue messages. Default is false. ------------ .. index:: receive/3 .. _amqp/0::receive/3: ``receive/3`` ^^^^^^^^^^^^^ Receives a message or method from the server. Blocks until data is available or timeout. | **Compilation flags:** | ``static`` | **Template:** | ``receive(Channel,Message,Options)`` | **Mode and number of proofs:** | ``receive(+compound,-compound,+list)`` - ``zero_or_one_or_error`` | **Remarks:** - Option timeout(Milliseconds): Timeout in milliseconds. 0 for non-blocking, -1 for infinite. Default is -1. ------------ .. index:: tx_select/1 .. _amqp/0::tx_select/1: ``tx_select/1`` ^^^^^^^^^^^^^^^ Enables transaction mode on a channel. | **Compilation flags:** | ``static`` | **Template:** | ``tx_select(Channel)`` | **Mode and number of proofs:** | ``tx_select(+compound)`` - ``one_or_error`` ------------ .. index:: tx_commit/1 .. _amqp/0::tx_commit/1: ``tx_commit/1`` ^^^^^^^^^^^^^^^ Commits the current transaction. | **Compilation flags:** | ``static`` | **Template:** | ``tx_commit(Channel)`` | **Mode and number of proofs:** | ``tx_commit(+compound)`` - ``one_or_error`` ------------ .. index:: tx_rollback/1 .. _amqp/0::tx_rollback/1: ``tx_rollback/1`` ^^^^^^^^^^^^^^^^^ Rolls back the current transaction. | **Compilation flags:** | ``static`` | **Template:** | ``tx_rollback(Channel)`` | **Mode and number of proofs:** | ``tx_rollback(+compound)`` - ``one_or_error`` ------------ .. index:: confirm_select/1 .. _amqp/0::confirm_select/1: ``confirm_select/1`` ^^^^^^^^^^^^^^^^^^^^ Enables publisher confirms on a channel (RabbitMQ extension). | **Compilation flags:** | ``static`` | **Template:** | ``confirm_select(Channel)`` | **Mode and number of proofs:** | ``confirm_select(+compound)`` - ``one_or_error`` ------------ .. index:: send_heartbeat/1 .. _amqp/0::send_heartbeat/1: ``send_heartbeat/1`` ^^^^^^^^^^^^^^^^^^^^ Sends a heartbeat frame to the server. | **Compilation flags:** | ``static`` | **Template:** | ``send_heartbeat(Connection)`` | **Mode and number of proofs:** | ``send_heartbeat(+compound)`` - ``one_or_error`` ------------ .. index:: message_body/2 .. _amqp/0::message_body/2: ``message_body/2`` ^^^^^^^^^^^^^^^^^^ Extracts the body from a message. | **Compilation flags:** | ``static`` | **Template:** | ``message_body(Message,Body)`` | **Mode and number of proofs:** | ``message_body(+compound,-term)`` - ``one`` ------------ .. index:: message_properties/2 .. _amqp/0::message_properties/2: ``message_properties/2`` ^^^^^^^^^^^^^^^^^^^^^^^^ Extracts the properties from a message as a list. | **Compilation flags:** | ``static`` | **Template:** | ``message_properties(Message,Properties)`` | **Mode and number of proofs:** | ``message_properties(+compound,-list)`` - ``one`` ------------ .. index:: message_property/3 .. _amqp/0::message_property/3: ``message_property/3`` ^^^^^^^^^^^^^^^^^^^^^^ Extracts a specific property from a message. Fails if not present. | **Compilation flags:** | ``static`` | **Template:** | ``message_property(Message,PropertyName,Value)`` | **Mode and number of proofs:** | ``message_property(+compound,+atom,-term)`` - ``zero_or_one`` ------------ .. index:: message_delivery_tag/2 .. _amqp/0::message_delivery_tag/2: ``message_delivery_tag/2`` ^^^^^^^^^^^^^^^^^^^^^^^^^^ Extracts the delivery tag from a message. | **Compilation flags:** | ``static`` | **Template:** | ``message_delivery_tag(Message,DeliveryTag)`` | **Mode and number of proofs:** | ``message_delivery_tag(+compound,-integer)`` - ``one`` ------------ .. index:: message_exchange/2 .. _amqp/0::message_exchange/2: ``message_exchange/2`` ^^^^^^^^^^^^^^^^^^^^^^ Extracts the exchange name from a message. | **Compilation flags:** | ``static`` | **Template:** | ``message_exchange(Message,Exchange)`` | **Mode and number of proofs:** | ``message_exchange(+compound,-atom)`` - ``one`` ------------ .. index:: message_routing_key/2 .. _amqp/0::message_routing_key/2: ``message_routing_key/2`` ^^^^^^^^^^^^^^^^^^^^^^^^^ Extracts the routing key from a message. | **Compilation flags:** | ``static`` | **Template:** | ``message_routing_key(Message,RoutingKey)`` | **Mode and number of proofs:** | ``message_routing_key(+compound,-atom)`` - ``one`` ------------ .. index:: encode_frame/2 .. _amqp/0::encode_frame/2: ``encode_frame/2`` ^^^^^^^^^^^^^^^^^^ Encodes an AMQP frame to a list of bytes. | **Compilation flags:** | ``static`` | **Template:** | ``encode_frame(Frame,Bytes)`` | **Mode and number of proofs:** | ``encode_frame(+compound,-list)`` - ``one`` ------------ .. index:: decode_frame/2 .. _amqp/0::decode_frame/2: ``decode_frame/2`` ^^^^^^^^^^^^^^^^^^ Decodes a list of bytes to an AMQP frame. | **Compilation flags:** | ``static`` | **Template:** | ``decode_frame(Bytes,Frame)`` | **Mode and number of proofs:** | ``decode_frame(+list,-compound)`` - ``one_or_error`` ------------ Protected predicates -------------------- (no local declarations; see entity ancestors if any) Private predicates ------------------ (no local declarations; see entity ancestors if any) Operators --------- (none)