Cassandra Bulk Loading (sstableloader)

Cassandra Bulk Loading (sstableloader)

Introdução

A bulk loading, ou bulk insert, é o processo no qual uma grande quantidade de registros é inserida em um banco de dados em curto período de tempo. Um exemplo desta funcionalidade é o Cassandra Bulk Loader, também chamado de sstableloader.

Apache Cassandra
Apache Cassandra

Para ilustrar o funcionamento do Cassandra Bulk Loader vamos inserir um grande dataset com milhões de registros em aproximadamente 5 minutos e usando hardware de baixo custo. Neste sentido, a inserção individual de registros, apesar de otimizada, não é tão rápida quanto a bulk loading. Uma alternativa ao sstableloader seria o utilitário COPY FROM, mas ele é indicado para datasets com menos de 2 milhões de registros.

Dataset

O dataset de exemplo é o TLC Trip Record Data, disponibilizado pela cidade de Nova Iorque com as viagens dos FHVs, ou For-Hive-Vehicles. Estão também disponíveis os dados dos táxis amarelos e dos green cars, mas o maior dataset é o do FHV, por isso vamos usar apenas este.

Os dados coletados são o base license number, a pick-up/drop-off date-time, e a taxi zone location ID e a shared flag. A partir daí é possível fazer diversas análises bacanas sobre o comportamento dos habitantes da metrópole. A versão mais recente dos dados é de Junho de 2018, que tem aproximadamente 1.6 GB e 21.089.618 de linhas.

Criação da SSTable

O sstableloader é usado para importar dados de uma SSTable (Sorted String Table). A SSTable é um mecanismo do Cassandra para armazenamento de dados baseado em um par chave-valor ordenado pela chave que é formado blocos (geralmente) de 64 KB.

A SSTable é criada por meio da CQLSSTableWriter, uma classe da API do Cassandra. A CQLSSTableWriter recebe como parâmetro os scripts para criação da tabela e para inserção de registros, o arquivo de origem dos dados e o diretório de saída, onde será gravada finalmente a SSTable. Neste exemplo são usados o keyspace datalake e a tabela tb_fhv.

Para mostrar o funcionamento da CQLSSTableWriter foi criada a classe CassandraBulkImport, com os atributos usados na criação da SSTable. O código inicial está logo a seguir:

public class CassandraBulkImport {
    private static Logger logger = Logger.getLogger(CassandraBulkImport.class);
    private int importedRecords = 0;
    private String INSERT_STMT = "INSERT INTO datalake.tb_fhv "
        + "(ID, base_licence, pickup_datetime, dropoff_datetime, initial_location, final_location, shared_ride) "
        + "VALUES (?, ?, ?, ?, ?, ?, ?)";
    private String CREATE_TABLE = "CREATE TABLE datalake.tb_fhv "
        + "(ID UUID, base_licence TEXT, pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, initial_location INT, final_location INT, shared_ride BOOLEAN, "
        + "PRIMARY KEY(ID, base_licence))";
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static String keyspace = "datalake";
    private static String tableName = "tb_fhv";
    private CQLSSTableWriter writer;
    private String inputFile;
    // {...}
}

A inicialização das variáveis pode ser conferida no código abaixo, onde o writer é construído a partir dos atributos informados.

public CassandraBulkImport(String inputFile, String outputDir) {
    this.inputFile = inputFile;
    DatabaseDescriptor.clientInitialization();
    CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
    builder.inDirectory(outputDir).forTable(CREATE_TABLE).using(String.format(INSERT_STMT, keyspace, tableName))
        .withPartitioner(new Murmur3Partitioner());
    writer = builder.build();
}

O método bulkImport é responsável por ler cada linha do arquivo de entrada, preparar os dados e inserir na SSTable por meio do writer.

