Calcular a velocidade média das estradas [fechada]

20

Eu fui a uma entrevista de emprego de engenheiro de dados. O entrevistador me fez uma pergunta. Ele me deu uma situação e me pediu para projetar o fluxo de dados para esse sistema. Eu resolvi isso, mas ele não gostou da minha solução e eu falhei. Gostaria de saber se você tem idéias melhores sobre como resolver esse desafio.

A questão era:

Nosso sistema recebe quatro fluxos de dados. Os dados contêm as coordenadas de identificação do veículo, velocidade e localização geográfica. Todo veículo envia seus dados uma vez por minuto. Não há conexão entre um fluxo específico e uma estrada ou veículo específico ou qualquer outra coisa. Há uma função que aceita coordenações e retorna um nome de seção da estrada. Precisamos saber a velocidade média por seção da estrada por 5 minutos. Finalmente, queremos escrever os resultados para Kafka.

insira a descrição da imagem aqui

Então, minha solução foi:

Primeiro, grave todos os dados em um cluster Kafka, em um tópico, particionado pelos 5-6 primeiros dígitos da latitude concatenados aos 5-6 primeiros dígitos da longitude. Em seguida, leia os dados por Streaming estruturado, adicionando para cada linha o nome da seção da estrada pelas coordenadas (há um udf predefinido para isso) e, em seguida, coletando os dados pelo nome da seção da estrada.

Como particiono os dados no Kafka pelos 5-6 primeiros dígitos das coordenadas, depois de converter as coordenadas no nome da seção, não é necessário transferir muitos dados para a partição correta e, portanto, posso aproveitar a operação colesce () isso não desencadeia uma reprodução aleatória completa.

Em seguida, calcule a velocidade média por executor.

Todo o processo ocorrerá a cada 5 minutos e gravaremos os dados no modo Anexar no coletor Kafka final.

insira a descrição da imagem aqui

Então, novamente, o entrevistador não gostou da minha solução. Alguém poderia sugerir como melhorá-lo ou uma ideia completamente diferente e melhor?

Alon
fonte
Não seria melhor perguntar à pessoa do que ela não gostava exatamente?
Gino Pane
Eu acho que é uma má ideia particionar pelo concatenado lat-long. O ponto de dados não será relatado para cada faixa como uma coordenada ligeiramente diferente?
Webdec #
@webber, portanto, eu uso apenas alguns dígitos, para que a posição não seja única, mas relativamente no tamanho de uma seção de estrada.
Alon

Respostas:

6

Achei essa pergunta muito interessante e pensei em tentar.

Conforme eu avaliei mais, sua tentativa em si é boa, exceto o seguinte:

particionado pelos 5-6 primeiros dígitos da latitude concatenados aos 5-6 primeiros dígitos da longitude

Se você já possui um método para obter o ID / nome da seção de estrada com base na latitude e longitude, por que não chamar esse método primeiro e usar o ID / nome da seção de estrada para particionar os dados em primeiro lugar?

E depois disso, tudo fica bem fácil, então a topologia será

Merge all four streams ->
Select key as the road section id/name ->
Group the stream by Key -> 
Use time windowed aggregation for the given time ->
Materialize it to a store. 

(Uma explicação mais detalhada pode ser encontrada nos comentários no código abaixo. Por favor, pergunte se algo não está claro)

Adicionei o código no final desta resposta. Observe que, em vez da média, usei soma, pois é mais fácil de demonstrar. É possível fazer a média armazenando alguns dados extras.

