module elasticsearch.bulk_proxy;

import elasticsearch.client;
import vibe.data.json;
import vibe.core.log;

/// Default value of `EsBulkProxy.threshold`: the buffer size (in bytes) at which a flush is triggered.
static const uint ESBulkUploadThreshold = 12_000_000;

/// Accumulates Elasticsearch bulk request lines in a string buffer and sends them through
/// `client.bulk` whenever appending another record would push the buffer past `threshold`.
///
/// Callers must invoke `flush` themselves once they are done appending; nothing in this
/// struct sends the remaining buffered records automatically.
struct EsBulkProxy {
    /// Client used to submit the buffered payload.
    Client client;

    /// Maximum buffer size in bytes. Grows automatically if a single record is larger than it.
    ulong threshold = ESBulkUploadThreshold;

    /// Number of records appended since the last flush.
    ulong recordCount;

    /// Pending bulk payload.
    string buffer;

    /// Sends the buffered payload via `client.bulk` and resets the buffer and the record count.
    /// Does nothing (besides logging) when the buffer is empty.
    void flush() {
        logDebug("Flushing Elasticsearch bulk proxy: %s records, %s bytes", recordCount, buffer.length);
        if (buffer.length == 0) return;

        // Make sure the payload ends with a newline before it is sent.
        if (buffer[$ - 1] != '\n') buffer ~= "\n";

        // NOTE(review): the response is not inspected, so any per-item failures reported by
        // the server go unnoticed here — confirm whether callers need them surfaced.
        auto response = client.bulk(buffer);

        buffer = "";
        recordCount = 0;
    }

    /// Appends one pre-formatted record to the buffer, flushing first if it would not fit.
    ///
    /// Params:
    ///     input = Text to add to the buffer; counted as a single record.
    void append(string input) {
        // If a single input doesn't fit within the threshold, expand the threshold to allow it.
        if (input.length > threshold) threshold = input.length;

        // Flush the buffer if the input doesn't fit. The comparison is written as an addition
        // because `threshold - buffer.length` wraps around (unsigned) whenever the buffer is
        // already larger than the threshold, e.g. after the buffer was modified directly
        // through `alias this` or the threshold was lowered, which would suppress the flush.
        if (cast(ulong) buffer.length + input.length > threshold) flush();

        buffer ~= input;
        ++recordCount;
    }

    /// Appends an action line plus its document source for the given document.
    ///
    /// Note that the emitted bulk action is `create`, despite the method's name.
    ///
    /// Params:
    ///     index = Value written to the action's `_index` field.
    ///     type  = Value written to the action's `_type` field.
    ///     id    = Value written to the action's `_id` field.
    ///     data  = Serialized document; a trailing newline is added if it is missing.
    void appendIndex(string index, string type, string id, string data) {
        // Guard the `$ - 1` access: indexing an empty string is out of bounds.
        if (data.length == 0 || data[$ - 1] != '\n') data ~= "\n";

        // Serialise every value as a JSON string so that quotes, backslashes or control
        // characters in the inputs cannot break or inject into the action line.
        string actionString = `{"create":{"_index":` ~ quoteJson(index)
            ~ `,"_type":` ~ quoteJson(type)
            ~ `,"_id":` ~ quoteJson(id)
            ~ `}}` ~ "\n";
        actionString ~= data;

        append(actionString);
    }

    /// Convenience overload that serializes `record` with `Json.toString` before appending.
    void appendIndex(string index, string type, string id, Json record) {
        appendIndex(index, type, id, record.toString);
    }

    /// Returns `value` as a quoted, escaped JSON string literal.
    private static string quoteJson(string value) {
        return Json(value).toString();
    }

    /// Lets the proxy be used wherever its `buffer` string could be used.
    alias buffer this;
}