Logstash analisando documento xml contendo várias entradas de log

8

Atualmente, estou avaliando se o logstash e a elasticsearch são úteis para nosso caso de uso. O que eu tenho é um arquivo de log contendo várias entradas, que é do formato

<root>
    <entry>
        <fieldx>...</fieldx>
        <fieldy>...</fieldy>
        <fieldz>...</fieldz>
        ...
        <fieldarray>
            <fielda>...</fielda>
            <fielda>...</fielda>
            ...
        </fieldarray>
    </entry>
    <entry>
    ...
    </entry>
    ...
<root>

Cada entryelemento conteria um evento de log. (Se você estiver interessado, na verdade, o arquivo é uma exportação do log de trabalho do Tempo Timesheets (An Atlassian JIRA Plug-in).)

É possível transformar esse arquivo em vários eventos de log sem escrever meu próprio codec?

dualizado
fonte

Respostas:

11

Tudo bem, eu encontrei uma solução que funciona para mim. O maior problema com a solução é que o plugin XML é ... não muito instável, mas mal documentado e com bugs ou documentado de maneira incorreta e incorreta.

TLDR

Linha de comando do Bash:

gzcat -d file.xml.gz | tr -d "\n\r" | xmllint --format - | logstash -f logstash-csv.conf

Configuração do Logstash:

input {
    stdin {}
}

filter {
    # add all lines that have more indentation than double-space to the previous line
    multiline {
        pattern => "^\s\s(\s\s|\<\/entry\>)"
        what => previous
    }
    # multiline filter adds the tag "multiline" only to lines spanning multiple lines
    # We _only_ want those here.
    if "multiline" in [tags] {
        # Add the encoding line here. Could in theory extract this from the
        # first line with a clever filter. Not worth the effort at the moment.
        mutate {
            replace => ["message",'<?xml version="1.0" encoding="UTF-8" ?>%{message}']
        }
        # This filter exports the hierarchy into the field "entry". This will
        # create a very deep structure that elasticsearch does not really like.
        # Which is why I used add_field to flatten it.
        xml {
            target => entry
            source => message
            add_field => {
                fieldx         => "%{[entry][fieldx]}"
                fieldy         => "%{[entry][fieldy]}"
                fieldz         => "%{[entry][fieldz]}"
                # With deeper nested fields, the xml converter actually creates
                # an array containing hashes, which is why you need the [0]
                # -- took me ages to find out.
                fielda         => "%{[entry][fieldarray][0][fielda]}"
                fieldb         => "%{[entry][fieldarray][0][fieldb]}"
                fieldc         => "%{[entry][fieldarray][0][fieldc]}"
            }
        }
        # Remove the intermediate fields before output. "message" contains the
        # original message (XML). You may or may-not want to keep that.
        mutate {
            remove_field => ["message"]
            remove_field => ["entry"]
        }
    }
}

output {
    ...
}

Detalhado

Minha solução funciona porque, pelo menos até o entrynível, minha entrada XML é muito uniforme e, portanto, pode ser manipulada por algum tipo de correspondência de padrão.

Como a exportação é basicamente uma linha muito longa de XML, e o plug-in xml do logstash funciona basicamente apenas com campos (leia-se: colunas em linhas) que contêm dados XML, tive que mudar os dados para um formato mais útil.

Shell: Preparando o arquivo

  • gzcat -d file.xml.gz |: Havia muitos dados - obviamente você pode pular esse
  • tr -d "\n\r" |: Remover quebras de linha dentro de elementos XML: alguns dos elementos podem conter quebras de linha como dados de caracteres. A próxima etapa requer que elas sejam removidas ou codificadas de alguma forma. Mesmo assumindo que, neste ponto, você tenha todo o código XML em uma linha enorme, não importa se esse comando remove qualquer espaço em branco entre os elementos

  • xmllint --format - |: Formate o XML com xmllint (vem com libxml)

    Aqui, a única linha enorme de espaguete de XML ( <root><entry><fieldx>...</fieldx></entry></root>) está formatada corretamente:

    <root>
      <entry>
        <fieldx>...</fieldx>
        <fieldy>...</fieldy>
        <fieldz>...</fieldz>
        <fieldarray>
          <fielda>...</fielda>
          <fieldb>...</fieldb>
          ...
        </fieldarray>
      </entry>
      <entry>
        ...
      </entry>
      ...
    </root>
    

Logstash

logstash -f logstash-csv.conf

(Veja o conteúdo completo do .confarquivo na seção TL; DR.)

Aqui, o multilinefiltro faz o truque. Ele pode mesclar várias linhas em uma única mensagem de log. E é por isso que a formatação com xmllintfoi necessária:

filter {
    # add all lines that have more indentation than double-space to the previous line
    multiline {
        pattern => "^\s\s(\s\s|\<\/entry\>)"
        what => previous
    }
}

Isso basicamente diz que toda linha com indentação com mais de dois espaços (ou é </entry>/ xmllint faz indentação com dois espaços por padrão) pertence a uma linha anterior. Isso também significa que os dados dos caracteres não devem conter novas linhas (removidas com trcasca) e que o xml deve ser normalizado (xmllint)

dualizado
fonte
Oi, você conseguiu fazer isso funcionar? Estou curioso, pois tenho uma necessidade semelhante e a solução multilinha junto com a divisão não funcionou para mim. Obrigado pelo seu feedback
viz
@viz Isso funcionou, mas nunca o usamos na produção. Multiline só funciona se você tem uma estrutura XML muito regular e ter formatado-lo primeiro com recuo (ver resposta, seção "preparando o arquivo")
dualed
1

Eu tive um caso semelhante. Para analisar este xml:

<ROOT number="34">
  <EVENTLIST>
    <EVENT name="hey"/>
    <EVENT name="you"/>
  </EVENTLIST>
</ROOT>

Eu uso essa configuração para logstash:

input {
  file {
    path => "/path/events.xml"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "<ROOT"
      negate => "true"
      what => "previous"
      auto_flush_interval => 1
    }
  }
}
filter {
  xml {
    source => "message"
    target => "xml_content"
  }
  split {
    field => "xml_content[EVENTLIST]"
  }
  split {
    field => "xml_content[EVENTLIST][EVENT]"
  }
  mutate {
    add_field => { "number" => "%{xml_content[number]}" }
    add_field => { "name" => "%{xml_content[EVENTLIST][EVENT][name]}" }
    remove_field => ['xml_content', 'message', 'path']
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Espero que isso possa ajudar alguém. Eu precisava de muito tempo para obtê-lo.

drinor
fonte