Eu detalhei a resposta nos comentários. A seguir, é apresentado um diagrama de topologia gerado a partir do código (graças a https://zz85.github.io/kafka-streams-viz/ )

Topologia:

Diagrama de topologia

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class VehicleStream {
        // 5 minutes aggregation window
        private static final long AGGREGATION_WINDOW = 5 * 50 * 1000L;

        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();

            // Setting configs, change accordingly
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicle.stream.app");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,kafka2:19092");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // initializing  a streambuilder for building topology.
            final StreamsBuilder builder = new StreamsBuilder();

            // Our initial 4 streams.
            List<String> streamInputTopics = Arrays.asList(
                    "vehicle.stream1", "vehicle.stream2",
                    "vehicle.stream3", "vehicle.stream4"
            );
            /*
             * Since there is no connection between a specific stream
             * to a specific road or vehicle or anything else,
             * we can take all four streams as a single stream
             */
            KStream<String, String> source = builder.stream(streamInputTopics);

            /*
             * The initial key is unimportant (which can be ignored),
             * Instead, we will be using the section name/id as key.
             * Data will contain comma separated values in following format.
             * VehicleId,Speed,Latitude,Longitude
             */
            WindowBytesStoreSupplier windowSpeedStore = Stores.persistentWindowStore(
                    "windowSpeedStore",
                    AGGREGATION_WINDOW,
                    2, 10, true
            );
            source
                    .peek((k, v) -> printValues("Initial", k, v))
                    // First, we rekey the stream based on the road section.
                    .selectKey(VehicleStream::selectKeyAsRoadSection)
                    .peek((k, v) -> printValues("After rekey", k, v))
                    .groupByKey()
                    .windowedBy(TimeWindows.of(AGGREGATION_WINDOW))
                    .aggregate(
                            () -> "0.0", // Initialize
                            /*
                             * I'm using summing here for the aggregation as that's easier.
                             * It can be converted to average by storing extra details on number of records, etc..
                             */
                            (k, v, previousSpeed) ->  // Aggregator (summing speed)
                                    String.valueOf(
                                            Double.parseDouble(previousSpeed) +
                                                    VehicleSpeed.getVehicleSpeed(v).speed
                                    ),
                            Materialized.as(windowSpeedStore)
                    );
            // generating the topology
            final Topology topology = builder.build();
            System.out.print(topology.describe());

            // constructing a streams client with the properties and topology
            final KafkaStreams streams = new KafkaStreams(topology, properties);
            final CountDownLatch latch = new CountDownLatch(1);

            // attaching shutdown handler
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }


        private static void printValues(String message, String key, Object value) {
            System.out.printf("===%s=== key: %s value: %s%n", message, key, value.toString());
        }

        private static String selectKeyAsRoadSection(String key, String speedValue) {
            // Would make more sense when it's the section id, rather than a name.
            return coordinateToRoadSection(
                    VehicleSpeed.getVehicleSpeed(speedValue).latitude,
                    VehicleSpeed.getVehicleSpeed(speedValue).longitude
            );
        }

        private static String coordinateToRoadSection(String latitude, String longitude) {
            // Dummy function
            return "Area 51";
        }

        public static class VehicleSpeed {
            public String vehicleId;
            public double speed;
            public String latitude;
            public String longitude;

            public static VehicleSpeed getVehicleSpeed(String data) {
                return new VehicleSpeed(data);
            }

            public VehicleSpeed(String data) {
                String[] dataArray = data.split(",");
                this.vehicleId = dataArray[0];
                this.speed = Double.parseDouble(dataArray[1]);
                this.latitude = dataArray[2];
                this.longitude = dataArray[3];
            }

            @Override
            public String toString() {
                return String.format("veh: %s, speed: %f, latlong : %s,%s", vehicleId, speed, latitude, longitude);
            }
        }
    }
Irshad PI
fonte
Não é uma má ideia mesclar todos os fluxos? Isso pode se tornar um gargalo para o seu fluxo de dados. O que acontece quando você começa a receber mais e mais fluxos de entrada à medida que o sistema cresce? Isso será escalável?
Wypul
@wypul> não é uma má ideia mesclar todos os fluxos? -> acho que não. O paralelismo em Kafka não é alcançado através de fluxos, mas através de partições (e tarefas), encadeamento, etc. Os fluxos são uma maneira de agrupar os dados. > Isso será escalável? -> sim. Como estamos digitando as seções da estrada e assumindo que as seções da estrada são distribuídas de maneira razoável, podemos aumentar o número de partições para esses tópicos para processar paralelamente o fluxo em diferentes contêineres. Podemos usar um bom algoritmo de particionamento baseado na seção de estrada para distribuir a carga entre réplicas.
Irshad PI
1

O problema, como tal, parece simples e as soluções oferecidas já fazem muito sentido. Gostaria de saber se o entrevistador estava preocupado com o design e desempenho da solução em que você se concentrou ou com a precisão do resultado. Como outros se concentraram no código, no design e no desempenho, pesarei na precisão.

Solução de Streaming

À medida que os dados estão fluindo, podemos fornecer uma estimativa aproximada da velocidade média de uma estrada. Essa estimativa será útil na detecção de congestionamento, mas será desativada na determinação do limite de velocidade.

  1. Combine todos os 4 fluxos de dados juntos.
  2. Crie uma janela de 5 minutos para capturar dados de todos os 4 fluxos em 5 minutos.
  3. Aplique UDF nas coordenadas para obter o nome da rua e o nome da cidade. Os nomes das ruas costumam ser duplicados nas cidades; portanto, usaremos o nome da cidade + o nome da rua como chave.
  4. Calcule a velocidade média com uma sintaxe como -

    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

5. write the result to the Kafka Topic

Solução em lote

