Árvore de páginas

Introdução

Esta página tem como objetivo ajudar os times da suíte logística a migrarem o framework para publicação e consumo de mensagens do RabbitMQ das suas aplicações, de Spring Cloud Stream para Spring AMQP. Esta migração é necessária pois a anotação StreamListener do Spring Cloud Stream foi depreciada/descontinuada e a alternativa oferecida exigiria uma mudança grande e de difícil transição. 

Após vários estudos, foi desenvolvido um plano para permitir que os times realizem a migração de uma forma suavizada, considerando as seguintes premissas:

Premissas

  1. Não haverá perda de mensagens durante o processo de migração;
  2. Não haverá mensagens duplicadas por conta do processo de migração;
    1. Exceto para eventos recebidos do RAC que no momento de troca de pods pode duplicar mensagem, contudo, atualmente a duplicação de mensagem neste ponto já é conhecida e tratada.
  3. A migração poderá ser feita aos poucos, por mensagem específica, por um conjunto de mensagens ou de todas as mensagens desde que respeitado os procedimentos detalhados a seguir; 
    1. Exceto os casos de manter a utilização das filas que já apontam para exchanges do tipo header e possuem 'n' suscribers
  4. Não quebrará a integração com outros produtos da suíte logística, nem com a infra estrutura de webhooks utilizado para integração com específicos e produtos externos;
  5. Ao fim da migração, o time estará com a mensageria pronta para futura migração do TJF 4.

Informações relevantes

Reorganização das exchanges e filas.

Atualmente, boa parte dos serviços que publicam mensagens o fazem através de uma exchange do tipo TOPIC. Para receber mensagens de uma exchange, boa parte dos serviços o fazem através de um único bind com uma fila que representa o serviço (exemplo agendamento-exchange.yms-core). Esta abordagem é bastante simples, contudo é muito sensível a altos volumes de mensagens de tipos específico. Exemplo, o serviço recebe mensagens de vários tipos, ClienteCriadoEvent, FornecedorCriadoEvent, MotoristaCriadoEvent entre outros. Em um determinado momento, passa a receber um alto volume da mensagem ClienteCriadoEvent, como há somente uma única fila, as demais mensagens se misturam na fila esperando para serem consumidas. Conclusão, a alta carga de mensagens de um único tipo acaba impactando o desempenho de todo o serviço.

Para endereçar esta situação, vamos criar novos exchanges do tipo HEADER, estes exchanges serão utilizados no lugar das exchanges do tipo TOPIC. Exchanges do tipo HEADER ofereçam uma maior flexibilidade na forma de realizar bind para filas em relação as exchanges do tipo TOPIC. Uma vez que tenhamos as exchanges do tipo HEADER, ao invés de criarmos uma única fila para todo o serviço,  criaremos novas filas com base em cada agregado do serviço ou alguma outra separação que o serviço julgar interessante para evitar que eventos específicos impactem o funcionamento geral do serviço. Desta forma, o filtro para entregar mensagens de uma exchange para uma fila passa a ficar no RabbitMQ.

Quando não há necessidade de reorganização das exchanges e filas 

Para os serviços que já possuem exchanges do tipo HEADER e filas organizadas conforme algum critério previamente estruturado e que esteja atendendo as necessidades atuais, boa parte desta documentação é irrelevante bastando seguir os passos abaixo:

  • Redeclarar as exchanges e filas conforme novo modelo
    • Importante! Há dois formatos de gerenciamento de filas de erros, a nativa do RabbitMQ e a do TJF, utilizar a do TJF. A nativa requer a recriação da fila.
  • Ajustar os publicadores para fazer uso do RabbitTemplate.
  • Ajustar os consumidores substituindo as anotações @StreamListener por @RabbitListener e @RabbitHandler 
  • Os passos mencionados acima estão nesta documentação. Atentar-se também para as limitações mencionadas abaixo

