riemann echo
- 13 minutes read - 2652 wordsVamos detalhar algumas características bem interessante utilizadas na implementação de um servidor riemann chamado katja_echo.
riemann.io
riemann.io é um servidor para processamento de stream de eventos, capaz de aplicar funções específicas para cada evento recebido e agregar ou gerar outros eventos para sistemas externos, escrito em Clojure e roda usando a JVM.
riemann pode ser utilizado para processar métricas onde aplicações
instrumentadas enviam para um servidor central métricas relacionadas com algo
importante ocorrido. Este modelo se chama push metrics
.
Atualmente pode não ser a solução de métricas mais conhecida no momento mas é uma opção a ser considerada dependendo dos casos de uso. Veja mais informações no site do projeto.
Existem algumas bibliotecas cliente para Elixir e Erlang:
Bem como plugin para o framework telemetry, telemetry_metrics_riemann.
No entanto, nosso foco é no seguinte cenário:
Imagine que temos uma aplicação instrumentada para enviar métricas para um servidor riemann usando conexão TCP ou UDP. E durante os testes unitários e integração a nossa aplicação tenta fazer uma conexão com o servidor riemann para enviar as métricas. No entanto como não temos um servidor iniciado a aplicação falha em tentar fazer a conexão e enviar as métricas.
Claro que o cenário acima é hipotético e pode ser resolvido de outras formas como por exemplo utilizando o telemetry ou configurando a aplicação para não enviar métricas durante testes ou implementando algum servidor no qual imita o comportamento de um servidor real riemann.
Já trabalhei em alguns projetos nos quais havia um docker compose apenas para subir a infraestrutura local (contento JVM e riemann) quando executávamos testes unitários. Os testes eram executados e no final destruíamos todo o ambiente do docker compose.
Recentemente criei uma aplicação chamada katja_echo justamente para implementar uma imitação de um servidor riemann no qual recebe eventos utilizando TCP ou UDP, armazena cada evento e responde alguma eventual consulta dos eventos armazenados.
Neste pull request Introduce katja_echo when running integration tests podemos ver uma aplicação mais prática. katja_vmstats é uma aplicação Erlang no qual faz uma leitura de várias estatísticas da BEAM VM e envia para um servidor riemann. Os testes de integração do projeto precisava de um servidor riemann ativo. Utilizando a aplicação katja_echo durante os testes não é mais necessário conviver com o overhead de instanciar uma JVM e riemann.
Então este artigo vai discutir alguns detalhes de implementação do projeto katja_echo nos quais foram bastante interessantes do ponto de vista dos recursos utilizados do OTP.
O protocolo riemann
Vamos começar pelo protocolo riemann, no qual é muito simples. Basicamente cada evento é um conjunto de campos obrigatórios e opcionais:
- host: “api1”
- service: “HTTP reqs/sec”
- state: “ok”
- time: in unix epoch
- description
- tags: [“rate”, “xyz”]
- metric: numero
- ttl: valor indicando quando este evento deve ser considerado expirado
Estes campos são serializados utilizando Google Protocol Buffers e especificados no arquivo proto.proto no é gerado código para criar as funções de encode e decode.
Sendo que uma mensagem no formato riemann é descrita assim:
Protocolo riemann https://github.com/katja-beam/katja_echo/blob/0.1.1/proto/katja_echo.proto1message RiemannPB_Msg {
2 optional bool ok = 2;
3 optional string error = 3;
4 repeated RiemannPB_State states = 4;
5 optional RiemannPB_Query query = 5;
6 repeated RiemannPB_Event events = 6;
7}
Podendo conter um Event
, Query
, State
.
Projetos em Erlang/Elixir possuem suporte para criar encoders e decoders para protocol buffers. Exemplo: utilizando o plugin rebar3_gpb_plugin e adicionando a dependência gpb as funcões de encode e decode são criadas automaticamente:
Encoder/Decoder https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_pb.erl 1%% -*- coding: utf-8 -*-
2%% @private
3%% Automatically generated, do not edit
4%% Generated by gpb_compile version 4.10.5
5-module(katja_echo_pb).
6
7-export([encode_msg/1, encode_msg/2, encode_msg/3]).
8-export([encode/1]). %% epb compatibility
9-export([encode_riemannpb_state/1]).
10-export([encode_riemannpb_event/1]).
11-export([encode_riemannpb_query/1]).
12-export([encode_riemannpb_msg/1]).
13-export([encode_riemannpb_attribute/1]).
14
15-export([decode_msg/2, decode_msg/3]).
16-export([decode/2]). %% epb compatibility
17-export([decode_riemannpb_state/1]).
18-export([decode_riemannpb_event/1]).
19-export([decode_riemannpb_query/1]).
20-export([decode_riemannpb_msg/1]).
21-export([decode_riemannpb_attribute/1]).
O plugin rebar3gpb_plugin suporta algumas configurações definidas no arquivo _rebar.config. Dependendo do caso, pode ser necessário alterar:
Encoder/Decoder https://github.com/katja-beam/katja_echo/blob/0.1.1/rebar.config1{gpb_opts, [
2 {i, "proto"},
3 {module_name_suffix, "_pb"},
4 {o_erl, "src"},
5 {o_hrl, "include"},
6 {strings_as_binaries, true},
7 type_specs,
8 epb_compatibility]}.
Bom, estas funções possibilitam serializar mensagens no formato correto. Agora precisamos implementar alguns servidores (gen_statem e gen_server) para receber e enviar estas mensagens.
katja_echo suporta receber mensagens via TCP e/ou UDP.
Servidor UDP
O servidor UDP, katja_echo_udp.erl, foi implementado utilizando um gen_server.
gen_server init callback https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_udp.erl1-spec init(Args :: list()) -> {ok, State :: state()}.
2
3init([Opts]) ->
4 Port = proplists:get_value(port, Opts),
5 {ok, Socket} = gen_udp:open(Port, [binary, {active, true}]),
6 Cbk = katja_echo:callback(Opts),
7 {ok, #state{socket = Socket, callback = Cbk}}.
A função gen_udp:open/2 faz a criação do socket UDP na porta especificada. Já o segundo parâmetro especifica como queremos receber as mensagens. Escolhemos o modo ativo no qual vamos precisar implementar a callback handle_info/2 para receber os pacotes UDP.
Depois armazenamos o socket no estado do gen_server.
Usando o modo ativo, significa que o processo katja_echo_udp irá receber uma mensagem toda vez que haja um pacote recebido pelo gen_udp.
gen_server init callback https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_udp.erl1handle_info({udp, _Socket, _IP, _InPortNo, Packet},
2 #state{callback = Cbk, errors = Errors} = State) ->
O formato da mensagem é {udp, Socket, IP, InPort, Packet}. Neste caso estamos interessados no quinto elemento da tupla (Packet) pois a seguinte função vai tentar decodificar o pacote recebido:
handle_info packet https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_udp.erl 1 NState =
2 case katja_echo:decode(udp, Packet) of
3 {ok, #riemannpb_msg{ok=true, events = Events}} ->
4 ok = katja_echo:events(Cbk, Events),
5 State;
6 {error, #riemannpb_msg{ok=false, error=Reason}} ->
7 Errors0 = maps:update_with(Reason, fun katja_echo:incr/1, 1, Errors),
8 State#state{errors = Errors0};
9 {error, Reason} ->
10 Errors0 = maps:update_with(Reason, fun katja_echo:incr/1, 1, Errors),
11 State#state{errors = Errors0}
12 end,
13 {noreply, NState};
Neste caso estamos delegando para outra função katja_echo:decode/2 no qual faz a diferenciação entre ‘udp’ e ’tcp’:
decode udp https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo.erl1decode(udp, Msg) when is_binary(Msg) ->
2 try katja_echo_pb:decode_riemannpb_msg(Msg) of
3 R ->
4 {ok, R#riemannpb_msg{ok=true}}
5 catch
6 error:_Reason ->
7 {eror, #riemannpb_msg{ok=false, error="Response could not be decoded"}}
8 end;
Isso é necessário pois um pacote UDP (User Datagram Packet) deve conter uma mensagem riemann completa, ou seja, o decoder de proto buffer para termos Erlang deve decodificar uma mensagem ou falhar.
Servidor TCP
Também temos um servidor TCP implementado usando gen_statem. Não é muito comum pelo fato de que gen_statem é mais complexo que gen_server. Entretanto complexidade não é o caso aqui.
No caso TCP, quando recebemos um novo cliente, criamos um socket chamando a função gen_tcp:accept/1 e seguimos para o próximo estado definido chamado: connected.
tcp connection https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_tcp.erl1idle(internal, establish, #state{lsocket = LSocket} = Data) ->
2 {ok, AcceptSocket} = gen_tcp:accept(LSocket),
3
4 NData = Data#state{socket = AcceptSocket},
5
6 {next_state, connected, NData}.
Quando no estado ‘connected’ podemos receber mensagens enviadas pelo gen*tcp quando há alguma mensagem recebida pelo socket (exemplo: *{tcp, Socket, Packet}_).
connected state https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_tcp.erl 1connected(info, {tcp, Socket, Packet}, #state{socket = Socket} = Data) ->
2 {ok, NData} = process_packet(Data, Packet),
3
4 ok = inet:setopts(Socket, [{active, once}]),
5
6 {keep_state, NData};
7
8connected(info, {tcp_closed, Socket}, #state{socket = Socket}) ->
9 {stop, normal};
10
11connected(info, {tcp_error, Socket, _Reason}, #state{socket = Socket} = Data) ->
12 ok = gen_tcp:close(Socket),
13 {stop, normal, Data#state{socket = undefined}}.
A função process_packet/2 é onde fazemos a decodificação do pacote recebido:
connected state https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_tcp.erl1process_packet(#state{socket = Socket, callback = Cbk, errors = Errors, data = Acc} = State,
2 Packet) ->
3 BinMsg2 = <<Acc/binary, Packet/binary>>,
4 case katja_echo:decode(tcp, BinMsg2) of
O importante desta função é que precisamos decodificar um pacote contendo o payload atual e também qualquer buffer armazenado do payload anterior ao atual. Como o protocolo TCP é um stream (não tem início e fim) temos que identificar quantos bytes irão formar uma mensagem riemann possível de ser decodificada. E o restante dos bytes, se houver, precisam ser bufferizados para concatenar com próximo pacote recebido.
Seguindo no detalhe da função process_packet/2 :
connected state https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_tcp.erl 1 case katja_echo:decode(tcp, BinMsg2) of
2 {ok, #riemannpb_msg{ok=true, events = [], states = [],
3 query = #riemannpb_query{string = Query}}} ->
4
5 {ok, Results} = katja_echo:query(Cbk, Query),
6
7 send_riemann_reply(Socket, Results),
8
9 {ok, State#state{data = <<>>}};
10 {ok, #riemannpb_msg{ok=true, events = Events}} ->
11 send_riemann_reply(Socket),
12
13 ok = katja_echo:events(Cbk, Events),
14
15 {ok, State#state{data = <<>>}};
16 {error, #riemannpb_msg{ok=false, error=Reason}} ->
17 Errors0 = maps:update_with(Reason, fun katja_echo:incr/1, Errors),
18 {ok, State#state{data = <<>>, errors = Errors0}};
19 {error, Reason} ->
20 Errors0 = maps:update_with(Reason, fun katja_echo:incr/1, Errors),
21 {ok, State#state{data = BinMsg2, errors = Errors0}}
22 end.
Caso ocorra algum erro na decodificação, devemos armazenar o BinMsg2 no estado. Pois a próxima vez que recebermos um novo pacote TCP, iremos fazer um append desta forma: BinMsg2 = «Acc/binary, Packet/binary», e tentar decodificar novamente a mensagem.
Os outros branches do case acima fazem o tratamento quando a mensagem riemann é um evento ou uma query.
Para decodificar um pacote TCP recebido com sucesso, primeiro temos que verificar se o pacote se parece com uma mensagem riemann aplicando um functional pattern match no segundo parâmetro:
decode udp https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo.erl1decode(tcp, <<MsgSize:32/integer-big, Msg/binary>>) ->
2 case Msg of
3 <<Msg2:MsgSize/binary, _Rest/binary>> ->
4 R = katja_echo_pb:decode_riemannpb_msg(Msg2),
5 {ok, R#riemannpb_msg{ok=true}};
6 _ ->
7 {error, #riemannpb_msg{ok=false, error="Response could not be decoded"}}
8 end;
«MsgSize:32/integer-big, Msg/binary> é um binary pattern match no qual diz que qualquer pacote que inicie com um inteiro de tamanho 32 bits (integer-big) seguido por um restante binary pode ser interpretado como uma mensagem riemann. Entretanto, apenas chamando a função katja_echo_pb:decode_riemannpb_msg/1 vamos ter certeza se Msg contém ou não uma mensagem riemann.
Repare também que MsgSize serve para especificar o tamanho da variável Msg2. Isso é muito importante pois precisamos passar o tamanho correto da mensagem riemann.
Quando um cliente riemann envia um pacote TCP para o servidor, o mesmo coloca um valor int32 como sendo o tamanho da mensagem riemann. Logo depois vem a mensagem riemann.
Events riemann
Relembrando, quando recebemos uma mensagem riemann do tipo Event, chamamos a função:
events https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_tcp.erl1 ok = katja_echo:events(Cbk, Events),
Com o argumento Events contento uma representação em termos Erlang de uma lista de eventos riemann.
No caso do katja_echo vamos armazenar todos os eventos em uma tabela ETS para posterior consulta:
decode udp https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo.erl 1do_events(Events) when is_list(Events) ->
2 _ = [insert_event(Event) || Event <- Events],
3 ok.
4
5
6insert_event(#riemannpb_event{host = H, service = S} = E) ->
7 ets:insert(?TAB, {{H, S}, E});
8
9insert_event(#riemannpb_state{host = H, service = S} = E) ->
10 ets:insert(?TAB, {{H, S}, E}).
A chave de cada linha na tabela vai ser uma tupla contento o nome do Host e o nome do Service: #riemannpb_event{host = H, service = S} e o valor é o evento integral.
Desta forma podemos fazer consultas via host e/ou service bem como aplicar match specification para buscar eventos específicos.
Query riemann
Todos os eventos armazenados podem ser consultados usando simples queries. Exemplos:
- state = “ok”
- metric_f > 2.0 and not host = nil
- tagged “product”
Geralmente cliente riemann possui uma API para fazer as consultas. Os dados retornados são todos os eventos riemann selecionados pela consulta.
O servidor riemann oficial implementa uma gramática e parser para interpretar uma query, katja_echo implementa uma gramática e parser baseados na implementação oficial mas usando uma aplicação disponível no OTP chamada Parse Tools.
Parse Tools possui duas ferramentas:
- yecc, no qual gera um parser LALR-1 usando uma gramática BNF
- leex, um tokenizador baseado em expressões regulares
Dependendo do caso podemos usar outro tipo de tokenizador, não necessariamente precisamos usar o leex. Mas por conveniência iniciamos com a criação das regras do token como definido no arquivo katja_echo_query_lexer.xr.
Logo em seguida definimos as regras gramaticais katja_echo_query_grammar.yrl.
Com o tokenizer e a gramática, definimos um modulo e criamos algumas funções nas quais recebem uma string chamando o lexer e grammar para analisar:
parse https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_query.erl1parse(<<Bin/binary>>) ->
2 parse(binary_to_list(Bin));
3
4parse(String) ->
5 {ok, Tokens, _EndLine} = katja_echo_query_lexer:string(String),
6 katja_echo_query_grammar:parse(Tokens).
Expondo uma API para receber uma query, fazer a análise e baseado em uma árvore parseada realizar as operações da query recebida:
calling parse tree https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo.erl1do_query({ok, ParseTree}) ->
2 katja_echo_query:query(?TAB, ParseTree);
3
4do_query({error, {_LineNumber, Module, Message}}) ->
5 {error, {"syntax error", Module:format_error(Message)}};
6
7do_query(Query) ->
8 do_query(katja_echo_query:parse(Query)).
ParseTree contém termos Erlang nas quais outras funções irão processar e retornar os eventos:
getting data https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_query.erl 1qq(_Tab, [], R) ->
2 R;
3
4qq(Tab, [{'and', P} | Hs], R) ->
5 {ok, Results} = do_query(Tab, make_match_conditions('andalso', Tab, P)),
6 qq(Tab, Hs, Results ++ R);
7
8qq(Tab, [{'or', P} | Hs], R) ->
9 {ok, Results} = do_query(Tab, make_match_conditions('orelse', Tab, P)),
10 qq(Tab, Hs, Results ++ R);
11
12qq(Tab, [{field, _, _, _} = P | [] = Hs], R) ->
13 {ok, Results} = do_query(Tab, make_match_conditions('andalso', Tab, P)),
14 qq(Tab, Hs, Results ++ R);
15
16qq(Tab, [{field, _, _, _} = P | Hs], R) ->
17 qq(Tab, Hs, make_match_conditions('andalso', Tab, P) ++ R);
18
19qq(Tab, [{field, _, _} = P | Hs], R) ->
20 qq(Tab, Hs, make_match_conditions('andalso', Tab, P) ++ R).
Acima usamos pattern match e processamento com listas. Não existe uma forma padronizada de implementar tal código. O segredo é encontrar a melhor estrutura de dados que case com a necessidade.
Após todo o processamento da query, chega o momento de consultar a tabela ETS e obter os eventos. Fazemos isso usando uma feature do OTP chamada match specification.
No nosso caso, com o resultado do parser da query apenas precisamos criar a parte chamada MatchConditions e enviar chamar a função ets:select/2 .
getting data https://github.com/katja-beam/katja_echo/blob/0.1.1/src/katja_echo_query.erl1do_query(Tab, MatchConditions) ->
2 MS = [{{'_', '$1'}, MatchConditions, ['$1']}],
3 {ok, ets:select(Tab, MS)}.
A intenção de implementar uma interface para queries riemann foi justamente poder consultar os dados da tabela ETS sem precisar criar queries manuais, ou seja, match specifications são criados a partir da query recebida.
Um último ponto sobre este assunto é que a ferramenta rebar3 possui suporte para geração de código a partir dos arquivos yrl e xrl:
rebar yrl e xrl https://github.com/katja-beam/katja_echo/blob/0.1.1/rebar.config1{yrl_opts, [{verbose, true}]}.
2
3{xrl_opts, [{verbose, true}]}.
Testes
A estratégia de testes segue duas linhas principais sendo testes unitários e testes de integração.
katja_echo_SUITE.erl, usando o framework Common Test e iniciando a aplicação katja_echo de forma controlada.
O importante é levar em conta a testabilidade durante o design da aplicação. Um exemplo clássico é utilizar callbacks nas quais podemos trocar quando em testes. Isso facilita verificar se o caso de teste está atingindo os postos necessários. Exemplo:
Changing callback https://github.com/katja-beam/katja_echo/blob/0.1.1/test/katja_echo_SUITE.erl 1listen_tcp_events(_Config) ->
2 {ok, Ref, Fun} = reply_events(),
3
4 Event = default_event(),
5 Events = default_event(10),
6
7 {ok, Pid} = katja_echo:start_link([{callback, Fun}]),
8
9 ok = katja:send_event(katja_writer, tcp, Event),
10
11 check_event(Ref, Event),
12
13 ok = katja:send_events(katja_writer, tcp, Events),
14
15 check_event(Ref, Events),
16
17 katja_echo_sup:stop(Pid),
18
19 ok.
Repare que a função reply_events() retorna uma tuple onde o terceiro elemento é uma função anônima na qual vai ser usada para configurar a callback.
Quando a callback for executada, dentro da aplicação katja_echo, podemos verificar chamando a função check_event/2 .
katja_echo_query_tests, usando o framework eunit e seguindo uma abordagem mais teste unitário onde definimos um gerador automático dos casos de teste baseados em uma lista de queries. É um uso mais avançado do eunit e traz grande economia de tempo na escrita dos testes.
Conclusão
As ideias e técnicas apresentadas neste artigo podem servir como inspiração para aqueles que precisam criar alguma solução similar. Principalmente para isolarem algum subsistema utilizado para testes. Ou reimplementarem algum outro protocolo ou servidor utilizando Erlang ou Elixir.