module elasticsearch.transport.connections.connection;

import elasticsearch.transport.transport;
import elasticsearch.api.parameters;
import std.datetime;

/**
 * Wraps the connection information and logic.
 *
 * The Connection instance wraps the host information (hostname, port, attributes, etc),
 * as well as the "session" (a transport client object, such as a {elasticsearch.transport.http.vibe} instance).
 *
 * It provides methods to construct and properly encode the URLs and paths for passing them
 * to the transport client object.
 *
 * It provides methods to handle connection lifecycle (dead, alive, healthy).
 */
class Connection {
    package {
        Host _host;
        Transport _connection;
        int _resurrectTimeout = 60; // Seconds
        int _failures = 0;
        bool _dead = false;
        SysTime _deadSince;
    }

    /**
     * Creates a connection for `host` that uses `connection` as its transport client.
     *
     * Params:
     *   host             = the host information this connection wraps
     *   connection       = the transport client object
     *   resurrectTimeout = base number of seconds a dead connection waits before
     *                      it becomes resurrectable (doubled for every further failure)
     */
    this(Host host, Transport connection, int resurrectTimeout = 60) {
        _host = host;
        _connection = connection;
        // Previously the argument was accepted but never stored, so the
        // timeout always stayed at its 60 second default.
        _resurrectTimeout = resurrectTimeout;
    }

    /// The host information wrapped by this connection.
    @property Host host() { return _host; }

    /// Base resurrect timeout in seconds; returned by reference, so it is assignable.
    @property ref int resurrectTimeout() { return _resurrectTimeout; }

    /// `true` while the connection is marked as dead.
    @property bool dead() { return _dead; }

    /// The time recorded by the most recent call to `makeDead`.
    @property SysTime deadSince() { return _deadSince; }

    /// `true` while the connection is not marked as dead.
    @property bool alive() { return !_dead; }

    /// Number of failures counted since the connection was last marked healthy.
    @property int failures() { return _failures; }

    /**
     * Returns `true` once the backoff period following `deadSince` has elapsed.
     *
     * The backoff is `resurrectTimeout * 2^(failures - 1)` seconds. With no
     * recorded failures the backoff is zero.
     */
    @property bool resurrectable() {
        // Cap the exponent and compute in `long`: the former
        // `_resurrectTimeout * pow(2, _failures - 1)` was evaluated in `int`,
        // overflowed for large failure counts (making the connection
        // immediately resurrectable) and was handed a negative exponent when
        // `_failures` was 0.
        enum maxBackoffExponent = 30;

        long backoffSeconds = 0;
        if (_failures > 0) {
            auto exponent = _failures - 1;
            if (exponent > maxBackoffExponent)
                exponent = maxBackoffExponent;
            backoffSeconds = _resurrectTimeout * (1L << exponent);
        }

        return Clock.currTime > (_deadSince + dur!"seconds"(backoffSeconds));
    }

    /**
     * Marks this connection as dead, incrementing the `failures` counter and
     * storing the current time as `deadSince`.
     */
    Connection makeDead() {
        _dead = true;
        _failures++;
        _deadSince = Clock.currTime;
        return this;
    }

    /// Marks this connection as alive, ie. it is eligible to be returned from the pool by the selector.
    Connection makeAlive() {
        _dead = false;
        return this;
    }

    /// Marks this connection as healthy, ie. a request has been successfully performed with it.
    Connection makeHealthy() {
        makeAlive();
        _failures = 0;
        return this;
    }

    /// Marks this connection as alive, if the required timeout has passed.
    Connection resurrect() {
        if (resurrectable) makeAlive();
        return this;
    }

    /// Returns the complete endpoint URL with host, port, path and serialized parameters.
    string fullURL(string path, ESParams params) {
        auto url = host.url;
        url ~= "/" ~ fullPath(path, params);

        return url;
    }

    /**
     * Returns `path` followed by the URI-encoded query string built from `params`.
     *
     * When `params` is empty, `path` is returned unchanged. Otherwise every
     * key and value is passed through `encodeComponent`, the pairs are joined
     * with `&` and appended to the path after a `?`.
     */
    string fullPath(string path, ESParams params) {
        import std.array;
        import std.uri;

        auto returnPath = path;

        if (params.length) {
            string[] paramArray;

            foreach (key, value; params)
                paramArray ~= encodeComponent(key) ~ "=" ~ encodeComponent(value);
            returnPath ~= "?" ~ join(paramArray, "&");
        }

        return returnPath;
    }

    /// Returns a short description containing the host name and the dead/alive state.
    override string toString() {
        auto returnString = "<Connection host: " ~ _host.hostName;
        returnString ~= " " ~ (_dead ? ("dead since " ~ _deadSince.toString()) : "alive");
        returnString ~= ">";
        return returnString;
    }
}

unittest {
    import elasticsearch.transport.http.vibe;

    auto host = Host();
    auto c = new Connection(host, new VibeTransport());

    assert(c.alive);
    assert(c.makeDead.dead);
    assert(c.failures == 1);
    assert(!c.resurrectable);
    assert(c.resurrect.dead);
    c._deadSince = Clock.currTime - dur!"minutes"(2);
    assert(c.resurrectable);
    assert(c.resurrect.alive);
    assert(c.failures == 1);
    assert(c.makeHealthy.failures == 0);

    // The timeout passed to the constructor must be honoured.
    auto custom = new Connection(host, new VibeTransport(), 5);
    assert(custom.resurrectTimeout == 5);

    // A very large failure count must not overflow the backoff and make the
    // connection resurrectable right away.
    custom.makeDead();
    custom._failures = 40;
    assert(!custom.resurrectable);
}