Limitações

  1. Se a fila já faz bind com uma exchange do tipo HEADER, deseja-se manter a fila existente e há várias classes do tipo subscriber que apontam para a mesma fila
    1. Será necessário realizar o processo de migração de todos subscribers da mesma fila de uma única vez
      1. Isso porque todos os métodos anotados com @RabbitHandler de uma mesma fila precisam estar na mesma classe.
        1. Estamos analisando uma abordagem para evitar que isso seja necessário, permitindo manter a organização atual de classes por agregado e não por fila.
          1. AHUBLOG-205 - Obtendo detalhes do item... STATUS
  2. Quando há um mesmo evento com dois subscribers
    1. Para estes casos, será necessário 
      1. Criar duas filas diferentes com o mesmo filtro por type. Exemplo, há dois subscribers para um evento ProcessoFinalizadoEvent onde cada subscriber corresponde a um agregado diferente. Neste caso, deverá ser criada uma fila para cada agregado.

        StreamListener
        @Component
        public class PedidoSubscriber {
        	
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        @Component
        public class EstoqueSubscriber {
        	
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        RabbitListener
        @RabbitListener(queues = "fila.pedido")
        public class PedidoSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
                // lógica
            }
        }
        
        @RabbitListener(queues = "fila.estoque")
        public class EstoqueSubscriber{
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
                // lógica
            }
        }
        
        @RabbitListener(queues = "fila.agregado")
        public class AgregadoSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload FinalizarProcessoAgregadoCommand event) {
                // lógica
            }
        }
      2. Criar uma etapa após o ProcessoFinalizadoEvent que será responsável por chamar os diferentes comandos para cada agregado. Nesta abordagem também seria necessário disponibilizar os comandos via mensageria. Exemplo:

        StreamListener
        @Component
        public class PedidoSubscriber {
        	
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        @Component
        public class EstoqueSubscriber {
        	
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        
        RabbitListener
        @RabbitListener(queues = "fila.processo")
        public class ProcessoSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
                // publicar FinalizarProcessoPedidoCommand
                // publicar FinalizarProcessoEstoqueCommand
                // publicar FinalizarProcessoAgregadoCommand
            }
        }
        
        @RabbitListener(queues = "fila.processo")
        public class PedidoSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload FinalizarProcessoPedidoCommand event) {
                // processar FinalizarProcessoPedidoCommand
            }
        }
        
        @RabbitListener(queues = "fila.processo")
        public class EstoqueSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload FinalizarProcessoEstoqueCommand event) {
                // processar FinalizarProcessoEstoqueCommand
            }
        }
        
        @RabbitListener(queues = "fila.processo")
        public class AgregadoSubscriber{
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload FinalizarProcessoAgregadoCommand event) {
                // processar FinalizarProcessoAgregadoCommand
            }
        }
      3. Outra possibilidade é chamar diretamente os comandos de aplicação em um único subscriber, contudo esta opção altera o formato de execução atual, fazendo com que o segundo comando de aplicação só seja executado após o sucesso do primeiro. Sendo assim, tenha consciência de que não será gerado um impacto indesejado ao produto.

        StreamListener
        @Component
        public class PedidoSubscriber {
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        @Component
        public class EstoqueSubscriber {	
        	@StreamListener(target="fila.processo")
            public void when(TOTVSMessage<ProcessoFinalizadoEvent> event){
                // lógica
            }	
        }
        
        
        RabbitListener
        @RabbitListener(queues = "fila.processo")
        public class ProcessoSubscriber {
        	
            @RabbitHandler
            public void whenProcessoFinalizado(@Payload ProcessoFinalizadoEvent event) {
                // lógica 01 -> executar FinalizarProcessoPedidoCommand
                // lógica 02 -> executar FinalizarProcessoEstoqueCommand
            }
        }
  3. Quando há necessidade de realizar um filtro com base em informações do corpo da mensagem
    1. Neste caso o filtro que era feito através do parâmetro condition da anotação StreamListener terá que ser movido para o corpo do método através de uma cláusula condicional (if).  Exemplo:

      StreamListener
      public class SubscriberStreamListener {
      	public static final String NAME = "ProcessoExecutadoEvent";
      	public static final String CONDITIONAL_EXPRESSION_PROCESSO = 
      			" && (@MessageFilter.get(payload,'content/origem') == 'MKPL'" + 
      			 " || @MessageFilter.get(payload,'content/origem') == 'PLDT')";
      	public static final String CONDITIONAL_EXPRESSION = "headers['type']=='" + NAME + "'" + CONDITIONAL_EXPRESSION_PROCESSO;
      
      	@StreamListener(target = Channel.PROCESSO_INPUT, condition = CONDITIONAL_EXPRESSION)
      	public void processoExecutado(TOTVSMessage<ProcessoExecutadoEvent> message) {
      		// processamento do evento
      	}
      }
      
      
      RabbitListener
      @RabbitListener(queues = "fila.processo")
      public class SubscriberAMQP {
      	
      	private static final String ORIGEM_MKPL = "MKPL";
      	private static final String ORIGEM_PLDT = "PLDT";
      	
      	@RabbitHandler
          public void whenProcessoFinalizado(@Payload ProcessoExecutadoEvent event) {
              if (ORIGEM_MKPL.equals(event.getOrigem()) || ORIGEM_PLDT.equals(event.getOrigem())) {
          		// processamento do evento
              }
          }
      }
      
      

Plano de migração para exchanges do tipo TOPIC

Visão geral dos procedimentos

Será criada uma nova exchange do tipo HEADER para cada exchange TOPIC, adicionando um bind de exchange para exchange, partindo da TOPIC para HEADER. Assim conseguimos continuar publicando na exchange original, sem quebrar as integrações existentes e, criar as novas filas já com bind para a exchange do tipo HEADER.

Antes de criar as novas filas e subscribers com Spring AMQP, temos que pensar no cenário de deploy da alteração, quando tiver pods com versões do serviço com o subscriber antigo e o novo no ar.

Se removermos os subscribers antigos e adicionarmos o novo no mesmo deploy, corremos risco de perder mensagens na fila antiga, pois o pod antigo pode ser encerrado antes de terminar de drenar as mensagens da fila. Nesse mesmo cenário, pode ocorrer duplicação de mensagens se o pod antigo ficar no ar por algum tempo depois que o novo subiu, a mesma mensagem vai chegar na fila velha e na nova e ambas serão processadas.

Para resolver isso, sem alterar nada na fila antiga, foi necessário criar um mecanismo de versionamento das mensagens que resumidamente consiste em: Quando uma mensagem for recebida com um header indicando que é versão 2 (sl-version=2), ela será processada pelo Spring AMQP (novo), do contrário, será processado pelo Spring Cloud Stream (antigo). Com essas tratativas no lugar, podemos subir os dois subscribers ao mesmo tempo, e após isso, alteramos os publicadores para incluir o header indicando versão 2 e assim as mensagens passam a ser processadas somente pelo Spring AMQP (novo). Após alterar o publicador, o subscriber antigo pode ser removido do produto.

Depois de migrar todos os eventos, incluindo integrações externas como por exemplo webhooks, deve-se alterar os publicadores para publicar na exchange HEADER diretamente. Em seguida, remover o bind entre as exchanges do tipo TOPIC e HEADER. Esta última etapa não está coberta neste documento.

A seção abaixo ilustra o passo a passo.

Passo a passo dos procedimentos

Atualmente

Exchanges do tipo TOPIC e serviços com sua respectiva fila recebendo todos os tipos de eventos.

Baixe a animação em MP4

Primeira etapa do processo de expansão 

Adicionar dependência ao sdk de migração que verifica se deve ou não adicionar header de versão (sl-version). Além do header, os interceptors nos subscribers também são adicionados para garantir que somente um dos métodos seja executado (com StreamListener ou com RabbitListener). Neste momento só existirão no código os métodos com StreamListener. Como o sl-version ainda não será enviado, o método com StreamListener continuará sendo executado em todos os casos sempre.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Segunda etapa do processo de expansão 

Nova exchange do tipo HEADER, bind entre as exchanges do tipo TOPIC e HEADER e duplicação do(s) subscribers, substituindo as anotações @StreamListener por @RabbitListener e @RabbitHandler

Para os novos subscribers do RAC, TPD e outros sistemas externos que não controlamos a publicação de mensagens, adicionar anotação @IgnoreMessageInterceptor para que os interceptors criados para migração na sdk sejam ignorados.

Caso isso não seja observado, mensagens serão perdidas.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Terceira etapa do processo de expansão 

Adicionar a anotação @VersionedEvent nos eventos para que sejam publicados com o header sl-version=2.

TODOS os consumidores (em qualquer serviço) dos eventos a serem anotados com @VersionedEvent devem ter ao menos duplicado seus subscribers (segunda etapa do processo de expansão)

Caso isso não seja observado, mensagens serão perdidas.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Primeira etapa do processo de contração 

Remover os subscribers anotados com @StreamListener 

TODOS os publicadores dos subscribers anotados com @StreamListener a serem removidos devem estar publicando o cabeçalho sl-version = 2 (terceira etapa de expansão)

Caso isso não seja observado, mensagens serão perdidas.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Segunda etapa do processo de contração 

Desabilitar os interceptors.

TODOS subscribers anotados com @StreamListener já devem ter sido removidos (primeira etapa do processo de contração)

Caso isso não seja observado, mensagens serão perdidas.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Terceira etapa do processo de contração 

Remover a anotação @VersionedEvent dos eventos. 

TODOS os consumidores (em todos serviços) dos eventos com anotação @VersionedEvent removido já devem ter removido os subscribers antigos (segunda etapa do processo de contração)

Caso isso não seja observado, mensagens serão perdidas.

Baixe a animação em MP4

Clique aqui para verificar o passo a passo desta etapa.

Passo a passo técnico

1 - Dependências 

Alterar a versão do TJF para 3.24.0-SNAPSHOT. 

  • Esta versão SNAPSHOT será convertida em RELEASE, processo em andamento com TJF Luciano De Araujo 
tjf 3.24.0
    <parent>
        <groupId>com.totvs.tjf</groupId>
        <artifactId>tjf-boot-starter</artifactId>
        <version>3.24.0-SNAPSHOT</version>
    </parent>

Adicionar nova dependência do TJF para tratamento de mensagens

tjf-messaging-amqp
        <dependency>
            <groupId>com.totvs.tjf</groupId>
            <artifactId>tjf-messaging-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>com.totvs.tjf</groupId>
            <artifactId>tjf-test-amqp</artifactId>
            <scope>test</scope>
        </dependency>

Adicionar dependência do SDK de migração com infraestrutura para controlar publicação e recepção das mensagens

O sdk disponibiliza uma função para verificar se deve ou não adicionar o header sl-version com valor 2. Também disponibiliza a anotação @VersionedEvent para indicar se o evento deve ser publicado com sl-version com valor 2.

Essa infraestrutura é composta por interceptors que vão atuar nos métodos anotados com @StreamListener e @RabbitHandler. Antes de entrar no método, será verificado o parâmetro sl-version no cabeçalho da mensagem, se ele estiver vazio ou menor que 2, o método que contém @StreamListener será executado, se tiver 2 ou mais, será executado o @RabbitHandler.  

Deve-se adicionar a dependência da sdk de migração no pom.xml

sdk de migração
        <repository>
            <id>SuiteLogistica</id>
            <url>https://pkgs.dev.azure.com/totvstfs/SuiteLogistica/_packaging/SuiteLogistica/maven/v1</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>

        <dependency>
            <groupId>com.totvs.sl.migracao.amqp.sdk</groupId>
            <artifactId>migracao.amqp.sdk</artifactId>
            <version>1.0.7-RELEASE</version>
        </dependency>

Alterar publicadores 

1.1 - Registrar exchanges no application.yml

Obs.: Registrar todos os exchanges existentes no serviço conforme novo modelo

Exemplo:
Yaml com exchanges existentes
tjf:
  messaging:
    amqp:
      exchanges: 
        topicExchanges:  #Existentes do tipo topic
          - name: coleta-entrega-exchange 
            durable: true
            autoDelete: false
          - name: coleta-entrega-query-errors 
            durable: true
            autoDelete: false
        headersExchanges: #Existentes do tipo headers
          - name: coleta-entrega-header-exchange 
            durable: true
            autoDelete: false

1.2 - Alterar dispatchers

Novo formato de plublicação de mensagens.

Exemplo de Antes:

Exemplo de publicador atual
protected <T> void dispatch(T contentToBeDispatched, TransactionInfo transactionInfo) {
    TOTVSMessageBuilder
        .withType(contentToBeDispatched.getClass().getSimpleName())
        .setTransactionInfo(transactionInfo)
        .setContent(contentToBeDispatched)
        .build()
        .sendTo(getOutput());
}

Exemplo de Depois:

Publicador adicionando sl-version quando necessário
//Injetar RabbitTemplate template na classe do publicador
// Além do  RabbitTemplate, precisará do nome do exchnage e do routing key caso utilizado para poder usar convertAndSend

protected <T> void dispatch(T contentToBeDispatched, TransactionInfo transactionInfo) {
   var totvsMessage = buildAmqp(contentToBeDispatched, transactionInfo);          
   var processor = MessageProcessor.getInstance(contentToBeDispatched);  // usa sdk para identificar se deve ou não adicionar o header sl-version
   template.convertAndSend(exchange, ROUTING_KEY, totvsMessage, processor); 
}

private <T> AmqpTOTVSMessage<Object> buildAmqp(T contentToBeDispatched, TransactionInfo transactionInfo) {
    return TOTVSMessageBuilder
            .withType(contentToBeDispatched.getClass().getSimpleName())
            .setContent(contentToBeDispatched)
            .setTransactionInfo(transactionInfo)
            .buildAmqp();
}

Importante

Deve-se manter o mesmo formato de mensagem publicada pelo serviço atualmente. Ou seja, se utiliza CloudEvents, deve-se manter. Caso não utilize CloudEvents, deve-se manter o formato legado.

Apesar das modificações acima, o envio de mensagens deve continuar com o mesmo comportamento. Nesta etapa do processo, esta é apenas uma etapa de preparação.

Importante

Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!

!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR  !!! 

Voltar para animação

2 - Criar nova exchange, novas filas e subscribers 

Novo exchange do tipo HEADER

No caso das exchanges TOPIC, deverá ser criada uma exchange HEADER e será feito o bind entre elas, conforme mencionado anteriormente.

Yaml com novo exchange
tjf:
  messaging:
    amqp: 
      error: #NOVO, ADICIONAR
        exchange: coleta-entrega-query-errors # Aponta no tjf a exchange de erros - dlq
      migration: #NOVO, ADICIONAR
        interceptors: 
          enabled: true # Habilita o uso dos interceptors durante o processo de migração
      exchanges: 
        topicExchanges:  
          - name: coleta-entrega-exchange 
            durable: true
            autoDelete: false
          - name: coleta-entrega-query-errors 
            durable: true
            autoDelete: false
        headersExchanges: 
          - name: coleta-entrega-header-exchange 
            durable: true
            autoDelete: false
      bindings: #NOVO, ADICIONAR
        bindingsList:
          - destination: coleta-entrega-header-exchange  #NOVA EXCHANGE DO TIPO HEADER
            exchange: coleta-entrega-exchange  #EXCHANGE EXISTENTE DO TIPO TOPIC
            routingKey: "#"  
            destinationType: exchange

Novas filas

Neste exemplo novas filas serão criadas:

Yaml com novas filas
tjf:
  messaging:
    amqp:
      error:
        exchange: coleta-entrega-query-errors 
      migration:
        interceptors: 
          enabled: true 
      exchanges: 
        topicExchanges: 
          - name: coleta-entrega-exchange 
            durable: true
            autoDelete: false
          - name: coleta-entrega-query-errors 
            durable: true
            autoDelete: false
        headersExchanges:
          - name: coleta-entrega-header-exchange 
            durable: true
            autoDelete: false
      queues: #NOVO, ADICIONAR
        queuesList:         
          - name: coleta-entrega-header-exchange.sl-coleta-entrega.arquivo # Cria a nova queue do domínio de arquivo - V2
            durable: true
            autoDelete: false
          - name: coleta-entrega-header-exchange.sl-coleta-entrega.unidade # Cria a nova queue do domínio de unidade - V2
            durable: true
            autoDelete: false
          - name: coleta-entrega-header-exchange.sl-coleta-entrega.organizacao # Cria a nova queue do domínio de organizacao - V2
            durable: true
            autoDelete: false
          - name: coleta-entrega-header-exchange.sl-coleta-entrega.video # Cria a nova queue do domínio de video - V2
            durable: true
            autoDelete: false
          - name: coleta-entrega-query-errors.coleta-entrega-query-errors # Cria a nova queue de erros - dlq
            durable: true
            autoDelete: false
            exclusive: false
            queue-dlq: coleta-entrega-query-errors.coleta-entrega-query-errors # Define a queue de erros como dlq no tjf
      bindings: #ATUALIZADO, ALTERAR 
        bindingsList:
          - destination: coleta-entrega-header-exchange
            exchange: coleta-entrega-exchange
            routingKey: "#"  
            destinationType: exchange
          - destination: coleta-entrega-query-errors.coleta-entrega-query-errors # Novo, faz o binding da fila de erros - dlq
            exchange: coleta-entrega-query-errors
            destinationType: queue
          - destination: coleta-entrega-header-exchange.sl-coleta-entrega.arquivo # Novo, faz o binding dos eventos de arquivo
            exchange: coleta-entrega-header-exchange
            destinationType: queue
            arguments:
              type: [ArquivoCriadoEvent, ArquivoRemovidoEvent] #UMA DAS POSSIVEIS FORMAS DE DECLARAR UMA LISTA DE EVENTOS
          - destination: coleta-entrega-header-exchange.sl-coleta-entrega.unidade # Novo, faz o binding do evento de unidade
            exchange: coleta-entrega-header-exchange
            destinationType: queue
            arguments:
              type: UnidadeCriadaEvent
          - destination: coleta-entrega-header-exchange.sl-coleta-entrega.organizacao # Novo, faz o binding do evento de organizacao
            exchange: coleta-entrega-header-exchange
            destinationType: queue
            arguments:
              type: OrganizacaoCriadaEvent
          - destination: coleta-entrega-header-exchange.sl-coleta-entrega.video # Novo, faz o binding dos eventos de video
            exchange: coleta-entrega-header-exchange
            destinationType: queue
            arguments:
              type: # OUTRA FORMA DE DECLARAR UMA LISTA DE EVENTOS
				- VideoHowToCriadoEvent
				- VideoHowToRemovidoEvent

Novos subscribers

Primeiramente deve-se fazer uma cópia da classe subscriber original e nomear a atual como old.

  1. Trocar as anotações @StreamListener por @RabbitHandler
  2. Adicionar @RabbitListener na classe
  3. Se tiver alguma condição além do tipo do evento no "condition" do StreamListener, deve-se fazer o filtro via binding ou dentro do método handler.
  4. Alterar assinatura dos métodos conforme abaixo:

Antes:

StreamListener
@AllArgsConstructor
@EnableBinding(COLENTChannel.class)
public class ArquivoSubscriberOld {
    private final ArquivoService service;

    @StreamListener(target = COLENTChannel.INPUT_NAME, condition = ArquivoCriadoEvent.CONDITIONAL_EXPRESSION)
    public void arquivoCriado(TOTVSMessage<ArquivoCriadoEvent> message) {
       service.on(message.getContent());
    }

    @StreamListener(target = COLENTChannel.INPUT_NAME, condition = ArquivoRemovidoEvent.CONDITIONAL_EXPRESSION)
    public void arquivoRemovido(TOTVSMessage<ArquivoRemovidoEvent> message) {
       service.on(message.getContent());
    }
}

Depois: 

RabbitListener
@Component
@AllArgsConstructor
@RabbitListener(queues = "coleta-entrega-header-exchange.sl-coleta-entrega.arquivo")
public class ArquivoSubscriber {
	private final ArquivoService service;

	@RabbitHandler
	public void arquivoCriado(@Payload ArquivoCriadoEvent message) {
		service.on(message);
	}

	@RabbitHandler
	public void arquivoRemovido(@Payload ArquivoRemovidoEvent message) {
		service.on(message);
	}
}

// TESTES 
@DisplayName("Arquivo Header - Mensageria")
public class ArquivoSubscriberIT extends AdapterConfigIT {
	@MockBean
	private ArquivoService service;

	@Captor
	private ArgumentCaptor<ArquivoCriadoEvent> createCaptor;
	@Captor
	private ArgumentCaptor<ArquivoRemovidoEvent> removeCaptor;

	@Test
	@DisplayName("Deve manipular mensagem recebido por mensageria")
	void deveManipularArquivoCriadoEvent() throws Exception {
		var subscriber = Mockito.mock(ArquivoHeaderSubscriber.class);
		var listener = new MockListener(subscriber);    

        var event = mockArquivoCriadoEvent();
        var message = TOTVSMessageBuilder
             .withType(ArquivoCriadoEvent.NAME)
             .setContent(event)
             .buildAmqp();

        listener.sendMessage(message);

        verify(subscriber).arquivoCriado(createCaptor.capture());
        assertInstanceOf(ArquivoCriadoEvent.class, createCaptor.getValue()); 
    }

	@Test
	@DisplayName("Deve manipular mensagem recebido por mensageria")
	void deveManipularArquivoRemovidoEvent() throws Exception {
		var subscriber = Mockito.mock(ArquivoHeaderSubscriber.class);
		var listener = new MockListener(subscriber);    

        var arquivo = ArquivoTestFactory.umArquivo();
        var event = ArquivoRemovidoEvent.of(arquivo.getId()); 
        var message = TOTVSMessageBuilder
             .withType(ArquivoRemovidoEvent.NAME)
             .setContent(event).buildAmqp();

        listener.sendMessage(message);

        verify(subscriber).arquivoRemovido(removeCaptor.capture());
        assertInstanceOf(ArquivoRemovidoEvent.class, removeCaptor.getValue());  
    }

    @Test
	@DisplayName("Deve criar um arquivo recebido por mensageria")
	void deveCriarUmArquivo() {
		var subscriber = new ArquivoHeaderSubscriber(service);
		var event = mockArquivoCriadoEvent();
		subscriber.arquivoCriado(event);
        var expected = ArquivoCriadoEvent.builder()
             .id(event.getId())
             .nome(event.getNome())
             .tipoConteudo(event.getTipoConteudo())
             .removido(event.getRemovido())
             .dataCriacao(event.getDataCriacao())
             .tamanho(event.getTamanho())
             .prefixo(event.getPrefixo())
             .build();
        verify(service).on(expected);
	}

	@Test
	@DisplayName("Deve remover um arquivo recebido por mensageria")
	void deveRemoverUmArquivo() {
		var arquivo = ArquivoTestFactory.umArquivo();
		var subscriber = new ArquivoHeaderSubscriber(service);
		var event = ArquivoRemovidoEvent.of(arquivo.getId());
		subscriber.arquivoRemovido(event);
        
        var expected = ArquivoRemovidoEvent.of(event.getId());        
        verify(service).on(expected);  
    }

	private ArquivoCriadoEvent mockArquivoCriadoEvent() {
		return ArquivoCriadoEvent.builder()
				.id(UUID.randomUUID().toString())
				.nome("nome")
				.tipoConteudo("tipoConteudo")
				.removido(false)
				.dataCriacao(ZonedDateTime.now())
				.tamanho(new BigDecimal(127))
				.prefixo(TestUtils.tenantId + "/")
				.build();
	}
}

Novos subscribers para sistemas externos como RAC, TPD e outros devem conter a anotação @IgnoreMessageInterceptor 

StreamListener
@RabbitListener(queues = "queue")
@IgnoreMessageInterceptor // Isto fará o skip do interceptors de todos os metodos da classe
public class AlgumRabbitListenerSubscriber {
}

Importante

Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!

!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR  !!! 

Voltar para animação

3 - Mudar versão do evento  

No serviço que envia as mensagens:

Adicionar a anotação @VersionedEvent nos eventos

Exemplo:

Mudar versão do evento
@VersionedEvent
@Data(staticConstructor = "of")
public final class ArquivoRemovidoEvent implements DomainEvent {
	private final String id;
}

Importante

Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!

!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR  !!! 

Voltar para animação

4 - Remover subscriber antigo

Depois que todos os pods publicadores foram substituídos e todas as mensagens da fila original foram drenadas, pode ser removida a classe subscriber antiga juntamente com suas configurações antigas existentes no yaml.

Voltar para animação

5 - Repetir passos 3-4

Agora é só repetir os passos até fazer a troca de todos os subscribers.

6 - Desabilitar interceptors

Mudar o valor da configuração tjf.messaging.amqp.error.migration.interceptors.enable para false.

Desabilitar interceptors
tjf:
  messaging:
    amqp:
      error:
        #----- configs existentes
      migration:
        interceptors: 
          enabled: false #DESABILITA O USO DOS INTERCEPTORS
      exchanges: 
        topicExchanges:  
          #----- configs existentes
        headersExchanges:
          #----- configs existentes
      bindings:
        bindingsList:            
		  #----- configs existentes

Importante

Somente realizar as próximas etapas depois de realizar O DEPLOY EM PRODUÇÃO DAS ALTERAÇÕES ACIMA!

!!! FAZER DEPLOY DA APLICAÇÃO ANTES DE AVANÇAR  !!! 

Voltar para animação

7 - Remover Versionamento dos eventos e utilização do SDK de migração

Depois que todos os consumidores das mensagens já foram migrados para @RabbitListener, remover a anotação @VersionedEvent, o código adicionado nos Publishers, a configuração referente ao sdk de migração do yaml (migration.interceptors.enabled) e finalmente a dependência do SDK.

Exemplo do publisher sem a utilização do sdk.

Publicador após contração
protected <T> void dispatch(T contentToBeDispatched, TransactionInfo transactionInfo) {
     TOTVSMessageBuilder.withType(contentToBeDispatched.getClass().getSimpleName()
         .setContent(contentToBeDispatched)
         .setTransactionInfo(transactionInfo)
         .buildAmqp()
         .sendTo(rabbitTemplate, EXCHANGE, ROUTING_KEY);
}

Voltar para animação

Todos os fontes usados na POC podem ser consultados abaixo

  • Sem rótulos