Introdução
A bulk loading, ou bulk insert, é o processo no qual uma grande quantidade de registros é inserida em um banco de dados em curto período de tempo. Um exemplo desta funcionalidade é o Cassandra Bulk Loader, também chamado de sstableloader
.
Para ilustrar o funcionamento do Cassandra Bulk Loader vamos inserir um grande dataset com milhões de registros em aproximadamente 5 minutos e usando hardware de baixo custo. Neste sentido, a inserção individual de registros, apesar de otimizada, não é tão rápida quanto a bulk loading. Uma alternativa ao sstableloader seria o utilitário COPY FROM
, mas ele é indicado para datasets com menos de 2 milhões de registros.
Dataset
O dataset de exemplo é o TLC Trip Record Data, disponibilizado pela cidade de Nova Iorque com as viagens dos FHVs, ou For-Hive-Vehicles. Estão também disponíveis os dados dos táxis amarelos e dos green cars, mas o maior dataset é o do FHV, por isso vamos usar apenas este.
Os dados coletados são o base license number, a pick-up/drop-off date-time, e a taxi zone location ID e a shared flag. A partir daí é possível fazer diversas análises bacanas sobre o comportamento dos habitantes da metrópole. A versão mais recente dos dados é de Junho de 2018, que tem aproximadamente 1.6 GB e 21.089.618 de linhas.
Criação da SSTable
O sstableloader
é usado para importar dados de uma SSTable (Sorted String Table). A SSTable é um mecanismo do Cassandra para armazenamento de dados baseado em um par chave-valor ordenado pela chave que é formado blocos (geralmente) de 64 KB.
A SSTable é criada por meio da CQLSSTableWriter
, uma classe da API do Cassandra. A CQLSSTableWriter
recebe como parâmetro os scripts para criação da tabela e para inserção de registros, o arquivo de origem dos dados e o diretório de saída, onde será gravada finalmente a SSTable. Neste exemplo são usados o keyspace datalake
e a tabela tb_fhv
.
Para mostrar o funcionamento da CQLSSTableWriter
foi criada a classe CassandraBulkImport
, com os atributos usados na criação da SSTable. O código inicial está logo a seguir:
public class CassandraBulkImport {
private static Logger logger = Logger.getLogger(CassandraBulkImport.class);
private int importedRecords = 0;
private String INSERT_STMT = "INSERT INTO datalake.tb_fhv "
+ "(ID, base_licence, pickup_datetime, dropoff_datetime, initial_location, final_location, shared_ride) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?)";
private String CREATE_TABLE = "CREATE TABLE datalake.tb_fhv "
+ "(ID UUID, base_licence TEXT, pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, initial_location INT, final_location INT, shared_ride BOOLEAN, "
+ "PRIMARY KEY(ID, base_licence))";
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static String keyspace = "datalake";
private static String tableName = "tb_fhv";
private CQLSSTableWriter writer;
private String inputFile;
// {...}
}
A inicialização das variáveis pode ser conferida no código abaixo, onde o writer
é construído a partir dos atributos informados.
public CassandraBulkImport(String inputFile, String outputDir) {
this.inputFile = inputFile;
DatabaseDescriptor.clientInitialization();
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
builder.inDirectory(outputDir).forTable(CREATE_TABLE).using(String.format(INSERT_STMT, keyspace, tableName))
.withPartitioner(new Murmur3Partitioner());
writer = builder.build();
}
O método bulkImport
é responsável por ler cada linha do arquivo de entrada, preparar os dados e inserir na SSTable por meio do writer
.
public void bulkImport() throws IOException {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(inputFile)))) {
// Discard the header
reader.readLine();
String line;
while ((line = reader.readLine()) != null) {
// Prepare the columns
line = StringUtils.remove(line, "\"");
List<String> columns = Arrays.asList(line.split(",", -1));
UUID uuid = UUIDs.timeBased();
String baseLicence = columns.get(0);
Date pickup = sdf.parse(columns.get(1));
Date dropoff = sdf.parse(columns.get(2));
Integer initialLocation = NumberUtils.toInt(columns.get(3), 0);
Integer finalLocation = NumberUtils.toInt(columns.get(4), 0);
Boolean sharedRide = "1".equals(columns.get(5));
// Write the record
writer.addRow(uuid, baseLicence, pickup, dropoff, initialLocation, finalLocation, sharedRide);
importedRecords++;
}
logger.info("Imported records: " + importedRecords);
} catch (Exception e) {
logger.error(e);
}
writer.close();
}
Finalizamos a criação da SSTable com o método main
para rodar o programa. Deve-se observar que o diretório de saída deve usar a estrutura a seguir, no formato keyspace/tableName
, que foram definidos acima.
public static void main(String[] args) throws IOException {
String inputFile = "/home/marco/dados/nyc/fhv_tripdata_2018-06.csv";
String outputDir = System.getProperty("user.home") + "/temp/" + keyspace + "/" + tableName;
File file = new File(outputDir);
FileUtils.deleteDirectory(file);
file.mkdirs();
//
CassandraBulkImport cassandraBulkImport = new CassandraBulkImport(inputFile, outputDir);
cassandraBulkImport.bulkImport();
}
E para executar a aplicação podemos usar a própria IDE, o Eclipse por exemplo, ou rodar pelo terminal com o comando abaixo. É importante definir o parâmetro Xmx
para 4 GB porque o programa demanda muita memória para processar os registros.
java -Xmx4g \
-cp target/big-data-bulk-import-jar-with-dependencies.jar \
net.marcoreis.dataimport.cassandra.CassandraBulkImport
O processo demora aproximados 4 minutos em um notebook com 4 CPUs Intel i7 e 16 GB de memória. Com a SSTable criada podemos proceder para o bulk loading.
O utilitário sstableloader
A SSTable foi criada no diretório datalake/tb_fhv
e agora precisa ser importada para o Cassandra. Essa carga é feita pelo utilitário sstableloader
. Para facilitar o processo, vamos utilizar as variáveis de ambiente abaixo, todos em ambiente Linux. Naturalmente, deve-se adequar os diretórios de acordo com cada máquina.
CASSANDRA_HOME=/home/marco/software/cassandra
PATH=$PATH:$CASSANDRA_HOME/bin
SSTABLE_DIR=/home/marco/temp/datalake/tb_fhv
O terminal para comandos no Cassanda é o cqlsh
. Ao conectar, ele mostra as informações de conexão que podemos ver logo abaixo.
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.3 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh>
O primeiro comando deve ser a criação do keyspace datalake
. Vamos criar um keyspace padrão, sem replicação apenas para teste.
CREATE KEYSPACE datalake
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};
Para criar a tabela tb_fhv
o script está aqui. Claro que deve ser a mesma estrutura usada na classe CassandraBulkImport
.
CREATE TABLE datalake.tb_fhv
(
ID UUID,
base_licence TEXT,
pickup_datetime TIMESTAMP,
dropoff_datetime TIMESTAMP,
initial_location INT,
final_location INT,
shared_ride BOOLEAN,
PRIMARY KEY(ID, base_licence)
);
E o último passo é a execução do próprio sstableloader
, indicando qual o destino (-d
) e ativando o mode verbose (-v
). Lembrando que o parâmetro SSTABLE_DIR
é a própria SSTable que foi criada anteriormente.
sstableloader -v -d 127.0.0.1 $SSTABLE_DIR
Conclusão
Para o dataset selecionado, esse comando demora 1.5 minuto que somado aos 4 minutos da criação da SSTable totaliza 5.5 minutos de processamento para os 21 milhões de registros.