1 module elasticsearch.transport.transport; 2 3 import std.string; 4 import vibe.data.json; 5 public import elasticsearch.transport.response; 6 public import elasticsearch.transport.connections.collection; 7 8 import elasticsearch.transport.connections.collection; 9 import elasticsearch.transport.exceptions; 10 import elasticsearch.api.parameters; 11 import elasticsearch.transport.sniffer; 12 13 enum RequestMethod { 14 HEAD, 15 GET, 16 POST, 17 PUT, 18 DELETE 19 } 20 21 struct Host { 22 string hostName = "localhost"; 23 int port = 9200; 24 string protocol = "http"; 25 string path; 26 string user; 27 string password; 28 29 @property string url() { 30 auto urlString = protocol ~ "://"; 31 if (user.length) urlString ~= user ~ ":" ~ password ~ "@"; 32 urlString ~= hostName ~ ":" ~ to!string(port); 33 if(path.length) urlString ~= path; 34 35 return urlString; 36 } 37 } 38 39 enum LogLevel { 40 debug_, 41 info, 42 error 43 } 44 45 class Transport { 46 private { 47 Host[] _hosts; 48 Host[] _activeHosts; 49 50 //int _snifferTimeout = 1; # TODO: Implement sniffer timeout 51 Collection _connections; 52 bool _reloadOnFailure = true; 53 54 int _reloadAfter = 10_000; // Requests 55 int _resurrectAfter = 60; // Seconds 56 int _maxRetries = 3; // Requests 57 58 int connectionCounter; 59 } 60 61 protected { 62 abstract Response performTransportRequest(Connection connection, RequestMethod method, string path, ESParams parameters = ESParams(), string requestBody = ""); 63 abstract void transportLog(LogLevel level, string message); 64 } 65 66 this() { 67 } 68 69 abstract @property string protocol(); 70 @property ref Host[] hosts() { return _hosts; } 71 //@property ref int snifferTimeout() { return _snifferTimeout; } 72 73 protected Collection buildConnections() { 74 auto collection = new Collection(); 75 foreach(host; _hosts) { 76 collection.all ~= new Connection(host, this); 77 } 78 79 return collection; 80 } 81 82 /// Returns a connection from the connection pool by delegating to Collection. 83 /// 84 /// Resurrects dead connection if the `resurrect_after` timeout has passed. 85 /// Increments the counter and performs connection reloading if the `reload_connections` option is set. 86 /// 87 /// @return [Connections::Connection] 88 /// @see Connections::Collection///get_connection 89 /// 90 Connection getConnection() { 91 if (!_connections) _connections = buildConnections(); 92 // Resurrect dead connections here 93 auto connection = _connections.getConnection(); 94 connectionCounter++; 95 if (_reloadAfter && !(connectionCounter %_reloadAfter)) reloadConnections(); 96 97 return connection; 98 } 99 100 /// Reloads and replaces the connection collection based on cluster information. 101 /// 102 void reloadConnections() { 103 auto sniffer = new Sniffer(this); 104 105 transportLog(LogLevel.debug_, "Reloading connections"); 106 auto hosts = sniffer.hosts; 107 if (hosts.length) { 108 transportLog(LogLevel.debug_, format("Sniffer found %s hosts", hosts.length)); 109 _hosts = hosts; 110 } 111 rebuildConnections(); 112 } 113 114 /// Tries to "resurrect" all eligible dead connections. 115 /// 116 /// @see Connections::Connection///resurrect! 117 /// 118 void resurrectDeadConnections() { 119 foreach(connection; _connections.dead) { 120 connection.resurrect; 121 } 122 } 123 124 /// Replaces the connections collection 125 /// 126 private void rebuildConnections() { 127 _connections = buildConnections; 128 } 129 130 Response performRequest(RequestMethod method, string path, ESParams parameters = ESParams(), string requestBody = "") { 131 // TODO: Make this more like the official method where it logs failures and automatically reloads connections on failure etc... 132 int tries; 133 bool success; 134 135 Response response; 136 auto c = getConnection(); 137 138 assert(c); 139 140 try { 141 response = performTransportRequest(c, method, path, parameters, requestBody); 142 } 143 catch (HostUnreachableException exception) { 144 transportLog(LogLevel.error, exception.msg); 145 exception.connection.makeDead(); 146 } 147 catch (RequestException exception) { 148 transportLog(LogLevel.error, exception.msg); 149 } 150 catch (Exception exception) { 151 transportLog(LogLevel.error, exception.msg); 152 transportLog(LogLevel.error, method.to!string ~ " " ~ c.fullURL(path, parameters)); 153 transportLog(LogLevel.error, requestBody); 154 } 155 156 return response; 157 } 158 } 159