Skip to content

Cassandra Bulk Loading (sstableloader)

Introdução

A bulk loading, ou bulk insert, é o processo no qual uma grande quantidade de registros é inserida em um banco de dados em curto período de tempo. Um exemplo desta funcionalidade é o Cassandra Bulk Loader, também chamado de sstableloader.

Apache Cassandra
Apache Cassandra

Para ilustrar o funcionamento do Cassandra Bulk Loader vamos inserir um grande dataset com milhões de registros em aproximadamente 5 minutos e usando hardware de baixo custo. Neste sentido, a inserção individual de registros, apesar de otimizada, não é tão rápida quanto a bulk loading. Uma alternativa ao sstableloader seria o utilitário COPY FROM, mas ele é indicado para datasets com menos de 2 milhões de registros.

Dataset

O dataset de exemplo é o TLC Trip Record Data, disponibilizado pela cidade de Nova Iorque com as viagens dos FHVs, ou For-Hive-Vehicles. Estão também disponíveis os dados dos táxis amarelos e dos green cars, mas o maior dataset é o do FHV, por isso vamos usar apenas este.

Os dados coletados são o base license number, a pick-up/drop-off date-time, e a taxi zone location ID e a shared flag. A partir daí é possível fazer diversas análises bacanas sobre o comportamento dos habitantes da metrópole. A versão mais recente dos dados é de Junho de 2018, que tem aproximadamente 1.6 GB e 21.089.618 de linhas.

Criação da SSTable

O sstableloader é usado para importar dados de uma SSTable (Sorted String Table). A SSTable é um mecanismo do Cassandra para armazenamento de dados baseado em um par chave-valor ordenado pela chave que é formado blocos (geralmente) de 64 KB.

A SSTable é criada por meio da CQLSSTableWriter, uma classe da API do Cassandra. A CQLSSTableWriter recebe como parâmetro os scripts para criação da tabela e para inserção de registros, o arquivo de origem dos dados e o diretório de saída, onde será gravada finalmente a SSTable. Neste exemplo são usados o keyspace datalake e a tabela tb_fhv.

Para mostrar o funcionamento da CQLSSTableWriter foi criada a classe CassandraBulkImport, com os atributos usados na criação da SSTable. O código inicial está logo a seguir:

public class CassandraBulkImport {
    private static Logger logger = Logger.getLogger(CassandraBulkImport.class);
    private int importedRecords = 0;
    private String INSERT_STMT = "INSERT INTO datalake.tb_fhv "
        + "(ID, base_licence, pickup_datetime, dropoff_datetime, initial_location, final_location, shared_ride) "
        + "VALUES (?, ?, ?, ?, ?, ?, ?)";
    private String CREATE_TABLE = "CREATE TABLE datalake.tb_fhv "
        + "(ID UUID, base_licence TEXT, pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, initial_location INT, final_location INT, shared_ride BOOLEAN, "
        + "PRIMARY KEY(ID, base_licence))";
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static String keyspace = "datalake";
    private static String tableName = "tb_fhv";
    private CQLSSTableWriter writer;
    private String inputFile;
    // {...}
}

A inicialização das variáveis pode ser conferida no código abaixo, onde o writer é construído a partir dos atributos informados.

public CassandraBulkImport(String inputFile, String outputDir) {
    this.inputFile = inputFile;
    DatabaseDescriptor.clientInitialization();
    CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
    builder.inDirectory(outputDir).forTable(CREATE_TABLE).using(String.format(INSERT_STMT, keyspace, tableName))
        .withPartitioner(new Murmur3Partitioner());
    writer = builder.build();
}

O método bulkImport é responsável por ler cada linha do arquivo de entrada, preparar os dados e inserir na SSTable por meio do writer.

