1 module elasticsearch.transport.http.vibe; 2 3 public import elasticsearch.transport.transport; 4 public import elasticsearch.transport.exceptions; 5 import elasticsearch.api.parameters; 6 7 import vibe.core.log; 8 import vibe.http.client; 9 import vibe.stream.operations; 10 import vibe.data.json; 11 12 alias VibeLogLevel = vibe.core.log.LogLevel; 13 alias ESLogLevel = elasticsearch.transport.transport.LogLevel; 14 15 HTTPMethod vibeTransportRequestMethod(RequestMethod method) { 16 final switch(method) { 17 case RequestMethod.HEAD: return HTTPMethod.HEAD; 18 case RequestMethod.GET: return HTTPMethod.GET; 19 case RequestMethod.POST: return HTTPMethod.POST; 20 case RequestMethod.PUT: return HTTPMethod.PUT; 21 case RequestMethod.DELETE: return HTTPMethod.DELETE; 22 } 23 } 24 25 class VibeTransport : Transport { 26 override: 27 protected void transportLog(ESLogLevel level, string message) { 28 final switch(level) { 29 case ESLogLevel.debug_: logDebug("%s", message); 30 break; 31 case ESLogLevel.info: logInfo("%s", message); 32 break; 33 case ESLogLevel.error: logError("%s", message); 34 break; 35 } 36 37 } 38 39 @property string protocol() { return "http"; } 40 41 Response performTransportRequest(Connection connection, RequestMethod method, string path, ESParams parameters, string requestBody = "") { 42 Response response; 43 requestHTTP(connection.fullURL(path, parameters), 44 (scope req) { 45 req.method = vibeTransportRequestMethod(method); 46 if (requestBody != "") { 47 req.writeBody(cast(ubyte[])requestBody); 48 } 49 logDebugV("ES Transport Request: %s %s", method, path); 50 if (requestBody.length) logTrace("ES Transport Request Body: \n%s", requestBody); 51 }, 52 (scope res) { 53 response.status = res.statusCode; 54 response.headers = res.headers; 55 if (method != RequestMethod.HEAD) { 56 if (res.statusCode >= 200 && res.statusCode < 300) { 57 auto responseBody = res.bodyReader.readAllUTF8(); 58 response.responseBody = responseBody; 59 } 60 else { 61 switch(res.statusCode) { 62 case(HTTPStatus.gatewayTimeout, HTTPStatus.requestTimeout): 63 throw new HostUnreachableException(connection); 64 default: 65 auto responseBody = res.bodyReader.readAllUTF8(); 66 response.responseBody = responseBody; 67 throw new RequestException(connection, method, path, parameters, requestBody, response); 68 } 69 } 70 } 71 } 72 ); 73 74 return response; 75 } 76 77 }