1 /** 2 Vibe-based AWS client 3 */ 4 5 module vibe.aws.aws; 6 7 import std.algorithm; 8 import std.datetime; 9 import std.random; 10 import std.range; 11 import std.stdio; 12 import std.string; 13 14 import vibe.core.core; 15 import vibe.core.log; 16 import vibe.data.json; 17 import vibe.http.client; 18 19 import vibe.aws.sigv4; 20 21 public import vibe.aws.credentials; 22 23 class AWSException : Exception 24 { 25 immutable string type; 26 immutable bool retriable; 27 28 this(string type, bool retriable, string message) 29 { 30 super(type ~ ": " ~ message); 31 this.type = type; 32 this.retriable = retriable; 33 } 34 35 /** 36 Returns the 'ThrottlingException' from 'com.amazon.coral.service#ThrottlingException' 37 */ 38 @property string simpleType() 39 { 40 auto h = type.indexOf('#'); 41 if (h == -1) 42 return type; 43 return type[h + 1 .. $]; 44 } 45 } 46 47 /** 48 Configuraton for AWS clients 49 */ 50 struct ClientConfiguration 51 { 52 uint maxErrorRetry = 3; 53 } 54 55 /** 56 Thrown when the signature/authorization information is wrong 57 */ 58 class AuthorizationException : AWSException 59 { 60 this(string type, string message) 61 { 62 super(type, false, message); 63 } 64 } 65 66 struct ExponentialBackoff 67 { 68 immutable uint maxRetries; 69 uint tries = 0; 70 uint maxSleepMs = 10; 71 72 this(uint maxRetries) 73 { 74 this.maxRetries = maxRetries; 75 } 76 77 @property bool canRetry() 78 { 79 return tries < maxRetries; 80 } 81 82 @property bool finished() 83 { 84 return tries >= maxRetries + 1; 85 } 86 87 void inc() 88 { 89 tries++; 90 maxSleepMs *= 2; 91 } 92 93 void sleep() 94 { 95 vibe.core.core.sleep(uniform!("[]")(1, maxSleepMs).msecs); 96 } 97 } 98 99 /// 100 class AWSClient 101 { 102 protected static immutable exceptionPrefix = "com.amazon.coral.service#"; 103 104 immutable string endpoint; 105 immutable string endpointUrl; 106 immutable string region; 107 immutable string service; 108 109 private AWSCredentialSource m_credsSource; 110 private ClientConfiguration m_config; 111 112 this(string endpoint, string region, string service, AWSCredentialSource credsSource, 113 ClientConfiguration config = ClientConfiguration()) 114 { 115 this.region = region; 116 this.service = service; 117 this.m_credsSource = credsSource; 118 this.m_config = config; 119 120 if (endpoint.startsWith("http://")) 121 { 122 this.endpointUrl = endpoint; 123 this.endpoint = endpoint[7 .. $]; 124 } 125 else if (endpoint.startsWith("https://")) 126 { 127 this.endpointUrl = endpoint; 128 this.endpoint = endpoint[8 .. $]; 129 } 130 else 131 { 132 assert(false, "endpoints shall start with http:// or https://"); 133 } 134 } 135 136 AWSResponse doRequest(string operation, Json request) 137 { 138 auto backoff = ExponentialBackoff(m_config.maxErrorRetry); 139 140 for (; !backoff.finished; backoff.inc()) 141 { 142 auto credScope = region ~ "/" ~ service; 143 auto creds = m_credsSource.credentials(credScope); 144 try 145 { 146 // FIXME: Auto-retries for retriable errors 147 // FIXME: Report credential errors and retry for failed credentials 148 auto resp = requestHTTP(this.endpointUrl ~ "/", (scope req) { 149 auto timeString = currentTimeString(); 150 auto jsonString = cast(ubyte[]) request.toString(); 151 152 req.method = HTTPMethod.POST; 153 req.headers["x-amz-content-sha256"] = hash(jsonString); 154 req.headers["x-amz-target"] = operation; 155 req.headers["x-amz-date"] = currentTimeString(); 156 req.headers["host"] = endpoint; 157 if (creds.sessionToken && !creds.sessionToken.empty) 158 req.headers["x-amz-security-token"] = creds.sessionToken; 159 req.contentType = "application/x-amz-json-1.0"; 160 signRequest(req, jsonString, creds, timeString, region, service); 161 req.writeBody(jsonString); 162 }); 163 164 checkForError(resp); 165 166 return new AWSResponse(resp); 167 } 168 catch (AuthorizationException ex) 169 { 170 logWarn(ex.msg); 171 // Report credentials as invalid. Will retry if possible. 172 m_credsSource.credentialsInvalid(credScope, creds, ex.msg); 173 if (!backoff.canRetry) throw ex; 174 } 175 catch (AWSException ex) 176 { 177 logWarn(ex.msg); 178 // Retry if possible and retriable, otherwise give up. 179 if (!backoff.canRetry || !ex.retriable) throw ex; 180 } 181 182 // We're going again, but sleep first 183 backoff.sleep(); 184 } 185 assert(0); 186 } 187 188 protected auto currentTimeString() 189 { 190 auto t = Clock.currTime(UTC()); 191 t.fracSecs = Duration.zero; 192 return t.toISOString(); 193 } 194 195 void checkForError(HTTPClientResponse response) 196 { 197 if (response.statusCode < 400) 198 return; // No error 199 200 auto bod = response.readJson(); 201 202 //logError("error: %s",bod); 203 204 throw makeException(bod["__type"].get!string, 205 response.statusCode / 100 == 5, bod["Message"].opt!string("")); 206 } 207 208 AWSException makeException(string type, bool retriable, string message) 209 { 210 if (type == exceptionPrefix ~ "UnrecognizedClientException" 211 || type == exceptionPrefix ~ "InvalidSignatureException" 212 || type == exceptionPrefix ~ "AccessDeniedException") 213 throw new AuthorizationException(type, message); 214 return new AWSException(type, retriable, message); 215 } 216 } 217 218 private void signRequest(HTTPClientRequest req, ubyte[] requestBody, 219 AWSCredentials creds, string timeString, string region, string service) 220 { 221 auto dateString = dateFromISOString(timeString); 222 auto credScope = dateString ~ "/" ~ region ~ "/" ~ service; 223 224 SignableRequest signRequest; 225 signRequest.dateString = dateString; 226 signRequest.timeStringUTC = timeFromISOString(timeString); 227 signRequest.region = region; 228 signRequest.service = service; 229 import std.conv : to; 230 231 signRequest.canonicalRequest.method = req.method.to!string(); 232 signRequest.canonicalRequest.uri = req.requestURL; // FIXME: Can include query params 233 auto reqHeaders = req.headers.toRepresentation; 234 foreach (x; reqHeaders) 235 { 236 if (x.key.toLower == "connection") 237 continue; 238 signRequest.canonicalRequest.headers[x.key] = x.value; 239 } 240 signRequest.canonicalRequest.payload = requestBody; 241 242 ubyte[] signKey = signingKey(creds.accessKeySecret, dateString, region, service).dup; 243 ubyte[] stringToSign = cast(ubyte[]) signableString(signRequest); 244 auto signature = sign(signKey, stringToSign); 245 246 auto authHeader = createSignatureHeader(creds.accessKeyID, credScope, 247 signRequest.canonicalRequest.headers, signature); 248 req.headers["authorization"] = authHeader; 249 } 250 251 class AWSResponse 252 { 253 private HTTPClientResponse m_response; 254 private Json m_body; 255 256 this(HTTPClientResponse response) 257 { 258 m_response = response; 259 m_body = response.readJson(); 260 } 261 262 @property Json responseBody() 263 { 264 return m_body; 265 } 266 }