public void bulkImport() throws IOException {
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(inputFile)))) {
        // Discard the header
        reader.readLine();
        String line;
        while ((line = reader.readLine()) != null) {
        // Prepare the columns
        line = StringUtils.remove(line, "\"");
        List<String> columns = Arrays.asList(line.split(",", -1));
        UUID uuid = UUIDs.timeBased();
        String baseLicence = columns.get(0);
        Date pickup = sdf.parse(columns.get(1));
        Date dropoff = sdf.parse(columns.get(2));
        Integer initialLocation = NumberUtils.toInt(columns.get(3), 0);
        Integer finalLocation = NumberUtils.toInt(columns.get(4), 0);
        Boolean sharedRide = "1".equals(columns.get(5));
        // Write the record
        writer.addRow(uuid, baseLicence, pickup, dropoff, initialLocation, finalLocation, sharedRide);
        importedRecords++;
        }
        logger.info("Imported records: " + importedRecords);
    } catch (Exception e) {
        logger.error(e);
    }
    writer.close();
}

Finalizamos a criação da SSTable com o método main para rodar o programa. Deve-se observar que o diretório de saída deve usar a estrutura a seguir, no formato keyspace/tableName, que foram definidos acima.

public static void main(String[] args) throws IOException {
    String inputFile = "/home/marco/dados/nyc/fhv_tripdata_2018-06.csv";
    String outputDir = System.getProperty("user.home") + "/temp/" + keyspace + "/" + tableName;
    File file = new File(outputDir);
    FileUtils.deleteDirectory(file);
    file.mkdirs();
    //
    CassandraBulkImport cassandraBulkImport = new CassandraBulkImport(inputFile, outputDir);
    cassandraBulkImport.bulkImport();
}

E para executar a aplicação podemos usar a própria IDE, o Eclipse por exemplo, ou rodar pelo terminal com o comando abaixo. É importante definir o parâmetro Xmx para 4 GB porque o programa demanda muita memória para processar os registros.

java -Xmx4g \
    -cp target/big-data-bulk-import-jar-with-dependencies.jar \
    net.marcoreis.dataimport.cassandra.CassandraBulkImport

O processo demora aproximados 4 minutos em um notebook com 4 CPUs Intel i7 e 16 GB de memória. Com a SSTable criada podemos proceder para o bulk loading.

O utilitário sstableloader

A SSTable foi criada no diretório datalake/tb_fhv e agora precisa ser importada para o Cassandra. Essa carga é feita pelo utilitário sstableloader. Para facilitar o processo, vamos utilizar as variáveis de ambiente abaixo, todos em ambiente Linux. Naturalmente, deve-se adequar os diretórios de acordo com cada máquina.

CASSANDRA_HOME=/home/marco/software/cassandra
PATH=$PATH:$CASSANDRA_HOME/bin
SSTABLE_DIR=/home/marco/temp/datalake/tb_fhv

O terminal para comandos no Cassanda é o cqlsh. Ao conectar, ele mostra as informações de conexão que podemos ver logo abaixo.

Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>

O primeiro comando deve ser a criação do keyspace datalake. Vamos criar um keyspace padrão, sem replicação apenas para teste.

CREATE KEYSPACE datalake
  WITH REPLICATION = { 
   'class' : 'SimpleStrategy', 
   'replication_factor' : 1 
  };

Para criar a tabela tb_fhv o script está aqui. Claro que deve ser a mesma estrutura usada na classe CassandraBulkImport.

CREATE TABLE datalake.tb_fhv 
(
    ID UUID, 
    base_licence TEXT, 
    pickup_datetime TIMESTAMP, 
    dropoff_datetime TIMESTAMP, 
    initial_location INT, 
    final_location INT, 
    shared_ride BOOLEAN, 
    PRIMARY KEY(ID, base_licence)
);

E o último passo é a execução do próprio sstableloader, indicando qual o destino (-d) e ativando o mode verbose (-v). Lembrando que o parâmetro SSTABLE_DIR é a própria SSTable que foi criada anteriormente.

sstableloader -v -d 127.0.0.1 $SSTABLE_DIR

Conclusão

Para o dataset selecionado, esse comando demora 1.5 minuto que somado aos 4 minutos da criação da SSTable totaliza 5.5 minutos de processamento para os 21 milhões de registros.

Leave a Reply

Your email address will not be published. Required fields are marked *

Marco Reis
Privacy Overview

This website uses cookies so that we can provide you with the best user experience possible. Cookie information is stored in your browser and performs functions such as recognising you when you return to our website and helping our team to understand which sections of the website you find most interesting and useful.