A classe tAMQP representa um objeto de comunicação do tipo AMQP (Advanced Message Queuing Protocol) da versão 0.9.1, que se comunica com servidores RabbitMQ. A instancia de uma classe permite a comunicação, envio e recebimento de mensagens através de um servidor AMQP, sendo possível o desenvolvimento de diversos tipos de aplicações, realizando transações ou comunicações padronizadas, de forma assíncrona, indiferente da arquitetura de cada uma delas.
Essa classe foi desenvolvida com base na existente biblioteca RabbitMQ.Client desenvolvida para C# .Net, sendo inclusive possível ser usado como referencia a documentação presente em https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html, assim como montar o ambiente de um servidor AMQP em https://www.rabbitmq.com/#getstarted. Para qualquer informação sobre os conceitos de tipos de fila e comportamentos de recebimentos e envios de mensagens, favor também consultar esse site.
Observações
- Atualmente é compatível apenas com o protocolo 0.9.1
Exemplos
#define AMQP_AUTODELETE .T. #define AMQP_EXCLUSIVE .T. #define AMQP_DURABLE .T. #define AMQP_PERSISTENT .T. #define AMQP_AUTOACK .T. #define fixed_channel_id 1 User Function _sender Local oSender := tAmqp():New("localhost",5672,"guest","guest",fixed_channel_id) oSender:QueueDeclare("test_queue", .F.,.F.,AMQP_AUTODELETE ) oSender:BasicPublish("","test_queue", AMQP_PERSISTENT, "hello world!") u___Receiver() Freeobj(oSender) Return User Function _Receiver Local oRecv := tAmqp():New("localhost",5672,"guest","guest",fixed_channel_id) local var oRecv:QueueDeclare("test_queue", .F.,.F.,AMQP_AUTODELETE) oRecv:BasicConsume("test_queue", AMQP_AUTOACK, ) var := oRecv:Body Freeobj(oRecv) Return
Abrangência
17.3.0.x
Hierarquia
- tAMQP
Construtores
Construtor tAmqp:New
Cria um objeto tAMQP com um determinado AMQP Server.
Sintaxe
tAmqp():New( < uParam1 >, [ uParam2 ], [ uParam3 ], [ uParam4 ], [ uParam5 ], [ uParam6 ])
Parâmetros
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
uParam1 | caractere | Endereço do AMQP Server | X | |
uParam2 | numérico | Porta do AMQP Server | ||
uParam3 | caractere | Usuário para logar na fila do AMQP Server | ||
uParam4 | caractere | Senha para logar na fila do AMQP Server | ||
uParam5 | caractere | Canal da comunicação com o AMQP Server | ||
uParam6 | caractere | Virtual Host (vhost) no servidor AMQP (padrão: "/") |
Retorno
Nome | Tipo | Descrição |
---|---|---|
oObj | objeto | Nova instância da classe tAmqp |
Exemplos:
1. Conectando ao servidor com vhost padrão ("/"
)
Local oClient := tAmqp():New("localhost",5672,"guest","guest",1)
2. Conectando ao servidor com um VHost personalizado
Local oClient := tAmqp():New("localhost", 5672, "guest", "guest", 1, "meu_vhost")
A funcionalidade de VHosts está disponível a partir da versão 24.3.0.5 do Application Server.
Propriedades
tAmqp:ChannelNumber
Indica qual canal esta sendo usado para a fila atual. Os canais são "planos privados" dentro da mesma conexão TCP”. Ou seja, é necessária apenas uma instância de conexão na mesma porta, e podendo usar varias "subportas" sendo completamente independente de um canal para o outro.
Tipo | Valor Padrão | Somente Leitura |
---|---|---|
numérico | N/A | N |
tAmqp:Body
Conteudo da mensagem recebida após uma solicitação ao server AMQP via BasicConsume().
Tipo | Valor Padrão | Somente Leitura |
---|---|---|
caractere | "" | N |
tAmqp:ConsumeTimeout
Indica qual o timeout atual que esta sendo usado para a comunicação de um consumidor de uma exchange.
Tipo | Valor Padrão | Somente Leitura |
---|---|---|
numérico | 5 | N |
Métodos
tAmqp:QueueDeclare
Cria uma nova fila no AMQP Server.
Sintaxe
QueueDeclare( [ cFila ], [ bisDurable ], [ bisExclusive ], [ bisAutodelete ], [bisPassive] )
Parâmetros
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cFila | caractere | Indica o nome da fila onde será criada. | ||
bisDurable | lógico | Indica que as mensagens serão guardadas mesmo se o servidor for reiniciado ou desligado, mantendo o estado da fila será mantido. Caso seja falso, as mensagens não serão recriadas caso o servidor seja reiniciado ou desligado. | ||
bisExclusive | lógico | Indica que a fila será exclusiva a um único par producer/consumer. | ||
bisAutodelete | lógico | Indica que, ao termino do producer enviar com sucesso a mensagem até o producer, ele irá se deletar. | ||
bisPassive | lógico | Indica que a função irá apenas verificar se existe uma fila com o nome indicado no AMQP Server, caso exista poderá ser verificar com "Status()", caso não exista não sera criado uma. |
Exemplos
oSender:QueueDeclare(cFila,bisDurable,bisExclusive,bisAutodelete )
tAmqp:ExchangeDeclare
Cria uma nova exchange no AMQP Server.
Sintaxe
ExchangeDeclare( [ cexchange ], [ ctype ], [ bisPassive ], [ bisDurable ], [ bisAutodelete ] )
Parâmetros
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cexchange | caractere | Indica o nome da exchange | X | |
ctype | carectere | Indica o tipo da fila (fanout, direct, topic). * | X | |
bisPassive | lógico | Indica que a função irá apenas verificar se existe uma exchange com o nome indicado no AMQP Server, caso exista poderá ser verificar com "Status()", caso não exista não sera criado uma. | ||
bisDurable | lógico | Indica que as mensagens serão guardadas mesmo se o servidor for reiniciado ou desligado, mantendo o estado da fila será mantido. Caso seja falso, as mensagens não serão recriadas caso o servidor seja reiniciado ou desligado. | ||
bisAutodelete | lógico | Indica que a mensagem irá automaticamente ser deletada na primeira tentativa de resgate pelo consumer. |
Exemplos
oSender:ExchangeDeclare(cexchange,"fanout",.F.,.F. )
tAmqp:BasicConsume
Resgata uma mensagem no AMQP Server.
Sintaxe
BasicConsume( < cFila >, < bAck >, < bWaitingEvent > )
Parâmetros
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cFila | caractere | Indica o nome da fila onde será resgatada. | X | |
bAck | lógico | Indica se o consumo irá automaticamente se marcar como entregue, caso contrario, o consumer deverá indicar que a mensagem foi entregue com o método :BasicAck(). | X | |
bWaitingEvent | lógico | Ignora o timeout e se fica aguardando por uma nova mensagem na fila(no maximo por 30 segundos). | X |
Exemplos
oRecv:BasicConsume("test_queue", bAck, bWaitingEvent)
tAmqp:QueueBind
Liga uma fila a uma exchange para que as mensagens fluam (sujeitas a vários critérios) da exchange (origem) para a fila (destino).
Sintaxe
QueueBind( < cQueue >, < cExchange >, [ croutingkey ] )
Parâmetros
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cQueue | caractere | Indica o nome da fila. | X | |
cExchange | caractere | Indica o nome da exchange. | X | |
croutingkey | caractere | Determina um endereço virtual que o exchange pode usar para encaminhar a mensagem para a fila. |
Exemplos
oRecv:BasicConsume("test_queue", bAck, bWaitingEvent)
tAmqp:BasicPublish
Envia uma mensagem para o AMQP Server.
Sintaxe
BasicPublish( < cExchange >, < cRoutingKey >, [ nPERSISTENT ], [ cMsg ], [ correlationID ], [ ReplyTo ] )
Parâmetros
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cExchange | caractere | Indica o nome da exchage onde será enviada a mensagem. | X | |
cRoutingKey | caractere | Indica a classificação de onde será enviada a mensagem (fila, rota, etc.) na exchange. | X | |
nPERSISTENT | lógico | Indica que a requisição será persistente. | ||
cMsg | caractere | Informa a mensagem a ser postada. | ||
correlationID | caractere | Id de correlação | ||
ReplyTo | caractere | Fila para resposta dessa mensagem |
Exemplos
oRecv:BasicPublish("test_exchange", "test_queue", AMQP_PERSISTENT, "Hello World!" )
tAmqp:BasicQos
Como as mensagens são enviadas (enviadas) para os clientes de forma assíncrona, geralmente há mais de uma mensagem "em trânsito" em um canal a qualquer momento. Com essa função, seta para a conexão atual um limite dessas mensagens a serem processadas (unacknowledged) pelo consumer.
Sintaxe
BasicQos( < nprefetchSize >, < nprefetchCount >, < bglobal > )
Parâmetros
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
nprefetchSize | numérico | Indica o limite do tamanho das mensagens que poderão ficar em aguardo na atual exchange. | X | |
nprefetchCount | numérico | Indica o numero de mensagens que poderão ficar em aguardo na atual exchange. | X | |
bglobal | lógico | Indica se a configuração atual será global ou não. | X |
Exemplos
oRecv:BasicQos(cprefetchSize, cprefetchCount, bglobal)
tAmqp:BasicAck
Indica para a fila que voce recebeu e processou a mensagem com sucesso (acknowledge)
Sintaxe
BasicAck( < cDeliverytag>, < bMultiple>)
Parâmetros
Nome | Tipo | Descrição | Obrigatório | Referência |
---|---|---|---|---|
cDeliverytag | caractere | Indica o nome da tag da(s) mensagem(s) que será marcadada como Ack(recebidas). | X | |
bMultiple | lógico | Indica que varias mensagens serão setadas como Ack simultaneamente. | X |
Exemplos
oRecv:BasicAck(ctag, bmultiple)
tAmqp:CorrelationID
Indica qual a "apelido" (correlação) da mensagem recebida
Sintaxe
CorrelationID()
Exemplos
oRecv:CorrelationID()
tAmqp:MessageCount
Indica quantas mensagens tem na fila prontas para serem recebidas
Sintaxe
oRecv:MessageCount()
Exemplos
oRecv:MessageCount()
tAmqp:ConsumerCount
Indica quantas consumers existem em uma determinada fila.
Sintaxe
oRecv:ConsumerCount()
Exemplos
oRecv:ConsumerCount()
tAmqp:Error
Descreve o erro da ultima operação realizada.
Sintaxe
Error()
Exemplos
oRecv:Error()
tAmqp:QueueName
Retorna qual o nome da fila atual que esta sendo usado
Sintaxe
QueueName()
tAmqp:ReplyTo
Informa qual a deve ser a fila solicitada para a resposta da mensagem recebida
Sintaxe
ReplyTo()
Exemplos
oRecv:ReplyTo()
tAmqp:Status
Retorna o código de erro da ultima operação realizada
Sintaxe
Status()
Exemplos
oRecv:Status()
tAmqp:Tag
Informa qual tag foi associada a uma fila, se houver
Sintaxe
Tag()
Exemplos
oRecv:Tag()