object
amqp
Portable AMQP 0-9-1 (Advanced Message Queuing Protocol) client. Uses the sockets library for TCP communication.
Availability: logtalk_load(amqp(loader))
Compilation flags: static, context_switching_calls
Supported backends: ECLiPSe, GNU Prolog, SICStus Prolog, SWI-Prolog, Trealla Prolog, and XVM (same as the sockets library).
Protocol version: Implements the AMQP 0-9-1 specification.
Binary protocol: AMQP is a binary protocol with typed frame encoding.
Channels: Supports multiple concurrent channels over a single connection.
Exchanges and queues: Full support for declaring exchanges, queues, and bindings.
Content: Supports message publishing and consuming with content headers.
Transactions: Supports AMQP transactions with tx.select, tx.commit, and tx.rollback.
Publisher confirms: The confirm_select/1 predicate enables confirm mode on a channel (RabbitMQ extension); fuller support for publisher confirms can be added.
Heartbeat: Supports heartbeat negotiation to keep connections alive.
Reconnection: Automatic reconnection with configurable retry attempts and delays.
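The features listed above can be tied together in a minimal sketch that publishes one message. It uses only predicates documented on this page. The object name amqp_hello, the host, the queue name, and the use of an atom as the message body are assumptions made for illustration; port 5672 is the standard AMQP port, and the empty exchange name selects the AMQP default exchange, which routes by queue name.

```logtalk
% Assumes the library was loaded first with: logtalk_load(amqp(loader))
:- object(amqp_hello).

    :- public(run/0).

    % Publishes one message to the queue "hello" and closes everything.
    % Host, queue name, and body are placeholders for illustration.
    run :-
        amqp::connect(localhost, 5672, Connection, []),
        amqp::channel_open(Connection, 1, Channel),
        amqp::queue_declare(Channel, hello, []),
        % '' names the default exchange; the routing key is the queue name
        amqp::basic_publish(Channel, '', 'Hello, world!', [routing_key(hello)]),
        amqp::channel_close(Channel),
        amqp::close(Connection).

:- end_object.
```

This sketch does no cleanup if a step throws an error; a real application would likely wrap the body in catch/3 and close the connection in the recovery goal.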
Public predicates
connect/4
Connects to an AMQP 0-9-1 server and performs the protocol handshake. Returns a connection handle for subsequent operations. Supports automatic reconnection on connection failures.
Compilation flags: static
Template: connect(Host,Port,Connection,Options)
Mode and number of proofs: connect(+atom,+integer,--compound,+list) - one_or_error
Exceptions: amqp_error(connection_failed), amqp_error(protocol_error(Message)), amqp_error(auth_failed), amqp_error(reconnect_failed)
Option username(Username): Username for authentication. Default is guest.
Option password(Password): Password for authentication. Default is guest.
Option virtual_host(VHost): Virtual host name. Default is /.
Option heartbeat(Seconds): Heartbeat interval in seconds. Default is 60.
Option channel_max(Max): Maximum number of channels. Default is 0 (no limit).
Option frame_max(Max): Maximum frame size. Default is 131072.
Option reconnect(Boolean): Enable automatic reconnection on connection failure. Default is false.
Option reconnect_attempts(N): Maximum number of reconnection attempts. Default is 3. Only used when reconnect(true).
Option reconnect_delay(Seconds): Delay between reconnection attempts in seconds. Default is 1. Only used when reconnect(true).
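As a hedged illustration of the options above, the following sketch opens a connection with explicit credentials, a shorter heartbeat, and automatic reconnection. The object name amqp_connect_demo, the predicate name open/1, and the host are hypothetical. This page does not say whether the amqp_error/1 terms are thrown bare or wrapped in another term, so the sketch catches any exception and prints it.

```logtalk
% Assumes the library was loaded first with: logtalk_load(amqp(loader))
:- object(amqp_connect_demo).

    :- public(open/1).

    % Opens a connection using the documented options. Prints the
    % exception and fails if connect/4 throws for any reason.
    open(Connection) :-
        Options = [
            username(guest), password(guest), virtual_host('/'),
            heartbeat(30),
            reconnect(true), reconnect_attempts(5), reconnect_delay(2)
        ],
        catch(
            amqp::connect(localhost, 5672, Connection, Options),
            Error,
            (write('AMQP connection failed: '), write(Error), nl, fail)
        ).

:- end_object.
```

With these settings the library would make up to 5 reconnection attempts, 2 seconds apart, as described by the reconnect_attempts and reconnect_delay options.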
close/1
Gracefully closes the AMQP connection. Closes all channels and the connection itself.
Compilation flags: static
Template: close(Connection)
Mode and number of proofs: close(+compound) - one_or_error
close/3
Closes the AMQP connection with a specific reply code and reason.
Compilation flags: static
Template: close(Connection,ReplyCode,ReplyText)
Mode and number of proofs: close(+compound,+integer,+atom) - one_or_error
connection_alive/1
Checks if the connection is still open and valid.
Compilation flags: static
Template: connection_alive(Connection)
Mode and number of proofs: connection_alive(+compound) - zero_or_one
channel_open/3
Opens a new channel on the connection. Returns a channel handle.
Compilation flags: static
Template: channel_open(Connection,ChannelNumber,Channel)
Mode and number of proofs: channel_open(+compound,+integer,--compound) - one_or_error
Exceptions: amqp_error(channel_error(Message))
channel_close/1
Closes a channel.
Compilation flags: static
Template: channel_close(Channel)
Mode and number of proofs: channel_close(+compound) - one_or_error
channel_close/3
Closes a channel with a specific reply code and reason.
Compilation flags: static
Template: channel_close(Channel,ReplyCode,ReplyText)
Mode and number of proofs: channel_close(+compound,+integer,+atom) - one_or_error
exchange_declare/3
Declares an exchange on the server.
Compilation flags: static
Template: exchange_declare(Channel,Exchange,Options)
Mode and number of proofs: exchange_declare(+compound,+atom,+list) - one_or_error
Option type(Type): Exchange type: direct, fanout, topic, headers. Default is direct.
Option durable(Boolean): Survive server restart. Default is false.
Option auto_delete(Boolean): Delete when unused. Default is false.
Option internal(Boolean): Internal exchange. Default is false.
Option arguments(Arguments): Additional arguments as key-value pairs.
exchange_delete/3
Deletes an exchange.
Compilation flags: static
Template: exchange_delete(Channel,Exchange,Options)
Mode and number of proofs: exchange_delete(+compound,+atom,+list) - one_or_error
Option if_unused(Boolean): Only delete if unused. Default is false.
exchange_bind/4
Binds an exchange to another exchange.
Compilation flags: static
Template: exchange_bind(Channel,Destination,Source,Options)
Mode and number of proofs: exchange_bind(+compound,+atom,+atom,+list) - one_or_error
Option routing_key(Key): Routing key for binding. Default is empty.
Option arguments(Arguments): Additional arguments.
exchange_unbind/4
Unbinds an exchange from another exchange.
Compilation flags: static
Template: exchange_unbind(Channel,Destination,Source,Options)
Mode and number of proofs: exchange_unbind(+compound,+atom,+atom,+list) - one_or_error
queue_declare/3
Declares a queue on the server. If Queue is a variable, the server generates a unique name.
Compilation flags: static
Template: queue_declare(Channel,Queue,Options)
Mode and number of proofs: queue_declare(+compound,?atom,+list) - one_or_error
Option durable(Boolean): Survive server restart. Default is false.
Option exclusive(Boolean): Exclusive to this connection. Default is false.
Option auto_delete(Boolean): Delete when unused. Default is false.
Option arguments(Arguments): Additional arguments (e.g., message TTL, dead letter exchange).
queue_delete/3
Deletes a queue.
Compilation flags: static
Template: queue_delete(Channel,Queue,Options)
Mode and number of proofs: queue_delete(+compound,+atom,+list) - one_or_error
Option if_unused(Boolean): Only delete if unused. Default is false.
Option if_empty(Boolean): Only delete if empty. Default is false.
queue_bind/4
Binds a queue to an exchange.
Compilation flags: static
Template: queue_bind(Channel,Queue,Exchange,Options)
Mode and number of proofs: queue_bind(+compound,+atom,+atom,+list) - one_or_error
Option routing_key(Key): Routing key for binding. Default is empty.
Option arguments(Arguments): Additional arguments.
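The declaration and binding predicates documented so far can be combined to set up a small topology. The sketch below is an illustration only: the object name amqp_topology_demo, the predicate name setup/2, the exchange name orders, and the binding pattern are hypothetical. It leaves Queue unbound so that the server generates the queue name, as described for queue_declare/3, and uses a topic pattern in which * matches exactly one word.

```logtalk
% Assumes the library was loaded first with: logtalk_load(amqp(loader))
:- object(amqp_topology_demo).

    :- public(setup/2).

    % Declares a durable topic exchange, a private server-named queue,
    % and a binding between them. Queue is returned to the caller.
    setup(Channel, Queue) :-
        amqp::exchange_declare(Channel, orders, [type(topic), durable(true)]),
        % Queue is a variable here, so the server picks a unique name
        amqp::queue_declare(Channel, Queue, [exclusive(true), auto_delete(true)]),
        amqp::queue_bind(Channel, Queue, orders, [routing_key('orders.*')]).

:- end_object.
```

Given an open channel, a call such as amqp_topology_demo::setup(Channel, Queue) would be expected to bind Queue to the generated name.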
queue_unbind/4
Unbinds a queue from an exchange.
Compilation flags: static
Template: queue_unbind(Channel,Queue,Exchange,Options)
Mode and number of proofs: queue_unbind(+compound,+atom,+atom,+list) - one_or_error
queue_purge/2
Purges all messages from a queue.
Compilation flags: static
Template: queue_purge(Channel,Queue)
Mode and number of proofs: queue_purge(+compound,+atom) - one_or_error
basic_publish/4
Publishes a message to an exchange.
Compilation flags: static
Template: basic_publish(Channel,Exchange,Body,Options)
Mode and number of proofs: basic_publish(+compound,+atom,+term,+list) - one_or_error
Option routing_key(Key): Routing key for message. Default is empty.
Option mandatory(Boolean): Return if not routable. Default is false.
Option immediate(Boolean): Return if not deliverable. Default is false (deprecated in RabbitMQ).
Option content_type(Type): MIME content type.
Option content_encoding(Enc): Content encoding.
Option correlation_id(Id): Correlation identifier.
Option reply_to(Queue): Reply queue name.
Option expiration(Ms): Message TTL in milliseconds.
Option message_id(Id): Application message identifier.
Option timestamp(Ts): Message timestamp.
Option type(Type): Message type name.
Option user_id(Id): Creating user ID.
Option app_id(Id): Creating application ID.
Option delivery_mode(Mode): 1 for non-persistent, 2 for persistent.
Option priority(P): Message priority (0-9).
Option headers(H): Application headers as key-value pairs.
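To illustrate how the routing and property options combine, the following sketch publishes a persistent JSON message. The object name amqp_publish_demo, the predicate name publish_order/1, the exchange name, and all option values are hypothetical. This page does not specify the accepted types for property values or for the body, so the use of atoms here is an assumption.

```logtalk
% Assumes the library was loaded first with: logtalk_load(amqp(loader))
:- object(amqp_publish_demo).

    :- public(publish_order/1).

    % Publishes a persistent JSON message to the "orders" exchange.
    % All names and values below are illustrative placeholders.
    publish_order(Channel) :-
        amqp::basic_publish(Channel, orders, '{"id": 42}', [
            routing_key('orders.created'),
            content_type('application/json'),
            delivery_mode(2),          % 2 marks the message as persistent
            message_id('order-42'),
            app_id(order_service)
        ]).

:- end_object.
```

Note that a persistent message is only kept across a server restart if it is routed to a queue declared with durable(true).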
basic_consume/3
Starts consuming messages from a queue.
Compilation flags: static
Template: basic_consume(Channel,Queue,Options)
Mode and number of proofs: basic_consume(+compound,+atom,+list) - one_or_error
Option consumer_tag(Tag): Consumer identifier. Server generates if not provided.
Option no_local(Boolean): Do not receive own messages. Default is false.
Option no_ack(Boolean): No acknowledgment required. Default is false.
Option exclusive(Boolean): Exclusive consumer. Default is false.
Option arguments(Arguments): Additional arguments.
basic_cancel/3
Cancels a consumer.
Compilation flags: static
Template: basic_cancel(Channel,ConsumerTag,Options)
Mode and number of proofs: basic_cancel(+compound,+atom,+list) - one_or_error
basic_get/3
Synchronously gets a message from a queue.
Compilation flags: static
Template: basic_get(Channel,Queue,Options)
Mode and number of proofs: basic_get(+compound,+atom,+list) - one_or_error
Option no_ack(Boolean): No acknowledgment required. Default is false.
basic_ack/3
Acknowledges a message.
Compilation flags: static
Template: basic_ack(Channel,DeliveryTag,Options)
Mode and number of proofs: basic_ack(+compound,+integer,+list) - one_or_error
Option multiple(Boolean): Acknowledge all up to this tag. Default is false.
basic_nack/3
Negatively acknowledges a message (RabbitMQ extension).
Compilation flags: static
Template: basic_nack(Channel,DeliveryTag,Options)
Mode and number of proofs: basic_nack(+compound,+integer,+list) - one_or_error
Option multiple(Boolean): Reject all up to this tag. Default is false.
Option requeue(Boolean): Requeue the message. Default is true.
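The acknowledgment predicates can be combined into a helper that settles a message according to the outcome of application processing. This is a sketch under assumptions: the object name amqp_settle_demo and the predicates settle/2 and process/1 are hypothetical, and process/1 is a stand-in for real application logic. The sketch passes requeue(false) explicitly because the default of true would make a message that always fails eligible for redelivery again.

```logtalk
% Assumes the library was loaded first with: logtalk_load(amqp(loader))
:- object(amqp_settle_demo).

    :- public(settle/2).

    % Acknowledges Message when process/1 succeeds. Otherwise sends a
    % negative acknowledgment without requeueing the message.
    settle(Channel, Message) :-
        amqp::message_delivery_tag(Message, Tag),
        amqp::message_body(Message, Body),
        (   catch(process(Body), _, fail) ->
            amqp::basic_ack(Channel, Tag, [])
        ;   amqp::basic_nack(Channel, Tag, [requeue(false)])
        ).

    % Hypothetical application logic: accept any body other than
    % the empty atom.
    process(Body) :-
        Body \== ''.

:- end_object.
```

Since basic_nack/3 is a RabbitMQ extension, basic_reject/3 with the same requeue(false) option would be the alternative for other brokers.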
basic_reject/3
Rejects a message.
Compilation flags: static
Template: basic_reject(Channel,DeliveryTag,Options)
Mode and number of proofs: basic_reject(+compound,+integer,+list) - one_or_error
Option requeue(Boolean): Requeue the message. Default is true.
basic_qos/2
Sets quality of service (prefetch) settings.
Compilation flags: static
Template: basic_qos(Channel,Options)
Mode and number of proofs: basic_qos(+compound,+list) - one_or_error
Option prefetch_size(Size): Prefetch window size in bytes. Default is 0 (no limit).
Option prefetch_count(Count): Prefetch window in messages. Default is 0 (no limit).
Option global(Boolean): Apply to entire connection. Default is false.
basic_recover/2
Asks the server to redeliver unacknowledged messages.
Compilation flags: static
Template: basic_recover(Channel,Options)
Mode and number of proofs: basic_recover(+compound,+list) - one_or_error
Option requeue(Boolean): Requeue messages. Default is false.
receive/3
Receives a message or method from the server. Blocks until data is available or timeout.
Compilation flags: static
Template: receive(Channel,Message,Options)
Mode and number of proofs: receive(+compound,-compound,+list) - zero_or_one_or_error
Option timeout(Milliseconds): Timeout in milliseconds. 0 for non-blocking, -1 for infinite. Default is -1.
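A consumer loop built on receive/3 might look like the sketch below. The object name amqp_consume_demo and the predicates drain/2 and loop/1 are hypothetical. The sketch assumes that receive/3 fails when the timeout expires (suggested by its zero_or_one_or_error mode) and that every term it returns is a delivered message; since receive/3 may also return methods, a real consumer would need to inspect the term before extracting the body.

```logtalk
% Assumes the library was loaded first with: logtalk_load(amqp(loader))
:- object(amqp_consume_demo).

    :- public(drain/2).

    % Consumes and acknowledges messages from Queue until no message
    % arrives within 5 seconds.
    drain(Channel, Queue) :-
        % Limit the number of unacknowledged messages in flight
        amqp::basic_qos(Channel, [prefetch_count(10)]),
        amqp::basic_consume(Channel, Queue, [no_ack(false)]),
        loop(Channel).

    loop(Channel) :-
        (   amqp::receive(Channel, Message, [timeout(5000)]) ->
            amqp::message_body(Message, Body),
            amqp::message_delivery_tag(Message, Tag),
            write(Body), nl,
            amqp::basic_ack(Channel, Tag, []),
            loop(Channel)
        ;   % Assumed: failure means the timeout expired
            true
        ).

:- end_object.
```

The finite timeout is a design choice for the sketch: with the default of -1 the call would block indefinitely and the loop would never terminate on an idle queue.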
tx_select/1
Enables transaction mode on a channel.
Compilation flags: static
Template: tx_select(Channel)
Mode and number of proofs: tx_select(+compound) - one_or_error
tx_commit/1
Commits the current transaction.
Compilation flags: static
Template: tx_commit(Channel)
Mode and number of proofs: tx_commit(+compound) - one_or_error
tx_rollback/1
Rolls back the current transaction.
Compilation flags: static
Template: tx_rollback(Channel)
Mode and number of proofs: tx_rollback(+compound) - one_or_error
confirm_select/1
Enables publisher confirms on a channel (RabbitMQ extension).
Compilation flags: static
Template: confirm_select(Channel)
Mode and number of proofs: confirm_select(+compound) - one_or_error
send_heartbeat/1
Sends a heartbeat frame to the server.
Compilation flags: static
Template: send_heartbeat(Connection)
Mode and number of proofs: send_heartbeat(+compound) - one_or_error
message_body/2
Extracts the body from a message.
Compilation flags: static
Template: message_body(Message,Body)
Mode and number of proofs: message_body(+compound,-term) - one
message_properties/2
Extracts the properties from a message as a list.
Compilation flags: static
Template: message_properties(Message,Properties)
Mode and number of proofs: message_properties(+compound,-list) - one
message_property/3
Extracts a specific property from a message. Fails if not present.
Compilation flags: static
Template: message_property(Message,PropertyName,Value)
Mode and number of proofs: message_property(+compound,+atom,-term) - zero_or_one
message_delivery_tag/2
Extracts the delivery tag from a message.
Compilation flags: static
Template: message_delivery_tag(Message,DeliveryTag)
Mode and number of proofs: message_delivery_tag(+compound,-integer) - one
message_exchange/2
Extracts the exchange name from a message.
Compilation flags: static
Template: message_exchange(Message,Exchange)
Mode and number of proofs: message_exchange(+compound,-atom) - one
message_routing_key/2
Extracts the routing key from a message.
Compilation flags: static
Template: message_routing_key(Message,RoutingKey)
Mode and number of proofs: message_routing_key(+compound,-atom) - one
encode_frame/2
Encodes an AMQP frame to a list of bytes.
Compilation flags: static
Template: encode_frame(Frame,Bytes)
Mode and number of proofs: encode_frame(+compound,-list) - one
decode_frame/2
Decodes a list of bytes to an AMQP frame.
Compilation flags: static
Template: decode_frame(Bytes,Frame)
Mode and number of proofs: decode_frame(+list,-compound) - one_or_error
Protected predicates
(no local declarations; see entity ancestors if any)
Private predicates
(no local declarations; see entity ancestors if any)
Operators
(none)