Essa estimativa será desativada porque o tamanho da amostra é pequeno. Precisamos de um processamento em lote com dados completos de mês / trimestre / ano para determinar com mais precisão o limite de velocidade.

  1. Leia os dados de um ano no data lake (ou Tópico Kafka)

  2. Aplique UDF nas coordenadas para obter o nome da rua e o nome da cidade.

  3. Calcule a velocidade média com uma sintaxe como -


    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

  1. escreva o resultado no data lake.

Com base nesse limite de velocidade mais preciso, podemos prever tráfego lento no aplicativo de streaming.

Salim
fonte
1

Vejo alguns problemas com sua estratégia de particionamento:

  • Quando você diz que irá particionar seus dados com base nos primeiros 5 a 6 dígitos de lat de comprimento, não poderá determinar o número de partições kafka antecipadamente. Você terá dados assimétricos; em alguns trechos da estrada, você observará um volume alto do que outros.

  • E sua combinação de teclas não garante os mesmos dados da seção de estrada na mesma partição e, portanto, você não pode ter certeza de que não haverá embaralhamento.

As informações fornecidas pela IMO não são suficientes para projetar todo o pipeline de dados. Porque ao projetar o pipeline, como você particiona seus dados desempenha um papel importante. Você deve saber mais sobre os dados que está recebendo como número de veículos, tamanho dos fluxos de dados de entrada, o número de fluxos é fixo ou pode aumentar no futuro? Os fluxos de dados de entrada que você está recebendo são fluxos kafka? Quantos dados você recebe em 5 minutos?

  • Agora, vamos supor que você tenha 4 fluxos gravados em 4 tópicos em kafka ou 4 partições e não possua nenhuma chave específica, mas seus dados serão particionados com base em alguma chave do datacenter ou em hash. Caso contrário, isso deve ser feito no lado dos dados, em vez de desduplicar os dados em outro fluxo e particionamento kafka.
  • Se você estiver recebendo os dados em diferentes datacenters, precisará trazê-los para um cluster e, para esse fim, poderá usar o Kafka mirror maker ou algo semelhante.
  • Depois de ter todos os dados em um cluster, você pode executar um trabalho de streaming estruturado e com um intervalo de disparo de 5 minutos e marca d'água com base em seus requisitos.
  • Para calcular a média e evitar muitos embaralhamento, você pode usar uma combinação de mapValuese em reduceByKeyvez de groupBy. Consulte isso .
  • Você pode gravar os dados no coletor kafka após o processamento.
wypul
fonte
mapValues ​​e reduzemByKey pertencem ao RDD de baixo nível. O Catalyst não é inteligente o suficiente para gerar o RDD mais eficiente ao agrupar e calcular a média?
Alon
O @Alon Catalyst certamente será capaz de descobrir o melhor plano para executar sua consulta, mas se você usar groupBy, os dados com a mesma chave serão embaralhados para a mesma partição primeiro e depois aplicarão a operação agregada nela. mapValuese, de reduceByfato, pertence ao RDD de baixo nível, mas ainda terá um desempenho melhor nessa situação, pois primeiro acumula agregados por partição e, em seguida, embaralha.
wypul
0

Os principais problemas que vejo com esta solução são:

  • As seções de estrada que estão na borda dos quadrados de 6 dígitos do mapa terão dados em várias partições de tópicos e várias velocidades médias.
  • O tamanho dos dados de ingestão para suas partições Kafka pode estar desequilibrado (cidade versus deserto). Particionar pelos primeiros dígitos da identificação do carro pode ser uma boa ideia para IMO.
  • Não sei se segui a parte de coalescência, mas parece problemático.

Eu diria que a solução precisa fazer: leia do fluxo Kafka -> UDF -> seção da estrada de grupo -> média -> escreva no fluxo Kafka.

David Taub
fonte
0

Meu design dependeria de

  1. Número de estradas
  2. Número de veículos
  3. Custo de computação da estrada a partir de coordenadas

Se eu quiser escalar para qualquer número de contagens, o design ficaria assim insira a descrição da imagem aqui

Questões cruzadas sobre esse design -

  1. Manter um estado durável dos fluxos de entrada (se a entrada for kafka, podemos armazenar compensações com Kafka ou externamente)
  2. Periodicamente estados de ponto de verificação para o sistema externo (eu prefiro usar barreiras assíncronas de ponto de verificação no Flink )

Algumas melhorias práticas possíveis neste design -

  1. Função de mapeamento de seções de estradas em cache, se possível, com base nas estradas
  2. Manipulação de pings perdidos (na prática, nem todos os ping estão disponíveis)
  3. Levando em consideração a curvatura da estrada (rolamento e altitude)
yugandhar
fonte