public void bulkImport() throws IOException {
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(inputFile)))) {
        // Discard the header
        reader.readLine();
        String line;
        while ((line = reader.readLine()) != null) {
        // Prepare the columns
        line = StringUtils.remove(line, "\"");
        List<String> columns = Arrays.asList(line.split(",", -1));
        UUID uuid = UUIDs.timeBased();
        String baseLicence = columns.get(0);
        Date pickup = sdf.parse(columns.get(1));
        Date dropoff = sdf.parse(columns.get(2));
        Integer initialLocation = NumberUtils.toInt(columns.get(3), 0);
        Integer finalLocation = NumberUtils.toInt(columns.get(4), 0);
        Boolean sharedRide = "1".equals(columns.get(5));
        // Write the record
        writer.addRow(uuid, baseLicence, pickup, dropoff, initialLocation, finalLocation, sharedRide);
        importedRecords++;
        }
        logger.info("Imported records: " + importedRecords);
    } catch (Exception e) {
        logger.error(e);
    }
    writer.close();
}

Finalizamos a criação da SSTable com o método main para rodar o programa. Deve-se observar que o diretório de saída deve usar a estrutura a seguir, no formato keyspace/tableName, que foram definidos acima.

public static void main(String[] args) throws IOException {
    String inputFile = "/home/marco/dados/nyc/fhv_tripdata_2018-06.csv";
    String outputDir = System.getProperty("user.home") + "/temp/" + keyspace + "/" + tableName;
    File file = new File(outputDir);
    FileUtils.deleteDirectory(file);
    file.mkdirs();
    //
    CassandraBulkImport cassandraBulkImport = new CassandraBulkImport(inputFile, outputDir);
    cassandraBulkImport.bulkImport();
}

E para executar a aplicação podemos usar a própria IDE, o Eclipse por exemplo, ou rodar pelo terminal com o comando abaixo. É importante definir o parâmetro Xmx para 4 GB porque o programa demanda muita memória para processar os registros.

java -Xmx4g \
    -cp target/big-data-bulk-import-jar-with-dependencies.jar \
    net.marcoreis.dataimport.cassandra.CassandraBulkImport

O processo demora aproximados 4 minutos em um notebook com 4 CPUs Intel i7 e 16 GB de memória. Com a SSTable criada podemos proceder para o bulk loading.

O utilitário sstableloader

A SSTable foi criada no diretório datalake/tb_fhv e agora precisa ser importada para o Cassandra. Essa carga é feita pelo utilitário sstableloader. Para facilitar o processo, vamos utilizar as variáveis de ambiente abaixo, todos em ambiente Linux. Naturalmente, deve-se adequar os diretórios de acordo com cada máquina.

CASSANDRA_HOME=/home/marco/software/cassandra
PATH=$PATH:$CASSANDRA_HOME/bin
SSTABLE_DIR=/home/marco/temp/datalake/tb_fhv

O terminal para comandos no Cassanda é o cqlsh. Ao conectar, ele mostra as informações de conexão que podemos ver logo abaixo.

Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>

O primeiro comando deve ser a criação do keyspace datalake. Vamos criar um keyspace padrão, sem replicação apenas para teste.

CREATE KEYSPACE datalake
  WITH REPLICATION = { 
   'class' : 'SimpleStrategy', 
   'replication_factor' : 1 
  };

Para criar a tabela tb_fhv o script está aqui. Claro que deve ser a mesma estrutura usada na classe CassandraBulkImport.

CREATE TABLE datalake.tb_fhv 
(
    ID UUID, 
    base_licence TEXT, 
    pickup_datetime TIMESTAMP, 
    dropoff_datetime TIMESTAMP, 
    initial_location INT, 
    final_location INT, 
    shared_ride BOOLEAN, 
    PRIMARY KEY(ID, base_licence)
);

E o último passo é a execução do próprio sstableloader, indicando qual o destino (-d) e ativando o mode verbose (-v). Lembrando que o parâmetro SSTABLE_DIR é a própria SSTable que foi criada anteriormente.

sstableloader -v -d 127.0.0.1 $SSTABLE_DIR

Conclusão

Para o dataset selecionado, esse comando demora 1.5 minuto que somado aos 4 minutos da criação da SSTable totaliza 5.5 minutos de processamento para os 21 milhões de registros.

Deixe uma resposta

O seu endereço de email não será publicado. Campos obrigatórios marcados com *