1. Carga de dados para DataFrames
O primeiro ponto a se observar é que ao executar um função para carregar dados de uma fonte gerando um data frame os dados não são imediatamente carregados, e sim é criado um plano de execução para a leitura desses dados posteriormente quando uma “Action”for executada. A isso se da o nome de “Lazy loading”(carga preguiçosa em tradução literal)
1.1 Mas como uma simples função de leitura pode demorar se nenhum dado está efetivamente sendo carregado?
Dependendo de como os dados estão sendo lidos como por exemplo com a função spark.read.loadum job é executado para mapear todas as partições existentes no HDFS, dependendo de quantas são, isso pode levar um tempo considerável.E pior, mesmo que seja feito um filtro para ler apenas algumas dessas partições todas elas serão mapeadas da mesma forma.
A solução para isso é usar a função spark.read.table,essa função utiliza os metadados do hive, não sendo necessário descobrir todas as partições.
Outra vantagem de fazer a leitura de arquivos ao invés de tabelas é que o schema fica armazenado no meta store, fazendo com que não seja necessário inferir a partir dos dados, aumentando ainda mais a velocidade de carregamento dos dados.
1.2 Tipos de Arquivo
De preferencia sempre a arquivos colunares como por exemplo parquet, são arquivos que podem ser paralelizados tornando o tipo ideal para utilização com spark. Além disso existe a possibilidade de carregar apenas as colunas envolvidas em uma query, o que torna a mesma mais rápida e eficiente.
Ao utilizar de texto ou json, é sempre necessário carregar a linha inteira, mesmo que apenas duas colunas sejam necessárias para a query, além de não ser possível paralelizar esses tipos de arquivo, oque gera lentidão na execução.
2. Particionamento
2.1 tasks e particionamento
Ao salvar um data frame em disco cada task salva um arquivo, pensando nisso é necessário prestar atenção em como os dados estão distribuídos entre as tasks para evitar a geração de arquivos muito pequenos.
Para isso faça um repartition antes de salvar seus dados garantindo que cada arquivo ira conter uma quantidade homogênea de dados.
É importante também ficar atento para não gerar partições muito grandes pois isso pode impactar no desempenho de queries futuras.
3. Configurações
3.1 Shuffle partitions
shuffle partitions são definidas por padrão como 200, mas esse valor pode ser muito alto ou muito pequeno dependendo da sua aplicação. Elas São utilizadas em operações como group by, joins, repartition e podem impactar muito no desempenho da execução.
Não existe uma receita pronta de qual a configuração ideal, e sim deve haver uma analise caso a caso para determinar oque funciona melhor para a sua aplicação
A quantidade de shuffle partitions pode ser alterada utilizando o comando: spark.sql.shuffle.partitions
lembre-seque essa configuração é levada durante todo o período de execução, por isso fique atento a todos os stages da aplicação e procure um numero que atenda da melhor forma possível todos os stages, ou que otimize os stages mais complexos, com intuito de aumentar ao máximo a performance da aplicação
3.2 Unions
Ao executar um union cada dataframe se comporta de forma individual até que um shuffle seja executado, sendo assim, se os dataframes utilizados no union forem variações de um mesmo dataframe, mas com regras e métricas diferentes aplicadas, por exemplo, o dataframe sera calculado uma vez para cada um dos dataframes do union, gerando assim muito processamento desnecessário. Por isso é importante saber dessa particularidade e evitar esse tipo de union, ou caso não seja possível,persistir o dataframe base para garantir uma boa performance na execução.
3.3 Otimizador baseado em custo
Introduzido na versão 2.2 do spark, o otimizador baseado em custo utiliza estatísticas salvas no meta store para gerar um plano de execução mais eficiente reduzindo em até 90% a quantidade de shuffles.
O processo para gerar estatísticas da tabela não é automático e precisa ser executado de tempos em tempos para garantir o melhor plano de execução possível.
Para calcular as estatísticas utilizadas para otimizar os planos de execução se utiliza o seguinte comando: ANALYZE TABLE DATABASE.TABELA COMPUTE STATISTICS.
Gostou das dicas? Continue navegando nos artigos do Semantix Lab e aprenda muito mais!