1 module elasticsearch.transport.transport;
2 
3 import std.string;
4 import vibe.data.json;
5 public import elasticsearch.transport.response;
6 public import elasticsearch.transport.connections.collection;
7 
8 import elasticsearch.transport.connections.collection;
9 import elasticsearch.transport.exceptions;
10 import elasticsearch.api.parameters;
11 import elasticsearch.transport.sniffer;
12 
13 enum RequestMethod {
14 	HEAD,
15 	GET,
16 	POST,
17 	PUT,
18 	DELETE
19 }
20 
21 struct Host {
22 	string hostName = "localhost";
23 	int port = 9200;
24 	string protocol = "http";
25 	string path;
26 	string user;
27 	string password;
28 
29 	@property string url() {
30 		auto urlString = protocol ~ "://";
31 		if (user.length) urlString ~= user ~ ":" ~ password ~ "@";
32 		urlString ~= hostName ~ ":" ~ to!string(port);
33 		if(path.length) urlString ~= path;
34 
35 		return urlString;
36 	}
37 }
38 
39 enum LogLevel {
40 	debug_,
41 	info,
42 	error
43 }
44 
45 class Transport {
46 	private {
47 		Host[] _hosts;
48 		Host[] _activeHosts;
49 
50 		//int _snifferTimeout = 1; # TODO: Implement sniffer timeout
51 		Collection _connections;
52 		bool _reloadOnFailure = true;
53 
54 		int _reloadAfter = 10_000; // Requests
55 		int _resurrectAfter = 60;  // Seconds
56 		int _maxRetries = 3; // Requests
57 
58 		int connectionCounter;
59 	}  
60 
61 	protected { 
62 		abstract Response performTransportRequest(Connection connection, RequestMethod method, string path, ESParams parameters = ESParams(), string requestBody = "");
63 		abstract void transportLog(LogLevel level, string message);
64 	}
65 
66 	this() {
67 	}
68 
69 	abstract @property string protocol();
70 	@property ref Host[] hosts() { return _hosts; }
71 	//@property ref int snifferTimeout() { return _snifferTimeout; }
72 
73 	protected Collection buildConnections() {
74 		auto collection = new Collection();
75 		foreach(host; _hosts) {
76 			collection.all ~= new Connection(host, this);
77 		}
78 
79 		return collection;
80 	}
81 
82 	/// Returns a connection from the connection pool by delegating to Collection.
83 	///
84 	/// Resurrects dead connection if the `resurrect_after` timeout has passed.
85 	/// Increments the counter and performs connection reloading if the `reload_connections` option is set.
86 	///
87 	/// @return [Connections::Connection]
88 	/// @see    Connections::Collection///get_connection
89 	///
90 	Connection getConnection() {
91 		if (!_connections) _connections = buildConnections();
92 		// Resurrect dead connections here
93 		auto connection = _connections.getConnection();
94 		connectionCounter++;
95 		if (_reloadAfter && !(connectionCounter %_reloadAfter)) reloadConnections();
96 
97 		return connection;
98 	}
99 
100 	/// Reloads and replaces the connection collection based on cluster information.
101 	/// 
102 	void reloadConnections() {
103 		auto sniffer = new Sniffer(this);
104 
105 		transportLog(LogLevel.debug_, "Reloading connections");
106 		auto hosts = sniffer.hosts;
107 		if (hosts.length) {
108 			transportLog(LogLevel.debug_, format("Sniffer found %s hosts", hosts.length));
109 			_hosts = hosts;
110 		}
111 		rebuildConnections();
112 	}
113 
114 	/// Tries to "resurrect" all eligible dead connections.
115 	///
116 	/// @see Connections::Connection///resurrect!
117 	/// 
118 	void resurrectDeadConnections() {
119 		foreach(connection; _connections.dead) {
120 			connection.resurrect;
121 		}
122 	}
123 
124 	/// Replaces the connections collection
125 	/// 
126 	private void rebuildConnections() {
127 		_connections = buildConnections;
128 	}
129 
130 	Response performRequest(RequestMethod method, string path, ESParams parameters = ESParams(), string requestBody = "") {
131 		// TODO: Make this more like the official method where it logs failures and automatically reloads connections on failure etc...
132 		int tries;
133 		bool success;
134 
135 		Response response;
136 		auto c = getConnection();
137 
138 		assert(c);
139 
140 		try {
141 			response = performTransportRequest(c, method, path, parameters, requestBody);
142 		}
143 		catch (HostUnreachableException exception) {
144 			transportLog(LogLevel.error, exception.msg);
145 			exception.connection.makeDead();
146 		}
147 		catch (RequestException exception) {
148 			transportLog(LogLevel.error, exception.msg);
149 		}
150 		catch (Exception exception) {
151 			transportLog(LogLevel.error, exception.msg);
152 			transportLog(LogLevel.error, method.to!string ~ " " ~ c.fullURL(path, parameters));
153 			transportLog(LogLevel.error, requestBody);
154 		}
155 
156 		return response;
157 	}
158 }
159